"""
Honeypot Event Store — SQLite-backed storage with structured field parsing.

Stores all honeypot events in a SQLite database for complex queries,
while maintaining in-memory counters for fast dashboard polling.
"""

import csv
import hashlib
import io
import json
import re
import sqlite3
import threading
import time
from collections import OrderedDict, defaultdict
from typing import Optional

# ============================================================
# Field extraction regexes — parse structured data from detail
# ============================================================

RE_USER = re.compile(r"user='([^']*)'")
RE_PASS = re.compile(r"pass='([^']*)'")
RE_CMD = re.compile(r"cmd='([^']*)'")
RE_URL = re.compile(r"url='([^']*)'")
RE_FILE = re.compile(r"file='([^']*)'")
RE_CLIENT = re.compile(r"client='([^']*)'")
RE_COMMUNITY = re.compile(r"community='([^']*)'")
RE_OS_TAG = re.compile(r"\[(\w+)\]\s*$")
RE_MALWARE_TAG = re.compile(r"\[(Mirai|Botnet-generic|RCE-pipe|Obfuscation|Reverse-shell|"
                            r"RCE-python|RCE-perl|Destructive|Mirai-variant|HONEYTOKEN)\]")

PORT_TO_SERVICE = {
    22: "ssh", 23: "telnet", 80: "http", 1883: "mqtt", 21: "ftp",
    53: "dns", 161: "snmp", 69: "tftp", 5683: "coap", 6379: "redis",
    554: "rtsp", 3306: "mysql", 502: "modbus", 1900: "upnp", 5060: "sip",
    2323: "telnet", 23231: "telnet",
}

VALID_SEVERITIES = {"LOW", "MEDIUM", "HIGH", "CRITICAL"}

# MITRE ATT&CK technique mapping, keyed by event_type.
# Each entry is a list of (technique_id, technique_name, tactic) tuples.
MITRE_MAP = {
    "WIFI_DEAUTH": [("T1498.001", "Direct Network Flood", "Impact")],
    "WIFI_PROBE": [("T1595.001", "Scanning IP Blocks", "Reconnaissance")],
    "WIFI_EVIL_TWIN": [("T1557.002", "ARP Cache Poisoning", "Credential Access")],
    "WIFI_BEACON_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "WIFI_EAPOL": [("T1557", "Adversary-in-the-Middle", "Credential Access")],
    "ARP_SPOOF": [("T1557.002", "ARP Cache Poisoning", "Credential Access")],
    "ARP_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "PORT_SCAN": [("T1046", "Network Service Discovery", "Discovery")],
    "ICMP_SWEEP": [("T1018", "Remote System Discovery", "Discovery")],
    "SYN_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "UDP_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "SVC_CONNECT": [("T1021", "Remote Services", "Lateral Movement")],
    "SVC_AUTH_ATTEMPT": [("T1110", "Brute Force", "Credential Access")],
    "SVC_COMMAND": [("T1059", "Command Interpreter", "Execution")],
    "SVC_HTTP_REQUEST": [("T1190", "Exploit Public-Facing App", "Initial Access")],
    "SVC_MQTT_MSG": [("T1071.001", "Web Protocols", "Command and Control")],
}

# Extra techniques added when the keyword appears (case-sensitive substring)
# anywhere in the event detail.
MITRE_DETAIL_MAP = {
    "HONEYTOKEN": [("T1083", "File and Directory Discovery", "Discovery"),
                   ("T1005", "Data from Local System", "Collection")],
    "DOWNLOAD": [("T1105", "Ingress Tool Transfer", "Command and Control")],
    "DNS_TUNNEL": [("T1071.004", "DNS", "Command and Control"),
                   ("T1048.003", "Exfiltration Over Non-C2", "Exfiltration")],
    "Mirai": [("T1583.005", "Botnet", "Resource Development")],
    "Reverse-shell": [("T1059.004", "Unix Shell", "Execution")],
    "RCE-python": [("T1059.006", "Python", "Execution")],
    "CONFIG SET": [("T1053", "Scheduled Task/Job", "Persistence")],
    "SLAVEOF": [("T1020", "Automated Exfiltration", "Exfiltration")],
    "Write": [("T1565.001", "Stored Data Manipulation", "Impact")],
}

VALID_EVENT_TYPES = set(MITRE_MAP.keys()) | {
    "HONEYTOKEN", "SVC_FTP_CMD", "SVC_SNMP_QUERY", "SVC_COAP_REQUEST",
    "SVC_MODBUS_QUERY", "SVC_UPNP_REQUEST", "SVC_SIP_REGISTER",
    "SVC_REDIS_CMD", "SVC_RTSP_REQUEST", "SVC_DNS_QUERY",
}

# Upper bound on distinct source IPs kept in the in-memory attacker LRU.
_MAX_TRACKED_ATTACKERS = 10000

# Upper bound on rows returned by export_events().
_EXPORT_MAX_ROWS = 10000

# Leading characters that make spreadsheet apps interpret a cell as a formula.
_CSV_FORMULA_PREFIXES = ("=", "+", "-", "@", "\t", "\r")

# ============================================================
# Kill Chain phase definitions and classifier
# ============================================================

KILL_CHAIN_PHASES = [
    {"id": "recon", "order": 1, "score": 10, "label": "Reconnaissance"},
    {"id": "weaponize", "order": 2, "score": 15, "label": "Weaponization"},
    {"id": "delivery", "order": 3, "score": 20, "label": "Delivery"},
    {"id": "exploitation", "order": 4, "score": 30, "label": "Exploitation"},
    {"id": "installation", "order": 5, "score": 40, "label": "Installation"},
    {"id": "c2", "order": 6, "score": 50, "label": "Command & Control"},
    {"id": "actions", "order": 7, "score": 60, "label": "Actions on Objectives"},
]

PHASE_SCORES = {p["id"]: p["score"] for p in KILL_CHAIN_PHASES}
PHASE_ORDER = {p["id"]: p["order"] for p in KILL_CHAIN_PHASES}


def _classify_kill_chain_phase(event_type, detail):
    """Classify an event into the highest matching kill chain phase.

    Checks run from the latest phase down, so the first match wins.
    Returns a phase id from KILL_CHAIN_PHASES, or None if nothing matches.
    """
    d = (detail or "").upper()

    # Phase 7: Actions on Objectives (exfiltration, sabotage)
    if "HONEYTOKEN EXFIL" in d or "FTP STOR" in d or "CONFIG SET" in d:
        return "actions"

    # Phase 6: C2 (command & control channels)
    if event_type == "SVC_MQTT_MSG" or "SLAVEOF" in d or "EVAL" in d:
        return "c2"

    # Phase 5: Installation (malware drop, honeytoken access)
    if "HONEYTOKEN" in d or "DOWNLOAD" in d:
        return "installation"

    # Phase 4: Exploitation (command execution in shell)
    if event_type == "SVC_COMMAND":
        return "exploitation"

    # Phase 3: Delivery (service connection, auth attempts)
    if event_type in ("SVC_CONNECT", "SVC_AUTH_ATTEMPT", "SVC_HTTP_REQUEST"):
        return "delivery"

    # Phase 2: Weaponization (network manipulation)
    if event_type in ("ARP_SPOOF", "ARP_FLOOD", "WIFI_EVIL_TWIN", "WIFI_EAPOL"):
        return "weaponize"
    if "DNS_TUNNEL" in d:
        return "weaponize"

    # Phase 1: Reconnaissance (scanning, probing)
    if event_type in ("WIFI_PROBE", "PORT_SCAN", "ICMP_SWEEP", "SYN_FLOOD", "UDP_FLOOD"):
        return "recon"

    return None


def _extract_mitre(event_type, detail):
    """Resolve MITRE ATT&CK techniques for an event.

    Combines the event_type mapping with any detail-keyword mappings and
    de-duplicates by technique id, keeping first occurrence order.
    Returns a list of {"id", "name", "tactic"} dicts (possibly empty).
    """
    techniques = list(MITRE_MAP.get(event_type, []))
    for keyword, techs in MITRE_DETAIL_MAP.items():
        if keyword in detail:
            techniques.extend(techs)
    seen = set()
    result = []
    for tid, name, tactic in techniques:
        if tid not in seen:
            seen.add(tid)
            result.append({"id": tid, "name": name, "tactic": tactic})
    return result


def _session_id(src_ip: str, dst_port: int, timestamp: float) -> str:
    """Compute session ID: md5(ip + port + 5min_bucket)[:12].

    MD5 is used only as a short, stable grouping key, not for security.
    """
    bucket = int(timestamp / 300)
    key = f"{src_ip}:{dst_port}:{bucket}"
    return hashlib.md5(key.encode()).hexdigest()[:12]


# (field name, regex) pairs applied in order. A later match for the same
# field name overwrites an earlier one, so an SNMP community string replaces
# any user='...' value as the username.
_FIELD_PATTERNS = (
    ("username", RE_USER),
    ("password", RE_PASS),
    ("command", RE_CMD),
    ("url", RE_URL),
    ("path", RE_FILE),
    ("client_id", RE_CLIENT),
    ("username", RE_COMMUNITY),  # community as username
    ("malware_tag", RE_MALWARE_TAG),
)

# Trailing "[Tag]" values accepted as an OS fingerprint.
_KNOWN_OS_TAGS = ("Linux", "Windows", "macOS", "Cisco")


def _extract_fields(detail: str) -> dict:
    """Extract structured fields from an event detail string.

    Only fields that are present in `detail` appear in the result. Possible
    keys: username, password, command, url, path, client_id, malware_tag,
    os_tag.
    """
    fields = {}
    for name, pattern in _FIELD_PATTERNS:
        m = pattern.search(detail)
        if m:
            fields[name] = m.group(1)
    m = RE_OS_TAG.search(detail)
    if m and m.group(1) in _KNOWN_OS_TAGS:
        fields["os_tag"] = m.group(1)
    return fields


class HpStore:
    """
    Thread-safe SQLite-backed event store.

    Each thread uses its own SQLite connection. The in-memory counters,
    attacker LRU and SSE subscriber list are guarded by `self.lock` and serve
    fast dashboard polling.
    """

    def __init__(self, db_path: str = "honeypot_events.db", geo_lookup=None):
        self.db_path = db_path
        self.lock = threading.Lock()
        self._local = threading.local()
        self._geo = geo_lookup  # Optional HpGeoLookup instance

        # In-memory counters for fast polling
        self.total_count = 0
        self.count_by_type: dict = defaultdict(int)
        self.count_by_severity: dict = defaultdict(int)
        self.count_by_device: dict = defaultdict(int)
        # src_ip -> attacker summary; insertion order doubles as LRU order.
        self.attackers: OrderedDict = OrderedDict()
        self._last_id = 0

        # SSE subscribers (objects with an .append() method, e.g. deques)
        self._sse_queues: list = []

        # Alert engine (set via set_alert_engine after bootstrap)
        self._alert_engine = None

        self._init_db()

    def set_alert_engine(self, engine):
        """Wire the alert engine so events are evaluated for alerts."""
        self._alert_engine = engine

    def _get_conn(self) -> sqlite3.Connection:
        """Return this thread's SQLite connection, opening it on first use.

        Python's sqlite3 connections may by default only be used by the
        thread that created them, so each thread keeps its own on a
        threading.local.
        """
        conn = getattr(self._local, "conn", None)
        if conn is None:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA synchronous=NORMAL")
            self._local.conn = conn
        return conn

    def _init_db(self):
        """Create tables and indexes, migrate old schemas, then load counters."""
        conn = self._get_conn()
        # Create table (without mitre_techniques for compat with existing DBs)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL NOT NULL,
                device_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                severity TEXT NOT NULL,
                src_mac TEXT,
                src_ip TEXT,
                src_port INTEGER,
                dst_port INTEGER,
                detail TEXT,
                received_at REAL,
                service TEXT,
                username TEXT,
                password TEXT,
                command TEXT,
                url TEXT,
                path TEXT,
                client_id TEXT,
                session_id TEXT,
                os_tag TEXT,
                malware_tag TEXT
            )
        """)
        # Migration: add mitre_techniques column if missing
        cols = {r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()}
        if "mitre_techniques" not in cols:
            conn.execute("ALTER TABLE events ADD COLUMN mitre_techniques TEXT")
        # Create indexes (safe now that all columns exist)
        conn.executescript("""
            CREATE INDEX IF NOT EXISTS idx_ts ON events(timestamp);
            CREATE INDEX IF NOT EXISTS idx_type ON events(event_type);
            CREATE INDEX IF NOT EXISTS idx_sev ON events(severity);
            CREATE INDEX IF NOT EXISTS idx_ip ON events(src_ip);
            CREATE INDEX IF NOT EXISTS idx_session ON events(session_id);
            CREATE INDEX IF NOT EXISTS idx_service ON events(service);
            CREATE INDEX IF NOT EXISTS idx_mitre ON events(mitre_techniques);
        """)
        conn.commit()

        # Load counters from existing data
        self._reload_counters()

    def parse_and_store(self, device_id: str, payload: str) -> Optional[dict]:
        """
        Parse EVT wire format and store in SQLite.

        Format: EVT|<event_type>|<severity>|<src_mac>|<src_ip>:<src_port>><dst_port>|<detail>
        The leading tag may also be "HP". Only the first five "|" are split on,
        so <detail> may itself contain "|".

        Returns the stored event as a dict, or None if the payload is not an
        event or could not be parsed/stored. Errors are printed, not raised.
        """
        try:
            parts = payload.split("|", 5)
            if len(parts) < 6 or parts[0] not in ("EVT", "HP"):
                return None

            _, event_type, severity, src_mac, addr_part, detail = parts

            # Validate severity to prevent pipe injection
            if severity not in VALID_SEVERITIES:
                severity = "LOW"

            src_ip = "0.0.0.0"
            src_port = 0
            dst_port = 0
            if ">" in addr_part:
                left, dst_port_s = addr_part.split(">", 1)
                dst_port = int(dst_port_s) if dst_port_s.isdigit() else 0
                if ":" in left:
                    # rsplit keeps any colons inside the IP part intact
                    src_ip, src_port_s = left.rsplit(":", 1)
                    src_port = int(src_port_s) if src_port_s.isdigit() else 0

            now = time.time()
            fields = _extract_fields(detail)
            service = PORT_TO_SERVICE.get(dst_port, "")
            session_id = _session_id(src_ip, dst_port, now)
            mitre = _extract_mitre(event_type, detail)
            mitre_json = json.dumps(mitre) if mitre else None

            conn = self._get_conn()
            cur = conn.execute(
                """INSERT INTO events
                   (timestamp, device_id, event_type, severity, src_mac, src_ip,
                    src_port, dst_port, detail, received_at, service,
                    username, password, command, url, path, client_id,
                    session_id, os_tag, malware_tag, mitre_techniques)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (now, device_id, event_type, severity, src_mac, src_ip,
                 src_port, dst_port, detail, now, service,
                 fields.get("username"), fields.get("password"),
                 fields.get("command"), fields.get("url"),
                 fields.get("path"), fields.get("client_id"),
                 session_id, fields.get("os_tag"), fields.get("malware_tag"),
                 mitre_json)
            )
            conn.commit()
            event_id = cur.lastrowid

            with self.lock:
                self.total_count += 1
                # INSERTs on different threads can complete out of order;
                # never let the high-water mark move backwards.
                self._last_id = max(self._last_id, event_id)
                self.count_by_type[event_type] += 1
                self.count_by_severity[severity] += 1
                self.count_by_device[device_id] += 1
                if src_ip != "0.0.0.0":
                    self._track_attacker(src_ip, src_mac, event_type, now)

            evt_dict = {
                "id": event_id,
                "timestamp": now,
                "device_id": device_id,
                "event_type": event_type,
                "severity": severity,
                "src_mac": src_mac,
                "src_ip": src_ip,
                "src_port": src_port,
                "dst_port": dst_port,
                "detail": detail,
                "received_at": now,
                "service": service,
                "session_id": session_id,
                # Same JSON text as the stored column, so pushed events match
                # rows returned by the query APIs.
                "mitre_techniques": mitre_json,
                **fields,
            }

            # Trigger background geo resolution (non-blocking)
            if self._geo and src_ip != "0.0.0.0":
                self._geo.lookup_ip(src_ip)

            # Notify SSE subscribers (snapshot under lock)
            with self.lock:
                sse_snapshot = list(self._sse_queues)
            for q in sse_snapshot:
                try:
                    q.append(evt_dict)
                except Exception:
                    pass  # best-effort: one broken subscriber must not block others

            # Evaluate alert rules
            if self._alert_engine:
                try:
                    self._alert_engine.evaluate(evt_dict)
                except Exception as e:
                    print(f"[HP_STORE] Alert evaluation error: {e}")

            return evt_dict
        except Exception as e:
            print(f"[HP_STORE] Failed to parse event: {e} — payload={payload!r}")
            return None

    def _track_attacker(self, src_ip, src_mac, event_type, now):
        """Update the attacker LRU entry for src_ip. Caller must hold self.lock."""
        if src_ip not in self.attackers:
            # Evict oldest (LRU) if at capacity — O(1)
            if len(self.attackers) >= _MAX_TRACKED_ATTACKERS:
                self.attackers.popitem(last=False)
            self.attackers[src_ip] = {
                "ip": src_ip, "mac": src_mac, "types": set(),
                "count": 0, "first_seen": now, "last_seen": now,
            }
        # Move to end on each access so LRU is always at front
        self.attackers.move_to_end(src_ip)
        atk = self.attackers[src_ip]
        atk["types"].add(event_type)
        atk["count"] += 1
        atk["last_seen"] = now
        if src_mac != "00:00:00:00:00:00":
            atk["mac"] = src_mac

    # ============================================================
    # Query APIs
    # ============================================================

    def get_recent_events(self, limit: int = 100, event_type: str = None,
                          severity: str = None, src_ip: str = None,
                          service: str = None, offset: int = 0) -> list:
        """Get recent events (newest id first) with optional equality filters."""
        conn = self._get_conn()
        conditions = []
        params = []
        if event_type:
            conditions.append("event_type = ?")
            params.append(event_type)
        if severity:
            conditions.append("severity = ?")
            params.append(severity)
        if src_ip:
            conditions.append("src_ip = ?")
            params.append(src_ip)
        if service:
            conditions.append("service = ?")
            params.append(service)

        where = " WHERE " + " AND ".join(conditions) if conditions else ""
        sql = f"SELECT * FROM events{where} ORDER BY id DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

    def get_event_by_id(self, event_id: int) -> Optional[dict]:
        """Get single event with related events (same IP +-5min), or None."""
        conn = self._get_conn()
        row = conn.execute("SELECT * FROM events WHERE id = ?", (event_id,)).fetchone()
        if not row:
            return None
        evt = dict(row)
        # Related events: same IP, +-5 minutes
        related = conn.execute(
            """SELECT * FROM events
               WHERE src_ip = ? AND id != ? AND timestamp BETWEEN ? AND ?
               ORDER BY timestamp LIMIT 20""",
            (evt["src_ip"], event_id, evt["timestamp"] - 300, evt["timestamp"] + 300)
        ).fetchall()
        evt["related"] = [dict(r) for r in related]
        return evt

    def get_attacker_profile(self, ip: str) -> dict:
        """Full attacker profile: events, credentials, commands, sessions.

        `total_events` is the true number of stored events for `ip`;
        `events` holds only the 50 most recent.
        """
        conn = self._get_conn()
        total = conn.execute(
            "SELECT COUNT(*) AS cnt FROM events WHERE src_ip = ?", (ip,)
        ).fetchone()["cnt"]
        events = conn.execute(
            "SELECT * FROM events WHERE src_ip = ? ORDER BY timestamp DESC LIMIT 200",
            (ip,)
        ).fetchall()
        creds = conn.execute(
            """SELECT username, password, service, timestamp
               FROM events WHERE src_ip = ? AND username IS NOT NULL
               ORDER BY timestamp""",
            (ip,)
        ).fetchall()
        commands = conn.execute(
            """SELECT command, detail, service, timestamp
               FROM events WHERE src_ip = ? AND command IS NOT NULL
               ORDER BY timestamp""",
            (ip,)
        ).fetchall()
        sessions = conn.execute(
            """SELECT session_id, service,
                      MIN(timestamp) as start, MAX(timestamp) as end,
                      COUNT(*) as event_count,
                      MAX(CASE severity WHEN 'CRITICAL' THEN 4 WHEN 'HIGH' THEN 3
                          WHEN 'MEDIUM' THEN 2 ELSE 1 END) as max_sev
               FROM events WHERE src_ip = ?
               GROUP BY session_id ORDER BY start DESC""",
            (ip,)
        ).fetchall()

        profile = {
            "ip": ip,
            "total_events": total,
            "events": [dict(r) for r in events[:50]],
            "credentials": [dict(r) for r in creds],
            "commands": [dict(r) for r in commands],
            "sessions": [dict(r) for r in sessions],
        }

        if self._geo:
            geo = self._geo.lookup_ip(ip)
            profile["country"] = geo.get("country", "")
            profile["country_code"] = geo.get("country_code", "")
            profile["city"] = geo.get("city", "")
            profile["isp"] = geo.get("isp", "")
            profile["is_private"] = geo.get("is_private", False)
            # Vendor from the MAC currently tracked for this IP
            with self.lock:
                mac = self.attackers.get(ip, {}).get("mac", "")
            profile["vendor"] = self._geo.lookup_mac_vendor(mac)

        return profile

    def get_sessions(self, limit: int = 50, src_ip: str = None) -> list:
        """Get sessions grouped by session_id, newest first."""
        conn = self._get_conn()
        conditions = []
        params = []
        if src_ip:
            conditions.append("src_ip = ?")
            params.append(src_ip)
        where = " WHERE " + " AND ".join(conditions) if conditions else ""
        sql = f"""SELECT session_id, src_ip, service, dst_port,
                         MIN(timestamp) as start_time, MAX(timestamp) as end_time,
                         COUNT(*) as event_count,
                         MAX(CASE severity WHEN 'CRITICAL' THEN 4 WHEN 'HIGH' THEN 3
                             WHEN 'MEDIUM' THEN 2 ELSE 1 END) as max_severity,
                         SUM(CASE WHEN username IS NOT NULL THEN 1 ELSE 0 END) as auth_count,
                         SUM(CASE WHEN command IS NOT NULL THEN 1 ELSE 0 END) as cmd_count,
                         SUM(CASE WHEN malware_tag IS NOT NULL THEN 1 ELSE 0 END) as malware_count
                  FROM events{where}
                  GROUP BY session_id
                  ORDER BY start_time DESC LIMIT ?"""
        params.append(limit)
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

    def get_session_events(self, session_id: str) -> list:
        """Get all events for a session in timestamp order."""
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT * FROM events WHERE session_id = ? ORDER BY timestamp",
            (session_id,)
        ).fetchall()
        return [dict(r) for r in rows]

    @staticmethod
    def _escape_like(s):
        """Escape LIKE wildcards to prevent DoS via crafted search queries."""
        return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

    def search(self, q: str = None, event_type: str = None, severity: str = None,
               src_ip: str = None, service: str = None,
               from_ts: float = None, to_ts: float = None,
               limit: int = 100, offset: int = 0) -> list:
        """Multi-criteria search, newest first.

        `q` is a literal substring matched against detail, username and
        command. Other filters are equality or timestamp bounds.
        """
        conn = self._get_conn()
        conditions = []
        params = []

        if q:
            conditions.append("(detail LIKE ? ESCAPE '\\' "
                              "OR username LIKE ? ESCAPE '\\' "
                              "OR command LIKE ? ESCAPE '\\')")
            escaped = self._escape_like(q)
            like = f"%{escaped}%"
            params.extend([like, like, like])
        if event_type:
            conditions.append("event_type = ?")
            params.append(event_type)
        if severity:
            conditions.append("severity = ?")
            params.append(severity)
        if src_ip:
            conditions.append("src_ip = ?")
            params.append(src_ip)
        if service:
            conditions.append("service = ?")
            params.append(service)
        if from_ts:
            conditions.append("timestamp >= ?")
            params.append(from_ts)
        if to_ts:
            conditions.append("timestamp <= ?")
            params.append(to_ts)

        where = " WHERE " + " AND ".join(conditions) if conditions else ""
        sql = f"SELECT * FROM events{where} ORDER BY timestamp DESC LIMIT ? OFFSET ?"
        params.append(limit)
        params.append(offset)
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

    def get_timeline(self, hours: int = 24, bucket_minutes: int = 5) -> list:
        """Get event counts per time bucket, broken down by severity."""
        conn = self._get_conn()
        since = time.time() - (hours * 3600)
        bucket_s = bucket_minutes * 60
        rows = conn.execute(
            """SELECT CAST(timestamp / ? AS INTEGER) * ? as bucket,
                      severity, COUNT(*) as cnt
               FROM events WHERE timestamp >= ?
               GROUP BY bucket, severity ORDER BY bucket""",
            (bucket_s, bucket_s, since)
        ).fetchall()

        # Group by bucket
        buckets = {}
        for r in rows:
            b = r["bucket"]
            if b not in buckets:
                buckets[b] = {"time": b, "LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0, "total": 0}
            buckets[b][r["severity"]] = r["cnt"]
            buckets[b]["total"] += r["cnt"]

        return sorted(buckets.values(), key=lambda x: x["time"])

    def get_stats(self) -> dict:
        """Get aggregated statistics (from memory for speed)."""
        with self.lock:
            return {
                "total_events": self.total_count,
                "by_type": dict(self.count_by_type),
                "by_severity": dict(self.count_by_severity),
                "by_device": dict(self.count_by_device),
                "last_id": self._last_id,
            }

    def get_attackers(self, limit: int = 50) -> list:
        """Get top attackers sorted by event count, enriched with geo/vendor."""
        with self.lock:
            attackers = [
                {
                    "ip": data["ip"],
                    "mac": data["mac"],
                    "types": list(data["types"]),
                    "count": data["count"],
                    "first_seen": data["first_seen"],
                    "last_seen": data["last_seen"],
                }
                for data in self.attackers.values()
            ]
        attackers.sort(key=lambda x: x["count"], reverse=True)
        result = attackers[:limit]
        if self._geo:
            for atk in result:
                self._geo.enrich_attacker(atk)
        return result

    def get_events_after(self, last_id: int, limit: int = 50) -> list:
        """Get events after a given ID (for SSE polling)."""
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT * FROM events WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, limit)
        ).fetchall()
        return [dict(r) for r in rows]

    def get_last_id(self) -> int:
        """Get last event ID."""
        with self.lock:
            return self._last_id

    @staticmethod
    def _neutralize_formula(s: str) -> str:
        """Prefix a cell with ' if a spreadsheet would treat it as a formula."""
        if s and s[0] in _CSV_FORMULA_PREFIXES:
            return "'" + s
        return s

    @staticmethod
    def _csv_safe(val):
        """Escape a CSV cell to prevent formula injection in spreadsheet apps.

        Kept for compatibility: replaces commas with ';' so the value can be
        joined with ',' without quoting.
        """
        return HpStore._neutralize_formula(str(val).replace(",", ";"))

    def export_events(self, fmt: str = "json", **filters) -> str:
        """Export events matching `filters` (see search()) as JSON or CSV.

        At most 10000 rows are exported. A `limit` in `filters` can lower
        that cap but not raise it. CSV output is RFC 4180 quoted, so commas,
        quotes and newlines in values survive. NULLs become empty cells, and
        formula-like cells are prefixed with '.
        """
        limit = min(int(filters.pop("limit", _EXPORT_MAX_ROWS)), _EXPORT_MAX_ROWS)
        events = self.search(**filters, limit=limit)
        if fmt == "csv":
            if not events:
                return ""
            cols = list(events[0].keys())
            buf = io.StringIO()
            writer = csv.writer(buf, lineterminator="\n")
            writer.writerow(cols)
            for e in events:
                writer.writerow(
                    self._neutralize_formula("" if e.get(c) is None else str(e[c]))
                    for c in cols
                )
            return buf.getvalue()
        return json.dumps(events, default=str)

    def get_credential_intel(self):
        """Get credential intelligence summary (top users/passwords/combos)."""
        conn = self._get_conn()
        top_usernames = conn.execute(
            "SELECT username, COUNT(*) as cnt, GROUP_CONCAT(DISTINCT service) as services "
            "FROM events WHERE username IS NOT NULL "
            "GROUP BY username ORDER BY cnt DESC LIMIT 20"
        ).fetchall()
        top_passwords = conn.execute(
            "SELECT password, COUNT(*) as cnt, GROUP_CONCAT(DISTINCT service) as services "
            "FROM events WHERE password IS NOT NULL "
            "GROUP BY password ORDER BY cnt DESC LIMIT 20"
        ).fetchall()
        top_combos = conn.execute(
            "SELECT username, password, COUNT(*) as cnt, service "
            "FROM events WHERE username IS NOT NULL AND password IS NOT NULL "
            "GROUP BY username, password ORDER BY cnt DESC LIMIT 20"
        ).fetchall()
        by_service = conn.execute(
            "SELECT service, COUNT(DISTINCT username) as unique_users, "
            "COUNT(*) as total_attempts "
            "FROM events WHERE username IS NOT NULL AND service IS NOT NULL "
            "GROUP BY service ORDER BY total_attempts DESC"
        ).fetchall()
        return {
            "top_usernames": [dict(r) for r in top_usernames],
            "top_passwords": [dict(r) for r in top_passwords],
            "top_combos": [dict(r) for r in top_combos],
            "by_service": [dict(r) for r in by_service],
        }

    def get_mitre_coverage(self):
        """Get MITRE ATT&CK coverage: hit counts per technique id and per tactic."""
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT mitre_techniques FROM events WHERE mitre_techniques IS NOT NULL"
        ).fetchall()
        techniques = {}
        tactics = {}
        for r in rows:
            try:
                for t in json.loads(r["mitre_techniques"]):
                    tid = t["id"]
                    tactic = t["tactic"]
                    techniques[tid] = techniques.get(tid, 0) + 1
                    tactics[tactic] = tactics.get(tactic, 0) + 1
            except (json.JSONDecodeError, KeyError, TypeError):
                pass  # skip malformed rows rather than failing the whole report
        return {"techniques": techniques, "tactics": tactics}

    # ============================================================
    # Kill Chain Analysis
    # ============================================================

    @staticmethod
    def _summarize_kill_chain(ip, rows) -> dict:
        """Build the kill-chain summary for one IP from its timestamp-ordered rows.

        `rows` must be non-empty and expose event_type, detail, timestamp and
        mitre_techniques by key.
        """
        phases = {}
        for row in rows:
            phase = _classify_kill_chain_phase(row["event_type"], row["detail"])
            if not phase:
                continue
            p = phases.get(phase)
            if p is None:
                p = phases[phase] = {
                    "count": 0, "first_seen": row["timestamp"],
                    "last_seen": row["timestamp"], "techniques": set(),
                }
            p["count"] += 1
            p["last_seen"] = row["timestamp"]
            if row["mitre_techniques"]:
                try:
                    for t in json.loads(row["mitre_techniques"]):
                        p["techniques"].add(t["id"])
                except (json.JSONDecodeError, KeyError, TypeError):
                    pass

        for p in phases.values():
            p["techniques"] = list(p["techniques"])

        max_phase = max(phases, key=lambda pid: PHASE_ORDER.get(pid, 0)) if phases else None
        max_order = PHASE_ORDER.get(max_phase, 0) if max_phase else 0
        return {
            "ip": ip,
            "phases": phases,
            "score": sum(PHASE_SCORES.get(pid, 0) for pid in phases),
            "max_phase": max_phase,
            "max_phase_order": max_order,
            "progression_pct": round(max_order / len(KILL_CHAIN_PHASES) * 100),
            "duration_seconds": round(rows[-1]["timestamp"] - rows[0]["timestamp"]),
            # True once the attacker reached Installation (order 5) or later.
            "is_full_chain": max_order >= 5,
            "total_events": len(rows),
        }

    def get_kill_chain_analysis(self, ip):
        """Analyze kill chain progression for a specific IP."""
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT event_type, detail, timestamp, mitre_techniques "
            "FROM events WHERE src_ip = ? ORDER BY timestamp",
            (ip,)
        ).fetchall()

        if not rows:
            return {"ip": ip, "phases": {}, "score": 0, "max_phase": None,
                    "max_phase_order": 0, "progression_pct": 0,
                    "duration_seconds": 0, "is_full_chain": False,
                    "total_events": 0}

        return self._summarize_kill_chain(ip, rows)

    def get_kill_chain_top(self, limit=20):
        """Get top attackers by kill chain score (single query, in-memory analysis).

        IPs with no classifiable events are omitted.
        """
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT src_ip, event_type, detail, timestamp, mitre_techniques "
            "FROM events WHERE src_ip != '0.0.0.0' ORDER BY src_ip, timestamp"
        ).fetchall()

        ip_events = defaultdict(list)
        for r in rows:
            ip_events[r["src_ip"]].append(r)

        results = []
        for ip, events in ip_events.items():
            summary = self._summarize_kill_chain(ip, events)
            if summary["phases"]:
                results.append(summary)

        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:limit]

    # ============================================================
    # Honeytoken Stats
    # ============================================================

    def get_honeytoken_stats(self):
        """Get statistics for events whose detail mentions HONEYTOKEN."""
        conn = self._get_conn()
        ht_events = conn.execute(
            "SELECT * FROM events WHERE detail LIKE '%HONEYTOKEN%' "
            "ORDER BY timestamp DESC"
        ).fetchall()

        by_service = defaultdict(int)
        by_type = defaultdict(int)
        by_ip = defaultdict(lambda: {"count": 0, "services": set()})

        for evt in ht_events:
            e = dict(evt)
            svc = e.get("service") or "unknown"
            by_service[svc] += 1

            detail = (e.get("detail") or "").lower()
            if "credential" in detail:
                by_type["credential"] += 1
            elif "exfil" in detail:
                by_type["exfil_attempt"] += 1
            else:
                by_type["file_access"] += 1

            ip = e.get("src_ip", "")
            if ip:
                by_ip[ip]["count"] += 1
                by_ip[ip]["services"].add(svc)

        top_attackers = sorted(by_ip.items(), key=lambda x: x[1]["count"], reverse=True)[:10]

        return {
            "total": len(ht_events),
            "by_service": dict(by_service),
            "by_type": dict(by_type),
            "top_attackers": [
                {"ip": ip, "count": d["count"], "services": list(d["services"])}
                for ip, d in top_attackers
            ],
            "recent": [dict(e) for e in ht_events[:10]],
        }

    # ============================================================
    # Data Retention / Cleanup
    # ============================================================

    def cleanup_old_events(self, days=30):
        """Delete events older than `days` days. Returns count of deleted rows."""
        cutoff = time.time() - (days * 86400)
        conn = self._get_conn()
        cur = conn.execute("DELETE FROM events WHERE timestamp < ?", (cutoff,))
        deleted = cur.rowcount
        conn.commit()
        if deleted > 0:
            conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
            # Rebuild in-memory counters from DB
            self._reload_counters()
            print(f"[HP_STORE] Cleanup: deleted {deleted} events older than {days} days")
        return deleted

    def vacuum_db(self):
        """Reclaim disk space after cleanup."""
        conn = self._get_conn()
        conn.execute("VACUUM")
        print("[HP_STORE] Database vacuumed")

    def _reload_counters(self):
        """Reload the in-memory counters from the database.

        The attacker LRU is left untouched, and `_last_id` is only ever
        raised, never lowered.
        """
        conn = self._get_conn()
        with self.lock:
            row = conn.execute("SELECT COUNT(*) as cnt, MAX(id) as max_id FROM events").fetchone()
            self.total_count = row["cnt"] or 0
            self._last_id = max(self._last_id, row["max_id"] or 0)
            self.count_by_type.clear()
            self.count_by_severity.clear()
            self.count_by_device.clear()
            for r in conn.execute("SELECT event_type, COUNT(*) as cnt FROM events GROUP BY event_type"):
                self.count_by_type[r["event_type"]] = r["cnt"]
            for r in conn.execute("SELECT severity, COUNT(*) as cnt FROM events GROUP BY severity"):
                self.count_by_severity[r["severity"]] = r["cnt"]
            for r in conn.execute("SELECT device_id, COUNT(*) as cnt FROM events GROUP BY device_id"):
                self.count_by_device[r["device_id"]] = r["cnt"]

    def start_cleanup_timer(self, days=30, interval_hours=24):
        """Start a daemon thread that deletes old events every `interval_hours`.

        The first cleanup runs after one full interval, not immediately.
        """
        def _cleanup_loop():
            while True:
                time.sleep(interval_hours * 3600)
                try:
                    self.cleanup_old_events(days=days)
                except Exception as e:
                    print(f"[HP_STORE] Cleanup error: {e}")

        t = threading.Thread(target=_cleanup_loop, daemon=True)
        t.start()
        print(f"[HP_STORE] Cleanup timer started: every {interval_hours}h, retention={days} days")

    def register_sse_queue(self, q):
        """Register a deque for SSE event push."""
        with self.lock:
            self._sse_queues.append(q)

    def unregister_sse_queue(self, q):
        """Unregister an SSE queue (no-op if it was not registered)."""
        with self.lock:
            try:
                self._sse_queues.remove(q)
            except ValueError:
                pass