"""Generic command API routes — send any command to ESP32 devices.""" import re import time import uuid import threading from flask import Blueprint, jsonify, request from proto.c2_pb2 import Command # In-memory store for command results (request_id → record) _command_store = {} _store_lock = threading.Lock() # Auto-purge completed commands older than 5 minutes _PURGE_AGE = 300 def _purge_old(): now = time.time() with _store_lock: expired = [k for k, v in _command_store.items() if now - v["created_at"] > _PURGE_AGE] for k in expired: del _command_store[k] def create_commands_blueprint(server_config): """ Create the commands API blueprint. Args: server_config: Dict with keys: - get_device_registry: Callable returning device registry - get_transport: Callable returning transport instance - get_session: Callable returning session instance - require_api_auth: Auth decorator - limiter: Flask-Limiter instance """ bp = Blueprint("api_commands", __name__, url_prefix="/api") get_registry = server_config["get_device_registry"] get_transport = server_config["get_transport"] get_session = server_config["get_session"] require_api_auth = server_config["require_api_auth"] limiter = server_config["limiter"] @bp.route("/commands", methods=["POST"]) @require_api_auth @limiter.limit("60 per minute") def send_command(): """ Send a command to one or more devices. JSON body: { "device_ids": ["abc123"] or "all", "command": "ping", "argv": ["8.8.8.8"] // optional } Returns: { "results": [ {"device_id": "abc123", "status": "ok", "request_id": "..."}, ... ] } """ data = request.get_json() if not data: return jsonify({"error": "JSON body required"}), 400 command_name = data.get("command", "") if not isinstance(command_name, str): return jsonify({"error": "command must be a string"}), 400 command_name = command_name.strip() if not command_name: return jsonify({"error": "command is required"}), 400 if not re.match(r'^[a-zA-Z][a-zA-Z0-9_-]{0,63}$', command_name): return jsonify({"error": "Invalid command name"}), 400 device_ids = data.get("device_ids", []) if device_ids != "all": if not isinstance(device_ids, list): return jsonify({"error": "device_ids must be a list or \"all\""}), 400 if len(device_ids) > 100: return jsonify({"error": "Too many device_ids (max 100)"}), 400 for did in device_ids: if not isinstance(did, str) or not did.strip(): return jsonify({"error": "Each device_id must be a non-empty string"}), 400 argv = data.get("argv", []) if not isinstance(argv, list): return jsonify({"error": "argv must be a list"}), 400 if len(argv) > 10: return jsonify({"error": "Too many arguments (max 10)"}), 400 for i, a in enumerate(argv): if not isinstance(a, (str, int, float)): return jsonify({"error": f"argv[{i}] must be a string or number"}), 400 if isinstance(a, str) and len(a) > 256: return jsonify({"error": f"argv[{i}] too long (max 256 chars)"}), 400 registry = get_registry() transport = get_transport() session = get_session() if not registry or not transport: return jsonify({"error": "C2 not ready"}), 503 # "all" → broadcast to every connected device if device_ids == "all": device_ids = [d.id for d in registry.all() if d.status == "Connected"] if not device_ids: return jsonify({"error": "device_ids is required (list or \"all\")"}), 400 _purge_old() results = [] for did in device_ids: device = registry.get(did) if not device: results.append({"device_id": did, "status": "error", "message": "Device not found"}) continue if device.status != "Connected": results.append({"device_id": did, "status": "error", "message": "Device not connected"}) continue try: req_id = f"web-{did}-{uuid.uuid4().hex[:8]}" cmd = Command() cmd.device_id = did cmd.command_name = command_name cmd.request_id = req_id for a in argv: cmd.argv.append(str(a)) # Shared output list so transport writes are visible to API shared_output = [] now = time.time() # Track in session so Transport routes responses back if session: session.active_commands[req_id] = { "device_id": did, "command": command_name, "start_time": now, "status": "pending", "output": shared_output, } # Also track in our web store for polling (same output list) with _store_lock: _command_store[req_id] = { "device_id": did, "command": command_name, "argv": argv, "status": "pending", "output": shared_output, "created_at": now, } transport.send_command(device.sock, cmd, did) results.append({"device_id": did, "status": "ok", "request_id": req_id}) except Exception as e: results.append({"device_id": did, "status": "error", "message": str(e)}) return jsonify({"results": results}) @bp.route("/commands/", methods=["GET"]) @require_api_auth def get_command_status(request_id): """ Poll the result of a previously sent command. Returns: { "request_id": "...", "device_id": "...", "command": "ping", "status": "pending" | "completed", "output": ["line1", "line2"] } """ session = get_session() # Check our web store (shares output list with session.active_commands) with _store_lock: if request_id in _command_store: rec = _command_store[request_id] # Sync status: if removed from session, it completed if rec["status"] == "pending" and session: if request_id in session.active_commands: rec["status"] = session.active_commands[request_id]["status"] else: rec["status"] = "completed" return jsonify({ "request_id": request_id, **rec, }) return jsonify({"error": "Command not found"}), 404 @bp.route("/commands", methods=["GET"]) @require_api_auth def list_commands(): """List recent commands and their statuses.""" session = get_session() commands = [] with _store_lock: for req_id, rec in sorted(_command_store.items(), key=lambda x: x[1]["created_at"], reverse=True): # Sync completion status if session and req_id not in session.active_commands: if rec["status"] == "pending": rec["status"] = "completed" commands.append({"request_id": req_id, **rec}) return jsonify({"commands": commands[:50]}) return bp