"""Serial monitor API — stream ESP32 serial logs via SSE.""" import os import threading import time import queue from flask import Blueprint, Response, jsonify, request # TTY port → device_id mapping (set at startup from deploy.json or manually) _port_map = {} _port_map_lock = threading.Lock() # Active serial readers: port → { "thread": ..., "subscribers": [...], "stop": Event } _readers = {} _readers_lock = threading.Lock() BAUD_RATE = 115200 def _serial_reader_thread(port, stop_event, subscribers, subscribers_lock): """Background thread that reads from a serial port and fans out to subscribers.""" import serial try: ser = serial.Serial(port, BAUD_RATE, timeout=0.5) except Exception as e: msg = f"[monitor] Failed to open {port}: {e}\n" with subscribers_lock: for q in subscribers: q.put(msg) return buf = b"" while not stop_event.is_set(): try: chunk = ser.read(256) if not chunk: continue buf += chunk while b"\n" in buf: line, buf = buf.split(b"\n", 1) text = line.decode("utf-8", errors="replace").rstrip("\r") with subscribers_lock: dead = [] for i, q in enumerate(subscribers): try: q.put_nowait(text) except queue.Full: dead.append(i) for i in reversed(dead): subscribers.pop(i) except Exception: if stop_event.is_set(): break time.sleep(0.1) try: ser.close() except Exception: pass def _ensure_reader(port): """Start a serial reader thread for a port if not already running.""" with _readers_lock: if port in _readers and _readers[port]["thread"].is_alive(): return _readers[port] stop_event = threading.Event() subscribers = [] subs_lock = threading.Lock() t = threading.Thread( target=_serial_reader_thread, args=(port, stop_event, subscribers, subs_lock), daemon=True, ) t.start() _readers[port] = { "thread": t, "stop": stop_event, "subscribers": subscribers, "subscribers_lock": subs_lock, } return _readers[port] def _stop_reader(port): """Stop a serial reader thread.""" with _readers_lock: if port in _readers: _readers[port]["stop"].set() del _readers[port] def load_port_map_from_deploy(c2_root): """Try to load port→device_id mapping from deploy.json.""" import json deploy_path = os.path.join(os.path.dirname(c2_root), "deploy.json") try: with open(deploy_path) as f: cfg = json.load(f) for dev in cfg.get("devices", []): port = dev.get("port", "") did = dev.get("device_id", "") if port and did: _port_map[port] = did except (FileNotFoundError, json.JSONDecodeError, KeyError): pass def create_monitor_blueprint(server_config): """ Create the serial monitor API blueprint. Args: server_config: Dict with keys: - get_device_registry: Callable returning device registry - require_api_auth: Auth decorator - require_login: Login decorator (for SSE) """ bp = Blueprint("api_monitor", __name__, url_prefix="/api") get_registry = server_config["get_device_registry"] require_api_auth = server_config["require_api_auth"] require_login = server_config["require_login"] # Auto-load port mapping from deploy.json c2_root = server_config.get("c2_root", "") load_port_map_from_deploy(c2_root) @bp.route("/monitor/ports", methods=["GET"]) @require_api_auth def list_ports(): """List available serial ports and their device mappings.""" ports = [] for dev in sorted(os.listdir("/dev")): if dev.startswith("ttyUSB") or dev.startswith("ttyACM"): port = f"/dev/{dev}" with _port_map_lock: device_id = _port_map.get(port, "") with _readers_lock: active = port in _readers and _readers[port]["thread"].is_alive() ports.append({ "port": port, "device_id": device_id, "monitoring": active, }) return jsonify({"ports": ports}) @bp.route("/monitor/ports/map", methods=["POST"]) @require_api_auth def set_port_map(): """Set port → device_id mapping. JSON body: {"mapping": {"/dev/ttyUSB0": "esp-fakeap", ...}} """ data = request.get_json() if not data or "mapping" not in data: return jsonify({"error": "mapping required"}), 400 with _port_map_lock: _port_map.update(data["mapping"]) return jsonify({"ok": True, "mapping": _port_map}) @bp.route("/monitor/stream/") @require_login def stream_serial(port): """SSE endpoint: stream serial logs from a port. Example: GET /api/monitor/stream/dev/ttyUSB0 """ real_port = "/" + port # reconstruct /dev/ttyUSBx if not os.path.exists(real_port): return jsonify({"error": f"Port {real_port} not found"}), 404 reader = _ensure_reader(real_port) q = queue.Queue(maxsize=500) with reader["subscribers_lock"]: reader["subscribers"].append(q) def generate(): yield f"data: [monitor] Connected to {real_port} @ {BAUD_RATE} baud\n\n" try: while True: try: line = q.get(timeout=30) yield f"data: {line}\n\n" except queue.Empty: yield ": keepalive\n\n" except GeneratorExit: # Client disconnected, remove subscriber with reader["subscribers_lock"]: try: reader["subscribers"].remove(q) except ValueError: pass return Response( generate(), mimetype="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", }, ) @bp.route("/monitor/stop/", methods=["POST"]) @require_api_auth def stop_monitor(port): """Stop monitoring a serial port.""" real_port = "/" + port _stop_reader(real_port) return jsonify({"ok": True, "port": real_port}) return bp