espilon-source/tools/C3PO/web/routes/api_commands.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

230 lines
8.1 KiB
Python

"""Generic command API routes — send any command to ESP32 devices."""
import re
import time
import uuid
import threading
from flask import Blueprint, jsonify, request
from proto.c2_pb2 import Command
# In-memory store for command results (request_id → record).
# Each record is a dict with the keys "device_id", "command", "argv",
# "status", "output" and "created_at" (a time.time() timestamp).
_command_store = {}
# Guards every read and write of _command_store.
_store_lock = threading.Lock()
# Records older than this many seconds (5 minutes) are purged by _purge_old().
# The purge looks only at "created_at", so still-pending commands are
# dropped as well, not just completed ones.
_PURGE_AGE = 300
def _purge_old():
    """Remove every stored command record older than ``_PURGE_AGE`` seconds.

    Age is measured from the record's ``created_at`` timestamp; the record's
    status is not consulted. The whole scan-and-delete runs under
    ``_store_lock``.
    """
    reference_time = time.time()
    with _store_lock:
        # Collect first, delete afterwards: a dict must not change size
        # while it is being iterated.
        stale_ids = []
        for request_id, record in _command_store.items():
            if reference_time - record["created_at"] > _PURGE_AGE:
                stale_ids.append(request_id)
        for request_id in stale_ids:
            del _command_store[request_id]
# Accepted command names: one letter followed by up to 63 letters, digits,
# "_" or "-". Compiled once instead of on every request.
_COMMAND_NAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9_-]{0,63}$')


def _validate_send_request(data):
    """Validate the JSON payload of ``POST /api/commands``.

    Args:
        data: The decoded JSON body (or ``None`` when there was none).

    Returns:
        A ``(parsed, error)`` pair. On success ``parsed`` is the tuple
        ``(command_name, device_ids, argv)`` and ``error`` is ``None``;
        on failure ``parsed`` is ``None`` and ``error`` is the message to
        return to the client with HTTP 400.
    """
    # A non-object body (list, string, number) has no .get(); reject it here
    # instead of crashing with an AttributeError further down.
    if not isinstance(data, dict) or not data:
        return None, "JSON body required"

    command_name = data.get("command", "")
    if not isinstance(command_name, str):
        return None, "command must be a string"
    command_name = command_name.strip()
    if not command_name:
        return None, "command is required"
    if not _COMMAND_NAME_RE.match(command_name):
        return None, "Invalid command name"

    device_ids = data.get("device_ids", [])
    if device_ids != "all":
        if not isinstance(device_ids, list):
            return None, "device_ids must be a list or \"all\""
        if len(device_ids) > 100:
            return None, "Too many device_ids (max 100)"
        for did in device_ids:
            if not isinstance(did, str) or not did.strip():
                return None, "Each device_id must be a non-empty string"

    argv = data.get("argv", [])
    if not isinstance(argv, list):
        return None, "argv must be a list"
    if len(argv) > 10:
        return None, "Too many arguments (max 10)"
    for i, a in enumerate(argv):
        if not isinstance(a, (str, int, float)):
            return None, f"argv[{i}] must be a string or number"
        # Every argument is sent as str(a), so the limit is enforced on that
        # text; otherwise a very long integer would bypass it.
        if len(str(a)) > 256:
            return None, f"argv[{i}] too long (max 256 chars)"

    return (command_name, device_ids, argv), None


def _sync_status(record, request_id, session):
    """Refresh a pending store record from the session's active commands.

    A record that is still "pending" takes the status held in
    ``session.active_commands``; once the request id is no longer tracked
    there, the record is marked "completed". Records that are not pending,
    or calls without a session, are left untouched. The caller must hold
    ``_store_lock``.
    """
    if record["status"] != "pending" or not session:
        return
    # Single lookup: active_commands can be modified by another thread, so a
    # separate "in" check followed by indexing could raise KeyError.
    active = session.active_commands.get(request_id)
    if active is None:
        record["status"] = "completed"
    else:
        record["status"] = active["status"]


def create_commands_blueprint(server_config):
    """
    Create the commands API blueprint (mounted under ``/api``).

    Routes registered:
        POST /api/commands               send a command to devices
        GET  /api/commands/<request_id>  poll one command
        GET  /api/commands               list the 50 most recent commands

    Args:
        server_config: Dict with keys:
            - get_device_registry: Callable returning device registry
            - get_transport: Callable returning transport instance
            - get_session: Callable returning session instance
            - require_api_auth: Auth decorator
            - limiter: Flask-Limiter instance

    Returns:
        The configured Flask ``Blueprint``.
    """
    bp = Blueprint("api_commands", __name__, url_prefix="/api")
    get_registry = server_config["get_device_registry"]
    get_transport = server_config["get_transport"]
    get_session = server_config["get_session"]
    require_api_auth = server_config["require_api_auth"]
    limiter = server_config["limiter"]

    @bp.route("/commands", methods=["POST"])
    @require_api_auth
    @limiter.limit("60 per minute")
    def send_command():
        """
        Send a command to one or more devices.

        JSON body:
        {
            "device_ids": ["abc123"] or "all",
            "command": "ping",
            "argv": ["8.8.8.8"]  // optional
        }

        Returns:
        {
            "results": [
                {"device_id": "abc123", "status": "ok", "request_id": "..."},
                ...
            ]
        }

        Responds 400 on an invalid payload and 503 when the registry or
        transport is not available. Per-device failures are reported inside
        "results" with status "error".
        """
        # silent=True: malformed or non-JSON bodies yield None so the client
        # receives this API's JSON error instead of Flask's default error page.
        data = request.get_json(silent=True)
        parsed, error = _validate_send_request(data)
        if error is not None:
            return jsonify({"error": error}), 400
        command_name, device_ids, argv = parsed

        registry = get_registry()
        transport = get_transport()
        session = get_session()
        if not registry or not transport:
            return jsonify({"error": "C2 not ready"}), 503

        # "all" → broadcast to every connected device
        if device_ids == "all":
            device_ids = [d.id for d in registry.all() if d.status == "Connected"]
        # Also reached when "all" matched no connected device.
        if not device_ids:
            return jsonify({"error": "device_ids is required (list or \"all\")"}), 400

        _purge_old()

        # The wire format carries strings only; convert once for all devices.
        argv_text = [str(a) for a in argv]

        results = []
        for did in device_ids:
            device = registry.get(did)
            if not device:
                results.append({"device_id": did, "status": "error",
                                "message": "Device not found"})
                continue
            if device.status != "Connected":
                results.append({"device_id": did, "status": "error",
                                "message": "Device not connected"})
                continue

            req_id = f"web-{did}-{uuid.uuid4().hex[:8]}"
            try:
                cmd = Command()
                cmd.device_id = did
                cmd.command_name = command_name
                cmd.request_id = req_id
                for text in argv_text:
                    cmd.argv.append(text)

                # Shared output list so transport writes are visible to API
                shared_output = []
                now = time.time()

                # Track in session so Transport routes responses back
                if session:
                    session.active_commands[req_id] = {
                        "device_id": did,
                        "command": command_name,
                        "start_time": now,
                        "status": "pending",
                        "output": shared_output,
                    }

                # Also track in our web store for polling (same output list)
                with _store_lock:
                    _command_store[req_id] = {
                        "device_id": did,
                        "command": command_name,
                        "argv": argv,
                        "status": "pending",
                        "output": shared_output,
                        "created_at": now,
                    }

                transport.send_command(device.sock, cmd, did)
            except Exception as e:
                # The command never reached the device: drop the tracking
                # records so they do not linger as "pending" forever.
                with _store_lock:
                    _command_store.pop(req_id, None)
                if session:
                    session.active_commands.pop(req_id, None)
                results.append({"device_id": did, "status": "error",
                                "message": str(e)})
            else:
                results.append({"device_id": did, "status": "ok",
                                "request_id": req_id})

        return jsonify({"results": results})

    @bp.route("/commands/<request_id>", methods=["GET"])
    @require_api_auth
    def get_command_status(request_id):
        """
        Poll the result of a previously sent command.

        Returns:
        {
            "request_id": "...",
            "device_id": "...",
            "command": "ping",
            "status": "pending" | "completed",
            "output": ["line1", "line2"]
        }

        The stored "argv" and "created_at" fields are included as well.
        Responds 404 when the request id is unknown (or already purged).
        """
        session = get_session()

        # Check our web store (shares output list with session.active_commands)
        with _store_lock:
            rec = _command_store.get(request_id)
            if rec is not None:
                _sync_status(rec, request_id, session)
                return jsonify({
                    "request_id": request_id,
                    **rec,
                })

        return jsonify({"error": "Command not found"}), 404

    @bp.route("/commands", methods=["GET"])
    @require_api_auth
    def list_commands():
        """List the 50 most recent commands and their statuses, newest first."""
        session = get_session()
        commands = []
        with _store_lock:
            newest_first = sorted(_command_store.items(),
                                  key=lambda x: x[1]["created_at"],
                                  reverse=True)
            # Only the returned records need a status refresh; the rest are
            # refreshed whenever they are next read.
            for req_id, rec in newest_first[:50]:
                _sync_status(rec, req_id, session)
                commands.append({"request_id": req_id, **rec})
        return jsonify({"commands": commands})

    return bp