302 lines
13 KiB
Python
302 lines
13 KiB
Python
import copy
|
|
import logging
|
|
import re
|
|
import shutil
|
|
import time
|
|
import warnings
|
|
##
|
|
import dpath.util # https://pypi.org/project/dpath/
|
|
import hvac.exceptions
|
|
##
|
|
from . import constants
|
|
|
|
|
|
# Module-scoped logger. Using the module's own name (rather than the root
# logger) lets records be attributed to, and filtered for, this module while
# still propagating to whatever handlers are configured on the root logger.
_logger = logging.getLogger(__name__)
# Captures a mount name without its single trailing slash ('secret/' -> 'secret').
_mount_re = re.compile(r'^(?P<mount>.*)/$')
# Captures a sub-path without one optional leading slash and its trailing slash
# ('/a/b/' -> 'a/b').
_subpath_re = re.compile(r'^/?(?P<path>.*)/$')
# Matches KV engine aliases such as 'kv1', 'kv2' or 'kv-v2' and captures the
# numeric version.
_kv_re = re.compile(r'^kv(?:-v)?(?P<version>[0-9]+)$')
|
|
|
|
|
|
# TODO: for all write operations, modify handler call to first check if path exists and patch if it does?
|
|
|
|
|
|
class CubbyHandler(object):
    """Read, list, write and delete secrets on a cubbyhole mount.

    There is no upstream support for directly reading cubby, so the requests
    are issued through the client's low-level adapter instead.
    """
    # TODO: custom class/handler? https://hvac.readthedocs.io/en/stable/advanced_usage.html#custom-requests-http-adapter

    def __init__(self, client):
        """Keep a reference to the (already authenticated) Vault client."""
        self.client = client

    @staticmethod
    def _uri(path, mount_point):
        """Build the relative API URI for *path* on *mount_point*."""
        return 'v1/{0}/{1}'.format(mount_point, path.lstrip('/'))

    @staticmethod
    def _decode(resp):
        """Return the decoded JSON payload of an adapter response.

        Returns None when the response carries no JSON body (as is the case
        for a successful delete). Values that are already a dict or None are
        passed through untouched.
        NOTE(review): some hvac adapters appear to hand back decoded data
        instead of a response object; confirm against the hvac version in use.
        """
        if resp is None or isinstance(resp, dict):
            return resp
        try:
            return resp.json()
        except ValueError:
            # Empty or non-JSON body; there is nothing to decode.
            return None

    def create_or_update_secret(self, *args, **kwargs):
        """Alias of write_secret()."""
        return self.write_secret(*args, **kwargs)

    def list_secrets(self, path, mount_point = 'cubbyhole', *args, **kwargs):
        """List the entries below *path*; returns the decoded response."""
        resp = self.client._adapter.list(url = self._uri(path, mount_point))
        return self._decode(resp)

    def read_secret(self, path, mount_point = 'cubbyhole', *args, **kwargs):
        """Read the secret at *path*; returns the decoded response."""
        resp = self.client._adapter.get(url = self._uri(path, mount_point))
        return self._decode(resp)

    def remove_secret(self, path, mount_point = 'cubbyhole', *args, **kwargs):
        """Delete the secret at *path*.

        Returns the decoded response, or None when the server answers without
        a body.
        """
        resp = self.client._adapter.delete(url = self._uri(path, mount_point))
        return self._decode(resp)

    def update_secret(self, secret, path, mount_point = 'cubbyhole', *args, **kwargs):
        """Merge the *secret* dict into whatever is stored at *path*.

        When nothing is stored there yet (or the path does not exist) this is
        a plain write. Returns the result of the final write.
        """
        try:
            existing = self.read_secret(path, mount_point)
        except hvac.exceptions.InvalidPath:
            # Nothing stored at this path yet; fall through to a plain write.
            existing = None
        current = existing.get('data') if isinstance(existing, dict) else None
        if not current:
            return self.write_secret(path, secret, mount_point = mount_point)
        merged = dict(current)
        merged.update(secret)
        self.remove_secret(path, mount_point)
        return self.write_secret(path, merged, mount_point = mount_point)

    def write_secret(self, path, secret, mount_point = 'cubbyhole', *args, **kwargs):
        """Write the *secret* dict to *path*.

        Raises ValueError if a key in *secret* collides with the reserved
        'path' argument of the client's write call.
        """
        # NOTE(review): hvac's Client.write() may itself prepend the API
        # version to the path - confirm that the 'v1/' prefix is intended.
        payload = {'path': 'v1/{0}'.format('/'.join((mount_point, path.lstrip('/'))))}
        for name, value in secret.items():
            if name in payload:
                _logger.error('Cannot use reserved secret name')
                _logger.debug('Cannot use secret name {0} as it is reserved'.format(name))
                raise ValueError('Cannot use reserved secret name')
            payload[name] = value
        return self.client.write(**payload)
|
|
|
|
|
|
class MountHandler(object):
    """Discover secret mounts and cache a tree of the secrets below them.

    Attributes (all populated by the methods of this class):
      mounts    -- dict of mount name -> mount type ('cubbyhole', 'kv1', 'kv2', ...)
      paths     -- nested dict, mount -> path segments -> list of secret names
      flatpaths -- set of 'mount/path' strings visited while building the tree
    """
    # Mounts that are never added to the mounts dict.
    internal_mounts = ('identity', 'sys')

    def __init__(self, client, mounts_xml = None):
        """Store the client and optional XML mount definitions, then load the mounts."""
        self.client = client
        self.cubbyhandler = CubbyHandler(self.client)
        self.xml = mounts_xml
        self.mounts = {}
        self.paths = {}
        self.flatpaths = set()
        self.getSysMounts()

    @staticmethod
    def _segments(path):
        """Split a slash-separated path into its non-empty segments."""
        return [seg for seg in path.split('/') if seg]

    @staticmethod
    def _check_version(version):
        """Raise ValueError unless *version* is an integer or None."""
        if version is None or isinstance(version, int):
            return
        _logger.error('version parameter must be an integer or None')
        _logger.debug('The version parameter ({0}) must be an integer or None'.format(version))
        raise ValueError('version parameter must be an integer or None')

    def _store(self, mount, relpath, value):
        """Record *value* in the paths tree at *relpath* below *mount*."""
        segs = self._segments(relpath)
        if not segs:
            # The mount root itself is kept as a dict of children.
            return
        node = self.paths.get(mount)
        if not isinstance(node, dict):
            node = self.paths[mount] = {}
        for seg in segs[:-1]:
            child = node.get(seg)
            if not isinstance(child, dict):
                child = node[seg] = {}
            node = child
        node[segs[-1]] = value

    def createMount(self, mount_name, mount_type = 'kv2'):
        """Enable a secrets engine of *mount_type* at *mount_name*.

        Returns True if the mount was created and False if the server
        rejected the request (e.g. because the mount already exists).
        Raises ValueError for an unsupported mount type or an invalid path.
        """
        orig_mtype = mount_type
        if mount_type not in constants.SUPPORTED_ENGINES:
            _logger.error('Invalid mount type')
            _logger.debug(('The mount type {0} is invalid. '
                           'It must be one of: {1}').format(mount_type, ', '.join(constants.SUPPORTED_ENGINES)))
            raise ValueError('Invalid mount type')
        options = {}
        kv_match = _kv_re.search(mount_type)
        if kv_match:
            # KV aliases are enabled as engine type 'kv' plus a version option.
            mount_type = 'kv'
            options['version'] = kv_match.group('version')
        created = False
        try:
            self.client.sys.enable_secrets_engine(mount_type,
                                                  path = mount_name,
                                                  description = 'Created automatically by VaultPass',
                                                  options = options)
            created = True
        except hvac.exceptions.InvalidPath as e:
            _logger.error('Invalid path')
            _logger.debug('The mount path {0} (type {1}) is invalid: {2}'.format(mount_name, orig_mtype, e))
            raise ValueError('Invalid path') from e
        except hvac.exceptions.InvalidRequest as e:
            # Deliberately not fatal; reported to the caller via the False return.
            _logger.error('Invalid request; does mount already exist?')
            _logger.debug(('The creation of mount path {0} (type {1}) generated an invalid request: '
                           '{2}. Does it already exist?').format(mount_name, orig_mtype, e))
        # Due to how KV2 is created, we can hit a timing/race condition.
        if orig_mtype == 'kv2' and created:
            time.sleep(2)
        return created

    def getMountType(self, mount):
        """Return the type recorded for *mount*; raises ValueError if unknown."""
        if not self.mounts:
            self.getSysMounts()
        mtype = self.mounts.get(mount)
        if not mtype:
            _logger.error('Mount not found in defined mounts')
            _logger.debug('The mount {0} was not found in the defined mounts.'.format(mount))
            raise ValueError('Mount not found in defined mounts')
        return mtype

    def getPath(self, path, mount):
        """Return the cached tree node at *path* below *mount*.

        An empty path or '/' returns the whole subtree of the mount. Returns
        None when the mount or any segment of the path is not in the tree.
        """
        if not self.paths:
            self.getSecretsTree()
        node = self.paths.get(mount)
        for seg in self._segments(path):
            if not isinstance(node, dict) or seg not in node:
                return None
            node = node[seg]
        return node

    def getSecretNames(self, path, mount, version = None):
        """Return the key names stored in the secret at *path* on *mount*.

        *version* is only used for kv2 mounts and must be an integer or None.
        Returns an empty list when the response holds no key/value data.
        Raises ValueError for a bad version or an unsupported mount type.
        """
        mtype = self.getMountType(mount)
        args = {'path': path,
                'mount_point': mount}
        keypath = ('data',)
        if mtype == 'cubbyhole':
            reader = self.cubbyhandler.read_secret
        elif mtype == 'kv1':
            reader = self.client.secrets.kv.v1.read_secret
        elif mtype == 'kv2':
            self._check_version(version)
            reader = self.client.secrets.kv.v2.read_secret_version
            args['version'] = version
            # KV2 nests the payload one level deeper than KV1/cubbyhole.
            keypath = ('data', 'data')
        else:
            _logger.error('Unsupported mount type')
            _logger.debug('The mount {0} has unsupported type {1}'.format(mount, mtype))
            raise ValueError('Unsupported mount type')
        node = reader(**args)
        try:
            for key in keypath:
                node = node[key]
            return list(node.keys())
        except (AttributeError, KeyError, TypeError):
            return []

    def getSecretsTree(self, path = '/', mounts = None, version = None):
        """Recursively walk *path* on each mount and fill paths/flatpaths.

        *mounts* may be a single mount name or any collection of names; when
        empty, every known mount is walked. Mounts whose type has no handler
        are skipped with a warning. Always returns None.
        """
        if not mounts:
            mounts = self.mounts
        if isinstance(mounts, (str, bytes)) or not hasattr(mounts, '__iter__'):
            mounts = [mounts]
        else:
            # Covers dicts (their keys), lists, tuples, sets and key views.
            mounts = list(mounts)
        relpath = path.replace('//', '/').lstrip('/')
        for mount in mounts:
            mtype = self.getMountType(mount)
            if mtype == 'cubbyhole':
                handler = self.cubbyhandler
            elif mtype == 'kv1':
                handler = self.client.secrets.kv.v1
            elif mtype == 'kv2':
                self._check_version(version)
                handler = self.client.secrets.kv.v2
            else:
                _logger.warning('Skipping mount with unsupported type')
                _logger.debug('The mount {0} has unsupported type {1}; skipping'.format(mount, mtype))
                continue
            fullpath = '/'.join((mount, relpath)).replace('//', '/').lstrip('/')
            self.flatpaths.add(mount)
            self.flatpaths.add('/'.join((mount, path.rstrip('/'))))
            if mount not in self.paths:
                self.paths[mount] = {}
            try:
                _logger.debug('Fetching path {0} on mount {1}...'.format(path, mount))
                listing = handler.list_secrets(path = path, mount_point = mount)
            except hvac.exceptions.InvalidPath:
                # It's a secret name or doesn't exist.
                _logger.debug('Path {0} on mount {1} is a secret, not a subdir.'.format(path, mount))
                try:
                    names = self.getSecretNames(path, mount, version = version)
                except hvac.exceptions.InvalidPath:
                    # Neither listable nor readable; nothing to record.
                    _logger.debug('Path {0} on mount {1} does not exist.'.format(path, mount))
                    continue
                self._store(mount, relpath, names)
                continue
            try:
                children = listing['data']['keys']
            except (KeyError, TypeError):
                _logger.warning('Mount has no secrets/subdirs')
                _logger.debug('The mount {0} has no secrets or subdirectories'.format(mount))
                warnings.warn('Mount has no secrets/subdirs')
                continue
            for p in children:
                p_relpath = '/'.join((relpath, p)).replace('//', '/').lstrip('/')
                p_fullpath = '/'.join((fullpath, p)).replace('//', '/').lstrip('/')
                _logger.debug(('Recursing getSecretsTree. '
                               'path={0} '
                               'fullpath={1} '
                               'relpath={2} '
                               'p={3} '
                               'p_relpath={4} '
                               'p_fullpath={5}').format(path,
                                                        fullpath,
                                                        relpath,
                                                        p,
                                                        p_relpath,
                                                        p_fullpath))
                self.getSecretsTree(path = p_relpath, mounts = mount, version = version)
        return None

    def getSysMounts(self):
        """Populate the mounts dict from the server and the optional XML.

        A 'kv' mount is recorded as 'kv1' or 'kv2' according to its version
        option; a 'kv' mount without a version option is recorded as 'kv1'.
        Always returns None.
        """
        try:
            listing = self.client.sys.list_mounted_secrets_engines()['data']
            for mount, mount_info in listing.items():
                r = _mount_re.search(mount)
                if r:
                    mount = r.group('mount')
                if mount in self.internal_mounts:
                    continue
                # Get the mount type.
                mtype = mount_info['type']
                if mtype == 'kv':
                    mntopts = mount_info.get('options')
                    mver = mntopts.get('version') if isinstance(mntopts, dict) else None
                    # A KV mount enabled without a version option is version 1.
                    mver = '1' if mver is None else str(mver)
                    if mver == '2':
                        mtype = 'kv2'
                    elif mver == '1':
                        mtype = 'kv1'
                self.mounts[mount] = mtype
                _logger.debug('Added mountpoint {0} to mounts list with type {1}'.format(mount, mtype))
        except hvac.exceptions.Forbidden:
            _logger.warning('Client does not have permission to read /sys/mounts.')
        # TODO: should I blindly merge in instead?
        # NOTE(review): the indentation of the reviewed source was lost; this
        # assumes the XML merge runs unconditionally rather than only after a
        # Forbidden error - confirm.
        if self.xml:
            for mount in self.xml.findall('.//mount'):
                mname = mount.text
                mtype = mount.attrib.get('type', 'kv2')
                if mname not in self.mounts:
                    self.mounts[mname] = mtype
                    _logger.debug('Added mountpoint {0} to mounts list with type {1}'.format(mname, mtype))
        return None

    def printer(self, path = '/', mounts = None, output = None, indent = 4):
        """Render the cached tree at *path* for each mount as a string.

        *output* is one of the supported format names (case-insensitive) or
        None for the plain str() of the data. 'json', 'yaml' and 'pretty'
        honour *indent*, which must be an integer or None. Returns None for a
        supported format that has no renderer here.
        Raises ValueError for an unsupported format or a bad indent.
        """
        # TODO: a 'tree' renderer (box-drawing prefixes). Wayyy later.
        if output:
            output = output.lower()
        if output and output not in constants.SUPPORTED_OUTPUT_FORMATS:
            _logger.error('Invalid output format')
            _logger.debug(('The output parameter ("{0}") must be one of: '
                           '{1}, or None').format(output, ', '.join(constants.SUPPORTED_OUTPUT_FORMATS)))
            raise ValueError('Invalid output format')
        if output in ('pretty', 'yaml', 'json'):
            if not (indent is None or isinstance(indent, int)):
                _logger.error('indent parameter must be an integer or None')
                _logger.debug('The indent parameter ({0}) must be an integer or None'.format(indent))
                raise ValueError('indent parameter must be an integer or None')
        if not self.paths:
            self.getSecretsTree()
        if not mounts:
            mounts = self.mounts.keys()
        _paths = {}
        for m in mounts:
            _paths[m] = self.getPath(path, m)
        if output == 'json':
            import json
            return json.dumps(_paths, indent = indent)
        elif output == 'yaml':
            import yaml  # https://pypi.org/project/PyYAML/
            return yaml.dump(_paths, indent = indent)
        elif output == 'pretty':
            import pprint
            if indent is None:
                indent = 1
            return pprint.pformat(_paths, indent = indent, width = shutil.get_terminal_size((80, 20)).columns)
        elif not output:
            return str(_paths)
        return None
|