diff --git a/repomirror/__init__.py b/repomirror/__init__.py
index 725b28d..f413ea0 100644
--- a/repomirror/__init__.py
+++ b/repomirror/__init__.py
@@ -2,10 +2,11 @@
 from . import logger
 ##
 import logging
 ##
+_logger = logging.getLogger()
 from . import config
 from . import constants
+from . import fetcher
 from . import sync
-_logger = logging.getLogger()
-
+Sync = sync.Sync
diff --git a/repomirror/constants.py b/repomirror/constants.py
index 72b97c4..bd025e6 100644
--- a/repomirror/constants.py
+++ b/repomirror/constants.py
@@ -1,2 +1,12 @@
 PROTO_DEF_PORTS = {'ftp': 21, 'rsync': 873}
+RSYNC_DEF_ARGS = ['--recursive',
+                  '--times',
+                  '--links',
+                  '--hard-links',
+                  '--delete-after',
+                  '--delay-updates',
+                  '--copy-links',
+                  '--safe-links',
+                  '--delete-excluded',
+                  '--exclude=.*']
diff --git a/repomirror/fetcher/__init__.py b/repomirror/fetcher/__init__.py
new file mode 100644
index 0000000..2d66f3c
--- /dev/null
+++ b/repomirror/fetcher/__init__.py
@@ -0,0 +1,5 @@
+from . import ftp
+from . import rsync
+
+FTP = ftp.FTP
+RSync = rsync.RSync
diff --git a/repomirror/fetcher/_base.py b/repomirror/fetcher/_base.py
new file mode 100644
index 0000000..b70c553
--- /dev/null
+++ b/repomirror/fetcher/_base.py
@@ -0,0 +1,36 @@
+import datetime
+import logging
+import os
+
+
+_logger = logging.getLogger()
+
+
+class BaseFetcher(object):
+    type = None
+
+    def __init__(self, domain, port, path, dest, owner = None, filechecks = None, *args, **kwargs):
+        self.domain = domain
+        self.port = int(port)
+        self.path = path
+        self.dest = os.path.abspath(os.path.expanduser(dest))
+        self.url = '{0}://{1}:{2}/{3}'.format(self.type, self.domain, self.port, self.path.lstrip('/'))
+        self.owner = owner
+        self.filechecks = filechecks
+        self.timestamps = {}
+        os.makedirs(self.dest, mode = 0o0755, exist_ok = True)
+        if self.owner:
+            os.chown(self.dest, **self.owner)
+
+    def check(self):
+        if not self.filechecks:
+            return(None)
+        for k, v in self.filechecks['remote'].items():
+            if v:
+                tstmp_raw = self.fetch_content(v.path).decode('utf-8').strip()
+                if v.fmt in ('%s', '%s.%f'):
+                    # strptime() can't parse %s; treat these as epoch values.
+                    tstmp = datetime.datetime.fromtimestamp(float(tstmp_raw))
+                else:
+                    tstmp = datetime.datetime.strptime(tstmp_raw, v.fmt)
+                self.timestamps[k] = tstmp
+        _logger.debug('Updated timestamps: {0}'.format(self.timestamps))
+        return(None)
+
+    def fetch_content(self, path):
+        # Dummy func; subclasses override this.
+        return(b'')
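Note: the `filechecks` contract that `check()` consumes is only defined later, in `sync.py`. For orientation, here is a minimal sketch of the mapping a fetcher expects — check names pointing at objects that carry a remote `path` and a timestamp `fmt`. The stand-in class and values are illustrative, not part of the module:

```python
import datetime


class FakeCheck(object):
    # Illustrative stand-in for sync.TimestampFile; check() only reads .path and .fmt.
    def __init__(self, path, fmt):
        self.path = path
        self.fmt = fmt


filechecks = {'remote': {'update': FakeCheck('lastupdate', '%s'),
                         'sync': None}}

# check() walks the 'remote' entries, fetches each path, and parses the timestamp.
for name, chk in filechecks['remote'].items():
    if not chk:
        continue
    raw = '1577836800'  # stands in for fetch_content(chk.path).decode('utf-8').strip()
    if chk.fmt in ('%s', '%s.%f'):
        tstmp = datetime.datetime.fromtimestamp(float(raw))
    else:
        tstmp = datetime.datetime.strptime(raw, chk.fmt)
    print(name, tstmp)
```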
diff --git a/repomirror/fetcher/ftp.py b/repomirror/fetcher/ftp.py
new file mode 100644
index 0000000..3269e25
--- /dev/null
+++ b/repomirror/fetcher/ftp.py
@@ -0,0 +1,136 @@
+import ftplib
+import logging
+import io
+import os
+import pathlib
+##
+from . import _base
+
+
+_logger = logging.getLogger()
+
+
+class FTP(_base.BaseFetcher):
+    type = 'ftp'
+
+    def __init__(self, domain, port, path, dest, owner = None, *args, **kwargs):
+        super().__init__(domain, port, path, dest, owner = owner, *args, **kwargs)
+        _logger.debug('Instantiated FTP fetcher')
+        # Don't pass the host to the constructor; that would connect immediately on the default port.
+        self.handler = ftplib.FTP()
+        _logger.debug('Configured handler for {0}:{1}'.format(self.domain, self.port))
+        self.connected = None
+
+    def _connect(self):
+        if not self.connected:
+            self.handler.connect(self.domain, self.port)
+            self.handler.login()
+            _logger.debug('Connected to {0}:{1} as Anonymous'.format(self.domain, self.port))
+            self.connected = True
+        return(None)
+
+    def _disconnect(self):
+        if self.connected:
+            self.handler.quit()
+            _logger.debug('Disconnected from {0}:{1} as Anonymous'.format(self.domain, self.port))
+            self.connected = False
+        return(None)
+
+    def _pathtuple(self, path):
+        relpath = path.lstrip('/')
+        relpath_stripped = str(pathlib.Path(relpath).relative_to(self.path))
+        destdir = os.path.join(self.dest, os.path.dirname(relpath_stripped))
+        destpath = os.path.join(self.dest, relpath_stripped)
+        return((relpath, destdir, destpath))
+
+    def _prepdir(self, destdir):
+        os.makedirs(destdir, mode = 0o0755, exist_ok = True)
+        _logger.debug('Created directory {0} (if it did not exist)'.format(destdir))
+        if self.owner:
+            os.chown(destdir, **self.owner)
+            _logger.debug('Chowned {0} to {uid}:{gid}'.format(destdir, **self.owner))
+        return(None)
+
+    def fetch(self):
+        def getter(path, relroot):
+            _logger.debug('getter invoked with path={0}, relroot={1}'.format(path, relroot))
+            if relroot == path:
+                parentdir = path
+                _logger.debug('relroot and path are the same')
+            else:
+                parentdir = relroot
+                _logger.debug('relroot and path are not the same')
+            _logger.debug('parentdir set to {0}'.format(parentdir))
+            _logger.debug('Executing LS on {0}'.format(parentdir))
+            for itemspec in self.handler.mlsd(parentdir):
+                relpath, spec = itemspec
+                if relpath in ('.', '..'):
+                    continue
+                _logger.debug(('Parsing path ('
+                               'relroot: {0}, '
+                               'path: {1}, '
+                               'relpath: {2}) with spec {3}').format(relroot, path, relpath, itemspec))
+                ptype = spec['type']
+                newpath = os.path.join(parentdir, relpath)
+                itemspec = (newpath, itemspec[1])
+                if ptype.startswith('OS.unix=slink'):
+                    _logger.debug('Fetching symlink {0}'.format(newpath))
+                    self.fetch_symlink(itemspec)
+                elif ptype == 'dir':
+                    _logger.debug('Fetching dir {0}'.format(newpath))
+                    self.fetch_dir(itemspec)
+                    _logger.debug('Recursing getter with relpath={0}, parentdir={1}'.format(relpath, parentdir))
+                    getter(relpath, newpath)
+                elif ptype == 'file':
+                    _logger.debug('Fetching file {0}'.format(newpath))
+                    self.fetch_file(itemspec)
+            return(None)
+        self._connect()
+        getter(self.path, self.path)
+        self._disconnect()
+        return(None)
+
+    def fetch_content(self, remote_filepath):
+        self._connect()
+        buf = io.BytesIO()
+        self.handler.retrbinary('RETR {0}'.format(remote_filepath), buf.write)
+        self._disconnect()
+        buf.seek(0, 0)
+        return(buf.read())
+
+    def fetch_dir(self, pathspec):
+        self._connect()
+        # Relative to FTP root.
+        relpath, destdir, destpath = self._pathtuple(pathspec[0])
+        mode = int(pathspec[1]['unix.mode'], 8)
+        os.makedirs(destpath, mode = mode, exist_ok = True)
+        _logger.debug('Created directory {0} with mode {1} (if it did not exist)'.format(destpath, oct(mode)))
+        if self.owner:
+            os.chown(destpath, **self.owner)
+            _logger.debug('Chowned {0} to {uid}:{gid}'.format(destpath, **self.owner))
+        return(None)
+
+    def fetch_file(self, pathspec):
+        self._connect()
+        relpath, destdir, destpath = self._pathtuple(pathspec[0])
+        self._prepdir(destdir)
+        with open(destpath, 'wb') as fh:
+            self.handler.retrbinary('RETR {0}'.format(relpath), fh.write)
+        _logger.debug('Created file {0}'.format(destpath))
+        mode = int(pathspec[1]['unix.mode'], 8)
+        os.chmod(destpath, mode)
+        _logger.debug('Chmodded {0} to {1}'.format(destpath, oct(mode)))
+        if self.owner:
+            os.chown(destpath, **self.owner)
+            _logger.debug('Chowned {0} to {uid}:{gid}'.format(destpath, **self.owner))
+        return(None)
+
+    def fetch_symlink(self, pathspec):
+        relpath, destdir, destpath = self._pathtuple(pathspec[0])
+        self._prepdir(destdir)
+        # For symlinks, the type fact is something like: OS.unix=slink:path/to/target
+        target = pathspec[1]['type'].split(':', 1)[1]
+        # We don't care whether the target exists.
+        os.symlink(target, destpath)
+        _logger.debug('Created symlink {0} -> {1}'.format(destpath, target))
+        return(None)
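Note: `fetch()` is driven entirely by `ftplib.FTP.mlsd()`, which yields `(name, facts)` tuples; the `type` and `unix.mode` facts used above look roughly like the following. The names, modes, and sizes here are illustrative, and the exact facts returned vary by server:

```python
# Illustrative shape of what self.handler.mlsd(parentdir) yields.
itemspecs = [
    ('pub', {'type': 'dir', 'unix.mode': '0755'}),
    ('ipxe.efi', {'type': 'file', 'unix.mode': '0644', 'size': '987654'}),
    ('latest', {'type': 'OS.unix=slink:pub/ipxe.efi', 'unix.mode': '0777'}),
]

for name, facts in itemspecs:
    if facts['type'].startswith('OS.unix=slink'):
        print(name, '-> symlink to', facts['type'].split(':', 1)[1])
    elif facts['type'] == 'dir':
        print(name, '-> directory, mode', oct(int(facts['unix.mode'], 8)))
    elif facts['type'] == 'file':
        print(name, '-> file, mode', oct(int(facts['unix.mode'], 8)))
```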
diff --git a/repomirror/fetcher/rsync.py b/repomirror/fetcher/rsync.py
new file mode 100644
index 0000000..b3ae612
--- /dev/null
+++ b/repomirror/fetcher/rsync.py
@@ -0,0 +1,99 @@
+import logging
+import logging.handlers
+import os
+import subprocess
+import sys
+import tempfile
+import warnings
+##
+_cur_dir = os.path.dirname(os.path.abspath(os.path.expanduser(__file__)))
+sys.path.append(os.path.abspath(os.path.join(_cur_dir, '..')))
+import constants
+# import logger
+from . import _base
+
+
+_logger = logging.getLogger()
+
+
+class RSync(_base.BaseFetcher):
+    type = 'rsync'
+
+    def __init__(self,
+                 domain,
+                 port,
+                 path,
+                 dest,
+                 rsync_args = None,
+                 owner = None,
+                 log = True,
+                 filechecks = None,
+                 *args,
+                 **kwargs):
+        super().__init__(domain, port, path, dest, owner = owner, filechecks = filechecks, *args, **kwargs)
+        _logger.debug('Instantiated RSync fetcher')
+        # Copy so that extend() below can't mutate the caller's list or the module-level default.
+        if rsync_args:
+            self.rsync_args = list(rsync_args)
+        else:
+            self.rsync_args = list(constants.RSYNC_DEF_ARGS)
+        _logger.debug('RSync args given: {0}'.format(self.rsync_args))
+        if log:
+            # Do I want to do this in subprocess + logging module? Or keep this?
+            # It looks a little ugly in the log, but it makes more sense than doing it via subprocess just to
+            # write it back out.
+            _log_path = None
+            for h in _logger.handlers:
+                if isinstance(h, logging.handlers.RotatingFileHandler):
+                    _log_path = h.baseFilename
+                    break
+            self.rsync_args.extend(['--verbose',
+                                    '--log-file-format=[RSYNC {0}:{1}]:%l:%f%L'.format(self.domain, self.port),
+                                    '--log-file={0}'.format(_log_path)])
+
+    def fetch(self):
+        path = self.url.rstrip('/')
+        if not path.endswith('/.'):
+            path += '/.'
+        dest = self.dest
+        if not dest.endswith('/.'):
+            dest += '/.'
+        # Yes, I know it's named "cmd_*str*". Yes, I know it's a *list*.
+        cmd_str = ['rsync',
+                   *self.rsync_args,
+                   path,
+                   dest]
+        cmd = subprocess.run(cmd_str,
+                             stdout = subprocess.PIPE,
+                             stderr = subprocess.PIPE)
+        stdout = cmd.stdout.decode('utf-8').strip()
+        stderr = cmd.stderr.decode('utf-8').strip()
+        if stdout != '':
+            _logger.debug('STDOUT: {0}'.format(stdout))
+        if stderr != '' or cmd.returncode != 0:
+            _logger.error('Rsync to {0}:{1} returned exit status {2}'.format(self.domain, self.port, cmd.returncode))
+            _logger.debug('STDERR: {0}'.format(stderr))
+            warnings.warn('Rsync process returned non-zero')
+        return(None)
+
+    def fetch_content(self, remote_filepath):
+        tf = tempfile.mkstemp()[1]
+        url = os.path.join(self.url.rstrip('/'), remote_filepath.lstrip('/'))
+        cmd_str = ['rsync',
+                   *self.rsync_args,
+                   url,
+                   tf]
+        cmd = subprocess.run(cmd_str,
+                             stdout = subprocess.PIPE,
+                             stderr = subprocess.PIPE)
+        stdout = cmd.stdout.decode('utf-8').strip()
+        stderr = cmd.stderr.decode('utf-8').strip()
+        if stdout != '':
+            _logger.debug('STDOUT: {0}'.format(stdout))
+        if stderr != '' or cmd.returncode != 0:
+            _logger.error('Rsync to {0}:{1} returned exit status {2}'.format(self.domain, self.port, cmd.returncode))
+            _logger.debug('STDERR: {0}'.format(stderr))
+            warnings.warn('Rsync process returned non-zero')
+        with open(tf, 'rb') as fh:
+            raw_content = fh.read()
+        os.remove(tf)
+        return(raw_content)
+
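Note: with the default arguments from constants.py, the command `RSync.fetch()` assembles ends up looking roughly like this. The host, rsync module path, destination, and log path here are illustrative:

```python
# Roughly what RSync.fetch() runs for an rsync upstream; values are illustrative.
cmd_str = ['rsync',
           '--recursive', '--times', '--links', '--hard-links',
           '--delete-after', '--delay-updates', '--copy-links',
           '--safe-links', '--delete-excluded', '--exclude=.*',
           '--verbose',
           '--log-file-format=[RSYNC arch.example.com:873]:%l:%f%L',
           '--log-file=/var/log/repomirror.log',
           'rsync://arch.example.com:873/archlinux/.',
           '/srv/repos/arch/.']
print(' '.join(cmd_str))
```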
diff --git a/repomirror/logger.py b/repomirror/logger.py
index 8d8fa53..980c7e3 100644
--- a/repomirror/logger.py
+++ b/repomirror/logger.py
@@ -37,7 +37,7 @@
 if _has_journald:
     h = journal.JournaldLogHandler()
     # Systemd includes times, so we don't need to.
     h.setFormatter(logging.Formatter(style = '{',
-                                     fmt = ('{name}:{levelname}:{name}:{filename}:'
+                                     fmt = ('{name}:{levelname}:{filename}:'
                                             '{funcName}:{lineno}: {message}')))
     _cfg_args['handlers'].append(h)
diff --git a/repomirror/sync.py b/repomirror/sync.py
index 885c505..c243d15 100644
--- a/repomirror/sync.py
+++ b/repomirror/sync.py
@@ -1,13 +1,40 @@
 import datetime
 import logging
+import pwd
+import grp
 import os
+import socket
 ##
 from . import config
+from . import constants
+from . import fetcher


 _logger = logging.getLogger()


+def get_owner(owner_xml):
+    owner = {}
+    user = owner_xml.find('user')
+    if user is not None:
+        user = user.text
+    group = owner_xml.find('group')
+    if group is not None:
+        group = group.text
+    if user:
+        user_obj = pwd.getpwnam(user)
+    else:
+        user_obj = pwd.getpwuid(os.geteuid())
+    owner['uid'] = user_obj.pw_uid
+    if group:
+        group_obj = grp.getgrnam(group)
+    else:
+        group_obj = grp.getgrgid(pwd.getpwuid(os.geteuid()).pw_gid)
+    owner['gid'] = group_obj.gr_gid
+    _logger.debug('Resolved owner xml to {0}'.format(owner))
+    return(owner)
+
+
 class Args(object):
     def __init__(self, args_xml):
         self.xml = args_xml
@@ -15,8 +42,20 @@ class Args(object):
         self._parse_xml()

     def _parse_xml(self):
+        self.args = []
         for arg_xml in self.xml.xpath('(short|long)'):
-
+            val = arg_xml.attrib.get('value')
+            if arg_xml.tag == 'short':
+                prefix = '-'
+            # elif arg_xml.tag == 'long':
+            else:
+                prefix = '--'
+            arg = '{0}{1}'.format(prefix, arg_xml.text)
+            if val:
+                arg += '={0}'.format(val)
+            self.args.append(arg)
+        _logger.debug('Generated args list: {0}'.format(self.args))
+        return(None)
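Note: `Args._parse_xml()` turns `<short>`/`<long>` elements into CLI-style flags. A small illustration of that mapping, using element names from the code (the exact `<rsyncArgs>` content of the shipped sample config is not reproduced here):

```python
# Illustrative only; mirrors what Args._parse_xml() produces for <rsyncArgs> children.
from lxml import etree

xml = etree.fromstring('<rsyncArgs>'
                       '<long>delete-after</long>'
                       '<long value="0">compress-level</long>'
                       '<short>v</short>'
                       '</rsyncArgs>')
args = []
for arg_xml in xml.xpath('(short|long)'):
    prefix = '-' if arg_xml.tag == 'short' else '--'
    arg = '{0}{1}'.format(prefix, arg_xml.text)
    val = arg_xml.attrib.get('value')
    if val:
        arg += '={0}'.format(val)
    args.append(arg)
print(args)  # ['--delete-after', '--compress-level=0', '-v']
```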


 class Mount(object):
@@ -26,36 +65,194 @@ class Mount(object):
     def __init__(self, mount_xml):
         self.xml = mount_xml
         self.path = os.path.abspath(os.path.expanduser(mount_xml.text))
         self._check_mount()

     def _check_mount(self):
+        _logger.debug('Getting mount status for {0}'.format(self.path))
         with open('/proc/mounts', 'r') as fh:
             raw = fh.read()
         for line in raw.splitlines():
             l = line.split()
             mp = l[1]
             if mp == self.path:
+                _logger.debug('{0} is mounted.'.format(self.path))
                 self.is_mounted = True
                 return(None)
         self.is_mounted = False
+        _logger.debug('{0} is not mounted.'.format(self.path))
         return(None)


 class TimestampFile(object):
-    def __init__(self, ts_xml):
+    def __init__(self, ts_xml, owner_xml = None):
         self.fmt = ts_xml.attrib.get('timeFormat', 'UNIX_EPOCH')
         if self.fmt == 'UNIX_EPOCH':
             self.fmt = '%s'
         elif self.fmt == 'MICROSECOND_EPOCH':
             self.fmt = '%s.%f'
+        _logger.debug('Set timestamp format string to {0}'.format(self.fmt))
+        self.owner_xml = owner_xml
+        self.owner = {}
+        if self.owner_xml is not None:
+            self.owner = get_owner(self.owner_xml)
+        _logger.debug('Owner set is {0}'.format(self.owner))
         self.path = os.path.abspath(os.path.expanduser(ts_xml.text))
+        _logger.debug('Path resolved to {0}'.format(self.path))
+
+    def read(self, parentdir = None):
+        if parentdir:
+            path = os.path.join(os.path.abspath(os.path.expanduser(parentdir)),
+                                self.path.lstrip('/'))
+        else:
+            path = self.path
+        with open(path, 'r') as fh:
+            raw = fh.read().strip()
+        if self.fmt in ('%s', '%s.%f'):
+            # strptime() can't parse %s; treat these as epoch values.
+            timestamp = datetime.datetime.fromtimestamp(float(raw))
+        else:
+            timestamp = datetime.datetime.strptime(raw, self.fmt)
+        _logger.debug('Read timestamp {0} from {1}'.format(str(timestamp), path))
+        return(timestamp)
+
+    def write(self):
+        dname = os.path.dirname(self.path)
+        if not os.path.isdir(dname):
+            os.makedirs(dname, mode = 0o0755)
+            if self.owner:
+                os.chown(dname, **self.owner)
+            _logger.debug('Created {0}'.format(dname))
+        with open(self.path, 'w') as fh:
+            fh.write(datetime.datetime.utcnow().strftime(self.fmt))
+            fh.write('\n')
+        os.chmod(self.path, mode = 0o0644)
+        if self.owner:
+            os.chown(self.path, **self.owner)
+        _logger.debug('Wrote timestamp to {0}'.format(self.path))
+        return(None)


 class Upstream(object):
-    def __init__(self, upstream_xml):
-        pass
+    def __init__(self, upstream_xml, dest, rsync_args = None, owner = None, filechecks = None):
+        self.xml = upstream_xml
+        # These are required for all upstreams.
+        self.sync_type = self.xml.find('syncType').text.lower()
+        self.domain = self.xml.find('domain').text
+        self.path = self.xml.find('path').text
+        self.dest = os.path.abspath(os.path.expanduser(dest))
+        self.owner = owner
+        self.filechecks = filechecks
+        self.has_new = False
+        # These are optional.
+        for i in ('port', 'bwlimit'):
+            e = self.xml.find(i)
+            if e is not None:
+                setattr(self, i, int(e.text))
+            else:
+                setattr(self, i, None)
+        if not self.port:
+            self.port = constants.PROTO_DEF_PORTS[self.sync_type]
+        self.available = None
+        if self.sync_type == 'rsync':
+            self.fetcher = fetcher.RSync(self.domain,
+                                         self.port,
+                                         self.path,
+                                         self.dest,
+                                         rsync_args = (rsync_args.args if rsync_args else None),
+                                         filechecks = self.filechecks,
+                                         owner = self.owner)
+        else:
+            self.fetcher = fetcher.FTP(self.domain, self.port, self.path, self.dest, filechecks = self.filechecks, owner = self.owner)
+        self._check_conn()
+
+    def _check_conn(self):
+        sock = socket.socket()
+        sock.settimeout(7)
+        try:
+            sock.connect((self.domain, self.port))
+            sock.close()
+            self.available = True
+        except (socket.timeout, socket.error):
+            self.available = False
+        return(None)
+
+    def sync(self):
+        self.fetcher.fetch()
+        return(None)
+
+
+class Distro(object):
+    def __init__(self, distro_xml):
+        self.xml = distro_xml
+        self.name = distro_xml.attrib['name']
+        self.dest = os.path.abspath(os.path.expanduser(distro_xml.find('dest').text))
+        self.mount = Mount(self.xml.find('mountCheck'))
+        self.filechecks = {'local': {'check': None,
+                                     'sync': None},
+                           'remote': {'update': None,
+                                      'sync': None}}
+        self.timestamps = {}
+        self.rsync_args = None
+        self.owner = None
+        self.upstreams = []
+        # These are optional.
+        self.owner_xml = self.xml.find('owner')
+        if self.owner_xml is not None:
+            self.owner = get_owner(self.owner_xml)
+        self.rsync_xml = self.xml.find('rsyncArgs')
+        if self.rsync_xml is not None:
+            self.rsync_args = Args(self.rsync_xml)
+        for i in ('Check', 'Sync'):
+            e = self.xml.find('lastLocal{0}'.format(i))
+            if e is not None:
+                self.filechecks['local'][i.lower()] = TimestampFile(e)
+        for i in ('Sync', 'Update'):
+            e = self.xml.find('lastRemote{0}'.format(i))
+            if e is not None:
+                self.filechecks['remote'][i.lower()] = TimestampFile(e)
+        for u in self.xml.findall('upstream'):
+            self.upstreams.append(Upstream(u,
+                                           self.dest,
+                                           rsync_args = self.rsync_args,
+                                           owner = self.owner,
+                                           filechecks = self.filechecks))
+
+    def check(self):
+        for k, v in self.filechecks['local'].items():
+            if v:
+                tstmp = v.read()
+                self.timestamps[k] = tstmp
+        _logger.debug('Updated timestamps: {0}'.format(self.timestamps))
+        return(None)
+
+    def sync(self):
+        self.check()
+        for u in self.upstreams:
+            if not u.available:
+                continue
+            u.fetcher.check()
+            if u.has_new:
+                u.sync()
+                if self.filechecks['local']['sync']:
+                    self.filechecks['local']['sync'].write()
+                break
+        if self.filechecks['local']['check']:
+            self.filechecks['local']['check'].write()
+        return(None)


 class Sync(object):
     def __init__(self, cfg = None, dummy = False, distro = None, logdir = None, *args, **kwargs):
-        _args = dict(locals())
-        del(_args['self'])
-        _logger.debug('Sync class instantiated with args: {0}'.format(_args))
-        self.cfg = config.Config(cfg)
+        try:
+            _args = dict(locals())
+            del(_args['self'])
+            _logger.debug('Sync class instantiated with args: {0}'.format(_args))
+            self.dummy = dummy
+            if distro:
+                self.distro = distro
+            else:
+                self.distro = []
+            self._distro_objs = []
+            self.logdir = logdir
+            self.xml = config.Config(cfg)
+            self._distro_populate()
+        except Exception:
+            _logger.error('FATAL ERROR. Stacktrace follows.', exc_info = True)
+
+    def _distro_populate(self):
+        pass
+
+    def sync(self):
+        for d in self._distro_objs:
+            d.sync()
+        return(None)
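Note: the element names `Distro()` and `Upstream()` look up (dest, mountCheck, owner, rsyncArgs, lastLocal*/lastRemote*, upstream, syncType, domain, path, port) imply a per-distro config block shaped roughly as below. This is an illustrative sketch inferred from the code, not the shipped sample.config.xml:

```python
# Illustrative distro block; values other than the element names are hypothetical.
from lxml import etree

distro_xml = etree.fromstring('''
<distro name="arch">
    <dest>/srv/repos/arch</dest>
    <mountCheck>/srv/repos</mountCheck>
    <owner><user>mirror</user><group>mirror</group></owner>
    <lastLocalCheck>~/.cache/repomirror/arch.lastcheck</lastLocalCheck>
    <lastLocalSync>~/.cache/repomirror/arch.lastsync</lastLocalSync>
    <lastRemoteUpdate timeFormat="UNIX_EPOCH">/lastupdate</lastRemoteUpdate>
    <lastRemoteSync timeFormat="UNIX_EPOCH">/lastsync</lastRemoteSync>
    <upstream>
        <syncType>rsync</syncType>
        <domain>arch.mirror.constant.com</domain>
        <path>archlinux</path>
    </upstream>
</distro>
'''.strip())
print(distro_xml.find('upstream/syncType').text)  # rsync
```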
diff --git a/repomirror/test.py b/repomirror/test.py
new file mode 100755
index 0000000..82ca827
--- /dev/null
+++ b/repomirror/test.py
@@ -0,0 +1,22 @@
+#!/usr/bin/env python3
+
+import os
+import shutil
+##
+import logger
+import fetcher
+
+dest = '/tmp/ipxe_ftp'
+path = 'ipxe'
+
+
+def main():
+    if os.path.isdir(dest):
+        shutil.rmtree(dest)
+    f = fetcher.FTP('10.11.12.12', 21, path, dest)
+    f.fetch()
+
+
+if __name__ == '__main__':
+    main()
+
diff --git a/reposync.py b/reposync.py
index 62aac9b..3b5ccf1 100644
--- a/reposync.py
+++ b/reposync.py
@@ -25,11 +25,12 @@ def parseArgs():
                       dest = 'cfg',
                       help = ('The path to the config file. If it does not exist, a bare version will be created. '
                               'Default: ~/.config/repomirror.xml'))
-    args.add_argument('-n', '--dry-run',
-                      action = 'store_true',
-                      dest = 'dummy',
-                      help = ('If specified, do not actually sync anything (other than timestamp files if '
-                              'applicable to determine logic); do not actually sync any repositories'))
+    # args.add_argument('-n', '--dry-run',
+    #                   action = 'store_true',
+    #                   dest = 'dummy',
+    #                   help = ('If specified, do not actually sync anything (other than timestamp files if '
+    #                           'applicable to determine logic); do not actually sync any repositories. Useful for '
+    #                           'generating logs to determine potential issues before they happen'))
     args.add_argument('-d', '--distro',
                       dest = 'distro',
                       action = 'append',
@@ -46,8 +47,8 @@ def parseArgs():

 def main():
     args = parseArgs().parse_args()
-    r = repomirror.Sync()
-
+    r = repomirror.Sync(**vars(args))
+    r.sync()
     return(None)
diff --git a/sample.config.xml b/sample.config.xml
index 14c60a8..db68d96 100644
--- a/sample.config.xml
+++ b/sample.config.xml
@@ -61,7 +61,21 @@
 /
@@ -96,7 +110,7 @@
 -->
 rsync
 arch.mirror.constant.com
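Note: after these changes the entry point hands the parsed argparse namespace straight to repomirror.Sync and calls sync(). A minimal sketch of that call; the argument values are illustrative, and only cfg, distro, and logdir are known from the visible Sync signature:

```python
# Roughly equivalent to what reposync.py's main() now does.
import repomirror

r = repomirror.Sync(cfg = '~/.config/repomirror.xml',
                    distro = ['arch'],
                    logdir = None)
r.sync()
```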