checking in some stuff

This commit is contained in:
brent s. 2018-11-08 03:55:53 -05:00
parent 69b6ec60d0
commit a376bea0e9
13 changed files with 572 additions and 217 deletions

14
TODO
View File

@ -1,11 +1,12 @@
- write classes/functions
- XML-based config
-x XML syntax
--- xregex btags - case-insensitive? this can be represented in-pattern:
xhttps://stackoverflow.com/a/9655186/733214
--- x regex btags - case-insensitive? this can be represented in-pattern:
x https://stackoverflow.com/a/9655186/733214
--- remove sources stuff - that should be in the guest definitions.
-x configuration generator
--- xprint end result xml config to stderr for easier redirection? or print prompts to stderr and xml to stdout?
-- xXSD for validation
--- x print end result xml config to stderr for easier redirection? or print prompts to stderr and xml to stdout?
-- x XSD for validation
-- Flask app for generating config?
-- TKinter (or pygame?) GUI?
--- https://docs.python.org/3/faq/gui.html
@ -15,9 +16,12 @@
at the very least document all the functions and such so pydoc's happy.

- locking

- for docs, 3.x (as of 3.10) was 2.4M.
- xNeed ability to write/parse mtree specs (or a similar equivalent) for applying ownerships/permissions to overlay files

- x Need ability to write/parse mtree specs (or a similar equivalent) for applying ownerships/permissions to overlay files
-- parsing is done. writing may? come later.
--- i think writing is mostly done/straightforward; still need to work on parsing mode octals for files


- package for PyPI:

View File

@ -1,8 +1,71 @@
import copy
import datetime
import gpg
import operator
import os
import psutil
import gpg.errors
import re
import utils # LOCAL
from functools import reduce
from gpg import gpgme

# Reference material.
# http://files.au.adversary.org/crypto/GPGMEpythonHOWTOen.html
# https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gpgme.git;a=tree;f=lang/python/examples;hb=HEAD
# https://www.gnupg.org/documentation/manuals/gpgme.pdf
# Support ECC? https://www.gnupg.org/faq/whats-new-in-2.1.html#ecc
# section 4.1, 4.2, 7.5.1, 7.5.5 in gpgme manual

# These are static values. We include them in the parent so we don't define them every time a function is called.
# Key signature attributes.
_keysig_attrs = ('comment', 'email', 'expired', 'expires', 'exportable', 'invalid', 'keyid', 'name', 'notations',
'pubkey_algo', 'revoked', 'sig_class', 'status', 'timestamp', 'uid')
# Data signature attributes.
_sig_attrs = ('chain_model', 'exp_timestamp', 'fpr', 'hash_algo', 'is_de_vs', 'key', 'notations', 'pka_address',
'pka_trust', 'pubkey_algo', 'status', 'summary', 'timestamp', 'validity', 'validity_reason',
'wrong_key_usage')

# A regex that ignores signature verification validity errors we don't care about.
_valid_ignore = re.compile(('^('
#'CHECKSUM|'
'ELEMENT_NOT_FOUND|'
'MISSING_VALUE|'
#'UNKNOWN_PACKET|'
'UNSUPPORTED_CMS_OBJ|'
'WRONG_SECKEY|'
'('
'DECRYPT|'
'INV|'
'NO|'
'PIN|'
'SOURCE'
')_'
')'))
# A function to build a list based on the above.
def _gen_valid_validities():
# Strips out and minimizes the error output.
v = {}
for s in dir(gpg.constants.validity):
if _valid_ignore.search(s):
continue
val = getattr(gpg.constants.validity, s)
if not isinstance(val, int):
continue
v[s] = val
return(v)
_valid_validities = _gen_valid_validities()
def _get_sigstatus(status):
statuses = []
for e in _valid_validities:
if ((status & _valid_validities[e]) == _valid_validities[e]):
statuses.append(e)
return(statuses)
def _get_sig_isgood(sigstat):
is_good = True
if not ((sigstat & gpg.constants.sigsum.GREEN) == gpg.constants.sigsum.GREEN):
is_good = False
if not ((sigstat & gpg.constants.sigsum.VALID) == gpg.constants.sigsum.VALID):
is_good = False
return(is_good)


# This helps translate the input name from the conf to a string compatible with the gpg module.
@ -21,52 +84,94 @@ def _epoch_helper(epoch):
return(abs(int(d.total_seconds()))) # Returns a positive integer even if negative...
#return(int(d.total_seconds()))

# http://files.au.adversary.org/crypto/GPGMEpythonHOWTOen.html
# https://www.gnupg.org/documentation/manuals/gpgme.pdf
# Support ECC? https://www.gnupg.org/faq/whats-new-in-2.1.html#ecc
# section 4.1, 4.2, 7.5.1, 7.5.5 in gpgme manual
# Please select what kind of key you want:
# (1) RSA and RSA (default) - 1024-4096 bits
# (2) DSA and Elgamal - 768-3072
# (3) DSA (sign only) - 768-3072
# (4) RSA (sign only) - 1024-4096
# (7) DSA (set your own capabilities) - 768-3072
# (8) RSA (set your own capabilities) - 1024-4096
# (9) ECC and ECC - (see below)
# (10) ECC (sign only) - (see below)
# (11) ECC (set your own capabilities) - (see below)
# Your selection? 9
# Please select which elliptic curve you want:
# (2) NIST P-256
# (3) NIST P-384
# (4) NIST P-521
# (5) Brainpool P-256
# (6) Brainpool P-384
# (7) Brainpool P-512
# Your selection? 10
# Please select which elliptic curve you want:
# (1) Curve 25519
# (3) NIST P-256
# (4) NIST P-384
# (5) NIST P-521
# (6) Brainpool P-256
# (7) Brainpool P-384
# (8) Brainpool P-512
# (9) secp256k1
# gpgme key creation:
#g = gpg.Context()
#mainkey = g.create_key('test key via python <test2@test.com>', algorithm = 'rsa4096', expires = False,
# #certify = True,
# certify = False,
# sign = False,
# authenticate = False,
# encrypt = False)
#key = g.get_key(mainkey.fpr, secret = True)
#subkey = g.create_subkey(key, algorithm = 'rsa4096', expires = False,
# sign = True,
# #certify = False,
# encrypt = False,
# authenticate = False)
# _KeyEditor and _getEditPrompt are used to interactively edit keys -- notably currently used for editing trusts
# (since there's no way to edit trust otherwise).
# https://www.gnupg.org/documentation/manuals/gpgme/Advanced-Key-Editing.html
# https://www.apt-browse.org/browse/debian/wheezy/main/amd64/python-pyme/1:0.8.1-2/file/usr/share/doc/python-pyme/examples/t-edit.py
# https://searchcode.com/codesearch/view/20535820/
# https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=doc/DETAILS
# You can get the prompt identifiers and status indicators without grokking the source
# by first interactively performing the type of edit(s) you want to do with this command:
# gpg --expert --status-fd 2 --command-fd 2 --edit-key <KEY_ID>
# Per:
# https://lists.gnupg.org/pipermail/gnupg-users/2002-April/012630.html
# https://lists.gt.net/gnupg/users/9544
# https://raymii.org/s/articles/GPG_noninteractive_batch_sign_trust_and_send_gnupg_keys.html
class _KeyEditor(object):
def __init__(self, optmap):
self.replied_once = False # This is used to handle the first prompt vs. the last
self.optmap = optmap

def editKey(self, status, args, out):
result = None
out.seek(0, 0)
def mapDict(m, d):
return(reduce(operator.getitem, m, d))
if args == 'keyedit.prompt' and self.replied_once:
result = 'quit'
elif status == 'KEY_CONSIDERED':
result = None
self.replied_once = False
elif status == 'GET_LINE':
self.replied_once = True
_ilist = args.split('.')
result = mapDict(_ilist, self.optmap['prompts'])
if not result:
result = None
return(result)

def _getEditPrompt(key, trust, cmd, uid = None):
if not uid:
uid = key.uids[0]
# This mapping defines the default "answers" to the gpgme key editing.
# https://www.apt-browse.org/browse/debian/wheezy/main/amd64/python-pyme/1:0.8.1-2/file/usr/share/doc/python-pyme/examples/t-edit.py
# https://searchcode.com/codesearch/view/20535820/
# https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=doc/DETAILS
# You can get the prompt identifiers and status indicators without grokking the source
# by first interactively performing the type of edit(s) you want to do with this command:
# gpg --status-fd 2 --command-fd 2 --edit-key <KEY_ID>
if trust >= gpg.constants.validity.FULL: # For tsigning, it only prompts for two trust levels:
_loctrust = 2 # "I trust fully"
else:
_loctrust = 1 # "I trust marginally"
# TODO: make the trust depth configurable. 1 is probably the safest, but we try to guess here.
# "Full" trust is a pretty big thing.
if trust >= gpg.constants.validity.FULL:
_locdepth = 2 # Allow +1 level of trust extension
else:
_locdepth = 1 # Only trust this key
# The check level.
# (0) I will not answer. (default)
# (1) I have not checked at all.
# (2) I have done casual checking.
# (3) I have done very careful checking.
# Since we're running this entirely non-interactively, we really should use 1.
_chk_lvl = 1
_map = {
# Valid commands
'cmds': ['trust', 'fpr', 'sign', 'tsign', 'lsign', 'nrsign', 'grip', 'list',
'uid', 'key', 'check', 'deluid', 'delkey', 'delsig', 'pref', 'showpref',
'revsig', 'enable', 'disable', 'showphoto', 'clean', 'minimize', 'save',
'quit'],
# Prompts served by the interactive session, and a map of their responses.
# It's expanded in the parent call, but the prompt is actually in the form of e.g.:
# keyedit.save (we expand that to a list and use that list as a "path" in the below dict)
# We *could* just use a flat dict of full prompt to constants, but this is a better visual segregation &
# prevents unnecessary duplication.
'prompts': {
'edit_ownertrust': {'value': str(trust), # Pulled at time of call
'set_ultimate': {'okay': 'yes'}}, # If confirming ultimate trust, we auto-answer yes
'untrusted_key': {'override': 'yes'}, # We don't care if it's untrusted
'pklist': {'user_id': {'enter': uid.uid}}, # Prompt for a user ID - can we use the full uid string? (tsign)
'sign_uid': {'class': str(_chk_lvl), # The certification/"check" level
'okay': 'yes'}, # Are you sure that you want to sign this key with your key..."
'trustsig_prompt': {'trust_value': str(_loctrust), # This requires some processing; see above
'trust_depth': str(_locdepth), # The "depth" of the trust signature.
'trust_regexp': None}, # We can "Restrict" trust to certain domains if we wanted.
'keyedit': {'prompt': cmd, # Initiate trust editing (or whatever)
'save': {'okay': 'yes'}}}} # Save if prompted
return(_map)



class GPGHandler(object):
@ -79,6 +184,16 @@ class GPGHandler(object):
else:
self._check_home()
self.ctx = self.GetContext(home_dir = self.home)
self._orig_kl_mode = self.ctx.get_keylist_mode()
self.mykey = None
self.subkey = None
if self.key_id:
self.mykey = self.ctx.get_key(self.key_id, secret = True)
for s in self.mykey.subkeys:
if s.can_sign:
self.subkey = s
self.ctx.signers = [self.mykey]
break

def _check_home(self, home = None):
if not home:
@ -110,30 +225,19 @@ class GPGHandler(object):
if not _exists:
raise PermissionError('We need a GnuPG home directory we can '
'write to')
# TODO: write gpg.conf, parse existing one and write changes if needed.
# Should use SHA512 etc. See:
# https://spin.atomicobject.com/2013/11/24/secure-gpg-keys-guide/
# https://github.com/BetterCrypto/Applied-Crypto-Hardening/blob/master/src/configuration/GPG/GnuPG/gpg.conf
# https://riseup.net/en/security/message-security/openpgp/best-practices
# And explicitly set keyservers if present in params.
return()

def GetContext(self, **kwargs):
ctx = gpg.Context(**kwargs)
return(ctx)

def KillStaleAgent(self):
# Is this even necessary since I switched to the native gpg module instead of the gpgme one?
_process_list = []
# TODO: optimize; can I search by proc name?
for p in psutil.process_iter():
if (p.name() in ('gpg-agent', 'dirmngr') and \
p.uids()[0] == os.getuid()):
pd = psutil.Process(p.pid).as_dict()
# TODO: convert these over
# for d in (chrootdir, dlpath):
# if pd['cwd'].startswith('{0}'.format(d)):
# plst.append(p.pid)
# if len(plst) >= 1:
# for p in plst:
# psutil.Process(p).terminate()

def CreateKey(self, name, algo, keysize, email = None, comment = None, passwd = None, key = None, expiry = None):
algo = _algmaps[algo].format(keysize = keysize)
userid = name
userid += ' ({0})'.format(comment) if comment else ''
userid += ' <{0}>'.format(email) if email else ''
@ -141,75 +245,249 @@ class GPGHandler(object):
expires = False
else:
expires = True
self.ctx.create_key(userid,
algorithm = algo,
expires = expires,
expires_in = _epoch_helper(expiry),
sign = True)
# Even if expires is False, it still parses the expiry...
# except OverflowError: # Only trips if expires is True and a negative expires occurred.
# raise ValueError(('Expiration epoch must be 0 (to disable) or a future time! '
# 'The specified epoch ({0}, {1}) is in the past '
# '(current time is {2}, {3}).').format(expiry,
# str(datetime.datetime.utcfromtimestamp(expiry)),
# datetime.datetime.utcnow().timestamp(),
# str(datetime.datetime.utcnow())))
return(k)
# We can't use self.ctx.create_key; it's a little limiting.
# It's a fairly thin wrapper to .op_createkey() (the C GPGME API gpgme_op_createkey) anyways.
flags = (gpg.constants.create.SIGN |
gpg.constants.create.CERT)
if not expiry:
flags = (flags | gpg.constants.create.NOEXPIRE)
if not passwd:
flags = (flags | gpg.constants.create.NOPASSWD)
params = {'algorithm': _algmaps[algo].format(keysize = keysize),
'expires': expires,
'expires_in': (_epoch_helper(expiry) if expires else 0),
'sign': True,
'passphrase': passwd}
if not key:
self.mykey = self.ctx.get_key(self.ctx.create_key(userid, **params).fpr)
self.subkey = self.mykey.subkeys[0]
else:
# Thanks, gpg/core.py#Context.create_key()!
sys_pinentry = gpg.constants.PINENTRY_MODE_DEFAULT
old_pass_cb = getattr(self, '_passphrase_cb', None)
self.ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK
def passphrase_cb(hint, desc, prev_bad, hook = None):
return(passwd)
self.ctx.set_passphrase_cb(passphrase_cb)
try:
if not key:
try:
self.ctx.op_createkey(userid, algo, 0, 0, flags)
k = self.ctx.get_key(self.ctx.op_genkey_result().fpr, secret = True)
else:
if not isinstance(key, gpg.gpgme._gpgme_key):
key = self.ctx.get_key(key)
if not key:
raise ValueError('Key {0} does not exist'.format())
#self.ctx.op_createsubkey(key, )
finally:
if not passwd:
self.ctx.pinentry_mode = sys_pinentry
if old_pass_cb:
self.ctx.set_passphrase_cb(*old_pass_cb[1:])
return(k)
if not self.mykey:
self.mykey = self.ctx.get_key(self.ctx.create_key(userid, **params).fpr)
self.subkey = self.ctx.get_key(self.ctx.create_subkey(self.mykey, **params).fpr)
self.ctx.signers = [self.subkey]
return()

def GetSigs(self, data_in):
def ListSigs(self, sig_data):
key_ids = []
# Currently as of May 13, 2018 there's no way using the GPGME API to do
# the equivalent of the CLI's --list-packets.
# https://lists.gnupg.org/pipermail/gnupg-users/2018-January/
# 059708.html
# https://lists.gnupg.org/pipermail/gnupg-users/2018-January/
# 059715.html
# We use the "workaround in:
# https://lists.gnupg.org/pipermail/gnupg-users/2018-January/
# 059711.html
# the equivalent of the CLI's --list-packets. https://dev.gnupg.org/T3734
# https://lists.gnupg.org/pipermail/gnupg-users/2018-January/059708.html
# https://lists.gnupg.org/pipermail/gnupg-users/2018-January/059715.html
# We use the "workaround" in:
# https://lists.gnupg.org/pipermail/gnupg-users/2018-January/059711.html
try:
self.ctx.verify(data_in)
self.ctx.verify(sig_data)
except gpg.errors.BadSignatures as sig_except:
for line in [i.strip() for i in str(sig_except).splitlines()]:
l = [i.strip() for i in line.split(':')]
key_ids.append(l[0])
return(key_ids)

def CheckSigs(self, keys, sig_data):
def GetSigs(self, data_in, sig_data = None, verify_keys = None):
signers = []
if verify_keys:
# Raises gpg.errors.BadSignatures if any are invalid.
# Unlike Verify below, this will raise an exception.
signers = verify_keys
if sig_data:
# Detached sig
sig = self.ctx.verify(data_in, signature = sig_data, verify = signers)
else:
# Cleartext? or "normal" signatures (embedded)
sig = self.ctx.verify(data_in, verify = signers)
return(sig)

def GetKeysigs(self, pubkey):
sigs = {}
fpr = (pubkey if isinstance(pubkey, str) else pubkey.fpr)
keys = list(self.ctx.keylist(fpr, mode = (gpg.constants.keylist.mode.LOCAL | gpg.constants.keylist.mode.SIGS)))
for idx1, k in enumerate(keys):
sigs[k.fpr] = {}
for idx2, u in enumerate(k.uids):
sigs[k.fpr][u.uid] = {}
for idx3, sig in enumerate(u.signatures):
signer = getattr(sig, 'keyid')
sigs[k.fpr][u.uid][signer] = {}
for a in _keysig_attrs:
if a == 'keyid':
continue
sigs[k.fpr][u.uid][signer][a] = getattr(sig, a)
return(sigs)

def CheckSigs(self, sig, sigkeys = None):
# sig should be a GetSigs result.
is_valid = True
# See self.CheckSigs().
# https://www.gnupg.org/documentation/manuals/gpgme/Verify.html
# https://github.com/micahflee/torbrowser-launcher/issues/262#issuecomment-284342876
sig = sig[1]
result = {}
_keys = [s.fpr.upper() for s in sig.signatures]
if sigkeys:
if isinstance(sigkeys, str):
sigkeys = [sigkeys.upper()]
elif isinstance(sigkeys, list):
_sigkeys = []
for s in sigkeys[:]:
if isinstance(s, str):
_sigkeys.append(s.upper())
elif isinstance(s, gpgme._gpgme_key):
_sigkeys.append(s.fpr)
else:
continue
sigkeys = _sigkeys
elif isinstance(sigkeys, gpgme._gpgme_key):
sigkeys = [sigkeys.fpr]
else:
raise ValueError('sigkeys must be a key fingerprint or a key object (or a list of those).')
if not set(sigkeys).issubset(_keys):
raise ValueError('All specified keys were not present in the signature.')
for s in sig.signatures:
fpr = getattr(s, 'fpr')
result[fpr] = {}
for a in _sig_attrs:
if a == 'fpr':
continue
result[fpr][a] = getattr(s, a)
# Now we do some logic to determine if the sig is "valid".
# Note that we can get confidence level by &'ing "validity" attr against gpg.constants.validity.*
# Or just doing a <, >, <=, etc. operation since it's a sequential list of constants levels, not bitwise.
# For now, we just check if it's valid or not, not "how valid" it is (how much we can trust it).
_status = s.summary
if not _get_sig_isgood(_status):
result[fpr]['valid'] = False
else:
result[fpr]['valid'] = True
if sigkeys:
for k in sigkeys:
if (k not in result) or (not result[k]['valid']):
is_valid = False
break
else: # is_valid is satisfied by at LEAST one valid sig.
is_valid = any([k[1]['valid'] for k in result])
return(is_valid, result)

def Sign(self, data_in, ascii = True, mode = 'detached', notations = None):
# notations is a list of dicts via notation format:
# {<namespace>: {'value': 'some string', 'flags': BITWISE_OR_FLAGS}}
# See RFC 4880 § 5.2.3.16 for valid user namespace format.
if mode.startswith('d'):
mode = gpg.constants.SIG_MODE_DETACH
elif mode.startswith('c'):
mode = gpg.constants.SIG_MODE_CLEAR
elif mode.startswith('n'):
mode = gpg.constants.SIG_MODE_NORMAL
self.ctx.armor = ascii
if not isinstance(data_in, bytes):
if isinstance(data_in, str):
data_in = data_in.encode('utf-8')
else:
# We COULD try serializing to JSON here, or converting to a pickle object,
# or testing for other classes, etc. But we don't.
# TODO?
data_in = repr(data_in).encode('utf-8')
data_in = gpg.Data(data_in)
if notations:
for n in notations:
if not utils.valid().gpgsigNotation(n):
raise ValueError('Malformatted notation: {0}'.format(n))
for ns in n:
self.ctx.sig_notation_add(ns, n[ns]['value'], n[ns]['flags'])
# data_in *always* must be a bytes (or bytes-like?) object.
# It will *always* return a bytes object.
sig = self.ctx.sign(data_in, mode = mode)
# And we need to clear the sig notations, otherwise they'll apply to the next signature this context makes.
self.ctx.sig_notation_clear()
return(sig)

def ImportPubkey(self, pubkey):
fpr = (pubkey if isinstance(pubkey, str) else pubkey.fpr)
try:
self.ctx.verify(sig_data)
except:
pass # TODO
self.ctx.get_key(fpr)
return() # already imported
except gpg.errors.KeyNotFound:
pass
_dflt_klm = self.ctx.get_keylist_mode()
self.ctx.set_keylist_mode(gpg.constants.keylist.mode.EXTERN)
if isinstance(pubkey, gpgme._gpgme_key):
self.ctx.op_import_keys([pubkey])
elif isinstance(pubkey, str):
if not utils.valid().gpgkeyID(pubkey):
raise ValueError('{0} is not a valid key or fingerprint'.format(pubkey))
pubkey = self.ctx.get_key(fpr)
self.ctx.op_import_keys([pubkey])
self.ctx.set_keylist_mode(_dflt_klm)
self.SignKey(pubkey)
return()

def ImportPubkeyFromFile(self, pubkey_data):
_fpath = os.path.abspath(os.path.expanduser(pubkey_data))
if os.path.isfile(_fpath):
with open(_fpath, 'rb') as f:
k = self.ctx.key_import(f.read())
else:
k = self.ctx.key_import(pubkey_data)
pubkey = self.ctx.get_key(k)
self.SignKey(pubkey)
return()

def SignKey(self, pubkey, local = False, notations = None):
# notations is a list of dicts via notation format:
# {<namespace>: {'value': 'some string', 'flags': BITWISE_OR_FLAGS}}
# See RFC 4880 § 5.2.3.16 for valid user namespace format.
if isinstance(pubkey, gpgme._gpgme_key):
pass
elif isinstance(pubkey, str):
if not utils.valid().gpgkeyID(pubkey):
raise ValueError('{0} is not a valid fingerprint'.format(pubkey))
else:
pubkey = self.ctx.get_key(pubkey)
if notations:
for n in notations:
if not utils.valid().gpgsigNotation(n):
raise ValueError('Malformatted notation: {0}'.format(n))
for ns in n:
self.ctx.sig_notation_add(ns, n[ns]['value'], n[ns]['flags'])
self.ctx.key_sign(pubkey, local = local)
self.TrustKey(pubkey)
# And we need to clear the sig notations, otherwise they'll apply to the next signature this context makes.
self.ctx.sig_notation_clear()
return()

def TrustKey(self, pubkey, trust = gpg.constants.validity.FULL):
# We use full as the default because signatures aren't considered valid otherwise.
# TODO: we need a way of maybe reverting/rolling back any changes we do?
output = gpg.Data()
_map = _getEditPrompt(pubkey, trust, 'trust')
self.ctx.interact(pubkey, _KeyEditor(_map).editKey, sink = output, fnc_value = output)
output.seek(0, 0)
return()

def ExportPubkey(self, fpr, ascii = True, sigs = False):
orig_armor = self.ctx.armor
self.ctx.armor = ascii
if sigs:
export_mode = 0
else:
export_mode = gpg.constants.EXPORT_MODE_MINIMAL # default is 0; minimal strips signatures
kb = gpg.Data()
self.ctx.op_export_keys([self.ctx.get_key(fpr)], export_mode, kb)
kb.seek(0, 0)
self.ctx.armor = orig_armor
return(kb.read())

def DeleteKey(self, pubkey):
if isinstance(pubkey, gpgme._gpgme_key):
pass
elif isinstance(pubkey, str):
if not utils.valid().gpgkeyID(pubkey):
raise ValueError('{0} is not a valid fingerprint'.format(pubkey))
else:
pubkey = self.ctx.get_key(pubkey)
self.ctx.op_delete(pubkey, False)
return()

def Verify(self, sig_data, data):
# This is a more "flat" version of CheckSigs.
# First we need to parse the sig(s) and import the key(s) to our keyring.
signers = self.ListSigs(sig_data)
for signer in signers:
self.ImportPubkey(signer)
try:
self.ctx.verify(data, signature = sig_data, verify = signers)
return(True)
except gpg.errors.BadSignatures as err:
return(False)

View File

@ -3,13 +3,14 @@
# Ironically enough, I think building a GUI for this would be *cleaner*.
# Go figure.

import confparse
import datetime
import getpass
import os
import utils
import uuid
import lxml.etree
import utils # LOCAL
import confparse # LOCAL


detect = utils.detect()
generate = utils.generate()

View File

@ -2,9 +2,10 @@ import copy
import os
import pprint
import re
import utils
import lxml.etree
from urllib.parse import urlparse
import utils # LOCAL


etree = lxml.etree
detect = utils.detect()
@ -125,6 +126,7 @@ class Conf(object):
ptrn = _item.format(**self.xml_suppl.btags['regex'])
else:
ptrn = None
# TODO: remove all this shit! we switch to just a mirror url.
_source_item['fname'] = detect.remote_files(
'/'.join((_source['mirror'],
_source['rootpath'])),
@ -182,7 +184,7 @@ class Conf(object):
self.cfg['build']['optimize'] = transform.xml2py(_optimize)
for path in build.xpath('./paths/*'):
self.cfg['build']['paths'][path.tag] = path.text
self.cfg['build']['basedistro'] = build.get('basedistro', 'archlinux')
self.cfg['build']['guests'] = build.get('guests', 'archlinux')
# iso and ipxe are their own basic profile elements, but we group them
# in here because 1.) they're related, and 2.) they're simple to
# import. This may change in the future if they become more complex.

48
bdisk/download.py Normal file
View File

@ -0,0 +1,48 @@
import requests


class Download(object):
def __init__(self, url, progress = True, offset = None, chunksize = 1024):
self.cnt_len = None
self.head = requests.head(url, allow_redirects = True).headers
self.req_headers = {}
self.range = False
self.url = url
self.offset = offset
self.chunksize = chunksize
self.progress = progress
if 'accept-ranges' in self.head:
if self.head['accept-ranges'].lower() != 'none':
self.range = True
if 'content-length' in self.head:
try:
self.cnt_len = int(self.head['content-length'])
except TypeError:
pass
if self.cnt_len and self.offset and self.range:
if not self.offset <= self.cnt_len:
raise ValueError(('The offset requested ({0}) is greater than '
'the content-length value').format(self.offset, self.cnt_len))
self.req_headers['range'] = 'bytes={0}-'.format(self.offset)

def fetch(self):
if not self.progress:
self.req = requests.get(self.url, allow_redirects = True, headers = self.req_headers)
self.bytes_obj = self.req.content
else:
self.req = requests.get(self.url, allow_redirects = True, stream = True, headers = self.req_headers)
self.bytes_obj = bytes()
_bytelen = 0
# TODO: better handling for logging instead of print()s?
for chunk in self.req.iter_content(chunk_size = self.chunksize):
self.bytes_obj += chunk
if self.cnt_len:
print('\033[F')
print('{0:.2f}'.format((_bytelen / float(self.head['content-length'])) * 100),
end = '%',
flush = True)
_bytelen += self.chunksize
else:
print('.', end = '')
print()
return(self.bytes_obj)

View File

@ -1,14 +1,14 @@
import hashlib
import importlib # needed for the guest-os-specific stuff...
import os
from . import utils
import download # LOCAL
from urllib.parse import urljoin


def hashsum_downloader(url, filename = None):
# TODO: support "latest" and "regex" flags? or remove from specs (since the tarball can be specified by these)?
# move that to the utils.DOwnload() class?
d = utils.Download(url, progress = False)
# move that to the download.Download() class?
d = download.Download(url, progress = False)
hashes = {os.path.basename(k):v for (v, k) in [line.split() for line in d.fetch().decode('utf-8').splitlines()]}
if filename:
if filename in hashes:
@ -19,19 +19,26 @@ def hashsum_downloader(url, filename = None):


class Prepper(object):
def __init__(self, dirs, sources, gpg = None):
# dirs is a ConfParse.cfg['build']['paths'] dict of dirs
self.CreateDirs(dirs)
# TODO: set up GPG env here so we can use it to import sig key and verify sources
for idx, s in enumerate(sources):
# Prepare sources, destinations, etc.
def __init__(self, cfg):
self.cfg = cfg
self.CreateDirs(self.cfg['build']['paths'])
if 'handler' not in self.cfg['gpg'] or not self.cfg['gpg']['handler']:
if self.cfg['gpg']['gnupghome']:
os.environ['GNUPGHOME'] = self.cfg['gpg']['gnupghome']
from . import GPG
self.cfg['gpg']['handler'] = GPG.GPGHandler(gnupg_homedir = self.cfg['gpg']['gnupghome'],
key_id = self.cfg['gpg']['keyid'])
self.gpg = self.cfg['gpg']['handler']
for idx, s in enumerate(self.cfg['sources']):
self._download(idx)

def CreateDirs(self, dirs):
for d in dirs:
os.makedirs(d, exist_ok = True)
os.chmod(d, 0o700)
return()


def _download(self, source_idx):
download = True
_source = self.cfg['sources'][source_idx]
@ -58,10 +65,12 @@ class Prepper(object):
if _hash.hexdigest().lower() != _source['checksum']['value'].lower():
return(False)
return(True)
def _sig_verify(gpg_instance): # TODO: move to utils.valid()? or just use as part of the bdisk.GPG module?
pass
def _sig_verify(): # TODO: move to utils.valid()?
if 'sig' in _source:
pass
return(True)
if os.path.isfile(_tarball):
download = _hash_verify()
download = _sig_verify()
if download:
d = utils.Download(_remote_tarball)
d = download.Download(_remote_tarball)

View File

@ -1,14 +1,47 @@
#!/usr/bin/env python3

# Supported initsys values:
# systemd
# Possible future inclusions:
# openrc
# runit
# sinit
# s6
# shepherd
initsys = 'systemd'
from .. import download # LOCAL # do i need to escalate two levels up?
import os
from .. import utils

# TODO: can this be trimmed down?
prereqs = ['arch-install-scripts', 'archiso', 'bzip2', 'coreutils', 'customizepkg-scripting', 'cronie', 'dhclient',
'dhcp', 'dhcpcd', 'dosfstools', 'dropbear', 'efibootmgr', 'efitools', 'efivar', 'file', 'findutils',
'iproute2', 'iputils', 'libisoburn', 'localepurge', 'lz4', 'lzo', 'lzop', 'mkinitcpio-nbd',
'mkinitcpio-nfs-utils', 'mkinitcpio-utils', 'nbd', 'ms-sys', 'mtools', 'net-tools', 'netctl',
'networkmanager', 'pv', 'python', 'python-pyroute2', 'rsync', 'sed', 'shorewall', 'squashfs-tools',
'sudo', 'sysfsutils', 'syslinux', 'traceroute', 'vi']

class Manifest(object):
def __init__(self, cfg):
self.cfg = cfg
self.name = 'archlinux'
self.version = None # rolling release
self.release = None # rolling release
# https://www.archlinux.org/master-keys/
# Pierre Schmitz. https://www.archlinux.org/people/developers/#pierre
self.gpg_authorities = ['4AA4767BBC9C4B1D18AE28B77F2D434B9741E8AC']
self.tarball = None
self.sig = None
self.checksum = {'sha1': None,
'md5': None}
self._get_filename()

def _get_filename(self):
# TODO: cache this info
webroot = 'iso/latest'
for m in self.cfg['mirrors']:
uri = os.path.join(m, webroot)
try:
self.tarball = utils.detect().remote_files(uri, ptrn = ('archlinux-'
'bootstrap-'
'[0-9]{4}\.'
'[0-9]{2}\.'
'[0-9]{2}-'
'x86_64\.tar\.gz$'))[0]
except Exception as e:
pass


def extern_prep(cfg, cur_arch = 'x86_64'):
import os

View File

@ -1 +1 @@
import GIT
import GIT # LOCAL

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3

import argparse
import confparse
import confparse # LOCAL

"""The primary user interface for BDisk. If we are running interactively,
parse arguments first, then initiate a BDisk session."""

View File

@ -3,29 +3,29 @@
import _io
import copy
import crypt
import GPG
import GPG # LOCAL
import getpass
import hashid
import hashlib
import iso3166
import os
import pprint
import prompt_strings
import prompt_strings # LOCAL
import re
import string
import uuid
import validators
import zlib
import requests
import lxml.etree
import lxml.objectify
from bs4 import BeautifulSoup
from collections import OrderedDict
from dns import resolver
from download import Download # LOCAL
from email.utils import parseaddr as emailparse
from passlib.context import CryptContext as cryptctx
from urllib.parse import urlparse
from urllib.request import urlopen


# Supported by all versions of GNU/Linux shadow
passlib_schemes = ['des_crypt', 'md5_crypt', 'sha256_crypt', 'sha512_crypt']
@ -43,53 +43,6 @@ crypt_map = {'sha512': crypt.METHOD_SHA512,
'des': crypt.METHOD_CRYPT}


class Download(object):
def __init__(self, url, progress = True, offset = None, chunksize = 1024):
self.cnt_len = None
self.head = requests.head(url, allow_redirects = True).headers
self.req_headers = {}
self.range = False
self.url = url
self.offset = offset
self.chunksize = chunksize
self.progress = progress
if 'accept-ranges' in self.head:
if self.head['accept-ranmges'].lower() != 'none':
self.range = True
if 'content-length' in self.head:
try:
self.cnt_len = int(self.head['content-length'])
except TypeError:
pass
if self.cnt_len and self.offset and self.range:
if not self.offset <= self.cnt_len:
raise ValueError(('The offset requested ({0}) is greater than '
'the content-length value').format(self.offset, self.cnt_len))
self.req_headers['range'] = 'bytes={0}-'.format(self.offset)

def fetch(self):
if not self.progress:
self.req = requests.get(self.url, allow_redirects = True, headers = self.req_headers)
self.bytes_obj = self.req.content
else:
self.req = requests.get(self.url, allow_redirects = True, stream = True, headers = self.req_headers)
self.bytes_obj = bytes()
_bytelen = 0
# TODO: better handling for logging instead of print()s?
for chunk in self.req.iter_content(chunk_size = self.chunksize):
self.bytes_obj += chunk
if self.cnt_len:
print('\033[F')
print('{0:.2f}'.format((_bytelen / float(self.head['content-length'])) * 100),
end = '%',
flush = True)
_bytelen += self.chunksize
else:
print('.', end = '')
print()
return(self.bytes_obj)


class XPathFmt(string.Formatter):
def get_field(self, field_name, args, kwargs):
vals = self.get_value(field_name, args, kwargs), field_name
@ -159,15 +112,15 @@ class detect(object):
# But we CAN sort by filename.
if 'latest' in flags:
urls = sorted(list(set(urls)))
urls = urls[-1]
else:
urls = urls[0]
# urls = urls[-1]
# else:
# urls = urls[0]
return(urls)

def gpgkeyID_from_url(self, url):
data = Download(url, progress = False).bytes_obj
g = GPG.GPGHandler()
key_ids = g.get_sigs(data)
key_ids = g.GetSigs(data)
del(g)
return(key_ids)

@ -758,17 +711,17 @@ class valid(object):
return(False)
return(True)

def dns(self, addr):
pass
def dns(self, record):
return (not isinstance(validators.domain(record)), validators.utils.ValidationFailure)

def connection(self, conninfo):
# conninfo should ideally be (host, port)
pass
pass # TODO

def email(self, addr):
return(
return (
not isinstance(validators.email(emailparse(addr)[1]),
validators.utils.ValidationFailure))
validators.utils.ValidationFailure))

def gpgkeyID(self, key_id):
# Condense fingerprints into normalized 40-char "full" key IDs.
@ -783,6 +736,33 @@ class valid(object):
return(False)
return(True)

def gpgsigNotation(self, notations):
# RFC 4880 § 5.2.3.16
# A valid notation fmt: {'name@domain.tld': {'value': 'some str', 'flags': 1}}
if not isinstance(notations, dict):
return(False)
for n in notations:
# namespace
s = n.split('@')
if not len(s) != 2:
return(False) # IETF namespaces not supported by GPGME?
dom = s[1]
if not self.dns(dom):
return(False)
# flags
# TODO: is there a better way to do this? basically confirm a value is a valid bitmask?
flags = sorted([const for const in vars(GPG.gpg.constants.sig.notation).values() if isinstance(const, int)])
if not isinstance(n['flags'], int):
return(False)
if not n['flags'] >= flags[0]: # at LEAST the lowest flag
return(False)
if not n['flags'] <= sum(flags): # at MOST the highest sum of all flags
return(False)
# values
if not isinstance(n['value'], str): # per RFC, non-text data for values currently is not supported
return(False)
return(True)

def integer(self, num):
try:
int(num)