import time
import threading
from typing import Optional

from utils.display import Display
from core.transport import Transport
from core.registry import DeviceRegistry
from core.groups import GroupRegistry
from commands.registry import CommandRegistry
from web.mlat import MlatEngine
from core.can_store import CanStore

COMMAND_TIMEOUT_S = 120  # 2 minutes
CLEANUP_INTERVAL_S = 30  # seconds between two stale-command sweeps


class Session:
    """Central runtime state for C3PO. No UI logic.

    Holds the registries and transport handed in by the caller, the table of
    in-flight commands, and the optional service instances. A daemon thread
    started in ``__init__`` periodically drops commands that have been
    pending for longer than ``COMMAND_TIMEOUT_S``.
    """

    def __init__(self, registry: DeviceRegistry,
                 commands: CommandRegistry,
                 groups: GroupRegistry,
                 transport: Transport) -> None:
        """Store the collaborators and start the stale-command sweeper.

        Args:
            registry: Device registry shared with the rest of the runtime.
            commands: Command registry.
            groups: Group registry.
            transport: Transport instance.
        """
        self.registry = registry
        self.commands = commands
        self.groups = groups
        self.transport = transport

        # Active command tracking: request_id -> info dict. The methods below
        # read the keys "start_time", "device_id", "command_name", "output"
        # and write "status". Entries are inserted elsewhere (not in this
        # class) -- presumably by whoever dispatches the command.
        self.active_commands: dict = {}

        # Service instances
        self.web_server = None
        self.udp_receiver = None
        self.mlat_engine = MlatEngine()

        # CAN bus frame storage
        self.can_store = CanStore()

        # Honeypot dashboard components (created on web start)
        self.hp_store = None
        self.hp_commander = None
        self.hp_alerts = None
        self.hp_geo = None

        # Stale command cleanup; daemon so it never blocks interpreter exit.
        self._cleanup_timer = threading.Thread(
            target=self._cleanup_stale_commands, daemon=True)
        self._cleanup_timer.start()

    def _cleanup_stale_commands(self) -> None:
        """Periodically remove commands that never received an EOF.

        Runs forever on the daemon thread created in ``__init__``. Every
        ``CLEANUP_INTERVAL_S`` seconds it drops each entry whose
        ``start_time`` is older than ``COMMAND_TIMEOUT_S`` and reports the
        timeout through ``Display.device_event``. Entries without a
        ``start_time`` are never considered stale.
        """
        while True:
            time.sleep(CLEANUP_INTERVAL_S)
            now = time.time()
            # Snapshot the items first: the dict is mutated from other
            # threads and must not change size while being iterated.
            stale = [
                rid
                for rid, info in list(self.active_commands.items())
                if now - info.get("start_time", now) > COMMAND_TIMEOUT_S
            ]
            for rid in stale:
                # pop() with a default: the command may have completed (and
                # been removed) since the snapshot was taken.
                info = self.active_commands.pop(rid, None)
                if info:
                    Display.device_event(
                        info.get("device_id", "?"),
                        f"Command '{info.get('command_name', '?')}' timed out ({rid})"
                    )

    # --- Callback for Transport ---
    def handle_command_response(self, request_id: str, device_id: str,
                                payload: str, eof: bool) -> None:
        """Called by Transport when a command response arrives.

        Appends ``payload`` to the tracked command's output. When ``eof`` is
        true the command is marked completed, its duration is displayed and
        it is removed from ``active_commands``. Responses for an untracked
        ``request_id`` are reported as unknown.

        Args:
            request_id: Identifier of the command the response belongs to.
            device_id: Identifier of the responding device.
            payload: Response chunk to append to the command output.
            eof: True when this is the final chunk for the command.
        """
        # Single lookup instead of "in" followed by "[...]": the cleanup
        # thread may pop the entry between those two steps, which used to
        # surface as a KeyError in the transport callback.
        command_info = self.active_commands.get(request_id)
        if command_info is None:
            Display.device_event(
                device_id,
                f"Received response for unknown command {request_id}: {payload}"
            )
            return

        command_info["output"].append(payload)
        if not eof:
            return

        command_info["status"] = "completed"
        Display.command_response(
            request_id,
            device_id,
            f"Command completed in {Display.format_duration(time.time() - command_info['start_time'])}"
        )
        # pop() rather than del: tolerate the cleanup thread having already
        # removed the entry while the response was being processed.
        self.active_commands.pop(request_id, None)