import os import re import subprocess ## import gpg ## import arb_util import mirror import package class Repo(object): def __init__(self, repo_xml, ns = '', *args, **kwargs): self.xml = repo_xml self.name = repo_xml.attrib['name'] self.ns = ns self.gpg = None self.key = None self.mirrors = [] self.packages = [] self.packagefiles = [] self.sigfiles = [] _key_id = self.xml.attrib.get('gpgKeyID') self.key_id = (re.sub(r'\s+', '', _key_id) if _key_id else None) self.staging_dir = os.path.abspath(os.path.expanduser(self.xml.attrib.get('staging', '.'))) self.sign_pkgs = arb_util.xmlBool(self.xml.attrib.get('signPkgs', True)) self.sign_db = arb_util.xmlBool(self.xml.attrib.get('signDB', True)) if any((self.sign_db, self.sign_pkgs)): self._initSigner() self._initMirrors() self._initPackages() def _genRepo(self): if not self.packagefiles: # raise RuntimeError('.build() must be run before ._genRepo()') return(None) cmd = ['repo-add'] if self.sign_db: cmd.extend(['--sign', '--key', self.key_id]) cmd.extend(['--remove', os.path.join(self.staging_dir, '{0}.db.tar.xz'.format(self.name)), *self.packagefiles]) repo_out = subprocess.run(cmd, stderr = subprocess.PIPE, stdout = subprocess.PIPE) return(True) def _initMirrors(self): for m in self.xml.findall('{0}mirrors/{0}mirror.RemoteMirror'.format(self.ns)): self.mirrors.append(mirror.RemoteMirror(m, ns = self.ns)) for m in self.xml.findall('{0}mirrors/{0}mirror.LocalMirror'.format(self.ns)): self.mirrors.append(mirror.LocalMirror(m, ns = self.ns)) return() def _initPackages(self): for pkg in self.xml.findall('{0}packages/{0}aur'.format(self.ns)): self.packages.append(package.AURPkg(pkg, ns = self.ns)) for pkg in self.xml.findall('{0}packages/{0}pkgbuild'.format(self.ns)): self.packages.append(package.LocalPkg(pkg, ns = self.ns)) return() def _initSigner(self): if self.key_id: squashed_key = re.sub(r'^(?:0X)?([0-9A-Z]+)$', r'\g<1>', self.key_id.upper()) else: squashed_key = None gpghome = self.xml.attrib.get('gnupgHome', os.environ.get('GNUPGHOME', '~/.gnupg')) gpghome = os.path.abspath(os.path.expanduser(gpghome)) if not gpghome: raise FileNotFoundError('{0} does not exist'.format(gpghome)) self.gpg = gpg.Context(home_dir = gpghome) keys = [k for k in self.gpg.keylist(pattern = self.key_id, secret = True)] for k in keys: # In form of: (fingerprint/full, long, short) keyforms = (k.fpr, k.fpr[-16:], k.fpr[-8:]) if squashed_key: if squashed_key in keyforms: if k.can_sign: self.key = k self.key_id = k.fpr break else: for s in k.subkeys: subkeyforms = (s.fpr, s.fpr[-16:], s.fpr[-8:]) if squashed_key in subkeyforms: if s.can_sign: self.key = s self.key_id = s.fpr break else: if k.can_sign: self.key = k break if not self.key: raise ValueError('Cannot find a suitable signing GPG key') self.gpg.signers = [self.key] return() def build(self): for p in self.packages: self.packagefiles.extend(p.build(self.staging_dir)) if self.sign_pkgs: for f in self.packagefiles: sigfile = '{0}.sig'.format(f) with open(f, 'rb') as pkg: with open(sigfile, 'wb') as sig: sig.write(self.gpg.sign(pkg.read(), mode = gpg.constants.SIG_MODE_DETACH)[0]) self.sigfiles.append(sigfile) self._genRepo() return() def sync(self): for m in self.mirrors: m.sync(self.staging_dir) return()