This repository has been archived on 2022-01-23. You can view files and clone it, but cannot push or open issues or pull requests.

298 lines
13 KiB
Raw Normal View History

import copy
import logging
import re
2020-04-03 18:27:43 -04:00
import shutil
import time
import warnings
import dpath.util #
import hvac.exceptions
from . import constants
_logger = logging.getLogger()
_mount_re = re.compile(r'^(?P<mount>.*)/$')
_subpath_re = re.compile(r'^/?(?P<path>.*)/$')
_kv_re = re.compile(r'^kv(?:-v)?(?P<version>[0-9]+)$')
# TODO: for all write operations, modify handler call to first check if path exists and patch if it does?
class CubbyHandler(object):
# There is no upstream support for directly reading cubby. So we do it ourselves.
# TODO: custom class/handler?
def __init__(self, client):
self.client = client
2020-04-07 02:46:19 -04:00
def create_or_update_secret(self, *args, **kwargs):
# Alias function
return(self.write_secret(*args, **kwargs))
2020-04-12 22:54:03 -04:00
def list_secrets(self, path, mount_point = 'cubbyhole', *args, **kwargs):
path = path.lstrip('/')
2020-04-12 22:54:03 -04:00
uri = 'v1/{0}/{1}'.format(mount_point, path)
resp = self.client._adapter.list(url = uri)
2020-04-12 22:54:03 -04:00
def read_secret(self, path, mount_point = 'cubbyhole', *args, **kwargs):
path = path.lstrip('/')
2020-04-12 22:54:03 -04:00
uri = 'v1/{0}/{1}'.format(mount_point, path)
resp = self.client._adapter.get(url = uri)
2020-04-09 15:47:16 -04:00
def remove_secret(self, path, mount_point = 'cubbyhole', *args, **kwargs):
path = path.lstrip('/')
2020-04-12 22:54:03 -04:00
uri = 'v1/{0}/{1}'.format(mount_point, path)
2020-04-09 15:47:16 -04:00
resp = self.client._adapter.delete(url = uri)
2020-04-12 22:54:03 -04:00
def update_secret(self, secret, path, mount_point = 'cubbyhole', *args, **kwargs):
existing = self.read_secret(path, mount_point)
data = existing.get('data')
if not data:
resp = self.write_secret(path, secret, mount_point = mount_point)
self.remove_secret(path, mount_point)
resp = self.write_secret(path, data, mount_point)
2020-04-07 02:46:19 -04:00
def write_secret(self, path, secret, mount_point = 'cubbyhole', *args, **kwargs):
path = path.lstrip('/')
2020-04-12 22:54:03 -04:00
args = {'path': 'v1/{0}'.format('/'.join((mount_point, path)))}
for k, v in secret.items():
if k in args.keys():
_logger.error('Cannot use reserved secret name')
_logger.debug('Cannot use secret name {0} as it is reserved'.format(k))
raise ValueError('Cannot use reserved secret name')
args[k] = v
resp = self.client.write(**args)
2020-03-29 23:13:41 -04:00
class MountHandler(object):
internal_mounts = ('identity', 'sys')
2020-03-29 23:13:41 -04:00
def __init__(self, client, mounts_xml = None):
self.client = client
self.cubbyhandler = CubbyHandler(self.client)
self.xml = mounts_xml
self.mounts = {}
self.paths = {}
2020-03-29 23:13:41 -04:00
def createMount(self, mount_name, mount_type = 'kv2'):
orig_mtype = mount_type
if mount_type not in constants.SUPPORTED_ENGINES:
_logger.error('Invalid mount type')
_logger.debug(('The mount type {0} is invalid. '
'It must be one of: {1}').format(mount_type, ', '.join(constants.SUPPORTED_ENGINES)))
raise ValueError('Invalid mount type')
options = {}
r =
if r:
mount_type = 'kv'
options['version'] = r.groupdict()['version']
created = False
path = mount_name,
description = 'Created automatically by VaultPass',
options = options)
created = True
except hvac.exceptions.InvalidPath as e:
_logger.error('Invalid path')
_logger.debug('The mount path {0} (type {1}) is invalid: {2}'.format(mount_name, orig_mtype, e))
raise ValueError('Invalid path')
except hvac.exceptions.InvalidRequest as e:
_logger.error('Invalid request; does mount already exist?')
_logger.debug(('The creation of mount path {0} (type {1}) generated an invalid request: '
'{2}. Does it already exist?').format(mount_name, orig_mtype, e))
# Due to how KV2 is created, we can hit a timing/race condition.
if orig_mtype == 'kv2' and created:
def getMountType(self, mount):
if not self.mounts:
mtype = self.mounts.get(mount)
if not mtype:
_logger.error('Mount not found in defined mounts')
_logger.debug('The mount {0} was not found in the defined mounts.'.format(mount))
raise ValueError('Mount not found in defined mounts')
2020-04-07 02:46:19 -04:00
def getPath(self, path, mount):
relpath = path.lstrip('/')
fullpath = '/'.join((mount, relpath))
if not self.paths:
obj = dpath.util.get(self.paths, fullpath, None)
def getSecretNames(self, path, mount, version = None):
reader = None
mtype = self.getMountType(mount)
secrets_list = []
keypath = ['data']
args = {'path': path,
'mount_point': mount}
if mtype == 'cubbyhole':
reader = self.cubbyhandler.read_secret
elif mtype == 'kv1':
reader = self.client.secrets.kv.v1.read_secret
elif mtype == 'kv2':
if not any(((version is None), isinstance(version, int))):
_logger.error('version parameter must be an integer or None')
_logger.debug('The version parameter ({0}) must be an integer or None'.format(version))
raise ValueError('version parameter must be an integer or None')
reader = self.client.secrets.kv.v2.read_secret_version
args['version'] = version
keypath = ['data', 'data']
data = reader(**args)
# secrets_list = list(data.get('data', {}).keys())
secrets_list = list(dpath.util.get(data, keypath, {}).keys())
except (KeyError, TypeError):
secrets_list = []
def getSecretsTree(self, path = '/', mounts = None, version = None):
if not mounts:
mounts = self.mounts
if isinstance(mounts, dict):
mounts = list(mounts.keys())
if not isinstance(mounts, list):
mounts = [mounts]
for mount in mounts:
mtype = self.getMountType(mount)
handler = None
args = {'path': path,
'mount_point': mount}
relpath = path.replace('//', '/').lstrip('/')
fullpath = '/'.join((mount, relpath)).replace('//', '/').lstrip('/')
if mtype == 'cubbyhole':
handler = self.cubbyhandler
elif mtype == 'kv1':
handler = self.client.secrets.kv.v1
elif mtype == 'kv2':
if not any(((version is None), isinstance(version, int))):
_logger.error('version parameter must be an integer or None')
_logger.debug('The version parameter ({0}) must be an integer or None'.format(version))
raise ValueError('version parameter must be an integer or None')
handler = self.client.secrets.kv.v2
if mount not in self.paths.keys():
self.paths[mount] = {}
2020-04-01 11:37:21 -04:00
_logger.debug('Fetching path {0} on mount {1}...'.format(path, mount))
paths = handler.list_secrets(**args)
except hvac.exceptions.InvalidPath:
2020-04-01 11:37:21 -04:00
# It's a secret name or doesn't exist.
_logger.debug('Path {0} on mount {1} is a secret, not a subdir.'.format(path, mount)), fullpath, self.getSecretNames(path, mount, version = version))
# if 'data' not in paths.keys() or 'keys' not in paths['data'].keys():
paths_list = paths['data']['keys']
except (KeyError, TypeError):
_logger.warning('Mount has no secrets/subdirs')
_logger.debug('The mount {0} has no secrets or subdirectories'.format(mount))
warnings.warn('Mount has no secrets/subdirs')
for p in paths_list:
p_relpath = '/'.join((relpath, p)).replace('//', '/').lstrip('/')
p_fullpath = '/'.join((fullpath, p)).replace('//', '/').lstrip('/')
_logger.debug(('Recursing getSecretsTree. '
'path={0} '
'fullpath={1} '
'relpath={2} '
'p={3} '
'p_relpath={4} '
self.getSecretsTree(path = p_relpath, mounts = mount)
2020-03-29 23:13:41 -04:00
def getSysMounts(self):
for mount, mount_info in self.client.sys.list_mounted_secrets_engines()['data'].items():
r =
if r:
mount ='mount')
if mount in self.internal_mounts:
# Get the mount type.
mtype = mount_info['type']
if mtype == 'kv':
mntopts = mount_info['options']
if mntopts and isinstance(mntopts, dict):
mver = mntopts.get('version')
if mver == '2':
mtype = 'kv2'
elif mver == '1':
mtype = 'kv1'
self.mounts[mount] = mtype
_logger.debug('Added mountpoint {0} to mounts list with type {1}'.format(mount, mtype))
except hvac.exceptions.Forbidden:
_logger.warning('Client does not have permission to read /sys/mounts.')
# TODO: should I blindly merge in instead?
if self.xml:
for mount in self.xml.findall('.//mount'):
mname = mount.text
mtype = mount.attrib.get('type', 'kv2')
if mname not in self.mounts.keys():
self.mounts[mname] = mtype
_logger.debug('Added mountpoint {0} to mounts list with type {1}'.format(mount, mtype))
2020-03-29 23:13:41 -04:00
2020-04-14 14:33:33 -04:00
def printer(self, path = '/', mounts = None, output = None, indent = 4):
2020-04-03 18:27:43 -04:00
# def treePrint(obj, s = 'Password Store\n', level = 0):
# prefix = '├──'
# leading_prefix = '│'
# last_prefix = '└──'
# pass
# return(s)
if output:
output = output.lower()
2020-04-14 14:33:33 -04:00
if output and output not in constants.SUPPORTED_OUTPUT_FORMATS:
_logger.error('Invalid output format')
2020-04-14 14:33:33 -04:00
_logger.debug(('The output parameter ("{0}") must be one of: '
'{0}, or None').format(output, ', '.join(constants.SUPPORTED_OUTPUT_FORMATS)))
raise ValueError('Invalid output format')
if output in ('pretty', 'yaml', 'json'):
if not any(((indent is None), isinstance(indent, int))):
_logger.error('indent parameter must be an integer or None')
_logger.debug('The indent parameter ({0}) must be an integer or None'.format(indent))
raise ValueError('indent parameter must be an integer or None')
if not self.paths:
2020-04-14 14:33:33 -04:00
_paths = {}
if not mounts:
mounts = self.mounts.keys()
for m in mounts:
_paths[m] = self.getPath(path, m)
if output == 'json':
import json
2020-04-14 14:33:33 -04:00
return(json.dumps(_paths, indent = indent))
elif output == 'yaml':
import yaml #
# import pyaml #
2020-04-14 14:33:33 -04:00
return(yaml.dump(_paths, indent = indent))
elif output == 'pretty':
import pprint
if indent is None:
indent = 1
2020-04-14 14:33:33 -04:00
return(pprint.pformat(_paths, indent = indent, width = shutil.get_terminal_size((80, 20)).columns))
# elif output == 'tree':
2020-04-02 14:55:26 -04:00
# import tree # TODO? Wayyy later.
elif not output:
2020-04-14 14:33:33 -04:00