espilon-source/tools/C3PO/web/mlat.py
Eun0us 8b6c1cd53d ε - ChaCha20-Poly1305 AEAD + HKDF crypto upgrade + C3PO rewrite + docs
Crypto:
- Replace broken ChaCha20 (static nonce) with ChaCha20-Poly1305 AEAD
- HKDF-SHA256 key derivation from per-device factory NVS master keys
- Random 12-byte nonce per message (ESP32 hardware RNG)
- crypto_init/encrypt/decrypt API with mbedtls legacy (ESP-IDF v5.3.2)
- Custom partition table with factory NVS (fctry at 0x10000)

Firmware:
- crypto.c full rewrite, messages.c device_id prefix + AEAD encrypt
- crypto_init() at boot with esp_restart() on failure
- Fix command_t initializations across all modules (sub/help fields)
- Clean CMakeLists dependencies for ESP-IDF v5.3.2

C3PO (C2):
- Rename tools/c2 + tools/c3po -> tools/C3PO
- Per-device CryptoContext with HKDF key derivation
- KeyStore (keys.json) for master key management
- Transport parses device_id:base64(...) wire format

Tools:
- New tools/provisioning/provision.py for factory NVS key generation
- Updated flasher with mbedtls config for v5.3.2

Docs:
- Update all READMEs for new crypto, C3PO paths, provisioning
- Update roadmap, architecture diagrams, security sections
- Update CONTRIBUTING.md project structure
2026-02-10 21:28:45 +01:00

450 lines
15 KiB
Python

"""MLAT (Multilateration) engine for device positioning with GPS support."""
import threading
import time
import re
import math
from typing import Optional, Tuple
import numpy as np
from scipy.optimize import minimize
class MlatEngine:
    """
    Calculates target position from multiple scanner RSSI readings.

    Supports both:
    - GPS coordinates (lat, lon) for outdoor tracking
    - Local coordinates (x, y in meters) for indoor tracking

    Uses the log-distance path loss model to convert RSSI to distance,
    then weighted least squares optimization for position estimation.

    Scanner state and the last computed target are guarded by an internal
    lock; the numerical work itself runs on snapshots taken under that lock.
    """

    # Earth radius in meters (for GPS calculations)
    EARTH_RADIUS = 6371000

    # Signed decimal number. This accepts exactly the strings that the
    # historical character class ``[0-9.+-]+`` accepted AND that ``float()``
    # can parse, so inputs such as "1.2.3" or "+-" are rejected by the regex
    # instead of raising ValueError during conversion.
    _NUMBER = r'[+-]?(?:[0-9]+(?:\.[0-9]*)?|\.[0-9]+)'

    # G;lat;lon;rssi or L;x;y;rssi
    _TYPED_MESSAGE_RE = re.compile(
        r'^([GL]);(' + _NUMBER + r');(' + _NUMBER + r');(-?\d+)$'
    )
    # lat;lon;rssi (legacy, treated as GPS)
    _LEGACY_MESSAGE_RE = re.compile(
        r'^(' + _NUMBER + r');(' + _NUMBER + r');(-?\d+)$'
    )
    # SCANNER_ID;(lat,lon);rssi
    _HTTP_LINE_RE = re.compile(
        r'^(\w+);\((' + _NUMBER + r'),(' + _NUMBER + r')\);(-?\d+)$'
    )

    def __init__(self, rssi_at_1m: float = -40, path_loss_n: float = 2.5, smoothing_window: int = 5):
        """
        Initialize the MLAT engine.

        Args:
            rssi_at_1m: RSSI value at 1 meter distance (calibration, typically -40 to -50)
            path_loss_n: Path loss exponent (2.0 free space, 2.5-3.5 indoors)
            smoothing_window: Number of readings to average for noise reduction
                (values below 1 are clamped to 1)

        Raises:
            ValueError: If path_loss_n is not strictly positive.
        """
        if path_loss_n <= 0:
            # rssi_to_distance divides by path_loss_n
            raise ValueError("path_loss_n must be positive")

        self.rssi_at_1m = rssi_at_1m
        self.path_loss_n = path_loss_n
        # Same clamp as update_config: a window of 0 would make the
        # ``history[-window:]`` trim a no-op and let the history grow forever.
        self.smoothing_window = max(1, smoothing_window)

        # Thread safety lock
        self._lock = threading.Lock()

        # Scanner data: {scanner_id: {"position": {"lat": .., "lon": ..} or {"x": .., "y": ..},
        #                             "rssi_history": [...], "last_seen": ts}}
        self.scanners: dict = {}

        # Last calculated target position
        self._last_target: Optional[dict] = None
        self._last_calculation: float = 0

        # Coordinate mode of the most recent reading: 'gps' or 'local'
        self._coord_mode = 'gps'

    @staticmethod
    def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """
        Calculate distance between two GPS points using Haversine formula.

        Args:
            lat1, lon1: First point (degrees)
            lat2, lon2: Second point (degrees)

        Returns:
            Distance in meters
        """
        lat1_rad = math.radians(lat1)
        lat2_rad = math.radians(lat2)
        delta_lat = math.radians(lat2 - lat1)
        delta_lon = math.radians(lon2 - lon1)

        a = (math.sin(delta_lat / 2) ** 2 +
             math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2)
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

        return MlatEngine.EARTH_RADIUS * c

    @staticmethod
    def meters_to_degrees(meters: float, latitude: float) -> Tuple[float, float]:
        """
        Convert meters to approximate degrees at a given latitude.

        Args:
            meters: Distance in meters
            latitude: Reference latitude (for longitude scaling)

        Returns:
            (delta_lat, delta_lon) in degrees
        """
        delta_lat = meters / 111320  # ~111.32 km per degree latitude
        delta_lon = meters / (111320 * math.cos(math.radians(latitude)))
        return delta_lat, delta_lon

    def parse_mlat_message(self, scanner_id: str, message: str) -> bool:
        """
        Parse MLAT message from ESP32 device.

        New format with coordinate type prefix:
            MLAT:G;<lat>;<lon>;<rssi> - GPS coordinates
            MLAT:L;<x>;<y>;<rssi> - Local coordinates (meters)
        Legacy format (backward compatible):
            MLAT:<lat>;<lon>;<rssi> - Treated as GPS

        Args:
            scanner_id: Device ID that sent the message
            message: Raw message content (without MLAT: prefix)

        Returns:
            True if successfully parsed, False otherwise (including
            messages whose numeric fields are malformed)
        """
        # New format with type prefix: G;lat;lon;rssi or L;x;y;rssi
        match = self._TYPED_MESSAGE_RE.match(message)
        if match:
            coord_type = match.group(1)
            c1 = float(match.group(2))
            c2 = float(match.group(3))
            rssi = int(match.group(4))
            if coord_type == 'G':
                self.add_reading_gps(scanner_id, c1, c2, rssi)
            else:  # 'L' - local
                self.add_reading(scanner_id, c1, c2, rssi)
            return True

        # Legacy format: lat;lon;rssi (backward compatible - treat as GPS)
        match = self._LEGACY_MESSAGE_RE.match(message)
        if match:
            lat = float(match.group(1))
            lon = float(match.group(2))
            rssi = int(match.group(3))
            self.add_reading_gps(scanner_id, lat, lon, rssi)
            return True

        return False

    def parse_data(self, raw_data: str) -> int:
        """
        Parse raw MLAT data from HTTP POST.

        Format: SCANNER_ID;(lat,lon);rssi
        Example: ESP3;(48.8566,2.3522);-45

        Lines that do not match the format are skipped. All readings from
        one call share a single timestamp.

        Args:
            raw_data: Raw text data with one or more readings

        Returns:
            Number of readings successfully processed
        """
        count = 0
        timestamp = time.time()

        for line in raw_data.strip().split('\n'):
            line = line.strip()
            if not line:
                continue
            match = self._HTTP_LINE_RE.match(line)
            if not match:
                continue
            scanner_id = match.group(1)
            lat = float(match.group(2))
            lon = float(match.group(3))
            rssi = int(match.group(4))
            self.add_reading_gps(scanner_id, lat, lon, rssi, timestamp)
            count += 1

        return count

    def _record_reading(self, scanner_id: str, position: dict, rssi: int,
                        timestamp: Optional[float], coord_mode: str) -> None:
        """Store one reading for a scanner and update the active coordinate mode."""
        if timestamp is None:
            timestamp = time.time()

        with self._lock:
            scanner = self.scanners.get(scanner_id)
            if scanner is None:
                scanner = {
                    "position": position,
                    "rssi_history": [],
                    "last_seen": timestamp
                }
                self.scanners[scanner_id] = scanner

            scanner["position"] = position
            scanner["rssi_history"].append(rssi)
            scanner["last_seen"] = timestamp

            # Keep only recent readings for smoothing. The max() guards
            # against smoothing_window having been set to 0 directly.
            window = max(1, self.smoothing_window)
            if len(scanner["rssi_history"]) > window:
                scanner["rssi_history"] = scanner["rssi_history"][-window:]

            self._coord_mode = coord_mode

    def add_reading_gps(self, scanner_id: str, lat: float, lon: float, rssi: int,
                        timestamp: Optional[float] = None):
        """
        Add a new RSSI reading from a scanner with GPS coordinates.

        Args:
            scanner_id: Unique identifier for the scanner
            lat: Latitude of the scanner
            lon: Longitude of the scanner
            rssi: RSSI value (negative dBm)
            timestamp: Reading timestamp (defaults to current time)
        """
        self._record_reading(scanner_id, {"lat": lat, "lon": lon}, rssi, timestamp, 'gps')

    def add_reading(self, scanner_id: str, x: float, y: float, rssi: int,
                    timestamp: Optional[float] = None):
        """
        Add a new RSSI reading from a scanner with local coordinates.

        Args:
            scanner_id: Unique identifier for the scanner
            x: X coordinate of the scanner (meters)
            y: Y coordinate of the scanner (meters)
            rssi: RSSI value (negative dBm)
            timestamp: Reading timestamp (defaults to current time)
        """
        self._record_reading(scanner_id, {"x": x, "y": y}, rssi, timestamp, 'local')

    def rssi_to_distance(self, rssi: float) -> float:
        """
        Convert RSSI to estimated distance using log-distance path loss model.

        d = 10^((RSSI_1m - RSSI) / (10 * n))

        Args:
            rssi: RSSI value (negative dBm)

        Returns:
            Estimated distance in meters
        """
        return 10 ** ((self.rssi_at_1m - rssi) / (10 * self.path_loss_n))

    def calculate_position(self) -> dict:
        """
        Calculate target position using multilateration.

        Requires at least 3 scanners that have at least one RSSI reading and
        whose position is expressed in the active coordinate mode (the mode
        of the most recent reading). Scanners in the other mode are ignored.
        Uses weighted least squares optimization.

        Returns:
            dict with position, confidence, and scanner info, or a dict
            with an "error" key when there are too few scanners or the
            optimizer does not converge
        """
        # Snapshot scanner data under lock
        with self._lock:
            coord_mode = self._coord_mode
            snapshot = [
                (sid, {
                    "position": dict(s["position"]),
                    "rssi_history": list(s["rssi_history"]),
                    "last_seen": s["last_seen"]
                })
                for sid, s in self.scanners.items()
                if s["rssi_history"]
            ]

        # Only use scanners that share one coordinate system; mixing GPS and
        # local positions would otherwise fail on a missing "lat"/"x" key.
        is_gps = coord_mode == 'gps'
        mode_key = "lat" if is_gps else "x"
        active_scanners = [(sid, s) for sid, s in snapshot if mode_key in s["position"]]
        if not active_scanners and snapshot:
            # No scanner matches the recorded mode (e.g. state was populated
            # without add_reading*): fall back to the first scanner's mode.
            is_gps = "lat" in snapshot[0][1]["position"]
            mode_key = "lat" if is_gps else "x"
            active_scanners = [(sid, s) for sid, s in snapshot if mode_key in s["position"]]

        if len(active_scanners) < 3:
            return {
                "error": f"Need at least 3 active scanners (have {len(active_scanners)})",
                "scanners_count": len(active_scanners)
            }

        # Reference point for GPS conversion (centroid), plus the
        # degrees-per-meter scale used for BOTH directions of the
        # conversion so that a position round-trips exactly.
        if is_gps:
            ref_lat = sum(s["position"]["lat"] for _, s in active_scanners) / len(active_scanners)
            ref_lon = sum(s["position"]["lon"] for _, s in active_scanners) / len(active_scanners)
            deg_lat_per_m, deg_lon_per_m = self.meters_to_degrees(1, ref_lat)

        # Prepare data arrays
        positions = []
        distances = []
        weights = []

        for _, scanner in active_scanners:
            pos = scanner["position"]
            if is_gps:
                # Convert GPS to local meters relative to reference
                x = (pos["lon"] - ref_lon) / deg_lon_per_m
                y = (pos["lat"] - ref_lat) / deg_lat_per_m
            else:
                x, y = pos["x"], pos["y"]

            # Average RSSI for noise reduction
            history = scanner["rssi_history"]
            avg_rssi = sum(history) / len(history)

            positions.append([x, y])
            distances.append(self.rssi_to_distance(avg_rssi))
            # Weight by signal strength (stronger signal = more reliable).
            # Clamp the magnitude so an average RSSI of 0 cannot divide by zero.
            weights.append(1.0 / (max(abs(avg_rssi), 1.0) ** 2))

        positions = np.array(positions)
        distances = np.array(distances)
        weights = np.array(weights)
        weights = weights / weights.sum()

        # Cost function: weighted squared error between the distance from
        # the candidate point to each scanner and the RSSI-derived distance
        def cost_function(point):
            x, y = point
            estimated_distances = np.sqrt((positions[:, 0] - x)**2 + (positions[:, 1] - y)**2)
            errors = (estimated_distances - distances) ** 2
            return np.sum(weights * errors)

        # Initial guess: weighted centroid
        x0 = np.sum(weights * positions[:, 0])
        y0 = np.sum(weights * positions[:, 1])

        # Optimize
        result = minimize(cost_function, [x0, y0], method='L-BFGS-B')

        if not result.success:
            return {
                "error": "Optimization failed",
                "details": result.message
            }

        target_x, target_y = result.x
        confidence = 1.0 / (1.0 + result.fun)

        if is_gps:
            # Convert back to GPS
            target = {
                "lat": round(float(ref_lat + target_y * deg_lat_per_m), 6),
                "lon": round(float(ref_lon + target_x * deg_lon_per_m), 6)
            }
        else:
            target = {
                "x": round(float(target_x), 2),
                "y": round(float(target_y), 2)
            }

        calculated_at = time.time()
        # Publish target and timestamp together so readers never observe a
        # new position paired with an old timestamp (or vice versa).
        with self._lock:
            self._last_target = target
            self._last_calculation = calculated_at

        return {
            "position": dict(target),
            "confidence": round(float(confidence), 3),
            "scanners_used": len(active_scanners),
            "calculated_at": calculated_at
        }

    def get_state(self) -> dict:
        """
        Get the current state of the MLAT system.

        Returns:
            dict with scanner info, configuration, coordinate mode and the
            last target position (only if computed less than 60 s ago)
        """
        now = time.time()

        # Copy the nested position dicts and RSSI lists while holding the
        # lock; a shallow copy would still share them with writer threads.
        with self._lock:
            scanners_snapshot = [
                (sid, dict(s["position"]), list(s["rssi_history"]), s["last_seen"])
                for sid, s in self.scanners.items()
            ]
            last_target = dict(self._last_target) if self._last_target is not None else None
            last_calc = self._last_calculation
            coord_mode = self._coord_mode

        scanners_data = []
        for scanner_id, position, history, last_seen in scanners_snapshot:
            avg_rssi = None
            distance = None
            if history:
                avg_rssi = sum(history) / len(history)
                distance = round(self.rssi_to_distance(avg_rssi), 2)
                avg_rssi = round(avg_rssi, 1)

            scanners_data.append({
                "id": scanner_id,
                "position": position,
                "last_rssi": avg_rssi,
                "estimated_distance": distance,
                "last_seen": last_seen,
                "age_seconds": round(now - last_seen, 1)
            })

        result = {
            "scanners": scanners_data,
            "scanners_count": len(scanners_data),
            "target": None,
            "config": {
                "rssi_at_1m": self.rssi_at_1m,
                "path_loss_n": self.path_loss_n,
                "smoothing_window": self.smoothing_window
            },
            "coord_mode": coord_mode
        }

        # Add target if available
        if last_target and (now - last_calc) < 60:
            result["target"] = {
                "position": last_target,
                "calculated_at": last_calc,
                "age_seconds": round(now - last_calc, 1)
            }

        return result

    def update_config(self, rssi_at_1m: Optional[float] = None, path_loss_n: Optional[float] = None,
                      smoothing_window: Optional[int] = None):
        """
        Update MLAT configuration parameters.

        Arguments left as None keep their current value. Validation happens
        before anything is changed, so a rejected call leaves the
        configuration untouched.

        Args:
            rssi_at_1m: New RSSI at 1m value
            path_loss_n: New path loss exponent (must be > 0)
            smoothing_window: New smoothing window size (clamped to >= 1)

        Raises:
            ValueError: If path_loss_n is not strictly positive.
        """
        if path_loss_n is not None and path_loss_n <= 0:
            # rssi_to_distance divides by path_loss_n; a zero here would make
            # every later distance computation fail.
            raise ValueError("path_loss_n must be positive")

        if rssi_at_1m is not None:
            self.rssi_at_1m = rssi_at_1m
        if path_loss_n is not None:
            self.path_loss_n = path_loss_n
        if smoothing_window is not None:
            self.smoothing_window = max(1, smoothing_window)

    def clear(self):
        """Clear all scanner data and forget the last computed target."""
        with self._lock:
            self.scanners.clear()
            self._last_target = None
            self._last_calculation = 0