import hashlib import json import os import pathlib import shutil import subprocess ## import psutil import requests class BaseUpdater(object): _tpl_dir = os.path.join(os.path.dirname(os.path.abspath(os.path.expanduser(__file__))), 'tpl') _tpl_file = None _date_fmt = '%a, %d %b %Y %H:%M:%S %z' def __init__(self, dest_dir, dest_file, ver_file, lock_path, do_grub_cfg, boot_dir, grub_cfg, hash_type # check_gpg = True, # TODO: GPG sig checking ): self.dest_dir = os.path.abspath(os.path.expanduser(dest_dir)) self.dest_file = dest_file self.ver_file = ver_file self.do_grub = bool(do_grub_cfg) self.boot_dir = os.path.abspath(os.path.expanduser(boot_dir)) self.grub_cfg = os.path.abspath(os.path.expanduser(grub_cfg)) self.lckfile = os.path.abspath(os.path.expanduser(lock_path)) p_dest = pathlib.Path(self.dest_dir) p_boot = pathlib.Path(self.boot_dir) self.grub_iso_dir = '/{0}'.format(str(p_dest.relative_to(p_boot))) self.old_date = None self.old_ver = None self.old_hash = None self.new_date = None self.new_ver = None self.new_hash = None self.do_update = False self.force_update = False self.iso_url = None self.boot_uuid = None self.hash_type = hash_type self.dest_iso = os.path.join(self.dest_dir, self.dest_file) self.dest_ver = os.path.join(self.dest_dir, self.ver_file) def main(self): if self.getRunning(): return(None) self.lock() if self.do_update or \ self.force_update or not \ all((self.old_date, self.old_ver, self.old_hash)): self.do_update = True self.download() if self.do_grub: self.grub() self.touchVer() self.unlock() return(None) def download(self): if self.getRunning(): return(None) if not any((self.do_update, self.force_update)): return(None) if not self.iso_url: raise RuntimeError('iso_url attribute must be set first') req = requests.get(self.iso_url, stream = True, headers = {'User-Agent': 'curl/7.74.0'}) if not req.ok: raise RuntimeError('Received non-200/30x {0} for {1}'.format(req.status_code, self.iso_url)) os.makedirs(os.path.dirname(self.dest_iso), exist_ok = True) with req as uri: with open(self.dest_iso, 'wb') as fh: shutil.copyfileobj(uri.raw, fh) hasher = hashlib.new(self.hash_type) with open(self.dest_iso, 'rb') as fh: hasher.update(fh.read()) realhash = hasher.hexdigest().lower() if realhash != self.new_hash: raise RuntimeError('Hash mismatch: {0} (LOCAL), {1} (REMOTE)'.format(realhash, self.new_hash)) self.updateVer() return(None) def getCurVer(self): raise RuntimeError('BaseUpdater should be subclassed and its updateVer, getCurVer, and getNewVer methods ' 'should be replaced.') def getNewVer(self): raise RuntimeError('BaseUpdater should be subclassed and its updateVer, getCurVer, and getNewVer methods ' 'should be replaced.') def getRunning(self): if not os.path.isfile(self.lckfile): return(False) my_pid = os.getpid() with open(self.lckfile, 'r') as fh: pid = int(fh.read().strip()) if not psutil.pid_exists(pid): os.remove(self.lckfile) return(False) if pid == my_pid: return(False) return(True) def getUUID(self): disk_cmd = subprocess.run(['findmnt', '-T', '/boot', '--json'], stdout = subprocess.PIPE, stderr = subprocess.PIPE) if (disk_cmd.returncode != 0) or disk_cmd.stderr.decode('utf-8').strip() != '': raise RuntimeError('Could not get disk UUID: {0}'.format(disk_cmd.stderr.decode('utf-8'))) disk_dict = json.loads(disk_cmd.stdout.decode('utf-8')) disk_dev = disk_dict['filesystems'][0]['source'] info_cmd = subprocess.run(['blkid', '-o', 'export', disk_dev], stdout = subprocess.PIPE, stderr = subprocess.PIPE) if (info_cmd.returncode != 0) or info_cmd.stderr.decode('utf-8').strip() != '': raise RuntimeError('Could not get disk UUID: {0}'.format(info_cmd.stderr.decode('utf-8'))) info_dict = {i.split('=', 1)[0].lower():i.split('=', 1)[1] for i in info_cmd.stdout.decode('utf-8').splitlines()} self.boot_uuid = info_dict.get('uuid') return(None) def grub(self): self.getUUID() import jinja2 loader = jinja2.FileSystemLoader(searchpath = self._tpl_dir) tplenv = jinja2.Environment(loader = loader) tpl = tplenv.get_template(self._tpl_file) with open(self.grub_cfg, 'w') as fh: fh.write(tpl.render(iso_path = os.path.abspath( os.path.expanduser( os.path.join(self.grub_iso_dir, self.dest_file))), disk_uuid = self.boot_uuid)) os.chmod(self.grub_cfg, 0o0755) cmd = subprocess.run(['grub-mkconfig', '-o', '{0}/grub/grub.cfg'.format(self.boot_dir)], stdout = subprocess.PIPE, stderr = subprocess.PIPE) if cmd.returncode != 0: stderr = cmd.stderr.decode('utf-8') if stderr.strip() != '': print(stderr) raise RuntimeError('grub-mkconfig returned a non-zero exit status ({0})'.format(cmd.returncode)) return(None) def lock(self): with open(self.lckfile, 'w') as fh: fh.write(str(os.getpid())) return(None) def touchVer(self): if self.getRunning(): return(None) ver_path = pathlib.Path(self.dest_ver) ver_path.touch(exist_ok = True) return(None) def unlock(self): if os.path.isfile(self.lckfile): os.remove(self.lckfile) return(None) def updateVer(self): raise RuntimeError('BaseUpdater should be subclassed and its updateVer, getCurVer, and getNewVer methods ' 'should be replaced.')