password generation
This commit is contained in:
parent
01cdfa79b4
commit
062dd46c27
@ -17,6 +17,7 @@ from . import constants
|
||||
from . import gpg_handler
|
||||
from . import mounts
|
||||
from . import pass_import
|
||||
from . import pwgen
|
||||
from . import QR
|
||||
|
||||
|
||||
@ -63,9 +64,10 @@ class VaultPass(object):
|
||||
return(False)
|
||||
|
||||
def _getHandler(self, mount, func = 'read', *args, **kwargs):
|
||||
if func not in ('read', 'write', 'list', 'delete', 'destroy'):
|
||||
funcs = ('read', 'write', 'list', 'delete', 'destroy')
|
||||
if func not in funcs:
|
||||
_logger.error('Invalid func')
|
||||
_logger.debug('Invalid func; must be one of: read, write, list, delete, destroy')
|
||||
_logger.debug('Invalid func; must be one of: {0}'.format(', '.join(funcs)))
|
||||
raise ValueError('Invalid func')
|
||||
mtype = self.mount.getMountType(mount)
|
||||
handler = None
|
||||
@ -168,22 +170,29 @@ class VaultPass(object):
|
||||
self.deleteSecret(oldpath, mount, force = force)
|
||||
return(None)
|
||||
|
||||
def createSecret(self, secret_dict, path, mount_name, *args, **kwargs):
|
||||
mtype = self.mount.mounts.get(mount_name)
|
||||
handler = None
|
||||
def createSecret(self, secret_dict, path, mount, force = False, *args, **kwargs):
|
||||
mtype = self.mount.mounts.get(mount)
|
||||
if not mtype:
|
||||
_logger.error('Could not determine mount type')
|
||||
_logger.debug('Could not determine mount type for mount {0}'.format(mount_name))
|
||||
_logger.debug('Could not determine mount type for mount {0}'.format(mount))
|
||||
raise RuntimeError('Could not determine mount type')
|
||||
args = {'path': path,
|
||||
'mount_point': mount_name,
|
||||
'mount_point': mount,
|
||||
'secret': secret_dict}
|
||||
if mtype == 'cubbyhole':
|
||||
handler = self.mount.cubbyhandler.write_secret
|
||||
elif mtype == 'kv1':
|
||||
handler = self.client.secrets.kv.v1.create_or_update_secret
|
||||
elif mtype == 'kv2':
|
||||
handler = self.client.secrets.kv.v2.create_or_update_secret
|
||||
path_exists = self._pathExists(path, mount)
|
||||
if path_exists:
|
||||
for k in secret_dict.keys():
|
||||
kpath = '/'.join(path, k)
|
||||
exists = self._pathExists(kpath, mount, is_secret = True)
|
||||
if exists:
|
||||
_logger.warning('A secret named {0} at {1}:{2} exists.'.format(k, mount, path))
|
||||
if not force:
|
||||
_logger.error('Cannot create secret; a name already exists.')
|
||||
raise ValueError('Cannot create secret; a name already exists.')
|
||||
if path_exists:
|
||||
handler = self._getHandler(mount, func = 'update')
|
||||
else:
|
||||
handler = self._getHandler(mount, func = 'write')
|
||||
resp = handler(**args)
|
||||
return(resp)
|
||||
|
||||
@ -227,6 +236,7 @@ class VaultPass(object):
|
||||
def generateSecret(self,
|
||||
path,
|
||||
mount,
|
||||
kname = None,
|
||||
symbols = True,
|
||||
clip = False,
|
||||
seconds = constants.CLIP_TIMEOUT,
|
||||
@ -236,8 +246,27 @@ class VaultPass(object):
|
||||
qr = False,
|
||||
force = False,
|
||||
length = constants.GENERATED_LENGTH,
|
||||
printme = False,
|
||||
*args, **kwargs):
|
||||
pass # TODO
|
||||
charset = {'simple': chars_plain,
|
||||
'complex': chars}
|
||||
pg_args = {'length': length,
|
||||
'chars': charset,
|
||||
'charset': ('complex' if symbols else 'simple')}
|
||||
pg = pwgen.genPass(**pg_args)
|
||||
pg.genPW()
|
||||
passwd = pg.pw
|
||||
if not kname:
|
||||
lpath = path.split('/')
|
||||
kname = lpath[-1]
|
||||
path = '/'.join(lpath[0:-1])
|
||||
args = {'secret_dict': {kname: passwd},
|
||||
'path': path,
|
||||
'mount': mount,
|
||||
'force': force}
|
||||
self.createSecret(**args)
|
||||
self.getSecret(path, mount, kname = kname, clip = clip, qr = qr, seconds = seconds, printme = printme)
|
||||
return(passwd)
|
||||
|
||||
def getClient(self):
|
||||
auth_xml = self.cfg.xml.find('.//auth')
|
||||
@ -320,7 +349,7 @@ class VaultPass(object):
|
||||
# But that breaks compat with Pass' behaviour.
|
||||
if printme:
|
||||
print('Now displaying generated QR code. Please close the viewer when done saving/scanning to '
|
||||
'securely clean up the generated file...')
|
||||
'securely clean up the generated file and continue...')
|
||||
cmd = subprocess.run(['xdg-open', fpath], stdout = subprocess.PIPE, stderr = subprocess.PIPE)
|
||||
if cmd.returncode != 0:
|
||||
_logger.error('xdg-open returned non-zero status code')
|
||||
@ -339,7 +368,7 @@ class VaultPass(object):
|
||||
qrdata.seek(0, 0)
|
||||
del(qrdata)
|
||||
if clip not in (False, None):
|
||||
clipboard.pasteClipboard(data, seconds = seconds, clipboard = clipboard, printme = printme)
|
||||
clipboard.pasteClipboard(data, seconds = seconds, printme = printme)
|
||||
return(data)
|
||||
|
||||
def initVault(self, *args, **kwargs):
|
||||
|
@ -114,7 +114,7 @@ def parseArgs():
|
||||
metavar = 'NAME_PATTERN',
|
||||
help = ('List secrets\' paths whose names match the regex NAME_PATTERN'))
|
||||
# GENERATE
|
||||
# vp.generateSecret()
|
||||
# vp.generateSecret(printme = True)
|
||||
# TODO: feature parity with passgen (spaces? etc.)
|
||||
gen.add_argument('-n', '--no-symbols',
|
||||
dest = 'symbols',
|
||||
|
@ -30,28 +30,38 @@ class CubbyHandler(object):
|
||||
# Alias function
|
||||
return(self.write_secret(*args, **kwargs))
|
||||
|
||||
def list_secrets(self, path, mount_point = 'cubbyhole'):
|
||||
def list_secrets(self, path, mount_point = 'cubbyhole', *args, **kwargs):
|
||||
path = path.lstrip('/')
|
||||
uri = '/v1/{0}/{1}'.format(mount_point, path)
|
||||
uri = 'v1/{0}/{1}'.format(mount_point, path)
|
||||
resp = self.client._adapter.list(url = uri)
|
||||
return(resp.json())
|
||||
|
||||
def read_secret(self, path, mount_point = 'cubbyhole'):
|
||||
def read_secret(self, path, mount_point = 'cubbyhole', *args, **kwargs):
|
||||
path = path.lstrip('/')
|
||||
# uri = '/v1/{0}/{1}'.format(mount_point, path)
|
||||
uri = '{0}/{1}'.format(mount_point, path)
|
||||
uri = 'v1/{0}/{1}'.format(mount_point, path)
|
||||
resp = self.client._adapter.get(url = uri)
|
||||
return(resp.json())
|
||||
|
||||
def remove_secret(self, path, mount_point = 'cubbyhole', *args, **kwargs):
|
||||
path = path.lstrip('/')
|
||||
uri = '{0}/{1}'.format(mount_point, path)
|
||||
uri = 'v1/{0}/{1}'.format(mount_point, path)
|
||||
resp = self.client._adapter.delete(url = uri)
|
||||
return(resp.json())
|
||||
|
||||
def update_secret(self, secret, path, mount_point = 'cubbyhole', *args, **kwargs):
|
||||
existing = self.read_secret(path, mount_point)
|
||||
data = existing.get('data')
|
||||
if not data:
|
||||
resp = self.write_secret(path, secret, mount_point = mount_point)
|
||||
else:
|
||||
data.update(secret)
|
||||
self.remove_secret(path, mount_point)
|
||||
resp = self.write_secret(path, data, mount_point)
|
||||
return(resp)
|
||||
|
||||
def write_secret(self, path, secret, mount_point = 'cubbyhole', *args, **kwargs):
|
||||
path = path.lstrip('/')
|
||||
args = {'path': '/'.join((mount_point, path))}
|
||||
args = {'path': 'v1/{0}'.format('/'.join((mount_point, path)))}
|
||||
for k, v in secret.items():
|
||||
if k in args.keys():
|
||||
_logger.error('Cannot use reserved secret name')
|
||||
|
204
vaultpass/pwgen.py
Normal file
204
vaultpass/pwgen.py
Normal file
@ -0,0 +1,204 @@
|
||||
# Thanks to https://gist.github.com/stantonk/7268449
|
||||
# See also:
|
||||
# http://stackoverflow.com/questions/5480131/will-python-systemrandom-os-urandom-always-have-enough-entropy-for-good-crypto
|
||||
import argparse
|
||||
import random
|
||||
import re
|
||||
import warnings
|
||||
##
|
||||
from . import constants
|
||||
##
|
||||
try:
|
||||
import passlib.context
|
||||
import passlib.hash
|
||||
has_passlib = True
|
||||
except ImportError:
|
||||
# TODO: adler32 and crc32 via zlib module?
|
||||
import hashlib
|
||||
has_passlib = False
|
||||
|
||||
|
||||
if has_passlib:
|
||||
supported_hashes = tuple(i for i in dir(passlib.hash) if not i.startswith('_'))
|
||||
else:
|
||||
supported_hashes = tuple(hashlib.algorithms_available)
|
||||
|
||||
# By default, complex is symbols and mixed-case alphanumeric. simple is mixed-case alphanumeric.
|
||||
charsets = {'simple': constants.ALPHANUM_PASS_CHARS,
|
||||
'complex': constants.ALL_PASS_CHARS}
|
||||
|
||||
|
||||
class genPass(object):
|
||||
def __init__(self,
|
||||
case = None,
|
||||
charset = 'complex',
|
||||
chars = None,
|
||||
passlen = 32,
|
||||
quotes = True,
|
||||
backslashes = True,
|
||||
human = False,
|
||||
hashes = None,
|
||||
*args,
|
||||
**kwargs):
|
||||
if not chars:
|
||||
chars = charsets
|
||||
self.charselect = chars
|
||||
self.charset = charset
|
||||
self.hashnames = hashes
|
||||
self.hashes = {}
|
||||
self.hasher = None
|
||||
self.pw = None
|
||||
self.chars = None
|
||||
self.case = case
|
||||
self.quotes = quotes
|
||||
self.passlen = passlen
|
||||
self.backslashes = backslashes
|
||||
self.human = human
|
||||
self.buildCharSet()
|
||||
|
||||
def buildCharSet(self):
|
||||
self.chars = self.charselect[self.charset]
|
||||
if not self.quotes:
|
||||
self.chars = re.sub('["\']', '', self.chars)
|
||||
if not self.backslashes:
|
||||
self.chars = re.sub('\\\\', '', self.chars)
|
||||
if self.human:
|
||||
_dupechars = ['`', "'", '|', 'l', 'I', 'i', 'l', '1', 'o', '0', 'O']
|
||||
self.chars = ''.join(sorted(list(set(self.chars) - set(_dupechars))))
|
||||
if self.case == 'upper':
|
||||
self.chars = self.chars.upper()
|
||||
elif self.case == 'lower':
|
||||
self.chars = self.chars.lower()
|
||||
self.chars = ''.join(sorted(list(set(self.chars))))
|
||||
return(None)
|
||||
|
||||
def buildHashers(self):
|
||||
if self.hashnames:
|
||||
if not isinstance(self.hashnames, list):
|
||||
_hashes = [self.hashnames]
|
||||
for h in self.hashnames:
|
||||
if h not in supported_hashes:
|
||||
warnings.warn('Hash algorithm {0} is not a supported hash algorithm'.format(h))
|
||||
continue
|
||||
self.hashes[h] = None
|
||||
if has_passlib:
|
||||
self.hasher = passlib.context.CryptContext(schemes = list(self.hashes.keys()))
|
||||
else:
|
||||
self.hasher = {}
|
||||
for h in self.hashnames:
|
||||
self.hasher[h] = getattr(hashlib, h)
|
||||
return(None)
|
||||
|
||||
def generate(self):
|
||||
self.genPW()
|
||||
self.genHash()
|
||||
return(None)
|
||||
|
||||
def genPW(self):
|
||||
self.pw = ''
|
||||
for _ in range(self.passlen):
|
||||
self.pw += random.SystemRandom().choice(self.chars)
|
||||
return(None)
|
||||
|
||||
def genHash(self):
|
||||
self.buildHashers()
|
||||
if not self.hashes or not self.hasher:
|
||||
return(None)
|
||||
if not self.pw:
|
||||
self.genPW()
|
||||
for h in self.hashes.keys():
|
||||
if has_passlib:
|
||||
if h.endswith('_crypt'):
|
||||
try:
|
||||
self.hashes[h] = self.hasher.hash(self.pw, scheme = h, rounds = 5000)
|
||||
except TypeError:
|
||||
self.hashes[h] = self.hasher.hash(self.pw, scheme = h)
|
||||
else:
|
||||
self.hashes[h] = self.hasher.hash(self.pw, scheme = h)
|
||||
else:
|
||||
_hasher = self.hasher[h]
|
||||
_hasher.update(self.pw.encode('utf-8'))
|
||||
self.hashes[h] = _hasher.hexdigest()
|
||||
return(None)
|
||||
|
||||
|
||||
def parseArgs():
|
||||
args = argparse.ArgumentParser(description = 'A password generator.')
|
||||
args.add_argument('-t', '--type',
|
||||
dest = 'charset',
|
||||
choices = ['simple', 'complex'], # chars in genPass
|
||||
default = 'complex',
|
||||
help = ('Whether to generate "simple" (no symbols, '
|
||||
'safer for e.g. databases) password(s) or more complex ones. The default is "complex"'))
|
||||
args.add_argument('-l', '--length',
|
||||
dest = 'passlen',
|
||||
metavar = 'LENGTH',
|
||||
type = int,
|
||||
default = 32,
|
||||
help = ('The length of the password(s) to generate. The default is 32'))
|
||||
args.add_argument('-c', '--count',
|
||||
dest = 'passcount',
|
||||
metavar = 'COUNT',
|
||||
type = int,
|
||||
default = 1,
|
||||
help = ('The number of passwords to generate. The default is 1'))
|
||||
args.add_argument('-q', '--no-quotes',
|
||||
dest = 'quotes',
|
||||
action = 'store_false',
|
||||
help = ('If specified, strip out quotation marks (both " and \') from the passwords. '
|
||||
'Only relevant if -t/--type is complex, as simple types don\'t contain these'))
|
||||
args.add_argument('-b', '--no-backslashes',
|
||||
dest = 'backslashes',
|
||||
action = 'store_false',
|
||||
help = ('If specified, strip out backslashes. Only relevant if -t/--type is complex, as '
|
||||
'simple types don\'t contain these'))
|
||||
args.add_argument('-m', '--human',
|
||||
dest = 'human',
|
||||
action = 'store_true',
|
||||
help = ('If specified, make the passwords easier to read by human eyes (i.e. no 1 and l, '
|
||||
'o or O or 0, etc.)'))
|
||||
caseargs = args.add_mutually_exclusive_group()
|
||||
caseargs.add_argument('-L', '--lower',
|
||||
dest = 'case',
|
||||
action = 'store_const',
|
||||
const = 'lower',
|
||||
help = 'If specified, make password all lowercase')
|
||||
caseargs.add_argument('-U', '--upper',
|
||||
dest = 'case',
|
||||
action = 'store_const',
|
||||
const = 'upper',
|
||||
help = 'If specified, make password all UPPERCASE')
|
||||
args.add_argument('-H', '--hash',
|
||||
action = 'append',
|
||||
metavar = 'HASH_NAME',
|
||||
dest = 'hashes',
|
||||
help = ('If specified, also generate hashes for the generated password. '
|
||||
'Pass this argument multiple times for multiple hash types. Use -HL/--hash-list for '
|
||||
'supported hash algorithms'))
|
||||
args.add_argument('-HL', '--hash-list',
|
||||
dest = 'only_hashlist',
|
||||
action = 'store_true',
|
||||
help = ('Print the list of supported hash types/algorithms and quit'))
|
||||
return(args)
|
||||
|
||||
|
||||
def main():
|
||||
args = vars(parseArgs().parse_args())
|
||||
if args['only_hashlist']:
|
||||
print('SUPPORTED HASH ALGORITHMS:\n')
|
||||
print(' *', '\n * '.join(supported_hashes))
|
||||
return(None)
|
||||
for _ in range(0, args['passcount']):
|
||||
p = genPass(**args)
|
||||
p.generate()
|
||||
print(p.pw)
|
||||
if p.hashes:
|
||||
print('\nHASHES:')
|
||||
for h, val in p.hashes.items():
|
||||
print('{0}: {1}'.format(h, val))
|
||||
print()
|
||||
return(None)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Reference in New Issue
Block a user