"""MLAT (Multilateration) engine for device positioning with GPS support.""" import time import re import math from typing import Optional, Tuple import numpy as np from scipy.optimize import minimize class MlatEngine: """ Calculates target position from multiple scanner RSSI readings. Supports both: - GPS coordinates (lat, lon) for outdoor tracking - Local coordinates (x, y in meters) for indoor tracking Uses the log-distance path loss model to convert RSSI to distance, then weighted least squares optimization for position estimation. """ # Earth radius in meters (for GPS calculations) EARTH_RADIUS = 6371000 def __init__(self, rssi_at_1m: float = -40, path_loss_n: float = 2.5, smoothing_window: int = 5): """ Initialize the MLAT engine. Args: rssi_at_1m: RSSI value at 1 meter distance (calibration, typically -40 to -50) path_loss_n: Path loss exponent (2.0 free space, 2.5-3.5 indoors) smoothing_window: Number of readings to average for noise reduction """ self.rssi_at_1m = rssi_at_1m self.path_loss_n = path_loss_n self.smoothing_window = smoothing_window # Scanner data: {scanner_id: {"position": {"lat": x, "lon": y} or {"x": x, "y": y}, ...}} self.scanners: dict = {} # Last calculated target position self._last_target: Optional[dict] = None self._last_calculation: float = 0 # Coordinate mode: 'gps' or 'local' self._coord_mode = 'gps' @staticmethod def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """ Calculate distance between two GPS points using Haversine formula. Args: lat1, lon1: First point (degrees) lat2, lon2: Second point (degrees) Returns: Distance in meters """ lat1_rad = math.radians(lat1) lat2_rad = math.radians(lat2) delta_lat = math.radians(lat2 - lat1) delta_lon = math.radians(lon2 - lon1) a = (math.sin(delta_lat / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) return MlatEngine.EARTH_RADIUS * c @staticmethod def meters_to_degrees(meters: float, latitude: float) -> Tuple[float, float]: """ Convert meters to approximate degrees at a given latitude. Args: meters: Distance in meters latitude: Reference latitude (for longitude scaling) Returns: (delta_lat, delta_lon) in degrees """ delta_lat = meters / 111320 # ~111.32 km per degree latitude delta_lon = meters / (111320 * math.cos(math.radians(latitude))) return delta_lat, delta_lon def parse_mlat_message(self, scanner_id: str, message: str) -> bool: """ Parse MLAT message from ESP32 device. New format with coordinate type prefix: MLAT:G;;; - GPS coordinates MLAT:L;;; - Local coordinates (meters) Legacy format (backward compatible): MLAT:;; - Treated as GPS Args: scanner_id: Device ID that sent the message message: Raw message content (without MLAT: prefix) Returns: True if successfully parsed, False otherwise """ # New format with type prefix: G;lat;lon;rssi or L;x;y;rssi pattern_new = re.compile(r'^([GL]);([0-9.+-]+);([0-9.+-]+);(-?\d+)$') match = pattern_new.match(message) if match: coord_type = match.group(1) c1 = float(match.group(2)) c2 = float(match.group(3)) rssi = int(match.group(4)) if coord_type == 'G': self.add_reading_gps(scanner_id, c1, c2, rssi) else: # 'L' - local self.add_reading(scanner_id, c1, c2, rssi) return True # Legacy format: lat;lon;rssi (backward compatible - treat as GPS) pattern_legacy = re.compile(r'^([0-9.+-]+);([0-9.+-]+);(-?\d+)$') match = pattern_legacy.match(message) if match: lat = float(match.group(1)) lon = float(match.group(2)) rssi = int(match.group(3)) self.add_reading_gps(scanner_id, lat, lon, rssi) return True return False def parse_data(self, raw_data: str) -> int: """ Parse raw MLAT data from HTTP POST. Format: SCANNER_ID;(lat,lon);rssi Example: ESP3;(48.8566,2.3522);-45 Args: raw_data: Raw text data with one or more readings Returns: Number of readings successfully processed """ pattern = re.compile(r'^(\w+);\(([0-9.+-]+),([0-9.+-]+)\);(-?\d+)$') count = 0 timestamp = time.time() for line in raw_data.strip().split('\n'): line = line.strip() if not line: continue match = pattern.match(line) if match: scanner_id = match.group(1) lat = float(match.group(2)) lon = float(match.group(3)) rssi = int(match.group(4)) self.add_reading_gps(scanner_id, lat, lon, rssi, timestamp) count += 1 return count def add_reading_gps(self, scanner_id: str, lat: float, lon: float, rssi: int, timestamp: float = None): """ Add a new RSSI reading from a scanner with GPS coordinates. Args: scanner_id: Unique identifier for the scanner lat: Latitude of the scanner lon: Longitude of the scanner rssi: RSSI value (negative dBm) timestamp: Reading timestamp (defaults to current time) """ if timestamp is None: timestamp = time.time() if scanner_id not in self.scanners: self.scanners[scanner_id] = { "position": {"lat": lat, "lon": lon}, "rssi_history": [], "last_seen": timestamp } scanner = self.scanners[scanner_id] scanner["position"] = {"lat": lat, "lon": lon} scanner["rssi_history"].append(rssi) scanner["last_seen"] = timestamp # Keep only recent readings for smoothing if len(scanner["rssi_history"]) > self.smoothing_window: scanner["rssi_history"] = scanner["rssi_history"][-self.smoothing_window:] self._coord_mode = 'gps' def add_reading(self, scanner_id: str, x: float, y: float, rssi: int, timestamp: float = None): """ Add a new RSSI reading from a scanner with local coordinates. Args: scanner_id: Unique identifier for the scanner x: X coordinate of the scanner (meters) y: Y coordinate of the scanner (meters) rssi: RSSI value (negative dBm) timestamp: Reading timestamp (defaults to current time) """ if timestamp is None: timestamp = time.time() if scanner_id not in self.scanners: self.scanners[scanner_id] = { "position": {"x": x, "y": y}, "rssi_history": [], "last_seen": timestamp } scanner = self.scanners[scanner_id] scanner["position"] = {"x": x, "y": y} scanner["rssi_history"].append(rssi) scanner["last_seen"] = timestamp if len(scanner["rssi_history"]) > self.smoothing_window: scanner["rssi_history"] = scanner["rssi_history"][-self.smoothing_window:] self._coord_mode = 'local' def rssi_to_distance(self, rssi: float) -> float: """ Convert RSSI to estimated distance using log-distance path loss model. d = 10^((RSSI_1m - RSSI) / (10 * n)) Args: rssi: RSSI value (negative dBm) Returns: Estimated distance in meters """ return 10 ** ((self.rssi_at_1m - rssi) / (10 * self.path_loss_n)) def calculate_position(self) -> dict: """ Calculate target position using multilateration. Requires at least 3 active scanners with recent readings. Uses weighted least squares optimization. Returns: dict with position, confidence, and scanner info, or error """ # Get active scanners (those with readings) active_scanners = [ (sid, s) for sid, s in self.scanners.items() if s["rssi_history"] ] if len(active_scanners) < 3: return { "error": f"Need at least 3 active scanners (have {len(active_scanners)})", "scanners_count": len(active_scanners) } # Determine coordinate mode from first scanner first_pos = active_scanners[0][1]["position"] is_gps = "lat" in first_pos # Prepare data arrays positions = [] distances = [] weights = [] # Reference point for GPS conversion (centroid) if is_gps: ref_lat = sum(s["position"]["lat"] for _, s in active_scanners) / len(active_scanners) ref_lon = sum(s["position"]["lon"] for _, s in active_scanners) / len(active_scanners) for scanner_id, scanner in active_scanners: pos = scanner["position"] if is_gps: # Convert GPS to local meters relative to reference x = self.haversine_distance(ref_lat, ref_lon, ref_lat, pos["lon"]) if pos["lon"] < ref_lon: x = -x y = self.haversine_distance(ref_lat, ref_lon, pos["lat"], ref_lon) if pos["lat"] < ref_lat: y = -y else: x, y = pos["x"], pos["y"] # Average RSSI for noise reduction avg_rssi = sum(scanner["rssi_history"]) / len(scanner["rssi_history"]) distance = self.rssi_to_distance(avg_rssi) positions.append([x, y]) distances.append(distance) # Weight by signal strength (stronger signal = more reliable) weights.append(1.0 / (abs(avg_rssi) ** 2)) positions = np.array(positions) distances = np.array(distances) weights = np.array(weights) weights = weights / weights.sum() # Cost function def cost_function(point): x, y = point estimated_distances = np.sqrt((positions[:, 0] - x)**2 + (positions[:, 1] - y)**2) errors = (estimated_distances - distances) ** 2 return np.sum(weights * errors) # Initial guess: weighted centroid x0 = np.sum(weights * positions[:, 0]) y0 = np.sum(weights * positions[:, 1]) # Optimize result = minimize(cost_function, [x0, y0], method='L-BFGS-B') if result.success: target_x, target_y = result.x confidence = 1.0 / (1.0 + result.fun) if is_gps: # Convert back to GPS delta_lat, delta_lon = self.meters_to_degrees(1, ref_lat) target_lat = ref_lat + target_y * delta_lat target_lon = ref_lon + target_x * delta_lon self._last_target = { "lat": round(float(target_lat), 6), "lon": round(float(target_lon), 6) } else: self._last_target = { "x": round(float(target_x), 2), "y": round(float(target_y), 2) } self._last_calculation = time.time() return { "position": self._last_target, "confidence": round(float(confidence), 3), "scanners_used": len(active_scanners), "calculated_at": self._last_calculation } else: return { "error": "Optimization failed", "details": result.message } def get_state(self) -> dict: """ Get the current state of the MLAT system. Returns: dict with scanner info and last target position """ now = time.time() scanners_data = [] for scanner_id, scanner in self.scanners.items(): avg_rssi = None distance = None if scanner["rssi_history"]: avg_rssi = sum(scanner["rssi_history"]) / len(scanner["rssi_history"]) distance = round(self.rssi_to_distance(avg_rssi), 2) avg_rssi = round(avg_rssi, 1) scanners_data.append({ "id": scanner_id, "position": scanner["position"], "last_rssi": avg_rssi, "estimated_distance": distance, "last_seen": scanner["last_seen"], "age_seconds": round(now - scanner["last_seen"], 1) }) result = { "scanners": scanners_data, "scanners_count": len(scanners_data), "target": None, "config": { "rssi_at_1m": self.rssi_at_1m, "path_loss_n": self.path_loss_n, "smoothing_window": self.smoothing_window }, "coord_mode": self._coord_mode } # Add target if available if self._last_target and (now - self._last_calculation) < 60: result["target"] = { "position": self._last_target, "calculated_at": self._last_calculation, "age_seconds": round(now - self._last_calculation, 1) } return result def update_config(self, rssi_at_1m: float = None, path_loss_n: float = None, smoothing_window: int = None): """ Update MLAT configuration parameters. Args: rssi_at_1m: New RSSI at 1m value path_loss_n: New path loss exponent smoothing_window: New smoothing window size """ if rssi_at_1m is not None: self.rssi_at_1m = rssi_at_1m if path_loss_n is not None: self.path_loss_n = path_loss_n if smoothing_window is not None: self.smoothing_window = max(1, smoothing_window) def clear(self): """Clear all scanner data and reset state.""" self.scanners.clear() self._last_target = None self._last_calculation = 0