espilon-source/tools/C3PO/core/session.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

83 lines
2.8 KiB
Python

import time
import threading
from typing import Optional
from utils.display import Display
from core.transport import Transport
from core.registry import DeviceRegistry
from core.groups import GroupRegistry
from commands.registry import CommandRegistry
from web.mlat import MlatEngine
from core.can_store import CanStore
# Maximum age, in seconds, of an entry in Session.active_commands (measured
# from its "start_time") before the stale-command cleanup loop removes it.
COMMAND_TIMEOUT_S = 120 # 2 minutes
class Session:
    """Central runtime state for C3PO. No UI logic.

    Holds the registries, the transport, the table of in-flight commands
    and the optional service instances. A daemon thread started in
    ``__init__`` periodically drops commands that never received an EOF.

    ``active_commands`` is touched both by that thread and by
    ``handle_command_response``; every access below therefore uses a single
    dict operation (``get`` / ``pop`` with a default) instead of a
    check-then-act sequence, so a concurrent removal cannot raise KeyError.
    """

    def __init__(self, registry: "DeviceRegistry", commands: "CommandRegistry",
                 groups: "GroupRegistry", transport: "Transport"):
        """Store the collaborators and start the stale-command cleanup thread.

        Args:
            registry: Device registry shared with the rest of the server.
            commands: Command registry.
            groups: Group registry.
            transport: Transport that delivers command responses.
        """
        self.registry = registry
        self.commands = commands
        self.groups = groups
        self.transport = transport

        # Active command tracking: request_id -> info dict. This class reads
        # the keys "start_time", "device_id", "command_name", "output" and
        # writes "status".
        self.active_commands: dict = {}

        # Service instances
        self.web_server = None
        self.udp_receiver = None
        self.mlat_engine = MlatEngine()

        # CAN bus frame storage
        self.can_store = CanStore()

        # Honeypot dashboard components (created on web start)
        self.hp_store = None
        self.hp_commander = None
        self.hp_alerts = None
        self.hp_geo = None

        # Stale command cleanup (daemon thread: does not block interpreter exit)
        self._cleanup_timer = threading.Thread(
            target=self._cleanup_stale_commands, daemon=True)
        self._cleanup_timer.start()

    def _pop_stale_commands(self, now: float, timeout_s: float) -> list:
        """Remove and return the commands older than *timeout_s* at time *now*.

        Entries without a "start_time" are never considered stale. Iterates
        over a snapshot of the table so concurrent inserts/removals cannot
        break the loop.

        Returns:
            A list of ``(request_id, info)`` tuples for the entries removed
            by this call.
        """
        expired = []
        for rid, info in list(self.active_commands.items()):
            if now - info.get("start_time", now) <= timeout_s:
                continue
            # pop() with a default: the response handler may already have
            # removed this entry after the snapshot was taken.
            removed = self.active_commands.pop(rid, None)
            if removed is not None:
                expired.append((rid, removed))
        return expired

    def _cleanup_stale_commands(self):
        """Periodically remove commands that never received an EOF.

        Thread target; loops forever, waking every 30 seconds and reporting
        each timed-out command through ``Display.device_event``.
        """
        while True:
            time.sleep(30)
            for rid, info in self._pop_stale_commands(time.time(),
                                                      COMMAND_TIMEOUT_S):
                Display.device_event(
                    info.get("device_id", "?"),
                    f"Command '{info.get('command_name', '?')}' timed out ({rid})"
                )

    # --- Callback for Transport ---
    def handle_command_response(self, request_id: str, device_id: str,
                                payload: str, eof: bool):
        """Called by Transport when a command response arrives.

        Appends *payload* to the command's "output" list. When *eof* is true
        the command is marked "completed", removed from ``active_commands``
        and its duration is reported. Responses for an unknown *request_id*
        are reported as a device event.

        Args:
            request_id: Identifier of the command this response belongs to.
            device_id: Device the response came from.
            payload: Response chunk.
            eof: True when this is the final chunk for the command.
        """
        # Single lookup: the cleanup thread may remove the entry at any time,
        # so "in" followed by indexing could raise KeyError.
        command_info = self.active_commands.get(request_id)
        if command_info is None:
            Display.device_event(device_id,
                                 f"Received response for unknown command {request_id}: {payload}")
            return

        # Tolerate entries created without an "output" list, matching the
        # defensive .get() access used by the cleanup loop.
        command_info.setdefault("output", []).append(payload)
        if not eof:
            return

        command_info["status"] = "completed"
        # pop() with a default instead of del: the entry may already be gone.
        self.active_commands.pop(request_id, None)

        finished_at = time.time()
        elapsed = finished_at - command_info.get("start_time", finished_at)
        Display.command_response(
            request_id, device_id,
            f"Command completed in {Display.format_duration(elapsed)}"
        )