aif-ng/aif/disk/luks_fallback.py

332 lines
16 KiB
Python
Raw Permalink Normal View History

import logging
import os
import re
import secrets
import subprocess
import tempfile
import uuid
##
import parse
from lxml import etree
##
import aif.disk.block_fallback as block
import aif.disk.lvm_fallback as lvm
import aif.disk.mdadm_fallback as mdadm
_logger = logging.getLogger(__name__)
class LuksSecret(object):
def __init__(self, *args, **kwargs):
self.passphrase = None
self.size = 4096
self.path = None
2019-12-22 11:59:49 -05:00
_logger.info('Instantiated {0}.'.format(type(self).__name__))
class LuksSecretPassphrase(LuksSecret):
def __init__(self, passphrase):
super().__init__()
self.passphrase = passphrase
class LuksSecretFile(LuksSecret):
# TODO: might do a little tweaking in a later release to support *reading from* bytes.
def __init__(self, path, passphrase = None, bytesize = 4096):
super().__init__()
self.path = os.path.realpath(path)
2019-12-22 11:59:49 -05:00
_logger.debug('Path canonized: {0} => {1}'.format(path, self.path))
self.passphrase = passphrase
self.size = bytesize # only used if passphrase == None
self._genSecret()
def _genSecret(self):
if not self.passphrase:
# TODO: is secrets.token_bytes safe for *persistent* random data?
self.passphrase = secrets.token_bytes(self.size)
if not isinstance(self.passphrase, bytes):
self.passphrase = self.passphrase.encode('utf-8')
2019-12-22 11:59:49 -05:00
_logger.debug('Secret generated.')
return(None)
class LUKS(object):
2019-11-06 07:33:15 -05:00
def __init__(self, luks_xml, partobj):
self.xml = luks_xml
2019-12-22 11:59:49 -05:00
_logger.debug('luks_xml: {0}'.format(etree.tostring(self.xml, with_tail = False).decode('utf-8')))
self.id = self.xml.attrib['id']
self.name = self.xml.attrib['name']
self.device = partobj
self.source = self.device.devpath
self.secrets = []
self.created = False
self.locked = True
if not isinstance(self.device, (block.Disk,
block.Partition,
lvm.LV,
mdadm.Array)):
2019-12-22 11:59:49 -05:00
_logger.error(('partobj must be of type '
'aif.disk.block.Disk, '
'aif.disk.block.Partition, '
'aif.disk.lvm.LV, or'
'aif.disk.mdadm.Array.'))
raise TypeError('Invalid partobj type')
self.devpath = '/dev/mapper/{0}'.format(self.name)
self.info = None
def addSecret(self, secretobj):
if not isinstance(secretobj, LuksSecret):
2019-12-22 11:59:49 -05:00
_logger.error('secretobj must be of type '
'aif.disk.luks.LuksSecret '
'(aif.disk.luks.LuksSecretPassphrase or '
'aif.disk.luks.LuksSecretFile).')
raise TypeError('Invalid secretobj type')
self.secrets.append(secretobj)
return(None)
def createSecret(self, secrets_xml = None):
2019-12-22 11:59:49 -05:00
_logger.info('Compiling secrets.')
if not secrets_xml: # Find all of them from self
2019-12-22 11:59:49 -05:00
_logger.debug('No secrets_xml specified; fetching from configuration block.')
for secret_xml in self.xml.findall('secrets'):
_logger.debug('secret_xml: {0}'.format(etree.tostring(secret_xml, with_tail = False).decode('utf-8')))
secretobj = None
secrettypes = set()
2019-12-22 11:59:49 -05:00
for s in secret_xml.iterchildren():
_logger.debug('secret_xml child: {0}'.format(etree.tostring(s, with_tail = False).decode('utf-8')))
secrettypes.add(s.tag)
if all((('passphrase' in secrettypes),
('keyFile' in secrettypes))):
# This is safe, because a valid config only has at most one of both types.
2019-12-22 11:59:49 -05:00
kf = secret_xml.find('keyFile')
secretobj = LuksSecretFile(kf.text, # path
2019-12-22 11:59:49 -05:00
passphrase = secret_xml.find('passphrase').text,
bytesize = kf.attrib.get('size', 4096)) # TECHNICALLY should be a no-op.
elif 'passphrase' in secrettypes:
2019-12-22 11:59:49 -05:00
secretobj = LuksSecretPassphrase(secret_xml.find('passphrase').text)
elif 'keyFile' in secrettypes:
2019-12-22 11:59:49 -05:00
kf = secret_xml.find('keyFile')
secretobj = LuksSecretFile(kf.text,
passphrase = None,
bytesize = kf.attrib.get('size', 4096))
self.secrets.append(secretobj)
else:
2019-12-22 11:59:49 -05:00
_logger.debug('A secrets_xml was specified.')
secretobj = None
secrettypes = set()
for s in secrets_xml.iterchildren():
2019-12-22 11:59:49 -05:00
_logger.debug('secrets_xml child: {0}'.format(etree.tostring(s, with_tail = False).decode('utf-8')))
secrettypes.add(s.tag)
if all((('passphrase' in secrettypes),
('keyFile' in secrettypes))):
# This is safe, because a valid config only has at most one of both types.
kf = secrets_xml.find('keyFile')
secretobj = LuksSecretFile(kf.text, # path
passphrase = secrets_xml.find('passphrase').text,
bytesize = kf.attrib.get('size', 4096)) # TECHNICALLY should be a no-op.
elif 'passphrase' in secrettypes:
secretobj = LuksSecretPassphrase(secrets_xml.find('passphrase').text)
elif 'keyFile' in secrettypes:
kf = secrets_xml.find('keyFile')
secretobj = LuksSecretFile(kf.text,
passphrase = None,
bytesize = kf.attrib.get('size', 4096))
self.secrets.append(secretobj)
2019-12-22 11:59:49 -05:00
_logger.debug('Secrets compiled.')
return(None)
def create(self):
if self.created:
return(None)
2019-12-22 11:59:49 -05:00
_logger.info('Creating LUKS volume on {0}'.format(self.source))
if not self.secrets:
2019-12-22 11:59:49 -05:00
_logger.error('Cannot create a LUKS volume with no secrets added.')
raise RuntimeError('Cannot create a LUKS volume with no secrets')
for idx, secret in enumerate(self.secrets):
if idx == 0:
# TODO: add support for custom parameters for below?
2019-12-22 11:59:49 -05:00
cmd_str = ['cryptsetup',
'--batch-mode',
'luksFormat',
'--type', 'luks2',
'--key-file', '-',
self.source]
cmd = subprocess.run(cmd_str,
input = secret.passphrase,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE)
if cmd.returncode != 0:
_logger.warning('Command returned non-zero status')
_logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
for a in ('stdout', 'stderr'):
x = getattr(cmd, a)
if x:
_logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
raise RuntimeError('Failed to encrypt successfully')
else:
2019-12-22 11:59:49 -05:00
# TODO: does the key-file need to be the same path in the installed system?
tmpfile = tempfile.mkstemp()
with open(tmpfile[1], 'wb') as fh:
fh.write(secret.passphrase)
2019-12-22 11:59:49 -05:00
cmd_str = ['cryptsetup',
'--batch-mode',
'luksAdd',
'--type', 'luks2',
'--key-file', '-',
self.source,
tmpfile[1]]
cmd = subprocess.run(cmd_str,
input = self.secrets[0].passphrase,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE)
os.remove(tmpfile[1])
2019-12-22 11:59:49 -05:00
if cmd.returncode != 0:
_logger.warning('Command returned non-zero status')
_logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
for a in ('stdout', 'stderr'):
x = getattr(cmd, a)
if x:
_logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
raise RuntimeError('Failed to encrypt successfully')
self.created = True
return(None)
def lock(self):
if not self.created:
raise RuntimeError('Cannot lock a LUKS volume before it is created')
if self.locked:
return(None)
2019-12-22 11:59:49 -05:00
cmd_str = ['cryptsetup',
'--batch-mode',
'luksClose',
self.name]
cmd = subprocess.run(cmd_str, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
if cmd.returncode != 0:
_logger.warning('Command returned non-zero status')
_logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
for a in ('stdout', 'stderr'):
x = getattr(cmd, a)
if x:
_logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
raise RuntimeError('Failed to lock successfully')
self.locked = True
return(None)
def unlock(self, passphrase = None):
if not self.created:
raise RuntimeError('Cannot unlock a LUKS volume before it is created')
if not self.locked:
return(None)
2019-12-22 11:59:49 -05:00
cmd_str = ['cryptsetup',
'--batch-mode',
'luksOpen',
'--key-file', '-',
self.source,
self.name]
cmd = subprocess.run(cmd_str, input = self.secrets[0].passphrase)
if cmd.returncode != 0:
_logger.warning('Command returned non-zero status')
_logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
for a in ('stdout', 'stderr'):
x = getattr(cmd, a)
if x:
_logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
raise RuntimeError('Failed to unlock successfully')
self.locked = False
return(None)
def updateInfo(self):
if self.locked:
raise RuntimeError('Must be unlocked to gather info')
info = {}
2019-12-22 11:59:49 -05:00
cmd_str = ['cryptsetup',
'--batch-mode',
'luksDump',
self.source]
cmd = subprocess.run(cmd_str, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
if cmd.returncode != 0:
_logger.warning('Command returned non-zero status')
_logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
for a in ('stdout', 'stderr'):
x = getattr(cmd, a)
if x:
_logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
raise RuntimeError('Failed to fetch info successfully')
_info = cmd.stdout.decode('utf-8')
k = None
# I wish there was a better way to do this but I sure as heck am not writing a regex to do it.
2019-12-22 11:59:49 -05:00
# https://gitlab.com/cryptsetup/cryptsetup/issues/511
# https://pypi.org/project/parse/
_tpl = ('LUKS header information\nVersion: {header_ver}\nEpoch: {epoch_ver}\n'
'Metadata area: {metadata_pos} [bytes]\nKeyslots area: {keyslots_pos} [bytes]\n'
'UUID: {uuid}\nLabel: {label}\nSubsystem: {subsystem}\n'
'Flags: {flags}\n\nData segments:\n 0: crypt\n '
'offset: {offset_bytes} [bytes]\n length: {crypt_length}\n '
'cipher: {crypt_cipher}\n sector: {sector_size} [bytes]\n\nKeyslots:\n 0: luks2\n '
'Key: {key_size} bits\n Priority: {priority}\n '
'Cipher: {keyslot_cipher}\n Cipher key: {cipher_key_size} bits\n '
'PBKDF: {pbkdf}\n Time cost: {time_cost}\n Memory: {memory}\n '
'Threads: {threads}\n Salt: {key_salt} \n AF stripes: {af_stripes}\n '
'AF hash: {af_hash}\n Area offset:{keyslot_offset} [bytes]\n '
'Area length:{keyslot_length} [bytes]\n Digest ID: {keyslot_id}\nTokens:\nDigests:\n '
'0: pbkdf2\n Hash: {token_hash}\n Iterations: {token_iterations}\n '
'Salt: {token_salt}\n Digest: {token_digest}\n\n')
info = parse.parse(_tpl, _info).named
for k, v in info.items():
# Technically we can do this in the _tpl string, but it's hard to visually parse.
if k in ('af_stripes', 'cipher_key_size', 'epoch_ver', 'header_ver', 'key_size', 'keyslot_id',
'keyslot_length', 'keyslot_offset', 'keyslots_pos', 'memory', 'metadata_pos', 'offset_bytes',
'sector_size', 'threads', 'time_cost', 'token_iterations'):
v = int(v)
elif k in ('key_salt', 'token_digest', 'token_salt'):
v = bytes.fromhex(re.sub(r'\s+', '', v))
elif k in ('label', 'subsystem'):
if re.search(r'\(no\s+', v.lower()):
v = None
elif k == 'flags':
if v.lower() == '(no flags)':
v = []
else:
2019-12-22 11:59:49 -05:00
# Is this pace-separated or comma-separated? TODO.
v = [i.strip() for i in v.split() if i.strip() != '']
elif k == 'uuid':
v = uuid.UUID(hex = v)
self.info = info
2019-12-22 11:59:49 -05:00
_logger.debug('Rendered updated info: {0}'.format(self.inf))
return(None)
2019-12-22 11:59:49 -05:00
def writeConf(self, chroot_base, init_hook = True):
_logger.info('Generating crypttab.')
if not self.secrets:
2019-12-22 11:59:49 -05:00
_logger.error('Secrets must be added before the configuration can be written.')
raise RuntimeError('Missing secrets')
conf = os.path.join(chroot_base, 'etc', 'crypttab')
with open(conf, 'r') as fh:
conflines = fh.read().splitlines()
# Get UUID
disk_uuid = None
uuid_dir = '/dev/disk/by-uuid'
for u in os.listdir(uuid_dir):
d = os.path.join(uuid_dir, u)
if os.path.realpath(d) == self.source:
disk_uuid = u
if disk_uuid:
identifer = 'UUID={0}'.format(disk_uuid)
else:
# This is *not* ideal, but better than nothing.
identifer = self.source
primary_key = self.secrets[0]
luksinfo = '{0}\t{1}\t{2}\tluks'.format(self.name,
identifer,
(primary_key.path if primary_key.path else '-'))
if luksinfo not in conflines:
with open(conf, 'a') as fh:
fh.write('{0}\n'.format(luksinfo))
2019-12-22 11:59:49 -05:00
if init_hook:
_logger.debug('Symlinked initramfs crypttab.')
os.symlink('/etc/crypttab', os.path.join(chroot_base, 'etc', 'crypttab.initramfs'))
_logger.debug('Generated crypttab line: {0}'.format(luksinfo))
return(None)