checking in before i do some major restructuring of wifi stuff in the xml/xsd
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
import binascii
|
||||
import ipaddress
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
##
|
||||
from passlib.crypto.digest import pbkdf2_hmac
|
||||
from pyroute2 import IPDB
|
||||
##
|
||||
import aif.utils
|
||||
@@ -47,7 +49,22 @@ def convertIpTuples(addr_xmlobj):
|
||||
return((addr, net, gw))
|
||||
|
||||
|
||||
def convertWifiCrypto(crypto_xmlobj):
|
||||
def convertPSK(ssid, passphrase):
|
||||
try:
|
||||
passphrase = passphrase.encode('utf-8').decode('ascii').strip('\r').strip('\n')
|
||||
except UnicodeDecodeError:
|
||||
raise ValueError('passphrase must be an ASCII string')
|
||||
if len(ssid) > 32:
|
||||
raise ValueError('ssid must be <= 32 characters')
|
||||
if not 7 < len(passphrase) < 64:
|
||||
raise ValueError('passphrase must be >= 8 and <= 32 characters')
|
||||
raw_psk = pbkdf2_hmac('sha1', str(passphrase), str(ssid), 4096, 32)
|
||||
hex_psk = binascii.hexlify(raw_psk)
|
||||
str_psk = hex_psk.decode('utf-8')
|
||||
return(str_psk)
|
||||
|
||||
|
||||
def convertWifiCrypto(crypto_xmlobj, ssid):
|
||||
crypto = {'type': crypto_xmlobj.find('type').text.strip()}
|
||||
# if crypto['type'] in ('wpa', 'wpa2', 'wpa3'):
|
||||
if crypto['type'] in ('wpa', 'wpa2'):
|
||||
@@ -61,7 +78,8 @@ def convertWifiCrypto(crypto_xmlobj):
|
||||
creds = crypto_xmlobj.find('creds')
|
||||
crypto['auth'] = {'type': creds.attrib.get('type', 'psk').strip()}
|
||||
if crypto['auth']['type'] == 'psk':
|
||||
crypto['auth']['psk'] = creds.text
|
||||
crypto['auth']['passphrase'] = creds.text.strip('\r').strip('\n')
|
||||
crypto['auth']['psk'] = convertPSK(ssid, creds.text)
|
||||
# TODO: enterprise support
|
||||
return(crypto)
|
||||
|
||||
@@ -211,3 +229,8 @@ class BaseConnection(object):
|
||||
if addrset not in self.routes[addrtype]:
|
||||
self.routes[addrtype].append(addrset)
|
||||
return()
|
||||
|
||||
def _writeConnCfg(self, chroot_base = None):
|
||||
# Dummy method.
|
||||
pass
|
||||
return()
|
||||
|
||||
@@ -274,6 +274,7 @@ class Wireless(Connection):
|
||||
def __init__(self, iface_xml):
|
||||
super().__init__(iface_xml)
|
||||
self.connection_type = 'wireless'
|
||||
self.packages.add('wpa_supplicant')
|
||||
self._initCfg()
|
||||
self._initConnCfg()
|
||||
|
||||
@@ -291,8 +292,7 @@ class Wireless(Connection):
|
||||
self._cfg['BASE']['AP'] = bssid
|
||||
crypto = self.xml.find('encryption')
|
||||
if crypto:
|
||||
self.packages.add('wpa_supplicant')
|
||||
crypto = aif.network._common.convertWifiCrypto(crypto)
|
||||
crypto = aif.network._common.convertWifiCrypto(crypto, self.xml.attrib['essid'])
|
||||
# if crypto['type'] in ('wpa', 'wpa2', 'wpa3'):
|
||||
if crypto['type'] in ('wpa', 'wpa2'):
|
||||
# TODO: WPA2 enterprise
|
||||
|
||||
19
aif/network/networkd.conf.j2
Normal file
19
aif/network/networkd.conf.j2
Normal file
@@ -0,0 +1,19 @@
|
||||
# Generated by AIF-NG.
|
||||
{%- for section_name, section_items in cfg.items() %}
|
||||
{%- if section_items|isList %}
|
||||
{#- We *only* use lists-of-dicts because they should always render to their own sections.
|
||||
INI doesn't support nesting, thankfully. #}
|
||||
{%- for i in section_items %}
|
||||
[{{ section_name }}]
|
||||
{%- for k, v in i.items() %}
|
||||
{{ k }}={{ v }}
|
||||
{%- endfor %}
|
||||
{% endfor %}
|
||||
{%- else %}
|
||||
{#- It's a single-level dict. #}
|
||||
[{{ section_name }}]
|
||||
{%- for k, v in section_items.items() %}
|
||||
{{ k }}={{ v }}
|
||||
{%- endfor %}
|
||||
{%- endif %}
|
||||
{% endfor %}
|
||||
@@ -1,8 +1,157 @@
|
||||
import ipaddress
|
||||
import socket
|
||||
import os
|
||||
##
|
||||
# We have to use Jinja2 because while there are ways to *parse* an INI with duplicate keys
|
||||
# (https://stackoverflow.com/a/38286559/733214), there's no way to *write* an INI with them using configparser.
|
||||
# So we use Jinja2 logic.
|
||||
import jinja2
|
||||
##
|
||||
import aif.utils
|
||||
import aif.network._common
|
||||
|
||||
|
||||
class Connection(aif.network._common.BaseConnection):
|
||||
def __init__(self, iface_xml):
|
||||
super().__init__(iface_xml)
|
||||
self.provider_type = 'systemd-networkd'
|
||||
self.packages = set()
|
||||
self.services = {
|
||||
('/usr/lib/systemd/system/systemd-networkd.service'): ('etc/systemd/system/'
|
||||
'multi-user.target.wants/'
|
||||
'systemd-networkd.service'),
|
||||
('/usr/lib/systemd/system/systemd-networkd.service'): ('etc/systemd/system/'
|
||||
'dbus-org.freedesktop.network1.service'),
|
||||
('/usr/lib/systemd/system/systemd-networkd.socket'): ('etc/systemd/system/'
|
||||
'sockets.target.wants/systemd-networkd.socket'),
|
||||
('/usr/lib/systemd/system/systemd-networkd.socket'): ('etc/systemd/system/'
|
||||
'network-online.target.wants/'
|
||||
'systemd-networkd-wait-online.service'),
|
||||
# We include these *even if* self.auto['resolvers'][*] are false.
|
||||
('/usr/lib/systemd/system/systemd-resolved.service'): ('etc/systemd/system/'
|
||||
'dbus-org.freedesktop.resolve1.service'),
|
||||
('/usr/lib/systemd/system/systemd-resolved.service'): ('etc/systemd/'
|
||||
'system/multi-user.target.wants/'
|
||||
'systemd-resolved.service')}
|
||||
self._wpasupp = {}
|
||||
self._initJ2()
|
||||
|
||||
def _initCfg(self):
|
||||
if self.device == 'auto':
|
||||
self.device = aif.network._common.getDefIface(self.connection_type)
|
||||
self._cfg = {'Match': {'Name': self.device},
|
||||
'Network': {'Description': ('A {0} profile for {1} '
|
||||
'(generated by AIF-NG)').format(self.connection_type,
|
||||
self.device),
|
||||
'DefaultRouteOnDevice': ('true' if self.is_defroute else 'false'),
|
||||
# This (may) get modified by logic below.
|
||||
'IPv6AcceptRA': 'false',
|
||||
'LinkLocalAddressing': 'no'}}
|
||||
if self.domain:
|
||||
self._cfg['Network']['Domains'] = self.domain
|
||||
if self.resolvers:
|
||||
self._cfg['Network']['DNS'] = [str(ip) for ip in self.resolvers]
|
||||
if all((self.auto['addresses']['ipv4'], self.auto['addresses']['ipv6'])):
|
||||
self._cfg['Network']['IPv6AcceptRA'] = 'true'
|
||||
self._cfg['Network']['LinkLocalAddressing'] = 'ipv6'
|
||||
self._cfg['Network']['DHCP'] = 'yes'
|
||||
elif self.auto['addresses']['ipv4'] and not self.auto['addresses']['ipv6']:
|
||||
self._cfg['Network']['DHCP'] = 'ipv4'
|
||||
elif (not self.auto['addresses']['ipv4']) and self.auto['addresses']['ipv6']:
|
||||
self._cfg['Network']['IPv6AcceptRA'] = 'true'
|
||||
self._cfg['Network']['LinkLocalAddressing'] = 'ipv6'
|
||||
self._cfg['Network']['DHCP'] = 'ipv6'
|
||||
else:
|
||||
self._cfg['Network']['DHCP'] = 'no'
|
||||
if any((self.auto['addresses']['ipv4'], self.auto['routes']['ipv4'], self.auto['resolvers']['ipv4'])):
|
||||
t = 'ipv4'
|
||||
self._cfg['DHCPv4'] = {'UseDNS': ('true' if self.auto['resolvers'][t] else 'false'),
|
||||
'UseRoutes': ('true' if self.auto['routes'][t] else 'false')}
|
||||
if any((self.auto['addresses']['ipv6'], self.auto['routes']['ipv6'], self.auto['resolvers']['ipv6'])):
|
||||
t = 'ipv6'
|
||||
self._cfg['Network']['IPv6AcceptRA'] = 'true'
|
||||
self._cfg['DHCPv6'] = {'UseDNS': ('true' if self.auto['resolvers'][t] else 'false')}
|
||||
for t in ('ipv4', 'ipv6'):
|
||||
if self.addrs[t]:
|
||||
if t == 'ipv6':
|
||||
self._cfg['Network']['LinkLocalAddressing'] = 'ipv6'
|
||||
if 'Address' not in self._cfg.keys():
|
||||
self._cfg['Address'] = []
|
||||
for addr, net, gw in self.addrs[t]:
|
||||
a = {'Address': '{0}/{1}'.format(str(addr), str(net.prefixlen))}
|
||||
self._cfg['Address'].append(a)
|
||||
if self.routes[t]:
|
||||
if 'Route' not in self._cfg.keys():
|
||||
self._cfg['Route'] = []
|
||||
for route, net, gw in self.routes[t]:
|
||||
r = {'Gateway': str(gw),
|
||||
'Destination': '{0}/{1}'.format(str(route), str(net.prefixlen))}
|
||||
self._cfg['Route'].append(r)
|
||||
if self._cfg['Network']['IPv6AcceptRA'] == 'true':
|
||||
self._cfg['Network']['LinkLocalAddressing'] = 'ipv6'
|
||||
if 'IPv6AcceptRA' not in self._cfg.keys():
|
||||
self._cfg['IPv6AcceptRA'] = {'UseDNS': ('true' if self.auto['resolvers']['ipv6'] else 'false')}
|
||||
self._initConnCfg()
|
||||
return()
|
||||
|
||||
def _initJ2(self):
|
||||
self.j2_env = jinja2.Environment(loader = jinja2.FileSystemLoader(searchpath = './'))
|
||||
self.j2_env.filters.update(aif.utils.j2_filters)
|
||||
self.j2_tpl = self.j2_env.get_template('networkd.conf.j2')
|
||||
return()
|
||||
|
||||
def writeConf(self, chroot_base):
|
||||
cfgroot = os.path.join(chroot_base, 'etc', 'systemd', 'network')
|
||||
cfgfile = os.path.join(cfgroot, self.id)
|
||||
os.makedirs(cfgroot, exist_ok = True)
|
||||
os.chown(cfgroot, 0, 0)
|
||||
os.chmod(cfgroot, 0o0755)
|
||||
with open(cfgfile, 'w') as fh:
|
||||
fh.write(self.j2_tpl.render(cfg = self._cfg))
|
||||
os.chmod(cfgfile, 0o0644)
|
||||
os.chown(cfgfile, 0, 0)
|
||||
self._writeConnCfg(chroot_base)
|
||||
return()
|
||||
|
||||
|
||||
class Ethernet(Connection):
|
||||
def __init__(self, iface_xml):
|
||||
super().__init__(iface_xml)
|
||||
self.connection_type = 'ethernet'
|
||||
self._initCfg()
|
||||
|
||||
|
||||
class Wireless(Connection):
|
||||
def __init__(self, iface_xml):
|
||||
super().__init__(iface_xml)
|
||||
self.connection_type = 'wireless'
|
||||
self.packages.add('wpa_supplicant')
|
||||
self.services['src'] = 'dest'
|
||||
self._initCfg()
|
||||
|
||||
def _initConnCfg(self):
|
||||
self._wpasupp['ssid'] = self.xml.attrib['essid']
|
||||
hidden = aif.utils.xmlBool(self.xml.attrib.get('hidden', 'false'))
|
||||
if hidden:
|
||||
self._wpasupp['scan_ssid'] = 1
|
||||
try:
|
||||
bssid = self.xml.attrib.get('bssid').strip()
|
||||
except AttributeError:
|
||||
bssid = None
|
||||
if bssid:
|
||||
bssid = aif.network._common.canonizeEUI(bssid)
|
||||
self._cfg['BASE']['AP'] = bssid
|
||||
crypto = self.xml.find('encryption')
|
||||
if crypto:
|
||||
crypto = aif.network._common.convertWifiCrypto(crypto, self._cfg['BASE']['ESSID'])
|
||||
# if crypto['type'] in ('wpa', 'wpa2', 'wpa3'):
|
||||
if crypto['type'] in ('wpa', 'wpa2'):
|
||||
# TODO: WPA2 enterprise
|
||||
self._cfg['BASE']['Security'] = 'wpa'
|
||||
# if crypto['type'] in ('wep', 'wpa', 'wpa2', 'wpa3'):
|
||||
if crypto['type'] in ('wpa', 'wpa2'):
|
||||
self._cfg['BASE']['Key'] = crypto['auth']['psk']
|
||||
return()
|
||||
|
||||
def _writeConnCfg(self, chroot_base):
|
||||
cfgroot = os.path.join(chroot_base, 'etc', 'wpa_supplicant')
|
||||
cfgbase = os.path.join(cfgroot, 'wpa_supplicant.conf')
|
||||
cfgfile = os.path.join(cfgroot, self.id)
|
||||
|
||||
@@ -145,7 +145,7 @@ class Wireless(Connection):
|
||||
if crypto:
|
||||
self.packages.add('wpa_supplicant')
|
||||
self._cfg['wifi-security'] = {}
|
||||
crypto = aif.network._common.convertWifiCrypto(crypto)
|
||||
crypto = aif.network._common.convertWifiCrypto(crypto, self._cfg['wifi']['ssid'])
|
||||
# if crypto['type'] in ('wpa', 'wpa2', 'wpa3'):
|
||||
if crypto['type'] in ('wpa', 'wpa2'):
|
||||
# TODO: WPA2 enterprise
|
||||
|
||||
Reference in New Issue
Block a user