espilon-source/tools/C3PO/hp_dashboard/hp_store.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

945 lines
38 KiB
Python

"""
Honeypot Event Store — SQLite-backed storage with structured field parsing.
Stores all honeypot events in a SQLite database for complex queries,
while maintaining in-memory counters for fast dashboard polling.
"""
import re
import time
import hashlib
import sqlite3
import threading
import json
from collections import defaultdict, OrderedDict
from typing import Optional
# ============================================================
# Field extraction regexes — parse structured data from detail
# ============================================================
# Each pattern captures one key='value' field from an event detail string.
RE_USER = re.compile(r"user='([^']*)'")
RE_PASS = re.compile(r"pass='([^']*)'")
RE_CMD = re.compile(r"cmd='([^']*)'")
RE_URL = re.compile(r"url='([^']*)'")
RE_FILE = re.compile(r"file='([^']*)'")
RE_CLIENT = re.compile(r"client='([^']*)'")
RE_COMMUNITY = re.compile(r"community='([^']*)'")
# Trailing "[Word]" tag at the very end of the detail (OS fingerprint).
RE_OS_TAG = re.compile(r"\[(\w+)\]\s*$")
# Known malware/behavior classification tags embedded anywhere in the detail.
RE_MALWARE_TAG = re.compile(r"\[(Mirai|Botnet-generic|RCE-pipe|Obfuscation|Reverse-shell|"
                            r"RCE-python|RCE-perl|Destructive|Mirai-variant|HONEYTOKEN)\]")
# Destination port -> service label stored on each event (used for filtering).
PORT_TO_SERVICE = {
    22: "ssh", 23: "telnet", 80: "http", 1883: "mqtt", 21: "ftp",
    53: "dns", 161: "snmp", 69: "tftp", 5683: "coap", 6379: "redis",
    554: "rtsp", 3306: "mysql", 502: "modbus", 1900: "upnp",
    5060: "sip", 2323: "telnet", 23231: "telnet",
}
# Severities accepted on the wire; anything else is coerced to LOW.
VALID_SEVERITIES = {"LOW", "MEDIUM", "HIGH", "CRITICAL"}
# MITRE ATT&CK technique mapping
# event_type -> [(technique_id, technique_name, tactic), ...]
MITRE_MAP = {
    "WIFI_DEAUTH": [("T1498.001", "Direct Network Flood", "Impact")],
    "WIFI_PROBE": [("T1595.001", "Scanning IP Blocks", "Reconnaissance")],
    "WIFI_EVIL_TWIN": [("T1557.002", "ARP Cache Poisoning", "Credential Access")],
    "WIFI_BEACON_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "WIFI_EAPOL": [("T1557", "Adversary-in-the-Middle", "Credential Access")],
    "ARP_SPOOF": [("T1557.002", "ARP Cache Poisoning", "Credential Access")],
    "ARP_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "PORT_SCAN": [("T1046", "Network Service Discovery", "Discovery")],
    "ICMP_SWEEP": [("T1018", "Remote System Discovery", "Discovery")],
    "SYN_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "UDP_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "SVC_CONNECT": [("T1021", "Remote Services", "Lateral Movement")],
    "SVC_AUTH_ATTEMPT": [("T1110", "Brute Force", "Credential Access")],
    "SVC_COMMAND": [("T1059", "Command Interpreter", "Execution")],
    "SVC_HTTP_REQUEST": [("T1190", "Exploit Public-Facing App", "Initial Access")],
    "SVC_MQTT_MSG": [("T1071.001", "Web Protocols", "Command and Control")],
}
# Substring found in the event detail -> additional techniques to attach.
MITRE_DETAIL_MAP = {
    "HONEYTOKEN": [("T1083", "File and Directory Discovery", "Discovery"),
                   ("T1005", "Data from Local System", "Collection")],
    "DOWNLOAD": [("T1105", "Ingress Tool Transfer", "Command and Control")],
    "DNS_TUNNEL": [("T1071.004", "DNS", "Command and Control"),
                   ("T1048.003", "Exfiltration Over Non-C2", "Exfiltration")],
    "Mirai": [("T1583.005", "Botnet", "Resource Development")],
    "Reverse-shell": [("T1059.004", "Unix Shell", "Execution")],
    "RCE-python": [("T1059.006", "Python", "Execution")],
    "CONFIG SET": [("T1053", "Scheduled Task/Job", "Persistence")],
    "SLAVEOF": [("T1020", "Automated Exfiltration", "Exfiltration")],
    "Write": [("T1565.001", "Stored Data Manipulation", "Impact")],
}
# All event types the dashboard recognizes: everything with a MITRE mapping
# plus service events that carry no direct technique mapping.
VALID_EVENT_TYPES = set(MITRE_MAP.keys()) | {
    "HONEYTOKEN", "SVC_FTP_CMD", "SVC_SNMP_QUERY", "SVC_COAP_REQUEST",
    "SVC_MODBUS_QUERY", "SVC_UPNP_REQUEST", "SVC_SIP_REGISTER",
    "SVC_REDIS_CMD", "SVC_RTSP_REQUEST", "SVC_DNS_QUERY",
}
# ============================================================
# Kill Chain phase definitions and classifier
# ============================================================
# Ordered phases with a per-phase severity score; `order` is the ordinal
# position (1..7) used to compute attacker progression.
KILL_CHAIN_PHASES = [
    {"id": "recon", "order": 1, "score": 10, "label": "Reconnaissance"},
    {"id": "weaponize", "order": 2, "score": 15, "label": "Weaponization"},
    {"id": "delivery", "order": 3, "score": 20, "label": "Delivery"},
    {"id": "exploitation", "order": 4, "score": 30, "label": "Exploitation"},
    {"id": "installation", "order": 5, "score": 40, "label": "Installation"},
    {"id": "c2", "order": 6, "score": 50, "label": "Command & Control"},
    {"id": "actions", "order": 7, "score": 60, "label": "Actions on Objectives"},
]
# Convenience lookups: phase id -> score / ordinal position.
PHASE_SCORES = {p["id"]: p["score"] for p in KILL_CHAIN_PHASES}
PHASE_ORDER = {p["id"]: p["order"] for p in KILL_CHAIN_PHASES}
def _classify_kill_chain_phase(event_type, detail):
"""Classify an event into the highest matching kill chain phase."""
d = (detail or "").upper()
# Phase 7: Actions on Objectives (exfiltration, sabotage)
if "HONEYTOKEN EXFIL" in d or "FTP STOR" in d or "CONFIG SET" in d:
return "actions"
# Phase 6: C2 (command & control channels)
if event_type == "SVC_MQTT_MSG" or "SLAVEOF" in d or "EVAL" in d:
return "c2"
# Phase 5: Installation (malware drop, honeytoken access)
if "HONEYTOKEN" in d or "DOWNLOAD" in d:
return "installation"
# Phase 4: Exploitation (command execution in shell)
if event_type == "SVC_COMMAND":
return "exploitation"
# Phase 3: Delivery (service connection, auth attempts)
if event_type in ("SVC_CONNECT", "SVC_AUTH_ATTEMPT", "SVC_HTTP_REQUEST"):
return "delivery"
# Phase 2: Weaponization (network manipulation)
if event_type in ("ARP_SPOOF", "ARP_FLOOD", "WIFI_EVIL_TWIN", "WIFI_EAPOL"):
return "weaponize"
if "DNS_TUNNEL" in d:
return "weaponize"
# Phase 1: Reconnaissance (scanning, probing)
if event_type in ("WIFI_PROBE", "PORT_SCAN", "ICMP_SWEEP", "SYN_FLOOD", "UDP_FLOOD"):
return "recon"
return None
def _extract_mitre(event_type, detail):
    """Resolve MITRE ATT&CK techniques for an event.

    Combines techniques mapped from the event type with any mapped from
    keywords found in the detail string.  De-duplicates by technique id,
    keeping first-seen order, and returns a list of
    {"id", "name", "tactic"} dicts.
    """
    matched = list(MITRE_MAP.get(event_type, []))
    matched += [tech
                for keyword, techs in MITRE_DETAIL_MAP.items()
                if keyword in detail
                for tech in techs]
    # Plain dict preserves insertion order; first occurrence of an id wins.
    unique = {}
    for tech_id, name, tactic in matched:
        if tech_id not in unique:
            unique[tech_id] = {"id": tech_id, "name": name, "tactic": tactic}
    return list(unique.values())
def _session_id(src_ip: str, dst_port: int, timestamp: float) -> str:
"""Compute session ID: md5(ip + port + 5min_bucket)[:12]."""
bucket = int(timestamp / 300)
key = f"{src_ip}:{dst_port}:{bucket}"
return hashlib.md5(key.encode()).hexdigest()[:12]
def _extract_fields(detail: str) -> dict:
    """Pull structured key='value' fields out of an event detail string.

    Returns a dict containing only the keys that matched.  Patterns apply
    in order, so a later match may overwrite an earlier one (the SNMP
    community string is deliberately stored under "username").
    """
    extractors = (
        (RE_USER, "username"),
        (RE_PASS, "password"),
        (RE_CMD, "command"),
        (RE_URL, "url"),
        (RE_FILE, "path"),
        (RE_CLIENT, "client_id"),
        (RE_COMMUNITY, "username"),  # community as username
        (RE_MALWARE_TAG, "malware_tag"),
    )
    fields = {}
    for pattern, key in extractors:
        hit = pattern.search(detail)
        if hit:
            fields[key] = hit.group(1)
    # OS tag only counts when it names a known platform.
    os_hit = RE_OS_TAG.search(detail)
    if os_hit and os_hit.group(1) in ("Linux", "Windows", "macOS", "Cisco"):
        fields["os_tag"] = os_hit.group(1)
    return fields
class HpStore:
    """
    Thread-safe SQLite-backed event store.
    Maintains in-memory counters for fast dashboard polling.

    One SQLite connection is opened per thread (see _get_conn); in-memory
    counters, the attacker LRU map and the SSE subscriber list are guarded
    by `self.lock`.
    """
    def __init__(self, db_path: str = "honeypot_events.db", geo_lookup=None):
        # db_path: path of the SQLite database file.
        # geo_lookup: optional HpGeoLookup used to enrich attackers with
        # geo/vendor data — TODO confirm interface (lookup_ip,
        # lookup_mac_vendor, enrich_attacker are the methods called here).
        self.db_path = db_path
        self.lock = threading.Lock()
        self._local = threading.local()  # holds one sqlite3 connection per thread
        self._geo = geo_lookup  # Optional HpGeoLookup instance
        # In-memory counters for fast polling
        self.total_count = 0
        self.count_by_type: dict = defaultdict(int)
        self.count_by_severity: dict = defaultdict(int)
        self.count_by_device: dict = defaultdict(int)
        # LRU-ordered map src_ip -> attacker summary, capped at 10000 entries.
        self.attackers: OrderedDict = OrderedDict()
        self._last_id = 0
        # SSE subscribers
        self._sse_queues: list = []
        # Alert engine (set via set_alert_engine after bootstrap)
        self._alert_engine = None
        self._init_db()

    def set_alert_engine(self, engine):
        """Wire the alert engine so events are evaluated for alerts."""
        self._alert_engine = engine

    def _get_conn(self) -> sqlite3.Connection:
        """Thread-local SQLite connection."""
        # sqlite3 connections must not be shared across threads, so each
        # thread lazily opens its own.  WAL mode allows concurrent readers
        # while one writer proceeds.
        if not hasattr(self._local, "conn") or self._local.conn is None:
            self._local.conn = sqlite3.connect(self.db_path)
            self._local.conn.row_factory = sqlite3.Row
            self._local.conn.execute("PRAGMA journal_mode=WAL")
            self._local.conn.execute("PRAGMA synchronous=NORMAL")
        return self._local.conn

    def _init_db(self):
        """Create tables and indexes, with migrations for existing DBs."""
        conn = self._get_conn()
        # Create table (without mitre_techniques for compat with existing DBs)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL NOT NULL,
                device_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                severity TEXT NOT NULL,
                src_mac TEXT,
                src_ip TEXT,
                src_port INTEGER,
                dst_port INTEGER,
                detail TEXT,
                received_at REAL,
                service TEXT,
                username TEXT,
                password TEXT,
                command TEXT,
                url TEXT,
                path TEXT,
                client_id TEXT,
                session_id TEXT,
                os_tag TEXT,
                malware_tag TEXT
            )
        """)
        # Migration: add mitre_techniques column if missing
        cols = {r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()}
        if "mitre_techniques" not in cols:
            conn.execute("ALTER TABLE events ADD COLUMN mitre_techniques TEXT")
        # Create indexes (safe now that all columns exist)
        conn.executescript("""
            CREATE INDEX IF NOT EXISTS idx_ts ON events(timestamp);
            CREATE INDEX IF NOT EXISTS idx_type ON events(event_type);
            CREATE INDEX IF NOT EXISTS idx_sev ON events(severity);
            CREATE INDEX IF NOT EXISTS idx_ip ON events(src_ip);
            CREATE INDEX IF NOT EXISTS idx_session ON events(session_id);
            CREATE INDEX IF NOT EXISTS idx_service ON events(service);
            CREATE INDEX IF NOT EXISTS idx_mitre ON events(mitre_techniques);
        """)
        conn.commit()
        # Load counters from existing data
        row = conn.execute("SELECT COUNT(*) as cnt, MAX(id) as max_id FROM events").fetchone()
        self.total_count = row["cnt"] or 0
        self._last_id = row["max_id"] or 0
        for row in conn.execute("SELECT event_type, COUNT(*) as cnt FROM events GROUP BY event_type"):
            self.count_by_type[row["event_type"]] = row["cnt"]
        for row in conn.execute("SELECT severity, COUNT(*) as cnt FROM events GROUP BY severity"):
            self.count_by_severity[row["severity"]] = row["cnt"]
        for row in conn.execute("SELECT device_id, COUNT(*) as cnt FROM events GROUP BY device_id"):
            self.count_by_device[row["device_id"]] = row["cnt"]

    def parse_and_store(self, device_id: str, payload: str) -> Optional[dict]:
        """
        Parse EVT wire format and store in SQLite.
        Format: EVT|<type>|<severity>|<mac>|<ip>:<port>><dport>|<detail>
        Returns event dict or None.
        """
        try:
            # Split at most 5 times so pipes inside <detail> survive intact.
            parts = payload.split("|", 5)
            if len(parts) < 6 or parts[0] not in ("EVT", "HP"):
                return None
            _, event_type, severity, src_mac, addr_part, detail = parts
            # Validate severity to prevent pipe injection
            if severity not in VALID_SEVERITIES:
                severity = "LOW"
            src_ip = "0.0.0.0"
            src_port = 0
            dst_port = 0
            if ">" in addr_part:
                left, dst_port_s = addr_part.split(">", 1)
                dst_port = int(dst_port_s) if dst_port_s.isdigit() else 0
                if ":" in left:
                    # rsplit keeps IPv6-style colons in the address part.
                    src_ip, src_port_s = left.rsplit(":", 1)
                    src_port = int(src_port_s) if src_port_s.isdigit() else 0
            now = time.time()
            fields = _extract_fields(detail)
            service = PORT_TO_SERVICE.get(dst_port, "")
            session_id = _session_id(src_ip, dst_port, now)
            mitre = _extract_mitre(event_type, detail)
            mitre_json = json.dumps(mitre) if mitre else None
            conn = self._get_conn()
            cur = conn.execute(
                """INSERT INTO events (timestamp, device_id, event_type, severity,
                    src_mac, src_ip, src_port, dst_port, detail, received_at,
                    service, username, password, command, url, path,
                    client_id, session_id, os_tag, malware_tag, mitre_techniques)
                    VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (now, device_id, event_type, severity,
                 src_mac, src_ip, src_port, dst_port, detail, now,
                 service, fields.get("username"), fields.get("password"),
                 fields.get("command"), fields.get("url"), fields.get("path"),
                 fields.get("client_id"), session_id,
                 fields.get("os_tag"), fields.get("malware_tag"), mitre_json)
            )
            conn.commit()
            event_id = cur.lastrowid
            # Update in-memory counters and the attacker LRU under the lock.
            with self.lock:
                self.total_count += 1
                self._last_id = event_id
                self.count_by_type[event_type] += 1
                self.count_by_severity[severity] += 1
                self.count_by_device[device_id] += 1
                if src_ip != "0.0.0.0":
                    if src_ip not in self.attackers:
                        # Evict oldest (LRU) if at capacity — O(1)
                        if len(self.attackers) >= 10000:
                            self.attackers.popitem(last=False)
                        self.attackers[src_ip] = {
                            "ip": src_ip, "mac": src_mac, "types": set(),
                            "count": 0, "first_seen": now, "last_seen": now,
                        }
                    # Move to end on each access so LRU is always at front
                    self.attackers.move_to_end(src_ip)
                    atk = self.attackers[src_ip]
                    atk["types"].add(event_type)
                    atk["count"] += 1
                    atk["last_seen"] = now
                    if src_mac != "00:00:00:00:00:00":
                        atk["mac"] = src_mac
            evt_dict = {
                "id": event_id, "timestamp": now, "device_id": device_id,
                "event_type": event_type, "severity": severity,
                "src_mac": src_mac, "src_ip": src_ip, "src_port": src_port,
                "dst_port": dst_port, "detail": detail, "received_at": now,
                "service": service, "session_id": session_id,
                **fields,
            }
            # Trigger background geo resolution (non-blocking)
            if self._geo and src_ip != "0.0.0.0":
                self._geo.lookup_ip(src_ip)
            # Notify SSE subscribers (snapshot under lock)
            with self.lock:
                sse_snapshot = list(self._sse_queues)
            for q in sse_snapshot:
                try:
                    q.append(evt_dict)
                except Exception:
                    # Best-effort push; a dead subscriber must not block ingest.
                    pass
            # Evaluate alert rules
            if self._alert_engine:
                try:
                    self._alert_engine.evaluate(evt_dict)
                except Exception as e:
                    print(f"[HP_STORE] Alert evaluation error: {e}")
            return evt_dict
        except Exception as e:
            # Malformed payloads are logged and dropped, never raised.
            print(f"[HP_STORE] Failed to parse event: {e} — payload={payload!r}")
            return None

    # ============================================================
    # Query APIs
    # ============================================================
    def get_recent_events(self, limit: int = 100, event_type: Optional[str] = None,
                          severity: Optional[str] = None, src_ip: Optional[str] = None,
                          service: Optional[str] = None, offset: int = 0) -> list:
        """Get recent events with optional filters."""
        conn = self._get_conn()
        conditions = []
        params = []
        if event_type:
            conditions.append("event_type = ?")
            params.append(event_type)
        if severity:
            conditions.append("severity = ?")
            params.append(severity)
        if src_ip:
            conditions.append("src_ip = ?")
            params.append(src_ip)
        if service:
            conditions.append("service = ?")
            params.append(service)
        # Only column names are interpolated; all values go through `?` params.
        where = " WHERE " + " AND ".join(conditions) if conditions else ""
        sql = f"SELECT * FROM events{where} ORDER BY id DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

    def get_event_by_id(self, event_id: int) -> Optional[dict]:
        """Get single event with related events (same IP +-5min)."""
        conn = self._get_conn()
        row = conn.execute("SELECT * FROM events WHERE id = ?", (event_id,)).fetchone()
        if not row:
            return None
        evt = dict(row)
        # Related events: same IP, +-5 minutes
        related = conn.execute(
            """SELECT * FROM events WHERE src_ip = ? AND id != ?
               AND timestamp BETWEEN ? AND ? ORDER BY timestamp LIMIT 20""",
            (evt["src_ip"], event_id,
             evt["timestamp"] - 300, evt["timestamp"] + 300)
        ).fetchall()
        evt["related"] = [dict(r) for r in related]
        return evt

    def get_attacker_profile(self, ip: str) -> dict:
        """Full attacker profile: events, credentials, commands, sessions."""
        conn = self._get_conn()
        events = conn.execute(
            "SELECT * FROM events WHERE src_ip = ? ORDER BY timestamp DESC LIMIT 200",
            (ip,)
        ).fetchall()
        creds = conn.execute(
            """SELECT username, password, service, timestamp FROM events
               WHERE src_ip = ? AND username IS NOT NULL ORDER BY timestamp""",
            (ip,)
        ).fetchall()
        commands = conn.execute(
            """SELECT command, detail, service, timestamp FROM events
               WHERE src_ip = ? AND command IS NOT NULL ORDER BY timestamp""",
            (ip,)
        ).fetchall()
        sessions = conn.execute(
            """SELECT session_id, service, MIN(timestamp) as start,
                      MAX(timestamp) as end, COUNT(*) as event_count,
                      MAX(CASE severity WHEN 'CRITICAL' THEN 4 WHEN 'HIGH' THEN 3
                          WHEN 'MEDIUM' THEN 2 ELSE 1 END) as max_sev
               FROM events WHERE src_ip = ? GROUP BY session_id
               ORDER BY start DESC""",
            (ip,)
        ).fetchall()
        profile = {
            "ip": ip,
            "total_events": len(events),
            "events": [dict(r) for r in events[:50]],
            "credentials": [dict(r) for r in creds],
            "commands": [dict(r) for r in commands],
            "sessions": [dict(r) for r in sessions],
        }
        if self._geo:
            geo = self._geo.lookup_ip(ip)
            profile["country"] = geo.get("country", "")
            profile["country_code"] = geo.get("country_code", "")
            profile["city"] = geo.get("city", "")
            profile["isp"] = geo.get("isp", "")
            profile["is_private"] = geo.get("is_private", False)
            # Vendor from first known MAC for this IP
            with self.lock:
                atk = self.attackers.get(ip, {})
            profile["vendor"] = self._geo.lookup_mac_vendor(atk.get("mac", ""))
        return profile

    def get_sessions(self, limit: int = 50, src_ip: Optional[str] = None) -> list:
        """Get grouped sessions."""
        conn = self._get_conn()
        conditions = []
        params = []
        if src_ip:
            conditions.append("src_ip = ?")
            params.append(src_ip)
        where = " WHERE " + " AND ".join(conditions) if conditions else ""
        sql = f"""SELECT session_id, src_ip, service, dst_port,
                         MIN(timestamp) as start_time, MAX(timestamp) as end_time,
                         COUNT(*) as event_count,
                         MAX(CASE severity WHEN 'CRITICAL' THEN 4 WHEN 'HIGH' THEN 3
                             WHEN 'MEDIUM' THEN 2 ELSE 1 END) as max_severity,
                         SUM(CASE WHEN username IS NOT NULL THEN 1 ELSE 0 END) as auth_count,
                         SUM(CASE WHEN command IS NOT NULL THEN 1 ELSE 0 END) as cmd_count,
                         SUM(CASE WHEN malware_tag IS NOT NULL THEN 1 ELSE 0 END) as malware_count
                  FROM events{where}
                  GROUP BY session_id
                  ORDER BY start_time DESC LIMIT ?"""
        params.append(limit)
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

    def get_session_events(self, session_id: str) -> list:
        """Get all events for a session."""
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT * FROM events WHERE session_id = ? ORDER BY timestamp",
            (session_id,)
        ).fetchall()
        return [dict(r) for r in rows]

    @staticmethod
    def _escape_like(s):
        """Escape LIKE wildcards to prevent DoS via crafted search queries."""
        # Backslash must be escaped first so it doesn't double-escape % and _.
        return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

    def search(self, q: Optional[str] = None, event_type: Optional[str] = None,
               severity: Optional[str] = None, src_ip: Optional[str] = None,
               service: Optional[str] = None, from_ts: Optional[float] = None,
               to_ts: Optional[float] = None, limit: int = 100,
               offset: int = 0) -> list:
        """Multi-criteria search."""
        conn = self._get_conn()
        conditions = []
        params = []
        if q:
            # Free-text query matched against detail, username and command.
            conditions.append("(detail LIKE ? ESCAPE '\\' "
                              "OR username LIKE ? ESCAPE '\\' "
                              "OR command LIKE ? ESCAPE '\\')")
            escaped = self._escape_like(q)
            like = f"%{escaped}%"
            params.extend([like, like, like])
        if event_type:
            conditions.append("event_type = ?")
            params.append(event_type)
        if severity:
            conditions.append("severity = ?")
            params.append(severity)
        if src_ip:
            conditions.append("src_ip = ?")
            params.append(src_ip)
        if service:
            conditions.append("service = ?")
            params.append(service)
        if from_ts:
            conditions.append("timestamp >= ?")
            params.append(from_ts)
        if to_ts:
            conditions.append("timestamp <= ?")
            params.append(to_ts)
        where = " WHERE " + " AND ".join(conditions) if conditions else ""
        sql = f"SELECT * FROM events{where} ORDER BY timestamp DESC LIMIT ? OFFSET ?"
        params.append(limit)
        params.append(offset)
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

    def get_timeline(self, hours: int = 24, bucket_minutes: int = 5) -> list:
        """Get event counts per time bucket, grouped by severity."""
        conn = self._get_conn()
        since = time.time() - (hours * 3600)
        bucket_s = bucket_minutes * 60
        # Integer-divide timestamps into fixed-width buckets inside SQL.
        rows = conn.execute(
            """SELECT CAST(timestamp / ? AS INTEGER) * ? as bucket,
                      severity, COUNT(*) as cnt
               FROM events WHERE timestamp >= ?
               GROUP BY bucket, severity ORDER BY bucket""",
            (bucket_s, bucket_s, since)
        ).fetchall()
        # Group by bucket
        buckets = {}
        for r in rows:
            b = r["bucket"]
            if b not in buckets:
                buckets[b] = {"time": b, "LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0, "total": 0}
            buckets[b][r["severity"]] = r["cnt"]
            buckets[b]["total"] += r["cnt"]
        return sorted(buckets.values(), key=lambda x: x["time"])

    def get_stats(self) -> dict:
        """Get aggregated statistics (from memory for speed)."""
        with self.lock:
            # dict(...) copies so callers can't mutate live counters.
            return {
                "total_events": self.total_count,
                "by_type": dict(self.count_by_type),
                "by_severity": dict(self.count_by_severity),
                "by_device": dict(self.count_by_device),
                "last_id": self._last_id,
            }

    def get_attackers(self, limit: int = 50) -> list:
        """Get top attackers sorted by event count, enriched with geo/vendor."""
        # Snapshot under the lock; sorting/enrichment happens on the copy.
        with self.lock:
            attackers = []
            for ip, data in self.attackers.items():
                attackers.append({
                    "ip": data["ip"], "mac": data["mac"],
                    "types": list(data["types"]),
                    "count": data["count"],
                    "first_seen": data["first_seen"],
                    "last_seen": data["last_seen"],
                })
        attackers.sort(key=lambda x: x["count"], reverse=True)
        result = attackers[:limit]
        if self._geo:
            for atk in result:
                self._geo.enrich_attacker(atk)
        return result

    def get_events_after(self, last_id: int, limit: int = 50) -> list:
        """Get events after a given ID (for SSE polling)."""
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT * FROM events WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, limit)
        ).fetchall()
        return [dict(r) for r in rows]

    def get_last_id(self) -> int:
        """Get last event ID."""
        with self.lock:
            return self._last_id

    @staticmethod
    def _csv_safe(val):
        """Escape a CSV cell to prevent formula injection in spreadsheet apps."""
        # Commas are replaced (not quoted) to keep the row format simple;
        # leading formula characters are neutralized with a quote.
        s = str(val).replace(",", ";")
        if s and s[0] in ("=", "+", "-", "@", "\t", "\r"):
            s = "'" + s
        return s

    def export_events(self, fmt: str = "json", **filters) -> str:
        """Export events as JSON or CSV."""
        events = self.search(**filters, limit=10000)
        if fmt == "csv":
            if not events:
                return ""
            cols = list(events[0].keys())
            lines = [",".join(cols)]
            for e in events:
                lines.append(",".join(self._csv_safe(e.get(c, "")) for c in cols))
            return "\n".join(lines)
        return json.dumps(events, default=str)

    def get_credential_intel(self):
        """Get credential intelligence summary."""
        conn = self._get_conn()
        top_usernames = conn.execute(
            "SELECT username, COUNT(*) as cnt, GROUP_CONCAT(DISTINCT service) as services "
            "FROM events WHERE username IS NOT NULL "
            "GROUP BY username ORDER BY cnt DESC LIMIT 20"
        ).fetchall()
        top_passwords = conn.execute(
            "SELECT password, COUNT(*) as cnt, GROUP_CONCAT(DISTINCT service) as services "
            "FROM events WHERE password IS NOT NULL "
            "GROUP BY password ORDER BY cnt DESC LIMIT 20"
        ).fetchall()
        top_combos = conn.execute(
            "SELECT username, password, COUNT(*) as cnt, service "
            "FROM events WHERE username IS NOT NULL AND password IS NOT NULL "
            "GROUP BY username, password ORDER BY cnt DESC LIMIT 20"
        ).fetchall()
        by_service = conn.execute(
            "SELECT service, COUNT(DISTINCT username) as unique_users, "
            "COUNT(*) as total_attempts "
            "FROM events WHERE username IS NOT NULL AND service IS NOT NULL "
            "GROUP BY service ORDER BY total_attempts DESC"
        ).fetchall()
        return {
            "top_usernames": [dict(r) for r in top_usernames],
            "top_passwords": [dict(r) for r in top_passwords],
            "top_combos": [dict(r) for r in top_combos],
            "by_service": [dict(r) for r in by_service],
        }

    def get_mitre_coverage(self):
        """Get MITRE ATT&CK coverage summary."""
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT mitre_techniques FROM events WHERE mitre_techniques IS NOT NULL"
        ).fetchall()
        techniques = {}
        tactics = {}
        for r in rows:
            try:
                for t in json.loads(r["mitre_techniques"]):
                    tid = t["id"]
                    tactic = t["tactic"]
                    techniques[tid] = techniques.get(tid, 0) + 1
                    tactics[tactic] = tactics.get(tactic, 0) + 1
            except (json.JSONDecodeError, KeyError):
                # Skip rows with corrupted/legacy JSON rather than failing.
                pass
        return {"techniques": techniques, "tactics": tactics}

    # ============================================================
    # Kill Chain Analysis
    # ============================================================
    def get_kill_chain_analysis(self, ip):
        """Analyze kill chain progression for a specific IP."""
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT event_type, detail, timestamp, mitre_techniques "
            "FROM events WHERE src_ip = ? ORDER BY timestamp",
            (ip,)
        ).fetchall()
        if not rows:
            return {"ip": ip, "phases": {}, "score": 0, "max_phase": None,
                    "progression_pct": 0, "duration_seconds": 0,
                    "is_full_chain": False, "total_events": 0}
        phases = {}
        for row in rows:
            phase = _classify_kill_chain_phase(row["event_type"], row["detail"])
            if not phase:
                continue
            if phase not in phases:
                phases[phase] = {
                    "count": 0, "first_seen": row["timestamp"],
                    "last_seen": row["timestamp"], "techniques": set()
                }
            p = phases[phase]
            p["count"] += 1
            p["last_seen"] = row["timestamp"]
            if row["mitre_techniques"]:
                try:
                    for t in json.loads(row["mitre_techniques"]):
                        p["techniques"].add(t["id"])
                except (json.JSONDecodeError, KeyError):
                    pass
        # Sets aren't JSON-serializable; convert before returning.
        for p in phases.values():
            p["techniques"] = list(p["techniques"])
        score = sum(PHASE_SCORES.get(pid, 0) for pid in phases)
        max_phase = max(phases.keys(), key=lambda x: PHASE_ORDER.get(x, 0)) if phases else None
        max_order = PHASE_ORDER.get(max_phase, 0) if max_phase else 0
        return {
            "ip": ip,
            "phases": phases,
            "score": score,
            "max_phase": max_phase,
            "max_phase_order": max_order,
            "progression_pct": round(max_order / 7 * 100),
            "duration_seconds": round(rows[-1]["timestamp"] - rows[0]["timestamp"]),
            # "Full chain" means the attacker reached installation or beyond.
            "is_full_chain": max_order >= 5,
            "total_events": len(rows),
        }

    def get_kill_chain_top(self, limit=20):
        """Get top attackers by kill chain score (single query, in-memory analysis)."""
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT src_ip, event_type, detail, timestamp, mitre_techniques "
            "FROM events WHERE src_ip != '0.0.0.0' ORDER BY src_ip, timestamp"
        ).fetchall()
        ip_events = defaultdict(list)
        for r in rows:
            ip_events[r["src_ip"]].append(r)
        results = []
        for ip, events in ip_events.items():
            phases = {}
            for row in events:
                phase = _classify_kill_chain_phase(row["event_type"], row["detail"])
                if not phase:
                    continue
                if phase not in phases:
                    phases[phase] = {"count": 0, "first_seen": row["timestamp"],
                                     "techniques": set()}
                phases[phase]["count"] += 1
                if row["mitre_techniques"]:
                    try:
                        for t in json.loads(row["mitre_techniques"]):
                            phases[phase]["techniques"].add(t["id"])
                    except (json.JSONDecodeError, KeyError):
                        pass
            if not phases:
                continue
            for p in phases.values():
                p["techniques"] = list(p["techniques"])
            score = sum(PHASE_SCORES.get(pid, 0) for pid in phases)
            max_phase = max(phases.keys(), key=lambda x: PHASE_ORDER.get(x, 0))
            max_order = PHASE_ORDER.get(max_phase, 0)
            results.append({
                "ip": ip,
                "phases": phases,
                "score": score,
                "max_phase": max_phase,
                "max_phase_order": max_order,
                "progression_pct": round(max_order / 7 * 100),
                "duration_seconds": round(events[-1]["timestamp"] - events[0]["timestamp"]),
                "is_full_chain": max_order >= 5,
                "total_events": len(events),
            })
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:limit]

    # ============================================================
    # Honeytoken Stats
    # ============================================================
    def get_honeytoken_stats(self):
        """Get honeytoken event statistics."""
        conn = self._get_conn()
        ht_events = conn.execute(
            "SELECT * FROM events WHERE detail LIKE '%HONEYTOKEN%' "
            "ORDER BY timestamp DESC"
        ).fetchall()
        by_service = defaultdict(int)
        by_type = defaultdict(int)
        by_ip = defaultdict(lambda: {"count": 0, "services": set()})
        for evt in ht_events:
            e = dict(evt)
            svc = e.get("service") or "unknown"
            by_service[svc] += 1
            detail = e.get("detail", "")
            # Coarse classification of what the honeytoken event represents.
            if "credential" in detail.lower():
                by_type["credential"] += 1
            elif "exfil" in detail.lower():
                by_type["exfil_attempt"] += 1
            else:
                by_type["file_access"] += 1
            ip = e.get("src_ip", "")
            if ip:
                by_ip[ip]["count"] += 1
                by_ip[ip]["services"].add(svc)
        top_attackers = sorted(by_ip.items(), key=lambda x: x[1]["count"], reverse=True)[:10]
        return {
            "total": len(ht_events),
            "by_service": dict(by_service),
            "by_type": dict(by_type),
            "top_attackers": [
                {"ip": ip, "count": d["count"], "services": list(d["services"])}
                for ip, d in top_attackers
            ],
            "recent": [dict(e) for e in ht_events[:10]],
        }

    # ============================================================
    # Data Retention / Cleanup
    # ============================================================
    def cleanup_old_events(self, days=30):
        """Delete events older than `days` days. Returns count of deleted rows."""
        cutoff = time.time() - (days * 86400)
        conn = self._get_conn()
        cur = conn.execute("DELETE FROM events WHERE timestamp < ?", (cutoff,))
        deleted = cur.rowcount
        conn.commit()
        if deleted > 0:
            # Truncate the WAL so disk space is actually reclaimed.
            conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
            # Rebuild in-memory counters from DB
            self._reload_counters()
        print(f"[HP_STORE] Cleanup: deleted {deleted} events older than {days} days")
        return deleted

    def vacuum_db(self):
        """Reclaim disk space after cleanup."""
        conn = self._get_conn()
        conn.execute("VACUUM")
        print("[HP_STORE] Database vacuumed")

    def _reload_counters(self):
        """Reload in-memory counters from the database."""
        conn = self._get_conn()
        with self.lock:
            row = conn.execute("SELECT COUNT(*) as cnt, MAX(id) as max_id FROM events").fetchone()
            self.total_count = row["cnt"] or 0
            self._last_id = row["max_id"] or 0
            self.count_by_type.clear()
            self.count_by_severity.clear()
            self.count_by_device.clear()
            for row in conn.execute("SELECT event_type, COUNT(*) as cnt FROM events GROUP BY event_type"):
                self.count_by_type[row["event_type"]] = row["cnt"]
            for row in conn.execute("SELECT severity, COUNT(*) as cnt FROM events GROUP BY severity"):
                self.count_by_severity[row["severity"]] = row["cnt"]
            for row in conn.execute("SELECT device_id, COUNT(*) as cnt FROM events GROUP BY device_id"):
                self.count_by_device[row["device_id"]] = row["cnt"]

    def start_cleanup_timer(self, days=30, interval_hours=24):
        """Start a background thread that cleans old events periodically."""
        def _cleanup_loop():
            # Daemon loop: sleeps first so startup isn't delayed by a cleanup.
            while True:
                time.sleep(interval_hours * 3600)
                try:
                    self.cleanup_old_events(days=days)
                except Exception as e:
                    print(f"[HP_STORE] Cleanup error: {e}")
        t = threading.Thread(target=_cleanup_loop, daemon=True)
        t.start()
        print(f"[HP_STORE] Cleanup timer started: every {interval_hours}h, retention={days} days")

    def register_sse_queue(self, q):
        """Register a deque for SSE event push."""
        with self.lock:
            self._sse_queues.append(q)

    def unregister_sse_queue(self, q):
        """Unregister an SSE queue."""
        with self.lock:
            try:
                self._sse_queues.remove(q)
            except ValueError:
                # Already removed (e.g. double-unregister on disconnect).
                pass