430 lines
14 KiB
Python
430 lines
14 KiB
Python
"""MLAT (Multilateration) engine for device positioning with GPS support."""
|
|
|
|
import time
|
|
import re
|
|
import math
|
|
from typing import Optional, Tuple
|
|
import numpy as np
|
|
from scipy.optimize import minimize
|
|
|
|
|
|
class MlatEngine:
|
|
"""
|
|
Calculates target position from multiple scanner RSSI readings.
|
|
|
|
Supports both:
|
|
- GPS coordinates (lat, lon) for outdoor tracking
|
|
- Local coordinates (x, y in meters) for indoor tracking
|
|
|
|
Uses the log-distance path loss model to convert RSSI to distance,
|
|
then weighted least squares optimization for position estimation.
|
|
"""
|
|
|
|
# Earth radius in meters (for GPS calculations)
|
|
EARTH_RADIUS = 6371000
|
|
|
|
def __init__(self, rssi_at_1m: float = -40, path_loss_n: float = 2.5, smoothing_window: int = 5):
|
|
"""
|
|
Initialize the MLAT engine.
|
|
|
|
Args:
|
|
rssi_at_1m: RSSI value at 1 meter distance (calibration, typically -40 to -50)
|
|
path_loss_n: Path loss exponent (2.0 free space, 2.5-3.5 indoors)
|
|
smoothing_window: Number of readings to average for noise reduction
|
|
"""
|
|
self.rssi_at_1m = rssi_at_1m
|
|
self.path_loss_n = path_loss_n
|
|
self.smoothing_window = smoothing_window
|
|
|
|
# Scanner data: {scanner_id: {"position": {"lat": x, "lon": y} or {"x": x, "y": y}, ...}}
|
|
self.scanners: dict = {}
|
|
|
|
# Last calculated target position
|
|
self._last_target: Optional[dict] = None
|
|
self._last_calculation: float = 0
|
|
|
|
# Coordinate mode: 'gps' or 'local'
|
|
self._coord_mode = 'gps'
|
|
|
|
@staticmethod
|
|
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
|
|
"""
|
|
Calculate distance between two GPS points using Haversine formula.
|
|
|
|
Args:
|
|
lat1, lon1: First point (degrees)
|
|
lat2, lon2: Second point (degrees)
|
|
|
|
Returns:
|
|
Distance in meters
|
|
"""
|
|
lat1_rad = math.radians(lat1)
|
|
lat2_rad = math.radians(lat2)
|
|
delta_lat = math.radians(lat2 - lat1)
|
|
delta_lon = math.radians(lon2 - lon1)
|
|
|
|
a = (math.sin(delta_lat / 2) ** 2 +
|
|
math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2)
|
|
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
|
|
|
return MlatEngine.EARTH_RADIUS * c
|
|
|
|
@staticmethod
|
|
def meters_to_degrees(meters: float, latitude: float) -> Tuple[float, float]:
|
|
"""
|
|
Convert meters to approximate degrees at a given latitude.
|
|
|
|
Args:
|
|
meters: Distance in meters
|
|
latitude: Reference latitude (for longitude scaling)
|
|
|
|
Returns:
|
|
(delta_lat, delta_lon) in degrees
|
|
"""
|
|
delta_lat = meters / 111320 # ~111.32 km per degree latitude
|
|
delta_lon = meters / (111320 * math.cos(math.radians(latitude)))
|
|
return delta_lat, delta_lon
|
|
|
|
def parse_mlat_message(self, scanner_id: str, message: str) -> bool:
|
|
"""
|
|
Parse MLAT message from ESP32 device.
|
|
|
|
New format with coordinate type prefix:
|
|
MLAT:G;<lat>;<lon>;<rssi> - GPS coordinates
|
|
MLAT:L;<x>;<y>;<rssi> - Local coordinates (meters)
|
|
|
|
Legacy format (backward compatible):
|
|
MLAT:<lat>;<lon>;<rssi> - Treated as GPS
|
|
|
|
Args:
|
|
scanner_id: Device ID that sent the message
|
|
message: Raw message content (without MLAT: prefix)
|
|
|
|
Returns:
|
|
True if successfully parsed, False otherwise
|
|
"""
|
|
# New format with type prefix: G;lat;lon;rssi or L;x;y;rssi
|
|
pattern_new = re.compile(r'^([GL]);([0-9.+-]+);([0-9.+-]+);(-?\d+)$')
|
|
match = pattern_new.match(message)
|
|
|
|
if match:
|
|
coord_type = match.group(1)
|
|
c1 = float(match.group(2))
|
|
c2 = float(match.group(3))
|
|
rssi = int(match.group(4))
|
|
|
|
if coord_type == 'G':
|
|
self.add_reading_gps(scanner_id, c1, c2, rssi)
|
|
else: # 'L' - local
|
|
self.add_reading(scanner_id, c1, c2, rssi)
|
|
return True
|
|
|
|
# Legacy format: lat;lon;rssi (backward compatible - treat as GPS)
|
|
pattern_legacy = re.compile(r'^([0-9.+-]+);([0-9.+-]+);(-?\d+)$')
|
|
match = pattern_legacy.match(message)
|
|
|
|
if match:
|
|
lat = float(match.group(1))
|
|
lon = float(match.group(2))
|
|
rssi = int(match.group(3))
|
|
self.add_reading_gps(scanner_id, lat, lon, rssi)
|
|
return True
|
|
|
|
return False
|
|
|
|
def parse_data(self, raw_data: str) -> int:
|
|
"""
|
|
Parse raw MLAT data from HTTP POST.
|
|
|
|
Format: SCANNER_ID;(lat,lon);rssi
|
|
Example: ESP3;(48.8566,2.3522);-45
|
|
|
|
Args:
|
|
raw_data: Raw text data with one or more readings
|
|
|
|
Returns:
|
|
Number of readings successfully processed
|
|
"""
|
|
pattern = re.compile(r'^(\w+);\(([0-9.+-]+),([0-9.+-]+)\);(-?\d+)$')
|
|
count = 0
|
|
timestamp = time.time()
|
|
|
|
for line in raw_data.strip().split('\n'):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
match = pattern.match(line)
|
|
if match:
|
|
scanner_id = match.group(1)
|
|
lat = float(match.group(2))
|
|
lon = float(match.group(3))
|
|
rssi = int(match.group(4))
|
|
|
|
self.add_reading_gps(scanner_id, lat, lon, rssi, timestamp)
|
|
count += 1
|
|
|
|
return count
|
|
|
|
def add_reading_gps(self, scanner_id: str, lat: float, lon: float, rssi: int, timestamp: float = None):
|
|
"""
|
|
Add a new RSSI reading from a scanner with GPS coordinates.
|
|
|
|
Args:
|
|
scanner_id: Unique identifier for the scanner
|
|
lat: Latitude of the scanner
|
|
lon: Longitude of the scanner
|
|
rssi: RSSI value (negative dBm)
|
|
timestamp: Reading timestamp (defaults to current time)
|
|
"""
|
|
if timestamp is None:
|
|
timestamp = time.time()
|
|
|
|
if scanner_id not in self.scanners:
|
|
self.scanners[scanner_id] = {
|
|
"position": {"lat": lat, "lon": lon},
|
|
"rssi_history": [],
|
|
"last_seen": timestamp
|
|
}
|
|
|
|
scanner = self.scanners[scanner_id]
|
|
scanner["position"] = {"lat": lat, "lon": lon}
|
|
scanner["rssi_history"].append(rssi)
|
|
scanner["last_seen"] = timestamp
|
|
|
|
# Keep only recent readings for smoothing
|
|
if len(scanner["rssi_history"]) > self.smoothing_window:
|
|
scanner["rssi_history"] = scanner["rssi_history"][-self.smoothing_window:]
|
|
|
|
self._coord_mode = 'gps'
|
|
|
|
def add_reading(self, scanner_id: str, x: float, y: float, rssi: int, timestamp: float = None):
|
|
"""
|
|
Add a new RSSI reading from a scanner with local coordinates.
|
|
|
|
Args:
|
|
scanner_id: Unique identifier for the scanner
|
|
x: X coordinate of the scanner (meters)
|
|
y: Y coordinate of the scanner (meters)
|
|
rssi: RSSI value (negative dBm)
|
|
timestamp: Reading timestamp (defaults to current time)
|
|
"""
|
|
if timestamp is None:
|
|
timestamp = time.time()
|
|
|
|
if scanner_id not in self.scanners:
|
|
self.scanners[scanner_id] = {
|
|
"position": {"x": x, "y": y},
|
|
"rssi_history": [],
|
|
"last_seen": timestamp
|
|
}
|
|
|
|
scanner = self.scanners[scanner_id]
|
|
scanner["position"] = {"x": x, "y": y}
|
|
scanner["rssi_history"].append(rssi)
|
|
scanner["last_seen"] = timestamp
|
|
|
|
if len(scanner["rssi_history"]) > self.smoothing_window:
|
|
scanner["rssi_history"] = scanner["rssi_history"][-self.smoothing_window:]
|
|
|
|
self._coord_mode = 'local'
|
|
|
|
def rssi_to_distance(self, rssi: float) -> float:
|
|
"""
|
|
Convert RSSI to estimated distance using log-distance path loss model.
|
|
|
|
d = 10^((RSSI_1m - RSSI) / (10 * n))
|
|
|
|
Args:
|
|
rssi: RSSI value (negative dBm)
|
|
|
|
Returns:
|
|
Estimated distance in meters
|
|
"""
|
|
return 10 ** ((self.rssi_at_1m - rssi) / (10 * self.path_loss_n))
|
|
|
|
def calculate_position(self) -> dict:
|
|
"""
|
|
Calculate target position using multilateration.
|
|
|
|
Requires at least 3 active scanners with recent readings.
|
|
Uses weighted least squares optimization.
|
|
|
|
Returns:
|
|
dict with position, confidence, and scanner info, or error
|
|
"""
|
|
# Get active scanners (those with readings)
|
|
active_scanners = [
|
|
(sid, s) for sid, s in self.scanners.items()
|
|
if s["rssi_history"]
|
|
]
|
|
|
|
if len(active_scanners) < 3:
|
|
return {
|
|
"error": f"Need at least 3 active scanners (have {len(active_scanners)})",
|
|
"scanners_count": len(active_scanners)
|
|
}
|
|
|
|
# Determine coordinate mode from first scanner
|
|
first_pos = active_scanners[0][1]["position"]
|
|
is_gps = "lat" in first_pos
|
|
|
|
# Prepare data arrays
|
|
positions = []
|
|
distances = []
|
|
weights = []
|
|
|
|
# Reference point for GPS conversion (centroid)
|
|
if is_gps:
|
|
ref_lat = sum(s["position"]["lat"] for _, s in active_scanners) / len(active_scanners)
|
|
ref_lon = sum(s["position"]["lon"] for _, s in active_scanners) / len(active_scanners)
|
|
|
|
for scanner_id, scanner in active_scanners:
|
|
pos = scanner["position"]
|
|
|
|
if is_gps:
|
|
# Convert GPS to local meters relative to reference
|
|
x = self.haversine_distance(ref_lat, ref_lon, ref_lat, pos["lon"])
|
|
if pos["lon"] < ref_lon:
|
|
x = -x
|
|
y = self.haversine_distance(ref_lat, ref_lon, pos["lat"], ref_lon)
|
|
if pos["lat"] < ref_lat:
|
|
y = -y
|
|
else:
|
|
x, y = pos["x"], pos["y"]
|
|
|
|
# Average RSSI for noise reduction
|
|
avg_rssi = sum(scanner["rssi_history"]) / len(scanner["rssi_history"])
|
|
distance = self.rssi_to_distance(avg_rssi)
|
|
|
|
positions.append([x, y])
|
|
distances.append(distance)
|
|
|
|
# Weight by signal strength (stronger signal = more reliable)
|
|
weights.append(1.0 / (abs(avg_rssi) ** 2))
|
|
|
|
positions = np.array(positions)
|
|
distances = np.array(distances)
|
|
weights = np.array(weights)
|
|
weights = weights / weights.sum()
|
|
|
|
# Cost function
|
|
def cost_function(point):
|
|
x, y = point
|
|
estimated_distances = np.sqrt((positions[:, 0] - x)**2 + (positions[:, 1] - y)**2)
|
|
errors = (estimated_distances - distances) ** 2
|
|
return np.sum(weights * errors)
|
|
|
|
# Initial guess: weighted centroid
|
|
x0 = np.sum(weights * positions[:, 0])
|
|
y0 = np.sum(weights * positions[:, 1])
|
|
|
|
# Optimize
|
|
result = minimize(cost_function, [x0, y0], method='L-BFGS-B')
|
|
|
|
if result.success:
|
|
target_x, target_y = result.x
|
|
confidence = 1.0 / (1.0 + result.fun)
|
|
|
|
if is_gps:
|
|
# Convert back to GPS
|
|
delta_lat, delta_lon = self.meters_to_degrees(1, ref_lat)
|
|
target_lat = ref_lat + target_y * delta_lat
|
|
target_lon = ref_lon + target_x * delta_lon
|
|
|
|
self._last_target = {
|
|
"lat": round(float(target_lat), 6),
|
|
"lon": round(float(target_lon), 6)
|
|
}
|
|
else:
|
|
self._last_target = {
|
|
"x": round(float(target_x), 2),
|
|
"y": round(float(target_y), 2)
|
|
}
|
|
|
|
self._last_calculation = time.time()
|
|
|
|
return {
|
|
"position": self._last_target,
|
|
"confidence": round(float(confidence), 3),
|
|
"scanners_used": len(active_scanners),
|
|
"calculated_at": self._last_calculation
|
|
}
|
|
else:
|
|
return {
|
|
"error": "Optimization failed",
|
|
"details": result.message
|
|
}
|
|
|
|
def get_state(self) -> dict:
|
|
"""
|
|
Get the current state of the MLAT system.
|
|
|
|
Returns:
|
|
dict with scanner info and last target position
|
|
"""
|
|
now = time.time()
|
|
scanners_data = []
|
|
|
|
for scanner_id, scanner in self.scanners.items():
|
|
avg_rssi = None
|
|
distance = None
|
|
|
|
if scanner["rssi_history"]:
|
|
avg_rssi = sum(scanner["rssi_history"]) / len(scanner["rssi_history"])
|
|
distance = round(self.rssi_to_distance(avg_rssi), 2)
|
|
avg_rssi = round(avg_rssi, 1)
|
|
|
|
scanners_data.append({
|
|
"id": scanner_id,
|
|
"position": scanner["position"],
|
|
"last_rssi": avg_rssi,
|
|
"estimated_distance": distance,
|
|
"last_seen": scanner["last_seen"],
|
|
"age_seconds": round(now - scanner["last_seen"], 1)
|
|
})
|
|
|
|
result = {
|
|
"scanners": scanners_data,
|
|
"scanners_count": len(scanners_data),
|
|
"target": None,
|
|
"config": {
|
|
"rssi_at_1m": self.rssi_at_1m,
|
|
"path_loss_n": self.path_loss_n,
|
|
"smoothing_window": self.smoothing_window
|
|
},
|
|
"coord_mode": self._coord_mode
|
|
}
|
|
|
|
# Add target if available
|
|
if self._last_target and (now - self._last_calculation) < 60:
|
|
result["target"] = {
|
|
"position": self._last_target,
|
|
"calculated_at": self._last_calculation,
|
|
"age_seconds": round(now - self._last_calculation, 1)
|
|
}
|
|
|
|
return result
|
|
|
|
def update_config(self, rssi_at_1m: float = None, path_loss_n: float = None, smoothing_window: int = None):
|
|
"""
|
|
Update MLAT configuration parameters.
|
|
|
|
Args:
|
|
rssi_at_1m: New RSSI at 1m value
|
|
path_loss_n: New path loss exponent
|
|
smoothing_window: New smoothing window size
|
|
"""
|
|
if rssi_at_1m is not None:
|
|
self.rssi_at_1m = rssi_at_1m
|
|
if path_loss_n is not None:
|
|
self.path_loss_n = path_loss_n
|
|
if smoothing_window is not None:
|
|
self.smoothing_window = max(1, smoothing_window)
|
|
|
|
def clear(self):
|
|
"""Clear all scanner data and reset state."""
|
|
self.scanners.clear()
|
|
self._last_target = None
|
|
self._last_calculation = 0
|