Crypto: - Replace broken ChaCha20 (static nonce) with ChaCha20-Poly1305 AEAD - HKDF-SHA256 key derivation from per-device factory NVS master keys - Random 12-byte nonce per message (ESP32 hardware RNG) - crypto_init/encrypt/decrypt API with mbedtls legacy (ESP-IDF v5.3.2) - Custom partition table with factory NVS (fctry at 0x10000) Firmware: - crypto.c full rewrite, messages.c device_id prefix + AEAD encrypt - crypto_init() at boot with esp_restart() on failure - Fix command_t initializations across all modules (sub/help fields) - Clean CMakeLists dependencies for ESP-IDF v5.3.2 C3PO (C2): - Rename tools/c2 + tools/c3po -> tools/C3PO - Per-device CryptoContext with HKDF key derivation - KeyStore (keys.json) for master key management - Transport parses device_id:base64(...) wire format Tools: - New tools/provisioning/provision.py for factory NVS key generation - Updated flasher with mbedtls config for v5.3.2 Docs: - Update all READMEs for new crypto, C3PO paths, provisioning - Update roadmap, architecture diagrams, security sections - Update CONTRIBUTING.md project structure
480 lines
16 KiB
Python
480 lines
16 KiB
Python
"""UDP server for receiving camera frames from ESP devices.
|
|
|
|
Protocol from ESP32:
|
|
- TOKEN + "START" -> Start of new frame
|
|
- TOKEN + chunk -> JPEG data chunk
|
|
- TOKEN + "END" -> End of frame, decode and process
|
|
"""
|
|
|
|
import os
|
|
import socket
|
|
import threading
|
|
import time
|
|
import cv2
|
|
import numpy as np
|
|
from datetime import datetime
|
|
from typing import Optional, Callable, Dict
|
|
|
|
from .config import (
|
|
UDP_HOST, UDP_PORT, UDP_BUFFER_SIZE,
|
|
SECRET_TOKEN, IMAGE_DIR,
|
|
VIDEO_FPS, VIDEO_CODEC
|
|
)
|
|
|
|
# Camera timeout - mark as inactive after this many seconds without frames
|
|
CAMERA_TIMEOUT_SECONDS = 5
|
|
|
|
|
|
class FrameAssembler:
|
|
"""Assembles JPEG frames from multiple UDP packets."""
|
|
|
|
def __init__(self, timeout: float = 5.0):
|
|
self.timeout = timeout
|
|
self.buffer = bytearray()
|
|
self.start_time: Optional[float] = None
|
|
self.receiving = False
|
|
|
|
def start_frame(self):
|
|
self.buffer = bytearray()
|
|
self.start_time = time.time()
|
|
self.receiving = True
|
|
|
|
def add_chunk(self, data: bytes) -> bool:
|
|
if not self.receiving:
|
|
return False
|
|
if self.start_time and (time.time() - self.start_time) > self.timeout:
|
|
self.reset()
|
|
return False
|
|
self.buffer.extend(data)
|
|
return True
|
|
|
|
def finish_frame(self) -> Optional[bytes]:
|
|
if not self.receiving or len(self.buffer) == 0:
|
|
return None
|
|
data = bytes(self.buffer)
|
|
self.reset()
|
|
return data
|
|
|
|
def reset(self):
|
|
self.buffer = bytearray()
|
|
self.start_time = None
|
|
self.receiving = False
|
|
|
|
|
|
class CameraRecorder:
|
|
"""Handles video recording for a single camera."""
|
|
|
|
def __init__(self, camera_id: str, output_dir: str):
|
|
self.camera_id = camera_id
|
|
self.output_dir = output_dir
|
|
self._writer: Optional[cv2.VideoWriter] = None
|
|
self._video_size: Optional[tuple] = None
|
|
self._recording = False
|
|
self._filename: Optional[str] = None
|
|
self._frame_count = 0
|
|
self._start_time: Optional[float] = None
|
|
|
|
@property
|
|
def is_recording(self) -> bool:
|
|
return self._recording
|
|
|
|
@property
|
|
def filename(self) -> Optional[str]:
|
|
return self._filename
|
|
|
|
@property
|
|
def duration(self) -> float:
|
|
if self._start_time:
|
|
return time.time() - self._start_time
|
|
return 0
|
|
|
|
@property
|
|
def frame_count(self) -> int:
|
|
return self._frame_count
|
|
|
|
def start(self) -> str:
|
|
if self._recording:
|
|
return self._filename
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
safe_id = self.camera_id.replace(":", "_").replace(".", "_")
|
|
self._filename = f"recording_{safe_id}_{timestamp}.avi"
|
|
self._recording = True
|
|
self._frame_count = 0
|
|
self._start_time = time.time()
|
|
return self._filename
|
|
|
|
def stop(self) -> dict:
|
|
if not self._recording:
|
|
return {"error": "Not recording"}
|
|
|
|
self._recording = False
|
|
result = {
|
|
"filename": self._filename,
|
|
"frames": self._frame_count,
|
|
"duration": self.duration
|
|
}
|
|
|
|
if self._writer:
|
|
self._writer.release()
|
|
self._writer = None
|
|
|
|
self._video_size = None
|
|
return result
|
|
|
|
def write_frame(self, frame: np.ndarray):
|
|
if not self._recording:
|
|
return
|
|
|
|
if self._writer is None:
|
|
self._video_size = (frame.shape[1], frame.shape[0])
|
|
fourcc = cv2.VideoWriter_fourcc(*VIDEO_CODEC)
|
|
video_path = os.path.join(self.output_dir, self._filename)
|
|
self._writer = cv2.VideoWriter(
|
|
video_path, fourcc, VIDEO_FPS, self._video_size
|
|
)
|
|
|
|
if self._writer and self._writer.isOpened():
|
|
self._writer.write(frame)
|
|
self._frame_count += 1
|
|
|
|
|
|
class UDPReceiver:
|
|
"""Receives JPEG frames via UDP from ESP camera devices."""
|
|
|
|
def __init__(self,
|
|
host: str = UDP_HOST,
|
|
port: int = UDP_PORT,
|
|
image_dir: str = IMAGE_DIR,
|
|
on_frame: Optional[Callable] = None,
|
|
device_registry=None):
|
|
self.host = host
|
|
self.port = port
|
|
self.image_dir = image_dir
|
|
self.on_frame = on_frame
|
|
self.device_registry = device_registry
|
|
|
|
self._sock: Optional[socket.socket] = None
|
|
self._thread: Optional[threading.Thread] = None
|
|
self._stop_event = threading.Event()
|
|
|
|
# Frame assemblers per source address
|
|
self._assemblers: Dict[str, FrameAssembler] = {}
|
|
|
|
# Per-camera recorders (keyed by device_id)
|
|
self._recorders: Dict[str, CameraRecorder] = {}
|
|
self._recordings_dir = os.path.join(os.path.dirname(image_dir), "recordings")
|
|
|
|
# IP to device_id mapping cache
|
|
self._ip_to_device: Dict[str, str] = {}
|
|
|
|
# Statistics (protected by _stats_lock)
|
|
self._stats_lock = threading.Lock()
|
|
self.frames_received = 0
|
|
self.invalid_tokens = 0
|
|
self.decode_errors = 0
|
|
self.packets_received = 0
|
|
|
|
# Active cameras tracking (protected by _cameras_lock)
|
|
self._cameras_lock = threading.Lock()
|
|
self._active_cameras: Dict[str, dict] = {}
|
|
|
|
os.makedirs(self.image_dir, exist_ok=True)
|
|
os.makedirs(self._recordings_dir, exist_ok=True)
|
|
|
|
def set_device_registry(self, registry):
|
|
"""Set device registry for IP to device_id lookup."""
|
|
self.device_registry = registry
|
|
|
|
@property
|
|
def is_running(self) -> bool:
|
|
return self._thread is not None and self._thread.is_alive()
|
|
|
|
@property
|
|
def active_cameras(self) -> list:
|
|
"""Returns list of active camera device IDs."""
|
|
with self._cameras_lock:
|
|
return [cid for cid, info in self._active_cameras.items() if info.get("active", False)]
|
|
|
|
def _get_device_id_from_ip(self, ip: str) -> Optional[str]:
|
|
"""Look up device_id from IP address using device registry."""
|
|
# Check cache first
|
|
if ip in self._ip_to_device:
|
|
return self._ip_to_device[ip]
|
|
|
|
# Look up in device registry
|
|
if self.device_registry:
|
|
for device in self.device_registry.all():
|
|
if device.address and device.address[0] == ip:
|
|
self._ip_to_device[ip] = device.id
|
|
return device.id
|
|
|
|
return None
|
|
|
|
def start(self) -> bool:
|
|
if self.is_running:
|
|
return False
|
|
|
|
self._stop_event.clear()
|
|
self._thread = threading.Thread(target=self._receive_loop, daemon=True)
|
|
self._thread.start()
|
|
|
|
# Start timeout checker
|
|
self._timeout_thread = threading.Thread(target=self._timeout_checker, daemon=True)
|
|
self._timeout_thread.start()
|
|
|
|
return True
|
|
|
|
def stop(self):
|
|
self._stop_event.set()
|
|
|
|
if self._sock:
|
|
try:
|
|
self._sock.close()
|
|
except Exception:
|
|
pass
|
|
self._sock = None
|
|
|
|
for recorder in self._recorders.values():
|
|
if recorder.is_recording:
|
|
recorder.stop()
|
|
|
|
self._cleanup_frames()
|
|
self._active_cameras.clear()
|
|
self._assemblers.clear()
|
|
self._recorders.clear()
|
|
self._ip_to_device.clear()
|
|
self.frames_received = 0
|
|
self.packets_received = 0
|
|
|
|
def _cleanup_frames(self):
|
|
"""Remove all .jpg files from image directory."""
|
|
try:
|
|
for f in os.listdir(self.image_dir):
|
|
if f.endswith(".jpg"):
|
|
os.remove(os.path.join(self.image_dir, f))
|
|
except Exception:
|
|
pass
|
|
|
|
def _timeout_checker(self):
|
|
"""Check for camera timeouts and mark them as inactive."""
|
|
while not self._stop_event.is_set():
|
|
time.sleep(1)
|
|
now = time.time()
|
|
|
|
for camera_id, info in list(self._active_cameras.items()):
|
|
last_frame = info.get("last_frame", 0)
|
|
was_active = info.get("active", False)
|
|
|
|
if now - last_frame > CAMERA_TIMEOUT_SECONDS:
|
|
if was_active:
|
|
self._active_cameras[camera_id]["active"] = False
|
|
# Remove the frame file so frontend shows default image
|
|
self._remove_camera_frame(camera_id)
|
|
|
|
def _remove_camera_frame(self, camera_id: str):
|
|
"""Remove the frame file for a camera."""
|
|
try:
|
|
filepath = os.path.join(self.image_dir, f"{camera_id}.jpg")
|
|
if os.path.exists(filepath):
|
|
os.remove(filepath)
|
|
except Exception:
|
|
pass
|
|
|
|
def _get_assembler(self, addr: tuple) -> FrameAssembler:
|
|
key = f"{addr[0]}:{addr[1]}"
|
|
if key not in self._assemblers:
|
|
self._assemblers[key] = FrameAssembler()
|
|
return self._assemblers[key]
|
|
|
|
def _get_recorder(self, camera_id: str) -> CameraRecorder:
|
|
if camera_id not in self._recorders:
|
|
self._recorders[camera_id] = CameraRecorder(camera_id, self._recordings_dir)
|
|
return self._recorders[camera_id]
|
|
|
|
def _receive_loop(self):
|
|
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
self._sock.bind((self.host, self.port))
|
|
self._sock.settimeout(1.0)
|
|
|
|
print(f"[UDP] Receiver started on {self.host}:{self.port}")
|
|
|
|
while not self._stop_event.is_set():
|
|
try:
|
|
data, addr = self._sock.recvfrom(UDP_BUFFER_SIZE)
|
|
except socket.timeout:
|
|
continue
|
|
except OSError:
|
|
break
|
|
|
|
with self._stats_lock:
|
|
self.packets_received += 1
|
|
|
|
if not data.startswith(SECRET_TOKEN):
|
|
with self._stats_lock:
|
|
self.invalid_tokens += 1
|
|
continue
|
|
|
|
payload = data[len(SECRET_TOKEN):]
|
|
assembler = self._get_assembler(addr)
|
|
|
|
# Try to get device_id from IP, fallback to IP if not found
|
|
ip = addr[0]
|
|
device_id = self._get_device_id_from_ip(ip)
|
|
if not device_id:
|
|
# Fallback: use IP (without port to avoid duplicates)
|
|
device_id = ip.replace(".", "_")
|
|
|
|
if payload == b"START":
|
|
assembler.start_frame()
|
|
continue
|
|
elif payload == b"END":
|
|
frame_data = assembler.finish_frame()
|
|
if frame_data:
|
|
self._process_complete_frame(device_id, frame_data, addr)
|
|
continue
|
|
else:
|
|
if not assembler.receiving:
|
|
frame = self._decode_frame(payload)
|
|
if frame is not None:
|
|
self._process_frame(device_id, frame, addr)
|
|
else:
|
|
with self._stats_lock:
|
|
self.decode_errors += 1
|
|
else:
|
|
assembler.add_chunk(payload)
|
|
|
|
if self._sock:
|
|
self._sock.close()
|
|
self._sock = None
|
|
|
|
print("[UDP] Receiver stopped")
|
|
|
|
def _process_complete_frame(self, camera_id: str, frame_data: bytes, addr: tuple):
|
|
frame = self._decode_frame(frame_data)
|
|
if frame is None:
|
|
with self._stats_lock:
|
|
self.decode_errors += 1
|
|
return
|
|
self._process_frame(camera_id, frame, addr)
|
|
|
|
def _process_frame(self, camera_id: str, frame: np.ndarray, addr: tuple):
|
|
with self._stats_lock:
|
|
self.frames_received += 1
|
|
|
|
# Update camera tracking
|
|
with self._cameras_lock:
|
|
self._active_cameras[camera_id] = {
|
|
"last_frame": time.time(),
|
|
"active": True,
|
|
"addr": addr
|
|
}
|
|
|
|
# Save frame
|
|
self._save_frame(camera_id, frame)
|
|
|
|
# Record if recording is active for this camera
|
|
recorder = self._get_recorder(camera_id)
|
|
if recorder.is_recording:
|
|
recorder.write_frame(frame)
|
|
|
|
if self.on_frame:
|
|
self.on_frame(camera_id, frame, addr)
|
|
|
|
def _decode_frame(self, data: bytes) -> Optional[np.ndarray]:
|
|
try:
|
|
npdata = np.frombuffer(data, np.uint8)
|
|
frame = cv2.imdecode(npdata, cv2.IMREAD_COLOR)
|
|
return frame
|
|
except Exception:
|
|
return None
|
|
|
|
def _save_frame(self, camera_id: str, frame: np.ndarray):
|
|
try:
|
|
filepath = os.path.join(self.image_dir, f"{camera_id}.jpg")
|
|
cv2.imwrite(filepath, frame)
|
|
except Exception:
|
|
pass
|
|
|
|
# === Recording API ===
|
|
|
|
def start_recording(self, camera_id: str) -> dict:
|
|
if camera_id not in self._active_cameras or not self._active_cameras[camera_id].get("active"):
|
|
return {"error": f"Camera {camera_id} not active"}
|
|
|
|
recorder = self._get_recorder(camera_id)
|
|
if recorder.is_recording:
|
|
return {"error": "Already recording", "filename": recorder.filename}
|
|
|
|
filename = recorder.start()
|
|
return {"status": "recording", "filename": filename, "camera_id": camera_id}
|
|
|
|
def stop_recording(self, camera_id: str) -> dict:
|
|
if camera_id not in self._recorders:
|
|
return {"error": f"No recorder for {camera_id}"}
|
|
|
|
recorder = self._recorders[camera_id]
|
|
if not recorder.is_recording:
|
|
return {"error": "Not recording"}
|
|
|
|
result = recorder.stop()
|
|
result["camera_id"] = camera_id
|
|
result["path"] = os.path.join(self._recordings_dir, result["filename"])
|
|
return result
|
|
|
|
def get_recording_status(self, camera_id: str = None) -> dict:
|
|
if camera_id:
|
|
if camera_id not in self._recorders:
|
|
return {"camera_id": camera_id, "recording": False}
|
|
recorder = self._recorders[camera_id]
|
|
return {
|
|
"camera_id": camera_id,
|
|
"recording": recorder.is_recording,
|
|
"filename": recorder.filename,
|
|
"duration": recorder.duration,
|
|
"frames": recorder.frame_count
|
|
}
|
|
|
|
result = {}
|
|
for cid, info in self._active_cameras.items():
|
|
if info.get("active"):
|
|
recorder = self._get_recorder(cid)
|
|
result[cid] = {
|
|
"recording": recorder.is_recording,
|
|
"filename": recorder.filename if recorder.is_recording else None,
|
|
"duration": recorder.duration if recorder.is_recording else 0
|
|
}
|
|
return result
|
|
|
|
def list_recordings(self) -> list:
|
|
try:
|
|
files = []
|
|
for f in os.listdir(self._recordings_dir):
|
|
if f.endswith(".avi"):
|
|
path = os.path.join(self._recordings_dir, f)
|
|
stat = os.stat(path)
|
|
files.append({
|
|
"filename": f,
|
|
"size": stat.st_size,
|
|
"created": stat.st_mtime
|
|
})
|
|
return sorted(files, key=lambda x: x["created"], reverse=True)
|
|
except Exception:
|
|
return []
|
|
|
|
def get_stats(self) -> dict:
|
|
recording_count = sum(1 for r in self._recorders.values() if r.is_recording)
|
|
with self._cameras_lock:
|
|
active_count = sum(1 for info in self._active_cameras.values() if info.get("active"))
|
|
with self._stats_lock:
|
|
return {
|
|
"running": self.is_running,
|
|
"packets_received": self.packets_received,
|
|
"frames_received": self.frames_received,
|
|
"invalid_tokens": self.invalid_tokens,
|
|
"decode_errors": self.decode_errors,
|
|
"active_cameras": active_count,
|
|
"active_recordings": recording_count
|
|
}
|