adding the rewrite...

This commit is contained in:
brent s 2017-09-19 05:09:33 -04:00
parent 23a0dfedb1
commit 4da7afdeaf
9 changed files with 1216 additions and 230 deletions

View File

@ -0,0 +1,42 @@
The __init__() function of kant.SigSession() takes a single argument: args.

it should be a dict, structured like this:

{'batch': False,
'checklevel': None,
'gpgdir': '/home/bts/.gnupg',
'keys': 'EFD9413B17293AFDFE6EA6F1402A088DEDF104CB,admin@sysadministrivia.com',
'keyservers': 'hkp://sks.mirror.square-r00t.net:11371,hkps://hkps.pool.sks-keyservers.net:443,http://pgp.mit.edu:80',
'local': 'false',
'notify': True,
'sigkey': '748231EBCBD808A14F5E85D28C004C2F93481F6B',
'testkeyservers': False,
'trustlevel': None}

The gpgdir, sigkey, and keyservers are set from system defaults in kant.parseArgs() if it's run interactively.
This *may* be reworked in the future to provide a mechanism for external calls to kant.SigSession() but for now,
it's up to you to provide all the data in the dict in the above format.

It will then internally verify these items and do various conversions, so that self.args becomes this:

{'batch': False,
'checklevel': None,
'gpgdir': '/home/bts/.gnupg',
'keys': ['EFD9413B17293AFDFE6EA6F1402A088DEDF104CB',
'admin@sysadministrivia.com'],
'keyservers': [{'port': [11371, ['tcp', 'udp']],
'proto': 'hkp',
'server': 'sks.mirror.square-r00t.net'},
{'port': [443, ['tcp']],
'proto': 'hkps',
'server': 'hkps.pool.sks-keyservers.net'},
{'port': [80, ['tcp']],
'proto': 'http',
'server': 'pgp.mit.edu'}],
'local': 'false',
'notify': True,
'rcpts': {'EFD9413B17293AFDFE6EA6F1402A088DEDF104CB': {'type': 'fpr'},
'admin@sysadministrivia.com': {'type': 'email'}},
'sigkey': '748231EBCBD808A14F5E85D28C004C2F93481F6B',
'testkeyservers': False,
'trustlevel': None}

View File

@ -0,0 +1,33 @@
The following functions are available within the SigSession() class:

getTpls()
Get the user-specified templates if they exist, otherwise set up stock ones.

modifyDirmngr(op)
*op* can be either:
new/start/replace - modify dirmngr to use the runtime-specified keyserver(s)
old/stop/restore - modify dirmngr back to the keyservers that were defined before modification

buildKeys()
build out the keys dict (see REF.keys.struct.txt).

getKeys()
fetch keys in the keys dict (see REF.keys.struct.txt) from a keyserver if they aren't found in the local keyring.

trustKeys()
set up trusts for the keys in the keys dict (see REF.keys.struct.txt). prompts for each trust not found/specified at runtime.

sigKeys()
sign keys in the keys dict (see REF.keys.struct.txt), either exportable or local depending on runtime specification.

pushKeys()
push keys in the keys dict (see REF.keys.struct.txt) to the keyservers specified at runtime (as long as they weren't specified to be local/non-exportable signatures; then we don't bother).

sendMails()
send emails to each of the recipients specified in the keys dict (see REF.keys.struct.txt).

serverParser(uri)
returns a dict of a keyserver URI broken up into separate components easier for parsing.

verifyArgs(locargs)
does some verifications, classifies certain data, calls serverParser(), etc.

View File

@ -0,0 +1,44 @@
TYPES:
d = dict
l = list
s = string
i = int
b = binary (True/False)
o = object

- pkey's dict key is the 40-char key ID of the primary key
- "==>" indicates the next item is a dict and the current item may contain one or more elements of the same format,
"++>" is a list,
"-->" is a "flat" item (string, object, int, etc.)
-"status" is one of "an UPGRADE", "a DOWNGRADE", or "a NEW TRUST".

keys(d) ==> (40-char key ID)(s) ==> pkey(d) --> email(s)
--> name(s)
--> creation (o, datetime)
--> key(o, gpg)
--> trust(i)
--> check(i)
--> local(b)
--> notify(b)
==> subkeys(d) ==> (40-char key ID)(s) --> creation
--> change(b)
--> sign(b)
--> status(s)
==> uids(d) ==> email(s) --> name(s)
--> comment(s)
--> email(s)
--> updated(o, datetime)

for email templates, they are looped over for each key dict as "key".
so for example, instead of specifying "keys['748231EBCBD808A14F5E85D28C004C2F93481F6B']['pkey']['name']",
you instead should specify "key['pkey']['name']". To get the name of e.g. the second uid,
you'd use "key['uids'][(uid email)]['name'].

the same structure is available via the "mykey" dictionary. e.g. to get the key ID of *your* key,
you can use "mykey['subkeys'][0][0]".

you also have the following variables/lists/etc. available for templates (via the Jinja2 templating syntax[0]):
- "keyservers", a list of keyservers set.


[0] http://jinja.pocoo.org/docs/2.9/templates/

View File

@ -1,225 +0,0 @@
'\" t
.\" Title: kant
.\" Author: Brent Saner
.\" Generator: Asciidoctor 1.5.5
.\" Date: 2017-09-07
.\" Manual: KANT - Keysigning and Notification Tool
.\" Source: KANT
.\" Language: English
.\"
.TH "KANT" "1" "2017-09-07" "KANT" "KANT \- Keysigning and Notification Tool"
.ie \n(.g .ds Aq \(aq
.el .ds Aq '
.ss \n[.ss] 0
.nh
.ad l
.de URL
\\$2 \(laURL: \\$1 \(ra\\$3
..
.if \n[.g] .mso www.tmac
.LINKSTYLE blue R < >
.SH "NAME"
kant \- Sign GnuPG/OpenPGP/PGP keys and notify the key owner(s)
.SH "SYNOPSIS"
.sp
\fBkant\fP [\fIOPTION\fP] \-k/\-\-key \fI<KEY_IDS|BATCHFILE>\fP
.SH "OPTIONS"
.sp
Keysigning (and keysigning parties) can be a lot of fun, and can offer someone with new keys a way into the WoT (Web\-of\-Trust).
Unfortunately, they can be intimidating to those new to the experience.
This tool offers a simple and easy\-to\-use interface to sign public keys (normal, local\-only, and/or non\-exportable),
set owner trust, specify level of checking done, and push the signatures to a keyserver. It even supports batch operation via a CSV file.
.sp
\fB\-h\fP, \fB\-\-help\fP
.RS 4
Display brief help/usage and exit.
.RE
.sp
\fB\-k\fP \fIKEY_IDS|BATCHFILE\fP, \fB\-\-key\fP \fIKEY_IDS|BATCHFILE\fP
.RS 4
A single or comma\-separated list of key IDs (see \fBKEY ID FORMAT\fP) to sign, trust, and notify. Can also be an email address.
If \fB\-b\fP/\fB\-\-batch\fP is specified, this should instead be a path to the batch file (see \fBBATCHFILE/Format\fP).
.RE
.sp
\fB\-K\fP \fIKEY_ID\fP, \fB\-\-sigkey\fP \fIKEY_ID\fP
.RS 4
The key to use when signing other keys (see \fBKEY ID FORMAT\fP). The default key is automatically determined at runtime
(it will be displayed in \fB\-h\fP/\fB\-\-help\fP output).
.RE
.sp
\fB\-t\fP \fITRUSTLEVEL\fP, \fB\-\-trust\fP \fITRUSTLEVEL\fP
.RS 4
The trust level to automatically apply to all keys (if not specified, KANT will prompt for each key).
See \fBBATCHFILE/TRUSTLEVEL\fP for trust level notations.
.RE
.sp
\fB\-c\fP \fICHECKLEVEL\fP, \fB\-\-check\fP \fICHECKLEVEL\fP
.RS 4
The level of checking that was done to confirm the validity of ownership for all keys being signed. If not specified,
the default is for KANT to prompt for each key we sign. See \fBBATCHFILE/CHECKLEVEL\fP for check level notations.
.RE
.sp
\fB\-l\fP \fILOCAL\fP, \fB\-\-local\fP \fILOCAL\fP
.RS 4
If specified, make the signature(s) local\-only (i.e. non\-exportable, don\(cqt push to a keyserver).
See \fBBATCHFILE/LOCAL\fP for more information on local signatures.
.RE
.sp
\fB\-n\fP, \fB\-\-no\-notify\fP
.RS 4
This requires some explanation. If you have MSMTP[1] installed and configured for the currently active user,
then we will send out emails to recipients letting them know we have signed their key. However, if MSMTP is installed and configured
but this flag is given, then we will NOT attempt to send emails.
.RE
.sp
\fB\-s\fP \fIKEYSERVER(S)\fP, \fB\-\-keyservers\fP \fIKEYSERVER(S)\fP
.RS 4
The comma\-separated keyserver(s) to push to. The default keyserver list is automatically generated at runtime.
.RE
.sp
\fB\-b\fP, \fB\-\-batch\fP
.RS 4
If specified, operate in batch mode. See \fBBATCHFILE\fP for more information.
.RE
.sp
\fB\-D\fP \fIGPGDIR\fP, \fB\-\-gpgdir\fP \fIGPGDIR\fP
.RS 4
The GnuPG configuration directory to use (containing your keys, etc.). The default is automatically generated at runtime,
but will probably be \fB/home/<yourusername>/.gnupg\fP or similar.
.RE
.sp
\fB\-T\fP, \fB\-\-testkeyservers\fP
.RS 4
If specified, initiate a basic test connection with each set keyserver before anything else. Disabled by default.
.RE
.SH "KEY ID FORMAT"
.sp
Key IDs can be specified in one of two ways. The first (and preferred) way is to use the full 160\-bit (40\-character, hexadecimal) key ID.
A little known fact is the fingerprint of a key:
.sp
\fBDEAD BEEF DEAD BEEF DEAD BEEF DEAD BEEF DEAD BEEF\fP
.sp
is actually the full key ID of the primary key; i.e.:
.sp
\fBDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF\fP
.sp
The second way to specify a key, as far as KANT is concerned, is to use an email address.
Do note that if more than one key is found that matches the email address given (and they usually are), you will be prompted to select the specific
correct key ID anyways so it\(cqs usually a better idea to have the owner present their full key ID/fingerprint right from the get\-go.
.SH "BATCHFILE"
.SS "Format"
.sp
The batch file is a CSV\-formatted (comma\-delimited) file containing keys to sign and other information about them. It keeps the following format:
.sp
\fBKEY_ID,TRUSTLEVEL,LOCAL,CHECKLEVEL,NOTIFY\fP
.sp
For more information on each column, reference the appropriate sub\-section below.
.SS "KEY_ID"
.sp
See \fBKEY ID FORMAT\fP.
.SS "TRUSTLEVEL"
.sp
The \fITRUSTLEVEL\fP is specified by the following levels (you can use either the numeric or string representation):
.sp
.if n \{\
.RS 4
.\}
.nf
\fB\-1 = Never
0 = Unknown
1 = Untrusted
2 = Marginal
3 = Full
4 = Ultimate\fP
.fi
.if n \{\
.RE
.\}
.sp
It is how much trust to assign to a key, and the signatures that key makes on other keys.[2]
.SS "LOCAL"
.sp
Whether or not to push to a keyserver. It can be either the numeric or string representation of the following:
.sp
.if n \{\
.RS 4
.\}
.nf
\fB0 = False
1 = True\fP
.fi
.if n \{\
.RE
.\}
.sp
If \fB1/True\fP, KANT will sign the key with a local signature (and the signature will not be pushed to a keyserver or be exportable).[3]
.SS "CHECKLEVEL"
.sp
The amount of checking that has been done to confirm that the owner of the key is who they say they are and that the key matches their provided information.
It can be either the numeric or string representation of the following:
.sp
.if n \{\
.RS 4
.\}
.nf
\fB0 = Unknown
1 = None
2 = Casual
3 = Careful\fP
.fi
.if n \{\
.RE
.\}
.sp
It is up to you to determine the classification of the amount of checking you have done, but the following is recommended (it is the policy
the author follows):
.sp
.if n \{\
.RS 4
.\}
.nf
\fBUnknown:\fP The key is unknown and has not been reviewed

\fBNone:\fP The key has been signed, but no confirmation of the
ownership of the key has been performed (typically
a local signature)

\fBCasual:\fP The key has been presented and the owner is either
known to the signer or they have provided some form
of non\-government\-issued identification or other
proof (website, Keybase.io, etc.)

\fBCareful:\fP The same as \fBCasual\fP requirements but they have
provided a government\-issued ID and all information
matches
.fi
.if n \{\
.RE
.\}
.sp
It\(cqs important to check each key you sign carefully. Failure to do so may hurt others\(aq trust in your key.[4]
.SH "SEE ALSO"
.sp
gpg(1), gpgconf(1)
.SH "RESOURCES"
.sp
\fBAuthor\(cqs web site:\fP \c
.URL "https://square\-r00t.net/" "" ""
\fBAuthor\(cqs GPG information:\fP \c
.URL "https://square\-r00t.net/gpg\-info" "" ""
.SH "COPYING"
.sp
Copyright (C) 2017 Brent Saner.
.sp
Free use of this software is granted under the terms of the GPLv3 License.
.SH "NOTES"
1. http://msmtp.sourceforge.net/
2. For more information on trust levels and the Web of Trust, see: https://www.gnupg.org/gph/en/manual/x334.html and https://www.gnupg.org/gph/en/manual/x547.html
3. For more information on pushing to keyservers and local signatures, see: https://www.gnupg.org/gph/en/manual/r899.html#LSIGN and https://lists.gnupg.org/pipermail/gnupg-users/2007-January/030242.html
4. GnuPG documentation refers to this as "validity"; see https://www.gnupg.org/gph/en/manual/x334.html
.SH "AUTHOR(S)"
.sp
\fBBrent Saner\fP
.RS 4
Author(s).
.RE

943
gpg/kant/kant.new.py Executable file
View File

@ -0,0 +1,943 @@
#!/usr/bin/env python3

import argparse
import base64
import csv
import datetime
import json
import lzma
import operator
import os
import re
import shutil
import smtplib
import subprocess
from email.message import Message
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from functools import reduce
from io import BytesIO
from socket import *
import urllib.parse
import jinja2 # non-stdlib; Arch package is python-jinja2
import gpg # non-stdlib; Arch package is "python-gpgme" - see:
import gpg.constants # https://git.archlinux.org/svntogit/packages.git/tree/trunk/PKGBUILD?h=packages/gpgme and
import gpg.errors # https://gnupg.org/ftp/gcrypt/gpgme/ (incl. python bindings in build)
import pprint # development debug


class SigSession(object): # see docs/REFS.funcs.struct.txt
def __init__(self, args):
# These are the "stock" templates for emails. It's a PITA, but to save some space since we store them
# inline in here, they're XZ'd and base64'd.
self.email_tpl = {}
self.email_tpl['plain'] = ('/Td6WFoAAATm1rRGAgAhARwAAAAQz1jM4AWYAs1dACQZSZhvFgKNdKNXbSf05z0ZPvTvmdQ0mJQg' +
'atgzhPVeLKxz22bhxedC813X5I8Gn2g9q9Do2jPPViyuLf1SFHDUx1m7SEFsSUyT7L/j71tuxRZi' +
'xLyy6P3mHo3HeUhwgpgh6lvMYwlTf+zhj3558RJmhXprLteoKt/sY4NIMzz3TBa0ivsVo6EFA9/G' +
'2q1MpuHxg86uY1pA4tkFlmxuSklZq5EKuu7B5RSeUGB+SsjDPSfsdhoPngMET1EXTZfVSWezjJkH' +
'REyn5SgqpD7vwmwvZWcwWua+V+e/rYYF1cx0Y0Wi1wAC6NzGDce3gbcr/k6M/PiMi8ekJPCmgMgP' +
'R4GBtRi121wU374nml42WjdGjHee6Se6d0OGKscJjB5Z1eSOho63OMn5Ayu4Lvl05L/mB88Hnk5e' +
'MayYmXqQdhx3ualWiD/TdHwLf+79t43wfjs6Z/aq/lku67SGdUpjuYRV52nls6WPTuBmo2oCbzpP' +
'MojIXiYHR3cWebI2CdnVTTHHz7el4NAWIlPKtZkPR6VYj2DkND4kmkO92I258hUqLARWnNaywx87' +
'hFzhaGN9oKZdozKSZpEDyZUsymWuhQnnY76EImeha67LJwSsXLpBxuViCNlqv7ATT9iffzDmjm0i' +
'2MeLix3rBRMWp4MmWC8bP1ZAKOEq4M3hwjt1q68fH4QvtmTyic/7DBW2KsgZRu21RjK7tHtKTxwy' +
'2UN3pGT6uZRL8vNRNvD70UaeD5MW7lFBPehIeFoByaEKGFfeS8dKc1VFauDmkRlhOboLkPqqwbV+' +
'tUmU8UvTftKIx1RwTm3FKqjKCYrdqp0fL5wsA93YhRJqHfkvtjCjzRc0czszURkJMHfPyttQg2jb' +
'UQVIg40XMgew2EUdCrJC3wTvXm9tBxMqXAFBn6S4ihqiJ5PTUDKCx7EnAjK3kUwbWvDno8h1M9u9' +
'teC5CEWhcAAAAAAwG5UqO9bdswAB6QWZCwAAKFDj9rHEZ/sCAAAAAARZWg==')
self.email_tpl['html'] = ('/Td6WFoAAATm1rRGAgAhARwAAAAQz1jM4AXiAt1dAB4aCobvStaHNqdVBn1LjZcL+G+98rmZ7eGZ' +
'Wqx+3LjENIv37L1aGPICZDRBrsBugzVSasRBHkdHMrWW7SsPfRzw6btQfASTHLr48auPJlJgXTnb' +
'vgDd2ELrs6p5kXZwE7+pedhgffAcekwr0eyqVNzzdUWJpvZcDBGtp+yvIwSAcrKkUxUd5AFBigd3' +
'4IW1XEK3eQ+LSSEIquUugrd3xiFB4rkSSDGbAuVLqy4Sq3w5c8RRAKavhfSn154/H/D+3RhqFHc/' +
'/x51rgXFvgJ/DwYrr9g7JV9EQB15JvJJxazgnWftZE0W0u05Z7QNrIZQMG6LjcSjf0ep1zNYrJkf' +
'UYOfHNjXGfpmIfG0Y/cNhU1Zqv3ohv6EGWk4B7CdLjXEeDYqkJgIGA6Q4FQXM6PF7blXFQJ3papF' +
'lH0iO+ElK3LZVcql+QcVt2Ci+hwiKzsUvV5ydnHezyViTYTppjlZzju5SxxddQg+7CwGzX9O8ys1' +
'8dlbMFHD2ruPd4Zig9B1TEKHSdmQQGwITNufbSGixbuOZAbfMXP1oQqSzYkbbx2ye8ddISOr/753' +
'deLOwaQpy6tK9nb1wYwsfhpmZriWYDcKRfjkgr0srxnC2iDlMB0Do+GCLVVlmju9qcxeObWoxUaX' +
'TqecRW4fbpa9xAIH0tZlOpIyPgGfm3CXkiGOs/J/QJ4C4spqNpwppoXg6EAig7Y9GStQyEsHXZrj' +
'vLAefyaseybuMC+9okhx8VYM8esuE2GVTbCbWhn8ZTi8posQ+zabXvk7KE5zwGHDcvSGg0bYctJj' +
'V6pExLCp1vCUdP3iP06OCFMINDnGR7ZP4Da/atBUuB/F0LN//x+HfwhEUpVTG52L7f6Qjd/LhvU2' +
'f/zVfMKlw5xXwTjBu2X1oRYfhyYFhgnDECEi9iuRiVwwtnUU39r2XoaGcnMTPnZe62oy2jqTp3p+' +
'Y+klB9jUwPUg2t5IxptZ0D/H5flD+pEAAAAAYczECM+Nfu0AAfkF4wsAAEsSt/GxxGf7AgAAAAAE' +
'WVo=')
# Set up a dict of some constants and mappings
self.maps = {}
# Keylist modes
self.maps['keylist'] = {'local': gpg.constants.KEYLIST_MODE_LOCAL, # local keyring
'remote': gpg.constants.KEYLIST_MODE_EXTERN, # keyserver
# both - this is SUPPOSED to work, but doesn't seem to... it's unreliable at best?
'both': gpg.constants.KEYLIST_MODE_LOCAL|gpg.constants.KEYLIST_MODE_EXTERN}
# Validity/trust levels
self.maps['trust'] = {-1: ['never', gpg.constants.VALIDITY_NEVER], # this is... probably? not ideal, but. Never trust the key.
0: ['unknown', gpg.constants.VALIDITY_UNKNOWN], # The key's trust is unknown - typically because it hasn't been set yet.
1: ['untrusted', gpg.constants.VALIDITY_UNDEFINED], # The key is explicitly set to a blank trust
2: ['marginal', gpg.constants.VALIDITY_MARGINAL], # Trust a little.
3: ['full', gpg.constants.VALIDITY_FULL], # This is going to be the default for verified key ownership.
4: ['ultimate', gpg.constants.VALIDITY_ULTIMATE]} # This should probably only be reserved for keys you directly control.
# Validity/trust reverse mappings - see self.maps['trust'] for the meanings of these
# Used for fetching display/feedback
self.maps['rtrust'] = {gpg.constants.VALIDITY_NEVER: 'Never',
gpg.constants.VALIDITY_UNKNOWN: 'Unknown',
gpg.constants.VALIDITY_UNDEFINED: 'Untrusted',
gpg.constants.VALIDITY_MARGINAL: 'Marginal',
gpg.constants.VALIDITY_FULL: 'Full',
gpg.constants.VALIDITY_ULTIMATE: 'Ultimate'}
# Local signature and other binary (True/False) mappings
self.maps['binmap'] = {0: ['no', False],
1: ['yes', True]}
# Level of care taken when checking key ownership/valid identity
self.maps['check'] = {0: ['unknown', 0],
1: ['none', 1],
2: ['casual', 2],
3: ['careful', 3]}
# Default protocol/port mappings for keyservers
self.maps['proto'] = {'hkp': [11371, ['tcp', 'udp']], # Standard HKP protocol
'hkps': [443, ['tcp']], # Yes, same as https
'http': [80, ['tcp']], # HTTP (plaintext)
'https': [443, ['tcp']], # SSL/TLS
'ldap': [389, ['tcp', 'udp']], # Includes TLS negotiation since it runs on the same port
'ldaps': [636, ['tcp', 'udp']]} # SSL
self.maps['hashalgos'] = {gpg.constants.MD_MD5: 'md5',
gpg.constants.MD_SHA1: 'sha1',
gpg.constants.MD_RMD160: 'ripemd160',
gpg.constants.MD_MD2: 'md2',
gpg.constants.MD_TIGER: 'tiger192',
gpg.constants.MD_HAVAL: 'haval',
gpg.constants.MD_SHA256: 'sha256',
gpg.constants.MD_SHA384: 'sha384',
gpg.constants.MD_SHA512: 'sha512',
gpg.constants.MD_SHA224: 'sha224',
gpg.constants.MD_MD4: 'md4',
gpg.constants.MD_CRC32: 'crc32',
gpg.constants.MD_CRC32_RFC1510: 'crc32rfc1510',
gpg.constants.MD_CRC24_RFC2440: 'crc24rfc2440'}
# Now that all the static data's set up, we can continue.
self.args = self.verifyArgs(args) # Make the args accessible to all functions in the class - see docs/REF.args.struct.txt
# Get the GPGME context
try:
os.environ['GNUPGHOME'] = self.args['gpgdir']
self.ctx = gpg.Context()
except:
raise RuntimeError('Could not use {0} as a GnuPG home'.format(self.args['gpgdir']))
self.cfgdir = os.path.join(os.environ['HOME'], '.kant')
if not os.path.isdir(self.cfgdir):
print('No KANT configuration directory found; creating one at {0}...'.format(self.cfgdir))
os.makedirs(self.cfgdir, exist_ok = True)
self.keys = {} # See docs/REF.keys.struct.txt
self.mykey = {} # ""
self.tpls = {} # Email templates will go here
self.getTpls() # Build out self.tpls
return(None)

def getEditPrompt(self, key): # "key" should be the FPR of the primary key
# This mapping defines the default "answers" to the gpgme key editing.
# https://www.apt-browse.org/browse/debian/wheezy/main/amd64/python-pyme/1:0.8.1-2/file/usr/share/doc/python-pyme/examples/t-edit.py
# https://searchcode.com/codesearch/view/20535820/
# https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=doc/DETAILS
# You can get the prompt identifiers and status indicators without grokking the source
# by first interactively performing the type of edit(s) you want to do with this command:
# gpg --status-fd 2 --command-fd 2 --edit-key <KEY_ID>
if key['trust'] >= gpg.constants.VALIDITY_FULL: # For tsigning, it only prompts for two trust levels:
_loctrust = 2 # "I trust fully"
else:
_loctrust = 1 # "I trust marginally"
# TODO: make the trust depth configurable. 1 is probably the safest, but we try to guess here.
# "Full" trust is a pretty big thing.
if key['trust'] >= gpg.constants.VALIDITY_FULL:
_locdepth = 2 # Allow +1 level of trust extension
else:
_locdepth = 1 # Only trust this key
_map = {'cmds': ['trust', 'fpr', 'sign', 'tsign', 'lsign', 'nrsign', 'grip', 'list', # Valid commands
'uid', 'key', 'check', 'deluid', 'delkey', 'delsig', 'pref', 'showpref',
'revsig', 'enable', 'disable', 'showphoto', 'clean', 'minimize', 'save',
'quit'],
'prompts': {'edit_ownertrust': {'value': str(key['trust']), # Pulled at time of call
'set_ultimate': {'okay': 'yes'}}, # If confirming ultimate trust, we auto-answer yes
'untrusted_key': {'override': 'yes'}, # We don't care if it's untrusted
'pklist': {'user_id': {'enter': key['pkey']['email']}}, # Prompt for a user ID - can we change this to key ID?
'sign_uid': {'class': str(key['check']), # The certification/"check" level
'okay': 'yes'}, # Are you sure that you want to sign this key with your key..."
'trustsig_prompt': {'trust_value': str(_loctrust), # This requires some processing; see above
'trust_depth': str(_locdepth), # The "depth" of the trust signature.
'trust_regexp': None}, # We can "Restrict" trust to certain domains, but this isn't really necessary.
'keyedit': {'prompt': 'trust', # Initiate trust editing
'save': {'okay': 'yes'}}}} # Save if prompted
return(_map)
def getTpls(self):
for t in ('plain', 'html'):
_tpl_file = os.path.join(self.cfgdir, 'email.{0}.j2'.format(t))
if os.path.isfile(_tpl_file):
with open(_tpl_file, 'r') as f:
self.tpls[t] = f.read()
else:
self.tpls[t] = lzma.decompress(base64.b64decode(email_tpl[t]),
format = lzma.FORMAT_XZ,
memlimit = None,
filters = None).decode('utf-8')
with open(_tpl_file, 'w') as f:
f.write('{0}'.format(self.tpls[t]))
print('Created: {0}'.format(tpl_file))
return(self.tpls)

def modifyDirmngr(self, op):
if not self.args['keyservers']:
return()
_pid = str(os.getpid())
_activecfg = os.path.join(self.args['gpgdir'], 'dirmngr.conf')
_activegpgconf = os.path.join(self.args['gpgdir'], 'gpg.conf')
_bakcfg = '{0}.{1}'.format(_activecfg, _pid)
_bakgpgconf = '{0}.{1}'.format(_activegpgconf, _pid)
## Modify files
if op in ('new', 'start', 'replace'):
# Replace the keyservers
if os.path.lexists(_activecfg):
shutil.copy2(_activecfg, _bakcfg)
with open(_bakcfg, 'r') as read, open(_activecfg, 'w') as write:
for line in read:
if not line.startswith('keyserver '):
write.write(line)
with open(_activecfg, 'a') as f:
for s in self.args['keyservers']:
_uri = '{0}://{1}:{2}'.format(s['proto'],
s['server'],
s['port'][0])
f.write('keyserver {0}\n'.format(_uri))
# Use stronger ciphers, etc. and prompt for check/certification levels
if os.path.lexists(_activegpgconf):
shutil.copy2(_activegpgconf, _bakgpgconf)
with open(_activegpgconf, 'w') as f:
f.write('cipher-algo AES256\ndigest-algo SHA512\ncert-digest-algo SHA512\ncompress-algo BZIP2\nask-cert-level\n')
## Restore files
if op in ('old', 'stop', 'restore'):
# Restore the keyservers
if os.path.lexists(_bakcfg):
with open(_bakcfg, 'r') as read, open(_activecfg, 'w') as write:
for line in read:
write.write(line)
os.remove(_bakcfg)
else:
os.remove(_activecfg)
# Restore GPG settings
if os.path.lexists(_bakgpgconf):
with open(_bakgpgconf, 'r') as read, open(_activegpgconf, 'w') as write:
for line in read:
write.write(line)
os.remove(_bakgpgconf)
else:
os.remove(_activegpgconf)
subprocess.run(['gpgconf', '--reload', 'dirmngr']) # I *really* wish we could do this via GPGME.
return()

def getKeys(self):
_keyids = []
_keys = {}
# Do we have the key already? If not, fetch.
for r in list(self.args['rcpts'].keys()):
if self.args['rcpts'][r]['type'] == 'fpr':
_keyids.append(r)
self.ctx.set_keylist_mode(self.maps['keylist']['remote'])
try:
_k = self.ctx.get_key(r)
except:
print('{0}: We could not find this key on the keyserver.'.format(r)) # Key not on server
del(self.args['rcpts'][r])
_keyids.remove(r)
continue
self.ctx.set_keylist_mode(self.maps['keylist']['local'])
_keys[r] = {'fpr': r,
'obj': _k,
'created': _k.subkeys[0].timestamp}
if 'T' in str(_keys[r]['created']):
_keys[r]['created'] = int(datetime.datetime.strptime(_keys[r]['created'],
'%Y%m%dT%H%M%S').timestamp())
if self.args['rcpts'][r]['type'] == 'email':
# We need to actually do a lookup on the email address.
_keytmp = []
for k in self.ctx.keylist(r, mode = self.maps['keylist']['remote']):
_keytmp.append(k)
for k in _keytmp:
_keys[k.fpr] = {'fpr': k.fpr,
'obj': k,
'created': k.subkeys[0].timestamp,
'uids': {}}
# Per the docs (<gpg>/docs/DETAILS, "*** Field 6 - Creation date"),
# they may change this to ISO 8601...
if 'T' in str(_keys[k.fpr]['created']):
_keys[k.fpr]['created'] = int(datetime.datetime.strptime(_keys[k.fpr]['created'],
'%Y%m%dT%H%M%S').timestamp())
for s in k.uids:
_keys[k.fpr]['uids'][s.email] = {'comment': s.comment,
'updated': s.last_update}
if len(_keytmp) > 1: # Print the keys and prompt for a selection.
print('\nWe found the following keys for {0} <{1}>...\n\nKEY ID:'.format(r, _orig_r))
for s in _keys[r]:
print(s, _keys[r])
print('{0}\n{1:6}(Generated at {2}) UIDs:'.format(k,
'',
datetime.datetime.utcfromtimestamp(s['updated'])))
for email in _keys[k]['uids']:
print('{0:42}(Generated {3}) <{2}> {1}'.format('',
s['uids'][email]['comment'],
email,
datetime.datetime.utcfromtimestamp(s['uids'][email]['updated'])))
print()
while True:
key = input('Please enter the (full) appropriate key: ')
if key not in keys.keys():
print('Please enter a full key ID from the list above or hit ctrl-d to exit.')
else:
_keyids.append(key)
break
else:
if len(_keytmp) == 0:
print('Could not find {0}!'.format(r))
del(self.args['rcpts'][r])
continue
_keyids.append(_keys[k.fpr]['fpr'])
print('\nFound key {0} for {1} (Generated at {2}):'.format(_keys[k.fpr]['fpr'],
r,
datetime.datetime.utcfromtimestamp(_keys[k.fpr]['created'])))
for email in _keys[k.fpr]['uids']:
print('\t(Generated {2}) {0} <{1}>'.format(_keys[k.fpr]['uids'][email]['comment'],
email,
datetime.datetime.utcfromtimestamp(_keys[k.fpr]['uids'][email]['updated'])))
print()
## And now we can (FINALLY) fetch the key(s).
# TODO: replace with gpg.keylist_mode(gpgme.KEYLIST_MODE_EXTERN) and internal mechanisms?
for g in _keyids:
try:
self.ctx.op_import_keys([_keys[g]['obj']])
except gpg.errors.GPGMEError:
print('Key {0} could not be found on the keyserver'.format(g)) # The key isn't on the keyserver
self.ctx.set_keylist_mode(self.maps['keylist']['local'])
for k in _keys:
_key = _keys[k]['obj']
self.keys[k] = {'pkey': {'email': _key.uids[0].email,
'name': _key.uids[0].name,
'creation': datetime.datetime.utcfromtimestamp(_keys[k]['created']),
'key': _key},
'trust': self.args['trustlevel'], # Not set yet; we'll modify this later in buildKeys().
'local': self.args['local'], # Not set yet; we'll modify this later in buildKeys().
'notify': self.args['notify'], # Same...
'sign': True, # We don't need to prompt for this since we detect if we need to sign or not
'change': None, # ""
'status': None} # Same.
# And we add the subkeys in yet another loop.
self.keys[k]['subkeys'] = {}
self.keys[k]['uids'] = {}
for s in _key.subkeys:
self.keys[k]['subkeys'][s.fpr] = datetime.datetime.utcfromtimestamp(s.timestamp)
for u in _key.uids:
self.keys[k]['uids'][u.email] = {'name': u.name,
'comment': u.comment,
'updated': datetime.datetime.utcfromtimestamp(u.last_update)}
del(_keys)
return()

def buildKeys(self):
self.getKeys()
# Before anything else, let's set up our own key info.
_key = self.ctx.get_key(self.args['sigkey'], secret = True)
self.mykey = {'pkey': {'email': _key.uids[0].email,
'name': _key.uids[0].name,
'creation': datetime.datetime.utcfromtimestamp(_key.subkeys[0].timestamp),
'key': _key},
'trust': 'ultimate', # No duh. This is our own key.
'local': False, # We keep our own key array separate, so we don't push it anyways.
'notify': False, # ""
'check': None, # ""
'change': False, # ""
'status': None, # ""
'sign': False} # ""
self.mykey['subkeys'] = {}
self.mykey['uids'] = {}
for s in _key.subkeys:
self.mykey['subkeys'][s.fpr] = datetime.datetime.utcfromtimestamp(s.timestamp)
for u in _key.uids:
self.mykey['uids'][u.email] = {'name': u.name,
'comment': u.comment,
'updated': datetime.datetime.utcfromtimestamp(u.last_update)}
# Now let's set up our trusts.
if self.args['batch']:
self.batchParse()
else:
for k in list(self.keys.keys()):
self.promptTrust(k)
self.promptCheck(k)
self.promptLocal(k)
self.promptNotify(k)
# In case we removed any keys, we have to run this outside of the loops
for k in list(self.keys.keys()):
for t in ('trust', 'local', 'check', 'notify'):
self.keysCleanup(k, t)
# TODO: populate self.keys[key]['change']; we use this for trust (but not sigs)
return()

def batchParse(self):
# First we grab the info from CSV
csvlines = csv.reader(self.csvraw, delimiter = ',', quotechar = '"')
for row in csvlines:
row[0] = row[0].replace('<', '').replace('>', '')
try:
if self.args['rcpts'][row[0]]['type'] == 'fpr':
k = row[0]
else: # It's an email.
key_set = False
while not key_set:
for i in list(self.keys.keys()):
if row[0] in list(self.keys[i]['uids'].keys()):
k = i
key_set = True
self.keys[k]['trust'] = row[1].lower().strip()
self.keys[k]['local'] = row[2].lower().strip()
self.keys[k]['check'] = row[3].lower().strip()
self.keys[k]['notify'] = row[4].lower().strip()
except KeyError:
continue # It was deemed to be an invalid key earlier
return()

def promptTrust(self, k):
if 'trust' not in self.keys[k].keys() or not self.keys[k]['trust']:
trust_in = input(('\nWhat trust level should we assign to {0}? (The default is '+
'Marginal.)\n\t\t\t\t ({1} <{2}>)' +
'\n\n\t\033[1m-1 = Never\n\t 0 = Unknown\n\t 1 = Untrusted\n\t 2 = Marginal\n\t 3 = Full' +
'\n\t 4 = Ultimate\033[0m\nTrust: ').format(k,
self.keys[k]['pkey']['name'],
self.keys[k]['pkey']['email']))
if trust_in == '':
trust_in = 'marginal' # Has to be a str, so we can "pretend" it was entered
self.keys[k]['trust'] = trust_in
return()

def promptCheck(self, k):
if 'check' not in self.keys[k].keys() or self.keys[k]['check'] == None:
check_in = input(('\nHow carefully have you checked {0}\'s validity of identity/ownership of the key? ' +
'(Default is Unknown.)\n' +
'\n\t\033[1m0 = Unknown\n\t1 = None\n\t2 = Casual\n\t3 = Careful\033[0m\nCheck level: ').format(k))
if check_in == '':
check_in == 'unknown'
self.keys[k]['check'] = check_in
return()
def promptLocal(self, k):
if 'local' not in self.keys[k].keys() or self.keys[k]['local'] == None:
if self.args['keyservers']:
local_in = input(('\nShould we locally sign {0} '+
'(if yes, the signature will be non-exportable; if no, we will be able to push to a keyserver) ' +
'(Yes/\033[1mNO\033[0m)? ').format(k))
if local_in == '':
local_in = False
self.keys[k]['local'] = local_in
return()

def promptNotify(self, k):
if 'notify' not in self.keys[k].keys() or self.keys[k]['notify'] == None:
notify_in = input(('\nShould we notify {0} (via <{1}>) (\033[1mYES\033[0m/No)? ').format(k,
self.keys[k]['pkey']['email']))
if notify_in == '':
notify_in = True
self.keys[k]['local'] = local_in
return()

def keysCleanup(self, k, t): # At some point, this WHOLE thing would probably be cleaner with bitwise flags...
s = t
_errs = {'trust': 'trust level',
'local': 'local signature option',
'check': 'check level',
'notify': 'notify flag'}
if k not in self.keys.keys():
return() # It was deleted already.
if t in ('local', 'notify'): # these use a binary mapping
t = 'binmap'
# We can do some basic stuff right here.
if str(self.keys[k][s]).lower() in ('n', 'no', 'false'):
self.keys[k][s] = False
return()
elif str(self.keys[k][s]).lower() in ('y', 'yes', 'true'):
self.keys[k][s] = True
return()
# Make sure we have a known value. These will ALWAYS be str's, either from the CLI or CSV.
value_in = str(self.keys[k][s]).lower().strip()
for dictk, dictv in self.maps[t].items():
if value_in == dictv[0]:
self.keys[k][s] = int(dictk)
elif value_in == str(dictk):
self.keys[k][s] = int(dictk)
if not isinstance(self.keys[k][s], int): # It didn't get set
print('{0}: "{1}" is not a valid {2}; skipping. Run kant again to fix.'.format(k, self.keys[k][s], _errs[s]))
del(self.keys[k])
return()
return()
def sigKeys(self): # The More Business-End(TM)
# NOTE: If the trust level is anything but 2 (the default), we should use op_interact() instead and do a tsign.
self.ctx.keylist_mode = gpg.constants.KEYLIST_MODE_SIGS
_mkey = self.mykey['pkey']['key']
self.ctx.signers = [_mkey]
for k in list(self.keys.keys()):
key = self.keys[k]['pkey']['key']
for uid in key.uids:
for s in uid.signatures:
try:
signerkey = ctx.get_key(s.keyid).subkeys[0].fpr
if signerkey == mkey.subkeys[0].fpr:
self.trusts[k]['sign'] = False # We already signed this key
except gpgme.GpgError:
pass # usually if we get this it means we don't have a signer's key in our keyring
# And again, we loop. ALLLLL that buildup for one line.
for k in list(self.keys.keys()):
# TODO: configure to allow for user-entered expiration?
if self.keys[k]['sign']:
self.ctx.key_sign(self.keys[k]['pkey']['key'], local = self.keys[k]['local'])
return()

class KeyEditor(object):
def __init__(self, optmap):
self.replied_once = False # This is used to handle the first prompt vs. the last
self.optmap = optmap
return(None)

def editKey(self, status, args, out):
_result = None
out.seek(0, 0)
def mapDict(m, d):
return(reduce(operator.getitem, m, d))
if args == 'keyedit.prompt' and self.replied_once:
_result = 'quit'
elif status == 'KEY_CONSIDERED':
_result = None
self.replied_once = False
elif status == 'GET_LINE':
self.replied_once = True
_ilist = args.split('.')
_result = mapDict(_ilist, self.optmap['prompts'])
if not _result:
_result = None
return(_result)

def trustKeys(self): # The Son of Business-End(TM)
# TODO: add check for change
for k in list(self.keys.keys()):
_key = self.keys[k]
_map = self.getEditPrompt(_key)
out = gpg.Data()
self.ctx.interact(_key['pkey']['key'], self.KeyEditor(_map).editKey, sink = out, fnc_value = out)
out.seek(0, 0)
return()
def pushKeys(self): # The Last Business-End(TM)
for k in list(self.keys.keys()):
if not self.keys[k]['local'] and self.keys[k]['sign']:
self.ctx.op_export(k, gpg.constants.EXPORT_MODE_EXTERN, None)
return()

class Mailer(object): # I lied; The Return of the Business-End(TM)
def __init__(self):
_homeconf = os.path.join(os.environ['HOME'], '.msmtprc')
_sysconf = '/etc/msmtprc'
self.msmtp = {'path': None}
if not os.path.isfile(_homeconf):
if not os.path.isfile(_sysconf):
self.msmtp['conf'] = False
else:
self.msmtp['conf'] = _sysconf
else:
self.msmtp['conf'] = _homeconf
if os.path.isfile(self.msmtp['conf']):
for p in (os.environ['PATH']).split(':'):
if os.path.isfile(os.path.join(p, 'msmtp')):
self.msmtp['path'] = os.path.join(p, 'msmtp')
if self.msmtp['path']:
# Okay. So we have a config file, which we're assuming to be set up correctly, and a path to a binary.
# Now we need to parse the config.
self.msmtp['cfg'] = self.getCfg()
return(None)

def getCfg(self):
cfg = {'default': None, 'defaults': {}}
_defaults = False
_acct = None
with open(self.msmtp['conf'], 'r') as f:
_cfg_raw = f.read()
for l in _cfg_raw.splitlines():
if re.match('^\s?(#.*|)$', l):
continue # Skip over blank and commented lines
_line = [i.strip() for i in re.split('\s+', l.strip(), maxsplit = 1)]
if _line[0] == 'account':
if re.match('^default\s?:\s?', _line[1]): # it's the default account specifier
cfg['default'] = _line[1].split(':', maxsplit = 1)[1].strip()
else:
if _line[1] not in cfg.keys(): # it's a new account definition
cfg[_line[1]] = {}
_acct = _line[1]
_defaults = False
elif _line[0] == 'defaults': # it's the defaults
_acct = 'defaults'
else: # it's a config directive
cfg[_acct][_line[0]] = _line[1]
for a in list(cfg):
if a != 'default':
for k, v in cfg['defaults'].items():
if k not in cfg[a].keys():
cfg[a][k] = v
del(cfg['defaults'])
return(cfg)

def sendEmail(self, msg, key, profile): # This needs way more parsing to support things like plain ol' port 25 plaintext (ugh), etc.
if 'tls-starttls' in self.msmtp['cfg'][profile].keys() and self.msmtp['cfg'][profile]['tls-starttls'] == 'on':
smtpserver = smtplib.SMTP(self.msmtp['cfg'][profile]['host'], int(self.msmtp['cfg'][profile]['port']))
smtpserver.ehlo()
smtpserver.starttls()
# we need to EHLO twice with a STARTTLS because email is weird.
elif self.msmtp['cfg'][profile]['tls'] == 'on':
smtpserver = smtplib.SMTP_SSL(self.msmtp['cfg'][profile]['host'], int(self.msmtp['cfg'][profile]['port']))
smtpserver.ehlo()
smtpserver.login(self.msmtp['cfg'][profile]['user'], self.msmtp['cfg'][profile]['password'])
smtpserver.sendmail(self.msmtp['cfg'][profile]['user'], key['pkey']['email'], msg.as_string())
smtpserver.close()
return()
def postalWorker(self):
m = self.Mailer()
if 'KANT' in m.msmtp['cfg'].keys():
_profile = 'KANT'
else:
_profile = m.msmtp['cfg']['default'] # TODO: let this be specified on the CLI args?
if 'user' not in m.msmtp['cfg'][_profile].keys() or not m.msmtp['cfg'][_profile]['user']:
return() # We don't have MSMTP configured.
# Reconstruct the keyserver list.
_keyservers = []
for k in self.args['keyservers']:
_keyservers.append('{0}://{1}:{2}'.format(k['proto'], k['server'], k['port'][0]))
# Export our key so we can attach it.
_pubkeys = {}
for e in ('asc', 'gpg'):
if e == 'asc':
self.ctx.armor = True
else:
self.ctx.armor = False
_pubkeys[e] = gpg.Data() # This is a data buffer to store your ASCII-armored pubkeys
self.ctx.op_export_keys([self.mykey['pkey']['key']], 0, _pubkeys[e])
_pubkeys[e].seek(0, 0) # Read with e.g. _sigs['asc'].read()
for k in list(self.keys.keys()):
if self.keys[k]['notify']:
_body = {}
for t in list(self.tpls.keys()):
# There's gotta be a more efficient way of doing this...
#_tplenv = jinja2.Environment(loader = jinja2.BaseLoader()).from_string(self.tpls[t])
_tplenv = jinja2.Environment().from_string(self.tpls[t])
_body[t] = _tplenv.render(key = self.keys[k],
mykey = self.mykey,
keyservers = _keyservers)
b = MIMEMultipart('alternative') # Set up a body
for c in _body.keys():
b.attach(MIMEText(_body[c], c))
bmsg = MIMEMultipart()
bmsg.attach(b)
for s in _pubkeys.keys():
_attchmnt = MIMEApplication(_pubkeys[s].read(), '{0}.{1}'.format(self.mykey['pkey']['key'].fpr, s))
_attchmnt['Content-Disposition'] = 'attachment; filename="{0}.{1}"'.format(self.mykey['pkey']['key'].fpr, s)
bmsg.attach(_attchmnt)
# Now we sign the body. This incomprehensible bit monkey-formats bmsg to be a multi-RFC-compatible
# string, which is then passed to our gpgme instance's signing mechanishm, and the output of that is
# returned as plaintext. Whew.
self.ctx.armor = True
_sig = self.ctx.sign((bmsg.as_string().replace('\n', '\r\n')).encode('utf-8'),
mode = gpg.constants.SIG_MODE_DETACH)
imsg = Message() # Build yet another intermediate message...
imsg['Content-Type'] = 'application/pgp-signature; name="signature.asc"'
imsg['Content-Description'] = 'OpenPGP digital signature'
imsg.set_payload(_sig[0].decode('utf-8'))
msg = MIMEMultipart(_subtype = 'signed',
micalg = "pgp-{0}".format(self.maps['hashalgos'][_sig[1].signatures[0].hash_algo]),
protocol = 'application/pgp-signature')
msg.attach(bmsg) # Attach the body (plaintext, html, pubkey attachmants)
msg.attach(imsg) # Attach the isignature
msg['To'] = self.keys[k]['pkey']['email']
if 'from' in m.msmtp['cfg'][_profile].keys():
msg['From'] = m.msmtp['cfg'][_profile]['from']
else:
msg['From'] = self.mykey['pkey']['email']
msg['Subject'] = 'Your GnuPG/PGP key has been signed'
msg['Openpgp'] = 'id={0}'.format(self.mykey['pkey']['key'].fpr)
msg['Date'] = datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S %z')
msg['User-Agent'] = 'KANT (part of the r00t^2 OpTools suite: https://git.square-r00t.net/OpTools)'
m.sendEmail(msg, self.keys[k], _profile) # Send the email
for d in (msg, imsg, bmsg, b, _body, _tplenv): # Not necessary, but it pays to be paranoid; we do NOT want leaks.
del(d)
del(m)
return()
def saveResults(self):
_cachedir = os.path.join(self.cfgdir, 'cache', datetime.datetime.utcnow().strftime('%Y.%m.%d_%H.%M.%S'))
os.makedirs(_cachedir, exist_ok = True)
for k in self.keys.keys():
_keyout = self.keys[k]
# We need to normalize the datetime objects and gpg objects to strings
_keyout['pkey']['creation'] = str(self.keys[k]['pkey']['creation'])
_keyout['pkey']['key'] = '<GPGME object>'
for u in list(_keyout['uids'].keys()):
_keyout['uids'][u]['updated'] = str(self.keys[k]['uids'][u]['updated'])
for s in list(_keyout['subkeys'].keys()):
_keyout['subkeys'][s] = str(self.keys[k]['subkeys'][s])
_fname = os.path.join(_cachedir, '{0}.json'.format(k))
with open(_fname, 'a') as f:
f.write('{0}\n'.format(json.dumps(_keyout, sort_keys = True, indent = 4)))
del(_keyout)
# And let's grab a copy of our key in the state that it exists in currently
_mykey = self.mykey
# We need to normalize the datetime objects and gpg objects to strings again
_mykey['pkey']['creation'] = str(_mykey['pkey']['creation'])
_mykey['pkey']['key'] = '<GPGME object>'
for u in list(_mykey['uids'].keys()):
_mykey['uids'][u]['updated'] = str(self.mykey['uids'][u]['updated'])
for s in list(_mykey['subkeys'].keys()):
_mykey['subkeys'][s] = str(self.mykey['subkeys'][s])
with open(os.path.join(_cachedir, '_SIGKEY.json'), 'w') as f:
f.write('{0}\n'.format(json.dumps(_mykey, sort_keys = True, indent = 4)))
return()

def serverParser(self, uri):
# https://en.wikipedia.org/wiki/Key_server_(cryptographic)#Keyserver_examples
_server = {}
_urlobj = urllib.parse.urlparse(uri)
_server['proto'] = _urlobj.scheme
_lazy = False
if not _server['proto']:
_server['proto'] = 'hkp' # Default
_server['server'] = _urlobj.hostname
if not _server['server']:
_server['server'] = re.sub('^([A-Za-z]://)?(.+[^:][^0-9])(:[0-9]+)?$', '\g<2>', uri, re.MULTILINE)
_lazy = True
_server['port'] = _urlobj.port
if not _server['port']:
if _lazy:
_p = re.sub('.*:([0-9]+)$', '\g<1>', uri, re.MULTILINE)
_server['port'] = self.maps['proto'][_server['proto']] # Default
return(_server)

def verifyArgs(self, locargs):
## Some pythonization...
if not locargs['batch']:
locargs['keys'] = [re.sub('\s', '', k) for k in locargs['keys'].split(',')]
else:
## Batch file
_batchfilepath = os.path.abspath(os.path.expanduser(locargs['keys']))
if not os.path.isfile(_batchfilepath):
raise ValueError('{0} does not exist or is not a regular file.'.format(_batchfilepath))
else:
with open(_batchfilepath, 'r') as f:
self.csvraw = f.readlines()
locargs['keys'] = _batchfilepath
locargs['keyservers'] = [re.sub('\s', '', s) for s in locargs['keyservers'].split(',')]
locargs['keyservers'] = [self.serverParser(s) for s in locargs['keyservers']]
## Key(s) to sign
locargs['rcpts'] = {}
if not locargs['batch']:
_keyiter = locargs['keys']
else:
_keyiter = []
for row in csv.reader(self.csvraw, delimiter = ',', quotechar = '"'):
_keyiter.append(row[0])
for k in _keyiter:
locargs['rcpts'][k] = {}
try:
int(k, 16)
_ktype = 'fpr'
except: # If it isn't a valid key ID...
if not re.match('^<?[\w\.\+\-]+\@[\w-]+\.[a-z]{2,3}>?$', k): # is it an email address?
raise ValueError('{0} is not a valid email address'.format(k))
else:
r = k.replace('<', '').replace('>', '')
locargs['rcpts'][r] = locargs['rcpts'][k]
del(locargs['rcpts'][k])
k = r
_ktype = 'email'
locargs['rcpts'][k]['type'] = _ktype
# Security is important. We don't want users getting collisions, so we don't allow shortened key IDs.
if _ktype == 'fpr' and not len(k) == 40:
raise ValueError('{0} is not a full 40-char key ID or key fingerprint'.format(k))
## Signing key
if not locargs['sigkey']:
raise ValueError('A key for signing is required') # We need a key we can sign with.
else:
if not os.path.lexists(locargs['gpgdir']):
raise FileNotFoundError('{0} does not exist'.format(locargs['gpgdir']))
elif os.path.isfile(locargs['gpgdir']):
raise NotADirectoryError('{0} is not a directory'.format(locargs['gpgdir']))
# Now we need to verify that the private key exists...
try:
_ctx = gpg.Context()
_sigkey = _ctx.get_key(locargs['sigkey'], True)
except gpg.errors.GPGMEError or gpg.errors.KeyNotFound:
raise ValueError('Cannot use key {0}'.format(locargs['sigkey']))
# And that it is an eligible candidate to use to sign.
if not _sigkey.can_sign or True in (_sigkey.revoked, _sigkey.expired, _sigkey.disabled):
raise ValueError('{0} is not a valid candidate for signing'.format(locargs['sigkey']))
## Keyservers
if locargs['testkeyservers']:
for s in locargs['keyservers']:
# Test to make sure the keyserver is accessible.
_v6test = socket(AF_INET6, SOCK_DGRAM)
try:
_v6test.connect(('ipv6.square-r00t.net', 0))
_nettype = AF_INET6 # We have IPv6 intarwebz
except:
_nettype = AF_INET # No IPv6, default to IPv4
for _proto in locargs['keyservers'][s]['port'][1]:
if _proto == 'udp':
_netproto = SOCK_DGRAM
elif _proto == 'tcp':
_netproto = SOCK_STREAM
_sock = socket(nettype, netproto)
_sock.settimeout(10)
_tests = _sock.connect_ex((locargs['keyservers'][s]['server'],
int(locargs['keyservers'][s]['port'][0])))
_uristr = '{0}://{1}:{2} ({3})'.format(locargs['keyservers'][s]['proto'],
locargs['keyservers'][s]['server'],
locargs['keyservers'][s]['port'][0],
_proto.upper())
if not tests == 0:
raise OSError('Keyserver {0} is not available'.format(_uristr))
else:
print('Keyserver {0} is accepting connections.'.format(_uristr))
sock.close()
return(locargs)
def parseArgs():
def getDefGPGDir():
try:
gpgdir = os.environ['GNUPGHOME']
except KeyError:
try:
homedir = os.environ['HOME']
gpgdchk = os.path.join(homedir, '.gnupg')
except KeyError:
# There is no reason that this should ever get this far, but... edge cases be crazy.
gpgdchk = os.path.join(os.path.expanduser('~'), '.gnupg')
if os.path.isdir(gpgdchk):
gpgdir = gpgdchk
else:
gpgdir = None
return(gpgdir)
def getDefKey(defgpgdir):
os.environ['GNUPGHOME'] = defgpgdir
if not defgpgdir:
return(None)
defkey = None
ctx = gpg.Context()
for k in ctx.keylist(None, secret = True): # "None" is query string; this grabs all keys in the private keyring
if k.can_sign and True not in (k.revoked, k.expired, k.disabled):
defkey = k.subkeys[0].fpr
break # We'll just use the first primary key we find that's valid as the default.
return(defkey)
def getDefKeyservers(defgpgdir):
srvlst = [None]
# We don't need these since we use the gpg agent. Requires GPG 2.1 and above, probably.
#if os.path.isfile(os.path.join(defgpgdir, 'dirmngr.conf')):
# pass
dirmgr_out = subprocess.run(['gpg-connect-agent', '--dirmngr', 'keyserver', '/bye'], stdout = subprocess.PIPE)
for l in dirmgr_out.stdout.decode('utf-8').splitlines():
#if len(l) == 3 and l.lower().startswith('s keyserver'): # It's a keyserver line
if l.lower().startswith('s keyserver'): # It's a keyserver line
s = l.split()[2]
if len(srvlst) == 1 and srvlst[0] == None:
srvlst = [s]
else:
srvlst.append(s)
return(','.join(srvlst))
defgpgdir = getDefGPGDir()
defkey = getDefKey(defgpgdir)
defkeyservers = getDefKeyservers(defgpgdir)
args = argparse.ArgumentParser(description = 'Keysigning Assistance and Notifying Tool (KANT)',
epilog = 'brent s. || 2017 || https://square-r00t.net')
args.add_argument('-k',
'--keys',
dest = 'keys',
metavar = 'KEYS | /path/to/batchfile',
required = True,
help = 'A single/comma-separated list of keys to sign, ' +
'trust, & notify. Can also be an email address. ' +
'If -b/--batch is specified, this should instead be ' +
'a path to the batch file. See the man page for more info.')
args.add_argument('-K',
'--sigkey',
dest = 'sigkey',
default = defkey,
help = 'The key to use when signing other keys. Default is \033[1m{0}\033[0m.'.format(defkey))
args.add_argument('-t',
'--trust',
dest = 'trustlevel',
default = None,
help = 'The trust level to automatically apply to all keys ' +
'(if not specified, kant will prompt for each key). ' +
'See BATCHFILE/TRUSTLEVEL in the man page for trust ' +
'level notations.')
args.add_argument('-c',
'--check',
dest = 'checklevel',
default = None,
help = 'The level of checking done (if not specified, kant will ' +
'prompt for each key). See -b/--batch for check level notations.')
args.add_argument('-l',
'--local',
dest = 'local',
default = None,
help = 'Make the signature(s) local-only (i.e. don\'t push to a keyserver).')
args.add_argument('-n',
'--no-notify',
dest = 'notify',
action = 'store_false',
help = 'If specified, do NOT notify any key recipients that you\'ve signed ' +
'their key, even if KANT is able to.')
args.add_argument('-s',
'--keyservers',
dest = 'keyservers',
default = defkeyservers,
help = 'The comma-separated keyserver(s) to push to.\n' +
'Default keyserver list is: \n\n\t\033[1m{0}\033[0m\n\n'.format(re.sub(',',
'\n\t',
defkeyservers)))
args.add_argument('-b',
'--batch',
dest = 'batch',
action = 'store_true',
help = 'If specified, -k/--keys is a CSV file to use as a ' +
'batch run. See the BATCHFILE section in the man page for more info.')
args.add_argument('-D',
'--gpgdir',
dest = 'gpgdir',
default = defgpgdir,
help = 'The GnuPG configuration directory to use (containing\n' +
'your keys, etc.); default is \033[1m{0}\033[0m.'.format(defgpgdir))
args.add_argument('-T',
'--testkeyservers',
dest = 'testkeyservers',
action = 'store_true',
help = 'If specified, initiate a test connection with each\n'
'set keyserver before anything else. Disabled by default.')
return(args)





def main():
# This could be cleaner-looking, but we do it this way so the class can be used externally
# with a dict instead of an argparser result.
args = vars(parseArgs().parse_args())
sess = SigSession(args)
sess.modifyDirmngr('new')
sess.buildKeys()
sess.sigKeys()
sess.trustKeys()
sess.pushKeys()
sess.postalWorker()
sess.saveResults()
sess.modifyDirmngr('old')

if __name__ == '__main__':
main()

View File

@ -4,6 +4,7 @@ import argparse
import csv import csv
import datetime import datetime
import email import email
import jinja2
import os import os
import re import re
import shutil import shutil
@ -42,13 +43,13 @@ import gpg # non-stdlib; Arch package is "python-gpgme" - see
class sigsession(object): class sigsession(object):
def __init__(self, args): def __init__(self, args):
self.args = args self.args = args
self.keyids = []


def getKeys(self): def getKeys(self):
# Get our concept # Get our context
os.environ['GNUPGHOME'] = self.args['gpgdir'] os.environ['GNUPGHOME'] = self.args['gpgdir']
ctx = gpg.Context() ctx = gpg.Context()
keys = {} keys = {}
self.keyids = []
# Do we have the key already? If not, fetch. # Do we have the key already? If not, fetch.
for k in list(self.args['rcpts']): for k in list(self.args['rcpts']):
if self.args['rcpts'][k]['type'] == 'fpr': if self.args['rcpts'][k]['type'] == 'fpr':

View File

@ -3,12 +3,18 @@
# This is less of a test suite and more of an active documentation on some python-gpgme (https://pypi.python.org/pypi/gpg) examples. # This is less of a test suite and more of an active documentation on some python-gpgme (https://pypi.python.org/pypi/gpg) examples.
# Because their only documentation for the python bindings is in pydoc, and the C API manual is kind of useless. # Because their only documentation for the python bindings is in pydoc, and the C API manual is kind of useless.


import datetime
import gpg import gpg
import gpg.constants import gpg.constants
import inspect import inspect
import jinja2
import os import os
import pprint import pprint
import re import re
import smtplib
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import subprocess import subprocess
import operator import operator
from functools import reduce from functools import reduce
@ -107,7 +113,7 @@ class KeyEditor(object):


def edit_fnc(self, status, args, out): def edit_fnc(self, status, args, out):
result = None result = None
out.seek(0,0) out.seek(0, 0)
#print(status, args) #print(status, args)
#print(out.read().decode('utf-8')) #print(out.read().decode('utf-8'))
#print('{0} ({1})'.format(status, args)) #print('{0} ({1})'.format(status, args))
@ -130,5 +136,147 @@ class KeyEditor(object):
# Test setting trust # Test setting trust
out = gpg.Data() out = gpg.Data()
ctx.interact(tkey2, KeyEditor().edit_fnc, sink = out, fnc_value = out) ctx.interact(tkey2, KeyEditor().edit_fnc, sink = out, fnc_value = out)
out.seek(0,0) out.seek(0, 0)
#print(out.read(), end = ' ') #print(out.read(), end = '\n\n')

#Test sending to a keyserver
buf = gpg.Data()
ctx.op_export(tkey2.fpr, gpg.constants.EXPORT_MODE_EXTERN, None)

# Test writing the pubkey out to a file
buf = gpg.Data()
ctx.op_export_keys([tkey2], 0, buf) # do i NEED to specify a mode?
buf.seek(0, 0)
with open('/tmp/pubkeytest.gpg', 'wb') as f:
f.write(buf.read())
#del(buf)
# Let's also test writing out the ascii-armored..
ctx.armor = True
#buf = gpg.Data()
buf.seek(0, 0)
ctx.op_export_keys([tkey2], 0, buf) # do i NEED to specify a mode?
buf.seek(0, 0)
#print(buf.read())
#buf.seek(0, 0)
with open('/tmp/pubkeytest.asc', 'wb') as f:
f.write(buf.read())
del(buf)

# And lastly, let's test msmtprc
def getCfg(fname):
cfg = {'default': None, 'defaults': {}}
_defaults = False
_acct = None
with open(fname, 'r') as f:
cfg_raw = f.read()
for l in cfg_raw.splitlines():
if re.match('^\s?(#.*|)$', l):
continue # skip over blank and commented lines
line = [i.strip() for i in re.split('\s+', l.strip(), maxsplit = 1)]
if line[0] == 'account':
if re.match('^default\s?:\s?', line[1]): # it's the default account specifier
cfg['default'] = line[1].split(':', maxsplit = 1)[1].strip()
else:
if line[1] not in cfg.keys(): # it's a new account definition
cfg[line[1]] = {}
_acct = line[1]
_defaults = False
elif line[0] == 'defaults': # it's the defaults
_acct = 'defaults'
else: # it's a config directive
cfg[_acct][line[0]] = line[1]
for a in list(cfg):
if a != 'default':
for k, v in cfg['defaults'].items():
if k not in cfg[a].keys():
cfg[a][k] = v
del(cfg['defaults'])
return(cfg)
homeconf = os.path.join(os.environ['HOME'], '.msmtprc')
sysconf = '/etc/msmtprc'
msmtp = {'path': None}
if not os.path.isfile(homeconf):
if not os.path.isfile(sysconf):
msmtp['conf'] = False
else:
msmtp['conf'] = sysconf
else:
msmtp['conf'] = homeconf
if os.path.isfile(msmtp['conf']):
path = os.environ['PATH']
for p in path.split(':'):
fullpath = os.path.join(p, 'msmtp')
if os.path.isfile(fullpath):
msmtp['path'] = fullpath
break # break out the first instance of it we find since the shell parses PATH first to last and so do we
if msmtp['path']:
# Okay. So we have a config file, which we're assuming to be set up correctly, and a path to a binary.
# Now we need to parse the config.
msmtp['cfg'] = getCfg(msmtp['conf'])
pprint.pprint(msmtp)
if msmtp['path']:
# Get the appropriate MSMTP profile
profile = msmtp['cfg']['default']
# Buuuut i use a different profile when i test, because i use msmtp for production-type stuff.
#if os.environ['USER'] == 'bts':
# profile = 'gmailtesting'
# Now we can try to send an email... yikes.
## First we set up the message templates.
body_in = {'plain': None, 'html': None}
body_in['plain'] = """Hello, person!

This is a test message.

Thanks."""
body_in['html'] = """\
<html>
<head></head>
<body>
<p><b>Hi there, person!</b> This is a test email.</p>
<p>It supports fun things like HTML.</p>
<p>--<br><a href='https://games.square-r00t.net/'>https://games.square-r00t.net</a><br>
Admin: <a href='mailto:bts@square-r00t.net'>r00t^2</a>
</p>
</body>
</html>"""
# Now, some attachments.
part = {}
ctx.armor = False
buf = gpg.Data()
ctx.op_export_keys([tkey2], 0, buf)
buf.seek(0, 0)
part['gpg'] = MIMEApplication(buf.read(), '{0}.gpg'.format(tkey2.fpr))
part['gpg']['Content-Disposition'] = 'attachment; filename="{0}.gpg"'.format(tkey2.fpr)
ctx.armor = True
buf.seek(0, 0)
ctx.op_export_keys([tkey2], 0, buf)
buf.seek(0, 0)
part['asc'] = MIMEApplication(buf.read(), '{0}.asc'.format(tkey2.fpr))
part['asc']['Content-Disposition'] = 'attachment; filename="{0}.asc"'.format(tkey2.fpr)
#msg = MIMEMultipart('alternative')
msg = MIMEMultipart()
msg['preamble'] = 'This is a multi-part message in MIME format.\n'
msg['From'] = msmtp['cfg'][profile]['from']
msg['To'] = msmtp['cfg'][profile]['from'] # to send to more than one: ', '.join(somelist)
msg['Date'] = datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S %z')
msg['Subject'] = 'TEST EMAIL VIA TEST.PY'
msg['epilogue'] = ''
body = MIMEMultipart('alternative')
body.attach(MIMEText(body_in['plain'], 'plain'))
body.attach(MIMEText(body_in['html'], 'html'))
msg.attach(body)
for f in part.keys():
msg.attach(part[f])

# This needs way more parsing to support things like plain ol' port 25 plaintext (ugh), etc.
if 'tls-starttls' in msmtp['cfg'][profile].keys() and msmtp['cfg'][profile]['tls-starttls'] == 'on':
smtpserver = smtplib.SMTP(msmtp['cfg'][profile]['host'], int(msmtp['cfg'][profile]['port']))
smtpserver.ehlo()
smtpserver.starttls()
# we need to EHLO again after a STARTTLS because email is weird.
elif msmtp['cfg'][profile]['tls'] == 'on':
smtpserver = smtplib.SMTP_SSL(msmtp['cfg'][profile]['host'], int(msmtp['cfg'][profile]['port']))
smtpserver.ehlo()
smtpserver.login(msmtp['cfg'][profile]['user'], msmtp['cfg'][profile]['password'])
smtpserver.sendmail(msmtp['cfg'][profile]['user'], msg['To'], msg.as_string())
smtpserver.close()