Various helper scripts for libvirt.
https://r00t2.io/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
223 lines
9.1 KiB
223 lines
9.1 KiB
#!/usr/bin/env python3 |
|
|
|
import argparse |
|
# import os |
|
# import getpass |
|
import re |
|
## |
|
import libvirt |
|
# from lxml import etree |
|
|
|
# NOTE: docs URLS are super long. Extrapolate using following: |
|
# docsurl = 'https://libvirt.org/docs/libvirt-appdev-guide-python/en-US/html' |
|
|
|
# TODO: flesh this out. only supports guests atm |
|
# TODO: use openAuth? |
|
# {docsurl}/libvirt_application_development_guide_using_python-Connections.html#idp13928160 |
|
|
|
# I would like to take the moment to point out that I did in three hours with exactly NO prior knowledge of the libvirt |
|
# API what Red Hat couldn't do in four YEARS. https://bugzilla.redhat.com/show_bug.cgi?id=1244093 |
|
|
|
|
|
def libvirt_callback(userdata, err): |
|
# fucking worst design decision. |
|
# https://stackoverflow.com/a/45543887/733214 |
|
pass |
|
|
|
|
|
# fucking worst design decision. |
|
# https://stackoverflow.com/a/45543887/733214 |
|
libvirt.registerErrorHandler(f = libvirt_callback, ctx = None) |
|
|
|
|
|
class LV(object): |
|
def __init__(self, uri, *args, **kwargs): |
|
self.uri = uri |
|
self.conn = None |
|
self._args = args |
|
self._kwargs = kwargs |
|
|
|
def _getTargets(self, target, regex = False, ttype = 'guest', |
|
state = None, nocase = False, *args, **kwargs): |
|
targets = [] |
|
# TODO: ..._RUNNING as well? can add multiple flags |
|
state_flags = {'guest': (libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE, |
|
libvirt.VIR_CONNECT_LIST_DOMAINS_INACTIVE), |
|
'net': (libvirt.VIR_CONNECT_LIST_NETWORKS_ACTIVE, |
|
libvirt.VIR_CONNECT_LIST_NETWORKS_INACTIVE), |
|
'storage': (libvirt.VIR_CONNECT_LIST_STORAGE_POOLS_ACTIVE, |
|
libvirt.VIR_CONNECT_LIST_STORAGE_POOLS_INACTIVE)} |
|
re_flags = re.UNICODE # The default |
|
if nocase: |
|
re_flags += re.IGNORECASE |
|
if not self.conn: |
|
self.startConn() |
|
search_funcs = {'guest': self.conn.listAllDomains, |
|
'net': self.conn.listAllNetworks, |
|
'storage': self.conn.listAllStoragePools} |
|
if not regex: |
|
ptrn = r'^{0}$'.format(target) |
|
else: |
|
ptrn = target |
|
ptrn = re.compile(ptrn, re_flags) |
|
if state == 'active': |
|
flag = state_flags[ttype][0] |
|
elif state == 'inactive': |
|
flag = state_flags[ttype][1] |
|
else: |
|
flag = 0 |
|
for t in search_funcs[ttype](flag): |
|
if ptrn.search(t.name()): |
|
targets.append(t) |
|
targets.sort(key = lambda i: i.name()) |
|
return(targets) |
|
|
|
def list(self, target, verbose = False, *args, **kwargs): |
|
# {docsurl}/libvirt_application_development_guide_using_python-Guest_Domains-Information-Info.html |
|
if not self.conn: |
|
self.startConn() |
|
targets = self._getTargets(target, **kwargs) |
|
results = [] |
|
# Each attr is a tuple; the name of the attribute and the key name the result should use (if defined) |
|
attr_map = {'str': (('name', None), |
|
('OSType', 'os'), |
|
('UUIDString', 'uuid'), |
|
('hostname', None)), |
|
'bool': (('autostart', None), |
|
('hasCurrentSnapshot', 'current_snapshot'), |
|
('hasManagedSaveImage', 'managed_save_image'), |
|
('isActive', 'active'), |
|
('isPersistent', 'persistent'), |
|
('isUpdated', 'updated')), |
|
'int': (('ID', 'id'), |
|
('maxMemory', 'max_memory_KiB'), |
|
('maxVcpus', 'max_vCPUs'))} |
|
for t in targets: |
|
if not verbose: |
|
results.append(t.name()) |
|
else: |
|
r = {} |
|
for attrname, newkey in attr_map['str']: |
|
keyname = (newkey if newkey else attrname) |
|
try: |
|
r[keyname] = str(getattr(t, attrname)()) |
|
except libvirt.libvirtError: |
|
r[keyname] = '(N/A)' |
|
for attrname, newkey in attr_map['bool']: |
|
keyname = (newkey if newkey else attrname) |
|
try: |
|
r[keyname] = bool(getattr(t, attrname)()) |
|
except (libvirt.libvirtError, ValueError): |
|
r[keyname] = None |
|
for attrname, newkey in attr_map['int']: |
|
keyname = (newkey if newkey else attrname) |
|
try: |
|
r[keyname] = int(getattr(t, attrname)()) |
|
if r[keyname] == -1: |
|
r[keyname] = None |
|
except (libvirt.libvirtError, ValueError): |
|
r[keyname] = None |
|
results.append(r) |
|
return(results) |
|
|
|
def restart(self, target, *args, **kwargs): |
|
self.stop(target, state = 'active', **kwargs) |
|
self.start(target, state = 'inactive', **kwargs) |
|
return() |
|
|
|
def start(self, target, **kwargs): |
|
if not self.conn: |
|
self.startConn() |
|
targets = self._getTargets(target, state = 'inactive', **kwargs) |
|
for t in targets: |
|
t.create() |
|
return() |
|
|
|
def stop(self, target, force = False, *args, **kwargs): |
|
if not self.conn: |
|
self.startConn() |
|
targets = self._getTargets(target, state = 'active', **kwargs) |
|
for t in targets: |
|
if not force: |
|
t.shutdown() |
|
else: |
|
t.destroy() |
|
return () |
|
|
|
def startConn(self): |
|
self.conn = libvirt.open(self.uri) |
|
return() |
|
|
|
def stopConn(self): |
|
if self.conn: |
|
self.conn.close() |
|
self.conn = None |
|
return() |
|
|
|
|
|
def parseArgs(): |
|
args = argparse.ArgumentParser(description = 'Some better handling of libvirt guests') |
|
common_args = argparse.ArgumentParser(add_help = False) |
|
common_args.add_argument('-u', '--uri', |
|
dest = 'uri', |
|
default = 'qemu:///system', |
|
help = 'The URI for the libvirt to connect to. Default: qemu:///system') |
|
common_args.add_argument('-r', '--regex', |
|
action = 'store_true', |
|
help = 'If specified, use a regex pattern for TARGET instead of exact match') |
|
common_args.add_argument('-i', '--case-insensitive', |
|
action = 'store_true', |
|
dest = 'nocase', |
|
help = 'If specified, match the target name/regex pattern case-insensitive') |
|
common_args.add_argument('-T', '--target-type', |
|
# choices = ['guest', 'net', 'storage'], |
|
choices = ['guest'], |
|
default = 'guest', |
|
dest = 'ttype', |
|
help = 'The type of TARGET') |
|
common_args.add_argument('-t', '--target', |
|
dest = 'target', |
|
metavar = 'TARGET', |
|
default = '.*', |
|
help = ('The guest, network, etc. to manage. ' |
|
'If not specified, operate on all (respecting other filtering)')) |
|
subparsers = args.add_subparsers(help = 'Operation to perform', |
|
dest = 'oper', |
|
metavar = 'OPERATION', |
|
required = True) |
|
start_args = subparsers.add_parser('start', help = 'Start the target(s)', parents = [common_args]) |
|
restart_args = subparsers.add_parser('restart', help = 'Restart the target(s)', parents = [common_args]) |
|
stop_args = subparsers.add_parser('stop', help = 'Stop ("destroy") the target(s)', parents = [common_args]) |
|
stop_args.add_argument('-f', '--force', |
|
dest = 'force', |
|
action = 'store_true', |
|
help = 'Hard poweroff instead of send a shutdown/ACPI powerdown signal') |
|
list_args = subparsers.add_parser('list', help = 'List the target(s)', parents = [common_args]) |
|
list_args.add_argument('-v', '--verbose', |
|
dest = 'verbose', |
|
action = 'store_true', |
|
help = 'Display more output') |
|
list_args.add_argument('-s', '--state', |
|
dest = 'state', |
|
choices = ['active', 'inactive'], |
|
default = None, |
|
help = 'Filter results by state. Default is all states') |
|
return(args) |
|
|
|
|
|
def main(): |
|
args = parseArgs().parse_args() |
|
varargs = vars(args) |
|
lv = LV(**varargs) |
|
f = getattr(lv, args.oper)(**varargs) |
|
if args.oper == 'list': |
|
if args.verbose: |
|
import json |
|
print(json.dumps(f, indent = 4, sort_keys = True)) |
|
else: |
|
print('\n'.join(f)) |
|
return() |
|
|
|
|
|
if __name__ == '__main__': |
|
main()
|
|
|