diff --git a/TODO b/TODO index 8e7d1a1..543ca7b 100644 --- a/TODO +++ b/TODO @@ -1,11 +1,12 @@ - write classes/functions - XML-based config -x XML syntax ---- xregex btags - case-insensitive? this can be represented in-pattern: - xhttps://stackoverflow.com/a/9655186/733214 +--- x regex btags - case-insensitive? this can be represented in-pattern: + x https://stackoverflow.com/a/9655186/733214 +--- remove sources stuff - that should be in the guest definitions. -x configuration generator ---- xprint end result xml config to stderr for easier redirection? or print prompts to stderr and xml to stdout? --- xXSD for validation +--- x print end result xml config to stderr for easier redirection? or print prompts to stderr and xml to stdout? +-- x XSD for validation -- Flask app for generating config? -- TKinter (or pygame?) GUI? --- https://docs.python.org/3/faq/gui.html @@ -15,9 +16,12 @@ at the very least document all the functions and such so pydoc's happy. - locking + - for docs, 3.x (as of 3.10) was 2.4M. -- xNeed ability to write/parse mtree specs (or a similar equivalent) for applying ownerships/permissions to overlay files + +- x Need ability to write/parse mtree specs (or a similar equivalent) for applying ownerships/permissions to overlay files -- parsing is done. writing may? come later. +--- i think writing is mostly done/straightforward; still need to work on parsing mode octals for files - package for PyPI: diff --git a/bdisk/GPG.py b/bdisk/GPG.py index ad10df8..b56525b 100644 --- a/bdisk/GPG.py +++ b/bdisk/GPG.py @@ -1,8 +1,71 @@ +import copy import datetime import gpg +import operator import os -import psutil -import gpg.errors +import re +import utils # LOCAL +from functools import reduce +from gpg import gpgme + +# Reference material. +# http://files.au.adversary.org/crypto/GPGMEpythonHOWTOen.html +# https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gpgme.git;a=tree;f=lang/python/examples;hb=HEAD +# https://www.gnupg.org/documentation/manuals/gpgme.pdf +# Support ECC? https://www.gnupg.org/faq/whats-new-in-2.1.html#ecc +# section 4.1, 4.2, 7.5.1, 7.5.5 in gpgme manual + +# These are static values. We include them in the parent so we don't define them every time a function is called. +# Key signature attributes. +_keysig_attrs = ('comment', 'email', 'expired', 'expires', 'exportable', 'invalid', 'keyid', 'name', 'notations', + 'pubkey_algo', 'revoked', 'sig_class', 'status', 'timestamp', 'uid') +# Data signature attributes. +_sig_attrs = ('chain_model', 'exp_timestamp', 'fpr', 'hash_algo', 'is_de_vs', 'key', 'notations', 'pka_address', + 'pka_trust', 'pubkey_algo', 'status', 'summary', 'timestamp', 'validity', 'validity_reason', + 'wrong_key_usage') + +# A regex that ignores signature verification validity errors we don't care about. +_valid_ignore = re.compile(('^(' + #'CHECKSUM|' + 'ELEMENT_NOT_FOUND|' + 'MISSING_VALUE|' + #'UNKNOWN_PACKET|' + 'UNSUPPORTED_CMS_OBJ|' + 'WRONG_SECKEY|' + '(' + 'DECRYPT|' + 'INV|' + 'NO|' + 'PIN|' + 'SOURCE' + ')_' + ')')) +# A function to build a list based on the above. +def _gen_valid_validities(): + # Strips out and minimizes the error output. + v = {} + for s in dir(gpg.constants.validity): + if _valid_ignore.search(s): + continue + val = getattr(gpg.constants.validity, s) + if not isinstance(val, int): + continue + v[s] = val + return(v) +_valid_validities = _gen_valid_validities() +def _get_sigstatus(status): + statuses = [] + for e in _valid_validities: + if ((status & _valid_validities[e]) == _valid_validities[e]): + statuses.append(e) + return(statuses) +def _get_sig_isgood(sigstat): + is_good = True + if not ((sigstat & gpg.constants.sigsum.GREEN) == gpg.constants.sigsum.GREEN): + is_good = False + if not ((sigstat & gpg.constants.sigsum.VALID) == gpg.constants.sigsum.VALID): + is_good = False + return(is_good) # This helps translate the input name from the conf to a string compatible with the gpg module. @@ -21,52 +84,94 @@ def _epoch_helper(epoch): return(abs(int(d.total_seconds()))) # Returns a positive integer even if negative... #return(int(d.total_seconds())) -# http://files.au.adversary.org/crypto/GPGMEpythonHOWTOen.html -# https://www.gnupg.org/documentation/manuals/gpgme.pdf -# Support ECC? https://www.gnupg.org/faq/whats-new-in-2.1.html#ecc -# section 4.1, 4.2, 7.5.1, 7.5.5 in gpgme manual -# Please select what kind of key you want: -# (1) RSA and RSA (default) - 1024-4096 bits -# (2) DSA and Elgamal - 768-3072 -# (3) DSA (sign only) - 768-3072 -# (4) RSA (sign only) - 1024-4096 -# (7) DSA (set your own capabilities) - 768-3072 -# (8) RSA (set your own capabilities) - 1024-4096 -# (9) ECC and ECC - (see below) -# (10) ECC (sign only) - (see below) -# (11) ECC (set your own capabilities) - (see below) -# Your selection? 9 -# Please select which elliptic curve you want: -# (2) NIST P-256 -# (3) NIST P-384 -# (4) NIST P-521 -# (5) Brainpool P-256 -# (6) Brainpool P-384 -# (7) Brainpool P-512 -# Your selection? 10 -# Please select which elliptic curve you want: -# (1) Curve 25519 -# (3) NIST P-256 -# (4) NIST P-384 -# (5) NIST P-521 -# (6) Brainpool P-256 -# (7) Brainpool P-384 -# (8) Brainpool P-512 -# (9) secp256k1 -# gpgme key creation: -#g = gpg.Context() -#mainkey = g.create_key('test key via python ', algorithm = 'rsa4096', expires = False, -# #certify = True, -# certify = False, -# sign = False, -# authenticate = False, -# encrypt = False) -#key = g.get_key(mainkey.fpr, secret = True) -#subkey = g.create_subkey(key, algorithm = 'rsa4096', expires = False, -# sign = True, -# #certify = False, -# encrypt = False, -# authenticate = False) +# _KeyEditor and _getEditPrompt are used to interactively edit keys -- notably currently used for editing trusts +# (since there's no way to edit trust otherwise). +# https://www.gnupg.org/documentation/manuals/gpgme/Advanced-Key-Editing.html +# https://www.apt-browse.org/browse/debian/wheezy/main/amd64/python-pyme/1:0.8.1-2/file/usr/share/doc/python-pyme/examples/t-edit.py +# https://searchcode.com/codesearch/view/20535820/ +# https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=doc/DETAILS +# You can get the prompt identifiers and status indicators without grokking the source +# by first interactively performing the type of edit(s) you want to do with this command: +# gpg --expert --status-fd 2 --command-fd 2 --edit-key +# Per: +# https://lists.gnupg.org/pipermail/gnupg-users/2002-April/012630.html +# https://lists.gt.net/gnupg/users/9544 +# https://raymii.org/s/articles/GPG_noninteractive_batch_sign_trust_and_send_gnupg_keys.html +class _KeyEditor(object): + def __init__(self, optmap): + self.replied_once = False # This is used to handle the first prompt vs. the last + self.optmap = optmap + + def editKey(self, status, args, out): + result = None + out.seek(0, 0) + def mapDict(m, d): + return(reduce(operator.getitem, m, d)) + if args == 'keyedit.prompt' and self.replied_once: + result = 'quit' + elif status == 'KEY_CONSIDERED': + result = None + self.replied_once = False + elif status == 'GET_LINE': + self.replied_once = True + _ilist = args.split('.') + result = mapDict(_ilist, self.optmap['prompts']) + if not result: + result = None + return(result) + +def _getEditPrompt(key, trust, cmd, uid = None): + if not uid: + uid = key.uids[0] + # This mapping defines the default "answers" to the gpgme key editing. + # https://www.apt-browse.org/browse/debian/wheezy/main/amd64/python-pyme/1:0.8.1-2/file/usr/share/doc/python-pyme/examples/t-edit.py + # https://searchcode.com/codesearch/view/20535820/ + # https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=doc/DETAILS + # You can get the prompt identifiers and status indicators without grokking the source + # by first interactively performing the type of edit(s) you want to do with this command: + # gpg --status-fd 2 --command-fd 2 --edit-key + if trust >= gpg.constants.validity.FULL: # For tsigning, it only prompts for two trust levels: + _loctrust = 2 # "I trust fully" + else: + _loctrust = 1 # "I trust marginally" + # TODO: make the trust depth configurable. 1 is probably the safest, but we try to guess here. + # "Full" trust is a pretty big thing. + if trust >= gpg.constants.validity.FULL: + _locdepth = 2 # Allow +1 level of trust extension + else: + _locdepth = 1 # Only trust this key + # The check level. + # (0) I will not answer. (default) + # (1) I have not checked at all. + # (2) I have done casual checking. + # (3) I have done very careful checking. + # Since we're running this entirely non-interactively, we really should use 1. + _chk_lvl = 1 + _map = { + # Valid commands + 'cmds': ['trust', 'fpr', 'sign', 'tsign', 'lsign', 'nrsign', 'grip', 'list', + 'uid', 'key', 'check', 'deluid', 'delkey', 'delsig', 'pref', 'showpref', + 'revsig', 'enable', 'disable', 'showphoto', 'clean', 'minimize', 'save', + 'quit'], + # Prompts served by the interactive session, and a map of their responses. + # It's expanded in the parent call, but the prompt is actually in the form of e.g.: + # keyedit.save (we expand that to a list and use that list as a "path" in the below dict) + # We *could* just use a flat dict of full prompt to constants, but this is a better visual segregation & + # prevents unnecessary duplication. + 'prompts': { + 'edit_ownertrust': {'value': str(trust), # Pulled at time of call + 'set_ultimate': {'okay': 'yes'}}, # If confirming ultimate trust, we auto-answer yes + 'untrusted_key': {'override': 'yes'}, # We don't care if it's untrusted + 'pklist': {'user_id': {'enter': uid.uid}}, # Prompt for a user ID - can we use the full uid string? (tsign) + 'sign_uid': {'class': str(_chk_lvl), # The certification/"check" level + 'okay': 'yes'}, # Are you sure that you want to sign this key with your key..." + 'trustsig_prompt': {'trust_value': str(_loctrust), # This requires some processing; see above + 'trust_depth': str(_locdepth), # The "depth" of the trust signature. + 'trust_regexp': None}, # We can "Restrict" trust to certain domains if we wanted. + 'keyedit': {'prompt': cmd, # Initiate trust editing (or whatever) + 'save': {'okay': 'yes'}}}} # Save if prompted + return(_map) + class GPGHandler(object): @@ -79,6 +184,16 @@ class GPGHandler(object): else: self._check_home() self.ctx = self.GetContext(home_dir = self.home) + self._orig_kl_mode = self.ctx.get_keylist_mode() + self.mykey = None + self.subkey = None + if self.key_id: + self.mykey = self.ctx.get_key(self.key_id, secret = True) + for s in self.mykey.subkeys: + if s.can_sign: + self.subkey = s + self.ctx.signers = [self.mykey] + break def _check_home(self, home = None): if not home: @@ -110,30 +225,19 @@ class GPGHandler(object): if not _exists: raise PermissionError('We need a GnuPG home directory we can ' 'write to') + # TODO: write gpg.conf, parse existing one and write changes if needed. + # Should use SHA512 etc. See: + # https://spin.atomicobject.com/2013/11/24/secure-gpg-keys-guide/ + # https://github.com/BetterCrypto/Applied-Crypto-Hardening/blob/master/src/configuration/GPG/GnuPG/gpg.conf + # https://riseup.net/en/security/message-security/openpgp/best-practices + # And explicitly set keyservers if present in params. return() def GetContext(self, **kwargs): ctx = gpg.Context(**kwargs) return(ctx) - def KillStaleAgent(self): - # Is this even necessary since I switched to the native gpg module instead of the gpgme one? - _process_list = [] - # TODO: optimize; can I search by proc name? - for p in psutil.process_iter(): - if (p.name() in ('gpg-agent', 'dirmngr') and \ - p.uids()[0] == os.getuid()): - pd = psutil.Process(p.pid).as_dict() - # TODO: convert these over -# for d in (chrootdir, dlpath): -# if pd['cwd'].startswith('{0}'.format(d)): -# plst.append(p.pid) -# if len(plst) >= 1: -# for p in plst: -# psutil.Process(p).terminate() - def CreateKey(self, name, algo, keysize, email = None, comment = None, passwd = None, key = None, expiry = None): - algo = _algmaps[algo].format(keysize = keysize) userid = name userid += ' ({0})'.format(comment) if comment else '' userid += ' <{0}>'.format(email) if email else '' @@ -141,75 +245,249 @@ class GPGHandler(object): expires = False else: expires = True - self.ctx.create_key(userid, - algorithm = algo, - expires = expires, - expires_in = _epoch_helper(expiry), - sign = True) - # Even if expires is False, it still parses the expiry... - # except OverflowError: # Only trips if expires is True and a negative expires occurred. - # raise ValueError(('Expiration epoch must be 0 (to disable) or a future time! ' - # 'The specified epoch ({0}, {1}) is in the past ' - # '(current time is {2}, {3}).').format(expiry, - # str(datetime.datetime.utcfromtimestamp(expiry)), - # datetime.datetime.utcnow().timestamp(), - # str(datetime.datetime.utcnow()))) - return(k) - # We can't use self.ctx.create_key; it's a little limiting. - # It's a fairly thin wrapper to .op_createkey() (the C GPGME API gpgme_op_createkey) anyways. - flags = (gpg.constants.create.SIGN | - gpg.constants.create.CERT) - if not expiry: - flags = (flags | gpg.constants.create.NOEXPIRE) - if not passwd: - flags = (flags | gpg.constants.create.NOPASSWD) + params = {'algorithm': _algmaps[algo].format(keysize = keysize), + 'expires': expires, + 'expires_in': (_epoch_helper(expiry) if expires else 0), + 'sign': True, + 'passphrase': passwd} + if not key: + self.mykey = self.ctx.get_key(self.ctx.create_key(userid, **params).fpr) + self.subkey = self.mykey.subkeys[0] else: - # Thanks, gpg/core.py#Context.create_key()! - sys_pinentry = gpg.constants.PINENTRY_MODE_DEFAULT - old_pass_cb = getattr(self, '_passphrase_cb', None) - self.ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK - def passphrase_cb(hint, desc, prev_bad, hook = None): - return(passwd) - self.ctx.set_passphrase_cb(passphrase_cb) - try: - if not key: - try: - self.ctx.op_createkey(userid, algo, 0, 0, flags) - k = self.ctx.get_key(self.ctx.op_genkey_result().fpr, secret = True) - else: - if not isinstance(key, gpg.gpgme._gpgme_key): - key = self.ctx.get_key(key) - if not key: - raise ValueError('Key {0} does not exist'.format()) - #self.ctx.op_createsubkey(key, ) - finally: - if not passwd: - self.ctx.pinentry_mode = sys_pinentry - if old_pass_cb: - self.ctx.set_passphrase_cb(*old_pass_cb[1:]) - return(k) + if not self.mykey: + self.mykey = self.ctx.get_key(self.ctx.create_key(userid, **params).fpr) + self.subkey = self.ctx.get_key(self.ctx.create_subkey(self.mykey, **params).fpr) + self.ctx.signers = [self.subkey] + return() - def GetSigs(self, data_in): + def ListSigs(self, sig_data): key_ids = [] # Currently as of May 13, 2018 there's no way using the GPGME API to do - # the equivalent of the CLI's --list-packets. - # https://lists.gnupg.org/pipermail/gnupg-users/2018-January/ - # 059708.html - # https://lists.gnupg.org/pipermail/gnupg-users/2018-January/ - # 059715.html - # We use the "workaround in: - # https://lists.gnupg.org/pipermail/gnupg-users/2018-January/ - # 059711.html + # the equivalent of the CLI's --list-packets. https://dev.gnupg.org/T3734 + # https://lists.gnupg.org/pipermail/gnupg-users/2018-January/059708.html + # https://lists.gnupg.org/pipermail/gnupg-users/2018-January/059715.html + # We use the "workaround" in: + # https://lists.gnupg.org/pipermail/gnupg-users/2018-January/059711.html try: - self.ctx.verify(data_in) + self.ctx.verify(sig_data) except gpg.errors.BadSignatures as sig_except: for line in [i.strip() for i in str(sig_except).splitlines()]: l = [i.strip() for i in line.split(':')] key_ids.append(l[0]) return(key_ids) - def CheckSigs(self, keys, sig_data): + def GetSigs(self, data_in, sig_data = None, verify_keys = None): + signers = [] + if verify_keys: + # Raises gpg.errors.BadSignatures if any are invalid. + # Unlike Verify below, this will raise an exception. + signers = verify_keys + if sig_data: + # Detached sig + sig = self.ctx.verify(data_in, signature = sig_data, verify = signers) + else: + # Cleartext? or "normal" signatures (embedded) + sig = self.ctx.verify(data_in, verify = signers) + return(sig) + + def GetKeysigs(self, pubkey): + sigs = {} + fpr = (pubkey if isinstance(pubkey, str) else pubkey.fpr) + keys = list(self.ctx.keylist(fpr, mode = (gpg.constants.keylist.mode.LOCAL | gpg.constants.keylist.mode.SIGS))) + for idx1, k in enumerate(keys): + sigs[k.fpr] = {} + for idx2, u in enumerate(k.uids): + sigs[k.fpr][u.uid] = {} + for idx3, sig in enumerate(u.signatures): + signer = getattr(sig, 'keyid') + sigs[k.fpr][u.uid][signer] = {} + for a in _keysig_attrs: + if a == 'keyid': + continue + sigs[k.fpr][u.uid][signer][a] = getattr(sig, a) + return(sigs) + + def CheckSigs(self, sig, sigkeys = None): + # sig should be a GetSigs result. + is_valid = True + # See self.CheckSigs(). + # https://www.gnupg.org/documentation/manuals/gpgme/Verify.html + # https://github.com/micahflee/torbrowser-launcher/issues/262#issuecomment-284342876 + sig = sig[1] + result = {} + _keys = [s.fpr.upper() for s in sig.signatures] + if sigkeys: + if isinstance(sigkeys, str): + sigkeys = [sigkeys.upper()] + elif isinstance(sigkeys, list): + _sigkeys = [] + for s in sigkeys[:]: + if isinstance(s, str): + _sigkeys.append(s.upper()) + elif isinstance(s, gpgme._gpgme_key): + _sigkeys.append(s.fpr) + else: + continue + sigkeys = _sigkeys + elif isinstance(sigkeys, gpgme._gpgme_key): + sigkeys = [sigkeys.fpr] + else: + raise ValueError('sigkeys must be a key fingerprint or a key object (or a list of those).') + if not set(sigkeys).issubset(_keys): + raise ValueError('All specified keys were not present in the signature.') + for s in sig.signatures: + fpr = getattr(s, 'fpr') + result[fpr] = {} + for a in _sig_attrs: + if a == 'fpr': + continue + result[fpr][a] = getattr(s, a) + # Now we do some logic to determine if the sig is "valid". + # Note that we can get confidence level by &'ing "validity" attr against gpg.constants.validity.* + # Or just doing a <, >, <=, etc. operation since it's a sequential list of constants levels, not bitwise. + # For now, we just check if it's valid or not, not "how valid" it is (how much we can trust it). + _status = s.summary + if not _get_sig_isgood(_status): + result[fpr]['valid'] = False + else: + result[fpr]['valid'] = True + if sigkeys: + for k in sigkeys: + if (k not in result) or (not result[k]['valid']): + is_valid = False + break + else: # is_valid is satisfied by at LEAST one valid sig. + is_valid = any([k[1]['valid'] for k in result]) + return(is_valid, result) + + def Sign(self, data_in, ascii = True, mode = 'detached', notations = None): + # notations is a list of dicts via notation format: + # {: {'value': 'some string', 'flags': BITWISE_OR_FLAGS}} + # See RFC 4880 § 5.2.3.16 for valid user namespace format. + if mode.startswith('d'): + mode = gpg.constants.SIG_MODE_DETACH + elif mode.startswith('c'): + mode = gpg.constants.SIG_MODE_CLEAR + elif mode.startswith('n'): + mode = gpg.constants.SIG_MODE_NORMAL + self.ctx.armor = ascii + if not isinstance(data_in, bytes): + if isinstance(data_in, str): + data_in = data_in.encode('utf-8') + else: + # We COULD try serializing to JSON here, or converting to a pickle object, + # or testing for other classes, etc. But we don't. + # TODO? + data_in = repr(data_in).encode('utf-8') + data_in = gpg.Data(data_in) + if notations: + for n in notations: + if not utils.valid().gpgsigNotation(n): + raise ValueError('Malformatted notation: {0}'.format(n)) + for ns in n: + self.ctx.sig_notation_add(ns, n[ns]['value'], n[ns]['flags']) + # data_in *always* must be a bytes (or bytes-like?) object. + # It will *always* return a bytes object. + sig = self.ctx.sign(data_in, mode = mode) + # And we need to clear the sig notations, otherwise they'll apply to the next signature this context makes. + self.ctx.sig_notation_clear() + return(sig) + + def ImportPubkey(self, pubkey): + fpr = (pubkey if isinstance(pubkey, str) else pubkey.fpr) try: - self.ctx.verify(sig_data) - except: - pass # TODO + self.ctx.get_key(fpr) + return() # already imported + except gpg.errors.KeyNotFound: + pass + _dflt_klm = self.ctx.get_keylist_mode() + self.ctx.set_keylist_mode(gpg.constants.keylist.mode.EXTERN) + if isinstance(pubkey, gpgme._gpgme_key): + self.ctx.op_import_keys([pubkey]) + elif isinstance(pubkey, str): + if not utils.valid().gpgkeyID(pubkey): + raise ValueError('{0} is not a valid key or fingerprint'.format(pubkey)) + pubkey = self.ctx.get_key(fpr) + self.ctx.op_import_keys([pubkey]) + self.ctx.set_keylist_mode(_dflt_klm) + self.SignKey(pubkey) + return() + + def ImportPubkeyFromFile(self, pubkey_data): + _fpath = os.path.abspath(os.path.expanduser(pubkey_data)) + if os.path.isfile(_fpath): + with open(_fpath, 'rb') as f: + k = self.ctx.key_import(f.read()) + else: + k = self.ctx.key_import(pubkey_data) + pubkey = self.ctx.get_key(k) + self.SignKey(pubkey) + return() + + def SignKey(self, pubkey, local = False, notations = None): + # notations is a list of dicts via notation format: + # {: {'value': 'some string', 'flags': BITWISE_OR_FLAGS}} + # See RFC 4880 § 5.2.3.16 for valid user namespace format. + if isinstance(pubkey, gpgme._gpgme_key): + pass + elif isinstance(pubkey, str): + if not utils.valid().gpgkeyID(pubkey): + raise ValueError('{0} is not a valid fingerprint'.format(pubkey)) + else: + pubkey = self.ctx.get_key(pubkey) + if notations: + for n in notations: + if not utils.valid().gpgsigNotation(n): + raise ValueError('Malformatted notation: {0}'.format(n)) + for ns in n: + self.ctx.sig_notation_add(ns, n[ns]['value'], n[ns]['flags']) + self.ctx.key_sign(pubkey, local = local) + self.TrustKey(pubkey) + # And we need to clear the sig notations, otherwise they'll apply to the next signature this context makes. + self.ctx.sig_notation_clear() + return() + + def TrustKey(self, pubkey, trust = gpg.constants.validity.FULL): + # We use full as the default because signatures aren't considered valid otherwise. + # TODO: we need a way of maybe reverting/rolling back any changes we do? + output = gpg.Data() + _map = _getEditPrompt(pubkey, trust, 'trust') + self.ctx.interact(pubkey, _KeyEditor(_map).editKey, sink = output, fnc_value = output) + output.seek(0, 0) + return() + + def ExportPubkey(self, fpr, ascii = True, sigs = False): + orig_armor = self.ctx.armor + self.ctx.armor = ascii + if sigs: + export_mode = 0 + else: + export_mode = gpg.constants.EXPORT_MODE_MINIMAL # default is 0; minimal strips signatures + kb = gpg.Data() + self.ctx.op_export_keys([self.ctx.get_key(fpr)], export_mode, kb) + kb.seek(0, 0) + self.ctx.armor = orig_armor + return(kb.read()) + + def DeleteKey(self, pubkey): + if isinstance(pubkey, gpgme._gpgme_key): + pass + elif isinstance(pubkey, str): + if not utils.valid().gpgkeyID(pubkey): + raise ValueError('{0} is not a valid fingerprint'.format(pubkey)) + else: + pubkey = self.ctx.get_key(pubkey) + self.ctx.op_delete(pubkey, False) + return() + + def Verify(self, sig_data, data): + # This is a more "flat" version of CheckSigs. + # First we need to parse the sig(s) and import the key(s) to our keyring. + signers = self.ListSigs(sig_data) + for signer in signers: + self.ImportPubkey(signer) + try: + self.ctx.verify(data, signature = sig_data, verify = signers) + return(True) + except gpg.errors.BadSignatures as err: + return(False) diff --git a/bdisk/confgen.py b/bdisk/confgen.py index a321ed4..a8138b8 100755 --- a/bdisk/confgen.py +++ b/bdisk/confgen.py @@ -3,13 +3,14 @@ # Ironically enough, I think building a GUI for this would be *cleaner*. # Go figure. -import confparse import datetime import getpass import os -import utils import uuid import lxml.etree +import utils # LOCAL +import confparse # LOCAL + detect = utils.detect() generate = utils.generate() diff --git a/bdisk/confparse.py b/bdisk/confparse.py index 56abec9..c7cfcae 100644 --- a/bdisk/confparse.py +++ b/bdisk/confparse.py @@ -2,9 +2,10 @@ import copy import os import pprint import re -import utils import lxml.etree from urllib.parse import urlparse +import utils # LOCAL + etree = lxml.etree detect = utils.detect() @@ -125,6 +126,7 @@ class Conf(object): ptrn = _item.format(**self.xml_suppl.btags['regex']) else: ptrn = None + # TODO: remove all this shit! we switch to just a mirror url. _source_item['fname'] = detect.remote_files( '/'.join((_source['mirror'], _source['rootpath'])), @@ -182,7 +184,7 @@ class Conf(object): self.cfg['build']['optimize'] = transform.xml2py(_optimize) for path in build.xpath('./paths/*'): self.cfg['build']['paths'][path.tag] = path.text - self.cfg['build']['basedistro'] = build.get('basedistro', 'archlinux') + self.cfg['build']['guests'] = build.get('guests', 'archlinux') # iso and ipxe are their own basic profile elements, but we group them # in here because 1.) they're related, and 2.) they're simple to # import. This may change in the future if they become more complex. diff --git a/bdisk/download.py b/bdisk/download.py new file mode 100644 index 0000000..280db5d --- /dev/null +++ b/bdisk/download.py @@ -0,0 +1,48 @@ +import requests + + +class Download(object): + def __init__(self, url, progress = True, offset = None, chunksize = 1024): + self.cnt_len = None + self.head = requests.head(url, allow_redirects = True).headers + self.req_headers = {} + self.range = False + self.url = url + self.offset = offset + self.chunksize = chunksize + self.progress = progress + if 'accept-ranges' in self.head: + if self.head['accept-ranges'].lower() != 'none': + self.range = True + if 'content-length' in self.head: + try: + self.cnt_len = int(self.head['content-length']) + except TypeError: + pass + if self.cnt_len and self.offset and self.range: + if not self.offset <= self.cnt_len: + raise ValueError(('The offset requested ({0}) is greater than ' + 'the content-length value').format(self.offset, self.cnt_len)) + self.req_headers['range'] = 'bytes={0}-'.format(self.offset) + + def fetch(self): + if not self.progress: + self.req = requests.get(self.url, allow_redirects = True, headers = self.req_headers) + self.bytes_obj = self.req.content + else: + self.req = requests.get(self.url, allow_redirects = True, stream = True, headers = self.req_headers) + self.bytes_obj = bytes() + _bytelen = 0 + # TODO: better handling for logging instead of print()s? + for chunk in self.req.iter_content(chunk_size = self.chunksize): + self.bytes_obj += chunk + if self.cnt_len: + print('\033[F') + print('{0:.2f}'.format((_bytelen / float(self.head['content-length'])) * 100), + end = '%', + flush = True) + _bytelen += self.chunksize + else: + print('.', end = '') + print() + return(self.bytes_obj) diff --git a/bdisk/env_prep.py b/bdisk/env_prep.py index 002db3e..b034031 100644 --- a/bdisk/env_prep.py +++ b/bdisk/env_prep.py @@ -1,14 +1,14 @@ import hashlib import importlib # needed for the guest-os-specific stuff... import os -from . import utils +import download # LOCAL from urllib.parse import urljoin def hashsum_downloader(url, filename = None): # TODO: support "latest" and "regex" flags? or remove from specs (since the tarball can be specified by these)? - # move that to the utils.DOwnload() class? - d = utils.Download(url, progress = False) + # move that to the download.Download() class? + d = download.Download(url, progress = False) hashes = {os.path.basename(k):v for (v, k) in [line.split() for line in d.fetch().decode('utf-8').splitlines()]} if filename: if filename in hashes: @@ -19,19 +19,26 @@ def hashsum_downloader(url, filename = None): class Prepper(object): - def __init__(self, dirs, sources, gpg = None): - # dirs is a ConfParse.cfg['build']['paths'] dict of dirs - self.CreateDirs(dirs) - # TODO: set up GPG env here so we can use it to import sig key and verify sources - for idx, s in enumerate(sources): + # Prepare sources, destinations, etc. + def __init__(self, cfg): + self.cfg = cfg + self.CreateDirs(self.cfg['build']['paths']) + if 'handler' not in self.cfg['gpg'] or not self.cfg['gpg']['handler']: + if self.cfg['gpg']['gnupghome']: + os.environ['GNUPGHOME'] = self.cfg['gpg']['gnupghome'] + from . import GPG + self.cfg['gpg']['handler'] = GPG.GPGHandler(gnupg_homedir = self.cfg['gpg']['gnupghome'], + key_id = self.cfg['gpg']['keyid']) + self.gpg = self.cfg['gpg']['handler'] + for idx, s in enumerate(self.cfg['sources']): self._download(idx) def CreateDirs(self, dirs): for d in dirs: os.makedirs(d, exist_ok = True) + os.chmod(d, 0o700) return() - def _download(self, source_idx): download = True _source = self.cfg['sources'][source_idx] @@ -58,10 +65,12 @@ class Prepper(object): if _hash.hexdigest().lower() != _source['checksum']['value'].lower(): return(False) return(True) - def _sig_verify(gpg_instance): # TODO: move to utils.valid()? or just use as part of the bdisk.GPG module? - pass + def _sig_verify(): # TODO: move to utils.valid()? + if 'sig' in _source: + pass + return(True) if os.path.isfile(_tarball): download = _hash_verify() download = _sig_verify() if download: - d = utils.Download(_remote_tarball) + d = download.Download(_remote_tarball) diff --git a/bdisk/basedistro/antergos.py b/bdisk/guests/antergos.py similarity index 100% rename from bdisk/basedistro/antergos.py rename to bdisk/guests/antergos.py diff --git a/bdisk/basedistro/arch.py b/bdisk/guests/arch.py similarity index 100% rename from bdisk/basedistro/arch.py rename to bdisk/guests/arch.py diff --git a/bdisk/basedistro/archlinux.py b/bdisk/guests/archlinux.py similarity index 64% rename from bdisk/basedistro/archlinux.py rename to bdisk/guests/archlinux.py index c57f006..4c92d82 100644 --- a/bdisk/basedistro/archlinux.py +++ b/bdisk/guests/archlinux.py @@ -1,14 +1,47 @@ #!/usr/bin/env python3 -# Supported initsys values: -# systemd -# Possible future inclusions: -# openrc -# runit -# sinit -# s6 -# shepherd -initsys = 'systemd' +from .. import download # LOCAL # do i need to escalate two levels up? +import os +from .. import utils + +# TODO: can this be trimmed down? +prereqs = ['arch-install-scripts', 'archiso', 'bzip2', 'coreutils', 'customizepkg-scripting', 'cronie', 'dhclient', + 'dhcp', 'dhcpcd', 'dosfstools', 'dropbear', 'efibootmgr', 'efitools', 'efivar', 'file', 'findutils', + 'iproute2', 'iputils', 'libisoburn', 'localepurge', 'lz4', 'lzo', 'lzop', 'mkinitcpio-nbd', + 'mkinitcpio-nfs-utils', 'mkinitcpio-utils', 'nbd', 'ms-sys', 'mtools', 'net-tools', 'netctl', + 'networkmanager', 'pv', 'python', 'python-pyroute2', 'rsync', 'sed', 'shorewall', 'squashfs-tools', + 'sudo', 'sysfsutils', 'syslinux', 'traceroute', 'vi'] + +class Manifest(object): + def __init__(self, cfg): + self.cfg = cfg + self.name = 'archlinux' + self.version = None # rolling release + self.release = None # rolling release + # https://www.archlinux.org/master-keys/ + # Pierre Schmitz. https://www.archlinux.org/people/developers/#pierre + self.gpg_authorities = ['4AA4767BBC9C4B1D18AE28B77F2D434B9741E8AC'] + self.tarball = None + self.sig = None + self.checksum = {'sha1': None, + 'md5': None} + self._get_filename() + + def _get_filename(self): + # TODO: cache this info + webroot = 'iso/latest' + for m in self.cfg['mirrors']: + uri = os.path.join(m, webroot) + try: + self.tarball = utils.detect().remote_files(uri, ptrn = ('archlinux-' + 'bootstrap-' + '[0-9]{4}\.' + '[0-9]{2}\.' + '[0-9]{2}-' + 'x86_64\.tar\.gz$'))[0] + except Exception as e: + pass + def extern_prep(cfg, cur_arch = 'x86_64'): import os diff --git a/bdisk/basedistro/manjaro.py b/bdisk/guests/manjaro.py similarity index 100% rename from bdisk/basedistro/manjaro.py rename to bdisk/guests/manjaro.py diff --git a/bdisk/iPXE.py b/bdisk/iPXE.py index fc1c561..7c3fe82 100644 --- a/bdisk/iPXE.py +++ b/bdisk/iPXE.py @@ -1 +1 @@ -import GIT +import GIT # LOCAL diff --git a/bdisk/main.py b/bdisk/main.py index 9a54f67..1fa7f4c 100644 --- a/bdisk/main.py +++ b/bdisk/main.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 import argparse -import confparse +import confparse # LOCAL """The primary user interface for BDisk. If we are running interactively, parse arguments first, then initiate a BDisk session.""" diff --git a/bdisk/utils.py b/bdisk/utils.py index 5280ee1..d4cb306 100644 --- a/bdisk/utils.py +++ b/bdisk/utils.py @@ -3,29 +3,29 @@ import _io import copy import crypt -import GPG +import GPG # LOCAL import getpass import hashid import hashlib import iso3166 import os import pprint -import prompt_strings +import prompt_strings # LOCAL import re import string import uuid import validators import zlib -import requests import lxml.etree import lxml.objectify from bs4 import BeautifulSoup from collections import OrderedDict from dns import resolver +from download import Download # LOCAL from email.utils import parseaddr as emailparse from passlib.context import CryptContext as cryptctx from urllib.parse import urlparse -from urllib.request import urlopen + # Supported by all versions of GNU/Linux shadow passlib_schemes = ['des_crypt', 'md5_crypt', 'sha256_crypt', 'sha512_crypt'] @@ -43,53 +43,6 @@ crypt_map = {'sha512': crypt.METHOD_SHA512, 'des': crypt.METHOD_CRYPT} -class Download(object): - def __init__(self, url, progress = True, offset = None, chunksize = 1024): - self.cnt_len = None - self.head = requests.head(url, allow_redirects = True).headers - self.req_headers = {} - self.range = False - self.url = url - self.offset = offset - self.chunksize = chunksize - self.progress = progress - if 'accept-ranges' in self.head: - if self.head['accept-ranmges'].lower() != 'none': - self.range = True - if 'content-length' in self.head: - try: - self.cnt_len = int(self.head['content-length']) - except TypeError: - pass - if self.cnt_len and self.offset and self.range: - if not self.offset <= self.cnt_len: - raise ValueError(('The offset requested ({0}) is greater than ' - 'the content-length value').format(self.offset, self.cnt_len)) - self.req_headers['range'] = 'bytes={0}-'.format(self.offset) - - def fetch(self): - if not self.progress: - self.req = requests.get(self.url, allow_redirects = True, headers = self.req_headers) - self.bytes_obj = self.req.content - else: - self.req = requests.get(self.url, allow_redirects = True, stream = True, headers = self.req_headers) - self.bytes_obj = bytes() - _bytelen = 0 - # TODO: better handling for logging instead of print()s? - for chunk in self.req.iter_content(chunk_size = self.chunksize): - self.bytes_obj += chunk - if self.cnt_len: - print('\033[F') - print('{0:.2f}'.format((_bytelen / float(self.head['content-length'])) * 100), - end = '%', - flush = True) - _bytelen += self.chunksize - else: - print('.', end = '') - print() - return(self.bytes_obj) - - class XPathFmt(string.Formatter): def get_field(self, field_name, args, kwargs): vals = self.get_value(field_name, args, kwargs), field_name @@ -159,15 +112,15 @@ class detect(object): # But we CAN sort by filename. if 'latest' in flags: urls = sorted(list(set(urls))) - urls = urls[-1] - else: - urls = urls[0] + # urls = urls[-1] + # else: + # urls = urls[0] return(urls) def gpgkeyID_from_url(self, url): data = Download(url, progress = False).bytes_obj g = GPG.GPGHandler() - key_ids = g.get_sigs(data) + key_ids = g.GetSigs(data) del(g) return(key_ids) @@ -758,17 +711,17 @@ class valid(object): return(False) return(True) - def dns(self, addr): - pass + def dns(self, record): + return (not isinstance(validators.domain(record)), validators.utils.ValidationFailure) def connection(self, conninfo): # conninfo should ideally be (host, port) - pass + pass # TODO def email(self, addr): - return( + return ( not isinstance(validators.email(emailparse(addr)[1]), - validators.utils.ValidationFailure)) + validators.utils.ValidationFailure)) def gpgkeyID(self, key_id): # Condense fingerprints into normalized 40-char "full" key IDs. @@ -783,6 +736,33 @@ class valid(object): return(False) return(True) + def gpgsigNotation(self, notations): + # RFC 4880 § 5.2.3.16 + # A valid notation fmt: {'name@domain.tld': {'value': 'some str', 'flags': 1}} + if not isinstance(notations, dict): + return(False) + for n in notations: + # namespace + s = n.split('@') + if not len(s) != 2: + return(False) # IETF namespaces not supported by GPGME? + dom = s[1] + if not self.dns(dom): + return(False) + # flags + # TODO: is there a better way to do this? basically confirm a value is a valid bitmask? + flags = sorted([const for const in vars(GPG.gpg.constants.sig.notation).values() if isinstance(const, int)]) + if not isinstance(n['flags'], int): + return(False) + if not n['flags'] >= flags[0]: # at LEAST the lowest flag + return(False) + if not n['flags'] <= sum(flags): # at MOST the highest sum of all flags + return(False) + # values + if not isinstance(n['value'], str): # per RFC, non-text data for values currently is not supported + return(False) + return(True) + def integer(self, num): try: int(num)