time to test!

This commit is contained in:
brent s. 2020-05-10 03:48:50 -04:00
parent 84759cc0bc
commit 149064865f
Signed by: bts
GPG Key ID: 8C004C2F93481F6B
2 changed files with 181 additions and 0 deletions

View File

@ -0,0 +1,18 @@
# This is a sample INI file to use with he_ipv6.py.
# If you do not yet have an IPv6 Tunnelbroker.net allocation, you can get one (for free!) at:
# https://www.tunnelbroker.net/tunnel_detail.php?tid=584532
##
# This is the tunnel ID. To get it, log into your tunnelbroker.net account and click on the tunnel you wish to use.
# The tunnel ID is the numerical string in the URL. e.g.:
# https://www.tunnelbroker.net/tunnel_detail.php?tid=12345
# The tunnel ID would be 12345 in the above example.
[12345]
# This is your tunnelbroker.net username.
user = ipv6user
# This is your update key for the above user. You can find it in the "Advanced" tab.
update_key = xXxXxXxXxXxXxXXX
# This is the server IP ("IPv6 Tunnel Endpoints" section). *Be sure to use the IPv4 address!* ("Server IPv4 Address")
server = 192.0.2.1
# And these are all the allocations you wish to add to this machine. Be sure to add the prefix (e.g. /64, /48)!
# You can specify multiple allocations with a comma-separated list.
allocations = 2001:DB8:1::/64,2001:DB8:2::/64

163
utils/he_ipv6.py Executable file
View File

@ -0,0 +1,163 @@
#!/usr/bin/env python3

import argparse
import configparser
import ipaddress
import os
import socket
##
import requests
import requests.auth
from pyroute2 import IPDB
from pyroute2 import IPRoute


# https://wiki.archlinux.org/index.php/IPv6_tunnel_broker_setup
# https://forums.he.net/index.php?topic=3153.0
# https://gist.github.com/pklaus/960672


class TunnelBroker(object):
ipget_url = 'https://ipv4.clientinfo.square-r00t.net/'
ipget_params = {'raw': '1'}
api_base = 'https://ipv4.tunnelbroker.net/nic/update'

def __init__(self, conf, tun_id = None, wan_ip = True, update = True, *args, **kwargs):
self.my_ip = None
self.user = None
self.update_key = None
self.ipr = None
self.iface_idx = None
self.wan = wan_ip
self.force_update = update
self.conf_file = os.path.abspath(os.path.expanduser(conf))
self._conf = configparser.ConfigParser(allow_no_value = True,
interpolation = configparser.ExtendedInterpolation())
self._conf.read(self.conf_file)
if len(self._conf.sections()) < 1:
raise RuntimeError('Config file has no sections/tunnels defined')
self.tun_id = tun_id
if self.tun_id and self.tun_id not in self._conf.sections():
raise ValueError('tun_id not a valid tunnel ID')
elif not self.tun_id:
self.tun_id = self._conf.sections()[0]
self.cfg = self._conf[self.tun_id]
self.server = ipaddress.ip_address(self.cfg['server'])
self.addrs = [ipaddress.ip_network(ip.strip()) for ip in self.cfg['allocations'].split(',')]
for k in ('user', 'update_key'):
setattr(self, k, self.cfg[k])

def _get_my_ip(self):
if self.wan:
req = requests.get(self.ipget_url, params = self.ipget_params)
if not req.ok:
raise RuntimeError('Could not fetch self IP')
self.my_ip = ipaddress.ip_address(req.json()['ip'])
else:
if not self.ipr:
self.ipr = IPRoute()
_defrt = self.ipr.get_default_routes(family = socket.AF_INET)
if len(_defrt) != 1: # This (probably) WILL fail on multipath systems.
raise RuntimeError('Could not determine default IPv4 route')
self.my_ip = ipaddress.ip_address(_defrt[0]['attrs']['RTA_PREFSRC'])
return(None)

def start(self):
if self.force_update:
self._get_my_ip()
self.update()
if not self.ipr:
self.ipr = IPRoute()
self.ipr.link('add',
ifname = 'sit-he-{0}'.format(self.tun_id),
kind = 'sit',
ipip_local = self.my_ip,
ipip_remote = self.server,
ipip_ttl = 255)
self.iface_idx = self.ipr.link_lookup(ifname = 'sit-he-{0}'.format(self.tun_id))[0]
self.ipr.link('set', index = self.iface_idx, state = 'up', mtu = 1480)
for a in self.addrs:
self.ipr.addr('add',
index = self.iface_idx,
address = str(a),
mask = a.prefixlen,
family = socket.AF_INET6)
self.ipr.route('add', dst = '::', mask = 0, oif = self.iface_idx, family = socket.AF_INET6)
self.ipr.close()
return(None)

def stop(self):
if not self.ipr:
self.ipr = IPRoute()
self.iface_idx = self.ipr.link_lookup(ifname = 'sit-he-{0}'.format(self.tun_id))[0]
self.ipr.link('set', index = self.iface_idx, state = 'down')
self.ipr.route('del', dst = '::', mask = 0, oif = self.iface_idx, family = socket.AF_INET6)
self.ipr.link('set', index = self.iface_idx, state = 'down')
self.ipr.link('del', index = self.iface_idx)
self.ipr.close()
return(None)

def update(self, oneshot = False):
self._get_my_ip()
auth_handler = requests.auth.HTTPBasicAuth(self.user, self.update_key)
req = requests.get(self.api_base,
params = {'hostname': self.tun_id,
'myip': self.my_ip},
auth = auth_handler)
if not req.ok:
raise RuntimeError('Could not update client IP in tunnel')
status = req.content.decode('utf-8').split()[0].strip()
if status.lower() not in ('good', 'nochg'):
raise RuntimeError('Client IP update returned failure')
if self.ipr and oneshot:
self.ipr.close()
self.ipr = None
return(None)


def parseArgs():
args = argparse.ArgumentParser(description = ('Dynamically update and enable/disable '
'Hurricane Electric Tunnelbroker'))
args.add_argument('-i', '--no-wan-ip',
dest = 'wan_ip',
action = 'store_false',
help = ('If specified, use the RFC1918 IP address assigned to this machine instead of the WAN '
'IP (necessary if this machine is behind NAT)'))
args.add_argument('-c', '--config',
dest = 'conf',
default = '~/.config/he_tunnelbroker.ini',
help = ('The path to the config. '
'Default: ~/.config/he_tunnelbroker.ini'))
args.add_argument('-t', '--tunnel-id',
dest = 'tun_id',
help = ('The tunnel profile ID/name to use in -c/--config. '
'Default is to use the first one found.'))
args.add_argument('-u', '--no-update',
dest = 'update',
action = 'store_false',
help = ('If specified, do not perform the automatic update for start operations. Has no effect '
'for other operations'))
args.add_argument('oper',
metavar = 'OPERATION',
choices = ('update', 'start', 'stop'),
help = ('The operation to perform. "update" is performed automatically by "start", '
'but otherwise will just update the IPv4 address on record at tunnelbroker'))
return(args)


def main():
args = parseArgs().parse_args()
import pprint
pprint.pprint(vars(args))
tb = TunnelBroker(**vars(args))
if args.oper == 'start':
tb.start()
elif args.oper == 'stop':
tb.stop()
elif args.oper == 'update':
tb.update(oneshot = True)
return(None)


if __name__ == '__main__':
main()