espilon-source/tools/C3PO/core/transport.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

262 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from core.crypto import CryptoContext
from core.device import Device
from core.keystore import KeyStore
from core.registry import DeviceRegistry
from log.manager import LogManager
from utils.display import Display
from proto.c2_pb2 import Command, AgentMessage, AgentMsgType
# Forward declaration for type hinting to avoid circular import
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.session import Session
class Transport:
def __init__(self, registry: DeviceRegistry, logger: LogManager,
keystore: KeyStore, session: 'Session' = None):
self.registry = registry
self.logger = logger
self.keystore = keystore
self.session = session
self.command_responses = {}
self.hp_store = None
self.hp_commander = None
# Cache of CryptoContext per device_id (HKDF derivation is expensive)
self._crypto_cache: dict[str, CryptoContext] = {}
def set_session(self, session: 'Session'):
self.session = session
def _get_crypto(self, device_id: str) -> CryptoContext | None:
"""Get or create a CryptoContext for the given device."""
if device_id in self._crypto_cache:
return self._crypto_cache[device_id]
master_key = self.keystore.get(device_id)
if master_key is None:
return None
ctx = CryptoContext(master_key, device_id)
self._crypto_cache[device_id] = ctx
return ctx
# ==================================================
# RX (ESP → C2)
# ==================================================
def handle_incoming(self, sock, addr, raw_data: bytes):
"""
raw_data = device_id:BASE64( nonce[12] || ChaCha20-Poly1305( Protobuf ) || tag[16] )
"""
# 1) Parse device_id prefix
raw_str = raw_data
if b":" not in raw_str:
Display.error(f"No device_id prefix in message from {addr}")
return
device_id_bytes, b64_payload = raw_str.split(b":", 1)
device_id = device_id_bytes.decode(errors="ignore").strip()
if not device_id:
Display.error(f"Empty device_id from {addr}")
return
# 2) Lookup crypto key for this device
crypto = self._get_crypto(device_id)
if crypto is None:
Display.error(f"Unknown device '{device_id}' from {addr} no key in keystore")
return
# 3) Base64 decode
try:
encrypted = crypto.b64_decode(b64_payload)
except Exception as e:
Display.error(f"Base64 decode failed from {device_id}@{addr}: {e}")
return
# 4) Decrypt + verify (AEAD)
try:
protobuf_bytes = crypto.decrypt(encrypted)
except Exception as e:
Display.error(f"Decrypt/auth failed from {device_id}@{addr}: {e}")
return
# 5) Protobuf decode → AgentMessage
try:
msg = AgentMessage.FromString(protobuf_bytes)
except Exception as e:
Display.error(f"Protobuf decode failed from {device_id}@{addr}: {e}")
return
if not msg.device_id:
msg.device_id = device_id
self._dispatch(sock, addr, msg)
# ==================================================
# DISPATCH
# ==================================================
def _dispatch(self, sock, addr, msg: AgentMessage):
device = self.registry.get(msg.device_id)
is_new_device = False
if not device:
device = Device(
id=msg.device_id,
sock=sock,
address=addr
)
self.registry.add(device)
Display.device_event(device.id, f"Connected from {addr[0]}")
is_new_device = True
else:
# Device reconnected with new socket - update connection info
if device.sock != sock:
try:
device.sock.close()
except Exception:
pass
device.sock = sock
device.address = addr
Display.device_event(device.id, f"Reconnected from {addr[0]}:{addr[1]}")
device.touch()
self._handle_agent_message(device, msg)
# Auto-query system_info on new device connection
if is_new_device:
self._auto_query_system_info(device)
def probe_device(self, device: Device):
"""Send a system_info probe to check if the device is alive."""
cmd = Command()
cmd.device_id = device.id
cmd.command_name = "system_info"
cmd.request_id = f"probe-{device.id}"
self.send_command(device.sock, cmd, device.id)
def _auto_query_system_info(self, device: Device):
"""Send system_info command automatically when device connects."""
try:
cmd = Command()
cmd.device_id = device.id
cmd.command_name = "system_info"
cmd.request_id = f"auto-sysinfo-{device.id}"
self.send_command(device.sock, cmd, device.id)
except Exception as e:
Display.error(f"Auto system_info failed for {device.id}: {e}")
def _parse_system_info(self, device: Device, payload: str, silent: bool = False):
"""Parse system_info response and update device info."""
# Format: chip=esp32 cores=2 flash=external heap=4310096 uptime=7s modules=network,fakeap
try:
for part in payload.split():
if "=" in part:
key, value = part.split("=", 1)
if key == "chip":
device.chip = value
elif key == "modules":
device.modules = value
if not silent:
# Notify TUI about device info update
Display.device_event(device.id, f"INFO: {payload}")
# Send special message to update TUI title
from tui.bridge import tui_bridge, TUIMessage, MessageType
tui_bridge.post_message(TUIMessage(
msg_type=MessageType.DEVICE_INFO_UPDATED,
device_id=device.id,
payload=device.modules
))
except Exception as e:
Display.error(f"Failed to parse system_info: {e}")
# ==================================================
# AGENT MESSAGE HANDLER
# ==================================================
def _handle_agent_message(self, device: Device, msg: AgentMessage):
payload_str = ""
if msg.payload:
try:
payload_str = msg.payload.decode(errors="ignore")
except Exception:
payload_str = repr(msg.payload)
# --- Route request_id-bearing responses to the right handler ---
req_id = msg.request_id or ""
is_probe = req_id.startswith("probe-")
is_auto = req_id.startswith("auto-sysinfo-")
is_hp = req_id.startswith("hp-")
is_user_cmd = bool(req_id) and not is_probe and not is_auto and not is_hp
# --- Type-specific handling (display, parsing, etc.) ---
if msg.type == AgentMsgType.AGENT_CMD_RESULT:
if is_probe:
self._parse_system_info(device, payload_str, silent=True)
return
if is_auto:
self._parse_system_info(device, payload_str)
elif msg.type == AgentMsgType.AGENT_INFO:
if "chip=" in payload_str and "modules=" in payload_str:
self._parse_system_info(device, payload_str)
elif payload_str.startswith("MLAT:") and self.session:
mlat_data = payload_str[5:]
if self.session.mlat_engine.parse_mlat_message(device.id, mlat_data):
state = self.session.mlat_engine.get_state()
if state["scanners_count"] >= 3:
self.session.mlat_engine.calculate_position()
else:
Display.device_event(device.id, f"MLAT: Invalid data format: {mlat_data}")
elif not is_user_cmd:
Display.device_event(device.id, f"INFO: {payload_str}")
elif msg.type == AgentMsgType.AGENT_ERROR:
if not is_user_cmd:
Display.device_event(device.id, f"ERROR: {payload_str}")
elif msg.type == AgentMsgType.AGENT_LOG:
Display.device_event(device.id, f"LOG: {payload_str}")
elif msg.type == AgentMsgType.AGENT_DATA:
if payload_str.startswith("EVT|") and self.hp_store:
self.hp_store.parse_and_store(device.id, payload_str)
if payload_str.startswith("CAN|") and self.session and hasattr(self.session, 'can_store') and self.session.can_store:
self.session.can_store.store_frame(device.id, payload_str)
if not is_user_cmd:
Display.device_event(device.id, f"DATA: {payload_str}")
else:
Display.device_event(device.id, f"UNKNOWN Message Type ({AgentMsgType.Name(msg.type)}): {payload_str}")
# --- Forward to command originator (session or hp_commander) ---
if is_hp and self.hp_commander:
self.hp_commander.handle_response(req_id, device.id, payload_str, msg.eof)
elif is_user_cmd and self.session:
self.session.handle_command_response(req_id, device.id, payload_str, msg.eof)
# ==================================================
# TX (C2 → ESP)
# ==================================================
def send_command(self, sock, cmd: Command, device_id: str = None):
"""
Command → Protobuf → ChaCha20-Poly1305 → Base64 → \\n
"""
target_id = device_id or cmd.device_id
crypto = self._get_crypto(target_id)
if crypto is None:
Display.error(f"Cannot send to '{target_id}' no key in keystore")
return
try:
proto = cmd.SerializeToString()
# Encrypt (AEAD)
encrypted = crypto.encrypt(proto)
# Base64
b64 = crypto.b64_encode(encrypted)
sock.sendall(b64 + b"\n")
except Exception as e:
Display.error(f"Failed to send command to {target_id}: {e}")