#!/usr/bin/env python3

# Example .arch.json:
# {
#     "date": "Fri, 01 Jan 2021 00:00:00 +0000",
#     "mirror": "http://arch.mirror.constant.com/",
#     "country": "US",
#     "notes": "https://archlinux.org/releng/releases/2021.01.01/",
#     "ver": "2021.01.01",
#     "sha1": "c3082b13d3cf0a253e1322568f2fd07479f86d52"
# }

import datetime
import json
import os
import re
##
import requests
from lxml import etree
##
import _base
import arch_mirror_ranking


class Updater(_base.BaseUpdater):
    """Track the latest Arch Linux ISO release and record it in a version file.

    The release feed supplies the newest version, date and release-notes URL;
    a ranked mirror supplies the ISO itself and its checksum list. Attributes
    and methods not defined here (``dest_ver``, ``dest_iso``, ``_date_fmt``,
    ``getRunning``, ``getISOHash``, ``main``, ...) come from
    ``_base.BaseUpdater``.
    """

    # NOTE(review): the group names were missing from the pattern as found
    # (``(?P[0-9]...``), which is not a valid regex. ``ver``/``arch`` are
    # restored from the fields used by ``_iso_fname`` - confirm against any
    # consumer of this pattern.
    _fname_re = re.compile(r'^archlinux-(?P<ver>[0-9]{4}\.[0-9]{2}\.[0-9]{2})-(?P<arch>(i686|x86_64))\.iso$')
    _def_hash = 'sha1'
    _allowed_hashes = ('md5', 'sha1')
    _allowed_arches = ('x86_64', )
    _datever_fmt = '%Y.%m.%d'
    arch = 'x86_64'  # Arch Linux proper only offers x86_64.
    _iso_dir = 'iso/latest'
    _iso_fname = 'archlinux-{ver}-{arch}.iso'
    _iso_file = os.path.join(_iso_dir, _iso_fname)
    _tpl_file = 'arch_grub.conf.j2'
    # Sent with the feed, country and checksum requests.
    _http_headers = {'User-Agent': 'curl/7.74.0'}
    # Seconds; without a timeout an unresponsive host blocks the run forever.
    _http_timeout = 30

    def __init__(self,
                 dest_dir='/boot/iso',  # Should be subdir of boot_dir
                 dest_file='arch.iso',
                 ver_file='.arch.json',
                 lock_path='/tmp/.arch.lck',
                 feed_url='https://archlinux.org/feeds/releases/',
                 do_grub_cfg=True,
                 boot_dir='/boot',  # ESP or boot partition mount; where GRUB files are installed *under*
                 grub_cfg='/etc/grub.d/40_custom_arch',
                 # check_gpg = True,  # TODO: GPG sig checking
                 hash_type='sha1'):
        """Set up the updater and immediately resolve current/new versions.

        Raises:
            ValueError: if ``hash_type`` is not one of ``_allowed_hashes``.
        """
        super().__init__(dest_dir,
                         dest_file,
                         ver_file,
                         lock_path,
                         do_grub_cfg,
                         boot_dir,
                         grub_cfg,
                         hash_type)
        if hash_type.lower() not in self._allowed_hashes:
            raise ValueError('hash_type must be one of: {0}'.format(', '.join(self._allowed_hashes)))
        self.hash_type = hash_type.lower()
        self.feed_url = feed_url
        # From the JSON.
        self.rel_notes_url = None
        self.mirror_base = None
        self.country = None
        self.ipv4 = True
        self.ipv6 = False
        self._init_vars()

    def _init_vars(self):
        """Populate country, network, current-version and new-version state.

        Does nothing when ``getRunning()`` is truthy (presumably another
        instance holds the lock - defined in the base class).
        """
        if self.getRunning():
            return None
        self.getCountry()
        self.getNet()
        self.getCurVer()
        self.getNewVer()
        return None

    def findMirror(self):
        """Pick the best-scored reachable mirror and store it in ``mirror_base``.

        Mirrors for both http and https are ranked together by ascending
        ``score``; the first one that answers with an OK status wins. If none
        responds, ``mirror_base`` is left unchanged.
        """
        self.getCountry()
        if self.mirror_base:
            return None
        mirrors = []
        for proto in ('http', 'https'):
            idx = arch_mirror_ranking.MirrorIdx(country=self.country,
                                                proto=proto,
                                                ipv4=self.ipv4,
                                                ipv6=self.ipv6,
                                                isos=True)
            mirrors.extend(idx.ranked_servers)
        # Find the "best" across http/https.
        for server in sorted(mirrors, key=lambda i: i['score']):
            try:
                req = requests.get(server['url'], timeout=self._http_timeout)
            except OSError:
                # requests' exceptions (including timeouts) derive from
                # OSError; an unreachable mirror just means "try the next".
                continue
            if req.ok:
                self.mirror_base = server['url']
                break
        return None

    def getCountry(self):
        """Look up the caller's country code once and cache it in ``country``.

        Raises:
            RuntimeError: if the lookup service returns a non-OK status.
        """
        if self.country:  # The API has limited number of accesses for free.
            return None
        url = 'https://ipinfo.io/country'
        req = requests.get(url, headers=self._http_headers, timeout=self._http_timeout)
        if not req.ok:
            raise RuntimeError('Received non-200/30x {0} for {1}'.format(req.status_code, url))
        self.country = req.content.decode('utf-8').strip().upper()
        return None

    def getCurVer(self):
        """Load the recorded version info and decide whether a refresh is forced.

        With no version file, an update is forced and ``old_ver`` is set to a
        falsy placeholder. Otherwise the recorded date/version/hash/country/
        mirror are loaded, the ``new_*`` values start out equal to the old
        ones, and an update is forced when the ISO is missing or its actual
        hash differs from the recorded one.
        """
        if self.getRunning():
            return None
        if not os.path.isfile(self.dest_ver):
            self.getCountry()
            self.getNet()
            self.findMirror()
            self.do_update = True
            self.force_update = True
            self.old_ver = 0.00
            return None
        with open(self.dest_ver, 'rb') as fh:
            ver_info = json.load(fh)
        self.old_date = datetime.datetime.strptime(ver_info['date'], self._date_fmt)
        self.old_ver = datetime.datetime.strptime(ver_info['ver'], self._datever_fmt)
        # When the file has no entry for this hash type, the fallback is the
        # *name* of the default hash, which can never equal a real digest and
        # so ends up forcing an update below.
        self.old_hash = ver_info.get(self.hash_type, self._def_hash)
        self.country = ver_info.get('country')
        self.mirror_base = ver_info.get('mirror')
        if not self.country:
            self.getCountry()
        if not self.mirror_base:
            self.getNet()
            self.findMirror()
        self.new_hash = self.old_hash
        self.new_ver = self.old_ver
        self.new_date = self.old_date
        # if ver_info.get('arch') != self.arch:
        #     self.do_update = True
        #     self.force_update = True
        if not os.path.isfile(self.dest_iso):
            self.do_update = True
            self.force_update = True
            return None
        realhash = self.getISOHash()
        if self.old_hash != realhash:
            self.do_update = True
            self.force_update = True
        return None

    def getNet(self):
        """Probe IPv4/IPv6 connectivity and store the result on ``self``.

        ``self.ipv4`` / ``self.ipv6`` become the address reported by the probe
        service, or ``False`` when the probe fails or returns an unusable body.
        """
        for k in ('ipv4', 'ipv6'):
            url = 'https://{0}.clientinfo.square-r00t.net'.format(k)
            try:
                req = requests.get(url, timeout=self._http_timeout)
                addr = req.json()['ip']
            except (OSError, ValueError, KeyError):
                # Best effort: no route, timeout, non-JSON body or missing key
                # all mean "treat this protocol as unavailable".
                addr = False
            setattr(self, k, addr)
        return None

    def getNewVer(self):
        """Read the release feed and resolve the newest ISO URL and checksum.

        Only the first usable feed item is considered. ``do_update`` is set
        when there is no recorded version/date or the feed entry is newer.
        ``new_hash`` is only changed when the mirror's checksum list contains
        an entry for the target ISO file name.

        Raises:
            RuntimeError: if the feed or checksum list cannot be fetched, or
                no mirror could be found.
        """
        if self.getRunning():
            return None
        if not self.mirror_base:
            self.findMirror()
        req = requests.get(self.feed_url, headers=self._http_headers, timeout=self._http_timeout)
        if not req.ok:
            raise RuntimeError('Received non-200/30x {0} for {1}'.format(req.status_code, self.feed_url))
        feed = etree.fromstring(req.content)
        for item in feed.xpath('//item'):
            date_xml = item.find('pubDate')
            ver_xml = item.find('title')
            notes_xml = item.find('link')
            date_text = date_xml.text if date_xml is not None else None
            ver_text = ver_xml.text if ver_xml is not None else None
            notes = notes_xml.text if notes_xml is not None else None
            if not date_text or not ver_text:
                # An entry without a version or date cannot be compared.
                continue
            date = datetime.datetime.strptime(date_text.strip(), self._date_fmt)
            new_ver = datetime.datetime.strptime(ver_text.strip(), self._datever_fmt)
            if not all((self.old_ver, self.old_date)) or \
                    (new_ver > self.old_ver) or \
                    (self.old_date < date):
                self.do_update = True
                self.new_ver = new_ver
                self.new_date = date
                self.rel_notes_url = notes
            # NOTE(review): indentation was lost in the source as found. The
            # URL/checksum resolution is placed after (not inside) the version
            # comparison so that a forced re-download of the current release
            # still gets ``iso_url`` - confirm against upstream.
            if not self.mirror_base:
                raise RuntimeError('Could not find a reachable mirror')
            datever = self.new_ver.strftime(self._datever_fmt)
            self.iso_url = os.path.join(self.mirror_base,
                                        self._iso_file.lstrip('/')).format(ver=datever, arch=self.arch)
            hash_url = os.path.join(self.mirror_base,
                                    self._iso_dir,
                                    '{0}sums.txt'.format(self.hash_type))
            req = requests.get(hash_url, headers=self._http_headers, timeout=self._http_timeout)
            if not req.ok:
                raise RuntimeError('Received non-200/30x {0} for {1}'.format(req.status_code, hash_url))
            tgt_fname = os.path.basename(self.iso_url)
            for line in req.content.decode('utf-8').splitlines():
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                fields = line.split(None, 1)
                if len(fields) != 2:
                    continue
                hash_str, fname = fields
                # Checksum tools prefix the name with '*' in binary mode.
                if fname.lstrip('*') != tgt_fname:
                    continue
                self.new_hash = hash_str.lower()
                break
            break
        return None

    def updateVer(self):
        """Write the new version info to the version file as indented JSON."""
        if self.getRunning():
            return None
        d = {'date': self.new_date.strftime(self._date_fmt),
             'mirror': self.mirror_base,
             'country': self.country,
             'notes': self.rel_notes_url,
             'ver': self.new_ver.strftime(self._datever_fmt),
             self.hash_type: self.new_hash}
        j = json.dumps(d, indent=4)
        with open(self.dest_ver, 'w') as fh:
            fh.write(j)
            fh.write('\n')
        os.chmod(self.dest_ver, 0o0644)
        return None


if __name__ == '__main__':
    u = Updater()
    u.main()