"""MLAT (Multilateration) engine for device positioning with GPS support.""" import threading import time import re import math from typing import Optional, Tuple import numpy as np from scipy.optimize import minimize class MlatEngine: """ Calculates target position from multiple scanner RSSI readings. Supports both: - GPS coordinates (lat, lon) for outdoor tracking - Local coordinates (x, y in meters) for indoor tracking Uses the log-distance path loss model to convert RSSI to distance, then weighted least squares optimization for position estimation. """ # Earth radius in meters (for GPS calculations) EARTH_RADIUS = 6371000 def __init__(self, rssi_at_1m: float = -40, path_loss_n: float = 2.5, smoothing_window: int = 5): """ Initialize the MLAT engine. Args: rssi_at_1m: RSSI value at 1 meter distance (calibration, typically -40 to -50) path_loss_n: Path loss exponent (2.0 free space, 2.5-3.5 indoors) smoothing_window: Number of readings to average for noise reduction """ self.rssi_at_1m = rssi_at_1m self.path_loss_n = path_loss_n self.smoothing_window = smoothing_window # Thread safety lock self._lock = threading.Lock() # Scanner data: {scanner_id: {"position": {"lat": x, "lon": y} or {"x": x, "y": y}, ...}} self.scanners: dict = {} # Last calculated target position self._last_target: Optional[dict] = None self._last_calculation: float = 0 # Coordinate mode: 'gps' or 'local' self._coord_mode = 'gps' @staticmethod def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """ Calculate distance between two GPS points using Haversine formula. Args: lat1, lon1: First point (degrees) lat2, lon2: Second point (degrees) Returns: Distance in meters """ lat1_rad = math.radians(lat1) lat2_rad = math.radians(lat2) delta_lat = math.radians(lat2 - lat1) delta_lon = math.radians(lon2 - lon1) a = (math.sin(delta_lat / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) return MlatEngine.EARTH_RADIUS * c @staticmethod def meters_to_degrees(meters: float, latitude: float) -> Tuple[float, float]: """ Convert meters to approximate degrees at a given latitude. Args: meters: Distance in meters latitude: Reference latitude (for longitude scaling) Returns: (delta_lat, delta_lon) in degrees """ delta_lat = meters / 111320 # ~111.32 km per degree latitude delta_lon = meters / (111320 * math.cos(math.radians(latitude))) return delta_lat, delta_lon def parse_mlat_message(self, scanner_id: str, message: str) -> bool: """ Parse MLAT message from ESP32 device. New format with coordinate type prefix: MLAT:G;;; - GPS coordinates MLAT:L;;; - Local coordinates (meters) Legacy format (backward compatible): MLAT:;; - Treated as GPS Args: scanner_id: Device ID that sent the message message: Raw message content (without MLAT: prefix) Returns: True if successfully parsed, False otherwise """ # New format with type prefix: G;lat;lon;rssi or L;x;y;rssi pattern_new = re.compile(r'^([GL]);([0-9.+-]+);([0-9.+-]+);(-?\d+)$') match = pattern_new.match(message) if match: coord_type = match.group(1) c1 = float(match.group(2)) c2 = float(match.group(3)) rssi = int(match.group(4)) if coord_type == 'G': self.add_reading_gps(scanner_id, c1, c2, rssi) else: # 'L' - local self.add_reading(scanner_id, c1, c2, rssi) return True # Legacy format: lat;lon;rssi (backward compatible - treat as GPS) pattern_legacy = re.compile(r'^([0-9.+-]+);([0-9.+-]+);(-?\d+)$') match = pattern_legacy.match(message) if match: lat = float(match.group(1)) lon = float(match.group(2)) rssi = int(match.group(3)) self.add_reading_gps(scanner_id, lat, lon, rssi) return True return False def parse_data(self, raw_data: str) -> int: """ Parse raw MLAT data from HTTP POST. Format: SCANNER_ID;(lat,lon);rssi Example: ESP3;(48.8566,2.3522);-45 Args: raw_data: Raw text data with one or more readings Returns: Number of readings successfully processed """ pattern = re.compile(r'^(\w+);\(([0-9.+-]+),([0-9.+-]+)\);(-?\d+)$') count = 0 timestamp = time.time() for line in raw_data.strip().split('\n'): line = line.strip() if not line: continue match = pattern.match(line) if match: scanner_id = match.group(1) lat = float(match.group(2)) lon = float(match.group(3)) rssi = int(match.group(4)) self.add_reading_gps(scanner_id, lat, lon, rssi, timestamp) count += 1 return count def add_reading_gps(self, scanner_id: str, lat: float, lon: float, rssi: int, timestamp: float = None): """ Add a new RSSI reading from a scanner with GPS coordinates. Args: scanner_id: Unique identifier for the scanner lat: Latitude of the scanner lon: Longitude of the scanner rssi: RSSI value (negative dBm) timestamp: Reading timestamp (defaults to current time) """ if timestamp is None: timestamp = time.time() with self._lock: if scanner_id not in self.scanners: self.scanners[scanner_id] = { "position": {"lat": lat, "lon": lon}, "rssi_history": [], "last_seen": timestamp } scanner = self.scanners[scanner_id] scanner["position"] = {"lat": lat, "lon": lon} scanner["rssi_history"].append(rssi) scanner["last_seen"] = timestamp # Keep only recent readings for smoothing if len(scanner["rssi_history"]) > self.smoothing_window: scanner["rssi_history"] = scanner["rssi_history"][-self.smoothing_window:] self._coord_mode = 'gps' def add_reading(self, scanner_id: str, x: float, y: float, rssi: int, timestamp: float = None): """ Add a new RSSI reading from a scanner with local coordinates. Args: scanner_id: Unique identifier for the scanner x: X coordinate of the scanner (meters) y: Y coordinate of the scanner (meters) rssi: RSSI value (negative dBm) timestamp: Reading timestamp (defaults to current time) """ if timestamp is None: timestamp = time.time() with self._lock: if scanner_id not in self.scanners: self.scanners[scanner_id] = { "position": {"x": x, "y": y}, "rssi_history": [], "last_seen": timestamp } scanner = self.scanners[scanner_id] scanner["position"] = {"x": x, "y": y} scanner["rssi_history"].append(rssi) scanner["last_seen"] = timestamp if len(scanner["rssi_history"]) > self.smoothing_window: scanner["rssi_history"] = scanner["rssi_history"][-self.smoothing_window:] self._coord_mode = 'local' def rssi_to_distance(self, rssi: float) -> float: """ Convert RSSI to estimated distance using log-distance path loss model. d = 10^((RSSI_1m - RSSI) / (10 * n)) Args: rssi: RSSI value (negative dBm) Returns: Estimated distance in meters """ return 10 ** ((self.rssi_at_1m - rssi) / (10 * self.path_loss_n)) def calculate_position(self) -> dict: """ Calculate target position using multilateration. Requires at least 3 active scanners with recent readings. Uses weighted least squares optimization. Returns: dict with position, confidence, and scanner info, or error """ # Snapshot scanner data under lock with self._lock: active_scanners = [ (sid, { "position": dict(s["position"]), "rssi_history": list(s["rssi_history"]), "last_seen": s["last_seen"] }) for sid, s in self.scanners.items() if s["rssi_history"] ] if len(active_scanners) < 3: return { "error": f"Need at least 3 active scanners (have {len(active_scanners)})", "scanners_count": len(active_scanners) } # Determine coordinate mode from first scanner first_pos = active_scanners[0][1]["position"] is_gps = "lat" in first_pos # Prepare data arrays positions = [] distances = [] weights = [] # Reference point for GPS conversion (centroid) if is_gps: ref_lat = sum(s["position"]["lat"] for _, s in active_scanners) / len(active_scanners) ref_lon = sum(s["position"]["lon"] for _, s in active_scanners) / len(active_scanners) for scanner_id, scanner in active_scanners: pos = scanner["position"] if is_gps: # Convert GPS to local meters relative to reference x = self.haversine_distance(ref_lat, ref_lon, ref_lat, pos["lon"]) if pos["lon"] < ref_lon: x = -x y = self.haversine_distance(ref_lat, ref_lon, pos["lat"], ref_lon) if pos["lat"] < ref_lat: y = -y else: x, y = pos["x"], pos["y"] # Average RSSI for noise reduction avg_rssi = sum(scanner["rssi_history"]) / len(scanner["rssi_history"]) distance = self.rssi_to_distance(avg_rssi) positions.append([x, y]) distances.append(distance) # Weight by signal strength (stronger signal = more reliable) weights.append(1.0 / (abs(avg_rssi) ** 2)) positions = np.array(positions) distances = np.array(distances) weights = np.array(weights) weights = weights / weights.sum() # Cost function def cost_function(point): x, y = point estimated_distances = np.sqrt((positions[:, 0] - x)**2 + (positions[:, 1] - y)**2) errors = (estimated_distances - distances) ** 2 return np.sum(weights * errors) # Initial guess: weighted centroid x0 = np.sum(weights * positions[:, 0]) y0 = np.sum(weights * positions[:, 1]) # Optimize result = minimize(cost_function, [x0, y0], method='L-BFGS-B') if result.success: target_x, target_y = result.x confidence = 1.0 / (1.0 + result.fun) if is_gps: # Convert back to GPS delta_lat, delta_lon = self.meters_to_degrees(1, ref_lat) target_lat = ref_lat + target_y * delta_lat target_lon = ref_lon + target_x * delta_lon self._last_target = { "lat": round(float(target_lat), 6), "lon": round(float(target_lon), 6) } else: self._last_target = { "x": round(float(target_x), 2), "y": round(float(target_y), 2) } with self._lock: self._last_calculation = time.time() return { "position": self._last_target, "confidence": round(float(confidence), 3), "scanners_used": len(active_scanners), "calculated_at": self._last_calculation } else: return { "error": "Optimization failed", "details": result.message } def get_state(self) -> dict: """ Get the current state of the MLAT system. Returns: dict with scanner info and last target position """ now = time.time() scanners_data = [] with self._lock: scanners_snapshot = dict(self.scanners) last_target = self._last_target last_calc = self._last_calculation coord_mode = self._coord_mode for scanner_id, scanner in scanners_snapshot.items(): avg_rssi = None distance = None if scanner["rssi_history"]: avg_rssi = sum(scanner["rssi_history"]) / len(scanner["rssi_history"]) distance = round(self.rssi_to_distance(avg_rssi), 2) avg_rssi = round(avg_rssi, 1) scanners_data.append({ "id": scanner_id, "position": scanner["position"], "last_rssi": avg_rssi, "estimated_distance": distance, "last_seen": scanner["last_seen"], "age_seconds": round(now - scanner["last_seen"], 1) }) result = { "scanners": scanners_data, "scanners_count": len(scanners_data), "target": None, "config": { "rssi_at_1m": self.rssi_at_1m, "path_loss_n": self.path_loss_n, "smoothing_window": self.smoothing_window }, "coord_mode": coord_mode } # Add target if available if last_target and (now - last_calc) < 60: result["target"] = { "position": last_target, "calculated_at": last_calc, "age_seconds": round(now - last_calc, 1) } return result def update_config(self, rssi_at_1m: float = None, path_loss_n: float = None, smoothing_window: int = None): """ Update MLAT configuration parameters. Args: rssi_at_1m: New RSSI at 1m value path_loss_n: New path loss exponent smoothing_window: New smoothing window size """ if rssi_at_1m is not None: self.rssi_at_1m = rssi_at_1m if path_loss_n is not None: self.path_loss_n = path_loss_n if smoothing_window is not None: self.smoothing_window = max(1, smoothing_window) def clear(self): """Clear all scanner data and reset state.""" with self._lock: self.scanners.clear() self._last_target = None self._last_calculation = 0