summaryrefslogtreecommitdiff
path: root/sys/user_cull.py
blob: 464e7fdcf54ff4a76e2ea089e6a9865cd93cbf70 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python3

# Because:
#   - SSH timeout doesn't work with byobu/screen/tmux
#   - SSH timeout can be overridden client-side
#   - $TMOUT can be overridden user-side
# we need to actually kill the sshd process attached to the SSH session.
# This does have some limitations, though. Namely, it doesn't work for screen/tmux sessions,
# specifically if they happen to be using byobu with a status bar that updates, because the process
# gathering information for the status bar counts as "activity". Go figure.

import datetime
import os
import psutil
import subprocess

# Allowed idle time, in seconds. 5 minutes = 300 seconds.
# If "auto", the script looks for TMOUT in the system-wide bashrc and then for
# ClientAliveInterval in sshd_config, in that order, falling back to 300 seconds.
timeout = 'auto'
# Only apply to SSH connections (sessions whose login process is a child of sshd)
# instead of SSH + local.
# THIS WILL KILL SCREEN/TMUX CONNECTIONS. USE WITH CAUTION.
only_ssh = True
# Send a closing message (via write(1)) before terminating the session.
goodbye = True
# The message to send to the user if goodbye == True.
# The following names can be used for str.format() substitution:
#   pid          - The PID of the user's login process.
#   terminal     - The terminal they're logged in on.
#   loginlength  - How long they've been logged in (a timedelta, e.g. 1:23:45).
#   logintime    - When they logged in.
#   timeout      - The allowed length of time for inactivity until a timeout.
goodbye_mesg = ('You have been logged in for {loginlength} (since {logintime}) on {terminal} ({pid}).\n'
                'However, as per security policy, you have exceeded the allowed idle timeout ({timeout}).\n'
                'As such, your session will now be terminated. Please feel free to reconnect.')
# Usernames that are never culled.
exclude_users = []


# Get the SSHD PIDs. A process can exit (or be inaccessible) while we iterate,
# in which case psutil raises; skip such processes instead of aborting the run.
ssh_pids = []
for p in psutil.process_iter():
    try:
        if p.name() == 'sshd':
            ssh_pids.append(p.pid)
    except psutil.Error:
        continue


def _first_positive_int(path, pattern):
    """Return the first positive integer captured by group 1 of *pattern* in *path*.

    Lines are scanned in file order. A captured value of 0 is skipped because
    both TMOUT=0 and "ClientAliveInterval 0" mean "no timeout"; using it would
    make every session count as timed out immediately.

    Returns None if the file is missing/unreadable or no line matches.
    """
    try:
        with open(path, 'r', errors='replace') as fh:
            for line in fh:
                found = pattern.search(line)
                if found and int(found.group(1)) > 0:
                    return int(found.group(1))
    except OSError:
        pass
    return None


# If the timeout is set to auto, try to find it.
if timeout == 'auto':
    import re
    # Commented-out assignments are deliberately not matched.
    tmout_re = re.compile(r'^\s*(?:export\s+)?TMOUT=([0-9]+)')
    # We don't bother with factoring in ClientAliveCountMax.
    # sshd_config keywords are case-insensitive, hence IGNORECASE.
    sshd_re = re.compile(r'^\s*ClientAliveInterval\s+([0-9]+)', re.IGNORECASE)
    timeout = None
    # Checked in priority order; the first source that yields a value wins.
    for conf_path, conf_re in (('/etc/bashrc', tmout_re),
                               ('/etc/bash.bashrc', tmout_re),
                               ('/etc/ssh/sshd_config', sshd_re)):
        timeout = _first_positive_int(conf_path, conf_re)
        if timeout is not None:
            break
    # Finally, set a default only if nothing was found. 5 minutes is sensible.
    if timeout is None:
        timeout = 300
pretty_timeout = datetime.timedelta(seconds=timeout)

def get_idle(user):
    """Return how long *user*'s terminal has been idle.

    Idle time is derived from the access time of the terminal's device node
    under /dev (see https://unix.stackexchange.com/a/332704/284004).

    Args:
        user: An object with a ``terminal`` attribute (e.g. an entry from
            ``psutil.users()``), naming the device relative to /dev.

    Returns:
        A ``datetime.timedelta``, or None when there is no terminal or its
        device node does not exist (probably a graphical login, e.g. gnome
        uses ::1 - you're on your own).
    """
    if not user.terminal:
        return None
    try:
        last_used_ts = os.stat('/dev/{0}'.format(user.terminal)).st_atime
    except FileNotFoundError:
        return None
    # st_atime is an epoch timestamp. Build both datetimes as timezone-aware
    # UTC values so the difference is not skewed by the host's UTC offset
    # (mixing utcnow() with a local-time fromtimestamp() was off by exactly that).
    utc = datetime.timezone.utc
    return datetime.datetime.now(utc) - datetime.datetime.fromtimestamp(last_used_ts, utc)


# Walk every logged-in session and terminate the parent of the login process
# (the sshd handling the connection, for SSH sessions) once the session has
# been idle for at least `timeout` seconds.
for user in psutil.users():
    if user.name in exclude_users:
        continue
    # Older psutil releases have no "pid" field, and psutil documents that it can
    # be None on some platforms. psutil.Process(None) refers to *this* script,
    # so a missing PID must never be passed along.
    login_pid = getattr(user, 'pid', None)
    if login_pid is None:
        continue  # Doesn't have a PID
    # user.started is an epoch timestamp; use timezone-aware UTC on both sides so
    # the login length is not skewed by the host's UTC offset.
    login_length = (datetime.datetime.now(datetime.timezone.utc) -
                    datetime.datetime.fromtimestamp(user.started, datetime.timezone.utc))
    if login_length.total_seconds() < timeout:
        continue  # they haven't even been logged in for long enough yet.
    idle_time = get_idle(user)
    if idle_time is None:
        # Idle time could not be determined. Probably a graphical desktop login.
        continue
    if idle_time.total_seconds() < timeout:
        continue
    try:
        parent_pid = psutil.Process(login_pid).ppid()
    except psutil.Error:
        continue  # the login process vanished or cannot be inspected.
    if only_ssh and parent_pid not in ssh_pids:
        continue
    if parent_pid <= 1:
        # Never signal init (or a non-existent parent); that would not end the
        # user's session and could disturb the whole system.
        continue
    if goodbye:
        fmt_vals = {'pid': login_pid,
                    'terminal': user.terminal,
                    'loginlength': login_length,
                    'logintime': datetime.datetime.fromtimestamp(user.started),
                    'timeout': pretty_timeout}
        fmtd_goodbye = goodbye_mesg.format(**fmt_vals)
        try:
            subprocess.run(['write',
                            user.name,
                            user.terminal],
                           input=fmtd_goodbye.encode('utf-8'))
        except OSError:
            # The goodbye is a courtesy; a missing/unrunnable write(1) must not
            # prevent the session from being terminated.
            pass
    try:
        psutil.Process(parent_pid).terminate()
    except psutil.Error:
        # Already gone, or we lack permission; move on to the next session.
        continue