#!/usr/bin/env python3
"""Update, bring up and tear down a Hurricane Electric Tunnelbroker tunnel.

The script reads tunnel profiles from an INI file (one section per tunnel),
can push the client's current IPv4 address to the tunnelbroker update API,
and creates/removes a local ``sit`` interface named ``sit-he-<tunnel id>``
via pyroute2.
"""

import argparse
import configparser
import ipaddress
import os
import socket
##
import requests
import requests.auth
from pyroute2 import IPDB  # Not referenced below; kept so the file's import set is unchanged.
from pyroute2 import IPRoute

# TODO: add checking to see if we're already configured.

# https://wiki.archlinux.org/index.php/IPv6_tunnel_broker_setup
# https://forums.he.net/index.php?topic=3153.0
# https://gist.github.com/pklaus/960672


class TunnelBroker(object):
    """A single tunnel profile plus the operations that can be run on it.

    Each config section is expected to provide the keys ``server`` (remote
    IPv4 endpoint), ``allocations`` (comma-separated networks), ``user`` and
    ``update_key`` (HTTP basic-auth credentials for the update API).
    """

    ipget_url = 'https://ipv4.clientinfo.square-r00t.net/'
    ipget_params = {'raw': '1'}
    api_base = 'https://ipv4.tunnelbroker.net/nic/update'
    # Seconds to wait on each HTTP request; without a timeout a stalled
    # server would block the script forever.
    http_timeout = 30

    def __init__(self, conf, tun_id=None, wan_ip=True, update=True, *args, **kwargs):
        """Load the config file and select a tunnel profile.

        Args:
            conf: Path to the INI config file (``~`` is expanded).
            tun_id: Section name of the tunnel to use. If falsy, the first
                section in the file is used.
            wan_ip: If true, the client address is fetched from ``ipget_url``;
                otherwise it is taken from the default IPv4 route.
            update: If true, ``start()`` pushes the client address to the
                update API before creating the tunnel.
            *args, **kwargs: Accepted and ignored, so the full argparse
                namespace can be passed in as keyword arguments.

        Raises:
            RuntimeError: The config file has no sections.
            ValueError: ``tun_id`` is given but is not a section name, or an
                address/network in the profile cannot be parsed.
            KeyError: The profile lacks one of the required keys.
        """
        self.my_ip = None
        self.user = None
        self.update_key = None
        self.ipr = None
        self.iface_idx = None
        self.wan = wan_ip
        self.force_update = update
        self.conf_file = os.path.abspath(os.path.expanduser(conf))
        self._conf = configparser.ConfigParser(
            allow_no_value=True,
            interpolation=configparser.ExtendedInterpolation(),
        )
        # ConfigParser.read() silently skips unreadable files; that case is
        # caught by the "no sections" check below.
        self._conf.read(self.conf_file)
        sections = self._conf.sections()
        if not sections:
            raise RuntimeError('Config file has no sections/tunnels defined')
        self.tun_id = tun_id
        if self.tun_id and self.tun_id not in sections:
            raise ValueError('tun_id not a valid tunnel ID')
        elif not self.tun_id:
            self.tun_id = sections[0]
        self.cfg = self._conf[self.tun_id]
        self.server = ipaddress.ip_address(self.cfg['server'])
        # Skip empty entries so a trailing comma does not abort start-up.
        self.addrs = [
            ipaddress.ip_network(entry.strip())
            for entry in self.cfg['allocations'].split(',')
            if entry.strip()
        ]
        self.user = self.cfg['user']
        self.update_key = self.cfg['update_key']

    @property
    def _iface_name(self):
        """Name of the local tunnel interface for this profile."""
        return 'sit-he-{0}'.format(self.tun_id)

    def _open_ipr(self):
        """Return the shared IPRoute handle, creating it on first use."""
        if not self.ipr:
            self.ipr = IPRoute()
        return self.ipr

    def _close_ipr(self):
        """Close the IPRoute handle (if any) and forget it.

        Resetting to None ensures a later operation opens a fresh handle
        instead of reusing a closed one.
        """
        if self.ipr:
            self.ipr.close()
            self.ipr = None

    def _lookup_iface(self):
        """Store and return the kernel index of the tunnel interface.

        Raises:
            RuntimeError: No interface with the expected name exists.
        """
        matches = list(self._open_ipr().link_lookup(ifname=self._iface_name))
        if not matches:
            raise RuntimeError(
                'Tunnel interface {0} does not exist'.format(self._iface_name))
        self.iface_idx = matches[0]
        return self.iface_idx

    def _get_my_ip(self):
        """Determine this client's IPv4 address and store it in ``my_ip``.

        With ``wan`` set, the address is read from the ``ip`` field of the
        JSON document served at ``ipget_url``. Otherwise the preferred source
        address of the single default IPv4 route is used.

        Raises:
            RuntimeError: The lookup service answered with an error status,
                there is not exactly one default IPv4 route, or that route
                carries no preferred source address.
        """
        if self.wan:
            req = requests.get(self.ipget_url,
                               params=self.ipget_params,
                               timeout=self.http_timeout)
            if not req.ok:
                raise RuntimeError('Could not fetch self IP')
            self.my_ip = ipaddress.ip_address(req.json()['ip'])
        else:
            ipr = self._open_ipr()
            # list() so len() works even if an iterator is returned.
            routes = list(ipr.get_default_routes(family=socket.AF_INET))
            if len(routes) != 1:  # This (probably) WILL fail on multipath systems.
                raise RuntimeError('Could not determine default IPv4 route')
            # Netlink attributes are a list of (name, value) pairs, so they
            # must be read with get_attr() rather than indexed by name.
            prefsrc = routes[0].get_attr('RTA_PREFSRC')
            if prefsrc is None:
                raise RuntimeError(
                    'Default IPv4 route has no preferred source address')
            self.my_ip = ipaddress.ip_address(prefsrc)
        return None

    def start(self):
        """Create the tunnel interface, assign addresses and add the default route.

        If ``force_update`` is set the client address is first pushed to the
        update API. The local address is resolved in either case, because it
        is needed as the tunnel's local endpoint.

        Raises:
            RuntimeError: The address lookup or the API update failed.
        """
        try:
            if self.force_update:
                # update() resolves my_ip itself; no separate lookup needed.
                self.update()
            else:
                self._get_my_ip()
            ipr = self._open_ipr()
            ipr.link('add',
                     ifname=self._iface_name,
                     kind='sit',
                     # Pass plain strings; the ipaddress objects are only
                     # used on the Python side.
                     ipip_local=str(self.my_ip),
                     ipip_remote=str(self.server),
                     ipip_ttl=255)
            idx = self._lookup_iface()
            ipr.link('set', index=idx, state='up', mtu=1480)
            for net in self.addrs:
                # The prefix length is passed separately via mask, so give
                # the bare address rather than "address/prefix".
                ipr.addr('add',
                         index=idx,
                         address=str(net.network_address),
                         mask=net.prefixlen,
                         family=socket.AF_INET6)
            ipr.route('add', dst='::', mask=0, oif=idx, family=socket.AF_INET6)
        finally:
            # Release the netlink socket even when a step above failed.
            self._close_ipr()
        return None

    def stop(self):
        """Remove the default IPv6 route and delete the tunnel interface.

        Raises:
            RuntimeError: The tunnel interface does not exist.
        """
        try:
            ipr = self._open_ipr()
            idx = self._lookup_iface()
            # Delete the route while the link is still up; once the link is
            # down the route may already be gone and the delete would fail.
            ipr.route('del', dst='::', mask=0, oif=idx, family=socket.AF_INET6)
            ipr.link('set', index=idx, state='down')
            ipr.link('del', index=idx)
        finally:
            self._close_ipr()
        return None

    def update(self, oneshot=False):
        """Push the client's current IPv4 address to the tunnelbroker API.

        Args:
            oneshot: If true, close any IPRoute handle opened during the
                address lookup before returning (used when no tunnel
                operation follows).

        Raises:
            RuntimeError: The address lookup failed, the API answered with an
                error status, or the response did not start with ``good`` or
                ``nochg``.
        """
        try:
            self._get_my_ip()
            auth_handler = requests.auth.HTTPBasicAuth(self.user, self.update_key)
            req = requests.get(self.api_base,
                               params={'hostname': self.tun_id,
                                       'myip': str(self.my_ip)},
                               auth=auth_handler,
                               timeout=self.http_timeout)
            if not req.ok:
                raise RuntimeError('Could not update client IP in tunnel')
            # The first whitespace-separated word of the body is the status;
            # an empty body counts as a failure instead of an IndexError.
            words = req.content.decode('utf-8').split()
            status = words[0].lower() if words else ''
            if status not in ('good', 'nochg'):
                raise RuntimeError('Client IP update returned failure')
        finally:
            if oneshot:
                self._close_ipr()
        return None


def parseArgs():
    """Build and return the command-line argument parser (not yet parsed)."""
    parser = argparse.ArgumentParser(
        description=('Dynamically update and enable/disable '
                     'Hurricane Electric Tunnelbroker'))
    parser.add_argument('-i', '--no-wan-ip',
                        dest='wan_ip',
                        action='store_false',
                        help=('If specified, use the RFC1918 IP address assigned to this machine instead of the WAN '
                              'IP (necessary if this machine is behind NAT)'))
    parser.add_argument('-c', '--config',
                        dest='conf',
                        default='~/.config/he_tunnelbroker.ini',
                        help=('The path to the config. '
                              'Default: ~/.config/he_tunnelbroker.ini'))
    parser.add_argument('-t', '--tunnel-id',
                        dest='tun_id',
                        help=('The tunnel profile ID/name to use in -c/--config. '
                              'Default is to use the first one found.'))
    parser.add_argument('-u', '--no-update',
                        dest='update',
                        action='store_false',
                        help=('If specified, do not perform the automatic update for start operations. Has no effect '
                              'for other operations'))
    parser.add_argument('oper',
                        metavar='OPERATION',
                        choices=('update', 'start', 'stop'),
                        help=('The operation to perform. "update" is performed automatically by "start", '
                              'but otherwise will just update the IPv4 address on record at tunnelbroker'))
    return parser


def main():
    """Parse the command line and run the requested tunnel operation."""
    args = parseArgs().parse_args()
    # The whole namespace is passed through; keys the constructor does not
    # name (such as "oper") are absorbed by its **kwargs.
    tb = TunnelBroker(**vars(args))
    if args.oper == 'start':
        tb.start()
    elif args.oper == 'stop':
        tb.stop()
    elif args.oper == 'update':
        tb.update(oneshot=True)
    return None


if __name__ == '__main__':
    main()