espilon-source/tools/C3PO/web/routes/api_monitor.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

217 lines
6.8 KiB
Python

"""Serial monitor API — stream ESP32 serial logs via SSE."""
import os
import threading
import time
import queue
from flask import Blueprint, Response, jsonify, request
# TTY port → device_id mapping. Filled by load_port_map_from_deploy() from
# deploy.json and updated through the POST /api/monitor/ports/map route.
_port_map = {}
# Lock intended to serialize access to _port_map.
_port_map_lock = threading.Lock()
# Active serial readers, keyed by port path. Each value is a dict with the keys
# "thread" (the reader Thread), "stop" (threading.Event asking it to exit),
# "subscribers" (list of queues receiving lines) and "subscribers_lock"
# (lock guarding that list).
_readers = {}
# Lock intended to serialize access to _readers.
_readers_lock = threading.Lock()
# Baud rate passed to serial.Serial for every monitored port.
BAUD_RATE = 115200
def _serial_reader_thread(port, stop_event, subscribers, subscribers_lock):
    """Read lines from a serial port and fan them out to subscriber queues.

    Runs until ``stop_event`` is set or the port keeps failing. Each complete
    line (split on ``\\n``, trailing ``\\r`` removed, decoded as UTF-8 with
    replacement) is offered to every subscriber queue without blocking.

    Args:
        port: Path of the serial device to open.
        stop_event: ``threading.Event``; setting it makes the thread exit.
        subscribers: Shared list of ``queue.Queue`` objects, mutated in place.
        subscribers_lock: Lock guarding ``subscribers``.
    """
    # Flush a partial line once it gets this long, so a device that never
    # sends a newline (wrong baud rate, binary output) cannot grow the buffer
    # without bound.
    max_line_bytes = 8192
    # Consecutive failed reads tolerated before giving up on the port.
    max_read_errors = 20

    def fan_out(text):
        # Never block the reader on a slow consumer: a subscriber whose queue
        # is full is dropped from the shared list.
        with subscribers_lock:
            kept = []
            for q in subscribers:
                try:
                    q.put_nowait(text)
                except queue.Full:
                    continue
                kept.append(q)
            # Slice-assign so every holder of the list sees the pruning.
            subscribers[:] = kept

    try:
        # Imported here so a missing pyserial is reported to subscribers the
        # same way as a port that cannot be opened.
        import serial

        ser = serial.Serial(port, BAUD_RATE, timeout=0.5)
    except Exception as e:
        fan_out(f"[monitor] Failed to open {port}: {e}\n")
        return

    buf = b""
    read_errors = 0
    try:
        while not stop_event.is_set():
            try:
                chunk = ser.read(256)
            except Exception as e:
                if stop_event.is_set():
                    break
                read_errors += 1
                if read_errors >= max_read_errors:
                    # The device is most likely gone; exit so that a later
                    # _ensure_reader() call can start a fresh reader instead
                    # of this thread spinning forever.
                    fan_out(f"[monitor] Read error on {port}: {e}")
                    break
                time.sleep(0.1)
                continue

            read_errors = 0
            if not chunk:
                # Read timeout with no data; loop to re-check stop_event.
                continue

            buf += chunk
            while b"\n" in buf:
                line, buf = buf.split(b"\n", 1)
                fan_out(line.decode("utf-8", errors="replace").rstrip("\r"))

            if len(buf) > max_line_bytes:
                fan_out(buf.decode("utf-8", errors="replace"))
                buf = b""
    finally:
        try:
            ser.close()
        except Exception:
            # Best-effort cleanup: the port may already be unusable.
            pass
def _ensure_reader(port):
    """Return the reader entry for ``port``, spawning its thread when needed.

    A registered entry is reused only while its thread is still alive;
    otherwise a new daemon thread is started and a new entry replaces it.
    """
    with _readers_lock:
        current = _readers.get(port)
        if current is not None and current["thread"].is_alive():
            return current

        halt = threading.Event()
        listeners = []
        listeners_lock = threading.Lock()
        worker = threading.Thread(
            target=_serial_reader_thread,
            args=(port, halt, listeners, listeners_lock),
            daemon=True,
        )
        worker.start()

        entry = {
            "thread": worker,
            "stop": halt,
            "subscribers": listeners,
            "subscribers_lock": listeners_lock,
        }
        _readers[port] = entry
        return entry
def _stop_reader(port):
    """Ask the reader thread for ``port`` to exit and forget its entry.

    Does nothing when no reader is registered for ``port``.
    """
    with _readers_lock:
        entry = _readers.pop(port, None)
        if entry is not None:
            entry["stop"].set()
def load_port_map_from_deploy(c2_root):
    """Load the port → device_id mapping from ``deploy.json``, best effort.

    The file is looked up in the parent directory of ``c2_root``. Every entry
    of its ``devices`` list that has a non-empty ``port`` and ``device_id`` is
    merged into the module-level port map. A missing, unreadable or malformed
    file is silently ignored so that blueprint creation never fails on it.

    Args:
        c2_root: Path whose parent directory is searched for ``deploy.json``.
    """
    import json

    deploy_path = os.path.join(os.path.dirname(c2_root), "deploy.json")
    try:
        with open(deploy_path, encoding="utf-8") as f:
            cfg = json.load(f)
    except (OSError, ValueError):
        # OSError: missing file, permissions, path is a directory.
        # ValueError: invalid JSON (JSONDecodeError) or invalid UTF-8.
        return

    devices = cfg.get("devices") if isinstance(cfg, dict) else None
    if not isinstance(devices, list):
        return

    found = {}
    for dev in devices:
        if not isinstance(dev, dict):
            continue
        port = dev.get("port", "")
        did = dev.get("device_id", "")
        # The port becomes a dict key, so it has to be a (hashable) string.
        if isinstance(port, str) and port and did:
            found[port] = did

    if found:
        # Same lock the HTTP routes take around the shared map.
        with _port_map_lock:
            _port_map.update(found)
def create_monitor_blueprint(server_config):
    """
    Create the serial monitor API blueprint.

    Args:
        server_config: Dict with keys:
            - require_api_auth: Auth decorator
            - require_login: Login decorator (for SSE)
            - c2_root: Optional path handed to load_port_map_from_deploy()
            - get_device_registry: Accepted for compatibility; not used by
              this blueprint

    Returns:
        The configured Flask Blueprint, registered under the ``/api`` prefix.
    """
    import stat

    bp = Blueprint("api_monitor", __name__, url_prefix="/api")
    require_api_auth = server_config["require_api_auth"]
    require_login = server_config["require_login"]

    # Auto-load port mapping from deploy.json
    c2_root = server_config.get("c2_root", "")
    load_port_map_from_deploy(c2_root)

    def resolve_port(fragment):
        """Turn the URL path fragment into an absolute path under /dev.

        Returns None when the normalized path would leave /dev, so that a
        request such as ``dev/../etc/passwd`` is never used as a device path.
        """
        candidate = os.path.normpath("/" + fragment.lstrip("/"))
        if not candidate.startswith("/dev/"):
            return None
        return candidate

    @bp.route("/monitor/ports", methods=["GET"])
    @require_api_auth
    def list_ports():
        """List available serial ports and their device mappings."""
        try:
            names = sorted(os.listdir("/dev"))
        except OSError:
            # No readable /dev: report no ports instead of a 500.
            names = []

        ports = []
        for name in names:
            if not name.startswith(("ttyUSB", "ttyACM")):
                continue
            port = f"/dev/{name}"
            with _port_map_lock:
                device_id = _port_map.get(port, "")
            with _readers_lock:
                entry = _readers.get(port)
                active = entry is not None and entry["thread"].is_alive()
            ports.append({
                "port": port,
                "device_id": device_id,
                "monitoring": active,
            })
        return jsonify({"ports": ports})

    @bp.route("/monitor/ports/map", methods=["POST"])
    @require_api_auth
    def set_port_map():
        """Set port → device_id mapping.

        JSON body: {"mapping": {"/dev/ttyUSB0": "esp-fakeap", ...}}
        """
        # silent=True: a missing or malformed JSON body is answered by the
        # 400 below instead of surfacing as an unhandled error.
        data = request.get_json(silent=True)
        mapping = data.get("mapping") if isinstance(data, dict) else None
        if mapping is None:
            return jsonify({"error": "mapping required"}), 400
        if not isinstance(mapping, dict) or not all(
            isinstance(k, str) and isinstance(v, str)
            for k, v in mapping.items()
        ):
            return jsonify(
                {"error": "mapping must be an object of port to device_id strings"}
            ), 400

        with _port_map_lock:
            _port_map.update(mapping)
            # Copy under the lock so the response is a consistent snapshot.
            snapshot = dict(_port_map)
        return jsonify({"ok": True, "mapping": snapshot})

    @bp.route("/monitor/stream/<path:port>")
    @require_login
    def stream_serial(port):
        """SSE endpoint: stream serial logs from a port.

        Example: GET /api/monitor/stream/dev/ttyUSB0
        """
        real_port = resolve_port(port)  # reconstruct /dev/ttyUSBx
        if real_port is None:
            return jsonify({"error": "Invalid port"}), 400
        try:
            mode = os.stat(real_port).st_mode
        except OSError:
            return jsonify({"error": f"Port {real_port} not found"}), 404
        # Serial devices are character devices; refuse regular files,
        # directories and block devices before handing the path to pyserial.
        if not stat.S_ISCHR(mode):
            return jsonify(
                {"error": f"Port {real_port} is not a serial device"}
            ), 400

        reader = _ensure_reader(real_port)
        q = queue.Queue(maxsize=500)
        with reader["subscribers_lock"]:
            reader["subscribers"].append(q)

        def generate():
            try:
                yield f"data: [monitor] Connected to {real_port} @ {BAUD_RATE} baud\n\n"
                while True:
                    try:
                        line = q.get(timeout=30)
                    except queue.Empty:
                        with reader["subscribers_lock"]:
                            attached = q in reader["subscribers"]
                        if (
                            not attached
                            or reader["stop"].is_set()
                            or not reader["thread"].is_alive()
                        ):
                            # Nothing will ever feed this queue again: end
                            # the stream instead of sending keepalives forever.
                            yield "data: [monitor] Stream closed\n\n"
                            return
                        yield ": keepalive\n\n"
                    else:
                        yield f"data: {line}\n\n"
            finally:
                # Runs on client disconnect (GeneratorExit) and on any other
                # exit path, so the subscriber queue is always detached.
                with reader["subscribers_lock"]:
                    try:
                        reader["subscribers"].remove(q)
                    except ValueError:
                        pass

        return Response(
            generate(),
            mimetype="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
            },
        )

    @bp.route("/monitor/stop/<path:port>", methods=["POST"])
    @require_api_auth
    def stop_monitor(port):
        """Stop monitoring a serial port."""
        real_port = resolve_port(port)
        if real_port is None:
            return jsonify({"error": "Invalid port"}), 400
        _stop_reader(real_port)
        return jsonify({"ok": True, "port": real_port})

    return bp