import hashlib import json import os import pathlib import shutil import subprocess import tempfile ## import psutil import requests class BaseUpdater(object): _tpl_dir = os.path.join(os.path.dirname(os.path.abspath(os.path.expanduser(__file__))), 'tpl') _tpl_file = None _date_fmt = '%a, %d %b %Y %H:%M:%S %z' _tpl_vars = {} arch = None variant = None def __init__(self, dest_dir, dest_file, ver_file, lock_path, do_grub_cfg, boot_dir, grub_cfg, hash_type # check_gpg = True, # TODO: GPG sig checking ): self.dest_dir = os.path.abspath(os.path.expanduser(dest_dir)) self.dest_file = dest_file self.ver_file = ver_file self.do_grub = bool(do_grub_cfg) self.boot_dir = os.path.abspath(os.path.expanduser(boot_dir)) self.grub_cfg = os.path.abspath(os.path.expanduser(grub_cfg)) self.lckfile = os.path.abspath(os.path.expanduser(lock_path)) p_dest = pathlib.Path(self.dest_dir) p_boot = pathlib.Path(self.boot_dir) self.grub_iso_dir = '/{0}'.format(str(p_dest.relative_to(p_boot))) self.old_date = None self.old_ver = None self.old_hash = None self.new_date = None self.new_ver = None self.new_hash = None self.do_update = False self.force_update = False self.iso_url = None self.boot_uuid = None self.hash_type = hash_type self.dest_iso = os.path.join(self.dest_dir, self.dest_file) self.dest_ver = os.path.join(self.dest_dir, self.ver_file) def _init_tplvars(self): self.getUUID() self._tpl_vars['iso_path'] = os.path.abspath( os.path.expanduser( os.path.join(self.grub_iso_dir, self.dest_file))).lstrip('/') self._tpl_vars['disk_uuid'] = self.boot_uuid self._tpl_vars['version'] = self.new_ver self._tpl_vars['arch'] = self.arch self._tpl_vars['ver_str'] = str(self.new_ver).replace('.', '') self._tpl_vars['variant'] = self.variant return(None) def main(self): if self.getRunning(): return(None) self.lock() self.getCurVer() self.getNewVer() if self.do_update or \ self.force_update or not \ all((self.old_date, self.old_ver, self.old_hash)): self.do_update = True self.download() if self.do_grub: self._init_tplvars() self.grub() self.touchVer() self.unlock() return(None) def download(self): if self.getRunning(): return(None) if not any((self.do_update, self.force_update)): return(None) if not self.iso_url: raise RuntimeError('iso_url attribute must be set first') req_chk = requests.head(self.iso_url, headers = {'User-Agent': 'curl/7.74.0'}) if not req_chk.ok: raise RuntimeError('Received non-200/30x {0} for {1}'.format(req_chk.status_code, self.iso_url)) _tmpfile = tempfile.mkstemp()[1] with requests.get(self.iso_url, stream = True, headers = {'User-Agent': 'curl/7.74.0'}) as req: req.raise_for_status() with open(_tmpfile, 'wb') as fh: for chunk in req.iter_content(chunk_size = 8192): fh.write(chunk) realhash = self.getISOHash(_tmpfile) if self.new_hash and realhash != self.new_hash: os.remove(_tmpfile) raise RuntimeError('Hash mismatch: {0} (LOCAL), {1} (REMOTE)'.format(realhash, self.new_hash)) os.makedirs(os.path.dirname(self.dest_iso), exist_ok = True) pathlib.Path(self.dest_iso).touch(mode = 0o0644, exist_ok = True) shutil.copyfile(_tmpfile, self.dest_iso) os.remove(_tmpfile) self.updateVer() return(None) def getCurVer(self): raise RuntimeError('BaseUpdater should be subclassed and its updateVer, getCurVer, and getNewVer methods ' 'should be replaced.') def getISOHash(self, filepath = None): if not filepath: filepath = self.dest_iso else: filepath = os.path.abspath(os.path.expanduser(filepath)) hasher = hashlib.new(self.hash_type) # TODO: later on when python 3.8 is more prevalent, https://stackoverflow.com/a/1131238/733214 with open(filepath, 'rb') as fh: while True: chunk = fh.read(8192) if not chunk: break hasher.update(chunk) return(hasher.hexdigest().lower()) def getNewVer(self): raise RuntimeError('BaseUpdater should be subclassed and its updateVer, getCurVer, and getNewVer methods ' 'should be replaced.') def getRunning(self): if not os.path.isfile(self.lckfile): return(False) my_pid = os.getpid() with open(self.lckfile, 'r') as fh: pid = int(fh.read().strip()) if not psutil.pid_exists(pid): os.remove(self.lckfile) return(False) if pid == my_pid: return(False) return(True) def getUUID(self): disk_cmd = subprocess.run(['findmnt', '-T', '/boot', '--json'], stdout = subprocess.PIPE, stderr = subprocess.PIPE) if (disk_cmd.returncode != 0) or disk_cmd.stderr.decode('utf-8').strip() != '': raise RuntimeError('Could not get disk UUID: {0}'.format(disk_cmd.stderr.decode('utf-8'))) disk_dict = json.loads(disk_cmd.stdout.decode('utf-8')) disk_dev = disk_dict['filesystems'][0]['source'] info_cmd = subprocess.run(['blkid', '-o', 'export', disk_dev], stdout = subprocess.PIPE, stderr = subprocess.PIPE) if (info_cmd.returncode != 0) or info_cmd.stderr.decode('utf-8').strip() != '': raise RuntimeError('Could not get disk UUID: {0}'.format(info_cmd.stderr.decode('utf-8'))) info_dict = {i.split('=', 1)[0].lower():i.split('=', 1)[1] for i in info_cmd.stdout.decode('utf-8').splitlines()} self.boot_uuid = info_dict.get('uuid') return(None) def grub(self): import jinja2 loader = jinja2.FileSystemLoader(searchpath = self._tpl_dir) tplenv = jinja2.Environment(loader = loader) tpl = tplenv.get_template(self._tpl_file) with open(self.grub_cfg, 'w') as fh: fh.write(tpl.render(**self._tpl_vars)) os.chmod(self.grub_cfg, 0o0755) cmd = subprocess.run(['grub-mkconfig', '-o', '{0}/grub/grub.cfg'.format(self.boot_dir)], stdout = subprocess.PIPE, stderr = subprocess.PIPE) if cmd.returncode != 0: stderr = cmd.stderr.decode('utf-8') if stderr.strip() != '': print(stderr) raise RuntimeError('grub-mkconfig returned a non-zero exit status ({0})'.format(cmd.returncode)) return(None) def lock(self): with open(self.lckfile, 'w') as fh: fh.write(str(os.getpid())) return(None) def touchVer(self): if self.getRunning(): return(None) ver_path = pathlib.Path(self.dest_ver) ver_path.touch(exist_ok = True) return(None) def unlock(self): if os.path.isfile(self.lckfile): os.remove(self.lckfile) return(None) def updateVer(self): raise RuntimeError('BaseUpdater should be subclassed and its updateVer, getCurVer, and getNewVer methods ' 'should be replaced.')