Sparta 300 Firewall Shield
While there are many ways to establish a firewall, I created the Sparta 300 Firewall Shield.
This program was created by Stan Switaj.
sparta_300_firewall_shield.py
Prerequisite for SSH logging (rsyslog provides /var/log/auth.log):
sudo apt update
sudo apt install rsyslog
#!/usr/bin/python3
# sparta_300_firewall_shield.py — multiprocessing.Process + os.pipe() IPC
#
# ── Status / Feature Summary ─────────────────────────────────────────────────
#
# Architecture : Two daemon worker processes (Sniffer + LogTailer) feed IPs
# to the main process via os.pipe(). The main process owns all
# mutable state; no shared memory or locks needed.
#
# Sniffer : AF_PACKET / SOCK_DGRAM — captures inbound packets before
# iptables processes them. Link-layer header stripped
# automatically so no Ethernet/VLAN/loopback parsing is needed.
# Two sockets run in parallel via select():
# • ETH_P_IP (0x0800) — IPv4 TCP SYN detection
# • ETH_P_IPV6 (0x86DD) — IPv6 TCP SYN detection
# Both IPv4 and IPv6 SSH connection attempts are therefore
# detected at the earliest possible moment.
#
# Log tailer : Prefers /var/log/auth.log (rsyslog). Auto-detects absence
# of auth.log and falls back to piping journalctl -fu sshd
# (systemd-only systems). Recognises both failure patterns
# (Invalid user, Failed password, Unable to negotiate, etc.)
# and accepted-login patterns (Accepted password/publickey/
# keyboard-interactive) for full IPv4 + IPv6 coverage.
#
# Accepted logins: Every successful SSH login is timestamped and appended to
# ACCEPTED_LOG_FILE immediately; also shown in the periodic
# report (last 50 events). Accepted logins from ALLOWED_IPS
# are recorded but never banned.
#
# Banning : Port-scoped iptables/ip6tables rules — bans the offending
# IP on SSH_PORT only. An IP that legitimately uses port 80
# or any other service is unaffected. IPv4 uses iptables;
# IPv6 uses ip6tables. Reboot ban file contains matching
# port-scoped rules for persistence across reboots.
# All banning is suppressed in REVIEW_MODE.
#
# REVIEW_MODE : True — observe and count only, no iptables changes.
# False — actively ban via iptables/ip6tables.
#
# Log rotation : Truncates any file in LOG_FILES_TO_TRUNCATE that exceeds
# MAX_LOG_SIZE_BYTES (default 50 MB) to prevent disk fill.
#
# Run as root: sudo python3 sparta_300_firewall_shield.py
import os
import re
import sys
import time
import socket
import struct
import datetime
import subprocess
import multiprocessing
import select as _select
# ══════════════════════════════════════════════════════════════════════════════
# CONFIGURATION — edit the values in this section
# ══════════════════════════════════════════════════════════════════════════════
# TCP port watched by the SYN sniffer and used in every iptables/ip6tables rule.
SSH_PORT = 22
# Source addresses that are never counted as intrusion attempts and never banned.
# Compared as plain strings, so write them exactly as they appear in logs.
ALLOWED_IPS = [
'192.168.1.10', '192.168.1.11'
]
# True: observe and count only. False: insert DROP rules for offenders.
REVIEW_MODE = False
# Permitted report periods in seconds; main() refuses any other REPORT_INTERVAL.
REPORT_INTERVAL_OPTIONS = [15, 30, 60, 120, 300]
REPORT_INTERVAL = 15
# Summary report, rewritten from scratch on every report interval.
REPORT_FILE = '/home/username/Desktop/ssh_block_report-22.txt'
# Append-only record of every accepted SSH login.
ACCEPTED_LOG_FILE = '/home/username/Desktop/ssh_accepted_logins-22.txt'
# Shell script accumulating one ban command per banned address.
REBOOT_BAN_FILE = '/home/username/Desktop/ssh_reboot_bans-22.sh'
# Log files tailed for sshd events; if none exist the tailer uses journalctl.
LOG_FILES_TO_MONITOR = ['/var/log/auth.log']
# Files emptied once they grow beyond MAX_LOG_SIZE_BYTES.
LOG_FILES_TO_TRUNCATE = ['/var/log/auth.log', '/var/log/kern.log']
# Size threshold for truncation: 50 MiB.
MAX_LOG_SIZE_BYTES = 50 * 1024 * 1024
# ══════════════════════════════════════════════════════════════════════════════
# PIPE MESSAGE PROTOCOL
#
# Workers write newline-terminated strings to their pipe write-end:
# A:<ip> — intrusion attempt
# L:<method>:<username>:<ip> — accepted login
# ══════════════════════════════════════════════════════════════════════════════
_IP4_RE = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
_IP6_RE = re.compile(r'^[0-9a-fA-F:]{2,39}$')
def _valid_ip(ip):
if _IP4_RE.match(ip):
return all(0 <= int(p) <= 255 for p in ip.split('.'))
return bool(_IP6_RE.match(ip))
# ══════════════════════════════════════════════════════════════════════════════
# AUTH LOG PATTERNS
# ══════════════════════════════════════════════════════════════════════════════
_ATTEMPT_PATTERNS = [
re.compile(r'Invalid user \S+ from ([\d\.:a-fA-F]+)'),
re.compile(r'Failed \S+ for .+? from ([\d\.:a-fA-F]+)'),
re.compile(r'Connection closed by (?:invalid user \S+ )?([\d\.:a-fA-F]+)'),
re.compile(r'Received disconnect from ([\d\.:a-fA-F]+)'),
re.compile(r'Did not receive identification string from ([\d\.:a-fA-F]+)'),
re.compile(r'Unable to negotiate with ([\d\.:a-fA-F]+)'),
re.compile(r'Disconnected from (?:invalid user \S+ )?([\d\.:a-fA-F]+)'),
]
# Matches: Accepted password for user from 1.2.3.4
# Accepted publickey for user from ::1
# Accepted keyboard-interactive/pam for user from 2001:db8::1
_ACCEPTED_RE = re.compile(
r'Accepted (\S+) for (\S+) from ([\d\.:a-fA-F]+)'
)
# ══════════════════════════════════════════════════════════════════════════════
# WORKER: RAW SOCKET SNIFFER
#
# Uses AF_PACKET / SOCK_DGRAM which:
# • Operates before iptables (same level as AF_PACKET / SOCK_RAW)
# • Strips the link-layer header automatically — no Ethernet parsing,
# no loopback breakage, no VLAN issues
# • pkt[0] is the first byte of the IP header (IPv4) or IPv6 header
#
# Two sockets are opened: one for ETH_P_IP (IPv4) and one for ETH_P_IPV6,
# so SSH connections over either protocol are detected.
#
# Writes "A:<ip>\n" for every inbound TCP SYN → SSH_PORT.
# ══════════════════════════════════════════════════════════════════════════════
# EtherType values selecting which protocol a packet socket receives.
ETH_P_IP = 0x0800
ETH_P_IPV6 = 0x86DD


def _open_packet_socket(ethertype):
    """Create and return an AF_PACKET / SOCK_DGRAM socket bound to *ethertype*.

    The EtherType is converted to network byte order before being passed as
    the socket protocol. Any error from socket creation propagates to the
    caller.
    """
    protocol = socket.htons(ethertype)
    return socket.socket(socket.AF_PACKET, socket.SOCK_DGRAM, protocol)
def _check_ipv4_syn(pkt):
"""Return src_ip string if pkt is a TCP SYN → SSH_PORT, else None."""
if len(pkt) < 20:
return None
ihl = (pkt[0] & 0x0F) * 4
protocol = pkt[9]
if protocol != 6:
return None
if len(pkt) < ihl + 14:
return None
dst_port = struct.unpack('!H', pkt[ihl + 2:ihl + 4])[0]
if dst_port != SSH_PORT:
return None
flags = pkt[ihl + 13]
if (flags & 0x02) and not (flags & 0x10): # SYN not SYN-ACK
return socket.inet_ntoa(pkt[12:16])
return None
def _check_ipv6_syn(pkt):
"""Return src_ip string if pkt is a TCP SYN → SSH_PORT, else None."""
# Fixed 40-byte IPv6 header
if len(pkt) < 40:
return None
next_header = pkt[6]
if next_header != 6: # TCP only (no extension header walk)
return None
if len(pkt) < 40 + 14:
return None
dst_port = struct.unpack('!H', pkt[40 + 2:40 + 4])[0]
if dst_port != SSH_PORT:
return None
flags = pkt[40 + 13]
if (flags & 0x02) and not (flags & 0x10):
return socket.inet_ntop(socket.AF_INET6, pkt[8:24])
return None
def sniffer_process(pipe_w):
    """Worker: watch for inbound TCP SYNs to the SSH port and report them.

    Opens one packet socket per EtherType (IPv4, IPv6), multiplexes them with
    select(), and writes "A:<ip>\\n" to *pipe_w* for every matching SYN.
    Socket-open failures are reported on the pipe as diagnostic lines; if no
    socket could be opened the pipe is closed and the worker returns.

    Args:
        pipe_w: Write end (file descriptor) of the pipe to the parent.
    """
    parsers = {}  # socket object -> function extracting the SYN source
    for ethertype, label, parser in (
        (ETH_P_IP, 'IPv4', _check_ipv4_syn),
        (ETH_P_IPV6, 'IPv6', _check_ipv6_syn),
    ):
        try:
            parsers[_open_packet_socket(ethertype)] = parser
        except PermissionError:
            os.write(pipe_w, f'[SNIFFER] Permission denied opening {label} socket.\n'.encode())
        except Exception as exc:
            os.write(pipe_w, f'[SNIFFER] {label} socket error: {exc}\n'.encode())
    if not parsers:
        os.close(pipe_w)
        return

    # Packet sockets also deliver frames this host transmits. Without this
    # filter our own outbound SSH connections would be reported as attacks.
    outgoing = getattr(socket, 'PACKET_OUTGOING', 4)
    socks = list(parsers)
    try:
        while True:
            try:
                readable, _, _ = _select.select(socks, [], [], 1.0)
                for sock in readable:
                    try:
                        # For AF_PACKET the address tuple is
                        # (ifname, proto, pkttype, hatype, addr).
                        pkt, addr = sock.recvfrom(65535)
                    except OSError:
                        continue
                    if addr[2] == outgoing:
                        continue
                    ip = parsers[sock](pkt)
                    if ip:
                        os.write(pipe_w, f'A:{ip}\n'.encode())
            except BrokenPipeError:
                # Parent is gone; stop instead of spinning forever.
                break
            except Exception:
                # Best effort: one bad packet must not stop the sniffer.
                continue
    finally:
        for sock in socks:
            sock.close()
# ══════════════════════════════════════════════════════════════════════════════
# WORKER: AUTH LOG TAILER
#
# Prefers /var/log/auth.log (rsyslog). If auth.log does not exist, falls
# back to piping `journalctl -fu sshd` (systemd-only systems without rsyslog).
#
# Install rsyslog to use the file-based path:
# sudo apt install rsyslog && sudo systemctl enable --now rsyslog
#
# Writes "A:<ip>\n" for intrusion attempts.
# Writes "L:<method>:<user>:<ip>\n" for accepted logins.
# ══════════════════════════════════════════════════════════════════════════════
def _process_log_line(line, pipe_w):
    """Classify one sshd log line and forward the result over *pipe_w*.

    An accepted login is checked first and emitted as
    "L:<method>:<user>:<ip>\\n". Otherwise the first attempt pattern that
    matches yields "A:<ip>\\n". Lines matching nothing produce no output.
    """
    accepted = _ACCEPTED_RE.search(line)
    if accepted is not None:
        method, user, ip = accepted.groups()
        os.write(pipe_w, f'L:{method}:{user}:{ip}\n'.encode())
        return

    # Lazy generator: stops searching at the first pattern that hits.
    hit = next(
        (found for found in (pat.search(line) for pat in _ATTEMPT_PATTERNS) if found),
        None,
    )
    if hit is not None:
        os.write(pipe_w, f'A:{hit.group(1)}\n'.encode())
def _drain_log_file(path, offsets, pipe_w):
    """Feed every complete, not-yet-seen line of *path* to _process_log_line.

    *offsets* maps path -> (inode, byte offset) and is updated in place.
    The file is opened in binary mode so the stored offset is a true byte
    position that can be compared with the file size.
    """
    try:
        fh = open(path, 'rb')
    except FileNotFoundError:
        return
    with fh:
        # fstat the opened handle so inode and size describe the same file
        # we are about to read, even if a rotation happens concurrently.
        st = os.fstat(fh.fileno())
        inode, pos = offsets.get(path, (st.st_ino, 0))
        if st.st_ino != inode or st.st_size < pos:
            pos = 0  # rotated (new inode) or truncated: start over
        fh.seek(pos)
        try:
            for raw_line in fh:
                if not raw_line.endswith(b'\n'):
                    # Writer is mid-line; pick the whole line up next pass.
                    break
                pos += len(raw_line)
                _process_log_line(raw_line.decode(errors='replace'), pipe_w)
        finally:
            # Remember progress even if forwarding failed part-way, so the
            # lines already sent are not sent twice.
            offsets[path] = (st.st_ino, pos)


def _tail_auth_log(pipe_w):
    """File-based mode: parse each monitored log from the top, then tail it.

    Polls every file in LOG_FILES_TO_MONITOR twice a second. Files that do
    not exist yet are picked up from their beginning once they appear.
    Returns only when the pipe to the parent is broken.
    """
    offsets = {}
    while True:
        for path in LOG_FILES_TO_MONITOR:
            try:
                _drain_log_file(path, offsets, pipe_w)
            except BrokenPipeError:
                return  # parent is gone
            except OSError:
                # Transient file error (permissions, rotation race): retry
                # on the next poll.
                pass
        time.sleep(0.5)
def _tail_journalctl(pipe_w):
    """Journald fallback: follow the SSH service journal for real-time events.

    The service unit is called 'sshd' on some distributions and 'ssh' on
    others. `journalctl -f` keeps running silently for a unit that does not
    exist, so trying the names one after another never reaches the second.
    Both are therefore followed by a single journalctl process.

    Returns when journalctl exits or cannot be started.
    """
    units = ('sshd', 'ssh')
    cmd = ['journalctl', '-f', '--output=cat', '--no-pager']
    for unit in units:
        cmd += ['-u', unit]
    unit_label = ','.join(units)

    proc = None
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
        os.write(pipe_w, f'[LOG TAILER] Using journalctl units={unit_label}\n'.encode())
        for raw_line in proc.stdout:
            _process_log_line(raw_line.decode(errors='replace'), pipe_w)
        proc.wait()
    except FileNotFoundError:
        os.write(pipe_w, b'[LOG TAILER] journalctl not found.\n')
    except Exception as exc:
        os.write(pipe_w, f'[LOG TAILER] journalctl error: {exc}\n'.encode())
    finally:
        # Do not leave a follower process behind if we stop early.
        if proc is not None and proc.poll() is None:
            proc.terminate()
            proc.wait()
def log_tailer_process(pipe_w):
    """Worker entry point: choose the log source and start tailing it.

    If at least one path in LOG_FILES_TO_MONITOR exists, the file-based
    tailer is used; otherwise the journalctl fallback is started. A
    diagnostic line naming the choice is written to *pipe_w* first.
    """
    for candidate in LOG_FILES_TO_MONITOR:
        if os.path.exists(candidate):
            os.write(pipe_w, b'[LOG TAILER] Using auth.log\n')
            _tail_auth_log(pipe_w)
            return
    os.write(pipe_w, b'[LOG TAILER] auth.log not found - falling back to journalctl\n')
    _tail_journalctl(pipe_w)
# ══════════════════════════════════════════════════════════════════════════════
# PARENT STATE (owned exclusively by the main process)
# ══════════════════════════════════════════════════════════════════════════════
# Attempt counter per offending address; incremented by _record_attempt().
blocked_ips = {} # {ip: attempt_count}
# Addresses _ban() has already handled during this run, so each is banned once.
banned_ips = set()
# Addresses whose ban command was already appended to REBOOT_BAN_FILE this run.
reboot_ips_saved = set()
# Accepted-login records in arrival order; _write_report() lists the last 50.
accepted_logins = [] # [{'dt', 'ip', 'user', 'method'}]
def _record_attempt(ip):
    """Count one intrusion attempt from *ip* and hand it to _ban().

    Malformed addresses and addresses listed in ALLOWED_IPS are ignored.
    """
    if not _valid_ip(ip):
        return
    if ip in ALLOWED_IPS:
        return
    try:
        blocked_ips[ip] += 1
    except KeyError:
        blocked_ips[ip] = 1
    _ban(ip)
def _record_accepted(method, user, ip):
    """Record a successful SSH login in memory, on stdout and on disk.

    Args:
        method: Authentication method reported by sshd (e.g. 'publickey').
        user: Account name that logged in.
        ip: Remote address; the event is dropped if it is not a valid IP.

    The timestamp is the time this function runs, not the time in the log
    line. A failure to append to ACCEPTED_LOG_FILE is reported and otherwise
    ignored: this is called from the main loop outside any error guard, so
    letting an OSError escape would stop the whole monitor.
    """
    if not _valid_ip(ip):
        return
    entry = {
        'dt': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'ip': ip,
        'user': user,
        'method': method,
    }
    accepted_logins.append(entry)
    line = f"[{entry['dt']}] {ip:<40} user={user:<20} method={method}\n"
    print(f'[ACCEPTED LOGIN] {line.strip()}')
    try:
        with open(ACCEPTED_LOG_FILE, 'a', encoding='utf-8') as fh:
            fh.write(line)
    except OSError as exc:
        print(f'[ACCEPTED LOGIN] Could not write {ACCEPTED_LOG_FILE}: {exc}')
def _ban(ip):
    """Insert a port-scoped DROP rule for *ip* and persist it for reboot.

    Does nothing in REVIEW_MODE or when *ip* was already handled this run.
    The rule blocks *ip* on SSH_PORT only; other services stay reachable.
    Addresses containing ':' go to ip6tables, all others to iptables.

    The rule is only inserted if an identical one is not already present
    (checked with -C). The monitor re-reads log history on every start, so
    unconditional inserts would pile up duplicate rules.
    """
    if REVIEW_MODE or ip in banned_ips:
        return
    # Marked as handled even if the command fails, so a broken iptables
    # setup does not spawn a process for every subsequent packet.
    banned_ips.add(ip)

    tool = 'ip6tables' if ':' in ip else 'iptables'
    rule = ['INPUT', '-p', 'tcp', '--dport', str(SSH_PORT), '-s', ip, '-j', 'DROP']
    # Argument list, no shell: the address is never interpreted by a shell.
    try:
        already_present = subprocess.run(
            [tool, '-C', *rule],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        ).returncode == 0
        status = 0 if already_present else subprocess.run([tool, '-I', *rule]).returncode
    except OSError as exc:
        print(f'[BAN FAILED] {ip} (port {SSH_PORT}): {exc}')
    else:
        if status == 0:
            print(f'[BANNED] {ip} (port {SSH_PORT})')
        else:
            print(f'[BAN FAILED] {ip} (port {SSH_PORT}): {tool} exited with {status}')

    if ip not in reboot_ips_saved:
        reboot_ips_saved.add(ip)
        cmd = ' '.join([tool, '-I', *rule])
        try:
            with open(REBOOT_BAN_FILE, 'a') as fh:
                fh.write(f'{cmd}\n')
        except OSError as exc:
            print(f'[BAN] Could not write {REBOOT_BAN_FILE}: {exc}')
def _write_report():
    """Rewrite REPORT_FILE with the current attempt and login summary.

    The report lists every offending address (highest count first) and the
    50 most recent accepted logins. It is written as UTF-8 explicitly: the
    text contains non-ASCII characters and must not depend on the locale of
    the environment the monitor was started from.

    Raises:
        OSError: if REPORT_FILE cannot be written.
    """
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if REVIEW_MODE:
        mode_label = 'REVIEW MODE (observe only — no iptables)'
    else:
        mode_label = 'ACTIVE MODE (iptables banning enabled)'
    heavy_rule = '=' * 64
    light_rule = '-' * 64

    lines = [
        heavy_rule,
        ' SSH Block Monitor Report',
        f' Generated : {now}',
        f' Mode : {mode_label}',
        f' Port : {SSH_PORT}',
        heavy_rule,
        f' INTRUSION ATTEMPTS ({len(blocked_ips)} unique IPs)',
        f' {"IP Address":<42} {"Attempts":>8}',
        light_rule,
    ]
    ranked = sorted(blocked_ips.items(), key=lambda item: item[1], reverse=True)
    for ip, count in ranked:
        # In active mode the column always shows 1; the real counter is only
        # displayed in REVIEW_MODE.
        shown = count if REVIEW_MODE else 1
        lines.append(f' {ip:<42} {shown:>8}')
    if not blocked_ips:
        lines.append(' (none)')

    lines += [
        light_rule,
        f' ACCEPTED LOGINS ({len(accepted_logins)} events)',
        f' {"Datetime":<20} {"IP Address":<28} {"User":<16} Method',
        light_rule,
    ]
    for e in accepted_logins[-50:]:
        lines.append(
            f' {e["dt"]:<20} {e["ip"]:<28} {e["user"]:<16} {e["method"]}'
        )
    if not accepted_logins:
        lines.append(' (none)')

    lines += [
        light_rule,
        f' Report interval: {REPORT_INTERVAL}s | Options: {REPORT_INTERVAL_OPTIONS}',
        f' Full accepted log: {ACCEPTED_LOG_FILE}',
        heavy_rule,
    ]
    with open(REPORT_FILE, 'w', encoding='utf-8') as fh:
        fh.write('\n'.join(lines) + '\n')
def _check_truncate_logs():
for path in LOG_FILES_TO_TRUNCATE:
if not os.path.exists(path):
continue
size = os.path.getsize(path)
if size > MAX_LOG_SIZE_BYTES:
print(f'[TRUNCATE] {path} is {size // (1024*1024)} MB — truncating to 0.')
os.system(f'sudo truncate -s 0 {path}')
# ══════════════════════════════════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════════════════════════════════
def _dispatch_message(msg):
    """Route one decoded worker message to the matching handler.

    "A:<ip>" is an intrusion attempt, "L:<method>:<user>:<ip>" an accepted
    login. Anything else is a worker diagnostic and is printed as-is.
    """
    if msg.startswith('A:'):
        _record_attempt(msg[2:])
    elif msg.startswith('L:'):
        # Split from the left only twice: an IPv6 address contains ':'.
        parts = msg[2:].split(':', 2)
        if len(parts) == 3:
            _record_accepted(parts[0], parts[1], parts[2])
    else:
        print(msg)


def main():
    """Start both workers and run the parent event loop until interrupted.

    Validates the configuration, creates the output files with their headers
    if missing, starts the sniffer and log-tailer workers (each with its own
    pipe), then multiplexes both pipes with select(), dispatching every
    complete line and writing the report every REPORT_INTERVAL seconds.
    Ctrl-C writes a final report; workers and pipes are always cleaned up.
    """
    if REPORT_INTERVAL not in REPORT_INTERVAL_OPTIONS:
        sys.exit(f'[ERROR] REPORT_INTERVAL must be one of {REPORT_INTERVAL_OPTIONS}')
    if os.geteuid() != 0:
        print('[WARN] Not running as root — raw socket and iptables will fail.')

    for path, header in [
        (REBOOT_BAN_FILE,
         '#!/bin/bash\n'
         '# SSH reboot ban list — generated by ssh_block_monitor_v5.2.py\n'
         '# Rules are port-scoped: blocks SSH_PORT only, not all traffic.\n'
         f'# Apply after reboot with: sudo bash {REBOOT_BAN_FILE}\n'
         '# ──────────────────────────────────────────────────────────\n'),
        (ACCEPTED_LOG_FILE,
         '# SSH accepted logins — generated by ssh_block_monitor_v5.2.py\n'),
    ]:
        if not os.path.exists(path):
            try:
                with open(path, 'w', encoding='utf-8') as fh:
                    fh.write(header)
            except OSError as exc:
                sys.exit(f'[ERROR] Cannot create {path}: {exc}')

    auth_log_present = any(os.path.exists(p) for p in LOG_FILES_TO_MONITOR)
    log_source = 'auth.log' if auth_log_present else 'journalctl (rsyslog not installed)'
    print('=' * 64)
    print(' SSH Block Monitor v5.2')
    print(f' Port : {SSH_PORT}')
    print(f' Mode : {"REVIEW (no banning)" if REVIEW_MODE else "ACTIVE (iptables)"}')
    print(f' Log source : {log_source}')
    print(f' Report interval : {REPORT_INTERVAL}s → {REPORT_FILE}')
    print(f' Accepted log : {ACCEPTED_LOG_FILE}')
    print('=' * 64)
    if not auth_log_present:
        print('[WARN] /var/log/auth.log not found.')
        print('[WARN] To enable file-based logging: sudo apt install rsyslog')
        print('[WARN] && sudo systemctl enable --now rsyslog')
        print('[INFO] Falling back to journalctl for this session.')

    sniffer_r, sniffer_w = os.pipe()
    tailer_r, tailer_w = os.pipe()
    # The workers receive bare pipe file descriptors, which are only valid in
    # a child created by fork(). Request that start method explicitly rather
    # than relying on the platform default.
    ctx = multiprocessing.get_context('fork')
    workers = [
        ctx.Process(target=sniffer_process, args=(sniffer_w,), name='Sniffer', daemon=True),
        ctx.Process(target=log_tailer_process, args=(tailer_w,), name='LogTailer', daemon=True),
    ]
    for w in workers:
        w.start()
    # Close the parent's copies of the write ends so a worker exit shows up
    # as EOF on the read end.
    os.close(sniffer_w)
    os.close(tailer_w)
    print(f'[PARENT] Sniffer PID={workers[0].pid} LogTailer PID={workers[1].pid}')

    read_fds = [sniffer_r, tailer_r]
    buffers = {sniffer_r: b'', tailer_r: b''}
    last_report = time.monotonic()
    try:
        while True:
            now = time.monotonic()
            timeout = max(0.1, REPORT_INTERVAL - (now - last_report))
            readable, _, _ = _select.select(read_fds, [], [], timeout)
            for fd in readable:
                try:
                    chunk = os.read(fd, 4096)
                except Exception:
                    continue
                if not chunk:
                    # EOF: that worker has exited; stop polling its pipe.
                    read_fds.remove(fd)
                    continue
                buffers[fd] += chunk
                while b'\n' in buffers[fd]:
                    line, buffers[fd] = buffers[fd].split(b'\n', 1)
                    msg = line.decode(errors='replace').strip()
                    if not msg:
                        continue
                    try:
                        _dispatch_message(msg)
                    except Exception as exc:
                        # One bad event must not take the monitor down.
                        print(f'[PARENT] Error handling {msg!r}: {exc}')
            if time.monotonic() - last_report >= REPORT_INTERVAL:
                try:
                    _write_report()
                    _check_truncate_logs()
                except Exception as exc:
                    print(f'[REPORT] Error: {exc}')
                last_report = time.monotonic()
    except KeyboardInterrupt:
        print('\n[STOP] Writing final report...')
        try:
            _write_report()
        except OSError as exc:
            print(f'[REPORT] Error: {exc}')
        print(f'[STOP] Report → {REPORT_FILE}')
        print(f'[STOP] Accepted log → {ACCEPTED_LOG_FILE}')
        print(f'[STOP] Reboot bans → {REBOOT_BAN_FILE}')
        print(f'[STOP] Intrusion IPs : {len(blocked_ips)}')
        print(f'[STOP] Accepted logins: {len(accepted_logins)}')
    finally:
        for w in workers:
            w.terminate()
            w.join(timeout=2)
        for fd in (sniffer_r, tailer_r):
            try:
                os.close(fd)
            except Exception:
                pass
if __name__ == '__main__':
    main()
Now you know. 🙂
Enjoy.
0 Comments
No comments yet. Be the first!
Leave a Comment