espilon-source/tools/C3PO/hp_dashboard/hp_alerts.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

556 lines
20 KiB
Python

"""
Honeypot Alert Engine — sliding window rule evaluation.
Evaluates rules against incoming events and fires alerts
when thresholds are exceeded. Supports both instant and
windowed (rate-based) rules.
"""
import time
import threading
import json
import sqlite3
import os
import socket
import ipaddress
import urllib.parse
import urllib.request
import urllib.error
from collections import defaultdict
# ============================================================
# Default alert rules
# ============================================================
# Rule schema (fields consumed by HpAlertEngine._matches /
# _check_threshold):
#   event_match     — dict of event fields that must match exactly
#   service_match   — event["service"] must be one of these names
#   detail_contains — substring that must appear in event["detail"]
#   field_match     — field must be truthy (value True) or equal the value
#   group_by        — event field used to bucket windowed counters
#   count_distinct  — count unique values of this field per bucket
#   threshold       — count required to fire the alert
#   window_s        — sliding window in seconds; 0 means instant rule
#   severity        — severity label attached to fired alerts
DEFAULT_RULES = [
    # Windowed: 5+ auth attempts from one IP inside 60s
    {
        "id": "brute_force",
        "name": "Brute Force",
        "description": "Multiple auth attempts from same IP",
        "event_match": {"event_type": "SVC_AUTH_ATTEMPT"},
        "group_by": "src_ip",
        "threshold": 5,
        "window_s": 60,
        "severity": "HIGH",
    },
    # Windowed + distinct: connections to 3+ different ports in 30s
    {
        "id": "port_scan",
        "name": "Port Scan Burst",
        "description": "Connections to 3+ services from same IP",
        "event_match": {"event_type": "SVC_CONNECT"},
        "group_by": "src_ip",
        "count_distinct": "dst_port",
        "threshold": 3,
        "window_s": 30,
        "severity": "HIGH",
    },
    # Instant rules below (window_s == 0): fire on first matching event
    {
        "id": "ddos_syn",
        "name": "SYN Flood",
        "description": "SYN flood detected",
        "event_match": {"event_type": "SYN_FLOOD"},
        "threshold": 1,
        "window_s": 0,
        "severity": "CRITICAL",
    },
    {
        "id": "ddos_udp",
        "name": "UDP Flood",
        "description": "UDP flood detected",
        "event_match": {"event_type": "UDP_FLOOD"},
        "threshold": 1,
        "window_s": 0,
        "severity": "CRITICAL",
    },
    {
        "id": "shell_access",
        "name": "Shell Access",
        "description": "Command executed in honeypot service",
        "event_match": {"event_type": "SVC_COMMAND"},
        "service_match": ["telnet", "ssh"],
        "threshold": 1,
        "window_s": 0,
        "severity": "MEDIUM",
    },
    {
        "id": "honeytoken",
        "name": "Honeytoken Accessed",
        "description": "Attacker accessed a honeytoken file",
        "detail_contains": "HONEYTOKEN",
        "threshold": 1,
        "window_s": 0,
        "severity": "CRITICAL",
    },
    {
        "id": "malware_sig",
        "name": "Malware Signature",
        "description": "Known malware pattern detected",
        # field_match True => fires whenever event["malware_tag"] is truthy
        "field_match": {"malware_tag": True},
        "threshold": 1,
        "window_s": 0,
        "severity": "CRITICAL",
    },
    {
        "id": "download_attempt",
        "name": "Download Attempt",
        "description": "Attacker attempted to download payload",
        "detail_contains": "DOWNLOAD",
        "threshold": 1,
        "window_s": 0,
        "severity": "HIGH",
    },
    # Combines service + detail conditions: Modbus write commands only
    {
        "id": "scada_write",
        "name": "SCADA Write",
        "description": "Write attempt to Modbus/SCADA registers",
        "event_match": {"event_type": "SVC_COMMAND"},
        "service_match": ["modbus"],
        "detail_contains": "Write",
        "threshold": 1,
        "window_s": 0,
        "severity": "CRITICAL",
    },
]
class HpAlertEngine:
"""
Evaluates alert rules against incoming events.
Maintains sliding window counters for rate-based rules.
"""
def __init__(self, db_path=None):
    """Initialise engine state, restore persisted alerts/rules from
    SQLite, and register the built-in default rules.

    db_path: optional path to the alert database; when None a
    honeypot_alerts.db file next to this module is used.
    """
    self.lock = threading.Lock()
    self.rules = []
    self._next_rule_id = 100  # custom rules get ids custom_100, custom_101, ...
    # Sliding-window counters: rule_id -> group_key -> [timestamps]
    self._windows = defaultdict(lambda: defaultdict(list))
    # Distinct-value tracking: rule_id -> group_key -> {values}
    self._distinct = defaultdict(lambda: defaultdict(set))
    self.active_alerts = []  # in-memory alert list (capped by _fire_alert)
    self._next_alert_id = 1
    # "rule_id:group_key" -> timestamp of last fire, used to avoid spam
    self._cooldowns = {}
    # HpStore reference for kill chain analysis, injected via set_store()
    self._store = None
    # Webhook registry: [{"id": int, "url": str, "name": str, "enabled": bool}]
    self.webhook_urls = []
    self._next_webhook_id = 1
    # Resolve the persistence path, then hydrate from disk.
    if db_path is None:
        db_path = os.path.join(os.path.dirname(__file__), "honeypot_alerts.db")
    self._alert_db_path = db_path
    self._init_alert_db()
    # Built-in rules are appended after any persisted custom rules
    # were loaded by _init_alert_db (copies, so defaults stay pristine).
    for template in DEFAULT_RULES:
        self.rules.append(dict(template))
def set_store(self, store):
    """Inject the HpStore instance used for kill-chain analysis."""
    self._store = store
# ============================================================
# Rule CRUD
# ============================================================
def get_rules(self) -> list:
    """Return a snapshot of all rules as independent dict copies."""
    with self.lock:
        return list(map(dict, self.rules))
def add_rule(self, rule: dict) -> dict:
    """Register a caller-supplied rule.

    Assigns a fresh custom_* id (mutating the passed dict), stores
    the rule in memory and in the database, and returns it.
    """
    with self.lock:
        new_id = f"custom_{self._next_rule_id}"
        self._next_rule_id += 1
        rule["id"] = new_id
        self.rules.append(rule)
        self._persist_rule(rule)
        return rule
def delete_rule(self, rule_id: str) -> bool:
    """Remove the rule with the given id.

    Returns True when a rule was actually removed. Custom rules
    (id starting with "custom_") are also purged from the database.
    """
    with self.lock:
        kept = [r for r in self.rules if r["id"] != rule_id]
        removed = len(kept) != len(self.rules)
        self.rules = kept
        if removed and rule_id.startswith("custom_"):
            self._delete_persisted_rule(rule_id)
        return removed
# ============================================================
# Alert queries
# ============================================================
def get_active_alerts(self) -> list:
    """Return a shallow copy of the in-memory alert list."""
    with self.lock:
        return self.active_alerts[:]
def acknowledge_alert(self, alert_id: int) -> bool:
    """Mark an active alert as acknowledged, in memory and in the DB.

    Returns False when no active alert carries that id. The database
    update is best-effort: failures are logged, never raised.
    """
    with self.lock:
        hit = next((a for a in self.active_alerts if a["id"] == alert_id), None)
        if hit is None:
            return False
        hit["acknowledged"] = True
        hit["ack_at"] = time.time()
        try:
            conn = sqlite3.connect(self._alert_db_path)
            conn.execute("UPDATE alerts SET acknowledged=1, ack_at=? WHERE id=?",
                         (hit["ack_at"], alert_id))
            conn.commit()
            conn.close()
        except Exception as e:
            print(f"[HP_ALERTS] Failed to persist ack for alert {alert_id}: {e}")
        return True
def get_unacknowledged_count(self) -> int:
    """Count active alerts that have not been acknowledged yet."""
    with self.lock:
        unseen = [a for a in self.active_alerts if not a.get("acknowledged")]
        return len(unseen)
# ============================================================
# Alert Persistence
# ============================================================
def _init_alert_db(self):
conn = sqlite3.connect(self._alert_db_path)
conn.execute("""CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY, rule_id TEXT, rule_name TEXT,
severity TEXT, description TEXT, src_ip TEXT, service TEXT,
detail TEXT, count INTEGER, group_key TEXT,
timestamp REAL, event_id INTEGER,
acknowledged INTEGER DEFAULT 0, ack_at REAL
)""")
conn.execute("""CREATE TABLE IF NOT EXISTS custom_rules (
id TEXT PRIMARY KEY,
rule_json TEXT NOT NULL,
created_at REAL
)""")
conn.commit()
conn.close()
# Load existing unacknowledged alerts into memory
conn = sqlite3.connect(self._alert_db_path)
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT * FROM alerts WHERE acknowledged = 0 ORDER BY id DESC LIMIT 200"
).fetchall()
for r in rows:
self.active_alerts.append(dict(r))
if rows:
self._next_alert_id = max(r["id"] for r in rows) + 1
# Load persisted custom rules
custom_rows = conn.execute("SELECT id, rule_json FROM custom_rules").fetchall()
for cr in custom_rows:
try:
rule = json.loads(cr["rule_json"])
rule["id"] = cr["id"]
self.rules.append(rule)
if cr["id"].startswith("custom_"):
num = int(cr["id"].split("_", 1)[1])
if num >= self._next_rule_id:
self._next_rule_id = num + 1
except (json.JSONDecodeError, ValueError):
pass
conn.close()
def _persist_alert(self, alert):
try:
conn = sqlite3.connect(self._alert_db_path)
conn.execute(
"INSERT INTO alerts (id, rule_id, rule_name, severity, description, "
"src_ip, service, detail, count, group_key, timestamp, event_id, "
"acknowledged, ack_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(alert["id"], alert["rule_id"], alert["rule_name"],
alert["severity"], alert["description"], alert["src_ip"],
alert["service"], alert["detail"], alert["count"],
alert["group_key"], alert["timestamp"], alert.get("event_id"),
0, None))
conn.commit()
conn.close()
except Exception as e:
print(f"[HP_ALERTS] Failed to persist alert {alert.get('id')}: {e}")
def _persist_rule(self, rule):
"""Persist a custom rule to the database."""
try:
conn = sqlite3.connect(self._alert_db_path)
rule_copy = {k: v for k, v in rule.items() if k != "id"}
conn.execute(
"INSERT OR REPLACE INTO custom_rules (id, rule_json, created_at) VALUES (?,?,?)",
(rule["id"], json.dumps(rule_copy), time.time()))
conn.commit()
conn.close()
except Exception as e:
print(f"[HP_ALERTS] Failed to persist rule {rule.get('id')}: {e}")
def _delete_persisted_rule(self, rule_id):
"""Delete a custom rule from the database."""
try:
conn = sqlite3.connect(self._alert_db_path)
conn.execute("DELETE FROM custom_rules WHERE id = ?", (rule_id,))
conn.commit()
conn.close()
except Exception as e:
print(f"[HP_ALERTS] Failed to delete persisted rule {rule_id}: {e}")
# ============================================================
# Webhooks
# ============================================================
@staticmethod
def _is_safe_webhook_url(url):
"""Validate webhook URL to prevent SSRF attacks.
Blocks private/loopback IPs, file:// scheme, and non-http(s) schemes.
Returns (is_safe: bool, reason: str).
"""
try:
parsed = urllib.parse.urlparse(url)
except Exception:
return False, "Malformed URL"
# Only allow http and https schemes
if parsed.scheme not in ("http", "https"):
return False, f"Scheme '{parsed.scheme}' not allowed (only http/https)"
hostname = parsed.hostname
if not hostname:
return False, "No hostname in URL"
# Resolve hostname to IP and check for private/loopback
try:
resolved = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC,
socket.SOCK_STREAM)
for _family, _type, _proto, _canonname, sockaddr in resolved:
ip = ipaddress.ip_address(sockaddr[0])
if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
return False, f"Resolved to private/loopback IP ({ip})"
except socket.gaierror:
return False, f"Cannot resolve hostname '{hostname}'"
return True, ""
def add_webhook(self, url, name=""):
    """Register a webhook endpoint after SSRF validation.

    Returns the new webhook record, or {"error": ...} when the URL
    is rejected. The name defaults to the first 40 URL characters.
    """
    safe, reason = self._is_safe_webhook_url(url)
    if not safe:
        return {"error": f"Unsafe webhook URL: {reason}"}
    with self.lock:
        record = {
            "id": self._next_webhook_id,
            "url": url,
            "name": name if name else url[:40],
            "enabled": True,
        }
        self._next_webhook_id += 1
        self.webhook_urls.append(record)
        return record
def remove_webhook(self, wh_id):
    """Delete a webhook by id; True when something was removed."""
    with self.lock:
        kept = [w for w in self.webhook_urls if w["id"] != wh_id]
        removed = len(kept) != len(self.webhook_urls)
        self.webhook_urls = kept
        return removed
def get_webhooks(self):
    """Return copies of the currently enabled webhook records."""
    with self.lock:
        enabled = (w for w in self.webhook_urls if w["enabled"])
        return [dict(w) for w in enabled]
# Minimum seconds between deliveries to the same webhook endpoint.
_WEBHOOK_MIN_INTERVAL = 10  # seconds between sends per webhook

def _send_webhooks(self, alert):
    """Send alert to all configured webhooks (in background thread).

    POSTs a JSON payload with a human-readable "text" summary plus
    the raw alert dict to every enabled webhook. Endpoints contacted
    within the last _WEBHOOK_MIN_INTERVAL seconds are skipped, and
    delivery failures are logged, never raised.
    """
    now = time.time()
    for wh in self.webhook_urls:
        if not wh["enabled"]:
            continue
        # Rate limit: skip if last send was less than _WEBHOOK_MIN_INTERVAL ago
        last_sent = wh.get("_last_sent", 0)
        if now - last_sent < self._WEBHOOK_MIN_INTERVAL:
            continue
        try:
            payload = json.dumps({
                "text": "[%s] %s: %s from %s (%s)" % (
                    alert["severity"], alert["rule_name"],
                    alert["description"], alert["src_ip"],
                    alert.get("service", "N/A")),
                "alert": alert,
            }).encode()
            req = urllib.request.Request(
                wh["url"], data=payload,
                headers={"Content-Type": "application/json"},
                method="POST")
            # Fix: close the HTTP response — the original discarded the
            # urlopen() return value, leaking the underlying socket.
            with urllib.request.urlopen(req, timeout=5):
                pass
            wh["_last_sent"] = now
        except Exception as e:
            print(f"[HP_ALERTS] Webhook {wh.get('name', wh['url'][:40])} failed: {e}")
# ============================================================
# Event evaluation
# ============================================================
def evaluate(self, event: dict) -> list:
    """Run every rule against one event.

    Matching rules go through threshold/window checks, then the
    kill-chain heuristic runs once. Returns the list of alerts
    newly fired by this call (possibly empty).
    """
    now = time.time()
    fired = []
    with self.lock:
        for rule in self.rules:
            if not self._matches(rule, event):
                continue
            fired += self._check_threshold(rule, event, now)
        # Kill chain analysis (on significant events only)
        fired += self._check_kill_chain(event, now)
    return fired
def _check_kill_chain(self, event, now):
"""Check if attacker has reached advanced kill chain phases."""
if not self._store:
return []
ip = event.get("src_ip", "")
if not ip or ip == "0.0.0.0":
return []
# Only check on significant events
etype = event.get("event_type", "")
detail = event.get("detail", "")
if etype not in ("SVC_COMMAND", "SVC_MQTT_MSG") and "HONEYTOKEN" not in detail:
return []
# Cooldown: check kill chain per IP max once per 5 min
cooldown_key = f"kill_chain:{ip}"
last = self._cooldowns.get(cooldown_key, 0)
if now - last < 300:
return []
analysis = self._store.get_kill_chain_analysis(ip)
if analysis["score"] < 40:
return []
self._cooldowns[cooldown_key] = now
rule = {
"id": "kill_chain_advanced",
"name": "Kill Chain: Advanced Progression",
"severity": "CRITICAL",
"description": "Attacker reached phase '%s' (score %d)" % (
analysis["max_phase"], analysis["score"]),
}
return [self._fire_alert(rule, event, now, ip, analysis["score"])]
def _matches(self, rule: dict, event: dict) -> bool:
"""Check if event matches rule conditions."""
# event_match: check specific field values
em = rule.get("event_match")
if em:
for k, v in em.items():
if event.get(k) != v:
return False
# service_match: check service in list
sm = rule.get("service_match")
if sm:
if event.get("service") not in sm:
return False
# detail_contains: substring match
dc = rule.get("detail_contains")
if dc:
detail = event.get("detail", "")
if dc not in detail:
return False
# field_match: check if field exists and is truthy
fm = rule.get("field_match")
if fm:
for k, v in fm.items():
if v is True:
if not event.get(k):
return False
elif event.get(k) != v:
return False
return True
def _check_threshold(self, rule: dict, event: dict, now: float) -> list:
"""Check if threshold is met, return alerts if so."""
rule_id = rule["id"]
window = rule.get("window_s", 0)
threshold = rule.get("threshold", 1)
# Instant rules (window=0): fire immediately
if window == 0:
cooldown_key = f"{rule_id}:{event.get('src_ip', '')}"
last = self._cooldowns.get(cooldown_key, 0)
if now - last < 10: # 10s cooldown for instant rules
return []
self._cooldowns[cooldown_key] = now
return [self._fire_alert(rule, event, now)]
# Windowed rules: track timestamps per group key
group_field = rule.get("group_by", "src_ip")
group_key = event.get(group_field, "unknown")
timestamps = self._windows[rule_id][group_key]
# Prune old entries
cutoff = now - window
self._windows[rule_id][group_key] = [t for t in timestamps if t > cutoff]
timestamps = self._windows[rule_id][group_key]
timestamps.append(now)
# count_distinct: count unique values of a field
distinct_field = rule.get("count_distinct")
if distinct_field:
self._distinct[rule_id][group_key].add(event.get(distinct_field, 0))
# Prune distinct on window reset (simplified: keep set, reset when empty)
count = len(self._distinct[rule_id][group_key])
else:
count = len(timestamps)
if count >= threshold:
cooldown_key = f"{rule_id}:{group_key}"
last = self._cooldowns.get(cooldown_key, 0)
if now - last < window: # cooldown = window duration
return []
self._cooldowns[cooldown_key] = now
# Clear window after firing
self._windows[rule_id][group_key] = []
if distinct_field:
self._distinct[rule_id][group_key] = set()
return [self._fire_alert(rule, event, now, group_key, count)]
return []
def _fire_alert(self, rule: dict, event: dict, now: float,
group_key: str = None, count: int = 1) -> dict:
"""Create and store an alert."""
alert = {
"id": self._next_alert_id,
"rule_id": rule["id"],
"rule_name": rule.get("name", rule["id"]),
"severity": rule.get("severity", "HIGH"),
"description": rule.get("description", ""),
"src_ip": event.get("src_ip", ""),
"service": event.get("service", ""),
"detail": event.get("detail", ""),
"count": count,
"group_key": group_key or event.get("src_ip", ""),
"timestamp": now,
"event_id": event.get("id"),
"acknowledged": False,
}
self._next_alert_id += 1
self.active_alerts.append(alert)
# Keep max 200 alerts
if len(self.active_alerts) > 200:
self.active_alerts = self.active_alerts[-200:]
# Persist alert to database
self._persist_alert(alert)
# Dispatch webhooks in background
if self.webhook_urls:
threading.Thread(target=self._send_webhooks, args=(dict(alert),), daemon=True).start()
return alert