add ability to specify the mounts since the default policy doesn't expose them except for via the UI.

This commit is contained in:
brent s. 2020-03-30 01:59:36 -04:00
parent 7e839f7058
commit 731e0b4ce8
Signed by: bts
GPG Key ID: 8C004C2F93481F6B
4 changed files with 70 additions and 10 deletions

View File

@ -10,4 +10,12 @@
<unseal>WU9VUiBVTlNFQUwgU0hBUkQgSEVSRQo=</unseal> <unseal>WU9VUiBVTlNFQUwgU0hBUkQgSEVSRQo=</unseal>
</server> </server>
<authGpg>~/.vaultcreds.xml.gpg</authGpg> <authGpg>~/.vaultcreds.xml.gpg</authGpg>
<!-- Optional, but probably a *very* good idea if you don't have read/list access to /sys/mounts as you won't
be able to iterate through available mounts if not. -->
<!-- Default type if not specified is kv2. -->
<mounts>
<mount type="kv">secrets_legacy</mount>
<mount type="kv2">secrets</mount>
<mount type="cubbyhole">cubbyhole</mount>
</mounts>
</vaultpass> </vaultpass>

View File

@ -13,12 +13,14 @@ class PassMan(object):
client = None client = None
auth = None auth = None
uri = None uri = None
mount = None


def __init__(self, cfg = '~/.config/vaultpass.xml'): def __init__(self, cfg = '~/.config/vaultpass.xml'):
self.cfg = config.getConfig(cfg) self.cfg = config.getConfig(cfg)
self._getURI() self._getURI()
self.getClient() self.getClient()
self._checkSeal() self._checkSeal()
self._getMount()


def _checkSeal(self): def _checkSeal(self):
_logger.debug('Checking and attempting unseal if necessary and possible.') _logger.debug('Checking and attempting unseal if necessary and possible.')
@ -38,6 +40,10 @@ class PassMan(object):
raise RuntimeError('Unable to unseal') raise RuntimeError('Unable to unseal')
return(None) return(None)


def _getMount(self):
# TODO: mounts xml?
self.mount = mounts.MountHandler(self.client)

def _getURI(self): def _getURI(self):
uri = self.cfg.xml.find('.//uri') uri = self.cfg.xml.find('.//uri')
if uri is None: if uri is None:

View File

@ -21,6 +21,7 @@ class _AuthBase(object):
_logger.debug('Could not authenticate to {0} using {1}.'.format(self.uri, self.name)) _logger.debug('Could not authenticate to {0} using {1}.'.format(self.uri, self.name))
_logger.error('Could not authenticate') _logger.error('Could not authenticate')
raise RuntimeError('Could not authenticate') raise RuntimeError('Could not authenticate')
return(None)


def getClient(self): def getClient(self):
pass # Dummy/placeholder func. pass # Dummy/placeholder func.
@ -40,14 +41,18 @@ class _BasicAuthBase(_AuthBase):
self.setCreds() self.setCreds()


def setCreds(self): def setCreds(self):
self.username = self.xml.find('username').text self.username = self.xml.find('.//username').text
self.password = self.xml.find('password').text _logger.debug('Set username: {0}'.format(self.username))
_mntpt = self.xml.find('mountPoint') self.password = self.xml.find('.//password').text
_logger.debug('Set password: {0}'.format(self.password))
_mntpt = self.xml.find('.//mountPoint')
if _mntpt is not None: if _mntpt is not None:
self.mount = _mntpt.text self.mount = _mntpt.text
else: else:
self.mount = self.default_mountpoint self.mount = self.default_mountpoint
_logger.debug('Set mountpoint: {0}'.format(self.mount))
self.client = hvac.Client(url = self.uri) self.client = hvac.Client(url = self.uri)
_logger.info('Initialized client.')
return(None) return(None)




@ -62,10 +67,14 @@ class AppRole(_AuthBase):
self.getClient() self.getClient()


def getClient(self): def getClient(self):
self.role = self.xml.find('role').text self.role = self.xml.find('.//role').text
self.secret = self.xml.find('secret').text _logger.debug('Set role: {0}'.format(self.role))
self.secret = self.xml.find('.//secret').text
_logger.debug('Set secret: {0}'.format(self.secret))
self.client = hvac.Client(url = self.uri) self.client = hvac.Client(url = self.uri)
_logger.info('Initialized client.')
self.client.auth_approle(self.role, secret_id = self.secret) self.client.auth_approle(self.role, secret_id = self.secret)
_logger.debug('Attempted to authenticate client.')
self.authCheck() self.authCheck()
return(None) return(None)


@ -82,6 +91,7 @@ class LDAP(_BasicAuthBase):
self.client.auth.ldap.login(username = self.username, self.client.auth.ldap.login(username = self.username,
password = self.password, password = self.password,
mount_point = self.mount) mount_point = self.mount)
_logger.debug('Attempted to authenticate client.')
self.authCheck() self.authCheck()
return(None) return(None)


@ -141,8 +151,11 @@ class Token(_AuthBase):
self.token = self._getEnv(e) self.token = self._getEnv(e)
else: else:
self.token = self._getFile(a) self.token = self._getFile(a)
_logger.debug('Set token: {0}'.format(self.token))
self.client = hvac.Client(url = self.uri) self.client = hvac.Client(url = self.uri)
_logger.info('Initialized client.')
self.client.token = self.token self.client.token = self.token
_logger.debug('Applied token.')
self.authCheck() self.authCheck()
return(None) return(None)


@ -156,8 +169,14 @@ class UserPass(_BasicAuthBase):
self.getClient() self.getClient()


def getClient(self): def getClient(self):
self.client.auth.userpass.login(username = self.username, resp = self.client.auth.userpass.login(username = self.username,
password = self.password, password = self.password,
mount_point = self.mount) mount_point = self.mount)
_logger.debug('Attempted to authenticate client.')
try:
self.client.token = resp['auth']['client_token']
except KeyError:
# Auth failed. We'll let authCheck() handle the error.
pass
self.authCheck() self.authCheck()
return(None) return(None)

View File

@ -1,10 +1,37 @@
import logging
import re
import warnings
##
import hvac.exceptions


_logger = logging.getLogger()
_mount_re = re.compile(r'^(?P<mount>.*)/')


class MountHandler(object): class MountHandler(object):
internal_mounts = ('identity', 'sys')

def __init__(self, client, mounts_xml = None): def __init__(self, client, mounts_xml = None):
self.client = client self.client = client
self.mounts = {} self.xml = mounts_xml
self.mounts = []


def getSysMounts(self): def getSysMounts(self):
pass try:
for mount, mount_info in self.client.sys.list_mounted_secrets_engines()['data'].items():
r = _mount_re.search(mount)
if r:
mount = r.group('mount')
if mount in self.internal_mounts:
continue
self.mounts.append(mount)
_logger.debug('Added mountpoint to mounts list: {0}'.format(mount))
except hvac.exceptions.Forbidden:
_logger.warning('Client does not have permission to read /sys/mounts.')
# TODO: xml parsing

return(None)


def print(self): def print(self):
pass pass