Replace monolithic CLI and web server with route-based Flask API. New routes: api_commands, api_build, api_can, api_monitor, api_ota, api_tunnel. Add honeypot security dashboard with real-time SSE, MITRE ATT&CK mapping, kill chain analysis. New TUI with commander/help modules. Add session management, tunnel proxy core, CAN bus data store. Docker support.
217 lines
6.8 KiB
Python
217 lines
6.8 KiB
Python
"""Serial monitor API — stream ESP32 serial logs via SSE."""
|
|
|
|
import os
|
|
import threading
|
|
import time
|
|
import queue
|
|
from flask import Blueprint, Response, jsonify, request
|
|
|
|
# Baud rate passed to every serial port opened by the monitor.
BAUD_RATE = 115200

# TTY port → device_id mapping (filled at startup from deploy.json or set
# manually). The route handlers access it while holding _port_map_lock.
_port_map_lock = threading.Lock()
_port_map = dict()

# Active serial readers, keyed by port:
#   port → { "thread": ..., "stop": Event,
#            "subscribers": [...], "subscribers_lock": Lock }
# The registry itself is accessed while holding _readers_lock.
_readers_lock = threading.Lock()
_readers = dict()
|
|
|
|
|
|
def _serial_reader_thread(port, stop_event, subscribers, subscribers_lock):
    """Background thread that reads a serial port and fans lines out to subscribers.

    The port is opened at ``BAUD_RATE`` with a 0.5 s read timeout, so the stop
    event is re-checked whenever a read returns no data. Incoming bytes are
    split on ``\\n``; each complete line is decoded as UTF-8 (invalid bytes are
    replaced), stripped of trailing ``\\r`` and offered to every subscriber
    queue. A subscriber whose queue is full is removed from the list.

    Args:
        port: Device path handed to ``serial.Serial``.
        stop_event: ``threading.Event``; once set, the read loop ends.
        subscribers: Shared list of queues that receive the decoded lines.
            It is mutated in place (never rebound) so other holders of the
            list keep seeing the current subscribers.
        subscribers_lock: Lock held whenever ``subscribers`` is touched.
    """
    try:
        # Imported lazily, and inside the try, so that a missing pyserial
        # install is reported to subscribers like any other open failure
        # instead of killing the thread without a message.
        import serial

        ser = serial.Serial(port, BAUD_RATE, timeout=0.5)
    except Exception as e:
        msg = f"[monitor] Failed to open {port}: {e}\n"
        with subscribers_lock:
            for q in subscribers:
                try:
                    # Never block while holding the lock: a full queue just
                    # misses the notice.
                    q.put_nowait(msg)
                except queue.Full:
                    pass
        return

    buf = b""
    try:
        while not stop_event.is_set():
            try:
                chunk = ser.read(256)
                if not chunk:
                    # Read timed out with no data; loop to re-check stop_event.
                    continue

                buf += chunk
                while b"\n" in buf:
                    line, buf = buf.split(b"\n", 1)
                    text = line.decode("utf-8", errors="replace").rstrip("\r")
                    with subscribers_lock:
                        alive = []
                        for q in subscribers:
                            try:
                                q.put_nowait(text)
                            except queue.Full:
                                # Consumer is not draining its queue: drop it.
                                continue
                            alive.append(q)
                        subscribers[:] = alive
            except Exception:
                # Best-effort: keep retrying after a short pause unless a stop
                # was requested in the meantime.
                if stop_event.wait(0.1):
                    break
    finally:
        # Release the port however the loop ended.
        try:
            ser.close()
        except Exception:
            pass
|
|
|
|
|
|
def _ensure_reader(port):
    """Return the reader record for *port*, launching its thread when needed.

    A record whose thread is still alive is returned as-is. Otherwise a new
    daemon thread running ``_serial_reader_thread`` is started and a fresh
    record (thread, stop event, subscriber list and its lock) replaces any
    previous one in ``_readers``.
    """
    with _readers_lock:
        current = _readers.get(port)
        if current is not None and current["thread"].is_alive():
            return current

        halt = threading.Event()
        queues = []
        queues_lock = threading.Lock()

        worker = threading.Thread(
            target=_serial_reader_thread,
            args=(port, halt, queues, queues_lock),
            daemon=True,
        )
        worker.start()

        record = {
            "thread": worker,
            "stop": halt,
            "subscribers": queues,
            "subscribers_lock": queues_lock,
        }
        _readers[port] = record
        return record
|
|
|
|
|
|
def _stop_reader(port):
    """Signal the reader thread for *port* to stop and drop its record.

    Does nothing when no reader is registered for the port.
    """
    with _readers_lock:
        record = _readers.pop(port, None)
        if record is not None:
            record["stop"].set()
|
|
|
|
|
|
def load_port_map_from_deploy(c2_root):
    """Best-effort load of the port → device_id mapping from deploy.json.

    ``deploy.json`` is looked up in the parent directory of ``c2_root``. Every
    entry of its ``"devices"`` list that has both a non-empty ``"port"`` and a
    non-empty ``"device_id"`` is merged into ``_port_map``.

    A missing, unreadable or malformed file — or one whose structure is not
    the expected object-with-a-devices-list — is ignored and leaves the
    mapping untouched; this function does not raise for bad file content.

    Args:
        c2_root: Path whose parent directory is searched for ``deploy.json``.
    """
    import json

    deploy_path = os.path.join(os.path.dirname(c2_root), "deploy.json")
    try:
        with open(deploy_path, encoding="utf-8") as f:
            cfg = json.load(f)
    except (OSError, ValueError):
        # OSError: missing file, permission problem, path is a directory.
        # ValueError: invalid JSON (JSONDecodeError) or invalid UTF-8.
        return

    devices = cfg.get("devices") if isinstance(cfg, dict) else None
    if not isinstance(devices, list):
        return

    loaded = {}
    for dev in devices:
        if not isinstance(dev, dict):
            continue
        port = dev.get("port", "")
        did = dev.get("device_id", "")
        if port and did:
            loaded[port] = did

    # Same lock the route handlers take when reading or updating the mapping.
    with _port_map_lock:
        _port_map.update(loaded)
|
|
|
|
|
|
def create_monitor_blueprint(server_config):
    """
    Create the serial monitor API blueprint.

    Creating the blueprint also calls ``load_port_map_from_deploy`` once to
    pre-populate the port mapping.

    Args:
        server_config: Dict with keys:
            - get_device_registry: Callable returning device registry
            - require_api_auth: Auth decorator
            - require_login: Login decorator (for SSE)
            - c2_root: Optional path passed to ``load_port_map_from_deploy``
              (defaults to "")

    Returns:
        A Flask ``Blueprint`` named "api_monitor" with URL prefix ``/api``.

    Raises:
        KeyError: If one of the three required keys is missing.
    """
    bp = Blueprint("api_monitor", __name__, url_prefix="/api")

    # Not used by any route below; the lookup is kept so that a missing key
    # still fails when the blueprint is created.
    get_registry = server_config["get_device_registry"]  # noqa: F841
    require_api_auth = server_config["require_api_auth"]
    require_login = server_config["require_login"]

    # Auto-load port mapping from deploy.json
    c2_root = server_config.get("c2_root", "")
    load_port_map_from_deploy(c2_root)

    @bp.route("/monitor/ports", methods=["GET"])
    @require_api_auth
    def list_ports():
        """List available serial ports and their device mappings."""
        try:
            names = sorted(os.listdir("/dev"))
        except OSError:
            # /dev missing or unreadable: report no ports instead of a 500.
            names = []

        ports = []
        for dev in names:
            if not dev.startswith(("ttyUSB", "ttyACM")):
                continue
            port = f"/dev/{dev}"
            with _port_map_lock:
                device_id = _port_map.get(port, "")
            with _readers_lock:
                active = port in _readers and _readers[port]["thread"].is_alive()
            ports.append({
                "port": port,
                "device_id": device_id,
                "monitoring": active,
            })
        return jsonify({"ports": ports})

    @bp.route("/monitor/ports/map", methods=["POST"])
    @require_api_auth
    def set_port_map():
        """Set port → device_id mapping.

        JSON body: {"mapping": {"/dev/ttyUSB0": "esp-fakeap", ...}}

        Responds 400 when the body is not a JSON object, has no "mapping"
        key, or "mapping" is not itself an object.
        """
        # silent=True: an unparsable or non-JSON body yields None and is
        # answered with the 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or "mapping" not in data:
            return jsonify({"error": "mapping required"}), 400
        mapping = data["mapping"]
        if not isinstance(mapping, dict):
            return jsonify({"error": "mapping must be an object"}), 400
        with _port_map_lock:
            _port_map.update(mapping)
            # Copy under the lock so the response is not serialized from a
            # dict another request may be mutating.
            snapshot = dict(_port_map)
        return jsonify({"ok": True, "mapping": snapshot})

    @bp.route("/monitor/stream/<path:port>")
    @require_login
    def stream_serial(port):
        """SSE endpoint: stream serial logs from a port.

        Example: GET /api/monitor/stream/dev/ttyUSB0

        Responds 404 when the reconstructed path does not exist.
        """
        real_port = "/" + port  # reconstruct /dev/ttyUSBx
        # NOTE(review): `port` comes straight from the URL, so any existing
        # filesystem path can be handed to the serial reader here. Consider
        # restricting it to known serial devices.
        if not os.path.exists(real_port):
            return jsonify({"error": f"Port {real_port} not found"}), 404

        reader = _ensure_reader(real_port)
        q = queue.Queue(maxsize=500)

        with reader["subscribers_lock"]:
            reader["subscribers"].append(q)

        def generate():
            try:
                yield f"data: [monitor] Connected to {real_port} @ {BAUD_RATE} baud\n\n"
                while True:
                    try:
                        line = q.get(timeout=30)
                    except queue.Empty:
                        # SSE comment line; sent after 30 s without data.
                        yield ": keepalive\n\n"
                    else:
                        yield f"data: {line}\n\n"
            finally:
                # Runs on client disconnect (GeneratorExit) and on any other
                # exit, so the queue is always unsubscribed.
                with reader["subscribers_lock"]:
                    try:
                        reader["subscribers"].remove(q)
                    except ValueError:
                        # Already dropped by the reader thread.
                        pass

        return Response(
            generate(),
            mimetype="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
            },
        )

    @bp.route("/monitor/stop/<path:port>", methods=["POST"])
    @require_api_auth
    def stop_monitor(port):
        """Stop monitoring a serial port."""
        real_port = "/" + port
        _stop_reader(real_port)
        return jsonify({"ok": True, "port": real_port})

    return bp
|