Replace monolithic CLI and web server with route-based Flask API. New routes: api_commands, api_build, api_can, api_monitor, api_ota, api_tunnel. Add honeypot security dashboard with real-time SSE, MITRE ATT&CK mapping, kill chain analysis. New TUI with commander/help modules. Add session management, tunnel proxy core, CAN bus data store. Docker support.
945 lines
38 KiB
Python
945 lines
38 KiB
Python
"""
|
|
Honeypot Event Store — SQLite-backed storage with structured field parsing.
|
|
|
|
Stores all honeypot events in a SQLite database for complex queries,
|
|
while maintaining in-memory counters for fast dashboard polling.
|
|
"""
|
|
|
|
import re
|
|
import time
|
|
import hashlib
|
|
import sqlite3
|
|
import threading
|
|
import json
|
|
from collections import defaultdict, OrderedDict
|
|
from typing import Optional
|
|
|
|
|
|
# ============================================================
# Field extraction regexes — parse structured data from detail
# ============================================================
# Each pattern captures a single-quoted value from the free-form
# detail string emitted by the honeypot services.
RE_USER = re.compile(r"user='([^']*)'")
RE_PASS = re.compile(r"pass='([^']*)'")
RE_CMD = re.compile(r"cmd='([^']*)'")
RE_URL = re.compile(r"url='([^']*)'")
RE_FILE = re.compile(r"file='([^']*)'")
RE_CLIENT = re.compile(r"client='([^']*)'")
RE_COMMUNITY = re.compile(r"community='([^']*)'")
# Trailing "[Word]" tag at end of detail (OS fingerprint, see _extract_fields)
RE_OS_TAG = re.compile(r"\[(\w+)\]\s*$")
# Closed set of malware/behavior classification tags embedded in detail
RE_MALWARE_TAG = re.compile(r"\[(Mirai|Botnet-generic|RCE-pipe|Obfuscation|Reverse-shell|"
                            r"RCE-python|RCE-perl|Destructive|Mirai-variant|HONEYTOKEN)\]")

# Destination port -> emulated service name (used to label stored events)
PORT_TO_SERVICE = {
    22: "ssh", 23: "telnet", 80: "http", 1883: "mqtt", 21: "ftp",
    53: "dns", 161: "snmp", 69: "tftp", 5683: "coap", 6379: "redis",
    554: "rtsp", 3306: "mysql", 502: "modbus", 1900: "upnp",
    5060: "sip", 2323: "telnet", 23231: "telnet",
}

# Accepted severity labels; anything else is coerced to LOW on ingest
VALID_SEVERITIES = {"LOW", "MEDIUM", "HIGH", "CRITICAL"}
|
|
|
|
# MITRE ATT&CK technique mapping
# Each entry: event_type -> [(technique_id, technique_name, tactic), ...]
MITRE_MAP = {
    "WIFI_DEAUTH": [("T1498.001", "Direct Network Flood", "Impact")],
    "WIFI_PROBE": [("T1595.001", "Scanning IP Blocks", "Reconnaissance")],
    "WIFI_EVIL_TWIN": [("T1557.002", "ARP Cache Poisoning", "Credential Access")],
    "WIFI_BEACON_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "WIFI_EAPOL": [("T1557", "Adversary-in-the-Middle", "Credential Access")],
    "ARP_SPOOF": [("T1557.002", "ARP Cache Poisoning", "Credential Access")],
    "ARP_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "PORT_SCAN": [("T1046", "Network Service Discovery", "Discovery")],
    "ICMP_SWEEP": [("T1018", "Remote System Discovery", "Discovery")],
    "SYN_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "UDP_FLOOD": [("T1498.001", "Direct Network Flood", "Impact")],
    "SVC_CONNECT": [("T1021", "Remote Services", "Lateral Movement")],
    "SVC_AUTH_ATTEMPT": [("T1110", "Brute Force", "Credential Access")],
    "SVC_COMMAND": [("T1059", "Command Interpreter", "Execution")],
    "SVC_HTTP_REQUEST": [("T1190", "Exploit Public-Facing App", "Initial Access")],
    "SVC_MQTT_MSG": [("T1071.001", "Web Protocols", "Command and Control")],
}

# Keyword-triggered techniques: keys are matched as plain substrings of the
# event detail (see _extract_mitre) and layered on top of MITRE_MAP hits.
MITRE_DETAIL_MAP = {
    "HONEYTOKEN": [("T1083", "File and Directory Discovery", "Discovery"),
                   ("T1005", "Data from Local System", "Collection")],
    "DOWNLOAD": [("T1105", "Ingress Tool Transfer", "Command and Control")],
    "DNS_TUNNEL": [("T1071.004", "DNS", "Command and Control"),
                   ("T1048.003", "Exfiltration Over Non-C2", "Exfiltration")],
    "Mirai": [("T1583.005", "Botnet", "Resource Development")],
    "Reverse-shell": [("T1059.004", "Unix Shell", "Execution")],
    "RCE-python": [("T1059.006", "Python", "Execution")],
    "CONFIG SET": [("T1053", "Scheduled Task/Job", "Persistence")],
    "SLAVEOF": [("T1020", "Automated Exfiltration", "Exfiltration")],
    "Write": [("T1565.001", "Stored Data Manipulation", "Impact")],
}

# Known event types: MITRE-mapped types plus service-specific extras.
# NOTE(review): this set is defined but not enforced anywhere in this
# module — parse_and_store accepts any event_type string. Confirm whether
# ingest-side validation was intended.
VALID_EVENT_TYPES = set(MITRE_MAP.keys()) | {
    "HONEYTOKEN", "SVC_FTP_CMD", "SVC_SNMP_QUERY", "SVC_COAP_REQUEST",
    "SVC_MODBUS_QUERY", "SVC_UPNP_REQUEST", "SVC_SIP_REGISTER",
    "SVC_REDIS_CMD", "SVC_RTSP_REQUEST", "SVC_DNS_QUERY",
}
|
|
|
|
|
|
# ============================================================
# Kill Chain phase definitions and classifier
# ============================================================

# Cyber Kill Chain phases in order. "score" weights an attacker's overall
# progression (later phases contribute more, see get_kill_chain_analysis).
KILL_CHAIN_PHASES = [
    {"id": "recon", "order": 1, "score": 10, "label": "Reconnaissance"},
    {"id": "weaponize", "order": 2, "score": 15, "label": "Weaponization"},
    {"id": "delivery", "order": 3, "score": 20, "label": "Delivery"},
    {"id": "exploitation", "order": 4, "score": 30, "label": "Exploitation"},
    {"id": "installation", "order": 5, "score": 40, "label": "Installation"},
    {"id": "c2", "order": 6, "score": 50, "label": "Command & Control"},
    {"id": "actions", "order": 7, "score": 60, "label": "Actions on Objectives"},
]

# Fast lookup tables derived from the phase list above
PHASE_SCORES = {p["id"]: p["score"] for p in KILL_CHAIN_PHASES}
PHASE_ORDER = {p["id"]: p["order"] for p in KILL_CHAIN_PHASES}
|
|
|
|
|
|
def _classify_kill_chain_phase(event_type, detail):
|
|
"""Classify an event into the highest matching kill chain phase."""
|
|
d = (detail or "").upper()
|
|
|
|
# Phase 7: Actions on Objectives (exfiltration, sabotage)
|
|
if "HONEYTOKEN EXFIL" in d or "FTP STOR" in d or "CONFIG SET" in d:
|
|
return "actions"
|
|
# Phase 6: C2 (command & control channels)
|
|
if event_type == "SVC_MQTT_MSG" or "SLAVEOF" in d or "EVAL" in d:
|
|
return "c2"
|
|
# Phase 5: Installation (malware drop, honeytoken access)
|
|
if "HONEYTOKEN" in d or "DOWNLOAD" in d:
|
|
return "installation"
|
|
# Phase 4: Exploitation (command execution in shell)
|
|
if event_type == "SVC_COMMAND":
|
|
return "exploitation"
|
|
# Phase 3: Delivery (service connection, auth attempts)
|
|
if event_type in ("SVC_CONNECT", "SVC_AUTH_ATTEMPT", "SVC_HTTP_REQUEST"):
|
|
return "delivery"
|
|
# Phase 2: Weaponization (network manipulation)
|
|
if event_type in ("ARP_SPOOF", "ARP_FLOOD", "WIFI_EVIL_TWIN", "WIFI_EAPOL"):
|
|
return "weaponize"
|
|
if "DNS_TUNNEL" in d:
|
|
return "weaponize"
|
|
# Phase 1: Reconnaissance (scanning, probing)
|
|
if event_type in ("WIFI_PROBE", "PORT_SCAN", "ICMP_SWEEP", "SYN_FLOOD", "UDP_FLOOD"):
|
|
return "recon"
|
|
|
|
return None
|
|
|
|
|
|
def _extract_mitre(event_type, detail):
    """Resolve the deduplicated MITRE ATT&CK techniques for an event.

    Combines the static event-type mapping with substring-triggered
    detail keywords; first occurrence of each technique id wins.
    """
    candidates = list(MITRE_MAP.get(event_type, []))
    candidates += [
        tech
        for keyword, techs in MITRE_DETAIL_MAP.items()
        if keyword in detail
        for tech in techs
    ]

    out = []
    seen_ids = set()
    for tid, name, tactic in candidates:
        if tid in seen_ids:
            continue
        seen_ids.add(tid)
        out.append({"id": tid, "name": name, "tactic": tactic})
    return out
|
|
|
|
|
|
def _session_id(src_ip: str, dst_port: int, timestamp: float) -> str:
|
|
"""Compute session ID: md5(ip + port + 5min_bucket)[:12]."""
|
|
bucket = int(timestamp / 300)
|
|
key = f"{src_ip}:{dst_port}:{bucket}"
|
|
return hashlib.md5(key.encode()).hexdigest()[:12]
|
|
|
|
|
|
def _extract_fields(detail: str) -> dict:
    """Pull structured key/value fields out of a raw detail string.

    Returns only the keys that matched; later table entries may overwrite
    earlier ones (an SNMP community string intentionally lands in
    "username" so credential views pick it up).
    """
    out = {}
    for pattern, field in (
        (RE_USER, "username"),
        (RE_PASS, "password"),
        (RE_CMD, "command"),
        (RE_URL, "url"),
        (RE_FILE, "path"),
        (RE_CLIENT, "client_id"),
        (RE_COMMUNITY, "username"),  # community string doubles as username
        (RE_MALWARE_TAG, "malware_tag"),
    ):
        hit = pattern.search(detail)
        if hit:
            out[field] = hit.group(1)

    # Trailing "[Tag]" is only accepted as an OS tag for a known OS name
    hit = RE_OS_TAG.search(detail)
    if hit and hit.group(1) in ("Linux", "Windows", "macOS", "Cisco"):
        out["os_tag"] = hit.group(1)
    return out
|
|
|
|
|
|
class HpStore:
|
|
"""
|
|
Thread-safe SQLite-backed event store.
|
|
Maintains in-memory counters for fast dashboard polling.
|
|
"""
|
|
|
|
    def __init__(self, db_path: str = "honeypot_events.db", geo_lookup=None):
        """Open (or create) the event store.

        Args:
            db_path: Path to the SQLite database file.
            geo_lookup: Optional HpGeoLookup-like object used to enrich
                attacker records with geo/vendor data (may be None).
        """
        self.db_path = db_path
        # Guards the in-memory counters, attacker LRU and SSE queue list.
        # SQLite access is isolated via per-thread connections instead.
        self.lock = threading.Lock()
        self._local = threading.local()
        self._geo = geo_lookup  # Optional HpGeoLookup instance

        # In-memory counters for fast polling
        self.total_count = 0
        self.count_by_type: dict = defaultdict(int)
        self.count_by_severity: dict = defaultdict(int)
        self.count_by_device: dict = defaultdict(int)
        # LRU-ordered map of src_ip -> attacker summary (capped at 10k entries)
        self.attackers: OrderedDict = OrderedDict()
        self._last_id = 0

        # SSE subscribers
        self._sse_queues: list = []

        # Alert engine (set via set_alert_engine after bootstrap)
        self._alert_engine = None

        self._init_db()
|
|
|
|
    def set_alert_engine(self, engine):
        """Wire the alert engine so events are evaluated for alerts.

        Called after bootstrap; until then parse_and_store skips alert
        evaluation because self._alert_engine is None.
        """
        self._alert_engine = engine
|
|
|
|
def _get_conn(self) -> sqlite3.Connection:
|
|
"""Thread-local SQLite connection."""
|
|
if not hasattr(self._local, "conn") or self._local.conn is None:
|
|
self._local.conn = sqlite3.connect(self.db_path)
|
|
self._local.conn.row_factory = sqlite3.Row
|
|
self._local.conn.execute("PRAGMA journal_mode=WAL")
|
|
self._local.conn.execute("PRAGMA synchronous=NORMAL")
|
|
return self._local.conn
|
|
|
|
    def _init_db(self):
        """Create the events table and indexes; migrate older databases.

        Safe to call on an existing database: table/index creation is
        IF NOT EXISTS and the mitre_techniques column is added only when
        missing. Also primes the in-memory counters from existing rows so
        dashboard totals survive restarts.
        """
        conn = self._get_conn()

        # Create table (without mitre_techniques for compat with existing DBs)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL NOT NULL,
                device_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                severity TEXT NOT NULL,
                src_mac TEXT,
                src_ip TEXT,
                src_port INTEGER,
                dst_port INTEGER,
                detail TEXT,
                received_at REAL,
                service TEXT,
                username TEXT,
                password TEXT,
                command TEXT,
                url TEXT,
                path TEXT,
                client_id TEXT,
                session_id TEXT,
                os_tag TEXT,
                malware_tag TEXT
            )
        """)

        # Migration: add mitre_techniques column if missing
        cols = {r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()}
        if "mitre_techniques" not in cols:
            conn.execute("ALTER TABLE events ADD COLUMN mitre_techniques TEXT")

        # Create indexes (safe now that all columns exist)
        conn.executescript("""
            CREATE INDEX IF NOT EXISTS idx_ts ON events(timestamp);
            CREATE INDEX IF NOT EXISTS idx_type ON events(event_type);
            CREATE INDEX IF NOT EXISTS idx_sev ON events(severity);
            CREATE INDEX IF NOT EXISTS idx_ip ON events(src_ip);
            CREATE INDEX IF NOT EXISTS idx_session ON events(session_id);
            CREATE INDEX IF NOT EXISTS idx_service ON events(service);
            CREATE INDEX IF NOT EXISTS idx_mitre ON events(mitre_techniques);
        """)
        conn.commit()

        # Load counters from existing data
        row = conn.execute("SELECT COUNT(*) as cnt, MAX(id) as max_id FROM events").fetchone()
        self.total_count = row["cnt"] or 0
        self._last_id = row["max_id"] or 0

        for row in conn.execute("SELECT event_type, COUNT(*) as cnt FROM events GROUP BY event_type"):
            self.count_by_type[row["event_type"]] = row["cnt"]
        for row in conn.execute("SELECT severity, COUNT(*) as cnt FROM events GROUP BY severity"):
            self.count_by_severity[row["severity"]] = row["cnt"]
        for row in conn.execute("SELECT device_id, COUNT(*) as cnt FROM events GROUP BY device_id"):
            self.count_by_device[row["device_id"]] = row["cnt"]
|
|
|
|
    def parse_and_store(self, device_id: str, payload: str) -> Optional[dict]:
        """
        Parse EVT wire format and store in SQLite.
        Format: EVT|<type>|<severity>|<mac>|<ip>:<port>><dport>|<detail>
        Returns event dict or None (unparseable payload, or any error).

        Side effects: inserts one row, bumps in-memory counters, updates
        the attacker LRU, triggers background geo lookup, pushes the event
        to SSE subscribers and feeds the alert engine when wired.
        """
        try:
            # Split at most 5 times so the free-form detail may contain '|'
            parts = payload.split("|", 5)
            if len(parts) < 6 or parts[0] not in ("EVT", "HP"):
                return None

            _, event_type, severity, src_mac, addr_part, detail = parts

            # Validate severity to prevent pipe injection
            if severity not in VALID_SEVERITIES:
                severity = "LOW"

            # Defaults used when the address part is absent or malformed
            src_ip = "0.0.0.0"
            src_port = 0
            dst_port = 0

            # addr_part shape: "<ip>:<sport>><dport>"
            if ">" in addr_part:
                left, dst_port_s = addr_part.split(">", 1)
                dst_port = int(dst_port_s) if dst_port_s.isdigit() else 0
                if ":" in left:
                    # rsplit so only the last ':' separates the port
                    src_ip, src_port_s = left.rsplit(":", 1)
                    src_port = int(src_port_s) if src_port_s.isdigit() else 0

            now = time.time()
            fields = _extract_fields(detail)
            service = PORT_TO_SERVICE.get(dst_port, "")
            session_id = _session_id(src_ip, dst_port, now)

            mitre = _extract_mitre(event_type, detail)
            mitre_json = json.dumps(mitre) if mitre else None

            conn = self._get_conn()
            cur = conn.execute(
                """INSERT INTO events (timestamp, device_id, event_type, severity,
                   src_mac, src_ip, src_port, dst_port, detail, received_at,
                   service, username, password, command, url, path,
                   client_id, session_id, os_tag, malware_tag, mitre_techniques)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (now, device_id, event_type, severity,
                 src_mac, src_ip, src_port, dst_port, detail, now,
                 service, fields.get("username"), fields.get("password"),
                 fields.get("command"), fields.get("url"), fields.get("path"),
                 fields.get("client_id"), session_id,
                 fields.get("os_tag"), fields.get("malware_tag"), mitre_json)
            )
            conn.commit()
            event_id = cur.lastrowid

            # Update in-memory counters and attacker LRU under the lock
            with self.lock:
                self.total_count += 1
                self._last_id = event_id
                self.count_by_type[event_type] += 1
                self.count_by_severity[severity] += 1
                self.count_by_device[device_id] += 1

                if src_ip != "0.0.0.0":
                    if src_ip not in self.attackers:
                        # Evict oldest (LRU) if at capacity — O(1)
                        if len(self.attackers) >= 10000:
                            self.attackers.popitem(last=False)
                        self.attackers[src_ip] = {
                            "ip": src_ip, "mac": src_mac, "types": set(),
                            "count": 0, "first_seen": now, "last_seen": now,
                        }
                    # Move to end on each access so LRU is always at front
                    self.attackers.move_to_end(src_ip)
                    atk = self.attackers[src_ip]
                    atk["types"].add(event_type)
                    atk["count"] += 1
                    atk["last_seen"] = now
                    # Keep a real MAC if one is ever reported
                    if src_mac != "00:00:00:00:00:00":
                        atk["mac"] = src_mac

            evt_dict = {
                "id": event_id, "timestamp": now, "device_id": device_id,
                "event_type": event_type, "severity": severity,
                "src_mac": src_mac, "src_ip": src_ip, "src_port": src_port,
                "dst_port": dst_port, "detail": detail, "received_at": now,
                "service": service, "session_id": session_id,
                **fields,
            }

            # Trigger background geo resolution (non-blocking)
            if self._geo and src_ip != "0.0.0.0":
                self._geo.lookup_ip(src_ip)

            # Notify SSE subscribers (snapshot under lock, push outside it)
            with self.lock:
                sse_snapshot = list(self._sse_queues)
            for q in sse_snapshot:
                try:
                    q.append(evt_dict)
                except Exception:
                    pass  # a broken subscriber must not block ingest

            # Evaluate alert rules
            if self._alert_engine:
                try:
                    self._alert_engine.evaluate(evt_dict)
                except Exception as e:
                    print(f"[HP_STORE] Alert evaluation error: {e}")

            return evt_dict

        except Exception as e:
            # Deliberate catch-all: a malformed payload must never crash ingest
            print(f"[HP_STORE] Failed to parse event: {e} — payload={payload!r}")
            return None
|
|
|
|
# ============================================================
|
|
# Query APIs
|
|
# ============================================================
|
|
|
|
def get_recent_events(self, limit: int = 100, event_type: str = None,
|
|
severity: str = None, src_ip: str = None,
|
|
service: str = None, offset: int = 0) -> list:
|
|
"""Get recent events with optional filters."""
|
|
conn = self._get_conn()
|
|
conditions = []
|
|
params = []
|
|
if event_type:
|
|
conditions.append("event_type = ?")
|
|
params.append(event_type)
|
|
if severity:
|
|
conditions.append("severity = ?")
|
|
params.append(severity)
|
|
if src_ip:
|
|
conditions.append("src_ip = ?")
|
|
params.append(src_ip)
|
|
if service:
|
|
conditions.append("service = ?")
|
|
params.append(service)
|
|
|
|
where = " WHERE " + " AND ".join(conditions) if conditions else ""
|
|
sql = f"SELECT * FROM events{where} ORDER BY id DESC LIMIT ? OFFSET ?"
|
|
params.extend([limit, offset])
|
|
|
|
rows = conn.execute(sql, params).fetchall()
|
|
return [dict(r) for r in rows]
|
|
|
|
def get_event_by_id(self, event_id: int) -> Optional[dict]:
|
|
"""Get single event with related events (same IP +-5min)."""
|
|
conn = self._get_conn()
|
|
row = conn.execute("SELECT * FROM events WHERE id = ?", (event_id,)).fetchone()
|
|
if not row:
|
|
return None
|
|
evt = dict(row)
|
|
# Related events: same IP, +-5 minutes
|
|
related = conn.execute(
|
|
"""SELECT * FROM events WHERE src_ip = ? AND id != ?
|
|
AND timestamp BETWEEN ? AND ? ORDER BY timestamp LIMIT 20""",
|
|
(evt["src_ip"], event_id,
|
|
evt["timestamp"] - 300, evt["timestamp"] + 300)
|
|
).fetchall()
|
|
evt["related"] = [dict(r) for r in related]
|
|
return evt
|
|
|
|
    def get_attacker_profile(self, ip: str) -> dict:
        """Full attacker profile: events, credentials, commands, sessions.

        Returns up to 200 recent events (first 50 serialized), every
        captured credential/command, per-session aggregates, and — when a
        geo lookup is wired — country/ISP/vendor enrichment.
        """
        conn = self._get_conn()
        events = conn.execute(
            "SELECT * FROM events WHERE src_ip = ? ORDER BY timestamp DESC LIMIT 200",
            (ip,)
        ).fetchall()

        creds = conn.execute(
            """SELECT username, password, service, timestamp FROM events
               WHERE src_ip = ? AND username IS NOT NULL ORDER BY timestamp""",
            (ip,)
        ).fetchall()

        commands = conn.execute(
            """SELECT command, detail, service, timestamp FROM events
               WHERE src_ip = ? AND command IS NOT NULL ORDER BY timestamp""",
            (ip,)
        ).fetchall()

        # max_sev: severity folded to a numeric rank so MAX() yields the
        # worst severity observed within each session
        sessions = conn.execute(
            """SELECT session_id, service, MIN(timestamp) as start,
                      MAX(timestamp) as end, COUNT(*) as event_count,
                      MAX(CASE severity WHEN 'CRITICAL' THEN 4 WHEN 'HIGH' THEN 3
                          WHEN 'MEDIUM' THEN 2 ELSE 1 END) as max_sev
               FROM events WHERE src_ip = ? GROUP BY session_id
               ORDER BY start DESC""",
            (ip,)
        ).fetchall()

        profile = {
            "ip": ip,
            "total_events": len(events),
            "events": [dict(r) for r in events[:50]],
            "credentials": [dict(r) for r in creds],
            "commands": [dict(r) for r in commands],
            "sessions": [dict(r) for r in sessions],
        }
        if self._geo:
            geo = self._geo.lookup_ip(ip)
            profile["country"] = geo.get("country", "")
            profile["country_code"] = geo.get("country_code", "")
            profile["city"] = geo.get("city", "")
            profile["isp"] = geo.get("isp", "")
            profile["is_private"] = geo.get("is_private", False)
            # Vendor from first known MAC for this IP
            with self.lock:
                atk = self.attackers.get(ip, {})
            profile["vendor"] = self._geo.lookup_mac_vendor(atk.get("mac", ""))
        return profile
|
|
|
|
    def get_sessions(self, limit: int = 50, src_ip: str = None) -> list:
        """Grouped session summaries, newest first.

        Each row carries start/end timestamps, event count, worst severity
        (numeric rank 1-4), and counts of auth attempts, commands and
        malware-tagged events within the session.

        Args:
            limit: Maximum number of sessions returned.
            src_ip: Optional filter to a single source IP.
        """
        conn = self._get_conn()
        conditions = []
        params = []
        if src_ip:
            conditions.append("src_ip = ?")
            params.append(src_ip)
        where = " WHERE " + " AND ".join(conditions) if conditions else ""

        # Severity is folded to its numeric rank so MAX() yields the worst
        # severity seen within each session.
        sql = f"""SELECT session_id, src_ip, service, dst_port,
                      MIN(timestamp) as start_time, MAX(timestamp) as end_time,
                      COUNT(*) as event_count,
                      MAX(CASE severity WHEN 'CRITICAL' THEN 4 WHEN 'HIGH' THEN 3
                          WHEN 'MEDIUM' THEN 2 ELSE 1 END) as max_severity,
                      SUM(CASE WHEN username IS NOT NULL THEN 1 ELSE 0 END) as auth_count,
                      SUM(CASE WHEN command IS NOT NULL THEN 1 ELSE 0 END) as cmd_count,
                      SUM(CASE WHEN malware_tag IS NOT NULL THEN 1 ELSE 0 END) as malware_count
               FROM events{where}
               GROUP BY session_id
               ORDER BY start_time DESC LIMIT ?"""
        params.append(limit)
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]
|
|
|
|
def get_session_events(self, session_id: str) -> list:
|
|
"""Get all events for a session."""
|
|
conn = self._get_conn()
|
|
rows = conn.execute(
|
|
"SELECT * FROM events WHERE session_id = ? ORDER BY timestamp",
|
|
(session_id,)
|
|
).fetchall()
|
|
return [dict(r) for r in rows]
|
|
|
|
@staticmethod
|
|
def _escape_like(s):
|
|
"""Escape LIKE wildcards to prevent DoS via crafted search queries."""
|
|
return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
|
|
|
|
def search(self, q: str = None, event_type: str = None,
|
|
severity: str = None, src_ip: str = None,
|
|
service: str = None, from_ts: float = None,
|
|
to_ts: float = None, limit: int = 100,
|
|
offset: int = 0) -> list:
|
|
"""Multi-criteria search."""
|
|
conn = self._get_conn()
|
|
conditions = []
|
|
params = []
|
|
if q:
|
|
conditions.append("(detail LIKE ? ESCAPE '\\' "
|
|
"OR username LIKE ? ESCAPE '\\' "
|
|
"OR command LIKE ? ESCAPE '\\')")
|
|
escaped = self._escape_like(q)
|
|
like = f"%{escaped}%"
|
|
params.extend([like, like, like])
|
|
if event_type:
|
|
conditions.append("event_type = ?")
|
|
params.append(event_type)
|
|
if severity:
|
|
conditions.append("severity = ?")
|
|
params.append(severity)
|
|
if src_ip:
|
|
conditions.append("src_ip = ?")
|
|
params.append(src_ip)
|
|
if service:
|
|
conditions.append("service = ?")
|
|
params.append(service)
|
|
if from_ts:
|
|
conditions.append("timestamp >= ?")
|
|
params.append(from_ts)
|
|
if to_ts:
|
|
conditions.append("timestamp <= ?")
|
|
params.append(to_ts)
|
|
|
|
where = " WHERE " + " AND ".join(conditions) if conditions else ""
|
|
sql = f"SELECT * FROM events{where} ORDER BY timestamp DESC LIMIT ? OFFSET ?"
|
|
params.append(limit)
|
|
params.append(offset)
|
|
rows = conn.execute(sql, params).fetchall()
|
|
return [dict(r) for r in rows]
|
|
|
|
def get_timeline(self, hours: int = 24, bucket_minutes: int = 5) -> list:
|
|
"""Get event counts per time bucket, grouped by severity."""
|
|
conn = self._get_conn()
|
|
since = time.time() - (hours * 3600)
|
|
bucket_s = bucket_minutes * 60
|
|
|
|
rows = conn.execute(
|
|
"""SELECT CAST(timestamp / ? AS INTEGER) * ? as bucket,
|
|
severity, COUNT(*) as cnt
|
|
FROM events WHERE timestamp >= ?
|
|
GROUP BY bucket, severity ORDER BY bucket""",
|
|
(bucket_s, bucket_s, since)
|
|
).fetchall()
|
|
|
|
# Group by bucket
|
|
buckets = {}
|
|
for r in rows:
|
|
b = r["bucket"]
|
|
if b not in buckets:
|
|
buckets[b] = {"time": b, "LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0, "total": 0}
|
|
buckets[b][r["severity"]] = r["cnt"]
|
|
buckets[b]["total"] += r["cnt"]
|
|
|
|
return sorted(buckets.values(), key=lambda x: x["time"])
|
|
|
|
def get_stats(self) -> dict:
|
|
"""Get aggregated statistics (from memory for speed)."""
|
|
with self.lock:
|
|
return {
|
|
"total_events": self.total_count,
|
|
"by_type": dict(self.count_by_type),
|
|
"by_severity": dict(self.count_by_severity),
|
|
"by_device": dict(self.count_by_device),
|
|
"last_id": self._last_id,
|
|
}
|
|
|
|
def get_attackers(self, limit: int = 50) -> list:
|
|
"""Get top attackers sorted by event count, enriched with geo/vendor."""
|
|
with self.lock:
|
|
attackers = []
|
|
for ip, data in self.attackers.items():
|
|
attackers.append({
|
|
"ip": data["ip"], "mac": data["mac"],
|
|
"types": list(data["types"]),
|
|
"count": data["count"],
|
|
"first_seen": data["first_seen"],
|
|
"last_seen": data["last_seen"],
|
|
})
|
|
attackers.sort(key=lambda x: x["count"], reverse=True)
|
|
result = attackers[:limit]
|
|
if self._geo:
|
|
for atk in result:
|
|
self._geo.enrich_attacker(atk)
|
|
return result
|
|
|
|
def get_events_after(self, last_id: int, limit: int = 50) -> list:
|
|
"""Get events after a given ID (for SSE polling)."""
|
|
conn = self._get_conn()
|
|
rows = conn.execute(
|
|
"SELECT * FROM events WHERE id > ? ORDER BY id LIMIT ?",
|
|
(last_id, limit)
|
|
).fetchall()
|
|
return [dict(r) for r in rows]
|
|
|
|
    def get_last_id(self) -> int:
        """Get last event ID (thread-safe read of the in-memory counter)."""
        with self.lock:
            return self._last_id
|
|
|
|
@staticmethod
|
|
def _csv_safe(val):
|
|
"""Escape a CSV cell to prevent formula injection in spreadsheet apps."""
|
|
s = str(val).replace(",", ";")
|
|
if s and s[0] in ("=", "+", "-", "@", "\t", "\r"):
|
|
s = "'" + s
|
|
return s
|
|
|
|
def export_events(self, fmt: str = "json", **filters) -> str:
|
|
"""Export events as JSON or CSV."""
|
|
events = self.search(**filters, limit=10000)
|
|
if fmt == "csv":
|
|
if not events:
|
|
return ""
|
|
cols = list(events[0].keys())
|
|
lines = [",".join(cols)]
|
|
for e in events:
|
|
lines.append(",".join(self._csv_safe(e.get(c, "")) for c in cols))
|
|
return "\n".join(lines)
|
|
return json.dumps(events, default=str)
|
|
|
|
    def get_credential_intel(self):
        """Credential intelligence summary.

        Returns top-20 usernames, passwords and username/password combos
        observed in auth attempts (with the services they hit), plus
        per-service unique-user and attempt totals.
        """
        conn = self._get_conn()
        top_usernames = conn.execute(
            "SELECT username, COUNT(*) as cnt, GROUP_CONCAT(DISTINCT service) as services "
            "FROM events WHERE username IS NOT NULL "
            "GROUP BY username ORDER BY cnt DESC LIMIT 20"
        ).fetchall()
        top_passwords = conn.execute(
            "SELECT password, COUNT(*) as cnt, GROUP_CONCAT(DISTINCT service) as services "
            "FROM events WHERE password IS NOT NULL "
            "GROUP BY password ORDER BY cnt DESC LIMIT 20"
        ).fetchall()
        top_combos = conn.execute(
            "SELECT username, password, COUNT(*) as cnt, service "
            "FROM events WHERE username IS NOT NULL AND password IS NOT NULL "
            "GROUP BY username, password ORDER BY cnt DESC LIMIT 20"
        ).fetchall()
        by_service = conn.execute(
            "SELECT service, COUNT(DISTINCT username) as unique_users, "
            "COUNT(*) as total_attempts "
            "FROM events WHERE username IS NOT NULL AND service IS NOT NULL "
            "GROUP BY service ORDER BY total_attempts DESC"
        ).fetchall()
        return {
            "top_usernames": [dict(r) for r in top_usernames],
            "top_passwords": [dict(r) for r in top_passwords],
            "top_combos": [dict(r) for r in top_combos],
            "by_service": [dict(r) for r in by_service],
        }
|
|
|
|
def get_mitre_coverage(self):
|
|
"""Get MITRE ATT&CK coverage summary."""
|
|
conn = self._get_conn()
|
|
rows = conn.execute(
|
|
"SELECT mitre_techniques FROM events WHERE mitre_techniques IS NOT NULL"
|
|
).fetchall()
|
|
techniques = {}
|
|
tactics = {}
|
|
for r in rows:
|
|
try:
|
|
for t in json.loads(r["mitre_techniques"]):
|
|
tid = t["id"]
|
|
tactic = t["tactic"]
|
|
techniques[tid] = techniques.get(tid, 0) + 1
|
|
tactics[tactic] = tactics.get(tactic, 0) + 1
|
|
except (json.JSONDecodeError, KeyError):
|
|
pass
|
|
return {"techniques": techniques, "tactics": tactics}
|
|
|
|
# ============================================================
|
|
# Kill Chain Analysis
|
|
# ============================================================
|
|
|
|
    def get_kill_chain_analysis(self, ip):
        """Analyze kill chain progression for a specific IP.

        Groups the IP's events into kill chain phases and returns per-phase
        stats, a weighted score, the furthest phase reached, a progression
        percentage (of 7 phases), and the overall activity duration.
        """
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT event_type, detail, timestamp, mitre_techniques "
            "FROM events WHERE src_ip = ? ORDER BY timestamp",
            (ip,)
        ).fetchall()

        if not rows:
            # No recorded activity: empty, zero-score analysis
            return {"ip": ip, "phases": {}, "score": 0, "max_phase": None,
                    "progression_pct": 0, "duration_seconds": 0,
                    "is_full_chain": False, "total_events": 0}

        phases = {}
        for row in rows:
            phase = _classify_kill_chain_phase(row["event_type"], row["detail"])
            if not phase:
                continue  # unclassified events don't contribute
            if phase not in phases:
                phases[phase] = {
                    "count": 0, "first_seen": row["timestamp"],
                    "last_seen": row["timestamp"], "techniques": set()
                }
            p = phases[phase]
            p["count"] += 1
            p["last_seen"] = row["timestamp"]
            if row["mitre_techniques"]:
                try:
                    for t in json.loads(row["mitre_techniques"]):
                        p["techniques"].add(t["id"])
                except (json.JSONDecodeError, KeyError):
                    pass  # tolerate malformed stored JSON

        # Sets are not JSON-serializable; convert for API responses
        for p in phases.values():
            p["techniques"] = list(p["techniques"])

        score = sum(PHASE_SCORES.get(pid, 0) for pid in phases)
        max_phase = max(phases.keys(), key=lambda x: PHASE_ORDER.get(x, 0)) if phases else None
        max_order = PHASE_ORDER.get(max_phase, 0) if max_phase else 0

        return {
            "ip": ip,
            "phases": phases,
            "score": score,
            "max_phase": max_phase,
            "max_phase_order": max_order,
            "progression_pct": round(max_order / 7 * 100),
            "duration_seconds": round(rows[-1]["timestamp"] - rows[0]["timestamp"]),
            # "full chain" = reached Installation (phase 5) or beyond
            "is_full_chain": max_order >= 5,
            "total_events": len(rows),
        }
|
|
|
|
    def get_kill_chain_top(self, limit=20):
        """Get top attackers by kill chain score (single query, in-memory analysis).

        Pulls all events once, groups them per source IP, and runs the same
        phase classification/scoring as get_kill_chain_analysis. IPs with
        no classifiable events are omitted.
        """
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT src_ip, event_type, detail, timestamp, mitre_techniques "
            "FROM events WHERE src_ip != '0.0.0.0' ORDER BY src_ip, timestamp"
        ).fetchall()

        ip_events = defaultdict(list)
        for r in rows:
            ip_events[r["src_ip"]].append(r)

        results = []
        for ip, events in ip_events.items():
            phases = {}
            for row in events:
                phase = _classify_kill_chain_phase(row["event_type"], row["detail"])
                if not phase:
                    continue
                if phase not in phases:
                    phases[phase] = {"count": 0, "first_seen": row["timestamp"],
                                     "techniques": set()}
                phases[phase]["count"] += 1
                if row["mitre_techniques"]:
                    try:
                        for t in json.loads(row["mitre_techniques"]):
                            phases[phase]["techniques"].add(t["id"])
                    except (json.JSONDecodeError, KeyError):
                        pass  # tolerate malformed stored JSON

            if not phases:
                continue  # nothing classifiable for this IP

            # Sets are not JSON-serializable; convert for API responses
            for p in phases.values():
                p["techniques"] = list(p["techniques"])

            score = sum(PHASE_SCORES.get(pid, 0) for pid in phases)
            max_phase = max(phases.keys(), key=lambda x: PHASE_ORDER.get(x, 0))
            max_order = PHASE_ORDER.get(max_phase, 0)

            results.append({
                "ip": ip,
                "phases": phases,
                "score": score,
                "max_phase": max_phase,
                "max_phase_order": max_order,
                "progression_pct": round(max_order / 7 * 100),
                # events are timestamp-ordered per IP (ORDER BY above)
                "duration_seconds": round(events[-1]["timestamp"] - events[0]["timestamp"]),
                "is_full_chain": max_order >= 5,
                "total_events": len(events),
            })

        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:limit]
|
|
|
|
# ============================================================
|
|
# Honeytoken Stats
|
|
# ============================================================
|
|
|
|
    def get_honeytoken_stats(self):
        """Get honeytoken event statistics.

        Scans all events whose detail mentions HONEYTOKEN and aggregates
        them by service, rough access type (credential / exfil attempt /
        file access) and attacker IP.
        """
        conn = self._get_conn()
        ht_events = conn.execute(
            "SELECT * FROM events WHERE detail LIKE '%HONEYTOKEN%' "
            "ORDER BY timestamp DESC"
        ).fetchall()

        by_service = defaultdict(int)
        by_type = defaultdict(int)
        by_ip = defaultdict(lambda: {"count": 0, "services": set()})

        for evt in ht_events:
            e = dict(evt)
            svc = e.get("service") or "unknown"
            by_service[svc] += 1

            # Crude keyword classification of the access type
            detail = e.get("detail", "")
            if "credential" in detail.lower():
                by_type["credential"] += 1
            elif "exfil" in detail.lower():
                by_type["exfil_attempt"] += 1
            else:
                by_type["file_access"] += 1

            ip = e.get("src_ip", "")
            if ip:
                by_ip[ip]["count"] += 1
                by_ip[ip]["services"].add(svc)

        top_attackers = sorted(by_ip.items(), key=lambda x: x[1]["count"], reverse=True)[:10]

        return {
            "total": len(ht_events),
            "by_service": dict(by_service),
            "by_type": dict(by_type),
            "top_attackers": [
                {"ip": ip, "count": d["count"], "services": list(d["services"])}
                for ip, d in top_attackers
            ],
            "recent": [dict(e) for e in ht_events[:10]],
        }
|
|
|
|
# ============================================================
|
|
# Data Retention / Cleanup
|
|
# ============================================================
|
|
|
|
    def cleanup_old_events(self, days=30):
        """Delete events older than `days` days. Returns count of deleted rows.

        When anything was deleted, the WAL is checkpointed/truncated to
        reclaim space and the in-memory counters are rebuilt from the DB.
        """
        cutoff = time.time() - (days * 86400)
        conn = self._get_conn()
        cur = conn.execute("DELETE FROM events WHERE timestamp < ?", (cutoff,))
        deleted = cur.rowcount
        conn.commit()
        if deleted > 0:
            # Shrink the WAL file so disk space is actually reclaimed
            conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
            # Rebuild in-memory counters from DB
            self._reload_counters()
            print(f"[HP_STORE] Cleanup: deleted {deleted} events older than {days} days")
        return deleted
|
|
|
|
def vacuum_db(self):
|
|
"""Reclaim disk space after cleanup."""
|
|
conn = self._get_conn()
|
|
conn.execute("VACUUM")
|
|
print("[HP_STORE] Database vacuumed")
|
|
|
|
    def _reload_counters(self):
        """Reload in-memory counters from the database.

        Called after bulk deletion so the fast-poll counters match what is
        actually stored. Runs entirely under the lock.
        """
        conn = self._get_conn()
        with self.lock:
            row = conn.execute("SELECT COUNT(*) as cnt, MAX(id) as max_id FROM events").fetchone()
            self.total_count = row["cnt"] or 0
            self._last_id = row["max_id"] or 0
            self.count_by_type.clear()
            self.count_by_severity.clear()
            self.count_by_device.clear()
            for row in conn.execute("SELECT event_type, COUNT(*) as cnt FROM events GROUP BY event_type"):
                self.count_by_type[row["event_type"]] = row["cnt"]
            for row in conn.execute("SELECT severity, COUNT(*) as cnt FROM events GROUP BY severity"):
                self.count_by_severity[row["severity"]] = row["cnt"]
            for row in conn.execute("SELECT device_id, COUNT(*) as cnt FROM events GROUP BY device_id"):
                self.count_by_device[row["device_id"]] = row["cnt"]
|
|
|
|
def start_cleanup_timer(self, days=30, interval_hours=24):
|
|
"""Start a background thread that cleans old events periodically."""
|
|
def _cleanup_loop():
|
|
while True:
|
|
time.sleep(interval_hours * 3600)
|
|
try:
|
|
self.cleanup_old_events(days=days)
|
|
except Exception as e:
|
|
print(f"[HP_STORE] Cleanup error: {e}")
|
|
t = threading.Thread(target=_cleanup_loop, daemon=True)
|
|
t.start()
|
|
print(f"[HP_STORE] Cleanup timer started: every {interval_hours}h, retention={days} days")
|
|
|
|
    def register_sse_queue(self, q):
        """Register a deque for SSE event push.

        Each stored event dict is appended to every registered queue by
        parse_and_store.
        """
        with self.lock:
            self._sse_queues.append(q)
|
|
|
|
def unregister_sse_queue(self, q):
|
|
"""Unregister an SSE queue."""
|
|
with self.lock:
|
|
try:
|
|
self._sse_queues.remove(q)
|
|
except ValueError:
|
|
pass
|