Replace monolithic CLI and web server with route-based Flask API. New routes: api_commands, api_build, api_can, api_monitor, api_ota, api_tunnel. Add honeypot security dashboard with real-time SSE, MITRE ATT&CK mapping, kill chain analysis. New TUI with commander/help modules. Add session management, tunnel proxy core, CAN bus data store. Docker support.
177 lines
6.5 KiB
Python
177 lines
6.5 KiB
Python
"""OTA firmware update API routes."""
|
|
|
|
import hmac
|
|
import os
|
|
import time
|
|
import uuid
|
|
from flask import Blueprint, jsonify, request, send_from_directory
|
|
from werkzeug.utils import secure_filename
|
|
|
|
from proto.c2_pb2 import Command
|
|
|
|
|
|
def create_ota_blueprint(server_config):
|
|
"""
|
|
Create the OTA API blueprint.
|
|
|
|
Args:
|
|
server_config: Dict with keys:
|
|
- get_device_registry: Callable returning device registry
|
|
- get_transport: Callable returning transport instance
|
|
- require_login: Auth decorator
|
|
- require_api_auth: Auth decorator
|
|
- c2_root: C2 root directory path
|
|
"""
|
|
bp = Blueprint("api_ota", __name__, url_prefix="/api/ota")
|
|
|
|
get_registry = server_config["get_device_registry"]
|
|
get_transport = server_config["get_transport"]
|
|
require_api_auth = server_config["require_api_auth"]
|
|
limiter = server_config["limiter"]
|
|
c2_root = server_config["c2_root"]
|
|
|
|
firmware_dir = os.path.join(c2_root, "firmware")
|
|
os.makedirs(firmware_dir, exist_ok=True)
|
|
|
|
@bp.route("/deploy", methods=["POST"])
|
|
@require_api_auth
|
|
@limiter.limit("10 per minute")
|
|
def deploy():
|
|
"""Send ota_update command to one or more devices."""
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({"error": "JSON body required"}), 400
|
|
|
|
url = data.get("url", "").strip()
|
|
device_ids = data.get("device_ids", [])
|
|
|
|
if not url:
|
|
return jsonify({"error": "url is required"}), 400
|
|
if not device_ids:
|
|
return jsonify({"error": "device_ids is required"}), 400
|
|
|
|
registry = get_registry()
|
|
transport = get_transport()
|
|
if not registry or not transport:
|
|
return jsonify({"error": "Registry or transport not available"}), 503
|
|
|
|
results = []
|
|
for did in device_ids:
|
|
device = registry.get(did)
|
|
if not device:
|
|
results.append({"device_id": did, "status": "error", "message": "Device not found"})
|
|
continue
|
|
if device.status != "Connected":
|
|
results.append({"device_id": did, "status": "error", "message": "Device not connected"})
|
|
continue
|
|
|
|
try:
|
|
req_id = f"ota-{did}-{uuid.uuid4().hex[:8]}"
|
|
cmd = Command()
|
|
cmd.device_id = did
|
|
cmd.command_name = "ota_update"
|
|
cmd.request_id = req_id
|
|
cmd.argv.append(url)
|
|
transport.send_command(device.sock, cmd, did)
|
|
results.append({"device_id": did, "status": "ok", "request_id": req_id})
|
|
except Exception as e:
|
|
results.append({"device_id": did, "status": "error", "message": str(e)})
|
|
|
|
return jsonify({"results": results})
|
|
|
|
@bp.route("/upload", methods=["POST"])
|
|
@require_api_auth
|
|
@limiter.limit("5 per minute")
|
|
def upload():
|
|
"""Upload a firmware binary to C3PO for serving."""
|
|
if "file" not in request.files:
|
|
return jsonify({"error": "No file uploaded"}), 400
|
|
|
|
f = request.files["file"]
|
|
if not f.filename:
|
|
return jsonify({"error": "No filename"}), 400
|
|
|
|
custom_name = request.form.get("name", "").strip()
|
|
if custom_name:
|
|
if not custom_name.endswith(".bin"):
|
|
custom_name += ".bin"
|
|
filename = secure_filename(custom_name)
|
|
else:
|
|
filename = secure_filename(f.filename)
|
|
|
|
if not filename.endswith(".bin"):
|
|
return jsonify({"error": "Only .bin files allowed"}), 400
|
|
|
|
filepath = os.path.join(firmware_dir, filename)
|
|
f.save(filepath)
|
|
size = os.path.getsize(filepath)
|
|
|
|
return jsonify({
|
|
"filename": filename,
|
|
"size": size,
|
|
"url": f"/api/ota/firmware/{filename}",
|
|
})
|
|
|
|
@bp.route("/firmware/<filename>")
|
|
@require_api_auth
|
|
def serve_firmware(filename):
|
|
"""Serve a firmware binary file (requires API auth)."""
|
|
filename = secure_filename(filename)
|
|
return send_from_directory(firmware_dir, filename)
|
|
|
|
@bp.route("/fw/<filename>")
|
|
def serve_firmware_short(filename):
|
|
"""Short URL for firmware download (ESP32 OTA + authenticated users).
|
|
Checks either Bearer token or session login."""
|
|
from flask import session as flask_session
|
|
from streams.config import MULTILAT_AUTH_TOKEN
|
|
auth = request.headers.get("Authorization", "")
|
|
token_param = request.args.get("token", "")
|
|
logged_in = flask_session.get("logged_in", False)
|
|
valid_bearer = auth == f"Bearer {MULTILAT_AUTH_TOKEN}"
|
|
valid_token = token_param and hmac.compare_digest(token_param, MULTILAT_AUTH_TOKEN)
|
|
if not (logged_in or valid_bearer or valid_token):
|
|
return jsonify({"error": "Unauthorized"}), 401
|
|
filename = secure_filename(filename)
|
|
return send_from_directory(firmware_dir, filename)
|
|
|
|
@bp.route("/dl/<filename>")
|
|
def serve_firmware_public(filename):
|
|
"""Public firmware download for ESP32 OTA (no auth required).
|
|
The device cannot set HTTP headers, and query-param auth may fail
|
|
with the IDF HTTP client. Bind to LAN only."""
|
|
filename = secure_filename(filename)
|
|
filepath = os.path.join(firmware_dir, filename)
|
|
if not os.path.exists(filepath):
|
|
return jsonify({"error": "File not found"}), 404
|
|
return send_from_directory(firmware_dir, filename)
|
|
|
|
@bp.route("/firmware", methods=["GET"])
|
|
@require_api_auth
|
|
def list_firmware():
|
|
"""List available firmware files."""
|
|
files = []
|
|
if os.path.isdir(firmware_dir):
|
|
for f in sorted(os.listdir(firmware_dir)):
|
|
if f.endswith(".bin"):
|
|
fpath = os.path.join(firmware_dir, f)
|
|
files.append({
|
|
"filename": f,
|
|
"size": os.path.getsize(fpath),
|
|
"modified": os.path.getmtime(fpath),
|
|
})
|
|
return jsonify({"firmware": files})
|
|
|
|
@bp.route("/firmware/<filename>", methods=["DELETE"])
|
|
@require_api_auth
|
|
def delete_firmware(filename):
|
|
"""Delete a firmware file."""
|
|
filename = secure_filename(filename)
|
|
filepath = os.path.join(firmware_dir, filename)
|
|
if os.path.exists(filepath):
|
|
os.remove(filepath)
|
|
return jsonify({"status": "deleted"})
|
|
return jsonify({"error": "File not found"}), 404
|
|
|
|
return bp
|