Firewall

Sparta 300 Firewall Shield

While there are many ways to establish a firewall, I created the Sparta 300 Firewall Shield.

This program was created by Stan Switaj.

sparta_300_firewall_shield.py

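Prerequisite for SSH logging: install rsyslog so that /var/log/auth.log exists (without it, the script falls back to journalctl).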

Plain Text
sudo apt update
sudo apt install rsyslog
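
Before starting the script, it can help to confirm which log source it will pick. The snippet below is a minimal sketch, not part of the program: `pick_log_source` is a hypothetical helper that mirrors the existence check the script applies to `LOG_FILES_TO_MONITOR`, assuming the default path `/var/log/auth.log`.

```python
import os

AUTH_LOG = '/var/log/auth.log'   # default entry in the script's LOG_FILES_TO_MONITOR


def pick_log_source(paths=(AUTH_LOG,)):
    """Hypothetical helper: file tailing if any path exists, else journalctl."""
    return 'auth.log' if any(os.path.exists(p) for p in paths) else 'journalctl'


print('Log source the script would use:', pick_log_source())
```

If this prints journalctl right after installing rsyslog, the service may not have written its first auth.log entry yet.
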
Python 3
#!/usr/bin/python3

# sparta_300_firewall_shield.py  —  multiprocessing.Process + os.pipe() IPC
#
# ── Status / Feature Summary ─────────────────────────────────────────────────
#
#  Architecture  : Two daemon worker processes (Sniffer + LogTailer) feed IPs
#                  to the main process via os.pipe(). The main process owns all
#                  mutable state; no shared memory or locks needed.
#
#  Sniffer       : AF_PACKET / SOCK_DGRAM — captures inbound packets before
#                  iptables processes them. Link-layer header stripped
#                  automatically so no Ethernet/VLAN/loopback parsing is needed.
#                  Two sockets run in parallel via select():
#                    • ETH_P_IP   (0x0800) — IPv4 TCP SYN detection
#                    • ETH_P_IPV6 (0x86DD) — IPv6 TCP SYN detection
#                  Both IPv4 and IPv6 SSH connection attempts are therefore
#                  detected at the earliest possible moment.
#
#  Log tailer    : Prefers /var/log/auth.log (rsyslog). Auto-detects absence
#                  of auth.log and falls back to piping journalctl -fu sshd
#                  (systemd-only systems). Recognises both failure patterns
#                  (Invalid user, Failed password, Unable to negotiate, etc.)
#                  and accepted-login patterns (Accepted password/publickey/
#                  keyboard-interactive) for full IPv4 + IPv6 coverage.
#
#  Accepted logins: Every successful SSH login is timestamped and appended to
#                  ACCEPTED_LOG_FILE immediately; also shown in the periodic
#                  report (last 50 events). Accepted logins from ALLOWED_IPS
#                  are recorded but never banned.
#
#  Banning       : Port-scoped iptables/ip6tables rules — bans the offending
#                  IP on SSH_PORT only. An IP that legitimately uses port 80
#                  or any other service is unaffected. IPv4 uses iptables;
#                  IPv6 uses ip6tables. Reboot ban file contains matching
#                  port-scoped rules for persistence across reboots.
#                  All banning is suppressed in REVIEW_MODE.
#
#  REVIEW_MODE   : True  — observe and count only, no iptables changes.
#                  False — actively ban via iptables/ip6tables.
#
#  Log rotation  : Truncates any file in LOG_FILES_TO_TRUNCATE that exceeds
#                  MAX_LOG_SIZE_BYTES (default 50 MB) to prevent disk fill.
#
# Run as root: sudo python3 sparta_300_firewall_shield.py

import os
import re
import sys
import time
import socket
import struct
import datetime
import subprocess
import multiprocessing
import select as _select

# ══════════════════════════════════════════════════════════════════════════════
#  CONFIGURATION  — edit the values in this section
# ══════════════════════════════════════════════════════════════════════════════

SSH_PORT = 22

ALLOWED_IPS = [
    '192.168.1.10', '192.168.1.11'
]

REVIEW_MODE = False

REPORT_INTERVAL_OPTIONS = [15, 30, 60, 120, 300]
REPORT_INTERVAL         = 15

REPORT_FILE       = '/home/username/Desktop/ssh_block_report-22.txt'
ACCEPTED_LOG_FILE = '/home/username/Desktop/ssh_accepted_logins-22.txt'
REBOOT_BAN_FILE   = '/home/username/Desktop/ssh_reboot_bans-22.sh'

LOG_FILES_TO_MONITOR  = ['/var/log/auth.log']
LOG_FILES_TO_TRUNCATE = ['/var/log/auth.log', '/var/log/kern.log']

MAX_LOG_SIZE_BYTES = 50 * 1024 * 1024

# ══════════════════════════════════════════════════════════════════════════════
#  PIPE MESSAGE PROTOCOL
#
#  Workers write newline-terminated strings to their pipe write-end:
#    A:<ip>                     — intrusion attempt
#    L:<method>:<username>:<ip> — accepted login
# ══════════════════════════════════════════════════════════════════════════════

_IP4_RE = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
_IP6_RE = re.compile(r'^[0-9a-fA-F:]{2,39}$')

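def _valid_ip(ip):
    if _IP4_RE.match(ip):
        return all(0 <= int(p) <= 255 for p in ip.split('.'))
    if not _IP6_RE.match(ip):
        return False
    # The character-class check alone accepts strings such as 'ab' or ':::',
    # so confirm the candidate really parses as an IPv6 address.
    try:
        socket.inet_pton(socket.AF_INET6, ip)
    except OSError:
        return False
    return True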

# ══════════════════════════════════════════════════════════════════════════════
#  AUTH LOG PATTERNS
# ══════════════════════════════════════════════════════════════════════════════

_ATTEMPT_PATTERNS = [
    re.compile(r'Invalid user \S+ from ([\d\.:a-fA-F]+)'),
    re.compile(r'Failed \S+ for .+? from ([\d\.:a-fA-F]+)'),
    re.compile(r'Connection closed by (?:invalid user \S+ )?([\d\.:a-fA-F]+)'),
    re.compile(r'Received disconnect from ([\d\.:a-fA-F]+)'),
    re.compile(r'Did not receive identification string from ([\d\.:a-fA-F]+)'),
    re.compile(r'Unable to negotiate with ([\d\.:a-fA-F]+)'),
    re.compile(r'Disconnected from (?:invalid user \S+ )?([\d\.:a-fA-F]+)'),
]

# Matches: Accepted password for user from 1.2.3.4
#          Accepted publickey for user from ::1
#          Accepted keyboard-interactive/pam for user from 2001:db8::1
_ACCEPTED_RE = re.compile(
    r'Accepted (\S+) for (\S+) from ([\d\.:a-fA-F]+)'
)

# ══════════════════════════════════════════════════════════════════════════════
#  WORKER: RAW SOCKET SNIFFER
#
#  Uses AF_PACKET / SOCK_DGRAM which:
#    • Operates before iptables (same level as AF_PACKET / SOCK_RAW)
#    • Strips the link-layer header automatically — no Ethernet parsing,
#      no loopback breakage, no VLAN issues
#    • pkt[0] is the first byte of the IP header (IPv4) or IPv6 header
#
#  Two sockets are opened: one for ETH_P_IP (IPv4) and one for ETH_P_IPV6,
#  so SSH connections over either protocol are detected.
#
#  Writes "A:<ip>\n" for every inbound TCP SYN → SSH_PORT.
# ══════════════════════════════════════════════════════════════════════════════

ETH_P_IP   = 0x0800
ETH_P_IPV6 = 0x86DD


def _open_packet_socket(ethertype):
    """Open an AF_PACKET / SOCK_DGRAM socket for the given EtherType."""
    return socket.socket(
        socket.AF_PACKET,
        socket.SOCK_DGRAM,
        socket.htons(ethertype),
    )


def _check_ipv4_syn(pkt):
    """Return src_ip string if pkt is a TCP SYN → SSH_PORT, else None."""
    if len(pkt) < 20:
        return None
    ihl      = (pkt[0] & 0x0F) * 4
    protocol = pkt[9]
    if protocol != 6:
        return None
    if len(pkt) < ihl + 14:
        return None
    dst_port = struct.unpack('!H', pkt[ihl + 2:ihl + 4])[0]
    if dst_port != SSH_PORT:
        return None
    flags = pkt[ihl + 13]
    if (flags & 0x02) and not (flags & 0x10):   # SYN not SYN-ACK
        return socket.inet_ntoa(pkt[12:16])
    return None


def _check_ipv6_syn(pkt):
    """Return src_ip string if pkt is a TCP SYN → SSH_PORT, else None."""
    # Fixed 40-byte IPv6 header
    if len(pkt) < 40:
        return None
    next_header = pkt[6]
    if next_header != 6:    # TCP only (no extension header walk)
        return None
    if len(pkt) < 40 + 14:
        return None
    dst_port = struct.unpack('!H', pkt[40 + 2:40 + 4])[0]
    if dst_port != SSH_PORT:
        return None
    flags = pkt[40 + 13]
    if (flags & 0x02) and not (flags & 0x10):
        return socket.inet_ntop(socket.AF_INET6, pkt[8:24])
    return None


def sniffer_process(pipe_w):
    sockets = []

    for ethertype, label in [(ETH_P_IP, 'IPv4'), (ETH_P_IPV6, 'IPv6')]:
        try:
            sockets.append((_open_packet_socket(ethertype), ethertype, label))
        except PermissionError:
            os.write(pipe_w, f'[SNIFFER] Permission denied opening {label} socket.\n'.encode())
        except Exception as exc:
            os.write(pipe_w, f'[SNIFFER] {label} socket error: {exc}\n'.encode())

    if not sockets:
        os.close(pipe_w)
        return

    fds = [s for s, _, _ in sockets]

    while True:
        try:
            readable, _, _ = _select.select(fds, [], [], 1.0)
            for raw in readable:
                try:
                    pkt = raw.recv(65535)
                except Exception:
                    continue

                # Identify which socket returned data
                for s, ethertype, _ in sockets:
                    if s is raw:
                        if ethertype == ETH_P_IP:
                            ip = _check_ipv4_syn(pkt)
                        else:
                            ip = _check_ipv6_syn(pkt)
                        if ip:
                            os.write(pipe_w, f'A:{ip}\n'.encode())
                        break
        except Exception:
            continue

# ══════════════════════════════════════════════════════════════════════════════
#  WORKER: AUTH LOG TAILER
#
#  Prefers /var/log/auth.log (rsyslog).  If auth.log does not exist, falls
#  back to piping `journalctl -fu sshd` (systemd-only systems without rsyslog).
#
#  Install rsyslog to use the file-based path:
#    sudo apt install rsyslog && sudo systemctl enable --now rsyslog
#
#  Writes "A:<ip>\n" for intrusion attempts.
#  Writes "L:<method>:<user>:<ip>\n" for accepted logins.
# ══════════════════════════════════════════════════════════════════════════════

def _process_log_line(line, pipe_w):
    """Parse one sshd log line and write to pipe if relevant."""
    m = _ACCEPTED_RE.search(line)
    if m:
        method, user, ip = m.group(1), m.group(2), m.group(3)
        os.write(pipe_w, f'L:{method}:{user}:{ip}\n'.encode())
        return
    for pat in _ATTEMPT_PATTERNS:
        m = pat.search(line)
        if m:
            os.write(pipe_w, f'A:{m.group(1)}\n'.encode())
            return


def _tail_auth_log(pipe_w):
    """File-based mode: parse auth.log from top then tail continuously."""
    log_positions = {}

    for path in LOG_FILES_TO_MONITOR:
        if not os.path.exists(path):
            log_positions[path] = 0
            continue
        try:
            with open(path, 'r', errors='replace') as fh:
                for line in fh:
                    _process_log_line(line, pipe_w)
                log_positions[path] = fh.tell()
        except Exception:
            log_positions[path] = 0

    while True:
        for path in LOG_FILES_TO_MONITOR:
            try:
                if not os.path.exists(path):
                    continue
                pos = log_positions.get(path, 0)
                if os.path.getsize(path) < pos:
                    pos = 0
                with open(path, 'r', errors='replace') as fh:
                    fh.seek(pos)
                    while True:
                        line = fh.readline()
                        if not line:
                            log_positions[path] = fh.tell()
                            break
                        _process_log_line(line, pipe_w)
            except Exception:
                pass
        time.sleep(0.5)


def _tail_journalctl(pipe_w):
    """Journald fallback: follow the SSH unit's journal for real-time events."""
    # The unit is named 'ssh' on Debian/Ubuntu and 'sshd' on most other
    # distributions. Repeated -u options are ORed, so one call covers both.
    # Trying the names one after another would typically never reach the
    # second name, because `journalctl -f` keeps waiting even when a unit
    # has no entries.
    cmd = ['journalctl', '-f', '-u', 'sshd', '-u', 'ssh',
           '--output=cat', '--no-pager']
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
        os.write(pipe_w, b'[LOG TAILER] Using journalctl units=sshd,ssh\n')
        for raw_line in proc.stdout:
            line = raw_line.decode(errors='replace')
            _process_log_line(line, pipe_w)
        proc.wait()
    except FileNotFoundError:
        os.write(pipe_w, b'[LOG TAILER] journalctl not found.\n')
    except Exception as exc:
        os.write(pipe_w, f'[LOG TAILER] journalctl error: {exc}\n'.encode())


def log_tailer_process(pipe_w):
    auth_log_present = any(os.path.exists(p) for p in LOG_FILES_TO_MONITOR)

    if auth_log_present:
        os.write(pipe_w, b'[LOG TAILER] Using auth.log\n')
        _tail_auth_log(pipe_w)
    else:
        os.write(pipe_w, b'[LOG TAILER] auth.log not found - falling back to journalctl\n')
        _tail_journalctl(pipe_w)

# ══════════════════════════════════════════════════════════════════════════════
#  PARENT STATE  (owned exclusively by the main process)
# ══════════════════════════════════════════════════════════════════════════════

blocked_ips      = {}    # {ip: attempt_count}
banned_ips       = set()
reboot_ips_saved = set()
accepted_logins  = []    # [{'dt', 'ip', 'user', 'method'}]


def _record_attempt(ip):
    if not _valid_ip(ip) or ip in ALLOWED_IPS:
        return
    blocked_ips[ip] = blocked_ips.get(ip, 0) + 1
    _ban(ip)


def _record_accepted(method, user, ip):
    if not _valid_ip(ip):
        return
    entry = {
        'dt':     datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'ip':     ip,
        'user':   user,
        'method': method,
    }
    accepted_logins.append(entry)
    line = f"[{entry['dt']}]  {ip:<40}  user={user:<20}  method={method}\n"
    print(f'[ACCEPTED LOGIN]  {line.strip()}')
    with open(ACCEPTED_LOG_FILE, 'a') as fh:
        fh.write(line)


def _ban(ip):
    if REVIEW_MODE or ip in banned_ips:
        return
    banned_ips.add(ip)

    # Port-scoped ban: blocks this IP on SSH_PORT only.
    # An IP that is a legitimate port 80 / other-service client is unaffected.
    # IPv6 addresses contain ':' and require ip6tables; IPv4 uses iptables.
    cmd = (
        f'ip6tables -I INPUT -p tcp --dport {SSH_PORT} -s {ip} -j DROP'
        if ':' in ip else
        f'iptables  -I INPUT -p tcp --dport {SSH_PORT} -s {ip} -j DROP'
    )
    os.system(cmd)
    print(f'[BANNED]  {ip}  (port {SSH_PORT})')

    if ip not in reboot_ips_saved:
        reboot_ips_saved.add(ip)
        with open(REBOOT_BAN_FILE, 'a') as fh:
            fh.write(f'{cmd}\n')


def _write_report():
    now        = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    mode_label = 'REVIEW MODE  (observe only — no iptables)' if REVIEW_MODE \
                 else 'ACTIVE MODE  (iptables banning enabled)'
    lines = [
        '=' * 64,
        '  SSH Block Monitor Report',
        f'  Generated : {now}',
        f'  Mode      : {mode_label}',
        f'  Port      : {SSH_PORT}',
        '=' * 64,
        f'  INTRUSION ATTEMPTS  ({len(blocked_ips)} unique IPs)',
        f'  {"IP Address":<42}  {"Attempts":>8}',
        '-' * 64,
    ]
    for ip, count in sorted(blocked_ips.items(), key=lambda x: x[1], reverse=True):
        lines.append(f'  {ip:<42}  {1 if not REVIEW_MODE else count:>8}')
    if not blocked_ips:
        lines.append('  (none)')

    lines += [
        '-' * 64,
        f'  ACCEPTED LOGINS  ({len(accepted_logins)} events)',
        f'  {"Datetime":<20}  {"IP Address":<28}  {"User":<16}  Method',
        '-' * 64,
    ]
    for e in accepted_logins[-50:]:
        lines.append(
            f'  {e["dt"]:<20}  {e["ip"]:<28}  {e["user"]:<16}  {e["method"]}'
        )
    if not accepted_logins:
        lines.append('  (none)')

    lines += [
        '-' * 64,
        f'  Report interval: {REPORT_INTERVAL}s  |  Options: {REPORT_INTERVAL_OPTIONS}',
        f'  Full accepted log: {ACCEPTED_LOG_FILE}',
        '=' * 64,
    ]
    with open(REPORT_FILE, 'w') as fh:
        fh.write('\n'.join(lines) + '\n')


def _check_truncate_logs():
    for path in LOG_FILES_TO_TRUNCATE:
        if not os.path.exists(path):
            continue
        size = os.path.getsize(path)
        if size > MAX_LOG_SIZE_BYTES:
            print(f'[TRUNCATE] {path} is {size // (1024*1024)} MB — truncating to 0.')
            # Already running as root, so truncate directly instead of
            # spawning `sudo truncate` through a shell.
            os.truncate(path, 0)

# ══════════════════════════════════════════════════════════════════════════════
#  MAIN
# ══════════════════════════════════════════════════════════════════════════════

def main():
    if REPORT_INTERVAL not in REPORT_INTERVAL_OPTIONS:
        sys.exit(f'[ERROR] REPORT_INTERVAL must be one of {REPORT_INTERVAL_OPTIONS}')
    if os.geteuid() != 0:
        print('[WARN] Not running as root — raw socket and iptables will fail.')

    for path, header in [
        (REBOOT_BAN_FILE,
         '#!/bin/bash\n'
         '# SSH reboot ban list — generated by ssh_block_monitor_v5.2.py\n'
         '# Rules are port-scoped: blocks SSH_PORT only, not all traffic.\n'
         f'# Apply after reboot with:  sudo bash {REBOOT_BAN_FILE}\n'
         '# ──────────────────────────────────────────────────────────\n'),
        (ACCEPTED_LOG_FILE,
         '# SSH accepted logins — generated by ssh_block_monitor_v5.2.py\n'),
    ]:
        if not os.path.exists(path):
            with open(path, 'w') as fh:
                fh.write(header)

    auth_log_present = any(os.path.exists(p) for p in LOG_FILES_TO_MONITOR)
    log_source = 'auth.log' if auth_log_present else 'journalctl (rsyslog not installed)'

    print('=' * 64)
    print('  SSH Block Monitor v5.2')
    print(f'  Port            : {SSH_PORT}')
    print(f'  Mode            : {"REVIEW (no banning)" if REVIEW_MODE else "ACTIVE (iptables)"}')
    print(f'  Log source      : {log_source}')
    print(f'  Report interval : {REPORT_INTERVAL}s  →  {REPORT_FILE}')
    print(f'  Accepted log    : {ACCEPTED_LOG_FILE}')
    print('=' * 64)

    if not auth_log_present:
        print('[WARN] /var/log/auth.log not found.')
        print('[WARN] To enable file-based logging: sudo apt install rsyslog')
        print('[WARN]   && sudo systemctl enable --now rsyslog')
        print('[INFO] Falling back to journalctl for this session.')

    sniffer_r, sniffer_w = os.pipe()
    tailer_r,  tailer_w  = os.pipe()

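    # The workers receive raw pipe file descriptors, which only stay valid in
    # a forked child. Python 3.14 made 'forkserver' the default start method
    # on Linux, so request 'fork' explicitly.
    ctx = multiprocessing.get_context('fork')
    workers = [
        ctx.Process(target=sniffer_process,    args=(sniffer_w,), name='Sniffer',   daemon=True),
        ctx.Process(target=log_tailer_process, args=(tailer_w,),  name='LogTailer', daemon=True),
    ]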
    for w in workers:
        w.start()

    os.close(sniffer_w)
    os.close(tailer_w)

    print(f'[PARENT] Sniffer PID={workers[0].pid}  LogTailer PID={workers[1].pid}')

    read_fds    = [sniffer_r, tailer_r]
    buffers     = {sniffer_r: b'', tailer_r: b''}
    last_report = time.monotonic()

    try:
        while True:
            now     = time.monotonic()
            timeout = max(0.1, REPORT_INTERVAL - (now - last_report))

            readable, _, _ = _select.select(read_fds, [], [], timeout)

            for fd in readable:
                try:
                    chunk = os.read(fd, 4096)
                except Exception:
                    continue
                if not chunk:
                    read_fds.remove(fd)
                    continue
                buffers[fd] += chunk
                while b'\n' in buffers[fd]:
                    line, buffers[fd] = buffers[fd].split(b'\n', 1)
                    msg = line.decode(errors='replace').strip()
                    if not msg:
                        continue
                    if msg.startswith('A:'):
                        _record_attempt(msg[2:])
                    elif msg.startswith('L:'):
                        parts = msg[2:].split(':', 2)
                        if len(parts) == 3:
                            _record_accepted(parts[0], parts[1], parts[2])
                    else:
                        # Diagnostic messages from workers — print as-is
                        print(msg)

            if time.monotonic() - last_report >= REPORT_INTERVAL:
                try:
                    _write_report()
                    _check_truncate_logs()
                except Exception as exc:
                    print(f'[REPORT] Error: {exc}')
                last_report = time.monotonic()

    except KeyboardInterrupt:
        print('\n[STOP] Writing final report...')
        _write_report()
        print(f'[STOP] Report        → {REPORT_FILE}')
        print(f'[STOP] Accepted log  → {ACCEPTED_LOG_FILE}')
        print(f'[STOP] Reboot bans   → {REBOOT_BAN_FILE}')
        print(f'[STOP] Intrusion IPs : {len(blocked_ips)}')
        print(f'[STOP] Accepted logins: {len(accepted_logins)}')
    finally:
        for w in workers:
            w.terminate()
            w.join(timeout=2)
        for fd in (sniffer_r, tailer_r):
            try:
                os.close(fd)
            except Exception:
                pass


if __name__ == '__main__':
    main()
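
To sanity-check the SYN detection without opening a raw socket (and without root), one option is to hand-build a packet and feed it to the parser. The sketch below is only an illustration: `build_ipv4_syn` is a hypothetical helper that is not part of the script, `check_ipv4_syn` is a local copy of the script's `_check_ipv4_syn` logic so the block runs on its own, and the addresses come from the documentation ranges.

```python
import socket
import struct

SSH_PORT = 22  # same default as the script's configuration section


def check_ipv4_syn(pkt):
    """Local copy of the script's _check_ipv4_syn logic, for illustration."""
    if len(pkt) < 20:
        return None
    ihl = (pkt[0] & 0x0F) * 4          # IPv4 header length in bytes
    if pkt[9] != 6:                    # protocol 6 = TCP
        return None
    if len(pkt) < ihl + 14:
        return None
    dst_port = struct.unpack('!H', pkt[ihl + 2:ihl + 4])[0]
    if dst_port != SSH_PORT:
        return None
    flags = pkt[ihl + 13]
    if (flags & 0x02) and not (flags & 0x10):   # SYN set, ACK clear
        return socket.inet_ntoa(pkt[12:16])
    return None


def build_ipv4_syn(src_ip, dst_port, flags=0x02):
    """Hypothetical test helper: minimal IPv4 + TCP headers, checksums left 0."""
    ip_header = struct.pack(
        '!BBHHHBBH4s4s',
        0x45, 0, 40,                     # version 4 / IHL 5, TOS, total length
        0, 0,                            # identification, flags + fragment offset
        64, 6, 0,                        # TTL, protocol (TCP), checksum (unset)
        socket.inet_aton(src_ip),
        socket.inet_aton('192.0.2.1'),   # documentation-range destination
    )
    tcp_header = struct.pack(
        '!HHIIBBHHH',
        40000, dst_port,                 # source port, destination port
        0, 0,                            # sequence, acknowledgement
        5 << 4, flags,                   # data offset (5 words), TCP flags
        65535, 0, 0,                     # window, checksum (unset), urgent pointer
    )
    return ip_header + tcp_header


syn     = check_ipv4_syn(build_ipv4_syn('203.0.113.7', SSH_PORT))
syn_ack = check_ipv4_syn(build_ipv4_syn('203.0.113.7', SSH_PORT, flags=0x12))
web_syn = check_ipv4_syn(build_ipv4_syn('203.0.113.7', 80))
print(syn, syn_ack, web_syn)   # prints: 203.0.113.7 None None
```

Only the plain SYN aimed at SSH_PORT yields a source address. The SYN-ACK and the port 80 SYN are ignored, which matches the port-scoped behavior described in the script header.
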

Now you know. 🙂

Enjoy.
