Arch_Repo_Builder/ARB/repo.py
2019-09-23 06:45:18 -04:00

117 lines
4.4 KiB
Python

import os
import re
import subprocess
##
import gpg
##
import arb_util
import mirror
import package
class Repo(object):
def __init__(self, repo_xml, ns = '', *args, **kwargs):
self.xml = repo_xml
self.name = repo_xml.attrib['name']
self.ns = ns
self.gpg = None
self.key = None
self.mirrors = []
self.packages = []
self.packagefiles = []
self.sigfiles = []
_key_id = self.xml.attrib.get('gpgKeyID')
self.key_id = (re.sub(r'\s+', '', _key_id) if _key_id else None)
self.staging_dir = os.path.abspath(os.path.expanduser(self.xml.attrib.get('staging',
'.')))
self.sign_pkgs = arb_util.xmlBool(self.xml.attrib.get('signPkgs', True))
self.sign_db = arb_util.xmlBool(self.xml.attrib.get('signDB', True))
if any((self.sign_db, self.sign_pkgs)):
self._initSigner()
self._initMirrors()
self._initPackages()
def _genRepo(self):
if not self.packagefiles:
# raise RuntimeError('.build() must be run before ._genRepo()')
return(None)
cmd = ['repo-add']
if self.sign_db:
cmd.extend(['--sign', '--key', self.key_id])
cmd.extend(['--remove',
os.path.join(self.staging_dir, '{0}.db.tar.xz'.format(self.name)),
*self.packagefiles])
repo_out = subprocess.run(cmd, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
return(True)
def _initMirrors(self):
for m in self.xml.findall('{0}mirrors/{0}mirror.RemoteMirror'.format(self.ns)):
self.mirrors.append(mirror.RemoteMirror(m, ns = self.ns))
for m in self.xml.findall('{0}mirrors/{0}mirror.LocalMirror'.format(self.ns)):
self.mirrors.append(mirror.LocalMirror(m, ns = self.ns))
return()
def _initPackages(self):
for pkg in self.xml.findall('{0}packages/{0}aur'.format(self.ns)):
self.packages.append(package.AURPkg(pkg, ns = self.ns))
for pkg in self.xml.findall('{0}packages/{0}pkgbuild'.format(self.ns)):
self.packages.append(package.LocalPkg(pkg, ns = self.ns))
return()
def _initSigner(self):
if self.key_id:
squashed_key = re.sub(r'^(?:0X)?([0-9A-Z]+)$', r'\g<1>', self.key_id.upper())
else:
squashed_key = None
gpghome = self.xml.attrib.get('gnupgHome',
os.environ.get('GNUPGHOME',
'~/.gnupg'))
gpghome = os.path.abspath(os.path.expanduser(gpghome))
if not gpghome:
raise FileNotFoundError('{0} does not exist'.format(gpghome))
self.gpg = gpg.Context(home_dir = gpghome)
keys = [k for k in self.gpg.keylist(pattern = self.key_id, secret = True)]
for k in keys:
# In form of: (fingerprint/full, long, short)
keyforms = (k.fpr, k.fpr[-16:], k.fpr[-8:])
if squashed_key:
if squashed_key in keyforms:
if k.can_sign:
self.key = k
self.key_id = k.fpr
break
else:
for s in k.subkeys:
subkeyforms = (s.fpr, s.fpr[-16:], s.fpr[-8:])
if squashed_key in subkeyforms:
if s.can_sign:
self.key = s
self.key_id = s.fpr
break
else:
if k.can_sign:
self.key = k
break
if not self.key:
raise ValueError('Cannot find a suitable signing GPG key')
self.gpg.signers = [self.key]
return()
def build(self):
for p in self.packages:
self.packagefiles.extend(p.build(self.staging_dir))
if self.sign_pkgs:
for f in self.packagefiles:
sigfile = '{0}.sig'.format(f)
with open(f, 'rb') as pkg:
with open(sigfile, 'wb') as sig:
sig.write(self.gpg.sign(pkg.read(), mode = gpg.constants.SIG_MODE_DETACH)[0])
self.sigfiles.append(sigfile)
self._genRepo()
return()
def sync(self):
for m in self.mirrors:
m.sync(self.staging_dir)
return()