from core.crypto import CryptoContext from core.device import Device from core.keystore import KeyStore from core.registry import DeviceRegistry from log.manager import LogManager from utils.display import Display from proto.c2_pb2 import Command, AgentMessage, AgentMsgType # Forward declaration for type hinting to avoid circular import from typing import TYPE_CHECKING if TYPE_CHECKING: from core.session import Session class Transport: def __init__(self, registry: DeviceRegistry, logger: LogManager, keystore: KeyStore, session: 'Session' = None): self.registry = registry self.logger = logger self.keystore = keystore self.session = session self.command_responses = {} self.hp_store = None self.hp_commander = None # Cache of CryptoContext per device_id (HKDF derivation is expensive) self._crypto_cache: dict[str, CryptoContext] = {} def set_session(self, session: 'Session'): self.session = session def _get_crypto(self, device_id: str) -> CryptoContext | None: """Get or create a CryptoContext for the given device.""" if device_id in self._crypto_cache: return self._crypto_cache[device_id] master_key = self.keystore.get(device_id) if master_key is None: return None ctx = CryptoContext(master_key, device_id) self._crypto_cache[device_id] = ctx return ctx # ================================================== # RX (ESP → C2) # ================================================== def handle_incoming(self, sock, addr, raw_data: bytes): """ raw_data = device_id:BASE64( nonce[12] || ChaCha20-Poly1305( Protobuf ) || tag[16] ) """ # 1) Parse device_id prefix raw_str = raw_data if b":" not in raw_str: Display.error(f"No device_id prefix in message from {addr}") return device_id_bytes, b64_payload = raw_str.split(b":", 1) device_id = device_id_bytes.decode(errors="ignore").strip() if not device_id: Display.error(f"Empty device_id from {addr}") return # 2) Lookup crypto key for this device crypto = self._get_crypto(device_id) if crypto is None: Display.error(f"Unknown device '{device_id}' from {addr} – no key in keystore") return # 3) Base64 decode try: encrypted = crypto.b64_decode(b64_payload) except Exception as e: Display.error(f"Base64 decode failed from {device_id}@{addr}: {e}") return # 4) Decrypt + verify (AEAD) try: protobuf_bytes = crypto.decrypt(encrypted) except Exception as e: Display.error(f"Decrypt/auth failed from {device_id}@{addr}: {e}") return # 5) Protobuf decode → AgentMessage try: msg = AgentMessage.FromString(protobuf_bytes) except Exception as e: Display.error(f"Protobuf decode failed from {device_id}@{addr}: {e}") return if not msg.device_id: msg.device_id = device_id self._dispatch(sock, addr, msg) # ================================================== # DISPATCH # ================================================== def _dispatch(self, sock, addr, msg: AgentMessage): device = self.registry.get(msg.device_id) is_new_device = False if not device: device = Device( id=msg.device_id, sock=sock, address=addr ) self.registry.add(device) Display.device_event(device.id, f"Connected from {addr[0]}") is_new_device = True else: # Device reconnected with new socket - update connection info if device.sock != sock: try: device.sock.close() except Exception: pass device.sock = sock device.address = addr Display.device_event(device.id, f"Reconnected from {addr[0]}:{addr[1]}") device.touch() self._handle_agent_message(device, msg) # Auto-query system_info on new device connection if is_new_device: self._auto_query_system_info(device) def probe_device(self, device: Device): """Send a system_info probe to check if the device is alive.""" cmd = Command() cmd.device_id = device.id cmd.command_name = "system_info" cmd.request_id = f"probe-{device.id}" self.send_command(device.sock, cmd, device.id) def _auto_query_system_info(self, device: Device): """Send system_info command automatically when device connects.""" try: cmd = Command() cmd.device_id = device.id cmd.command_name = "system_info" cmd.request_id = f"auto-sysinfo-{device.id}" self.send_command(device.sock, cmd, device.id) except Exception as e: Display.error(f"Auto system_info failed for {device.id}: {e}") def _parse_system_info(self, device: Device, payload: str, silent: bool = False): """Parse system_info response and update device info.""" # Format: chip=esp32 cores=2 flash=external heap=4310096 uptime=7s modules=network,fakeap try: for part in payload.split(): if "=" in part: key, value = part.split("=", 1) if key == "chip": device.chip = value elif key == "modules": device.modules = value if not silent: # Notify TUI about device info update Display.device_event(device.id, f"INFO: {payload}") # Send special message to update TUI title from tui.bridge import tui_bridge, TUIMessage, MessageType tui_bridge.post_message(TUIMessage( msg_type=MessageType.DEVICE_INFO_UPDATED, device_id=device.id, payload=device.modules )) except Exception as e: Display.error(f"Failed to parse system_info: {e}") # ================================================== # AGENT MESSAGE HANDLER # ================================================== def _handle_agent_message(self, device: Device, msg: AgentMessage): payload_str = "" if msg.payload: try: payload_str = msg.payload.decode(errors="ignore") except Exception: payload_str = repr(msg.payload) # --- Route request_id-bearing responses to the right handler --- req_id = msg.request_id or "" is_probe = req_id.startswith("probe-") is_auto = req_id.startswith("auto-sysinfo-") is_hp = req_id.startswith("hp-") is_user_cmd = bool(req_id) and not is_probe and not is_auto and not is_hp # --- Type-specific handling (display, parsing, etc.) --- if msg.type == AgentMsgType.AGENT_CMD_RESULT: if is_probe: self._parse_system_info(device, payload_str, silent=True) return if is_auto: self._parse_system_info(device, payload_str) elif msg.type == AgentMsgType.AGENT_INFO: if "chip=" in payload_str and "modules=" in payload_str: self._parse_system_info(device, payload_str) elif payload_str.startswith("MLAT:") and self.session: mlat_data = payload_str[5:] if self.session.mlat_engine.parse_mlat_message(device.id, mlat_data): state = self.session.mlat_engine.get_state() if state["scanners_count"] >= 3: self.session.mlat_engine.calculate_position() else: Display.device_event(device.id, f"MLAT: Invalid data format: {mlat_data}") elif not is_user_cmd: Display.device_event(device.id, f"INFO: {payload_str}") elif msg.type == AgentMsgType.AGENT_ERROR: if not is_user_cmd: Display.device_event(device.id, f"ERROR: {payload_str}") elif msg.type == AgentMsgType.AGENT_LOG: Display.device_event(device.id, f"LOG: {payload_str}") elif msg.type == AgentMsgType.AGENT_DATA: if payload_str.startswith("EVT|") and self.hp_store: self.hp_store.parse_and_store(device.id, payload_str) if payload_str.startswith("CAN|") and self.session and hasattr(self.session, 'can_store') and self.session.can_store: self.session.can_store.store_frame(device.id, payload_str) if not is_user_cmd: Display.device_event(device.id, f"DATA: {payload_str}") else: Display.device_event(device.id, f"UNKNOWN Message Type ({AgentMsgType.Name(msg.type)}): {payload_str}") # --- Forward to command originator (session or hp_commander) --- if is_hp and self.hp_commander: self.hp_commander.handle_response(req_id, device.id, payload_str, msg.eof) elif is_user_cmd and self.session: self.session.handle_command_response(req_id, device.id, payload_str, msg.eof) # ================================================== # TX (C2 → ESP) # ================================================== def send_command(self, sock, cmd: Command, device_id: str = None): """ Command → Protobuf → ChaCha20-Poly1305 → Base64 → \\n """ target_id = device_id or cmd.device_id crypto = self._get_crypto(target_id) if crypto is None: Display.error(f"Cannot send to '{target_id}' – no key in keystore") return try: proto = cmd.SerializeToString() # Encrypt (AEAD) encrypted = crypto.encrypt(proto) # Base64 b64 = crypto.b64_encode(encrypted) sock.sendall(b64 + b"\n") except Exception as e: Display.error(f"Failed to send command to {target_id}: {e}")