just need to bind args to funcs and finish docs. meat is done.
This commit is contained in:
parent
018384dfce
commit
606196a022
5
pass.py
5
pass.py
@ -15,8 +15,9 @@ def main():
|
||||
if args.oper == 'version':
|
||||
print('{0} {1}'.format(vaultpass.constants.NAME,
|
||||
vaultpass.constants.VERSION))
|
||||
import pprint
|
||||
pprint.pprint(vars(args))
|
||||
args.initialize = (True if args.oper == 'init' else False)
|
||||
args.verify_cfg = (True if args.oper == 'verify' else False)
|
||||
vp = vaultpass.VaultPass(**vars(args))
|
||||
return(None)
|
||||
|
||||
|
||||
|
@ -1 +0,0 @@
|
||||
../vaultpass
|
@ -7,10 +7,10 @@ _logger = logging.getLogger()
|
||||
try:
|
||||
import qrcode
|
||||
has_qrcode = True
|
||||
_logger.warning(('Could not import qrcode; '
|
||||
'library required for QR code generation'))
|
||||
except ImportError:
|
||||
has_qrcode = False
|
||||
_logger.warning(('Could not import qrcode; '
|
||||
'library required for QR code generation'))
|
||||
try:
|
||||
import qrcode.image.svg
|
||||
has_qrcode_svg = True
|
||||
|
@ -2,6 +2,8 @@ import getpass
|
||||
import logging
|
||||
import tempfile
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
@ -19,7 +21,6 @@ from . import constants
|
||||
from . import editor
|
||||
from . import gpg_handler
|
||||
from . import mounts
|
||||
from . import pass_import
|
||||
from . import pwgen
|
||||
from . import QR
|
||||
|
||||
@ -30,9 +31,22 @@ class VaultPass(object):
|
||||
uri = None
|
||||
mount = None
|
||||
|
||||
def __init__(self, initialize = False, cfg = '~/.config/vaultpass.xml'):
|
||||
def __init__(self,
|
||||
initialize = False,
|
||||
cfg = '~/.config/vaultpass.xml',
|
||||
verify_cfg = True,
|
||||
loglevel = constants.DEFAULT_LOGLEVEL,
|
||||
*args,
|
||||
**kwargs):
|
||||
rootlogger = logging.getLogger()
|
||||
if loglevel != constants.DEFAULT_LOGLEVEL:
|
||||
if not isinstance(loglevel, int):
|
||||
# We need to convert it from the name to the int.
|
||||
loglevel = getattr(logging, loglevel.upper())
|
||||
if loglevel != constants.DEFAULT_LOGLEVEL: # And again in case we transformed it above.
|
||||
rootlogger.setLevel(loglevel)
|
||||
self.initialize = initialize
|
||||
self.cfg = config.getConfig(cfg)
|
||||
self.cfg = config.getConfig(cfg, validate = verify_cfg)
|
||||
self._getURI()
|
||||
self.getClient()
|
||||
if not self.initialize:
|
||||
@ -140,8 +154,39 @@ class VaultPass(object):
|
||||
force = False,
|
||||
gpghome = constants.GPG_HOMEDIR,
|
||||
pass_dir = constants.PASS_DIR,
|
||||
flat = False,
|
||||
*args, **kwargs):
|
||||
pass # TODO
|
||||
pass_dir = os.path.abspath(os.path.expanduser(pass_dir))
|
||||
gpg = gpg_handler.GPG(home = gpghome)
|
||||
kname_re = re.compile(r'^(?P<kname>[^/]+)\.(gpg|asc)$')
|
||||
for root, dirs, files in os.walk(pass_dir):
|
||||
rel_root = pathlib.Path(root).relative_to(pass_dir)
|
||||
for f in files:
|
||||
r = kname_re.search(f)
|
||||
if not r:
|
||||
continue
|
||||
kname = r.groupdict()['kname']
|
||||
dcryptdata = gpg.decrypt(os.path.join(root, f)).decode('utf-8')
|
||||
if flat:
|
||||
path = os.path.dirname(rel_root)
|
||||
data = {kname: dcryptdata}
|
||||
self.createSecret(data, path, mount, force = force)
|
||||
else:
|
||||
data = {}
|
||||
k = None
|
||||
v = ''
|
||||
for line in dcryptdata.splitlines():
|
||||
l = [i.strip() for i in line.split(':', 1) if i.strip() != '']
|
||||
if len(l) == 1:
|
||||
v += '\n{0}'.format(l[0])
|
||||
elif len(l) == 0:
|
||||
continue
|
||||
else:
|
||||
data[k] = v
|
||||
k = l[0]
|
||||
v = l[1]
|
||||
self.createSecret(data, path = '/'.join((rel_root, kname)), mount = mount, force = force)
|
||||
return(None)
|
||||
|
||||
def copySecret(self, oldpath, newpath, mount, newmount = None, force = False, remove_old = False, *args, **kwargs):
|
||||
mtype = self.mount.getMountType(mount)
|
||||
@ -497,7 +542,22 @@ class VaultPass(object):
|
||||
return(data)
|
||||
|
||||
def searchSecrets(self, pattern, mount, *args, **kwargs):
|
||||
pass # TODO
|
||||
print('This may take a while...')
|
||||
ptrn = re.compile(pattern)
|
||||
self.mount.getSecretsTree(mounts = mount)
|
||||
for p in self.mount.flatpaths:
|
||||
data = self.getSecret(p, mount)
|
||||
if data:
|
||||
for k, v in data.items():
|
||||
if ptrn.search(v):
|
||||
print('/'.join((mount, p, k)))
|
||||
return(None)
|
||||
|
||||
def searchSecretNames(self, pattern, mount, *args, **kwargs):
|
||||
pass # TODO
|
||||
ptrn = re.compile(pattern)
|
||||
self.mount.getSecretsTree(mounts = mount)
|
||||
for p in self.mount.flatpaths:
|
||||
n = p.split('/')[-1]
|
||||
if ptrn.search(n):
|
||||
print(p)
|
||||
return(None)
|
||||
|
@ -14,12 +14,25 @@ def parseArgs():
|
||||
action = 'version',
|
||||
version = '{0} {1}'.format(constants.NAME, constants.VERSION))
|
||||
args.add_argument('-c', '--config',
|
||||
dest = 'cfg',
|
||||
default = '~/.config/vaultpass.xml',
|
||||
help = ('The path to your configuration file. Default: ~/.config/vaultpass.xml'))
|
||||
args.add_argument('-l', '--loglevel',
|
||||
dest = 'loglevel',
|
||||
default = constants.DEFAULT_LOGLEVEL,
|
||||
help = ('The log level. Default: {0}').format(constants.DEFAULT_LOGLEVEL_NAME))
|
||||
# I can't get this to change in the logger root. TODO.
|
||||
# args.add_argument('-L', '--logfile',
|
||||
# dest = 'logfile',
|
||||
# default = constants.DEFAULT_LOGFILE,
|
||||
# help = ('The file to use for logging. '
|
||||
# 'Default: {0}').format(constants.DEFAULT_LOGFILE))
|
||||
args.add_argument('-m', '--mount',
|
||||
dest = 'mount',
|
||||
default = 'secret',
|
||||
help = ('The mount to use in OPERATION. If not specified, assume a mount named "secret"'))
|
||||
default = constants.SELECTED_DEFAULT_MOUNT,
|
||||
help = (('The mount to use in OPERATION. '
|
||||
'If not specified, assume a mount named '
|
||||
'"{0}"').format(constants.SELECTED_DEFAULT_MOUNT)))
|
||||
# I wish argparse supported default subcommands. It doesn't as of python 3.8.
|
||||
subparser = args.add_subparsers(help = ('Operation to perform'),
|
||||
metavar = 'OPERATION',
|
||||
@ -79,6 +92,9 @@ def parseArgs():
|
||||
importvault = subparser.add_parser('import',
|
||||
description = ('Import your existing Pass into Vault'),
|
||||
help = ('Import your existing Pass into Vault'))
|
||||
verify = subparser.add_parser('verify',
|
||||
description = ('Verify the validity and syntax of your configuration file'),
|
||||
help = ('Verify the validity and syntax of your configuration file'))
|
||||
# CP/COPY
|
||||
# vp.copySecret()
|
||||
cp.add_argument('-f', '--force',
|
||||
@ -460,10 +476,16 @@ def parseArgs():
|
||||
importvault.add_argument('-f', '--force',
|
||||
dest = 'force',
|
||||
action = 'store_true',
|
||||
help = ('If specified, overwrite the destination in Vault.'))
|
||||
help = ('If specified, overwrite the destination in Vault'))
|
||||
importvault.add_argument('-F', '--flat',
|
||||
action = 'store_true',
|
||||
help = ('Being that this is already a very tenuous process, this allows a bit more '
|
||||
'flexibility - passing -F/--flat indicates that the content itself rather than '
|
||||
'the path is significant (see the README for more information)'))
|
||||
importvault.add_argument('mount',
|
||||
metavar = 'MOUNT_NAME',
|
||||
help = 'The mount name in Vault to import into (Pass\' hierarchy will be recreated). '
|
||||
'This mount MUST exist first and MUST be KV2 if auth is provided that does not '
|
||||
'have CREATE access on /sys/mounts!')
|
||||
# VERIFY has no args.
|
||||
return(args)
|
||||
|
@ -345,5 +345,5 @@ def getConfig(cfg_ref, validate = True, populate_defaults = True, xsd_path = Non
|
||||
break
|
||||
if cfgobj:
|
||||
_logger.info('Parsing configuration.')
|
||||
cfgobj.main(validate = validate, populate_defaults = populate_defaults)
|
||||
cfgobj.main(validate = validate, populate_defaults = (populate_defaults if validate else False))
|
||||
return(cfgobj)
|
||||
|
@ -1,4 +1,5 @@
|
||||
import os
|
||||
import logging
|
||||
import string
|
||||
|
||||
# These are static.
|
||||
@ -7,6 +8,10 @@ VERSION = '0.0.1'
|
||||
SUPPORTED_ENGINES = ('kv1', 'kv2', 'cubbyhole')
|
||||
# SUPPORTED_OUTPUT_FORMATS = ('pretty', 'yaml', 'json', 'tree')
|
||||
SUPPORTED_OUTPUT_FORMATS = ('pretty', 'yaml', 'json')
|
||||
DEFAULT_LOGFILE = os.path.abspath(os.path.expanduser('~/.cache/vaultpass/vaultpass.log'))
|
||||
DEFAULT_LOGLEVEL_NAME = 'WARNING'
|
||||
DEFAULT_LOGLEVEL = getattr(logging, DEFAULT_LOGLEVEL_NAME)
|
||||
DEFAULT_MOUNT = 'secret'
|
||||
ALPHA_LOWER_PASS_CHARS = string.ascii_lowercase
|
||||
ALPHA_UPPER_PASS_CHARS = string.ascii_uppercase
|
||||
ALPHA_PASS_CHARS = ALPHA_LOWER_PASS_CHARS + ALPHA_UPPER_PASS_CHARS
|
||||
@ -39,6 +44,7 @@ if not os.environ.get('NO_VAULTPASS_ENVS'):
|
||||
EDITOR = os.environ.get('EDITOR', EDITOR)
|
||||
SELECTED_GPG_HOMEDIR = os.environ.get('GNUPGHOME', GPG_HOMEDIR)
|
||||
PASS_DIR = os.environ.get('PASSWORD_STORE_DIR', PASS_DIR)
|
||||
SELECTED_DEFAULT_MOUNT = os.environ.get('VAULTPASS_DEFMNT', DEFAULT_MOUNT)
|
||||
|
||||
# These are made more sane.
|
||||
PASS_DIR = os.path.abspath(os.path.expanduser(PASS_DIR))
|
||||
|
@ -2,6 +2,8 @@ import logging
|
||||
import logging.handlers
|
||||
import os
|
||||
##
|
||||
from . import constants
|
||||
##
|
||||
try:
|
||||
# https://www.freedesktop.org/software/systemd/python-systemd/journal.html#journalhandler-class
|
||||
from systemd import journal
|
||||
@ -10,23 +12,20 @@ except ImportError:
|
||||
_has_journald = False
|
||||
|
||||
|
||||
logfile = os.path.abspath(os.path.expanduser('~/.cache/vaultpass/vaultpass.log'))
|
||||
|
||||
|
||||
def prepLogfile(path = logfile):
|
||||
def prepLogfile(path = constants.DEFAULT_LOGFILE):
|
||||
path = os.path.abspath(os.path.expanduser(path))
|
||||
# Set up the permissions beforehand.
|
||||
os.makedirs(os.path.dirname(logfile), exist_ok = True, mode = 0o0700)
|
||||
os.makedirs(os.path.dirname(path), exist_ok = True, mode = 0o0700)
|
||||
if not os.path.isfile(path):
|
||||
# "Touch" it so the next command doesn't fail.
|
||||
with open(path, 'w') as fh:
|
||||
fh.write('')
|
||||
os.chmod(logfile, 0o0600)
|
||||
os.chmod(path, 0o0600)
|
||||
return(path)
|
||||
|
||||
|
||||
_cfg_args = {'handlers': [],
|
||||
'level': logging.DEBUG} # TEMPORARY FOR TESTING
|
||||
'level': constants.DEFAULT_LOGLEVEL}
|
||||
if _has_journald:
|
||||
# There were some weird changes somewhere along the line.
|
||||
try:
|
||||
|
@ -81,6 +81,7 @@ class MountHandler(object):
|
||||
self.xml = mounts_xml
|
||||
self.mounts = {}
|
||||
self.paths = {}
|
||||
self.flatpaths = set()
|
||||
self.getSysMounts()
|
||||
|
||||
def createMount(self, mount_name, mount_type = 'kv2'):
|
||||
@ -184,6 +185,9 @@ class MountHandler(object):
|
||||
_logger.debug('The version parameter ({0}) must be an integer or None'.format(version))
|
||||
raise ValueError('version parameter must be an integer or None')
|
||||
handler = self.client.secrets.kv.v2
|
||||
self.flatpaths.add(mount)
|
||||
flatpath = path.rstrip('/')
|
||||
self.flatpaths.add('/'.join((mount, flatpath)))
|
||||
if mount not in self.paths.keys():
|
||||
self.paths[mount] = {}
|
||||
try:
|
||||
|
@ -1 +0,0 @@
|
||||
import os
|
Reference in New Issue
Block a user