okay. so. let's try this out.
parent 803fb7c5fa
commit a2e9075671
@@ -9,11 +9,23 @@ _logger = logging.getLogger()
 class BaseFetcher(object):
     type = None
 
-    def __init__(self, domain, port, path, dest, owner = None, filechecks = None, *args, **kwargs):
+    def __init__(self,
+                 domain,
+                 port,
+                 path,
+                 dest,
+                 owner = None,
+                 filechecks = None,
+                 mtime = False,
+                 offset = None,
+                 *args,
+                 **kwargs):
         self.domain = domain
         self.port = int(port)
         self.path = path
         self.dest = os.path.abspath(os.path.expanduser(dest))
+        self.mtime = mtime
+        self.offset = offset
         self.url = '{0}://{1}:{2}/{3}'.format(self.type, self.domain, self.port, self.path.lstrip('/'))
         self.owner = owner
         self.filechecks = filechecks
@@ -25,15 +37,24 @@ class BaseFetcher(object):
     def check(self):
         for k, v in self.filechecks['remote'].items():
             if v:
-                tstmp_raw = self.fetch_content(v.path).decode('utf-8').strip()
-                if '%s' in v.fmt:
-                    tstmp = datetime.datetime.fromtimestamp(float(tstmp_raw))
+                if self.mtime:
+                    self.timestamps[k] = self.fetch_content(v.path, mtime_only = True)
                 else:
-                    tstmp = datetime.datetime.strptime(tstmp_raw, v.fmt)
-                self.timestamps[k] = tstmp
+                    tstmp_raw = self.fetch_content(v.path).decode('utf-8').strip()
+                    if '%s' in v.fmt:
+                        tstmp = datetime.datetime.fromtimestamp(float(tstmp_raw))
+                    else:
+                        tstmp = datetime.datetime.strptime(tstmp_raw, v.fmt)
+                    self.timestamps[k] = tstmp
+                if self.offset:
+                    if self.offset.mod == '+' or not self.offset.mod:
+                        newval = self.timestamps[k] + self.offset.offset
+                    elif self.offset.mod == '-':
+                        newval = self.timestamps[k] - self.offset.offset
+                    self.timestamps[k] = newval
         _logger.debug('Updated upstream timestamps: {0}'.format(self.timestamps))
         return(None)
 
-    def fetch_content(self, path):
+    def fetch_content(self, path, mtime_only = False):
         # Dummy func.
         return(b'')
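For reference on the offset handling added to check() above: self.offset is expected to expose a mod of '+', '-', or None and an offset value that can be added to or subtracted from a datetime. A minimal standalone sketch of that arithmetic, assuming the offset value is a datetime.timedelta (the TimeOffset/get_duration plumbing is outside this hunk, so the class below is a stand-in, not the project's own):

    import datetime

    class FakeOffset(object):
        # Stand-in for the TimeOffset object referenced above; names are assumptions.
        def __init__(self, mod, offset):
            self.mod = mod          # '+', '-', or None
            self.offset = offset    # datetime.timedelta

    def apply_offset(tstmp, offset):
        # Mirrors the branch added to BaseFetcher.check().
        if not offset:
            return(tstmp)
        if offset.mod == '+' or not offset.mod:
            return(tstmp + offset.offset)
        elif offset.mod == '-':
            return(tstmp - offset.offset)
        return(tstmp)

    print(apply_offset(datetime.datetime(2020, 1, 1, 12, 0, 0),
                       FakeOffset('-', datetime.timedelta(hours = 4))))
    # 2020-01-01 08:00:00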
@@ -1,3 +1,4 @@
+import datetime
 import ftplib
 import logging
 import io
@@ -13,8 +14,8 @@ _logger = logging.getLogger()
 class FTP(_base.BaseFetcher):
     type = 'ftp'
 
-    def __init__(self, domain, port, path, dest, owner = None, *args, **kwargs):
-        super().__init__(domain, port, path, dest, owner = owner, *args, **kwargs)
+    def __init__(self, domain, port, path, dest, owner = None, mtime = False, offset = None, *args, **kwargs):
+        super().__init__(domain, port, path, dest, owner = owner, mtime = mtime, offset = offset, *args, **kwargs)
         _logger.debug('Instantiated FTP fetcher')
         self.handler = ftplib.FTP(self.domain)
         _logger.debug('Configured handler for {0}'.format(self.domain))
@@ -31,7 +32,7 @@ class FTP(_base.BaseFetcher):
 
     def _disconnect(self):
         if self.connected:
-            self.handler.quit()
+            self.handler.close()
             _logger.debug('Disconnected from {0}:{1} as Anonymous'.format(self.domain, self.port))
         self.connected = False
         return(None)
@@ -90,13 +91,23 @@ class FTP(_base.BaseFetcher):
         self._disconnect()
         return(None)
 
-    def fetch_content(self, remote_filepath):
+    def fetch_content(self, remote_filepath, mtime_only = False):
         self._connect()
-        buf = io.BytesIO()
-        self.handler.retrbinary('RETR {0}'.format(remote_filepath), buf.write)
-        self._disconnect()
-        buf.seek(0, 0)
-        return(buf.read())
+        if mtime_only:
+            directory, file = os.path.split(remote_filepath)
+            parent = '/{0}'.format(directory.lstrip('/'))
+            meta = self.handler.mlsd(parent)
+            file_info = dict(meta)[file]
+            tstmp = file_info['modify']
+            content = datetime.datetime.strptime(tstmp, '%Y%m%d%H%M%S')
+        else:
+            buf = io.BytesIO()
+            self.handler.retrbinary('RETR {0}'.format(remote_filepath), buf.write)
+            buf.seek(0, 0)
+            content = buf.read()
+        self._disconnect()
+        return(content)
 
     def fetch_dir(self, pathspec):
         self._connect()
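The mtime_only branch in FTP.fetch_content() above reads the MLSD 'modify' fact, which ftplib hands back as a YYYYMMDDHHMMSS string. A rough standalone equivalent (host and path are placeholders; assumes anonymous login and a server that supports MLSD):

    import datetime
    import ftplib
    import os

    def remote_mtime(host, remote_filepath):
        # Fetch only the modification time of a remote file via MLSD facts.
        handler = ftplib.FTP(host)
        handler.login()  # anonymous
        directory, fname = os.path.split(remote_filepath)
        parent = '/{0}'.format(directory.lstrip('/'))
        facts = dict(handler.mlsd(parent))[fname]
        handler.quit()
        # The 'modify' fact looks like '20200101120000' (typically UTC).
        return(datetime.datetime.strptime(facts['modify'], '%Y%m%d%H%M%S'))

    # Example with a placeholder host/path:
    # print(remote_mtime('ftp.example.org', '/pub/lastupdate'))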
@@ -1,3 +1,4 @@
+import datetime
 import logging
 import os
 import subprocess
@@ -29,9 +30,20 @@ class RSync(_base.BaseFetcher):
                  owner = None,
                  log = True,
                  filechecks = None,
+                 offset = None,
+                 mtime = False,
                  *args,
                  **kwargs):
-        super().__init__(domain, port, path, dest, owner = owner, filechecks = filechecks, *args, **kwargs)
+        super().__init__(domain,
+                         port,
+                         path,
+                         dest,
+                         owner = owner,
+                         filechecks = filechecks,
+                         offset = offset,
+                         mtime = mtime,
+                         *args,
+                         **kwargs)
         _logger.debug('Instantiated RSync fetcher')
         if rsync_args:
             self.rsync_args = rsync_args.args[:]
@@ -89,11 +101,14 @@ class RSync(_base.BaseFetcher):
         warnings.warn(errmsg)
         return(None)
 
-    def fetch_content(self, remote_filepath):
+    def fetch_content(self, remote_filepath, mtime_only = False):
         tf = tempfile.mkstemp()[1]
-        url = os.path.join(self.url.rstrip('/'),remote_filepath.lstrip('/'))
+        url = os.path.join(self.url.rstrip('/'), remote_filepath.lstrip('/'))
+        rsync_args = self.rsync_args[:]
+        if mtime_only and not any((('--times' in rsync_args), ('-t' in rsync_args))):
+            rsync_args.insert(0, '--times')
         cmd_str = ['rsync',
-                   *self.rsync_args,
+                   *rsync_args,
                    url,
                    tf]
         _logger.debug('Running command: {0}'.format(' '.join(cmd_str)))
@@ -119,8 +134,11 @@ class RSync(_base.BaseFetcher):
             _logger.error(errmsg)
             _logger.debug(debugmsg)
             warnings.warn(errmsg)
-        with open(tf, 'rb') as fh:
-            raw_content = fh.read()
+        if mtime_only:
+            raw_content = datetime.datetime.fromtimestamp(os.stat(tf).st_mtime)
+        else:
+            with open(tf, 'rb') as fh:
+                raw_content = fh.read()
         os.remove(tf)
         return(raw_content)
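The rsync path above leans on '--times' so the temporary file keeps the remote file's mtime, which os.stat() can then read back. A standalone sketch of that idea (the URL is a placeholder; assumes an rsync binary on PATH):

    import datetime
    import os
    import subprocess
    import tempfile

    def rsync_mtime(url):
        # Copy the remote file with timestamps preserved, then read the local mtime.
        tf = tempfile.mkstemp()[1]
        try:
            subprocess.run(['rsync', '--times', url, tf], check = True,
                           stdout = subprocess.PIPE, stderr = subprocess.PIPE)
            return(datetime.datetime.fromtimestamp(os.stat(tf).st_mtime))
        finally:
            os.remove(tf)

    # Example with a placeholder URL:
    # print(rsync_mtime('rsync://mirror.example.org/repo/lastsync'))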
@@ -115,6 +115,11 @@ class Mount(object):
         return(None)
 
 
+class TimeOffset(object):
+    def __init__(self, duration_str):
+        self.mod, self.offset = get_duration(duration_str)
+
+
 class TimestampFile(object):
     def __init__(self, ts_xml, owner_xml = None):
         self.xml = ts_xml
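get_duration() itself is not part of this diff, so the exact duration syntax it accepts is unknown here. Purely for illustration, a hypothetical parser with the return shape TimeOffset expects (a leading '+'/'-' modifier plus a timedelta) could look like the sketch below; the '4h30m'-style format is an assumption, not the project's actual syntax:

    import datetime
    import re

    def fake_get_duration(duration_str):
        # Hypothetical stand-in for get_duration(); returns (mod, timedelta).
        mod = None
        s = duration_str.strip()
        if s and s[0] in ('+', '-'):
            mod, s = s[0], s[1:]
        units = {'d': 'days', 'h': 'hours', 'm': 'minutes', 's': 'seconds'}
        kwargs = {units[u]: int(n) for n, u in re.findall(r'(\d+)([dhms])', s)}
        return(mod, datetime.timedelta(**kwargs))

    print(fake_get_duration('-4h30m'))
    # ('-', datetime.timedelta(seconds=16200))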
@@ -197,16 +202,18 @@ class Upstream(object):
             self.port = constants.PROTO_DEF_PORTS[self.sync_type]
         self.available = None
         if self.sync_type == 'rsync':
-            self.fetcher = fetcher.RSync(self.domain,
-                                         self.port,
-                                         self.path,
-                                         self.dest,
-                                         rsync_args = rsync_args,
-                                         rsync_ignores = rsync_ignores,
-                                         filechecks = self.filechecks,
-                                         owner = self.owner)
+            _fetcher = fetcher.RSync
         else:
-            self.fetcher = fetcher.FTP(self.domain, self.port, self.path, self.dest, owner = self.owner)
+            _fetcher = fetcher.FTP
+        self.fetcher = _fetcher(self.domain,
+                                self.port,
+                                self.path,
+                                self.dest,
+                                rsync_args = rsync_args,
+                                rsync_ignores = rsync_ignores,
+                                filechecks = self.filechecks,
+                                offset = self.offset,
+                                owner = self.owner)
         self._check_conn()
 
     def _check_conn(self):
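One design note on the consolidated call above: rsync_args and rsync_ignores are now passed even when _fetcher is fetcher.FTP, which only works because the FTP and BaseFetcher constructors accept **kwargs and quietly drop what they do not use. A toy illustration of that pattern (names here are illustrative, not from the project):

    class Base(object):
        def __init__(self, domain, port, **kwargs):
            self.domain = domain
            self.port = int(port)

    class FTPLike(Base):
        # Extra keyword arguments (e.g. rsync_args) are silently absorbed.
        def __init__(self, domain, port, **kwargs):
            super().__init__(domain, port, **kwargs)

    class RSyncLike(Base):
        def __init__(self, domain, port, rsync_args = None, **kwargs):
            super().__init__(domain, port, **kwargs)
            self.rsync_args = rsync_args or []

    for cls in (FTPLike, RSyncLike):
        f = cls('mirror.example.org', 873, rsync_args = ['-a'])
        print(type(f).__name__, f.domain, f.port)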
@@ -224,14 +231,15 @@ class Upstream(object):
         delay = self.xml.attrib.get('delayCheck')
         if not delay:
             return(None)
-        mod, self.delay = get_duration(delay)
+        delay = TimeOffset(delay)
+        self.delay = delay.offset
         return(None)
 
     def _get_offset(self):
         offset = self.xml.attrib.get('offset')
         if not offset:
             return(None)
-        self.offset = get_duration(offset)
+        self.offset = TimeOffset(offset)
         return(None)
 
     def sync(self):