espilon-source/tools/C3PO/streams/udp_receiver.py
Eun0us 8b6c1cd53d ε - ChaCha20-Poly1305 AEAD + HKDF crypto upgrade + C3PO rewrite + docs
Crypto:
- Replace broken ChaCha20 (static nonce) with ChaCha20-Poly1305 AEAD
- HKDF-SHA256 key derivation from per-device factory NVS master keys
- Random 12-byte nonce per message (ESP32 hardware RNG)
- crypto_init/encrypt/decrypt API with mbedtls legacy (ESP-IDF v5.3.2)
- Custom partition table with factory NVS (fctry at 0x10000)

Firmware:
- crypto.c full rewrite, messages.c device_id prefix + AEAD encrypt
- crypto_init() at boot with esp_restart() on failure
- Fix command_t initializations across all modules (sub/help fields)
- Clean CMakeLists dependencies for ESP-IDF v5.3.2

C3PO (C2):
- Rename tools/c2 + tools/c3po -> tools/C3PO
- Per-device CryptoContext with HKDF key derivation
- KeyStore (keys.json) for master key management
- Transport parses device_id:base64(...) wire format

Tools:
- New tools/provisioning/provision.py for factory NVS key generation
- Updated flasher with mbedtls config for v5.3.2

Docs:
- Update all READMEs for new crypto, C3PO paths, provisioning
- Update roadmap, architecture diagrams, security sections
- Update CONTRIBUTING.md project structure
2026-02-10 21:28:45 +01:00

480 lines
16 KiB
Python

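"""UDP server for receiving camera frames from ESP devices.

Protocol from ESP32 (every datagram is prefixed with the shared TOKEN):
- TOKEN + "START" -> Start of a new frame
- TOKEN + chunk -> JPEG data chunk (or, when no frame has been started,
  a complete single-packet JPEG that is decoded immediately)
- TOKEN + "END" -> End of frame, decode and process
"""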
import os
import socket
import threading
import time
import cv2
import numpy as np
from datetime import datetime
from typing import Optional, Callable, Dict
from .config import (
UDP_HOST, UDP_PORT, UDP_BUFFER_SIZE,
SECRET_TOKEN, IMAGE_DIR,
VIDEO_FPS, VIDEO_CODEC
)
# Camera timeout: seconds without a processed frame after which the timeout
# checker marks a camera as inactive and removes its last saved frame file.
CAMERA_TIMEOUT_SECONDS = 5
class FrameAssembler:
    """Assembles one JPEG frame from the payloads of several UDP packets.

    Usage is ``start_frame()``, any number of ``add_chunk()`` calls, then
    ``finish_frame()`` to obtain the concatenated bytes.

    Args:
        timeout: Maximum age in seconds of a frame under assembly. A chunk
            arriving later than this after ``start_frame()`` aborts the frame.
        max_size: Optional upper bound in bytes for an assembled frame. A
            chunk that would exceed it aborts the frame. ``None`` (default)
            means unlimited.
    """

    def __init__(self, timeout: float = 5.0, max_size: Optional[int] = None):
        self.timeout = timeout
        self.max_size = max_size
        self.buffer = bytearray()
        # Monotonic timestamp of the last start_frame(), None while idle.
        self.start_time: Optional[float] = None
        self.receiving = False

    def start_frame(self):
        """Begin a new frame, discarding any partially assembled data."""
        self.buffer = bytearray()
        # Monotonic clock: elapsed-time checks must not be affected by
        # wall-clock adjustments (NTP, manual changes).
        self.start_time = time.monotonic()
        self.receiving = True

    def add_chunk(self, data: bytes) -> bool:
        """Append ``data`` to the frame under assembly.

        Returns:
            True if the chunk was stored. False if no frame is in progress,
            or if the frame timed out or exceeded ``max_size`` (in both of
            those cases the assembler is reset).
        """
        if not self.receiving:
            return False
        if (self.start_time is not None
                and (time.monotonic() - self.start_time) > self.timeout):
            self.reset()
            return False
        if (self.max_size is not None
                and len(self.buffer) + len(data) > self.max_size):
            self.reset()
            return False
        self.buffer.extend(data)
        return True

    def finish_frame(self) -> Optional[bytes]:
        """Finish the current frame and return its bytes.

        Returns:
            The assembled bytes, or None if no frame was in progress or no
            data was received. The assembler is always idle afterwards.
        """
        if not self.receiving or not self.buffer:
            # Reset even for an empty frame; otherwise the assembler would
            # stay in the receiving state after START immediately followed
            # by END.
            self.reset()
            return None
        data = bytes(self.buffer)
        self.reset()
        return data

    def reset(self):
        """Drop buffered data and return to the idle state."""
        self.buffer = bytearray()
        self.start_time = None
        self.receiving = False
class CameraRecorder:
    """Handles video recording for a single camera.

    ``start()`` and ``stop()`` may be called from a different thread than
    ``write_frame()``, so all state changes are serialized with a lock.

    Args:
        camera_id: Identifier of the camera; used to build the file name.
        output_dir: Directory in which the recording file is created.
    """

    def __init__(self, camera_id: str, output_dir: str):
        self.camera_id = camera_id
        self.output_dir = output_dir
        self._writer: Optional[cv2.VideoWriter] = None
        # (width, height) of the first recorded frame.
        self._video_size: Optional[tuple] = None
        self._recording = False
        self._filename: Optional[str] = None
        self._frame_count = 0
        # Monotonic timestamps delimiting the current / last recording.
        self._start_time: Optional[float] = None
        self._end_time: Optional[float] = None
        self._lock = threading.Lock()

    @property
    def is_recording(self) -> bool:
        """True while a recording is in progress."""
        return self._recording

    @property
    def filename(self) -> Optional[str]:
        """File name of the current or most recent recording, if any."""
        return self._filename

    @property
    def duration(self) -> float:
        """Seconds recorded so far, or final length once stopped (0.0 if never started)."""
        with self._lock:
            return self._elapsed()

    @property
    def frame_count(self) -> int:
        """Number of frames written in the current or most recent recording."""
        return self._frame_count

    def _elapsed(self) -> float:
        """Return the recording length in seconds; caller must hold the lock."""
        if self._start_time is None:
            return 0.0
        end = self._end_time if self._end_time is not None else time.monotonic()
        return end - self._start_time

    def start(self) -> str:
        """Start a recording and return its file name.

        If a recording is already running, its file name is returned and
        nothing else changes. The video file itself is created lazily by
        the first ``write_frame()`` call.
        """
        with self._lock:
            if self._recording:
                return self._filename
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Keep only ASCII letters, digits, '_' and '-' so that a camera
            # id can never introduce path separators into the file name.
            safe_id = "".join(
                ch if (ch.isascii() and ch.isalnum()) or ch in "_-" else "_"
                for ch in self.camera_id
            )
            self._filename = f"recording_{safe_id}_{timestamp}.avi"
            self._frame_count = 0
            self._start_time = time.monotonic()
            self._end_time = None
            self._recording = True
            return self._filename

    def stop(self) -> dict:
        """Stop the recording and release the video writer.

        Returns:
            ``{"filename", "frames", "duration"}`` for the finished
            recording, or ``{"error": "Not recording"}`` if none was running.
        """
        with self._lock:
            if not self._recording:
                return {"error": "Not recording"}
            self._recording = False
            # Freeze the duration so it stops growing after the recording ends.
            self._end_time = time.monotonic()
            result = {
                "filename": self._filename,
                "frames": self._frame_count,
                "duration": self._elapsed(),
            }
            if self._writer is not None:
                self._writer.release()
                self._writer = None
            self._video_size = None
            return result

    def write_frame(self, frame: np.ndarray):
        """Append ``frame`` to the recording; no-op when not recording.

        The writer is opened on the first frame using that frame's size.
        Later frames of a different size are resized to match, because the
        writer is opened with one fixed frame size.
        """
        with self._lock:
            if not self._recording:
                return
            size = (frame.shape[1], frame.shape[0])
            if self._writer is None:
                self._video_size = size
                fourcc = cv2.VideoWriter_fourcc(*VIDEO_CODEC)
                video_path = os.path.join(self.output_dir, self._filename)
                self._writer = cv2.VideoWriter(
                    video_path, fourcc, VIDEO_FPS, self._video_size
                )
                if not self._writer.isOpened():
                    # Reported once; the unopened writer object is kept so
                    # that we do not retry (and re-log) on every frame.
                    print(f"[UDP] Could not open video writer for {video_path}")
            if not self._writer.isOpened():
                return
            if size != self._video_size:
                frame = cv2.resize(frame, self._video_size)
            self._writer.write(frame)
            self._frame_count += 1
class UDPReceiver:
    """Receives JPEG frames via UDP from ESP camera devices.

    A background thread reads datagrams, checks the shared token,
    reassembles START / chunk / END sequences per source address, decodes
    the JPEG and then saves, optionally records, and forwards each frame to
    ``on_frame``. A second thread marks cameras inactive once no frame has
    been processed for ``CAMERA_TIMEOUT_SECONDS``.

    Args:
        host: Local address to bind.
        port: Local UDP port to bind.
        image_dir: Directory where the latest frame per camera is written
            as ``<camera_id>.jpg``. Recordings go to a sibling
            ``recordings`` directory.
        on_frame: Optional callback ``on_frame(camera_id, frame, addr)``
            invoked on the receiver thread for every decoded frame.
        device_registry: Optional registry used to map a source IP to a
            device id; it must provide ``all()`` yielding objects with
            ``id`` and ``address`` attributes.
    """

    def __init__(self,
                 host: str = UDP_HOST,
                 port: int = UDP_PORT,
                 image_dir: str = IMAGE_DIR,
                 on_frame: Optional[Callable] = None,
                 device_registry=None):
        self.host = host
        self.port = port
        self.image_dir = image_dir
        self.on_frame = on_frame
        self.device_registry = device_registry
        self._sock: Optional[socket.socket] = None
        self._thread: Optional[threading.Thread] = None
        self._timeout_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        # Frame assemblers per source address ("ip:port")
        self._assemblers: Dict[str, FrameAssembler] = {}
        # Per-camera recorders (keyed by device_id)
        self._recorders: Dict[str, CameraRecorder] = {}
        self._recordings_dir = os.path.join(os.path.dirname(image_dir), "recordings")
        # IP to device_id mapping cache
        self._ip_to_device: Dict[str, str] = {}
        # Statistics (protected by _stats_lock)
        self._stats_lock = threading.Lock()
        self.frames_received = 0
        self.invalid_tokens = 0
        self.decode_errors = 0
        self.packets_received = 0
        # Active cameras tracking (protected by _cameras_lock)
        self._cameras_lock = threading.Lock()
        self._active_cameras: Dict[str, dict] = {}
        os.makedirs(self.image_dir, exist_ok=True)
        os.makedirs(self._recordings_dir, exist_ok=True)

    def set_device_registry(self, registry):
        """Set device registry for IP to device_id lookup."""
        self.device_registry = registry

    @property
    def is_running(self) -> bool:
        """True while the receiver thread is alive."""
        return self._thread is not None and self._thread.is_alive()

    @property
    def active_cameras(self) -> list:
        """Returns list of active camera device IDs."""
        with self._cameras_lock:
            return [cid for cid, info in self._active_cameras.items() if info.get("active", False)]

    def _get_device_id_from_ip(self, ip: str) -> Optional[str]:
        """Look up device_id from IP address using device registry.

        Successful lookups are cached until ``stop()`` is called.
        """
        cached = self._ip_to_device.get(ip)
        if cached is not None:
            return cached
        if self.device_registry:
            for device in self.device_registry.all():
                if device.address and device.address[0] == ip:
                    self._ip_to_device[ip] = device.id
                    return device.id
        return None

    def _resolve_camera_id(self, ip: str) -> str:
        """Return the device id for ``ip``, falling back to the IP itself.

        The fallback omits the port (to avoid duplicate cameras) and
        replaces dots with underscores.
        """
        return self._get_device_id_from_ip(ip) or ip.replace(".", "_")

    def _open_socket(self) -> socket.socket:
        """Create and bind the UDP socket; raises OSError on failure."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            # Short timeout so the receive loop can notice the stop event.
            sock.settimeout(1.0)
        except OSError:
            sock.close()
            raise
        return sock

    def start(self) -> bool:
        """Bind the socket and start the background threads.

        Returns:
            True if the receiver was started, False if it is already
            running or the socket could not be bound.
        """
        if self.is_running:
            return False
        # Bind here rather than inside the thread so that a bind failure is
        # reported to the caller instead of silently killing the thread.
        try:
            self._sock = self._open_socket()
        except OSError as exc:
            print(f"[UDP] Failed to bind {self.host}:{self.port}: {exc}")
            return False
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._receive_loop, daemon=True)
        self._thread.start()
        # Start timeout checker (unless one is still running)
        if self._timeout_thread is None or not self._timeout_thread.is_alive():
            self._timeout_thread = threading.Thread(target=self._timeout_checker, daemon=True)
            self._timeout_thread.start()
        return True

    def stop(self):
        """Stop the threads, finish recordings and clear runtime state.

        Saved ``.jpg`` frames are deleted and the packet/frame counters are
        reset; ``invalid_tokens`` and ``decode_errors`` are left untouched.
        """
        self._stop_event.set()
        sock, self._sock = self._sock, None
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass
        # Wait for the workers so that no frame is processed after the
        # cleanup below. Never join the calling thread (stop() may be
        # invoked from the on_frame callback).
        current = threading.current_thread()
        for worker in (self._thread, self._timeout_thread):
            if worker is not None and worker is not current and worker.is_alive():
                worker.join(timeout=2.0)
        for recorder in list(self._recorders.values()):
            if recorder.is_recording:
                recorder.stop()
        self._cleanup_frames()
        with self._cameras_lock:
            self._active_cameras.clear()
        self._assemblers.clear()
        self._recorders.clear()
        self._ip_to_device.clear()
        with self._stats_lock:
            self.frames_received = 0
            self.packets_received = 0

    def _cleanup_frames(self):
        """Remove all .jpg files from image directory (best effort)."""
        try:
            names = os.listdir(self.image_dir)
        except OSError:
            return
        for name in names:
            if name.endswith(".jpg"):
                try:
                    os.remove(os.path.join(self.image_dir, name))
                except OSError:
                    # One undeletable file must not stop the cleanup.
                    pass

    def _timeout_checker(self):
        """Check for camera timeouts and mark them as inactive."""
        # Event.wait() returns True as soon as stop() is called, so the
        # thread exits promptly instead of finishing a full sleep.
        while not self._stop_event.wait(1.0):
            now = time.time()
            expired = []
            with self._cameras_lock:
                for camera_id, info in self._active_cameras.items():
                    timed_out = now - info.get("last_frame", 0) > CAMERA_TIMEOUT_SECONDS
                    if timed_out and info.get("active", False):
                        info["active"] = False
                        expired.append(camera_id)
            # Remove the frame file so frontend shows default image
            for camera_id in expired:
                self._remove_camera_frame(camera_id)

    def _remove_camera_frame(self, camera_id: str):
        """Remove the frame file for a camera (best effort)."""
        try:
            os.remove(os.path.join(self.image_dir, f"{camera_id}.jpg"))
        except OSError:
            # Missing file or permission problem: nothing useful to do.
            pass

    def _get_assembler(self, addr: tuple) -> FrameAssembler:
        """Return the frame assembler for a source ``(ip, port)``, creating it if needed."""
        key = f"{addr[0]}:{addr[1]}"
        assembler = self._assemblers.get(key)
        if assembler is None:
            assembler = self._assemblers[key] = FrameAssembler()
        return assembler

    def _get_recorder(self, camera_id: str) -> CameraRecorder:
        """Return the recorder for ``camera_id``, creating it if needed."""
        recorder = self._recorders.get(camera_id)
        if recorder is None:
            # setdefault keeps a single recorder per camera even when the
            # receiver thread and an API thread get here at the same time.
            recorder = self._recorders.setdefault(
                camera_id, CameraRecorder(camera_id, self._recordings_dir)
            )
        return recorder

    def _receive_loop(self):
        """Receiver thread body: read datagrams until stopped."""
        # Work on a local reference: stop() sets self._sock to None.
        sock = self._sock
        if sock is None:
            return
        print(f"[UDP] Receiver started on {self.host}:{self.port}")
        try:
            while not self._stop_event.is_set():
                try:
                    data, addr = sock.recvfrom(UDP_BUFFER_SIZE)
                except socket.timeout:
                    continue
                except OSError:
                    # Socket closed by stop() or a fatal socket error.
                    break
                try:
                    self._handle_packet(data, addr)
                except Exception as exc:
                    # Thread boundary: one bad packet or a failing on_frame
                    # callback must not terminate the receiver.
                    print(f"[UDP] Error handling packet from {addr}: {exc}")
        finally:
            try:
                sock.close()
            except OSError:
                pass
            if self._sock is sock:
                self._sock = None
            print("[UDP] Receiver stopped")

    def _handle_packet(self, data: bytes, addr: tuple):
        """Validate one datagram and feed it into frame assembly."""
        with self._stats_lock:
            self.packets_received += 1
        if not data.startswith(SECRET_TOKEN):
            with self._stats_lock:
                self.invalid_tokens += 1
            return
        payload = data[len(SECRET_TOKEN):]
        assembler = self._get_assembler(addr)
        if payload == b"START":
            assembler.start_frame()
            return
        if payload == b"END":
            frame_data = assembler.finish_frame()
            if frame_data:
                camera_id = self._resolve_camera_id(addr[0])
                self._process_complete_frame(camera_id, frame_data, addr)
            return
        if assembler.receiving:
            assembler.add_chunk(payload)
            return
        # No frame in progress: treat the payload as a complete
        # single-packet JPEG.
        frame = self._decode_frame(payload)
        if frame is not None:
            self._process_frame(self._resolve_camera_id(addr[0]), frame, addr)
        else:
            with self._stats_lock:
                self.decode_errors += 1

    def _process_complete_frame(self, camera_id: str, frame_data: bytes, addr: tuple):
        """Decode an assembled frame and process it, counting decode failures."""
        frame = self._decode_frame(frame_data)
        if frame is None:
            with self._stats_lock:
                self.decode_errors += 1
            return
        self._process_frame(camera_id, frame, addr)

    def _process_frame(self, camera_id: str, frame: np.ndarray, addr: tuple):
        """Track, save, record and forward one decoded frame."""
        with self._stats_lock:
            self.frames_received += 1
        # Update camera tracking
        with self._cameras_lock:
            self._active_cameras[camera_id] = {
                "last_frame": time.time(),
                "active": True,
                "addr": addr
            }
        # Save frame
        self._save_frame(camera_id, frame)
        # Record if recording is active for this camera
        recorder = self._get_recorder(camera_id)
        if recorder.is_recording:
            recorder.write_frame(frame)
        if self.on_frame:
            self.on_frame(camera_id, frame, addr)

    def _decode_frame(self, data: bytes) -> Optional[np.ndarray]:
        """Decode JPEG bytes into an image array; returns None on failure."""
        try:
            npdata = np.frombuffer(data, np.uint8)
            return cv2.imdecode(npdata, cv2.IMREAD_COLOR)
        except Exception:
            # Any decoder error is reported to the caller as "no frame".
            return None

    def _save_frame(self, camera_id: str, frame: np.ndarray):
        """Write the frame to ``<image_dir>/<camera_id>.jpg`` (best effort)."""
        try:
            filepath = os.path.join(self.image_dir, f"{camera_id}.jpg")
            cv2.imwrite(filepath, frame)
        except Exception:
            # Deliberately silent: this runs for every frame and a failed
            # snapshot must not interrupt the stream.
            pass

    # === Recording API ===
    def start_recording(self, camera_id: str) -> dict:
        """Start recording an active camera.

        Returns:
            A status dict on success, or a dict with an ``"error"`` key if
            the camera is not active or is already recording.
        """
        with self._cameras_lock:
            info = self._active_cameras.get(camera_id)
            is_active = bool(info and info.get("active"))
        if not is_active:
            return {"error": f"Camera {camera_id} not active"}
        recorder = self._get_recorder(camera_id)
        if recorder.is_recording:
            return {"error": "Already recording", "filename": recorder.filename}
        filename = recorder.start()
        return {"status": "recording", "filename": filename, "camera_id": camera_id}

    def stop_recording(self, camera_id: str) -> dict:
        """Stop recording a camera.

        Returns:
            The recorder's result extended with ``camera_id`` and ``path``,
            or a dict with an ``"error"`` key.
        """
        recorder = self._recorders.get(camera_id)
        if recorder is None:
            return {"error": f"No recorder for {camera_id}"}
        if not recorder.is_recording:
            return {"error": "Not recording"}
        result = recorder.stop()
        if "error" in result:
            # Another thread stopped the recorder between the check and stop().
            return result
        result["camera_id"] = camera_id
        result["path"] = os.path.join(self._recordings_dir, result["filename"])
        return result

    def get_recording_status(self, camera_id: Optional[str] = None) -> dict:
        """Return recording status for one camera, or for all active cameras.

        With ``camera_id`` the result describes that camera only; without
        it the result maps every active camera id to its status.
        """
        if camera_id:
            recorder = self._recorders.get(camera_id)
            if recorder is None:
                return {"camera_id": camera_id, "recording": False}
            return {
                "camera_id": camera_id,
                "recording": recorder.is_recording,
                "filename": recorder.filename,
                "duration": recorder.duration,
                "frames": recorder.frame_count
            }
        result = {}
        # active_cameras takes a snapshot under the lock, so the receiver
        # thread can keep adding cameras while we build the result.
        for cid in self.active_cameras:
            recorder = self._get_recorder(cid)
            recording = recorder.is_recording
            result[cid] = {
                "recording": recording,
                "filename": recorder.filename if recording else None,
                "duration": recorder.duration if recording else 0
            }
        return result

    def list_recordings(self) -> list:
        """List ``.avi`` files in the recordings directory, newest first.

        Returns an empty list if the directory cannot be read.
        """
        try:
            names = os.listdir(self._recordings_dir)
        except OSError:
            return []
        files = []
        for name in names:
            if not name.endswith(".avi"):
                continue
            try:
                stat = os.stat(os.path.join(self._recordings_dir, name))
            except OSError:
                # File vanished or is unreadable: skip it, keep the rest.
                continue
            files.append({
                "filename": name,
                "size": stat.st_size,
                "created": stat.st_mtime
            })
        return sorted(files, key=lambda x: x["created"], reverse=True)

    def get_stats(self) -> dict:
        """Return a snapshot of the receiver counters and camera/recording totals."""
        # Copy the values first: the receiver thread may add recorders.
        recording_count = sum(1 for r in list(self._recorders.values()) if r.is_recording)
        with self._cameras_lock:
            active_count = sum(1 for info in self._active_cameras.values() if info.get("active"))
        with self._stats_lock:
            return {
                "running": self.is_running,
                "packets_received": self.packets_received,
                "frames_received": self.frames_received,
                "invalid_tokens": self.invalid_tokens,
                "decode_errors": self.decode_errors,
                "active_cameras": active_count,
                "active_recordings": recording_count
            }