Replace monolithic CLI and web server with route-based Flask API. New routes: api_commands, api_build, api_can, api_monitor, api_ota, api_tunnel. Add honeypot security dashboard with real-time SSE, MITRE ATT&CK mapping, kill chain analysis. New TUI with commander/help modules. Add session management, tunnel proxy core, CAN bus data store. Docker support.
109 lines
3.9 KiB
Python
109 lines
3.9 KiB
Python
"""OTA firmware build API routes."""
|
|
|
|
import re
|
|
from flask import Blueprint, jsonify, request
|
|
|
|
|
|
def create_build_blueprint(server_config):
|
|
"""
|
|
Create the build API blueprint.
|
|
|
|
Args:
|
|
server_config: Dict with keys:
|
|
- build_manager: BuildManager instance
|
|
- require_api_auth: Auth decorator
|
|
"""
|
|
bp = Blueprint("api_build", __name__, url_prefix="/api/ota/build")
|
|
|
|
build_manager = server_config["build_manager"]
|
|
require_api_auth = server_config["require_api_auth"]
|
|
limiter = server_config["limiter"]
|
|
|
|
@bp.route("/defaults", methods=["GET"])
|
|
@require_api_auth
|
|
def get_defaults():
|
|
"""Return deploy.json defaults for form pre-population."""
|
|
return jsonify(build_manager.get_defaults())
|
|
|
|
@bp.route("/start", methods=["POST"])
|
|
@require_api_auth
|
|
@limiter.limit("3 per minute")
|
|
def start_build():
|
|
"""Trigger a firmware build for a device."""
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({"error": "JSON body required"}), 400
|
|
|
|
device_id = data.get("device_id", "")
|
|
if not isinstance(device_id, str):
|
|
return jsonify({"error": "device_id must be a string"}), 400
|
|
device_id = device_id.strip()
|
|
if not device_id:
|
|
return jsonify({"error": "device_id is required"}), 400
|
|
if not re.match(r'^[a-zA-Z0-9_-]{1,32}$', device_id):
|
|
return jsonify({"error": "Invalid device_id (alphanumeric/hyphens/underscores, max 32 chars)"}), 400
|
|
|
|
hostname = data.get("hostname", "")
|
|
if not isinstance(hostname, str):
|
|
return jsonify({"error": "hostname must be a string"}), 400
|
|
hostname = hostname.strip()
|
|
if hostname and not re.match(r'^[a-zA-Z0-9_-]{1,63}$', hostname):
|
|
return jsonify({"error": "Invalid hostname"}), 400
|
|
|
|
modules = data.get("modules", {})
|
|
if not isinstance(modules, dict):
|
|
return jsonify({"error": "modules must be an object"}), 400
|
|
VALID_MODULES = {"network", "fakeap", "honeypot", "recon", "recon_camera",
|
|
"recon_ble_trilat", "redteam", "ota", "canbus"}
|
|
for key, val in modules.items():
|
|
if key not in VALID_MODULES:
|
|
return jsonify({"error": f"Unknown module: {key}"}), 400
|
|
if not isinstance(val, bool):
|
|
return jsonify({"error": f"Module '{key}' must be a boolean"}), 400
|
|
|
|
server_cfg = data.get("server", {})
|
|
if not isinstance(server_cfg, dict):
|
|
return jsonify({"error": "server must be an object"}), 400
|
|
|
|
network_cfg = data.get("network", {})
|
|
if not isinstance(network_cfg, dict):
|
|
return jsonify({"error": "network must be an object"}), 400
|
|
|
|
ota_cfg = data.get("ota", {"enabled": True, "allow_http": False})
|
|
if not isinstance(ota_cfg, dict):
|
|
return jsonify({"error": "ota must be an object"}), 400
|
|
|
|
config = {
|
|
"device_id": device_id,
|
|
"hostname": hostname,
|
|
"modules": modules,
|
|
"server": server_cfg,
|
|
"network": network_cfg,
|
|
"ota": ota_cfg,
|
|
}
|
|
|
|
ok, msg = build_manager.start_build(config)
|
|
if not ok:
|
|
return jsonify({"error": msg}), 409
|
|
return jsonify({"status": "started", "device_id": device_id})
|
|
|
|
@bp.route("/status", methods=["GET"])
|
|
@require_api_auth
|
|
def build_status():
|
|
"""Return current build state."""
|
|
return jsonify(build_manager.state)
|
|
|
|
@bp.route("/log", methods=["GET"])
|
|
@require_api_auth
|
|
def build_log():
|
|
"""Return build log lines (supports offset for incremental fetch)."""
|
|
offset = request.args.get("offset", 0, type=int)
|
|
lines = build_manager.get_log(offset)
|
|
return jsonify({
|
|
"lines": lines,
|
|
"offset": offset,
|
|
"total": offset + len(lines),
|
|
})
|
|
|
|
return bp
|