optools/net/dhcp/dhcpcdump.py

213 lines
9.8 KiB
Python
Executable File

#!/usr/bin/env python3
# INCOMPLETE
# See RFC 2131, Figure 1 and Table 1 (section 2)
# Much thanks to https://github.com/igordcard/dhcplease for digging into dhcpcd
# source for the actual file structure (and providing inspiration).
import argparse
import collections
import os
import re
import struct
from io import BytesIO
## DEFINE SOME PRETTY STUFF ##
class color(object):
PURPLE = '\033[95m'
CYAN = '\033[96m'
DARKCYAN = '\033[36m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'
class packetParser(object):
def __init__(self, data):
## Set the segment labels and struct formats
self.fmt = collections.OrderedDict()
# In the below, 'cnt' is how large (in octets) the field is.
# 'fmt' is a struct format string (https://docs.python.org/3/library/struct.html#format-characters)
# "op" through "hops" (incl.) may actually be '8B' instead of '8c'.
self.fmt['op'] = {'cnt': 8, 'fmt': '8c'} # this will always be \x02
self.fmt['htype'] = {'cnt': 8, 'fmt': '8c'} # this will always be \x01
self.fmt['hlen'] = {'cnt': 8, 'fmt': '8c'}
self.fmt['hops'] = {'cnt': 8, 'fmt': '8c'}
self.fmt['xid'] = {'cnt': 32, 'fmt': '8I'}
self.fmt['secs'] = {'cnt': 16, 'fmt': '8H'}
self.fmt['flags'] = {'cnt': 16, 'fmt': '8H'}
# "ciaddr" through "giaddr" (incl.) may actually be '4c' instead of '4B'.
self.fmt['ciaddr'] = {'cnt': 4, 'fmt': '4B'}
self.fmt['yiaddr'] = {'cnt': 4, 'fmt': '4B'}
self.fmt['siaddr'] = {'cnt': 4, 'fmt': '4B'}
self.fmt['giaddr'] = {'cnt': 4, 'fmt': '4B'}
# "chaddr" through "file" (incl.) may actually be <#>c instead of <#>B.
self.fmt['chaddr'] = {'cnt': 16, 'fmt': '16B'} # first 6 bytes used for MAC addr of client
self.fmt['sname'] = {'cnt': 64, 'fmt': '64B'} # server host name (via BOOTP)
self.fmt['file'] = {'cnt': 128, 'fmt': '128B'} # the boot filename (for BOOTP)
# OPTIONS - RFC 2132
# Starting at octet 320 (so, f.seek(319, 0)) to the end of the message are
# DHCP options. It's a variable-length field so it makes things tricky
# for us. But it's at *least* 312 octets long per the RFC?
# It probably starts with a magic.
#self.dhcp_opts = {'idx': 324, 'cnt': 4, 'fmt': '4c'}
#self.dhcp_opts = {'idx': 324, 'cnt': 4, 'fmt': None}
self.opts = {'magic': b'\x63\x82\x53\x63',
'struct': {'idx': 324, 'cnt': 4, 'fmt': '4B'},
'size': 0,
'bytes': b'\00'}
## Convert the data into a bytes object because struct.unpack() wants a stream
self.buf = BytesIO(data)
def getStd(self):
self.reconstructed_segments = collections.OrderedDict()
_idx = 0 # add to this with the 'cnt' value for each iteration.
for k in self.fmt.keys():
print('Segment: ' + k) # TODO: remove, this stuff goes in the printer
pkt = struct.Struct(self.fmt[k]['fmt'])
self.buf.seek(_idx, 0)
try:
self.reconstructed_segments[k] = pkt.unpack(self.buf.read(self.fmt[k]['cnt']))
except struct.error as e:
# Some DHCP implementations are... broken.
# I've noticed it mostly in Verizon Fi-OS gateways/WAPs/routers.
print('Warning({0}): {1}'.format(k, e))
self.buf.seek(_idx, 0)
_truesize = len(self.buf.read(self.fmt[k]['cnt']))
print('Length of bytes read: {0}'.format(_truesize))
# But sometimes it's... kind of fixable?
if k == 'file' and _truesize < self.fmt[k]['cnt']:
self.buf.seek(_idx, 0)
self.fmt[k] = {'cnt': _truesize, 'fmt': '{0}B'.format(_truesize)}
pkt = struct.Struct(self.fmt[k]['fmt'])
print('Struct format size automatically adjusted.')
try:
self.reconstructed_segments[k] = pkt.unpack(self.buf.read(self.fmt[k]['cnt']))
except struct.error as e2:
# yolo.
print('We still couldn\'t populate {0}; filling with a nullbyte.'.format(k))
print('Error (try #2): {0}'.format(e2))
print('We read {0} bytes.'.format(_truesize))
print('fmt: {0}'.format(self.fmt[k]['fmt']))
self.reconstructed_segments[k] = b'\00'
_idx += self.fmt[k]['cnt']
self.buf.seek(_idx, 0)
# Finally, check for opts. If they exist, populate.
_optbytes = len(self.buf.read())
if _optbytes >= 1:
self.opts['size'] = _optbytes
self.buf.seek(_idx, 0)
self.opts['bytes'] = self.buf.read() # read to the end
return()
def getOpts(self):
pass
def close(self):
self.buf.close()
def parseArgs():
args = argparse.ArgumentParser()
_deflease = '/var/lib/dhcpcd/'
args.add_argument('-l', '--lease',
metavar = '/path/to/lease/dir/or_file.lease',
default = _deflease,
dest = 'leasepath',
help = ('The path to the directory of lease files or specific lease file. ' +
'If a directory is provided, all lease files found within will be ' +
'parsed. Default: {0}{1}{2}').format(color.BOLD,
_deflease,
color.END))
args.add_argument('-n', '--no-color',
action = 'store_false',
dest = 'color',
help = ('If specified, suppress color formatting in output.'))
args.add_argument('-d', '--dump',
metavar = '/path/to/dumpdir',
default = False,
dest = 'dump',
help = ('If provided, dump the parsed leases to this directory (in ' +
'addition to printing). It will dump with the same filename ' +
'and overwrite any existing file with the same filename, so ' +
'do NOT use the same directory as your dhcpcd lease files! ' +
'({0}-l/--lease{1}). The directory will be created if it does ' +
'not exist').format(color.BOLD,
color.END))
args.add_argument('-p', '--pretty',
action = 'store_true',
dest = 'prettyprint',
help = ('If specified, include color formatting {0}in the dump ' +
'file(s){1}').format(color.BOLD, color.END))
return(args)
def getLeaseData(fpath):
if not os.path.isfile(fpath):
raise FileNotFoundError('{0} does not exist'.format(fpath))
with open(fpath, 'rb') as f:
_data = f.read()
return(_data)
def iterLease(args):
# If the lease path is a file, just operate on that.
# If it's a directory, iterate (recursively) through it.
leases = {}
if not os.path.lexists(args['leasepath']):
raise FileNotFoundError('{0} does not exist'.format(args['leasepath']))
if os.path.isfile(args['leasepath']):
_pp = packetParser(getLeaseData(args['leasepath']))
# TODO: convert the hex vals to their actual vals... maybe?
_keyname = re.sub('^(dhcpcd-)?(.*)\.lease$',
'\g<2>',
os.path.basename(args['leasepath']))
leases[_keyname] = leaseParse(_pp, args)
else:
# walk() instead of listdir() because whotf knows when some distro like
# *coughcoughUbuntucoughcough* will do some breaking change like creating
# subdirs based on iface name or something.
for _, _, files in os.walk(args['leasepath']):
if not files:
continue
files = [i for i in files if i.endswith('.lease')] # only get .lease files
for i in files:
_args = args.copy()
_fpath = os.path.join(args['leasepath'], i)
_keyname = re.sub('^(dhcpcd-)?(.*)\.lease$', '\g<2>', os.path.basename(_fpath))
_dupeid = 0
# JUST in case there are multiple levels of dirs in the future
# that have files of the sama name
while _keyname in leases.keys():
# TODO: convert the hex vals to their actual vals... maybe?
_keyname = re.sub('^$',
'\g<1>.{0}'.format(_dupeid),
_keyname)
_dupeid += 1
_pp = packetParser(getLeaseData(_fpath))
leases[_keyname] = leaseParse(_pp, _args, fname = _fpath)
return(leases)
def leaseParse(pp, args, fname = False):
# Essentially just a wrapper function.
# Debugging output...
if fname:
print(fname)
pp.getStd()
pp.getOpts()
if args['dump']:
pass # TODO: write to files, creating dump dir if needed, etc.
pp.close()
# do pretty-printing (color-coded segments, etc.) here
return(pp.reconstructed_segments)
if __name__ == '__main__':
args = vars(parseArgs().parse_args())
args['leasepath'] = os.path.abspath(os.path.expanduser(args['leasepath']))
if not os.path.lexists(args['leasepath']):
exit('{0} does not exist!'.format(args['leasepath']))
leases = iterLease(args)
# just print for now until we write the parser/prettyprinter
print(list(leases.keys()))