"""
Honeypot Dashboard Blueprint — Flask routes for C3PO integration.

Usage in C3PO server.py:
    from hp_dashboard import create_hp_blueprint, HpStore, HpCommander, HpAlertEngine

    hp_store = HpStore()
    hp_alerts = HpAlertEngine()
    hp_commander = HpCommander(
        get_transport=lambda: transport,
        get_registry=lambda: self.device_registry,
    )
    app.register_blueprint(create_hp_blueprint({
        **base_config,
        "hp_store": hp_store,
        "hp_commander": hp_commander,
        "hp_alerts": hp_alerts,
    }))
"""

import os
import re
import time
import json
import hmac
import hashlib
import secrets
import socket
import tempfile
from collections import deque

from flask import Blueprint, jsonify, request, render_template, url_for, Response, send_from_directory

from .hp_commander import (HP_SERVICES, HP_MONITORS, SYSTEM_COMMANDS,
                           ALLOWED_COMMANDS, CONFIG_BANNER_SERVICES,
                           CONFIG_THRESHOLDS)
from .hp_store import KILL_CHAIN_PHASES


def _clamp(val, lo, hi):
    """Clamp a value between lo and hi."""
    return max(lo, min(hi, val))


def _sanitize_str(val, max_len=256, default=""):
    """Strip and length-bound a string input. Returns default if not a string."""
    if not isinstance(val, str):
        return default
    return val.strip()[:max_len]


def _json_object():
    """Return the request's JSON body if it is an object, otherwise an empty dict.

    Guards the ``data.get(...)`` calls in the views against bodies that are
    valid JSON but not objects (lists, strings, numbers).
    """
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _split_host_port(host, default_port="5000"):
    """Split a Host header value into (hostname, port).

    The port is taken from the last ":"; a bracketed IPv6 literal without a
    port (e.g. "[::1]") and a bare hostname both yield ``default_port``.
    """
    if host.endswith("]") or ":" not in host:
        return host, default_port
    hostname, _, port = host.rpartition(":")
    return hostname, port or default_port


def create_hp_blueprint(server_config):
    """
    Create the honeypot dashboard blueprint.

    server_config keys:
      - hp_store: HpStore instance
      - hp_commander: HpCommander instance
      - hp_alerts: HpAlertEngine instance (optional)
      - hp_geo: geo lookup object (optional)
      - require_login (optional): auth decorator, applied to every
        non-public route of this blueprint

    Returns the configured flask.Blueprint.
    """
    bp = Blueprint("honeypot", __name__,
                   static_folder="static",
                   static_url_path="/hp-static")

    store = server_config["hp_store"]
    commander = server_config["hp_commander"]
    alerts = server_config.get("hp_alerts")
    geo = server_config.get("hp_geo")  # Optional HpGeoLookup
    require_login = server_config.get("require_login", lambda f: f)

    # Detect C3PO mode (base_config includes c2_root when running inside C3PO)
    c3po_nav = "c2_root" in server_config

    # Wire alert engine to store for kill chain analysis
    if alerts and hasattr(alerts, "set_store"):
        alerts.set_store(store)

    # Endpoints exempt from auth (called by devices, not browsers)
    _PUBLIC_ENDPOINTS = frozenset({
        "api_hp_ota_firmware_download",
        "api_hp_health",
        "api_hp_csrf_token",
    })

    def _local_endpoint():
        """Return the current endpoint name without the blueprint prefix.

        Inside a blueprint ``request.endpoint`` is "<blueprint>.<view>", so it
        must be stripped before comparing against _PUBLIC_ENDPOINTS.
        """
        return (request.endpoint or "").rpartition(".")[2]

    # ------------------------------------------------------------------
    # CSRF protection — token-based for all state-changing requests
    # ------------------------------------------------------------------
    _csrf_secret = secrets.token_bytes(32)

    def _csrf_generate():
        """Generate a signed CSRF token."""
        nonce = secrets.token_hex(16)
        sig = hmac.new(_csrf_secret, nonce.encode(), hashlib.sha256).hexdigest()[:32]
        return f"{nonce}.{sig}"

    def _csrf_validate(token):
        """Validate a CSRF token signature."""
        if not isinstance(token, str) or "." not in token:
            return False
        nonce, sig = token.rsplit(".", 1)
        expected = hmac.new(_csrf_secret, nonce.encode(), hashlib.sha256).hexdigest()[:32]
        # Compare as bytes: compare_digest() raises TypeError on non-ASCII str,
        # which would turn a malformed token into a 500 instead of a 403.
        return hmac.compare_digest(sig.encode("utf-8", "replace"),
                                   expected.encode("ascii"))

    @bp.before_request
    def _csrf_check():
        """Enforce CSRF token on state-changing methods."""
        if request.method in ("GET", "HEAD", "OPTIONS"):
            return None
        # Public endpoints skip CSRF (device-to-server)
        if _local_endpoint() in _PUBLIC_ENDPOINTS:
            return None
        # File uploads use multipart — accept token from form field or header
        token = (request.headers.get("X-CSRF-Token")
                 or (request.form.get("_csrf_token") if request.form else None))
        if not _csrf_validate(token):
            return jsonify({"error": "CSRF token missing or invalid"}), 403
        return None

    # ------------------------------------------------------------------
    # Authentication — apply require_login to every non-public route
    # ------------------------------------------------------------------
    # Blueprint.route() defers registration and does not populate
    # bp.view_functions, so rewrapping entries of that dict after the fact
    # never takes effect. Instead the decorator wraps a no-op view and runs
    # from a before_request hook: whatever it returns for an unauthenticated
    # request (redirect, 401, ...) short-circuits the request, and None lets
    # it proceed.
    _login_gate = require_login(lambda *args, **kwargs: None)

    @bp.before_request
    def _login_check():
        """Run the login gate for every route except public and static ones."""
        endpoint = _local_endpoint()
        if endpoint in _PUBLIC_ENDPOINTS or endpoint == "static":
            return None
        return _login_gate()

    # NOTE(review): the URL variables in the route rules below (<ip>,
    # <session_id>, <rule_id>, ...) were reconstructed from the view
    # signatures. The converter types (int vs. default string) are an
    # assumption — confirm them against the id types used by the
    # store / alert engine.

    @bp.route("/api/honeypot/csrf-token")
    def api_hp_csrf_token():
        """Get a CSRF token for subsequent POST requests."""
        return jsonify({"token": _csrf_generate()})

    # ============================================================
    # READ-ONLY API (preserved from v1)
    # ============================================================

    @bp.route("/api/honeypot/events")
    def api_hp_events():
        """Get recent honeypot events."""
        limit = _clamp(request.args.get("limit", 100, type=int), 1, 1000)
        event_type = request.args.get("type", None)
        severity = request.args.get("severity", None)
        src_ip = request.args.get("ip", None)
        service = request.args.get("service", None)
        offset = max(0, request.args.get("offset", 0, type=int))
        events = store.get_recent_events(
            limit=limit, event_type=event_type,
            severity=severity, src_ip=src_ip,
            service=service, offset=offset,
        )
        return jsonify({"events": events, "count": len(events)})

    @bp.route("/api/honeypot/stats")
    def api_hp_stats():
        """Get aggregated honeypot statistics."""
        stats = store.get_stats()
        if alerts:
            stats["alert_count"] = alerts.get_unacknowledged_count()
        return jsonify(stats)

    @bp.route("/api/honeypot/attackers")
    def api_hp_attackers():
        """Get top attackers."""
        limit = _clamp(request.args.get("limit", 50, type=int), 1, 1000)
        attackers = store.get_attackers(limit=limit)
        return jsonify({"attackers": attackers, "count": len(attackers)})

    # ============================================================
    # NEW: Event Detail + Attacker Profile
    # ============================================================

    @bp.route("/api/honeypot/events/<int:event_id>")
    def api_hp_event_detail(event_id):
        """Get single event with related events."""
        evt = store.get_event_by_id(event_id)
        if not evt:
            return jsonify({"error": "Not found"}), 404
        return jsonify(evt)

    @bp.route("/api/honeypot/attacker/<ip>")
    def api_hp_attacker_profile(ip):
        """Full attacker profile."""
        profile = store.get_attacker_profile(ip)
        return jsonify(profile)

    # ============================================================
    # NEW: Sessions
    # ============================================================

    @bp.route("/api/honeypot/sessions")
    def api_hp_sessions():
        """Get grouped sessions."""
        limit = _clamp(request.args.get("limit", 50, type=int), 1, 1000)
        src_ip = request.args.get("ip", None)
        sessions = store.get_sessions(limit=limit, src_ip=src_ip)
        return jsonify({"sessions": sessions, "count": len(sessions)})

    @bp.route("/api/honeypot/sessions/<session_id>")
    def api_hp_session_events(session_id):
        """Get all events for a session."""
        events = store.get_session_events(session_id)
        return jsonify({"events": events, "count": len(events)})

    # ============================================================
    # NEW: Search + Timeline + Export
    # ============================================================

    @bp.route("/api/honeypot/search")
    def api_hp_search():
        """Multi-criteria search."""
        q_raw = request.args.get("q")
        q_clean = _sanitize_str(q_raw, max_len=200) if q_raw else None
        events = store.search(
            q=q_clean,
            event_type=_sanitize_str(request.args.get("type"), max_len=64),
            severity=_sanitize_str(request.args.get("severity"), max_len=16),
            src_ip=_sanitize_str(request.args.get("ip"), max_len=45),
            service=_sanitize_str(request.args.get("service"), max_len=32),
            from_ts=request.args.get("from", type=float),
            to_ts=request.args.get("to", type=float),
            limit=_clamp(request.args.get("limit", 100, type=int), 1, 1000),
            offset=_clamp(request.args.get("offset", 0, type=int), 0, 100000),
        )
        return jsonify({"events": events, "count": len(events)})

    @bp.route("/api/honeypot/timeline")
    def api_hp_timeline():
        """Event counts per time bucket by severity."""
        hours = _clamp(request.args.get("hours", 24, type=int), 1, 168)
        bucket = _clamp(request.args.get("bucket", 5, type=int), 1, 60)
        data = store.get_timeline(hours=hours, bucket_minutes=bucket)
        return jsonify({"timeline": data})

    @bp.route("/api/honeypot/export")
    def api_hp_export():
        """Export events as CSV or JSON."""
        fmt = request.args.get("format", "json")
        content = store.export_events(
            fmt=fmt,
            q=request.args.get("q"),
            event_type=request.args.get("type"),
            severity=request.args.get("severity"),
            src_ip=request.args.get("ip"),
            service=request.args.get("service"),
            from_ts=request.args.get("from", type=float),
            to_ts=request.args.get("to", type=float),
        )
        if fmt == "csv":
            return Response(
                content,
                mimetype="text/csv",
                headers={"Content-Disposition": "attachment; filename=honeypot_events.csv"},
            )
        return Response(content, mimetype="application/json")

    # ============================================================
    # NEW: SSE Event Stream
    # ============================================================

    _SSE_KEEPALIVE_S = 15.0  # max silence before a keepalive comment is sent
    _SSE_POLL_S = 0.5        # idle poll interval (avoids busy-waiting)

    @bp.route("/api/honeypot/stream")
    def api_hp_stream():
        """Server-Sent Events stream for real-time updates."""
        _sev_map = {"LOW": 0, "MEDIUM": 1, "HIGH": 2, "CRITICAL": 3}
        min_sev_name = request.args.get("min_severity", "MEDIUM").upper()
        min_sev = _sev_map.get(min_sev_name, 1)

        def generate():
            q = deque(maxlen=100)
            store.register_sse_queue(q)
            try:
                # Send initial keepalive
                yield "data: {\"type\":\"connected\"}\n\n"
                last_sent = time.monotonic()
                while True:
                    if q:
                        # Drain queued events without sleeping in between
                        evt = q.popleft()
                        evt_sev = _sev_map.get((evt.get("severity") or "LOW").upper(), 0)
                        if evt_sev >= min_sev:
                            yield f"data: {json.dumps(evt, default=str)}\n\n"
                            last_sent = time.monotonic()
                        continue
                    # Idle: keepalive only after _SSE_KEEPALIVE_S of silence
                    if time.monotonic() - last_sent >= _SSE_KEEPALIVE_S:
                        yield ": keepalive\n\n"
                        last_sent = time.monotonic()
                    time.sleep(_SSE_POLL_S)
            finally:
                # Runs on client disconnect (GeneratorExit) as well
                store.unregister_sse_queue(q)

        return Response(
            generate(),
            mimetype="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
            },
        )

    # ============================================================
    # NEW: Alert Rules & Active Alerts
    # ============================================================

    @bp.route("/api/honeypot/alerts/rules")
    def api_hp_alert_rules():
        """List alert rules."""
        if not alerts:
            return jsonify({"rules": []})
        return jsonify({"rules": alerts.get_rules()})

    @bp.route("/api/honeypot/alerts/rules", methods=["POST"])
    def api_hp_add_alert_rule():
        """Add a custom alert rule."""
        if not alerts:
            return jsonify({"error": "Alert engine not configured"}), 500
        data = _json_object()
        # Sanitize string fields in rule data
        for field in ("name", "description", "event_type", "severity", "src_ip", "service"):
            if field in data and isinstance(data[field], str):
                data[field] = _sanitize_str(data[field], max_len=256)
        rule = alerts.add_rule(data)
        return jsonify({"ok": True, "rule": rule})

    @bp.route("/api/honeypot/alerts/rules/<rule_id>", methods=["DELETE"])
    def api_hp_delete_alert_rule(rule_id):
        """Delete an alert rule."""
        if not alerts:
            return jsonify({"error": "Alert engine not configured"}), 500
        ok = alerts.delete_rule(rule_id)
        if ok:
            return jsonify({"ok": True})
        return jsonify({"error": "Rule not found"}), 404

    @bp.route("/api/honeypot/alerts/active")
    def api_hp_active_alerts():
        """Get active alerts."""
        if not alerts:
            return jsonify({"alerts": []})
        return jsonify({"alerts": alerts.get_active_alerts()})

    @bp.route("/api/honeypot/alerts/ack/<alert_id>", methods=["POST"])
    def api_hp_ack_alert(alert_id):
        """Acknowledge an alert."""
        if not alerts:
            return jsonify({"error": "Alert engine not configured"}), 500
        ok = alerts.acknowledge_alert(alert_id)
        if ok:
            return jsonify({"ok": True})
        return jsonify({"error": "Alert not found"}), 404

    # ============================================================
    # NEW: Credentials, MITRE, Webhooks
    # ============================================================

    @bp.route("/api/honeypot/credentials")
    def api_hp_credentials():
        """Return the store's credential intelligence."""
        if not store:
            return jsonify({"error": "No store"}), 500
        return jsonify(store.get_credential_intel())

    @bp.route("/api/honeypot/mitre")
    def api_hp_mitre():
        """Return the store's MITRE coverage."""
        if not store:
            return jsonify({"error": "No store"}), 500
        return jsonify(store.get_mitre_coverage())

    @bp.route("/api/honeypot/webhooks")
    def api_hp_webhooks_get():
        """List configured webhooks."""
        if not alerts:
            return jsonify({"webhooks": []})
        return jsonify({"webhooks": alerts.get_webhooks()})

    @bp.route("/api/honeypot/webhooks", methods=["POST"])
    def api_hp_webhooks_add():
        """Register a webhook from a JSON body with "url" and optional "name"."""
        if not alerts:
            return jsonify({"error": "No alert engine"}), 500
        data = _json_object()
        url = _sanitize_str(data.get("url"), max_len=512)
        if not url:
            return jsonify({"error": "URL required"}), 400
        name = _sanitize_str(data.get("name"), max_len=128)
        wh = alerts.add_webhook(url, name)
        if "error" in wh:
            return jsonify(wh), 400
        return jsonify({"ok": True, "webhook": wh})

    @bp.route("/api/honeypot/webhooks/<wh_id>", methods=["DELETE"])
    def api_hp_webhooks_delete(wh_id):
        """Remove a webhook; "ok" reflects whether the engine removed it."""
        if not alerts:
            return jsonify({"error": "No alert engine"}), 500
        ok = alerts.remove_webhook(wh_id)
        return jsonify({"ok": ok})

    # ============================================================
    # Device & Service State APIs (preserved from v1)
    # ============================================================

    @bp.route("/api/honeypot/devices")
    def api_hp_devices():
        """List connected honeypot devices."""
        devices = commander.get_honeypot_devices()
        return jsonify({"devices": devices, "count": len(devices)})

    @bp.route("/api/honeypot/services")
    def api_hp_services():
        """Get cached service states for all devices."""
        device_id = request.args.get("device_id", None)
        states = commander.get_service_states(device_id)
        return jsonify({
            "services": states,
            "definitions": {
                "services": {k: {"port": v["port"]} for k, v in HP_SERVICES.items()},
                "monitors": list(HP_MONITORS.keys()),
            },
        })

    @bp.route("/api/honeypot/history")
    def api_hp_history():
        """Get command history."""
        limit = _clamp(request.args.get("limit", 50, type=int), 1, 500)
        history = commander.get_command_history(limit=limit)
        return jsonify({"history": history, "count": len(history)})

    # ============================================================
    # Command Dispatch APIs (POST) — preserved from v1
    # ============================================================

    @bp.route("/api/honeypot/command", methods=["POST"])
    def api_hp_send_command():
        """Send a single command."""
        data = _json_object()
        device_id = _sanitize_str(data.get("device_id"), max_len=64)
        command_name = _sanitize_str(data.get("command"), max_len=64)
        argv = data.get("argv", [])
        if not device_id or not command_name:
            return jsonify({"error": "device_id and command required"}), 400
        if command_name not in ALLOWED_COMMANDS:
            return jsonify({"error": f"Unknown command: {command_name}"}), 400
        # Never forward an unsanitized, non-list argv to the device
        if not isinstance(argv, list):
            return jsonify({"error": "argv must be a list"}), 400
        argv = [_sanitize_str(a, max_len=128) for a in argv]
        rid = commander.send_command(device_id, command_name, argv)
        if rid:
            return jsonify({"ok": True, "request_id": rid})
        return jsonify({"error": "Failed to send (device offline?)"}), 500

    @bp.route("/api/honeypot/start_all", methods=["POST"])
    def api_hp_start_all():
        """Start all services + monitors via single bulk command."""
        data = _json_object()
        device_id = _sanitize_str(data.get("device_id"), max_len=64) or None
        request_ids = []
        if device_id:
            rid = commander.send_command(device_id, "hp_start_all")
            if rid:
                request_ids.append(rid)
        else:
            rids = commander.send_to_all_devices("hp_start_all")
            request_ids.extend(rids)
        return jsonify({"ok": True, "request_ids": request_ids,
                        "count": len(request_ids)})

    @bp.route("/api/honeypot/stop_all", methods=["POST"])
    def api_hp_stop_all():
        """Stop all services + monitors via single bulk command."""
        data = _json_object()
        device_id = _sanitize_str(data.get("device_id"), max_len=64) or None
        request_ids = []
        if device_id:
            rid = commander.send_command(device_id, "hp_stop_all")
            if rid:
                request_ids.append(rid)
        else:
            rids = commander.send_to_all_devices("hp_stop_all")
            request_ids.extend(rids)
        return jsonify({"ok": True, "request_ids": request_ids,
                        "count": len(request_ids)})

    @bp.route("/api/honeypot/refresh_status", methods=["POST"])
    def api_hp_refresh_status():
        """Refresh cached service state."""
        data = _json_object()
        device_id = _sanitize_str(data.get("device_id"), max_len=64) or None
        request_ids = []
        for svc_name in HP_SERVICES:
            if device_id:
                rid = commander.send_service_command(device_id, svc_name, "status")
                if rid:
                    request_ids.append(rid)
            else:
                rids = commander.send_service_to_all(svc_name, "status")
                request_ids.extend(rids)
        for mon_name in HP_MONITORS:
            if device_id:
                rid = commander.send_monitor_command(device_id, mon_name, "status")
                if rid:
                    request_ids.append(rid)
            else:
                rids = commander.send_monitor_to_all(mon_name, "status")
                request_ids.extend(rids)
        return jsonify({"ok": True, "request_ids": request_ids,
                        "count": len(request_ids)})

    # ============================================================
    # Kill Chain & Honeytoken APIs
    # ============================================================

    @bp.route("/api/honeypot/killchain")
    def api_hp_killchain_top():
        """Get top attackers by kill chain score."""
        limit = _clamp(request.args.get("limit", 20, type=int), 1, 200)
        results = store.get_kill_chain_top(limit=limit)
        return jsonify({"attackers": results, "phases": KILL_CHAIN_PHASES})

    @bp.route("/api/honeypot/killchain/<ip>")
    def api_hp_killchain_detail(ip):
        """Get kill chain analysis for a specific IP."""
        analysis = store.get_kill_chain_analysis(ip)
        return jsonify({**analysis, "phase_defs": KILL_CHAIN_PHASES})

    @bp.route("/api/honeypot/honeytokens")
    def api_hp_honeytokens():
        """Get honeytoken event statistics."""
        return jsonify(store.get_honeytoken_stats())

    # ============================================================
    # Runtime Config APIs
    # ============================================================

    @bp.route("/api/honeypot/config/<device_id>")
    def api_hp_config_list(device_id):
        """List all runtime config for a device."""
        cfg_type = request.args.get("type", "")
        rid = commander.config_list(device_id, cfg_type)
        if rid:
            return jsonify({"ok": True, "request_id": rid})
        return jsonify({"error": "Failed to send"}), 500

    @bp.route("/api/honeypot/config/<device_id>", methods=["POST"])
    def api_hp_config_set(device_id):
        """Set a config value."""
        data = _json_object()
        cfg_type = _sanitize_str(data.get("type"), max_len=32)
        key = _sanitize_str(data.get("key"), max_len=64)
        value = _sanitize_str(data.get("value"), max_len=256)
        if not cfg_type or not key:
            return jsonify({"error": "type and key required"}), 400
        # Whitelist cfg_type and key to prevent arbitrary config injection
        if cfg_type not in ("banner", "threshold"):
            return jsonify({"error": f"Invalid config type: {cfg_type}"}), 400
        if cfg_type == "banner" and key not in CONFIG_BANNER_SERVICES:
            return jsonify({"error": f"Invalid banner service: {key}"}), 400
        if cfg_type == "threshold":
            if key not in CONFIG_THRESHOLDS:
                return jsonify({"error": f"Invalid threshold key: {key}"}), 400
            # Validate threshold value is a positive integer
            try:
                int_val = int(value)
                if int_val <= 0:
                    raise ValueError
            except (ValueError, TypeError):
                return jsonify({"error": "Threshold value must be a positive integer"}), 400
            # Forward the normalized number: int() also accepts forms such as
            # "+5" or "5_0" that the device should never see verbatim.
            value = str(int_val)
        rid = commander.config_set(device_id, cfg_type, key, str(value))
        if rid:
            return jsonify({"ok": True, "request_id": rid})
        return jsonify({"error": "Failed to send"}), 500

    @bp.route("/api/honeypot/config/<device_id>/reset", methods=["POST"])
    def api_hp_config_reset(device_id):
        """Reset all runtime config to defaults."""
        rid = commander.config_reset(device_id)
        if rid:
            return jsonify({"ok": True, "request_id": rid})
        return jsonify({"error": "Failed to send"}), 500

    @bp.route("/api/honeypot/config/meta")
    def api_hp_config_meta():
        """Get config metadata (available banners, thresholds, defaults)."""
        return jsonify({
            "banner_services": CONFIG_BANNER_SERVICES,
            "thresholds": CONFIG_THRESHOLDS,
        })

    # ============================================================
    # OTA Firmware Management
    # ============================================================

    # Firmware storage directory (sibling to hp_dashboard/)
    _fw_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "firmwares")
    # Always used with fullmatch(): with match(), "$" would also accept a
    # trailing newline after ".bin".
    _SAFE_NAME = re.compile(r'^[\w\-\.]+\.bin$')
    _MAX_FW_SIZE = 2 * 1024 * 1024  # 2 MB

    @bp.route("/api/honeypot/ota/upload", methods=["POST"])
    def api_hp_ota_upload():
        """Upload a firmware .bin file."""
        if "file" not in request.files:
            return jsonify({"error": "No file part"}), 400
        upload = request.files["file"]
        if not upload.filename or not upload.filename.endswith(".bin"):
            return jsonify({"error": "Only .bin files accepted"}), 400
        safe_name = upload.filename.replace(" ", "_")
        if not _SAFE_NAME.fullmatch(safe_name):
            return jsonify({"error": "Invalid filename"}), 400

        os.makedirs(_fw_dir, exist_ok=True)

        # Read at most one byte past the limit: enough to detect an oversized
        # upload without buffering an arbitrarily large body in memory.
        data = upload.read(_MAX_FW_SIZE + 1)
        if len(data) > _MAX_FW_SIZE:
            return jsonify({"error": f"File too large (max {_MAX_FW_SIZE} bytes)"}), 400
        if len(data) == 0:
            return jsonify({"error": "Empty file"}), 400

        # Atomic write: write to temp file first, then rename into place.
        # This prevents serving a partially-written firmware if a reader
        # hits the file mid-upload (or if the process crashes).
        path = os.path.join(_fw_dir, safe_name)
        fd, tmp_path = tempfile.mkstemp(dir=_fw_dir, suffix=".tmp")
        try:
            with os.fdopen(fd, "wb") as out:
                out.write(data)
            # os.replace overwrites an existing destination on every
            # platform; os.rename fails on Windows if the file exists.
            os.replace(tmp_path, path)
        except BaseException:
            # Clean up temp file on any failure
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise

        # Compute and store SHA256 hash for firmware integrity verification
        sha256_hex = hashlib.sha256(data).hexdigest()
        sha_path = path + ".sha256"
        with open(sha_path, "w") as sf:
            sf.write(sha256_hex)

        return jsonify({"ok": True, "filename": safe_name,
                        "size": len(data), "sha256": sha256_hex})

    @bp.route("/api/honeypot/ota/firmwares")
    def api_hp_ota_firmwares():
        """List available firmware files."""
        if not os.path.isdir(_fw_dir):
            return jsonify({"firmwares": []})
        firmwares = []
        for name in sorted(os.listdir(_fw_dir)):
            if not name.endswith(".bin"):
                continue
            path = os.path.join(_fw_dir, name)
            try:
                stat = os.stat(path)
            except OSError:
                # File removed between listdir() and stat(): skip it
                continue
            fw_entry = {
                "name": name,
                "size": stat.st_size,
                "uploaded_at": stat.st_mtime,
            }
            sha_path = path + ".sha256"
            if os.path.isfile(sha_path):
                try:
                    with open(sha_path) as sf:
                        fw_entry["sha256"] = sf.read().strip()
                except OSError:
                    pass
            firmwares.append(fw_entry)
        return jsonify({"firmwares": firmwares})

    @bp.route("/api/honeypot/ota/firmware/<filename>")
    @bp.route("/fw/<filename>")
    def api_hp_ota_firmware_download(filename):
        """Serve a firmware binary (called by ESP32 during OTA)."""
        if not _SAFE_NAME.fullmatch(filename):
            return jsonify({"error": "Invalid filename"}), 400
        if not os.path.isdir(_fw_dir):
            return jsonify({"error": "No firmwares directory"}), 404
        path = os.path.join(_fw_dir, filename)
        if not os.path.isfile(path):
            return jsonify({"error": "Firmware not found"}), 404
        return send_from_directory(_fw_dir, filename,
                                   mimetype="application/octet-stream")

    @bp.route("/api/honeypot/ota/firmware/<filename>", methods=["DELETE"])
    def api_hp_ota_firmware_delete(filename):
        """Delete a firmware file."""
        if not _SAFE_NAME.fullmatch(filename):
            return jsonify({"error": "Invalid filename"}), 400
        path = os.path.join(_fw_dir, filename)
        if not os.path.isfile(path):
            return jsonify({"error": "Firmware not found"}), 404
        os.remove(path)
        sha_path = path + ".sha256"
        if os.path.isfile(sha_path):
            os.remove(sha_path)
        return jsonify({"ok": True})

    @bp.route("/api/honeypot/ota/flash", methods=["POST"])
    def api_hp_ota_flash():
        """Trigger OTA update on a device."""
        data = _json_object()
        device_id = _sanitize_str(data.get("device_id"), max_len=64)
        filename = _sanitize_str(data.get("filename"), max_len=128)
        if not device_id or not filename:
            return jsonify({"error": "device_id and filename required"}), 400
        if not _SAFE_NAME.fullmatch(filename):
            return jsonify({"error": "Invalid filename"}), 400
        path = os.path.join(_fw_dir, filename)
        if not os.path.isfile(path):
            return jsonify({"error": "Firmware not found"}), 404

        # Build a short download URL (argv max 64 bytes in nanopb).
        # Use the real server IP so ESP32 can reach it (not "localhost").
        hostname, port = _split_host_port(request.host)
        if hostname in ("localhost", "127.0.0.1"):
            try:
                # Connecting a UDP socket sends no packet; it only makes the
                # OS choose the outbound interface whose address we read back.
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                    probe.connect(("8.8.8.8", 80))
                    hostname = probe.getsockname()[0]
            except OSError:
                # Best effort: keep the loopback name if no route is available
                pass

        # HP_OTA_HTTPS: Set to "false" to use http:// instead of https://
        # Defaults to "true" for secure firmware downloads
        use_https = os.environ.get("HP_OTA_HTTPS", "true").lower() == "true"
        scheme = "https" if use_https else "http"
        fw_url = f"{scheme}://{hostname}:{port}/fw/{filename}"

        # Enforce nanopb argv limit (64 bytes max per argument)
        if len(fw_url.encode("utf-8")) > 64:
            return jsonify({"error": f"Firmware URL too long ({len(fw_url)} chars, max 64)"}), 400

        # Read or compute SHA256 hash for firmware integrity verification
        sha_path = path + ".sha256"
        if os.path.isfile(sha_path):
            with open(sha_path) as sf:
                sha256_hex = sf.read().strip()
        else:
            with open(path, "rb") as bf:
                sha256_hex = hashlib.sha256(bf.read()).hexdigest()

        rid = commander.send_command(device_id, "ota_update", [fw_url, sha256_hex])
        if rid:
            return jsonify({"ok": True, "request_id": rid, "url": fw_url})
        return jsonify({"error": "Failed to send (device offline?)"}), 500

    @bp.route("/api/honeypot/ota/rollback", methods=["POST"])
    def api_hp_ota_rollback():
        """Trigger firmware rollback on a device."""
        data = _json_object()
        device_id = _sanitize_str(data.get("device_id"), max_len=64)
        if not device_id:
            return jsonify({"error": "device_id required"}), 400
        rid = commander.send_command(device_id, "ota_rollback")
        if rid:
            return jsonify({"ok": True, "request_id": rid})
        return jsonify({"error": "Failed to send (device offline?)"}), 500

    # ============================================================
    # Geo-IP API
    # ============================================================

    @bp.route("/api/honeypot/geo/<ip>")
    def api_hp_geo(ip):
        """Get geo-IP and vendor info for an IP address."""
        if not geo:
            return jsonify({"error": "Geo lookup not configured"}), 501
        result = geo.lookup_ip(ip)
        # Also look up vendor if we have a MAC for this IP
        with store.lock:
            atk = store.attackers.get(ip, {})
        result["vendor"] = geo.lookup_mac_vendor(atk.get("mac", ""))
        return jsonify(result)

    # ============================================================
    # Health / Monitoring
    # ============================================================

    _bp_start_time = time.time()

    @bp.route("/api/honeypot/health")
    def api_hp_health():
        """Health check endpoint for monitoring (Prometheus, Uptime Kuma, etc.)."""
        stats = store.get_stats()
        now = time.time()

        # Events in the last hour
        one_hour_ago = now - 3600
        try:
            conn = store._get_conn()
            row = conn.execute(
                "SELECT COUNT(*) as cnt FROM events WHERE timestamp >= ?",
                (one_hour_ago,)
            ).fetchone()
            events_last_hour = row["cnt"] if row else 0
        except Exception:
            # Best effort: the health check must answer even if the DB is down
            events_last_hour = 0

        # DB file size
        db_size_mb = 0
        try:
            db_size_mb = round(os.path.getsize(store.db_path) / (1024 * 1024), 2)
        except OSError:
            pass

        # SSE client count
        with store.lock:
            sse_clients = len(store._sse_queues)

        # Device count
        devices_connected = 0
        try:
            registry = commander._get_registry()
            if registry:
                devices_connected = len(registry.all())
        except Exception:
            pass

        return jsonify({
            "status": "ok",
            "uptime_s": round(now - _bp_start_time),
            "devices_connected": devices_connected,
            "events_total": stats.get("total_events", 0),
            "events_last_hour": events_last_hour,
            "alerts_unacked": alerts.get_unacknowledged_count() if alerts else 0,
            "db_size_mb": db_size_mb,
            "sse_clients": sse_clients,
        })

    # ============================================================
    # Dashboard HTML
    # ============================================================

    @bp.route("/honeypot")
    def hp_dashboard():
        """Serve the honeypot management dashboard."""
        static_url = url_for("honeypot.static", filename="")
        return render_template("honeypot.html",
                               static_url=static_url,
                               active_page="honeypot")

    # require_login is enforced for all non-public routes by the
    # _login_check before_request hook registered above.
    return bp