"""Device API routes.""" import json import time from flask import Blueprint, jsonify, Response def create_devices_blueprint(server_config): """ Create the devices API blueprint. Args: server_config: Dict with keys: - get_device_registry: Callable returning device registry - require_api_auth: Auth decorator """ bp = Blueprint("api_devices", __name__, url_prefix="/api") get_registry = server_config["get_device_registry"] require_api_auth = server_config["require_api_auth"] def _serialize_devices(): registry = get_registry() if registry is None: return {"devices": [], "count": 0} now = time.time() devices = [] for d in registry.all(): devices.append({ "id": d.id, "ip": d.address[0] if d.address else "unknown", "port": d.address[1] if d.address else 0, "status": d.status, "chip": d.chip, "modules": d.modules, "connected_at": d.connected_at, "last_seen": d.last_seen, "connected_for_seconds": round(now - d.connected_at, 1), "last_seen_ago_seconds": round(now - d.last_seen, 1) }) return {"devices": devices, "count": len(devices)} @bp.route("/devices") @require_api_auth def list_devices(): return jsonify(_serialize_devices()) @bp.route("/devices/stream") @require_api_auth def stream_devices(): def generate(): while True: data = _serialize_devices() yield f"data: {json.dumps(data)}\n\n" time.sleep(3) return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) return bp