""" Honeypot Commander — builds C2 commands and tracks responses. Uses unified command dispatch: instead of individual commands per service (hp_ssh_start, hp_telnet_stop, ...), sends a single `hp_svc` command with service name and action as arguments. This fits within the ESP32's 32-command budget. Manages command dispatch to honeypot ESP32 devices via the C3PO transport layer, tracks pending/completed commands, and caches service state from status responses. """ import time import threading from dataclasses import dataclass, field from collections import deque from typing import Optional, Callable # ============================================================ # Service and monitor definitions # ============================================================ HP_SERVICES = { "ssh": {"port": 22}, "telnet": {"port": 23}, "http": {"port": 80}, "ftp": {"port": 21}, } HP_MONITORS = { "wifi": {}, "net": {}, } SYSTEM_COMMANDS = ["system_info", "system_reboot", "system_mem", "system_uptime", "system_health"] OTA_COMMANDS = ["ota_update", "ota_status", "ota_rollback"] CONFIG_COMMANDS = ["hp_config_set", "hp_config_get", "hp_config_list", "hp_config_reset"] # Configurable banner services and threshold keys CONFIG_BANNER_SERVICES = ["ssh", "telnet", "ftp", "http"] CONFIG_THRESHOLDS = { "portscan": {"label": "Port Scan", "default": 5, "min": 1, "max": 50}, "synflood": {"label": "SYN Flood", "default": 50, "min": 10, "max": 500}, "icmp": {"label": "ICMP Sweep", "default": 10, "min": 1, "max": 100}, "udpflood": {"label": "UDP Flood", "default": 100, "min": 10, "max": 1000}, "arpflood": {"label": "ARP Flood", "default": 50, "min": 10, "max": 500}, "tarpit_ms": {"label": "Tarpit (ms)", "default": 2000, "min": 500, "max": 10000}, } # Unified command names (sent to ESP32) ALLOWED_COMMANDS = { "hp_svc", # hp_svc "hp_wifi", # hp_wifi "hp_net", # hp_net "hp_config_set", # hp_config_set "hp_config_get", # hp_config_get "hp_config_list", # hp_config_list [type] "hp_config_reset", # hp_config_reset "hp_status", # hp_status "hp_start_all", # hp_start_all — start all services + monitors "hp_stop_all", # hp_stop_all — stop all services + monitors } ALLOWED_COMMANDS.update(SYSTEM_COMMANDS) ALLOWED_COMMANDS.update(OTA_COMMANDS) @dataclass class CommandRecord: """Tracks a sent command and its response.""" request_id: str device_id: str command_name: str argv: list sent_at: float status: str = "pending" response: str = "" completed_at: float = 0.0 class HpCommander: """ Manages command dispatch and response tracking for honeypot devices. Constructor receives deferred lambdas (same pattern as C3PO blueprints): get_transport: Callable -> Transport get_registry: Callable -> DeviceRegistry """ def __init__(self, get_transport: Callable, get_registry: Callable, max_history: int = 500): self._get_transport = get_transport self._get_registry = get_registry self.lock = threading.Lock() self.history: deque = deque(maxlen=max_history) self.pending: dict = {} # Cached service states per device: # { device_id: { service_name: { "running": bool, "detail": {...}, "updated_at": float } } } self.service_states: dict = {} self._req_counter = 0 def _next_request_id(self) -> str: self._req_counter += 1 return f"hp-{int(time.time())}-{self._req_counter}" def get_honeypot_devices(self) -> list: """Return all connected devices.""" registry = self._get_registry() if not registry: return [] devices = [] for d in registry.all(): devices.append({ "id": d.id, "ip": d.address[0] if d.address else "unknown", "status": d.status, "modules": getattr(d, "modules", ""), "last_seen": d.last_seen, }) return devices def send_command(self, device_id: str, command_name: str, argv: list = None) -> Optional[str]: """ Send a single command to a device. Returns request_id or None on failure. """ registry = self._get_registry() transport = self._get_transport() if not registry or not transport: return None device = registry.get(device_id) if not device or not getattr(device, "sock", None): return None # Lazy import — proto is available in C3PO process from proto.c2_pb2 import Command request_id = self._next_request_id() cmd = Command() cmd.device_id = device_id cmd.command_name = command_name cmd.request_id = request_id if argv: cmd.argv.extend(argv) record = CommandRecord( request_id=request_id, device_id=device_id, command_name=command_name, argv=argv or [], sent_at=time.time(), ) with self.lock: self.pending[request_id] = record self.history.append(record) transport.send_command(device.sock, cmd) return request_id def send_to_all_devices(self, command_name: str, argv: list = None) -> list: """Send a command to all connected devices. Returns list of request_ids.""" request_ids = [] for dev in self.get_honeypot_devices(): if dev["status"] == "Connected": rid = self.send_command(dev["id"], command_name, argv) if rid: request_ids.append(rid) return request_ids # ============================================================ # Unified service/monitor dispatch # ============================================================ def send_service_command(self, device_id: str, service: str, action: str) -> Optional[str]: """Send hp_svc to a device.""" return self.send_command(device_id, "hp_svc", [service, action]) def send_monitor_command(self, device_id: str, monitor: str, action: str) -> Optional[str]: """Send hp_wifi/hp_net to a device.""" return self.send_command(device_id, f"hp_{monitor}", [action]) def send_service_to_all(self, service: str, action: str) -> list: """Send hp_svc to all devices.""" return self.send_to_all_devices("hp_svc", [service, action]) def send_monitor_to_all(self, monitor: str, action: str) -> list: """Send hp_wifi/hp_net to all devices.""" return self.send_to_all_devices(f"hp_{monitor}", [action]) # ============================================================ # Response handling # ============================================================ def handle_response(self, request_id: str, device_id: str, payload: str, eof: bool): """Called by transport layer when a command response arrives.""" with self.lock: rec = self.pending.get(request_id) if not rec: return rec.response += payload if eof: rec.status = "completed" rec.completed_at = time.time() del self.pending[request_id] self._parse_status_response(rec) def _parse_status_response(self, rec: CommandRecord): """Parse 'running=yes connections=5' style status responses.""" state = {} for part in rec.response.split(): if "=" in part: k, v = part.split("=", 1) state[k] = v if not state: return # Determine which service/monitor this belongs to service_name = None if rec.command_name == "hp_svc" and len(rec.argv) >= 2: if rec.argv[1] == "status": service_name = rec.argv[0] elif rec.command_name in ("hp_wifi", "hp_net"): if rec.argv and rec.argv[0] == "status": service_name = rec.command_name.replace("hp_", "") if service_name: if rec.device_id not in self.service_states: self.service_states[rec.device_id] = {} self.service_states[rec.device_id][service_name] = { "running": state.get("running") == "yes", "detail": state, "updated_at": time.time(), } def get_service_states(self, device_id: str = None) -> dict: """Get cached service states.""" with self.lock: if device_id: return dict(self.service_states.get(device_id, {})) return {k: dict(v) for k, v in self.service_states.items()} # ============================================================ # Runtime Config helpers # ============================================================ def config_set(self, device_id: str, cfg_type: str, key: str, value: str) -> Optional[str]: """Set a runtime config value (banner or threshold).""" return self.send_command(device_id, "hp_config_set", [cfg_type, key, str(value)]) def config_get(self, device_id: str, cfg_type: str, key: str) -> Optional[str]: """Get a single config value.""" return self.send_command(device_id, "hp_config_get", [cfg_type, key]) def config_list(self, device_id: str, cfg_type: str = "") -> Optional[str]: """List all config values (optionally filtered by type).""" args = [cfg_type] if cfg_type else [] return self.send_command(device_id, "hp_config_list", args) def config_reset(self, device_id: str) -> Optional[str]: """Reset all runtime config to compile-time defaults.""" return self.send_command(device_id, "hp_config_reset") def get_command_history(self, limit: int = 100) -> list: """Get recent command history (most recent first).""" with self.lock: records = list(self.history) records.reverse() return [ { "request_id": r.request_id, "device_id": r.device_id, "command_name": r.command_name, "argv": r.argv, "sent_at": r.sent_at, "status": r.status, "response": r.response, "completed_at": r.completed_at, "duration": round(r.completed_at - r.sent_at, 3) if r.completed_at else 0, } for r in records[:limit] ]