This repository has been archived on 2022-01-23. You can view files and clone it, but cannot push or open issues or pull requests.
vaultpass/vaultpass/auth.py

189 lines
6.2 KiB
Python

import logging
import os
import warnings
##
import hvac
# Module-level logger used by every auth class in this file. Calling
# getLogger() without a name returns the root logger.
_logger = logging.getLogger()
class _AuthBase(object):
name = '_AuthBase'
client = None
def __init__(self, uri, auth_xml, *args, **kwargs):
self.uri = uri
self.xml = auth_xml
_logger.debug('Intialized instance of {0}'.format(self.name))
def authCheck(self):
if not self.client.is_authenticated():
_logger.debug('Could not authenticate to {0} using {1}.'.format(self.uri, self.name))
_logger.error('Could not authenticate')
raise RuntimeError('Could not authenticate')
return(None)
def getClient(self):
pass # Dummy/placeholder func.
return(None)
class _BasicAuthBase(_AuthBase):
    """Base for username/password style backends.

    Reads ``username``, ``password`` and an optional ``mountPoint`` from the
    auth XML element and creates an (unauthenticated) hvac client.
    """
    name = '_BasicAuthBase'
    client = None
    username = None
    password = None
    mount = None

    def __init__(self, uri, auth_xml, default_mountpoint = 'userpass', *args, **kwargs):
        """Initialise the base, remember the fallback mountpoint, load credentials.

        ``default_mountpoint`` is used when the XML has no ``mountPoint``
        element.
        """
        super().__init__(uri, auth_xml, *args, **kwargs)
        self.default_mountpoint = default_mountpoint
        self.setCreds()

    def setCreds(self):
        """Populate username, password, mount and client from the XML element.

        An AttributeError is raised if the ``username`` or ``password``
        element is absent, because ``find()`` returns None in that case.
        """
        self.username = self.xml.find('.//username').text
        _logger.debug('Set username: {0}'.format(self.username))

        self.password = self.xml.find('.//password').text
        # NOTE(review): this writes the plaintext password to the debug log.
        _logger.debug('Set password: {0}'.format(self.password))

        # An explicit <mountPoint> wins; otherwise use the constructor default.
        mount_node = self.xml.find('.//mountPoint')
        self.mount = self.default_mountpoint if mount_node is None else mount_node.text
        _logger.debug('Set mountpoint: {0}'.format(self.mount))

        self.client = hvac.Client(url = self.uri)
        _logger.info('Initialized client.')
class AppRole(_AuthBase):
    """AppRole backend: log in with a role and secret read from the XML."""
    name = 'AppRole'
    config_name = 'appRole'
    role = None
    secret = None

    def __init__(self, uri, auth_xml, *args, **kwargs):
        """Initialise the base class, then authenticate straight away."""
        super().__init__(uri, auth_xml, *args, **kwargs)
        self.getClient()

    def getClient(self):
        """Read ``role``/``secret`` from the XML, build a client and log in.

        Raises:
            RuntimeError: via authCheck() when the client is not authenticated
                afterwards.
        """
        role_node = self.xml.find('.//role')
        self.role = role_node.text
        _logger.debug('Set role: {0}'.format(self.role))

        secret_node = self.xml.find('.//secret')
        self.secret = secret_node.text
        # NOTE(review): this writes the plaintext secret to the debug log.
        _logger.debug('Set secret: {0}'.format(self.secret))

        self.client = hvac.Client(url = self.uri)
        _logger.info('Initialized client.')

        # NOTE(review): newer hvac releases appear to expose AppRole login under
        # client.auth.approle -- confirm against the hvac version in use.
        self.client.auth_approle(self.role, secret_id = self.secret)
        _logger.debug('Attempted to authenticate client.')
        self.authCheck()
class LDAP(_BasicAuthBase):
    """LDAP backend; falls back to the ``ldap`` mountpoint when none is given."""
    name = 'LDAP'
    config_name = 'ldap'

    def __init__(self, uri, auth_xml, *args, **kwargs):
        """Load credentials with an ``ldap`` default mountpoint, then log in."""
        super().__init__(uri, auth_xml, *args, default_mountpoint = 'ldap', **kwargs)
        self.getClient()

    def getClient(self):
        """Log in through the client's LDAP auth method and verify the result.

        Raises:
            RuntimeError: via authCheck() when the client is not authenticated
                afterwards.
        """
        ldap_login = self.client.auth.ldap.login
        ldap_login(username = self.username,
                   password = self.password,
                   mount_point = self.mount)
        _logger.debug('Attempted to authenticate client.')
        self.authCheck()
class Token(_AuthBase):
    """Token backend: attach an existing Vault token to an hvac client.

    The token is resolved from the first of these that yields a value:

    1. the (stripped) text of the auth XML element;
    2. the element's ``source`` attribute -- ``env:<NAME>`` reads environment
       variable ``<NAME>``, anything else is treated as a path to a file
       holding the token;
    3. the ``VAULT_TOKEN`` environment variable, then ``~/.vault-token``.
    """
    name = 'Token'
    config_name = 'token'
    token = None

    def __init__(self, uri, auth_xml, *args, **kwargs):
        """Initialise the base class, then resolve the token and build the client."""
        super().__init__(uri, auth_xml, *args, **kwargs)
        self.getClient()

    def _getEnv(self, env_var):
        """Return the value of environment variable ``env_var``.

        Raises:
            OSError: if the variable is unset or empty.
        """
        var = os.environ.get(env_var)
        if not var:
            _logger.debug(('Environment variable {0} was specified as containing the token '
                           'but it is empty').format(env_var))
            _logger.error('Env var not populated')
            raise OSError('Env var not populated')
        return(var)

    def _getFile(self, fpath):
        """Return the whitespace-stripped contents of ``fpath`` (``~`` is expanded)."""
        fpath = os.path.abspath(os.path.expanduser(fpath))
        with open(fpath, 'r') as fh:
            contents = fh.read().strip()
        return(contents)

    def _getDefaultToken(self):
        """Return a token from ``VAULT_TOKEN`` or ``~/.vault-token``, else None."""
        try:
            return(self._getEnv('VAULT_TOKEN'))
        except OSError:
            # Not set; fall through to the token file.
            pass
        try:
            # An empty file is treated the same as a missing one.
            return(self._getFile('~/.vault-token') or None)
        except (OSError, ValueError):
            # Best effort: a missing, unreadable or undecodable file just means
            # no default token is available.
            return(None)

    def getClient(self):
        """Resolve the token, create the hvac client and apply the token to it.

        When no token can be determined from any source, a warning is issued
        and the authentication check is skipped.

        Raises:
            OSError: if ``source`` names an environment variable that is unset
                or empty, or a file that cannot be opened.
            RuntimeError: via authCheck() when a token was found but the client
                is not authenticated with it.
        """
        chk = True
        _token = self.xml.text
        if _token is not None:
            # Element text may be nothing but whitespace/newlines; treat that
            # as "no inline token" rather than as the token itself.
            _token = _token.strip()
        if _token:
            self.token = _token
        else:
            a = self.xml.attrib.get('source')
            if not a:
                # No explicit source: try, in order, env var and then ~/.vault-token.
                self.token = self._getDefaultToken()
                if not self.token:
                    _logger.debug(('Unable to automatically determine token from '
                                   'environment variable or filesystem defaults. Ignore this if you are initializing '
                                   'Vault.'))
                    _logger.warning('Cannot determine token')
                    warnings.warn('Cannot determine token')
                    chk = False
            elif a.startswith('env:'):
                # Everything after the first ':' is the variable name.
                self.token = self._getEnv(a.split(':', 1)[1])
            else:
                self.token = self._getFile(a)
        # NOTE(review): this writes the token itself to the debug log.
        _logger.debug('Set token: {0}'.format(self.token))
        self.client = hvac.Client(url = self.uri)
        _logger.info('Initialized client.')
        self.client.token = self.token
        _logger.debug('Applied token.')
        if chk:
            self.authCheck()
        return(None)
class UserPass(_BasicAuthBase):
    """Userpass backend; falls back to the ``userpass`` mountpoint."""
    name = 'UserPass'
    config_name = 'userpass'

    def __init__(self, uri, auth_xml, *args, **kwargs):
        """Load credentials with a ``userpass`` default mountpoint, then log in."""
        super().__init__(uri, auth_xml, *args, default_mountpoint = 'userpass', **kwargs)
        self.getClient()

    def getClient(self):
        """Log in with username/password and store the returned client token.

        Raises:
            RuntimeError: via authCheck() when the client is not authenticated
                afterwards.
        """
        userpass_login = self.client.auth.userpass.login
        login_response = userpass_login(username = self.username,
                                        password = self.password,
                                        mount_point = self.mount)
        _logger.debug('Attempted to authenticate client.')
        try:
            self.client.token = login_response['auth']['client_token']
        except KeyError:
            # No token in the response: leave it to authCheck() to report the failure.
            pass
        self.authCheck()