Crypto: - Replace broken ChaCha20 (static nonce) with ChaCha20-Poly1305 AEAD - HKDF-SHA256 key derivation from per-device factory NVS master keys - Random 12-byte nonce per message (ESP32 hardware RNG) - crypto_init/encrypt/decrypt API with mbedtls legacy (ESP-IDF v5.3.2) - Custom partition table with factory NVS (fctry at 0x10000) Firmware: - crypto.c full rewrite, messages.c device_id prefix + AEAD encrypt - crypto_init() at boot with esp_restart() on failure - Fix command_t initializations across all modules (sub/help fields) - Clean CMakeLists dependencies for ESP-IDF v5.3.2 C3PO (C2): - Rename tools/c2 + tools/c3po -> tools/C3PO - Per-device CryptoContext with HKDF key derivation - KeyStore (keys.json) for master key management - Transport parses device_id:base64(...) wire format Tools: - New tools/provisioning/provision.py for factory NVS key generation - Updated flasher with mbedtls config for v5.3.2 Docs: - Update all READMEs for new crypto, C3PO paths, provisioning - Update roadmap, architecture diagrams, security sections - Update CONTRIBUTING.md project structure
86 lines
2.5 KiB
Python
86 lines
2.5 KiB
Python
"""MLAT (Multilateration) API routes."""
|
|
|
|
import time
|
|
from flask import Blueprint, jsonify, request
|
|
|
|
|
|
def create_mlat_blueprint(server_config):
|
|
"""
|
|
Create the MLAT API blueprint.
|
|
|
|
Args:
|
|
server_config: Dict with keys:
|
|
- get_mlat_engine: Callable returning MLAT engine
|
|
- require_api_auth: Auth decorator
|
|
"""
|
|
bp = Blueprint("api_mlat", __name__, url_prefix="/api/mlat")
|
|
|
|
get_engine = server_config["get_mlat_engine"]
|
|
require_api_auth = server_config["require_api_auth"]
|
|
|
|
@bp.route("/collect", methods=["POST"])
|
|
@require_api_auth
|
|
def collect():
|
|
"""Receive MLAT readings from scanners."""
|
|
engine = get_engine()
|
|
raw_data = request.get_data(as_text=True)
|
|
count = engine.parse_data(raw_data)
|
|
|
|
if count > 0:
|
|
engine.calculate_position()
|
|
|
|
return jsonify({
|
|
"status": "ok",
|
|
"readings_processed": count
|
|
})
|
|
|
|
@bp.route("/state")
|
|
@require_api_auth
|
|
def state():
|
|
"""Get current MLAT state (scanners + target position)."""
|
|
engine = get_engine()
|
|
state = engine.get_state()
|
|
|
|
# Auto-calculate if we have enough scanners but no target
|
|
if state["target"] is None and state["scanners_count"] >= 3:
|
|
result = engine.calculate_position()
|
|
if "position" in result:
|
|
state["target"] = {
|
|
"position": result["position"],
|
|
"confidence": result.get("confidence", 0),
|
|
"calculated_at": result.get("calculated_at", time.time()),
|
|
"age_seconds": 0
|
|
}
|
|
|
|
return jsonify(state)
|
|
|
|
@bp.route("/config", methods=["GET", "POST"])
|
|
@require_api_auth
|
|
def config():
|
|
"""Get or update MLAT configuration."""
|
|
engine = get_engine()
|
|
|
|
if request.method == "POST":
|
|
data = request.get_json() or {}
|
|
engine.update_config(
|
|
rssi_at_1m=data.get("rssi_at_1m"),
|
|
path_loss_n=data.get("path_loss_n"),
|
|
smoothing_window=data.get("smoothing_window")
|
|
)
|
|
|
|
return jsonify({
|
|
"rssi_at_1m": engine.rssi_at_1m,
|
|
"path_loss_n": engine.path_loss_n,
|
|
"smoothing_window": engine.smoothing_window
|
|
})
|
|
|
|
@bp.route("/clear", methods=["POST"])
|
|
@require_api_auth
|
|
def clear():
|
|
"""Clear all scanner data."""
|
|
engine = get_engine()
|
|
engine.clear()
|
|
return jsonify({"status": "ok"})
|
|
|
|
return bp
|