espilon-source/tools/C3PO/hp_dashboard/hp_routes.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

845 lines
33 KiB
Python

"""
Honeypot Dashboard Blueprint — Flask routes for C3PO integration.
Usage in C3PO server.py:
from hp_dashboard import create_hp_blueprint, HpStore, HpCommander, HpAlertEngine
hp_store = HpStore()
hp_alerts = HpAlertEngine()
hp_commander = HpCommander(
get_transport=lambda: transport,
get_registry=lambda: self.device_registry,
)
app.register_blueprint(create_hp_blueprint({
**base_config,
"hp_store": hp_store,
"hp_commander": hp_commander,
"hp_alerts": hp_alerts,
}))
"""
import os
import re
import time
import json
import hmac
import hashlib
import secrets
import tempfile
from collections import deque
from flask import Blueprint, jsonify, request, render_template, url_for, Response, send_from_directory
from .hp_commander import (HP_SERVICES, HP_MONITORS, SYSTEM_COMMANDS,
ALLOWED_COMMANDS, CONFIG_BANNER_SERVICES,
CONFIG_THRESHOLDS)
from .hp_store import KILL_CHAIN_PHASES
def _clamp(val, lo, hi):
"""Clamp a value between lo and hi."""
return max(lo, min(hi, val))
def _sanitize_str(val, max_len=256, default=""):
"""Strip and length-bound a string input. Returns default if not a string."""
if not isinstance(val, str):
return default
return val.strip()[:max_len]
def create_hp_blueprint(server_config):
"""
Create the honeypot dashboard blueprint.
server_config keys:
- hp_store: HpStore instance
- hp_commander: HpCommander instance
- hp_alerts: HpAlertEngine instance (optional)
- require_login (optional): auth decorator for HTML pages
"""
bp = Blueprint("honeypot", __name__,
static_folder="static",
static_url_path="/hp-static")
store = server_config["hp_store"]
commander = server_config["hp_commander"]
alerts = server_config.get("hp_alerts")
geo = server_config.get("hp_geo") # Optional HpGeoLookup
require_login = server_config.get("require_login", lambda f: f)
# Detect C3PO mode (base_config includes c2_root when running inside C3PO)
c3po_nav = "c2_root" in server_config
# Wire alert engine to store for kill chain analysis
if alerts and hasattr(alerts, "set_store"):
alerts.set_store(store)
# Endpoints exempt from auth (called by devices, not browsers)
_PUBLIC_ENDPOINTS = frozenset({
"api_hp_ota_firmware_download", "api_hp_health", "api_hp_csrf_token",
})
# ------------------------------------------------------------------
# CSRF protection — token-based for all state-changing requests
# ------------------------------------------------------------------
_csrf_secret = secrets.token_bytes(32)
def _csrf_generate():
"""Generate a signed CSRF token."""
nonce = secrets.token_hex(16)
sig = hmac.new(_csrf_secret, nonce.encode(), hashlib.sha256).hexdigest()[:32]
return f"{nonce}.{sig}"
def _csrf_validate(token):
"""Validate a CSRF token signature."""
if not token or "." not in token:
return False
nonce, sig = token.rsplit(".", 1)
expected = hmac.new(_csrf_secret, nonce.encode(), hashlib.sha256).hexdigest()[:32]
return hmac.compare_digest(sig, expected)
@bp.before_request
def _csrf_check():
"""Enforce CSRF token on state-changing methods."""
if request.method in ("GET", "HEAD", "OPTIONS"):
return None
# Public endpoints skip CSRF (device-to-server)
if request.endpoint in _PUBLIC_ENDPOINTS:
return None
# File uploads use multipart — accept token from form field or header
token = (request.headers.get("X-CSRF-Token")
or (request.form.get("_csrf_token") if request.form else None))
if not _csrf_validate(token):
return jsonify({"error": "CSRF token missing or invalid"}), 403
@bp.route("/api/honeypot/csrf-token")
def api_hp_csrf_token():
"""Get a CSRF token for subsequent POST requests."""
return jsonify({"token": _csrf_generate()})
# ============================================================
# READ-ONLY API (preserved from v1)
# ============================================================
@bp.route("/api/honeypot/events")
def api_hp_events():
"""Get recent honeypot events."""
limit = _clamp(request.args.get("limit", 100, type=int), 1, 1000)
event_type = request.args.get("type", None)
severity = request.args.get("severity", None)
src_ip = request.args.get("ip", None)
service = request.args.get("service", None)
offset = max(0, request.args.get("offset", 0, type=int))
events = store.get_recent_events(
limit=limit, event_type=event_type, severity=severity,
src_ip=src_ip, service=service, offset=offset,
)
return jsonify({"events": events, "count": len(events)})
@bp.route("/api/honeypot/stats")
def api_hp_stats():
"""Get aggregated honeypot statistics."""
stats = store.get_stats()
if alerts:
stats["alert_count"] = alerts.get_unacknowledged_count()
return jsonify(stats)
@bp.route("/api/honeypot/attackers")
def api_hp_attackers():
"""Get top attackers."""
limit = _clamp(request.args.get("limit", 50, type=int), 1, 1000)
attackers = store.get_attackers(limit=limit)
return jsonify({"attackers": attackers, "count": len(attackers)})
# ============================================================
# NEW: Event Detail + Attacker Profile
# ============================================================
@bp.route("/api/honeypot/events/<int:event_id>")
def api_hp_event_detail(event_id):
"""Get single event with related events."""
evt = store.get_event_by_id(event_id)
if not evt:
return jsonify({"error": "Not found"}), 404
return jsonify(evt)
@bp.route("/api/honeypot/attacker/<ip>")
def api_hp_attacker_profile(ip):
"""Full attacker profile."""
profile = store.get_attacker_profile(ip)
return jsonify(profile)
# ============================================================
# NEW: Sessions
# ============================================================
@bp.route("/api/honeypot/sessions")
def api_hp_sessions():
"""Get grouped sessions."""
limit = _clamp(request.args.get("limit", 50, type=int), 1, 1000)
src_ip = request.args.get("ip", None)
sessions = store.get_sessions(limit=limit, src_ip=src_ip)
return jsonify({"sessions": sessions, "count": len(sessions)})
@bp.route("/api/honeypot/sessions/<session_id>")
def api_hp_session_events(session_id):
"""Get all events for a session."""
events = store.get_session_events(session_id)
return jsonify({"events": events, "count": len(events)})
# ============================================================
# NEW: Search + Timeline + Export
# ============================================================
@bp.route("/api/honeypot/search")
def api_hp_search():
"""Multi-criteria search."""
q_raw = request.args.get("q")
q_clean = _sanitize_str(q_raw, max_len=200) if q_raw else None
events = store.search(
q=q_clean,
event_type=_sanitize_str(request.args.get("type"), max_len=64),
severity=_sanitize_str(request.args.get("severity"), max_len=16),
src_ip=_sanitize_str(request.args.get("ip"), max_len=45),
service=_sanitize_str(request.args.get("service"), max_len=32),
from_ts=request.args.get("from", type=float),
to_ts=request.args.get("to", type=float),
limit=_clamp(request.args.get("limit", 100, type=int), 1, 1000),
offset=_clamp(request.args.get("offset", 0, type=int), 0, 100000),
)
return jsonify({"events": events, "count": len(events)})
@bp.route("/api/honeypot/timeline")
def api_hp_timeline():
"""Event counts per time bucket by severity."""
hours = _clamp(request.args.get("hours", 24, type=int), 1, 168)
bucket = _clamp(request.args.get("bucket", 5, type=int), 1, 60)
data = store.get_timeline(hours=hours, bucket_minutes=bucket)
return jsonify({"timeline": data})
@bp.route("/api/honeypot/export")
def api_hp_export():
"""Export events as CSV or JSON."""
fmt = request.args.get("format", "json")
content = store.export_events(
fmt=fmt,
q=request.args.get("q"),
event_type=request.args.get("type"),
severity=request.args.get("severity"),
src_ip=request.args.get("ip"),
service=request.args.get("service"),
from_ts=request.args.get("from", type=float),
to_ts=request.args.get("to", type=float),
)
if fmt == "csv":
return Response(
content,
mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=honeypot_events.csv"},
)
return Response(content, mimetype="application/json")
# ============================================================
# NEW: SSE Event Stream
# ============================================================
@bp.route("/api/honeypot/stream")
def api_hp_stream():
"""Server-Sent Events stream for real-time updates."""
_sev_map = {"LOW": 0, "MEDIUM": 1, "HIGH": 2, "CRITICAL": 3}
min_sev_name = request.args.get("min_severity", "MEDIUM").upper()
min_sev = _sev_map.get(min_sev_name, 1)
def generate():
q = deque(maxlen=100)
store.register_sse_queue(q)
try:
# Send initial keepalive
yield "data: {\"type\":\"connected\"}\n\n"
while True:
if q:
evt = q.popleft()
evt_sev = _sev_map.get((evt.get("severity") or "LOW").upper(), 0)
if evt_sev < min_sev:
continue
yield f"data: {json.dumps(evt, default=str)}\n\n"
else:
# Keepalive every 15s
yield ": keepalive\n\n"
# Use a short sleep to avoid busy-wait
import time as _time
_time.sleep(0.5)
except GeneratorExit:
pass
finally:
store.unregister_sse_queue(q)
return Response(
generate(),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)
# ============================================================
# NEW: Alert Rules & Active Alerts
# ============================================================
@bp.route("/api/honeypot/alerts/rules")
def api_hp_alert_rules():
"""List alert rules."""
if not alerts:
return jsonify({"rules": []})
return jsonify({"rules": alerts.get_rules()})
@bp.route("/api/honeypot/alerts/rules", methods=["POST"])
def api_hp_add_alert_rule():
"""Add a custom alert rule."""
if not alerts:
return jsonify({"error": "Alert engine not configured"}), 500
data = request.get_json(silent=True) or {}
# Sanitize string fields in rule data
for field in ("name", "description", "event_type", "severity",
"src_ip", "service"):
if field in data and isinstance(data[field], str):
data[field] = _sanitize_str(data[field], max_len=256)
rule = alerts.add_rule(data)
return jsonify({"ok": True, "rule": rule})
@bp.route("/api/honeypot/alerts/rules/<rule_id>", methods=["DELETE"])
def api_hp_delete_alert_rule(rule_id):
"""Delete an alert rule."""
if not alerts:
return jsonify({"error": "Alert engine not configured"}), 500
ok = alerts.delete_rule(rule_id)
if ok:
return jsonify({"ok": True})
return jsonify({"error": "Rule not found"}), 404
@bp.route("/api/honeypot/alerts/active")
def api_hp_active_alerts():
"""Get active alerts."""
if not alerts:
return jsonify({"alerts": []})
return jsonify({"alerts": alerts.get_active_alerts()})
@bp.route("/api/honeypot/alerts/ack/<int:alert_id>", methods=["POST"])
def api_hp_ack_alert(alert_id):
"""Acknowledge an alert."""
if not alerts:
return jsonify({"error": "Alert engine not configured"}), 500
ok = alerts.acknowledge_alert(alert_id)
if ok:
return jsonify({"ok": True})
return jsonify({"error": "Alert not found"}), 404
# ============================================================
# NEW: Credentials, MITRE, Webhooks
# ============================================================
@bp.route("/api/honeypot/credentials")
def api_hp_credentials():
if not store:
return jsonify({"error": "No store"}), 500
return jsonify(store.get_credential_intel())
@bp.route("/api/honeypot/mitre")
def api_hp_mitre():
if not store:
return jsonify({"error": "No store"}), 500
return jsonify(store.get_mitre_coverage())
@bp.route("/api/honeypot/webhooks")
def api_hp_webhooks_get():
if not alerts:
return jsonify({"webhooks": []})
return jsonify({"webhooks": alerts.get_webhooks()})
@bp.route("/api/honeypot/webhooks", methods=["POST"])
def api_hp_webhooks_add():
if not alerts:
return jsonify({"error": "No alert engine"}), 500
data = request.get_json(silent=True) or {}
url = _sanitize_str(data.get("url"), max_len=512)
if not url:
return jsonify({"error": "URL required"}), 400
name = _sanitize_str(data.get("name"), max_len=128)
wh = alerts.add_webhook(url, name)
if "error" in wh:
return jsonify(wh), 400
return jsonify({"ok": True, "webhook": wh})
@bp.route("/api/honeypot/webhooks/<int:wh_id>", methods=["DELETE"])
def api_hp_webhooks_delete(wh_id):
if not alerts:
return jsonify({"error": "No alert engine"}), 500
ok = alerts.remove_webhook(wh_id)
return jsonify({"ok": ok})
# ============================================================
# Device & Service State APIs (preserved from v1)
# ============================================================
@bp.route("/api/honeypot/devices")
def api_hp_devices():
"""List connected honeypot devices."""
devices = commander.get_honeypot_devices()
return jsonify({"devices": devices, "count": len(devices)})
@bp.route("/api/honeypot/services")
def api_hp_services():
"""Get cached service states for all devices."""
device_id = request.args.get("device_id", None)
states = commander.get_service_states(device_id)
return jsonify({
"services": states,
"definitions": {
"services": {k: {"port": v["port"]} for k, v in HP_SERVICES.items()},
"monitors": list(HP_MONITORS.keys()),
},
})
@bp.route("/api/honeypot/history")
def api_hp_history():
"""Get command history."""
limit = _clamp(request.args.get("limit", 50, type=int), 1, 500)
history = commander.get_command_history(limit=limit)
return jsonify({"history": history, "count": len(history)})
# ============================================================
# Command Dispatch APIs (POST) — preserved from v1
# ============================================================
@bp.route("/api/honeypot/command", methods=["POST"])
def api_hp_send_command():
"""Send a single command."""
data = request.get_json(silent=True) or {}
device_id = _sanitize_str(data.get("device_id"), max_len=64)
command_name = _sanitize_str(data.get("command"), max_len=64)
argv = data.get("argv", [])
if isinstance(argv, list):
argv = [_sanitize_str(a, max_len=128) for a in argv]
if not device_id or not command_name:
return jsonify({"error": "device_id and command required"}), 400
if command_name not in ALLOWED_COMMANDS:
return jsonify({"error": f"Unknown command: {command_name}"}), 400
rid = commander.send_command(device_id, command_name, argv)
if rid:
return jsonify({"ok": True, "request_id": rid})
return jsonify({"error": "Failed to send (device offline?)"}), 500
@bp.route("/api/honeypot/start_all", methods=["POST"])
def api_hp_start_all():
"""Start all services + monitors via single bulk command."""
data = request.get_json(silent=True) or {}
device_id = _sanitize_str(data.get("device_id"), max_len=64) or None
request_ids = []
if device_id:
rid = commander.send_command(device_id, "hp_start_all")
if rid:
request_ids.append(rid)
else:
rids = commander.send_to_all_devices("hp_start_all")
request_ids.extend(rids)
return jsonify({"ok": True, "request_ids": request_ids, "count": len(request_ids)})
@bp.route("/api/honeypot/stop_all", methods=["POST"])
def api_hp_stop_all():
"""Stop all services + monitors via single bulk command."""
data = request.get_json(silent=True) or {}
device_id = _sanitize_str(data.get("device_id"), max_len=64) or None
request_ids = []
if device_id:
rid = commander.send_command(device_id, "hp_stop_all")
if rid:
request_ids.append(rid)
else:
rids = commander.send_to_all_devices("hp_stop_all")
request_ids.extend(rids)
return jsonify({"ok": True, "request_ids": request_ids, "count": len(request_ids)})
@bp.route("/api/honeypot/refresh_status", methods=["POST"])
def api_hp_refresh_status():
"""Refresh cached service state."""
data = request.get_json(silent=True) or {}
device_id = _sanitize_str(data.get("device_id"), max_len=64) or None
request_ids = []
for svc_name in HP_SERVICES:
if device_id:
rid = commander.send_service_command(device_id, svc_name, "status")
if rid:
request_ids.append(rid)
else:
rids = commander.send_service_to_all(svc_name, "status")
request_ids.extend(rids)
for mon_name in HP_MONITORS:
if device_id:
rid = commander.send_monitor_command(device_id, mon_name, "status")
if rid:
request_ids.append(rid)
else:
rids = commander.send_monitor_to_all(mon_name, "status")
request_ids.extend(rids)
return jsonify({"ok": True, "request_ids": request_ids, "count": len(request_ids)})
# ============================================================
# Kill Chain & Honeytoken APIs
# ============================================================
@bp.route("/api/honeypot/killchain")
def api_hp_killchain_top():
"""Get top attackers by kill chain score."""
limit = _clamp(request.args.get("limit", 20, type=int), 1, 200)
results = store.get_kill_chain_top(limit=limit)
return jsonify({"attackers": results, "phases": KILL_CHAIN_PHASES})
@bp.route("/api/honeypot/killchain/<ip>")
def api_hp_killchain_detail(ip):
"""Get kill chain analysis for a specific IP."""
analysis = store.get_kill_chain_analysis(ip)
return jsonify({**analysis, "phase_defs": KILL_CHAIN_PHASES})
@bp.route("/api/honeypot/honeytokens")
def api_hp_honeytokens():
"""Get honeytoken event statistics."""
return jsonify(store.get_honeytoken_stats())
# ============================================================
# Runtime Config APIs
# ============================================================
@bp.route("/api/honeypot/config/<device_id>")
def api_hp_config_list(device_id):
"""List all runtime config for a device."""
cfg_type = request.args.get("type", "")
rid = commander.config_list(device_id, cfg_type)
if rid:
return jsonify({"ok": True, "request_id": rid})
return jsonify({"error": "Failed to send"}), 500
@bp.route("/api/honeypot/config/<device_id>", methods=["POST"])
def api_hp_config_set(device_id):
"""Set a config value."""
data = request.get_json(silent=True) or {}
cfg_type = _sanitize_str(data.get("type"), max_len=32)
key = _sanitize_str(data.get("key"), max_len=64)
value = _sanitize_str(data.get("value"), max_len=256)
if not cfg_type or not key:
return jsonify({"error": "type and key required"}), 400
# Whitelist cfg_type and key to prevent arbitrary config injection
if cfg_type not in ("banner", "threshold"):
return jsonify({"error": f"Invalid config type: {cfg_type}"}), 400
if cfg_type == "banner" and key not in CONFIG_BANNER_SERVICES:
return jsonify({"error": f"Invalid banner service: {key}"}), 400
if cfg_type == "threshold":
if key not in CONFIG_THRESHOLDS:
return jsonify({"error": f"Invalid threshold key: {key}"}), 400
# Validate threshold value is a positive integer
try:
int_val = int(value)
if int_val <= 0:
raise ValueError
except (ValueError, TypeError):
return jsonify({"error": "Threshold value must be a positive integer"}), 400
rid = commander.config_set(device_id, cfg_type, key, str(value))
if rid:
return jsonify({"ok": True, "request_id": rid})
return jsonify({"error": "Failed to send"}), 500
@bp.route("/api/honeypot/config/<device_id>/reset", methods=["POST"])
def api_hp_config_reset(device_id):
"""Reset all runtime config to defaults."""
rid = commander.config_reset(device_id)
if rid:
return jsonify({"ok": True, "request_id": rid})
return jsonify({"error": "Failed to send"}), 500
@bp.route("/api/honeypot/config/meta")
def api_hp_config_meta():
"""Get config metadata (available banners, thresholds, defaults)."""
return jsonify({
"banner_services": CONFIG_BANNER_SERVICES,
"thresholds": CONFIG_THRESHOLDS,
})
# ============================================================
# OTA Firmware Management
# ============================================================
# Firmware storage directory (sibling to hp_dashboard/)
_fw_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"firmwares")
_SAFE_NAME = re.compile(r'^[\w\-\.]+\.bin$')
_MAX_FW_SIZE = 2 * 1024 * 1024 # 2 MB
@bp.route("/api/honeypot/ota/upload", methods=["POST"])
def api_hp_ota_upload():
"""Upload a firmware .bin file."""
if "file" not in request.files:
return jsonify({"error": "No file part"}), 400
f = request.files["file"]
if not f.filename or not f.filename.endswith(".bin"):
return jsonify({"error": "Only .bin files accepted"}), 400
safe_name = f.filename.replace(" ", "_")
if not _SAFE_NAME.match(safe_name):
return jsonify({"error": "Invalid filename"}), 400
os.makedirs(_fw_dir, exist_ok=True)
# Read into memory to check size before writing
data = f.read()
if len(data) > _MAX_FW_SIZE:
return jsonify({"error": f"File too large ({len(data)} bytes, max {_MAX_FW_SIZE})"}), 400
if len(data) == 0:
return jsonify({"error": "Empty file"}), 400
# Atomic write: write to temp file first, then rename into place.
# This prevents serving a partially-written firmware if a reader
# hits the file mid-upload (or if the process crashes).
path = os.path.join(_fw_dir, safe_name)
fd, tmp_path = tempfile.mkstemp(dir=_fw_dir, suffix=".tmp")
try:
with os.fdopen(fd, "wb") as out:
out.write(data)
os.rename(tmp_path, path)
except BaseException:
# Clean up temp file on any failure
try:
os.unlink(tmp_path)
except OSError:
pass
raise
# Compute and store SHA256 hash for firmware integrity verification
sha256_hex = hashlib.sha256(data).hexdigest()
sha_path = path + ".sha256"
with open(sha_path, "w") as sf:
sf.write(sha256_hex)
return jsonify({"ok": True, "filename": safe_name, "size": len(data),
"sha256": sha256_hex})
@bp.route("/api/honeypot/ota/firmwares")
def api_hp_ota_firmwares():
"""List available firmware files."""
if not os.path.isdir(_fw_dir):
return jsonify({"firmwares": []})
firmwares = []
for name in sorted(os.listdir(_fw_dir)):
if name.endswith(".bin"):
path = os.path.join(_fw_dir, name)
stat = os.stat(path)
fw_entry = {
"name": name,
"size": stat.st_size,
"uploaded_at": stat.st_mtime,
}
sha_path = path + ".sha256"
if os.path.isfile(sha_path):
try:
with open(sha_path) as sf:
fw_entry["sha256"] = sf.read().strip()
except OSError:
pass
firmwares.append(fw_entry)
return jsonify({"firmwares": firmwares})
@bp.route("/api/honeypot/ota/firmware/<filename>")
@bp.route("/fw/<filename>")
def api_hp_ota_firmware_download(filename):
"""Serve a firmware binary (called by ESP32 during OTA)."""
if not _SAFE_NAME.match(filename):
return jsonify({"error": "Invalid filename"}), 400
if not os.path.isdir(_fw_dir):
return jsonify({"error": "No firmwares directory"}), 404
path = os.path.join(_fw_dir, filename)
if not os.path.isfile(path):
return jsonify({"error": "Firmware not found"}), 404
return send_from_directory(_fw_dir, filename,
mimetype="application/octet-stream")
@bp.route("/api/honeypot/ota/firmware/<filename>", methods=["DELETE"])
def api_hp_ota_firmware_delete(filename):
"""Delete a firmware file."""
if not _SAFE_NAME.match(filename):
return jsonify({"error": "Invalid filename"}), 400
path = os.path.join(_fw_dir, filename)
if not os.path.isfile(path):
return jsonify({"error": "Firmware not found"}), 404
os.remove(path)
sha_path = path + ".sha256"
if os.path.isfile(sha_path):
os.remove(sha_path)
return jsonify({"ok": True})
@bp.route("/api/honeypot/ota/flash", methods=["POST"])
def api_hp_ota_flash():
"""Trigger OTA update on a device."""
data = request.get_json(silent=True) or {}
device_id = _sanitize_str(data.get("device_id"), max_len=64)
filename = _sanitize_str(data.get("filename"), max_len=128)
if not device_id or not filename:
return jsonify({"error": "device_id and filename required"}), 400
if not _SAFE_NAME.match(filename):
return jsonify({"error": "Invalid filename"}), 400
path = os.path.join(_fw_dir, filename)
if not os.path.isfile(path):
return jsonify({"error": "Firmware not found"}), 404
# Build a short download URL (argv max 64 bytes in nanopb).
# Use the real server IP so ESP32 can reach it (not "localhost").
import socket as _sock
host = request.host
hostname = host.split(":")[0]
port = host.split(":")[1] if ":" in host else "5000"
if hostname in ("localhost", "127.0.0.1"):
try:
s = _sock.socket(_sock.AF_INET, _sock.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
hostname = s.getsockname()[0]
s.close()
except Exception:
pass
# HP_OTA_HTTPS: Set to "false" to use http:// instead of https://
# Defaults to "true" for secure firmware downloads
use_https = os.environ.get("HP_OTA_HTTPS", "true").lower() == "true"
scheme = "https" if use_https else "http"
fw_url = f"{scheme}://{hostname}:{port}/fw/{filename}"
# Enforce nanopb argv limit (64 bytes max per argument)
if len(fw_url.encode("utf-8")) > 64:
return jsonify({"error": f"Firmware URL too long ({len(fw_url)} chars, max 64)"}), 400
# Read or compute SHA256 hash for firmware integrity verification
sha_path = path + ".sha256"
if os.path.isfile(sha_path):
with open(sha_path) as sf:
sha256_hex = sf.read().strip()
else:
with open(path, "rb") as bf:
sha256_hex = hashlib.sha256(bf.read()).hexdigest()
rid = commander.send_command(device_id, "ota_update", [fw_url, sha256_hex])
if rid:
return jsonify({"ok": True, "request_id": rid, "url": fw_url})
return jsonify({"error": "Failed to send (device offline?)"}), 500
@bp.route("/api/honeypot/ota/rollback", methods=["POST"])
def api_hp_ota_rollback():
"""Trigger firmware rollback on a device."""
data = request.get_json(silent=True) or {}
device_id = _sanitize_str(data.get("device_id"), max_len=64)
if not device_id:
return jsonify({"error": "device_id required"}), 400
rid = commander.send_command(device_id, "ota_rollback")
if rid:
return jsonify({"ok": True, "request_id": rid})
return jsonify({"error": "Failed to send (device offline?)"}), 500
# ============================================================
# Geo-IP API
# ============================================================
@bp.route("/api/honeypot/geo/<ip>")
def api_hp_geo(ip):
"""Get geo-IP and vendor info for an IP address."""
if not geo:
return jsonify({"error": "Geo lookup not configured"}), 501
result = geo.lookup_ip(ip)
# Also look up vendor if we have a MAC for this IP
with store.lock:
atk = store.attackers.get(ip, {})
result["vendor"] = geo.lookup_mac_vendor(atk.get("mac", ""))
return jsonify(result)
# ============================================================
# Health / Monitoring
# ============================================================
_bp_start_time = time.time()
@bp.route("/api/honeypot/health")
def api_hp_health():
"""Health check endpoint for monitoring (Prometheus, Uptime Kuma, etc.)."""
stats = store.get_stats()
now = time.time()
# Events in the last hour
one_hour_ago = now - 3600
try:
conn = store._get_conn()
row = conn.execute(
"SELECT COUNT(*) as cnt FROM events WHERE timestamp >= ?",
(one_hour_ago,)
).fetchone()
events_last_hour = row["cnt"] if row else 0
except Exception:
events_last_hour = 0
# DB file size
db_size_mb = 0
try:
db_size_mb = round(os.path.getsize(store.db_path) / (1024 * 1024), 2)
except OSError:
pass
# SSE client count
with store.lock:
sse_clients = len(store._sse_queues)
# Device count
devices_connected = 0
try:
registry = commander._get_registry()
if registry:
devices_connected = len(registry.all())
except Exception:
pass
return jsonify({
"status": "ok",
"uptime_s": round(now - _bp_start_time),
"devices_connected": devices_connected,
"events_total": stats.get("total_events", 0),
"events_last_hour": events_last_hour,
"alerts_unacked": alerts.get_unacknowledged_count() if alerts else 0,
"db_size_mb": db_size_mb,
"sse_clients": sse_clients,
})
# ============================================================
# Dashboard HTML
# ============================================================
@bp.route("/honeypot")
def hp_dashboard():
"""Serve the honeypot management dashboard."""
static_url = url_for("honeypot.static", filename="")
return render_template("honeypot.html", static_url=static_url,
active_page="honeypot")
# ============================================================
# Apply require_login to all API routes (except public ones)
# ============================================================
for endpoint, view_func in list(bp.view_functions.items()):
if endpoint not in _PUBLIC_ENDPOINTS:
bp.view_functions[endpoint] = require_login(view_func)
return bp