"""UDP server for receiving camera frames from ESP devices.

Protocol from ESP32:
- TOKEN + "START" -> Start of new frame
- TOKEN + chunk   -> JPEG data chunk
- TOKEN + "END"   -> End of frame, decode and process

A payload that arrives while no multi-packet frame is being assembled is
decoded on its own as a complete single-packet JPEG.
"""

import os
import socket
import threading
import time
from datetime import datetime
from typing import Callable, Dict, Optional

import cv2
import numpy as np

from .config import (
    UDP_HOST,
    UDP_PORT,
    UDP_BUFFER_SIZE,
    SECRET_TOKEN,
    IMAGE_DIR,
    VIDEO_FPS,
    VIDEO_CODEC,
)

# Camera timeout - mark as inactive after this many seconds without frames
CAMERA_TIMEOUT_SECONDS = 5

# How long stop() waits for each worker thread to exit. The receive socket
# uses a 1 second timeout, so this leaves room for one full poll cycle.
_THREAD_JOIN_TIMEOUT = 2.0


class FrameAssembler:
    """Assembles JPEG frames from multiple UDP packets.

    Usage: ``start_frame()``, any number of ``add_chunk()`` calls, then
    ``finish_frame()`` to obtain the concatenated bytes.
    """

    def __init__(self, timeout: float = 5.0):
        """Create an idle assembler.

        Args:
            timeout: Maximum age in seconds of a frame in progress; chunks
                arriving later than this after ``start_frame()`` abort it.
        """
        self.timeout = timeout
        self.buffer = bytearray()
        self.start_time: Optional[float] = None
        self.receiving = False

    def start_frame(self):
        """Discard any partial data and begin assembling a new frame."""
        self.buffer = bytearray()
        self.start_time = time.time()
        self.receiving = True

    def add_chunk(self, data: bytes) -> bool:
        """Append *data* to the frame in progress.

        Returns:
            True if the chunk was stored; False if no frame is in progress
            or the frame exceeded ``timeout`` (in which case it is dropped).
        """
        if not self.receiving:
            return False
        if self.start_time is not None and (time.time() - self.start_time) > self.timeout:
            self.reset()
            return False
        self.buffer.extend(data)
        return True

    def finish_frame(self) -> Optional[bytes]:
        """Return the assembled frame bytes and go back to the idle state.

        Returns:
            The collected bytes, or None if no frame was in progress or no
            chunk was received.
        """
        data = bytes(self.buffer) if self.receiving and self.buffer else None
        # Always reset: a START immediately followed by END must not leave the
        # assembler in the receiving state, otherwise later single-packet
        # frames would be swallowed as chunks of a frame that never ends.
        self.reset()
        return data

    def reset(self):
        """Drop any buffered data and return to the idle state."""
        self.buffer = bytearray()
        self.start_time = None
        self.receiving = False


class CameraRecorder:
    """Handles video recording for a single camera.

    ``write_frame()`` is called from the UDP receive thread while ``start()``
    and ``stop()`` are called through the recording API, so all three are
    serialised by an internal lock.
    """

    def __init__(self, camera_id: str, output_dir: str):
        """Create an idle recorder.

        Args:
            camera_id: Identifier used to build the recording file name.
            output_dir: Directory the video files are written to.
        """
        self.camera_id = camera_id
        self.output_dir = output_dir
        self._lock = threading.Lock()
        self._writer: Optional[cv2.VideoWriter] = None
        self._video_size: Optional[tuple] = None
        self._recording = False
        self._filename: Optional[str] = None
        self._frame_count = 0
        self._start_time: Optional[float] = None
        # Set by stop() so that `duration` stops growing once recording ends.
        self._end_time: Optional[float] = None

    @property
    def is_recording(self) -> bool:
        """True between a successful start() and the matching stop()."""
        return self._recording

    @property
    def filename(self) -> Optional[str]:
        """File name of the current or most recent recording, if any."""
        return self._filename

    @property
    def duration(self) -> float:
        """Length in seconds of the current or most recent recording.

        Keeps growing while recording, is frozen by stop(), and is 0.0 if
        nothing has been recorded yet.
        """
        with self._lock:
            if self._start_time is None:
                return 0.0
            end = self._end_time if self._end_time is not None else time.time()
            return end - self._start_time

    @property
    def frame_count(self) -> int:
        """Number of frames written in the current or most recent recording."""
        return self._frame_count

    def start(self) -> str:
        """Begin a new recording and return its file name.

        If a recording is already running its file name is returned and
        nothing else changes. The video file itself is created lazily by
        the first write_frame() call.
        """
        with self._lock:
            if self._recording:
                return self._filename
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            safe_id = self.camera_id.replace(":", "_").replace(".", "_")
            self._filename = f"recording_{safe_id}_{timestamp}.avi"
            self._frame_count = 0
            self._start_time = time.time()
            self._end_time = None
            self._recording = True
            return self._filename

    def stop(self) -> dict:
        """Finish the running recording and release the video writer.

        Returns:
            ``{"filename", "frames", "duration"}`` for the finished
            recording, or ``{"error": "Not recording"}`` if none is running.
        """
        with self._lock:
            if not self._recording:
                return {"error": "Not recording"}
            self._recording = False
            self._end_time = time.time()
            result = {
                "filename": self._filename,
                "frames": self._frame_count,
                "duration": self._end_time - self._start_time,
            }
            if self._writer is not None:
                self._writer.release()
                self._writer = None
            self._video_size = None
            return result

    def write_frame(self, frame: np.ndarray):
        """Append *frame* to the recording; does nothing when not recording.

        The writer is opened on the first frame using that frame's size.
        Later frames of a different size are resized to match, because the
        writer requires every frame to have the size it was opened with.
        """
        with self._lock:
            if not self._recording:
                return
            size = (frame.shape[1], frame.shape[0])  # (width, height)
            if self._writer is None:
                self._video_size = size
                fourcc = cv2.VideoWriter_fourcc(*VIDEO_CODEC)
                video_path = os.path.join(self.output_dir, self._filename)
                self._writer = cv2.VideoWriter(
                    video_path, fourcc, VIDEO_FPS, self._video_size
                )
            if not self._writer.isOpened():
                return
            if size != self._video_size:
                frame = cv2.resize(frame, self._video_size)
            self._writer.write(frame)
            self._frame_count += 1


class UDPReceiver:
    """Receives JPEG frames via UDP from ESP camera devices.

    Two daemon threads are used: one reads and processes packets, the other
    marks cameras inactive after CAMERA_TIMEOUT_SECONDS without frames.
    The latest frame of every camera is stored as ``<camera_id>.jpg`` in
    ``image_dir``.
    """

    def __init__(self, host: str = UDP_HOST, port: int = UDP_PORT,
                 image_dir: str = IMAGE_DIR,
                 on_frame: Optional[Callable] = None,
                 device_registry=None):
        """Prepare the receiver; no socket is opened until start().

        Args:
            host: Address to bind the UDP socket to.
            port: UDP port to listen on.
            image_dir: Directory for the latest frame of each camera.
                Recordings go to a sibling ``recordings`` directory.
            on_frame: Optional callback invoked as
                ``on_frame(camera_id, frame, addr)`` for every decoded frame.
            device_registry: Optional registry used to map a sender IP to a
                device id (see _get_device_id_from_ip).
        """
        self.host = host
        self.port = port
        self.image_dir = image_dir
        self.on_frame = on_frame
        self.device_registry = device_registry

        self._sock: Optional[socket.socket] = None
        self._thread: Optional[threading.Thread] = None
        self._timeout_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

        # Frame assemblers per source address
        self._assemblers: Dict[str, FrameAssembler] = {}

        # Per-camera recorders (keyed by device_id)
        self._recorders: Dict[str, CameraRecorder] = {}
        self._recordings_dir = os.path.join(os.path.dirname(image_dir), "recordings")

        # IP to device_id mapping cache
        self._ip_to_device: Dict[str, str] = {}

        # Statistics (protected by _stats_lock)
        self._stats_lock = threading.Lock()
        self.frames_received = 0
        self.invalid_tokens = 0
        self.decode_errors = 0
        self.packets_received = 0

        # Active cameras tracking (protected by _cameras_lock)
        self._cameras_lock = threading.Lock()
        self._active_cameras: Dict[str, dict] = {}

        os.makedirs(self.image_dir, exist_ok=True)
        os.makedirs(self._recordings_dir, exist_ok=True)

    def set_device_registry(self, registry):
        """Set device registry for IP to device_id lookup."""
        self.device_registry = registry

    @property
    def is_running(self) -> bool:
        """True while the receive thread is alive."""
        return self._thread is not None and self._thread.is_alive()

    @property
    def active_cameras(self) -> list:
        """Returns list of active camera device IDs."""
        with self._cameras_lock:
            return [cid for cid, info in self._active_cameras.items()
                    if info.get("active", False)]

    def _get_device_id_from_ip(self, ip: str) -> Optional[str]:
        """Look up device_id from IP address using device registry.

        Successful lookups are cached until stop(). Returns None when there
        is no registry or no registered device has this IP.
        """
        # Check cache first
        cached = self._ip_to_device.get(ip)
        if cached is not None:
            return cached

        # Look up in device registry
        if self.device_registry:
            for device in self.device_registry.all():
                if device.address and device.address[0] == ip:
                    self._ip_to_device[ip] = device.id
                    return device.id
        return None

    def start(self) -> bool:
        """Start the receive and timeout threads.

        Returns:
            False if the receiver is already running, True otherwise.
        """
        if self.is_running:
            return False

        self._stop_event.clear()
        self._thread = threading.Thread(target=self._receive_loop, daemon=True)
        self._thread.start()

        # Start timeout checker
        self._timeout_thread = threading.Thread(target=self._timeout_checker, daemon=True)
        self._timeout_thread.start()
        return True

    def stop(self):
        """Stop both threads, finish recordings and clear runtime state.

        Saved frame images are deleted and the frame/packet counters are
        reset; ``invalid_tokens`` and ``decode_errors`` are left untouched.
        """
        self._stop_event.set()

        # Detach the socket before closing it so the receive thread never
        # observes a half-torn-down self._sock.
        sock, self._sock = self._sock, None
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass

        # Wait for the workers so they cannot repopulate the state cleared
        # below. Skip the current thread: stop() may be called from on_frame.
        current = threading.current_thread()
        for worker in (self._thread, self._timeout_thread):
            if worker is not None and worker is not current and worker.is_alive():
                worker.join(timeout=_THREAD_JOIN_TIMEOUT)

        for recorder in list(self._recorders.values()):
            if recorder.is_recording:
                recorder.stop()

        self._cleanup_frames()
        with self._cameras_lock:
            self._active_cameras.clear()
        self._assemblers.clear()
        self._recorders.clear()
        self._ip_to_device.clear()
        with self._stats_lock:
            self.frames_received = 0
            self.packets_received = 0

    def _cleanup_frames(self):
        """Remove all .jpg files from image directory."""
        try:
            names = os.listdir(self.image_dir)
        except OSError:
            return
        for name in names:
            if name.endswith(".jpg"):
                try:
                    os.remove(os.path.join(self.image_dir, name))
                except OSError:
                    # Best effort: keep deleting the remaining files.
                    pass

    def _timeout_checker(self):
        """Check for camera timeouts and mark them as inactive."""
        # Event.wait doubles as an interruptible one-second sleep, so stop()
        # does not have to wait for a full sleep to elapse.
        while not self._stop_event.wait(1):
            now = time.time()
            expired = []
            with self._cameras_lock:
                for camera_id, info in self._active_cameras.items():
                    timed_out = now - info.get("last_frame", 0) > CAMERA_TIMEOUT_SECONDS
                    if timed_out and info.get("active", False):
                        info["active"] = False
                        expired.append(camera_id)
            # Remove the frame file so frontend shows default image
            for camera_id in expired:
                self._remove_camera_frame(camera_id)

    def _remove_camera_frame(self, camera_id: str):
        """Remove the frame file for a camera."""
        filepath = os.path.join(self.image_dir, f"{camera_id}.jpg")
        try:
            os.remove(filepath)
        except OSError:
            # Missing file or failed delete: nothing useful to do here.
            pass

    def _get_assembler(self, addr: tuple) -> FrameAssembler:
        """Return the assembler for source *addr* (ip, port), creating it if needed."""
        key = f"{addr[0]}:{addr[1]}"
        assembler = self._assemblers.get(key)
        if assembler is None:
            assembler = self._assemblers.setdefault(key, FrameAssembler())
        return assembler

    def _get_recorder(self, camera_id: str) -> CameraRecorder:
        """Return the recorder for *camera_id*, creating it if needed."""
        recorder = self._recorders.get(camera_id)
        if recorder is None:
            # setdefault keeps a single recorder per camera even when the
            # receive thread and an API caller get here at the same time.
            recorder = self._recorders.setdefault(
                camera_id, CameraRecorder(camera_id, self._recordings_dir)
            )
        return recorder

    def _receive_loop(self):
        """Thread target: bind the socket and process packets until stopped."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.settimeout(1.0)
        except OSError as exc:
            sock.close()
            print(f"[UDP] Failed to start receiver on {self.host}:{self.port}: {exc}")
            return

        self._sock = sock
        print(f"[UDP] Receiver started on {self.host}:{self.port}")

        try:
            # Use the local `sock`: stop() may set self._sock to None at any time.
            while not self._stop_event.is_set():
                try:
                    data, addr = sock.recvfrom(UDP_BUFFER_SIZE)
                except socket.timeout:
                    continue
                except OSError:
                    # Socket closed by stop() or a fatal socket error.
                    break
                self._handle_packet(data, addr)
        finally:
            sock.close()
            if self._sock is sock:
                self._sock = None
            print("[UDP] Receiver stopped")

    def _handle_packet(self, data: bytes, addr: tuple):
        """Validate one datagram and feed it to the frame pipeline."""
        authorized = data.startswith(SECRET_TOKEN)
        with self._stats_lock:
            self.packets_received += 1
            if not authorized:
                self.invalid_tokens += 1
        if not authorized:
            return

        payload = data[len(SECRET_TOKEN):]
        assembler = self._get_assembler(addr)

        # Try to get device_id from IP, fallback to IP if not found
        # (without port to avoid duplicates)
        ip = addr[0]
        device_id = self._get_device_id_from_ip(ip) or ip.replace(".", "_")

        if payload == b"START":
            assembler.start_frame()
        elif payload == b"END":
            frame_data = assembler.finish_frame()
            if frame_data:
                self._process_complete_frame(device_id, frame_data, addr)
        elif assembler.receiving:
            assembler.add_chunk(payload)
        else:
            # No frame in progress: treat the payload as a whole JPEG.
            self._process_complete_frame(device_id, payload, addr)

    def _process_complete_frame(self, camera_id: str, frame_data: bytes, addr: tuple):
        """Decode *frame_data* and process it, counting a decode error on failure."""
        frame = self._decode_frame(frame_data)
        if frame is None:
            with self._stats_lock:
                self.decode_errors += 1
            return
        self._process_frame(camera_id, frame, addr)

    def _process_frame(self, camera_id: str, frame: np.ndarray, addr: tuple):
        """Track, save, record and publish one decoded frame."""
        with self._stats_lock:
            self.frames_received += 1

        # Update camera tracking
        with self._cameras_lock:
            self._active_cameras[camera_id] = {
                "last_frame": time.time(),
                "active": True,
                "addr": addr,
            }

        # Save frame
        self._save_frame(camera_id, frame)

        # Record if recording is active for this camera
        recorder = self._get_recorder(camera_id)
        if recorder.is_recording:
            recorder.write_frame(frame)

        if self.on_frame:
            try:
                self.on_frame(camera_id, frame, addr)
            except Exception as exc:
                # A faulty callback must not take down the receive thread.
                print(f"[UDP] on_frame callback failed: {exc}")

    def _decode_frame(self, data: bytes) -> Optional[np.ndarray]:
        """Decode JPEG bytes into a colour image; None if decoding fails."""
        if not data:
            return None
        try:
            npdata = np.frombuffer(data, np.uint8)
            return cv2.imdecode(npdata, cv2.IMREAD_COLOR)
        except Exception:
            # Malformed input from the network is expected; report it as None.
            return None

    def _save_frame(self, camera_id: str, frame: np.ndarray):
        """Write *frame* to ``<image_dir>/<camera_id>.jpg`` (best effort)."""
        try:
            filepath = os.path.join(self.image_dir, f"{camera_id}.jpg")
            cv2.imwrite(filepath, frame)
        except Exception:
            # Saving the preview image is optional; keep the pipeline running.
            pass

    # === Recording API ===

    def start_recording(self, camera_id: str) -> dict:
        """Start recording *camera_id*.

        Returns:
            ``{"status": "recording", "filename", "camera_id"}`` on success,
            or a dict with an ``"error"`` key if the camera is not active or
            is already being recorded.
        """
        with self._cameras_lock:
            info = self._active_cameras.get(camera_id)
            is_active = bool(info and info.get("active"))
        if not is_active:
            return {"error": f"Camera {camera_id} not active"}

        recorder = self._get_recorder(camera_id)
        if recorder.is_recording:
            return {"error": "Already recording", "filename": recorder.filename}

        filename = recorder.start()
        return {"status": "recording", "filename": filename, "camera_id": camera_id}

    def stop_recording(self, camera_id: str) -> dict:
        """Stop recording *camera_id*.

        Returns:
            The recorder's result extended with ``"camera_id"`` and
            ``"path"``, or a dict with an ``"error"`` key.
        """
        recorder = self._recorders.get(camera_id)
        if recorder is None:
            return {"error": f"No recorder for {camera_id}"}
        if not recorder.is_recording:
            return {"error": "Not recording"}

        result = recorder.stop()
        if "error" in result:
            # Another caller stopped the recording in the meantime.
            return result
        result["camera_id"] = camera_id
        result["path"] = os.path.join(self._recordings_dir, result["filename"])
        return result

    def get_recording_status(self, camera_id: Optional[str] = None) -> dict:
        """Report recording state for one camera or for all active cameras.

        Args:
            camera_id: Camera to query. When omitted (or empty) a dict keyed
                by camera id is returned for every active camera.
        """
        if camera_id:
            recorder = self._recorders.get(camera_id)
            if recorder is None:
                return {"camera_id": camera_id, "recording": False}
            return {
                "camera_id": camera_id,
                "recording": recorder.is_recording,
                "filename": recorder.filename,
                "duration": recorder.duration,
                "frames": recorder.frame_count,
            }

        with self._cameras_lock:
            active_ids = [cid for cid, info in self._active_cameras.items()
                          if info.get("active")]

        result = {}
        for cid in active_ids:
            recorder = self._get_recorder(cid)
            recording = recorder.is_recording
            result[cid] = {
                "recording": recording,
                "filename": recorder.filename if recording else None,
                "duration": recorder.duration if recording else 0,
            }
        return result

    def list_recordings(self) -> list:
        """List the ``.avi`` files in the recordings directory, newest first.

        Each entry holds ``"filename"``, ``"size"`` and ``"created"`` (the
        file's modification time). Returns an empty list if the directory
        cannot be read.
        """
        try:
            names = os.listdir(self._recordings_dir)
        except OSError:
            return []

        files = []
        for name in names:
            if not name.endswith(".avi"):
                continue
            try:
                stat = os.stat(os.path.join(self._recordings_dir, name))
            except OSError:
                # File disappeared between listing and stat; skip it.
                continue
            files.append({
                "filename": name,
                "size": stat.st_size,
                "created": stat.st_mtime,
            })
        return sorted(files, key=lambda x: x["created"], reverse=True)

    def get_stats(self) -> dict:
        """Return a snapshot of the receiver's counters and activity."""
        # Snapshot the recorders: the receive thread may add one concurrently.
        recording_count = sum(1 for r in list(self._recorders.values()) if r.is_recording)
        with self._cameras_lock:
            active_count = sum(1 for info in self._active_cameras.values()
                               if info.get("active"))

        with self._stats_lock:
            return {
                "running": self.is_running,
                "packets_received": self.packets_received,
                "frames_received": self.frames_received,
                "invalid_tokens": self.invalid_tokens,
                "decode_errors": self.decode_errors,
                "active_cameras": active_count,
                "active_recordings": recording_count,
            }