espilon-source/tools/C3PO/web/routes/api_ota.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

177 lines
6.5 KiB
Python

"""OTA firmware update API routes."""
import hmac
import os
import time
import uuid
from flask import Blueprint, jsonify, request, send_from_directory
from werkzeug.utils import secure_filename
from proto.c2_pb2 import Command
def create_ota_blueprint(server_config):
"""
Create the OTA API blueprint.
Args:
server_config: Dict with keys:
- get_device_registry: Callable returning device registry
- get_transport: Callable returning transport instance
- require_login: Auth decorator
- require_api_auth: Auth decorator
- c2_root: C2 root directory path
"""
bp = Blueprint("api_ota", __name__, url_prefix="/api/ota")
get_registry = server_config["get_device_registry"]
get_transport = server_config["get_transport"]
require_api_auth = server_config["require_api_auth"]
limiter = server_config["limiter"]
c2_root = server_config["c2_root"]
firmware_dir = os.path.join(c2_root, "firmware")
os.makedirs(firmware_dir, exist_ok=True)
@bp.route("/deploy", methods=["POST"])
@require_api_auth
@limiter.limit("10 per minute")
def deploy():
"""Send ota_update command to one or more devices."""
data = request.get_json()
if not data:
return jsonify({"error": "JSON body required"}), 400
url = data.get("url", "").strip()
device_ids = data.get("device_ids", [])
if not url:
return jsonify({"error": "url is required"}), 400
if not device_ids:
return jsonify({"error": "device_ids is required"}), 400
registry = get_registry()
transport = get_transport()
if not registry or not transport:
return jsonify({"error": "Registry or transport not available"}), 503
results = []
for did in device_ids:
device = registry.get(did)
if not device:
results.append({"device_id": did, "status": "error", "message": "Device not found"})
continue
if device.status != "Connected":
results.append({"device_id": did, "status": "error", "message": "Device not connected"})
continue
try:
req_id = f"ota-{did}-{uuid.uuid4().hex[:8]}"
cmd = Command()
cmd.device_id = did
cmd.command_name = "ota_update"
cmd.request_id = req_id
cmd.argv.append(url)
transport.send_command(device.sock, cmd, did)
results.append({"device_id": did, "status": "ok", "request_id": req_id})
except Exception as e:
results.append({"device_id": did, "status": "error", "message": str(e)})
return jsonify({"results": results})
@bp.route("/upload", methods=["POST"])
@require_api_auth
@limiter.limit("5 per minute")
def upload():
"""Upload a firmware binary to C3PO for serving."""
if "file" not in request.files:
return jsonify({"error": "No file uploaded"}), 400
f = request.files["file"]
if not f.filename:
return jsonify({"error": "No filename"}), 400
custom_name = request.form.get("name", "").strip()
if custom_name:
if not custom_name.endswith(".bin"):
custom_name += ".bin"
filename = secure_filename(custom_name)
else:
filename = secure_filename(f.filename)
if not filename.endswith(".bin"):
return jsonify({"error": "Only .bin files allowed"}), 400
filepath = os.path.join(firmware_dir, filename)
f.save(filepath)
size = os.path.getsize(filepath)
return jsonify({
"filename": filename,
"size": size,
"url": f"/api/ota/firmware/{filename}",
})
@bp.route("/firmware/<filename>")
@require_api_auth
def serve_firmware(filename):
"""Serve a firmware binary file (requires API auth)."""
filename = secure_filename(filename)
return send_from_directory(firmware_dir, filename)
@bp.route("/fw/<filename>")
def serve_firmware_short(filename):
"""Short URL for firmware download (ESP32 OTA + authenticated users).
Checks either Bearer token or session login."""
from flask import session as flask_session
from streams.config import MULTILAT_AUTH_TOKEN
auth = request.headers.get("Authorization", "")
token_param = request.args.get("token", "")
logged_in = flask_session.get("logged_in", False)
valid_bearer = auth == f"Bearer {MULTILAT_AUTH_TOKEN}"
valid_token = token_param and hmac.compare_digest(token_param, MULTILAT_AUTH_TOKEN)
if not (logged_in or valid_bearer or valid_token):
return jsonify({"error": "Unauthorized"}), 401
filename = secure_filename(filename)
return send_from_directory(firmware_dir, filename)
@bp.route("/dl/<filename>")
def serve_firmware_public(filename):
"""Public firmware download for ESP32 OTA (no auth required).
The device cannot set HTTP headers, and query-param auth may fail
with the IDF HTTP client. Bind to LAN only."""
filename = secure_filename(filename)
filepath = os.path.join(firmware_dir, filename)
if not os.path.exists(filepath):
return jsonify({"error": "File not found"}), 404
return send_from_directory(firmware_dir, filename)
@bp.route("/firmware", methods=["GET"])
@require_api_auth
def list_firmware():
"""List available firmware files."""
files = []
if os.path.isdir(firmware_dir):
for f in sorted(os.listdir(firmware_dir)):
if f.endswith(".bin"):
fpath = os.path.join(firmware_dir, f)
files.append({
"filename": f,
"size": os.path.getsize(fpath),
"modified": os.path.getmtime(fpath),
})
return jsonify({"firmware": files})
@bp.route("/firmware/<filename>", methods=["DELETE"])
@require_api_auth
def delete_firmware(filename):
"""Delete a firmware file."""
filename = secure_filename(filename)
filepath = os.path.join(firmware_dir, filename)
if os.path.exists(filepath):
os.remove(filepath)
return jsonify({"status": "deleted"})
return jsonify({"error": "File not found"}), 404
return bp