"""
Honeypot Alert Engine — sliding window rule evaluation.

Evaluates rules against incoming events and fires alerts when thresholds
are exceeded. Supports both instant and windowed (rate-based) rules.
"""

import contextlib
import ipaddress
import json
import os
import socket
import sqlite3
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import defaultdict

# ============================================================
# Default alert rules
# ============================================================

DEFAULT_RULES = [
    {
        "id": "brute_force",
        "name": "Brute Force",
        "description": "Multiple auth attempts from same IP",
        "event_match": {"event_type": "SVC_AUTH_ATTEMPT"},
        "group_by": "src_ip",
        "threshold": 5,
        "window_s": 60,
        "severity": "HIGH",
    },
    {
        "id": "port_scan",
        "name": "Port Scan Burst",
        "description": "Connections to 3+ services from same IP",
        "event_match": {"event_type": "SVC_CONNECT"},
        "group_by": "src_ip",
        "count_distinct": "dst_port",
        "threshold": 3,
        "window_s": 30,
        "severity": "HIGH",
    },
    {
        "id": "ddos_syn",
        "name": "SYN Flood",
        "description": "SYN flood detected",
        "event_match": {"event_type": "SYN_FLOOD"},
        "threshold": 1,
        "window_s": 0,
        "severity": "CRITICAL",
    },
    {
        "id": "ddos_udp",
        "name": "UDP Flood",
        "description": "UDP flood detected",
        "event_match": {"event_type": "UDP_FLOOD"},
        "threshold": 1,
        "window_s": 0,
        "severity": "CRITICAL",
    },
    {
        "id": "shell_access",
        "name": "Shell Access",
        "description": "Command executed in honeypot service",
        "event_match": {"event_type": "SVC_COMMAND"},
        "service_match": ["telnet", "ssh"],
        "threshold": 1,
        "window_s": 0,
        "severity": "MEDIUM",
    },
    {
        "id": "honeytoken",
        "name": "Honeytoken Accessed",
        "description": "Attacker accessed a honeytoken file",
        "detail_contains": "HONEYTOKEN",
        "threshold": 1,
        "window_s": 0,
        "severity": "CRITICAL",
    },
    {
        "id": "malware_sig",
        "name": "Malware Signature",
        "description": "Known malware pattern detected",
        "field_match": {"malware_tag": True},
        "threshold": 1,
        "window_s": 0,
        "severity": "CRITICAL",
    },
    {
        "id": "download_attempt",
        "name": "Download Attempt",
        "description": "Attacker attempted to download payload",
        "detail_contains": "DOWNLOAD",
        "threshold": 1,
        "window_s": 0,
        "severity": "HIGH",
    },
    {
        "id": "scada_write",
        "name": "SCADA Write",
        "description": "Write attempt to Modbus/SCADA registers",
        "event_match": {"event_type": "SVC_COMMAND"},
        "service_match": ["modbus"],
        "detail_contains": "Write",
        "threshold": 1,
        "window_s": 0,
        "severity": "CRITICAL",
    },
]


class HpAlertEngine:
    """
    Evaluates alert rules against incoming events.

    Maintains sliding window counters for rate-based rules, keeps the most
    recent alerts in memory, persists alerts and custom rules to SQLite and
    forwards fired alerts to configured webhooks.
    """

    _WEBHOOK_MIN_INTERVAL = 10  # seconds between sends per webhook
    _MAX_ACTIVE_ALERTS = 200    # in-memory cap on self.active_alerts

    def __init__(self, db_path=None):
        """
        Create the engine, open/initialise the alert database and load rules.

        db_path: path of the SQLite file; defaults to "honeypot_alerts.db"
        next to this module.
        """
        self.lock = threading.Lock()
        self.rules = []
        self._next_rule_id = 100

        # Windowed counters: rule_id -> group_key -> list of timestamps
        self._windows = defaultdict(lambda: defaultdict(list))
        # Windowed distinct: rule_id -> group_key -> {value: last_seen_ts}
        self._distinct = defaultdict(lambda: defaultdict(dict))

        # Active alerts: list of dicts, oldest first
        self.active_alerts = []
        self._next_alert_id = 1

        # Cooldown: "rule_id:group_key" -> last_alert_ts (avoid spam)
        self._cooldowns = {}

        # Store reference for kill chain analysis
        self._store = None

        # Webhook management
        # List of {"id": int, "url": str, "name": str, "enabled": bool}
        self.webhook_urls = []
        self._next_webhook_id = 1

        # Alert persistence
        if db_path is None:
            db_path = os.path.join(os.path.dirname(__file__), "honeypot_alerts.db")
        self._alert_db_path = db_path
        self._init_alert_db()

        # Load defaults (copied so edits never touch DEFAULT_RULES entries)
        for r in DEFAULT_RULES:
            self.rules.append(dict(r))

    def set_store(self, store):
        """Set reference to HpStore for kill chain analysis."""
        self._store = store

    def _connect(self):
        """Return a context manager yielding a connection that is always closed.

        sqlite3's own ``with conn:`` only manages the transaction, so
        ``contextlib.closing`` is used to guarantee ``close()`` on errors too.
        """
        return contextlib.closing(sqlite3.connect(self._alert_db_path))

    # ============================================================
    # Rule CRUD
    # ============================================================

    def get_rules(self) -> list:
        """Return shallow copies of all configured rules."""
        with self.lock:
            return [dict(r) for r in self.rules]

    def add_rule(self, rule: dict) -> dict:
        """Register a custom rule, assign it a ``custom_<n>`` id and persist it.

        The passed dict is stored (and returned) as-is with its "id" set.
        """
        with self.lock:
            rule["id"] = f"custom_{self._next_rule_id}"
            self._next_rule_id += 1
            self.rules.append(rule)
            self._persist_rule(rule)
            return rule

    def delete_rule(self, rule_id: str) -> bool:
        """Remove a rule by id. Returns True if a rule was removed."""
        with self.lock:
            before = len(self.rules)
            self.rules = [r for r in self.rules if r["id"] != rule_id]
            deleted = len(self.rules) < before
            if deleted:
                # Drop counters/cooldowns that belonged to the removed rule.
                self._windows.pop(rule_id, None)
                self._distinct.pop(rule_id, None)
                prefix = f"{rule_id}:"
                for key in [k for k in self._cooldowns if k.startswith(prefix)]:
                    del self._cooldowns[key]
                if rule_id.startswith("custom_"):
                    self._delete_persisted_rule(rule_id)
            return deleted

    # ============================================================
    # Alert queries
    # ============================================================

    def get_active_alerts(self) -> list:
        """Return copies of the in-memory alerts, oldest first."""
        with self.lock:
            return [dict(a) for a in self.active_alerts]

    def acknowledge_alert(self, alert_id: int) -> bool:
        """Mark an in-memory alert as acknowledged and persist that state.

        Returns True if the alert was found (even if the DB update failed,
        which is only logged), False otherwise.
        """
        with self.lock:
            for a in self.active_alerts:
                if a["id"] != alert_id:
                    continue
                a["acknowledged"] = True
                a["ack_at"] = time.time()
                try:
                    with self._connect() as conn:
                        conn.execute(
                            "UPDATE alerts SET acknowledged=1, ack_at=? WHERE id=?",
                            (a["ack_at"], alert_id))
                        conn.commit()
                except Exception as e:  # best-effort persistence
                    print(f"[HP_ALERTS] Failed to persist ack for alert {alert_id}: {e}")
                return True
            return False

    def get_unacknowledged_count(self) -> int:
        """Return the number of in-memory alerts not yet acknowledged."""
        with self.lock:
            return sum(1 for a in self.active_alerts if not a.get("acknowledged"))

    # ============================================================
    # Alert Persistence
    # ============================================================

    def _init_alert_db(self):
        """Create tables if needed and load persisted alerts and custom rules."""
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            conn.execute("""CREATE TABLE IF NOT EXISTS alerts (
                id INTEGER PRIMARY KEY,
                rule_id TEXT,
                rule_name TEXT,
                severity TEXT,
                description TEXT,
                src_ip TEXT,
                service TEXT,
                detail TEXT,
                count INTEGER,
                group_key TEXT,
                timestamp REAL,
                event_id INTEGER,
                acknowledged INTEGER DEFAULT 0,
                ack_at REAL
            )""")
            conn.execute("""CREATE TABLE IF NOT EXISTS custom_rules (
                id TEXT PRIMARY KEY,
                rule_json TEXT NOT NULL,
                created_at REAL
            )""")
            conn.commit()

            # Newest unacknowledged alerts, capped at the in-memory limit.
            rows = conn.execute(
                "SELECT * FROM alerts WHERE acknowledged = 0 ORDER BY id DESC LIMIT ?",
                (self._MAX_ACTIVE_ALERTS,)).fetchall()
            # Highest id over ALL rows: acknowledged alerts still occupy their
            # primary key, so new ids must start above them as well.
            max_id = conn.execute("SELECT MAX(id) FROM alerts").fetchone()[0]
            custom_rows = conn.execute(
                "SELECT id, rule_json FROM custom_rules").fetchall()

        # Restore chronological order so it matches how new alerts are appended.
        for row in reversed(rows):
            alert = dict(row)
            alert["acknowledged"] = bool(alert["acknowledged"])
            self.active_alerts.append(alert)
        if max_id is not None:
            self._next_alert_id = max(self._next_alert_id, max_id + 1)

        # Load persisted custom rules
        for cr in custom_rows:
            try:
                rule = json.loads(cr["rule_json"])
            except (TypeError, ValueError) as e:
                print(f"[HP_ALERTS] Skipping corrupt custom rule {cr['id']}: {e}")
                continue
            if not isinstance(rule, dict):
                print(f"[HP_ALERTS] Skipping corrupt custom rule {cr['id']}: not an object")
                continue
            rule["id"] = cr["id"]
            self.rules.append(rule)
            # Keep the id counter ahead of every persisted "custom_<n>" id.
            prefix, _, suffix = cr["id"].partition("_")
            if prefix == "custom" and suffix.isdigit():
                self._next_rule_id = max(self._next_rule_id, int(suffix) + 1)

    def _persist_alert(self, alert):
        """Insert a freshly fired alert into the database (best effort)."""
        try:
            with self._connect() as conn:
                conn.execute(
                    "INSERT INTO alerts (id, rule_id, rule_name, severity, description, "
                    "src_ip, service, detail, count, group_key, timestamp, event_id, "
                    "acknowledged, ack_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
                    (alert["id"], alert["rule_id"], alert["rule_name"],
                     alert["severity"], alert["description"], alert["src_ip"],
                     alert["service"], alert["detail"], alert["count"],
                     alert["group_key"], alert["timestamp"],
                     alert.get("event_id"), 0, None))
                conn.commit()
        except Exception as e:  # best-effort persistence
            print(f"[HP_ALERTS] Failed to persist alert {alert.get('id')}: {e}")

    def _persist_rule(self, rule):
        """Persist a custom rule to the database."""
        try:
            # The id lives in its own column, so it is left out of the JSON.
            rule_copy = {k: v for k, v in rule.items() if k != "id"}
            with self._connect() as conn:
                conn.execute(
                    "INSERT OR REPLACE INTO custom_rules (id, rule_json, created_at) VALUES (?,?,?)",
                    (rule["id"], json.dumps(rule_copy), time.time()))
                conn.commit()
        except Exception as e:  # best-effort persistence
            print(f"[HP_ALERTS] Failed to persist rule {rule.get('id')}: {e}")

    def _delete_persisted_rule(self, rule_id):
        """Delete a custom rule from the database."""
        try:
            with self._connect() as conn:
                conn.execute("DELETE FROM custom_rules WHERE id = ?", (rule_id,))
                conn.commit()
        except Exception as e:  # best-effort persistence
            print(f"[HP_ALERTS] Failed to delete persisted rule {rule_id}: {e}")

    # ============================================================
    # Webhooks
    # ============================================================

    @staticmethod
    def _is_safe_webhook_url(url):
        """Validate webhook URL to prevent SSRF attacks.

        Blocks non-http(s) schemes and any hostname that resolves to a
        private, loopback, link-local, reserved, multicast or unspecified
        address. Returns (is_safe: bool, reason: str).
        """
        try:
            parsed = urllib.parse.urlparse(url)
            hostname = parsed.hostname
        except (ValueError, TypeError, AttributeError):
            return False, "Malformed URL"

        # Only allow http and https schemes
        if parsed.scheme not in ("http", "https"):
            return False, f"Scheme '{parsed.scheme}' not allowed (only http/https)"

        if not hostname:
            return False, "No hostname in URL"

        # Resolve hostname to IP and check every returned address
        try:
            resolved = socket.getaddrinfo(
                hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
        except (socket.gaierror, UnicodeError):
            return False, f"Cannot resolve hostname '{hostname}'"

        for _family, _type, _proto, _canonname, sockaddr in resolved:
            try:
                # Strip an IPv6 zone id ("fe80::1%eth0") before parsing.
                ip = ipaddress.ip_address(str(sockaddr[0]).split("%", 1)[0])
            except ValueError:
                return False, f"Unparseable address for hostname '{hostname}'"
            # Judge ::ffff:a.b.c.d by the IPv4 address it embeds.
            mapped = getattr(ip, "ipv4_mapped", None)
            if mapped is not None:
                ip = mapped
            if (ip.is_private or ip.is_loopback or ip.is_reserved
                    or ip.is_link_local or ip.is_multicast or ip.is_unspecified):
                return False, f"Resolved to private/loopback IP ({ip})"

        return True, ""

    def add_webhook(self, url, name=""):
        """Register a webhook after SSRF validation.

        Returns the webhook dict, or {"error": ...} if the URL is rejected.
        """
        safe, reason = self._is_safe_webhook_url(url)
        if not safe:
            return {"error": f"Unsafe webhook URL: {reason}"}
        with self.lock:
            wh = {"id": self._next_webhook_id, "url": url,
                  "name": name or url[:40], "enabled": True}
            self._next_webhook_id += 1
            self.webhook_urls.append(wh)
            return wh

    def remove_webhook(self, wh_id):
        """Remove a webhook by id. Returns True if one was removed."""
        with self.lock:
            before = len(self.webhook_urls)
            self.webhook_urls = [w for w in self.webhook_urls if w["id"] != wh_id]
            return len(self.webhook_urls) < before

    def get_webhooks(self):
        """Return copies of all enabled webhooks."""
        with self.lock:
            return [dict(w) for w in self.webhook_urls if w["enabled"]]

    def _send_webhooks(self, alert):
        """Send alert to all configured webhooks (run in a background thread)."""
        now = time.time()
        # Iterate a snapshot so concurrent add/remove cannot disturb the loop.
        for wh in list(self.webhook_urls):
            if not wh["enabled"]:
                continue
            # Rate limit: skip if last send was less than _WEBHOOK_MIN_INTERVAL ago
            last_sent = wh.get("_last_sent", 0)
            if now - last_sent < self._WEBHOOK_MIN_INTERVAL:
                continue
            label = wh.get('name', wh['url'][:40])
            try:
                # Re-validate at send time: DNS may have changed since the URL
                # was registered. This narrows, but cannot fully close, the
                # rebinding window between this check and the connection.
                safe, reason = self._is_safe_webhook_url(wh["url"])
                if not safe:
                    print(f"[HP_ALERTS] Webhook {label} skipped: {reason}")
                    continue
                payload = json.dumps({
                    "text": "[%s] %s: %s from %s (%s)" % (
                        alert["severity"], alert["rule_name"],
                        alert["description"], alert["src_ip"],
                        alert.get("service", "N/A")),
                    "alert": alert,
                }).encode()
                req = urllib.request.Request(
                    wh["url"], data=payload,
                    headers={"Content-Type": "application/json"},
                    method="POST")
                # Context manager closes the HTTP response/socket.
                with urllib.request.urlopen(req, timeout=5):
                    pass
                wh["_last_sent"] = now
            except Exception as e:  # one failing webhook must not stop the rest
                print(f"[HP_ALERTS] Webhook {label} failed: {e}")

    # ============================================================
    # Event evaluation
    # ============================================================

    def evaluate(self, event: dict) -> list:
        """
        Evaluate all rules against an event.
        Returns list of newly fired alerts.
        """
        fired = []
        now = time.time()

        with self.lock:
            for rule in self.rules:
                if self._matches(rule, event):
                    fired.extend(self._check_threshold(rule, event, now))

            # Kill chain analysis (on significant events only).
            # NOTE(review): kept under the lock because it updates cooldowns
            # and fires alerts, both of which mutate shared state.
            fired.extend(self._check_kill_chain(event, now))

        return fired

    def _check_kill_chain(self, event, now):
        """Check if attacker has reached advanced kill chain phases.

        Returns a list with one alert when the store reports a score of at
        least 40 for the event's source IP, otherwise an empty list.
        """
        if not self._store:
            return []
        ip = event.get("src_ip", "")
        if not ip or ip == "0.0.0.0":
            return []

        # Only check on significant events
        etype = event.get("event_type", "")
        detail = event.get("detail") or ""  # tolerate an explicit None
        if etype not in ("SVC_COMMAND", "SVC_MQTT_MSG") and "HONEYTOKEN" not in detail:
            return []

        # Cooldown: alert on kill chain per IP at most once per 5 min
        cooldown_key = f"kill_chain:{ip}"
        last = self._cooldowns.get(cooldown_key, 0)
        if now - last < 300:
            return []

        analysis = self._store.get_kill_chain_analysis(ip)
        if analysis["score"] < 40:
            return []

        self._cooldowns[cooldown_key] = now
        rule = {
            "id": "kill_chain_advanced",
            "name": "Kill Chain: Advanced Progression",
            "severity": "CRITICAL",
            "description": "Attacker reached phase '%s' (score %d)" % (
                analysis["max_phase"], analysis["score"]),
        }
        return [self._fire_alert(rule, event, now, ip, analysis["score"])]

    def _matches(self, rule: dict, event: dict) -> bool:
        """Check if event matches all conditions present on the rule."""
        # event_match: check specific field values
        em = rule.get("event_match")
        if em:
            for k, v in em.items():
                if event.get(k) != v:
                    return False

        # service_match: check service in list
        sm = rule.get("service_match")
        if sm and event.get("service") not in sm:
            return False

        # detail_contains: substring match
        dc = rule.get("detail_contains")
        if dc:
            detail = event.get("detail") or ""  # tolerate an explicit None
            if dc not in detail:
                return False

        # field_match: True means "present and truthy", else exact equality
        fm = rule.get("field_match")
        if fm:
            for k, v in fm.items():
                if v is True:
                    if not event.get(k):
                        return False
                elif event.get(k) != v:
                    return False

        return True

    def _check_threshold(self, rule: dict, event: dict, now: float) -> list:
        """Check if threshold is met, return alerts if so.

        Instant rules (window_s of 0/None) fire on every match, limited by a
        10 s cooldown per source IP. Windowed rules count matches (or distinct
        values of ``count_distinct``) seen within the last ``window_s``
        seconds per group key.
        """
        rule_id = rule["id"]
        window = rule.get("window_s", 0) or 0
        threshold = rule.get("threshold", 1)

        # Instant rules (window=0): fire immediately
        if window <= 0:
            cooldown_key = f"{rule_id}:{event.get('src_ip', '')}"
            last = self._cooldowns.get(cooldown_key, 0)
            if now - last < 10:  # 10s cooldown for instant rules
                return []
            self._cooldowns[cooldown_key] = now
            return [self._fire_alert(rule, event, now)]

        # Windowed rules: track timestamps per group key
        group_key = event.get(rule.get("group_by", "src_ip"), "unknown")
        cutoff = now - window

        # Prune entries that fell out of the window, then record this event
        windows = self._windows[rule_id]
        timestamps = [t for t in windows[group_key] if t > cutoff]
        timestamps.append(now)
        windows[group_key] = timestamps

        # count_distinct: count unique values of a field seen inside the window
        distinct_field = rule.get("count_distinct")
        if distinct_field:
            seen = self._distinct[rule_id][group_key]
            seen[event.get(distinct_field, 0)] = now
            for value in [v for v, ts in seen.items() if ts <= cutoff]:
                del seen[value]
            count = len(seen)
        else:
            count = len(timestamps)

        if count < threshold:
            return []

        cooldown_key = f"{rule_id}:{group_key}"
        last = self._cooldowns.get(cooldown_key, 0)
        if now - last < window:  # cooldown = window duration
            return []
        self._cooldowns[cooldown_key] = now

        # Clear window state after firing (drop the keys so they do not pile up)
        windows.pop(group_key, None)
        if distinct_field:
            self._distinct[rule_id].pop(group_key, None)
        return [self._fire_alert(rule, event, now, group_key, count)]

    def _fire_alert(self, rule: dict, event: dict, now: float,
                    group_key: str = None, count: int = 1) -> dict:
        """Create, store and persist an alert, then dispatch webhooks.

        Returns the alert dict that was appended to ``self.active_alerts``.
        """
        alert = {
            "id": self._next_alert_id,
            "rule_id": rule["id"],
            "rule_name": rule.get("name", rule["id"]),
            "severity": rule.get("severity", "HIGH"),
            "description": rule.get("description", ""),
            "src_ip": event.get("src_ip", ""),
            "service": event.get("service", ""),
            "detail": event.get("detail", ""),
            "count": count,
            "group_key": group_key or event.get("src_ip", ""),
            "timestamp": now,
            "event_id": event.get("id"),
            "acknowledged": False,
        }
        self._next_alert_id += 1
        self.active_alerts.append(alert)

        # Keep only the most recent alerts in memory
        if len(self.active_alerts) > self._MAX_ACTIVE_ALERTS:
            self.active_alerts = self.active_alerts[-self._MAX_ACTIVE_ALERTS:]

        # Persist alert to database
        self._persist_alert(alert)

        # Dispatch webhooks in background (on a copy, so later acks don't leak in)
        if self.webhook_urls:
            threading.Thread(target=self._send_webhooks,
                             args=(dict(alert),), daemon=True).start()

        return alert