189 lines
6.2 KiB
Python
189 lines
6.2 KiB
Python
import logging
|
|
import os
|
|
import warnings
|
|
##
|
|
import hvac
|
|
|
|
|
|
_logger = logging.getLogger()
|
|
|
|
|
|
class _AuthBase(object):
|
|
name = '_AuthBase'
|
|
client = None
|
|
|
|
def __init__(self, uri, auth_xml, *args, **kwargs):
|
|
self.uri = uri
|
|
self.xml = auth_xml
|
|
_logger.debug('Intialized instance of {0}'.format(self.name))
|
|
|
|
def authCheck(self):
|
|
if not self.client.is_authenticated():
|
|
_logger.debug('Could not authenticate to {0} using {1}.'.format(self.uri, self.name))
|
|
_logger.error('Could not authenticate')
|
|
raise RuntimeError('Could not authenticate')
|
|
return(None)
|
|
|
|
def getClient(self):
|
|
pass # Dummy/placeholder func.
|
|
return(None)
|
|
|
|
|
|
class _BasicAuthBase(_AuthBase):
|
|
name = '_BasicAuthBase'
|
|
client = None
|
|
username = None
|
|
password = None
|
|
mount = None
|
|
|
|
def __init__(self, uri, auth_xml, default_mountpoint = 'userpass', *args, **kwargs):
|
|
super().__init__(uri, auth_xml, *args, **kwargs)
|
|
self.default_mountpoint = default_mountpoint
|
|
self.setCreds()
|
|
|
|
def setCreds(self):
|
|
self.username = self.xml.find('.//username').text
|
|
_logger.debug('Set username: {0}'.format(self.username))
|
|
self.password = self.xml.find('.//password').text
|
|
_logger.debug('Set password: {0}'.format(self.password))
|
|
_mntpt = self.xml.find('.//mountPoint')
|
|
if _mntpt is not None:
|
|
self.mount = _mntpt.text
|
|
else:
|
|
self.mount = self.default_mountpoint
|
|
_logger.debug('Set mountpoint: {0}'.format(self.mount))
|
|
self.client = hvac.Client(url = self.uri)
|
|
_logger.info('Initialized client.')
|
|
return(None)
|
|
|
|
|
|
class AppRole(_AuthBase):
|
|
name = 'AppRole'
|
|
config_name = 'appRole'
|
|
role = None
|
|
secret = None
|
|
|
|
def __init__(self, uri, auth_xml, *args, **kwargs):
|
|
super().__init__(uri, auth_xml, *args, **kwargs)
|
|
self.getClient()
|
|
|
|
def getClient(self):
|
|
self.role = self.xml.find('.//role').text
|
|
_logger.debug('Set role: {0}'.format(self.role))
|
|
self.secret = self.xml.find('.//secret').text
|
|
_logger.debug('Set secret: {0}'.format(self.secret))
|
|
self.client = hvac.Client(url = self.uri)
|
|
_logger.info('Initialized client.')
|
|
self.client.auth_approle(self.role, secret_id = self.secret)
|
|
_logger.debug('Attempted to authenticate client.')
|
|
self.authCheck()
|
|
return(None)
|
|
|
|
|
|
class LDAP(_BasicAuthBase):
|
|
name = 'LDAP'
|
|
config_name = 'ldap'
|
|
|
|
def __init__(self, uri, auth_xml, *args, **kwargs):
|
|
super().__init__(uri, auth_xml, default_mountpoint = 'ldap', *args, **kwargs)
|
|
self.getClient()
|
|
|
|
def getClient(self):
|
|
self.client.auth.ldap.login(username = self.username,
|
|
password = self.password,
|
|
mount_point = self.mount)
|
|
_logger.debug('Attempted to authenticate client.')
|
|
self.authCheck()
|
|
return(None)
|
|
|
|
|
|
class Token(_AuthBase):
|
|
name = 'Token'
|
|
config_name = 'token'
|
|
token = None
|
|
|
|
def __init__(self, uri, auth_xml, *args, **kwargs):
|
|
super().__init__(uri, auth_xml, *args, **kwargs)
|
|
self.getClient()
|
|
|
|
def _getEnv(self, env_var):
|
|
var = os.environ.get(env_var)
|
|
if not var:
|
|
_logger.debug(('Environment variable {0} was specified as containing the token '
|
|
'but it is empty').format(env_var))
|
|
_logger.error('Env var not populated')
|
|
raise OSError('Env var not populated')
|
|
return(var)
|
|
|
|
def _getFile(self, fpath):
|
|
fpath = os.path.abspath(os.path.expanduser(fpath))
|
|
with open(fpath, 'r') as fh:
|
|
contents = fh.read().strip()
|
|
return(contents)
|
|
|
|
def getClient(self):
|
|
_token = self.xml.text
|
|
chk = True
|
|
if _token is not None:
|
|
self.token = _token
|
|
else:
|
|
# First we check the attrib.
|
|
a = self.xml.attrib.get('source')
|
|
if not a:
|
|
_exhausted = False
|
|
# try, in order, env var and then ~/.vault-token
|
|
while not _exhausted:
|
|
try:
|
|
self._getEnv('VAULT_TOKEN')
|
|
break
|
|
except OSError as e:
|
|
pass
|
|
try:
|
|
self._getFile('~/.vault-token')
|
|
_exhausted = True
|
|
except Exception as e:
|
|
_exhausted = True
|
|
if not self.token:
|
|
_logger.debug(('Unable to automatically determine token from '
|
|
'environment variable or filesystem defaults. Ignore this if you are initializing '
|
|
'Vault.'))
|
|
_logger.warning('Cannot determine token')
|
|
warnings.warn('Cannot determine token')
|
|
chk = False
|
|
else:
|
|
if a.startswith('env:'):
|
|
e = a.split(':', 1)
|
|
self.token = self._getEnv(e)
|
|
else:
|
|
self.token = self._getFile(a)
|
|
_logger.debug('Set token: {0}'.format(self.token))
|
|
self.client = hvac.Client(url = self.uri)
|
|
_logger.info('Initialized client.')
|
|
self.client.token = self.token
|
|
_logger.debug('Applied token.')
|
|
if chk:
|
|
self.authCheck()
|
|
return(None)
|
|
|
|
|
|
class UserPass(_BasicAuthBase):
|
|
name = 'UserPass'
|
|
config_name = 'userpass'
|
|
|
|
def __init__(self, uri, auth_xml, *args, **kwargs):
|
|
super().__init__(uri, auth_xml, default_mountpoint = 'userpass', *args, **kwargs)
|
|
self.getClient()
|
|
|
|
def getClient(self):
|
|
resp = self.client.auth.userpass.login(username = self.username,
|
|
password = self.password,
|
|
mount_point = self.mount)
|
|
_logger.debug('Attempted to authenticate client.')
|
|
try:
|
|
self.client.token = resp['auth']['client_token']
|
|
except KeyError:
|
|
# Auth failed. We'll let authCheck() handle the error.
|
|
pass
|
|
self.authCheck()
|
|
return(None)
|