summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrent s <bts@square-r00t.net>2019-12-22 11:59:49 -0500
committerbrent s <bts@square-r00t.net>2019-12-22 11:59:49 -0500
commit48ab7f953ffa0368d688ad28aca8cf635fe7e3f3 (patch)
tree8786834b053a018079cae31b69f6b39bd9dc795b
parenta65ef8232afb2e5137cf8f405042cfbfbd1e5ea9 (diff)
downloadAIF-NG-48ab7f953ffa0368d688ad28aca8cf635fe7e3f3.tar.xz
luks logging done
-rw-r--r--aif/disk/luks.py4
-rw-r--r--aif/disk/luks_fallback.py177
2 files changed, 127 insertions, 54 deletions
diff --git a/aif/disk/luks.py b/aif/disk/luks.py
index c7c5136..8d974f7 100644
--- a/aif/disk/luks.py
+++ b/aif/disk/luks.py
@@ -217,7 +217,6 @@ class LUKS(object):
_logger.error('Secrets must be added before the configuration can be written.')
raise RuntimeError('Missing secrets')
conf = os.path.join(chroot_base, 'etc', 'crypttab')
- initconf = '{0}.initramfs'.format(conf)
with open(conf, 'r') as fh:
conflines = fh.read().splitlines()
# Get UUID
@@ -239,5 +238,8 @@ class LUKS(object):
if luksinfo not in conflines:
with open(conf, 'a') as fh:
fh.write('{0}\n'.format(luksinfo))
+ if init_hook:
+ _logger.debug('Symlinked initramfs crypttab.')
+ os.symlink('/etc/crypttab', os.path.join(chroot_base, 'etc', 'crypttab.initramfs'))
_logger.debug('Generated crypttab line: {0}'.format(luksinfo))
return(None)
diff --git a/aif/disk/luks_fallback.py b/aif/disk/luks_fallback.py
index 49e2a42..7fe893a 100644
--- a/aif/disk/luks_fallback.py
+++ b/aif/disk/luks_fallback.py
@@ -22,6 +22,7 @@ class LuksSecret(object):
self.passphrase = None
self.size = 4096
self.path = None
+ _logger.info('Instantiated {0}.'.format(type(self).__name__))
class LuksSecretPassphrase(LuksSecret):
@@ -35,6 +36,7 @@ class LuksSecretFile(LuksSecret):
def __init__(self, path, passphrase = None, bytesize = 4096):
super().__init__()
self.path = os.path.realpath(path)
+ _logger.debug('Path canonized: {0} => {1}'.format(path, self.path))
self.passphrase = passphrase
self.size = bytesize # only used if passphrase == None
self._genSecret()
@@ -45,12 +47,14 @@ class LuksSecretFile(LuksSecret):
self.passphrase = secrets.token_bytes(self.size)
if not isinstance(self.passphrase, bytes):
self.passphrase = self.passphrase.encode('utf-8')
+ _logger.debug('Secret generated.')
return(None)
class LUKS(object):
def __init__(self, luks_xml, partobj):
self.xml = luks_xml
+ _logger.debug('luks_xml: {0}'.format(etree.tostring(self.xml, with_tail = False).decode('utf-8')))
self.id = self.xml.attrib['id']
self.name = self.xml.attrib['name']
self.device = partobj
@@ -62,48 +66,57 @@ class LUKS(object):
block.Partition,
lvm.LV,
mdadm.Array)):
- raise ValueError(('partobj must be of type '
- 'aif.disk.block.Disk, '
- 'aif.disk.block.Partition, '
- 'aif.disk.lvm.LV, or'
- 'aif.disk.mdadm.Array'))
+ _logger.error(('partobj must be of type '
+ 'aif.disk.block.Disk, '
+ 'aif.disk.block.Partition, '
+ 'aif.disk.lvm.LV, or'
+ 'aif.disk.mdadm.Array.'))
+ raise ValueError('Invalid partobj type')
self.devpath = '/dev/mapper/{0}'.format(self.name)
self.info = None
def addSecret(self, secretobj):
if not isinstance(secretobj, LuksSecret):
- raise ValueError('secretobj must be of type aif.disk.luks.LuksSecret '
- '(aif.disk.luks.LuksSecretPassphrase or '
- 'aif.disk.luks.LuksSecretFile)')
+ _logger.error('secretobj must be of type '
+ 'aif.disk.luks.LuksSecret '
+ '(aif.disk.luks.LuksSecretPassphrase or '
+ 'aif.disk.luks.LuksSecretFile).')
+ raise ValueError('Invalid secretobj type')
self.secrets.append(secretobj)
return(None)
def createSecret(self, secrets_xml = None):
+ _logger.info('Compiling secrets.')
if not secrets_xml: # Find all of them from self
- for secret in self.xml.findall('secrets'):
+ _logger.debug('No secrets_xml specified; fetching from configuration block.')
+ for secret_xml in self.xml.findall('secrets'):
+ _logger.debug('secret_xml: {0}'.format(etree.tostring(secret_xml, with_tail = False).decode('utf-8')))
secretobj = None
secrettypes = set()
- for s in secret.iterchildren():
+ for s in secret_xml.iterchildren():
+ _logger.debug('secret_xml child: {0}'.format(etree.tostring(s, with_tail = False).decode('utf-8')))
secrettypes.add(s.tag)
if all((('passphrase' in secrettypes),
('keyFile' in secrettypes))):
# This is safe, because a valid config only has at most one of both types.
- kf = secret.find('keyFile')
+ kf = secret_xml.find('keyFile')
secretobj = LuksSecretFile(kf.text, # path
- passphrase = secret.find('passphrase').text,
+ passphrase = secret_xml.find('passphrase').text,
bytesize = kf.attrib.get('size', 4096)) # TECHNICALLY should be a no-op.
elif 'passphrase' in secrettypes:
- secretobj = LuksSecretPassphrase(secret.find('passphrase').text)
+ secretobj = LuksSecretPassphrase(secret_xml.find('passphrase').text)
elif 'keyFile' in secrettypes:
- kf = secret.find('keyFile')
+ kf = secret_xml.find('keyFile')
secretobj = LuksSecretFile(kf.text,
passphrase = None,
bytesize = kf.attrib.get('size', 4096))
self.secrets.append(secretobj)
else:
+ _logger.debug('A secrets_xml was specified.')
secretobj = None
secrettypes = set()
for s in secrets_xml.iterchildren():
+ _logger.debug('secrets_xml child: {0}'.format(etree.tostring(s, with_tail = False).decode('utf-8')))
secrettypes.add(s.tag)
if all((('passphrase' in secrettypes),
('keyFile' in secrettypes))):
@@ -120,37 +133,63 @@ class LUKS(object):
passphrase = None,
bytesize = kf.attrib.get('size', 4096))
self.secrets.append(secretobj)
+ _logger.debug('Secrets compiled.')
return(None)
def create(self):
if self.created:
return(None)
+ _logger.info('Creating LUKS volume on {0}'.format(self.source))
if not self.secrets:
- raise RuntimeError('Cannot create a LUKS volume with no secrets added')
+ _logger.error('Cannot create a LUKS volume with no secrets added.')
+ raise RuntimeError('Cannot create a LUKS volume with no secrets')
for idx, secret in enumerate(self.secrets):
if idx == 0:
# TODO: add support for custom parameters for below?
- # TODO: logging
- cmd = ['cryptsetup',
- '--batch-mode',
- 'luksFormat',
- '--type', 'luks2',
- '--key-file', '-',
- self.source]
- subprocess.run(cmd, input = secret.passphrase)
+ cmd_str = ['cryptsetup',
+ '--batch-mode',
+ 'luksFormat',
+ '--type', 'luks2',
+ '--key-file', '-',
+ self.source]
+ cmd = subprocess.run(cmd_str,
+ input = secret.passphrase,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+ if cmd.returncode != 0:
+ _logger.warning('Command returned non-zero status')
+ _logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
+ for a in ('stdout', 'stderr'):
+ x = getattr(cmd, a)
+ if x:
+ _logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
+ raise RuntimeError('Failed to encrypt successfully')
else:
+ # TODO: does the key-file need to be the same path in the installed system?
tmpfile = tempfile.mkstemp()
with open(tmpfile[1], 'wb') as fh:
fh.write(secret.passphrase)
- cmd = ['cryptsetup',
- '--batch-mode',
- 'luksAdd',
- '--type', 'luks2',
- '--key-file', '-',
- self.source,
- tmpfile[1]]
- subprocess.run(cmd, input = self.secrets[0].passphrase)
+ cmd_str = ['cryptsetup',
+ '--batch-mode',
+ 'luksAdd',
+ '--type', 'luks2',
+ '--key-file', '-',
+ self.source,
+ tmpfile[1]]
+ cmd = subprocess.run(cmd_str,
+ input = self.secrets[0].passphrase,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+
os.remove(tmpfile[1])
+ if cmd.returncode != 0:
+ _logger.warning('Command returned non-zero status')
+ _logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
+ for a in ('stdout', 'stderr'):
+ x = getattr(cmd, a)
+ if x:
+ _logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
+ raise RuntimeError('Failed to encrypt successfully')
self.created = True
return(None)
@@ -159,12 +198,19 @@ class LUKS(object):
raise RuntimeError('Cannot lock a LUKS volume before it is created')
if self.locked:
return(None)
- # TODO: logging
- cmd = ['cryptsetup',
- '--batch-mode',
- 'luksClose',
- self.name]
- subprocess.run(cmd)
+ cmd_str = ['cryptsetup',
+ '--batch-mode',
+ 'luksClose',
+ self.name]
+ cmd = subprocess.run(cmd_str, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
+ if cmd.returncode != 0:
+ _logger.warning('Command returned non-zero status')
+ _logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
+ for a in ('stdout', 'stderr'):
+ x = getattr(cmd, a)
+ if x:
+ _logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
+ raise RuntimeError('Failed to lock successfully')
self.locked = True
return(None)
@@ -173,13 +219,21 @@ class LUKS(object):
raise RuntimeError('Cannot unlock a LUKS volume before it is created')
if not self.locked:
return(None)
- cmd = ['cryptsetup',
- '--batch-mode',
- 'luksOpen',
- '--key-file', '-',
- self.source,
- self.name]
- subprocess.run(cmd, input = self.secrets[0].passphrase)
+ cmd_str = ['cryptsetup',
+ '--batch-mode',
+ 'luksOpen',
+ '--key-file', '-',
+ self.source,
+ self.name]
+ cmd = subprocess.run(cmd_str, input = self.secrets[0].passphrase)
+ if cmd.returncode != 0:
+ _logger.warning('Command returned non-zero status')
+ _logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
+ for a in ('stdout', 'stderr'):
+ x = getattr(cmd, a)
+ if x:
+ _logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
+ raise RuntimeError('Failed to unlock successfully')
self.locked = False
return(None)
@@ -187,13 +241,23 @@ class LUKS(object):
if self.locked:
raise RuntimeError('Must be unlocked to gather info')
info = {}
- cmd = ['cryptsetup',
- '--batch-mode',
- 'luksDump',
- self.source]
- _info = subprocess.run(cmd, stdout = subprocess.PIPE).stdout.decode('utf-8')
+ cmd_str = ['cryptsetup',
+ '--batch-mode',
+ 'luksDump',
+ self.source]
+ cmd = subprocess.run(cmd_str, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
+ if cmd.returncode != 0:
+ _logger.warning('Command returned non-zero status')
+ _logger.debug('Exit status: {0}'.format(str(cmd.returncode)))
+ for a in ('stdout', 'stderr'):
+ x = getattr(cmd, a)
+ if x:
+ _logger.debug('{0}: {1}'.format(a.upper(), x.decode('utf-8').strip()))
+ raise RuntimeError('Failed to fetch info successfully')
+ _info = cmd.stdout.decode('utf-8')
k = None
# I wish there was a better way to do this but I sure as heck am not writing a regex to do it.
+ # https://gitlab.com/cryptsetup/cryptsetup/issues/511
# https://pypi.org/project/parse/
_tpl = ('LUKS header information\nVersion: {header_ver}\nEpoch: {epoch_ver}\n'
'Metadata area: {metadata_pos} [bytes]\nKeyslots area: {keyslots_pos} [bytes]\n'
@@ -225,17 +289,20 @@ class LUKS(object):
if v.lower() == '(no flags)':
v = []
else:
- # Space-separated or comma-separated? TODO.
+ # Is this pace-separated or comma-separated? TODO.
v = [i.strip() for i in v.split() if i.strip() != '']
elif k == 'uuid':
v = uuid.UUID(hex = v)
self.info = info
+ _logger.debug('Rendered updated info: {0}'.format(self.inf))
return(None)
- def writeConf(self, conf = '/etc/crypttab'):
+ def writeConf(self, chroot_base, init_hook = True):
+ _logger.info('Generating crypttab.')
if not self.secrets:
- raise RuntimeError('secrets must be added before the configuration can be written')
- conf = os.path.realpath(conf)
+ _logger.error('Secrets must be added before the configuration can be written.')
+ raise RuntimeError('Missing secrets')
+ conf = os.path.join(chroot_base, 'etc', 'crypttab')
with open(conf, 'r') as fh:
conflines = fh.read().splitlines()
# Get UUID
@@ -257,4 +324,8 @@ class LUKS(object):
if luksinfo not in conflines:
with open(conf, 'a') as fh:
fh.write('{0}\n'.format(luksinfo))
+ if init_hook:
+ _logger.debug('Symlinked initramfs crypttab.')
+ os.symlink('/etc/crypttab', os.path.join(chroot_base, 'etc', 'crypttab.initramfs'))
+ _logger.debug('Generated crypttab line: {0}'.format(luksinfo))
return(None)