espilon-source/tools/C3PO/hp_dashboard/hp_commander.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

300 lines
11 KiB
Python

"""
Honeypot Commander — builds C2 commands and tracks responses.
Uses unified command dispatch: instead of individual commands per service
(hp_ssh_start, hp_telnet_stop, ...), sends a single `hp_svc` command
with service name and action as arguments. This fits within the ESP32's
32-command budget.
Manages command dispatch to honeypot ESP32 devices via the C3PO
transport layer, tracks pending/completed commands, and caches
service state from status responses.
"""
import time
import threading
from dataclasses import dataclass, field
from collections import deque
from typing import Optional, Callable
# ============================================================
# Service and monitor definitions
# ============================================================
HP_SERVICES = {
"ssh": {"port": 22},
"telnet": {"port": 23},
"http": {"port": 80},
"ftp": {"port": 21},
}
HP_MONITORS = {
"wifi": {},
"net": {},
}
SYSTEM_COMMANDS = ["system_info", "system_reboot", "system_mem", "system_uptime", "system_health"]
OTA_COMMANDS = ["ota_update", "ota_status", "ota_rollback"]
CONFIG_COMMANDS = ["hp_config_set", "hp_config_get", "hp_config_list", "hp_config_reset"]
# Configurable banner services and threshold keys
CONFIG_BANNER_SERVICES = ["ssh", "telnet", "ftp", "http"]
CONFIG_THRESHOLDS = {
"portscan": {"label": "Port Scan", "default": 5, "min": 1, "max": 50},
"synflood": {"label": "SYN Flood", "default": 50, "min": 10, "max": 500},
"icmp": {"label": "ICMP Sweep", "default": 10, "min": 1, "max": 100},
"udpflood": {"label": "UDP Flood", "default": 100, "min": 10, "max": 1000},
"arpflood": {"label": "ARP Flood", "default": 50, "min": 10, "max": 500},
"tarpit_ms": {"label": "Tarpit (ms)", "default": 2000, "min": 500, "max": 10000},
}
# Unified command names (sent to ESP32)
ALLOWED_COMMANDS = {
"hp_svc", # hp_svc <service> <start|stop|status>
"hp_wifi", # hp_wifi <start|stop|status>
"hp_net", # hp_net <start|stop|status>
"hp_config_set", # hp_config_set <type> <key> <value>
"hp_config_get", # hp_config_get <type> <key>
"hp_config_list", # hp_config_list [type]
"hp_config_reset", # hp_config_reset
"hp_status", # hp_status
"hp_start_all", # hp_start_all — start all services + monitors
"hp_stop_all", # hp_stop_all — stop all services + monitors
}
ALLOWED_COMMANDS.update(SYSTEM_COMMANDS)
ALLOWED_COMMANDS.update(OTA_COMMANDS)
@dataclass
class CommandRecord:
"""Tracks a sent command and its response."""
request_id: str
device_id: str
command_name: str
argv: list
sent_at: float
status: str = "pending"
response: str = ""
completed_at: float = 0.0
class HpCommander:
"""
Manages command dispatch and response tracking for honeypot devices.
Constructor receives deferred lambdas (same pattern as C3PO blueprints):
get_transport: Callable -> Transport
get_registry: Callable -> DeviceRegistry
"""
def __init__(self, get_transport: Callable, get_registry: Callable,
max_history: int = 500):
self._get_transport = get_transport
self._get_registry = get_registry
self.lock = threading.Lock()
self.history: deque = deque(maxlen=max_history)
self.pending: dict = {}
# Cached service states per device:
# { device_id: { service_name: { "running": bool, "detail": {...}, "updated_at": float } } }
self.service_states: dict = {}
self._req_counter = 0
def _next_request_id(self) -> str:
self._req_counter += 1
return f"hp-{int(time.time())}-{self._req_counter}"
def get_honeypot_devices(self) -> list:
"""Return all connected devices."""
registry = self._get_registry()
if not registry:
return []
devices = []
for d in registry.all():
devices.append({
"id": d.id,
"ip": d.address[0] if d.address else "unknown",
"status": d.status,
"modules": getattr(d, "modules", ""),
"last_seen": d.last_seen,
})
return devices
def send_command(self, device_id: str, command_name: str,
argv: list = None) -> Optional[str]:
"""
Send a single command to a device.
Returns request_id or None on failure.
"""
registry = self._get_registry()
transport = self._get_transport()
if not registry or not transport:
return None
device = registry.get(device_id)
if not device or not getattr(device, "sock", None):
return None
# Lazy import — proto is available in C3PO process
from proto.c2_pb2 import Command
request_id = self._next_request_id()
cmd = Command()
cmd.device_id = device_id
cmd.command_name = command_name
cmd.request_id = request_id
if argv:
cmd.argv.extend(argv)
record = CommandRecord(
request_id=request_id,
device_id=device_id,
command_name=command_name,
argv=argv or [],
sent_at=time.time(),
)
with self.lock:
self.pending[request_id] = record
self.history.append(record)
transport.send_command(device.sock, cmd)
return request_id
def send_to_all_devices(self, command_name: str,
argv: list = None) -> list:
"""Send a command to all connected devices. Returns list of request_ids."""
request_ids = []
for dev in self.get_honeypot_devices():
if dev["status"] == "Connected":
rid = self.send_command(dev["id"], command_name, argv)
if rid:
request_ids.append(rid)
return request_ids
# ============================================================
# Unified service/monitor dispatch
# ============================================================
def send_service_command(self, device_id: str, service: str,
action: str) -> Optional[str]:
"""Send hp_svc <service> <action> to a device."""
return self.send_command(device_id, "hp_svc", [service, action])
def send_monitor_command(self, device_id: str, monitor: str,
action: str) -> Optional[str]:
"""Send hp_wifi/hp_net <action> to a device."""
return self.send_command(device_id, f"hp_{monitor}", [action])
def send_service_to_all(self, service: str, action: str) -> list:
"""Send hp_svc <service> <action> to all devices."""
return self.send_to_all_devices("hp_svc", [service, action])
def send_monitor_to_all(self, monitor: str, action: str) -> list:
"""Send hp_wifi/hp_net <action> to all devices."""
return self.send_to_all_devices(f"hp_{monitor}", [action])
# ============================================================
# Response handling
# ============================================================
def handle_response(self, request_id: str, device_id: str,
payload: str, eof: bool):
"""Called by transport layer when a command response arrives."""
with self.lock:
rec = self.pending.get(request_id)
if not rec:
return
rec.response += payload
if eof:
rec.status = "completed"
rec.completed_at = time.time()
del self.pending[request_id]
self._parse_status_response(rec)
def _parse_status_response(self, rec: CommandRecord):
"""Parse 'running=yes connections=5' style status responses."""
state = {}
for part in rec.response.split():
if "=" in part:
k, v = part.split("=", 1)
state[k] = v
if not state:
return
# Determine which service/monitor this belongs to
service_name = None
if rec.command_name == "hp_svc" and len(rec.argv) >= 2:
if rec.argv[1] == "status":
service_name = rec.argv[0]
elif rec.command_name in ("hp_wifi", "hp_net"):
if rec.argv and rec.argv[0] == "status":
service_name = rec.command_name.replace("hp_", "")
if service_name:
if rec.device_id not in self.service_states:
self.service_states[rec.device_id] = {}
self.service_states[rec.device_id][service_name] = {
"running": state.get("running") == "yes",
"detail": state,
"updated_at": time.time(),
}
def get_service_states(self, device_id: str = None) -> dict:
"""Get cached service states."""
with self.lock:
if device_id:
return dict(self.service_states.get(device_id, {}))
return {k: dict(v) for k, v in self.service_states.items()}
# ============================================================
# Runtime Config helpers
# ============================================================
def config_set(self, device_id: str, cfg_type: str, key: str,
value: str) -> Optional[str]:
"""Set a runtime config value (banner or threshold)."""
return self.send_command(device_id, "hp_config_set",
[cfg_type, key, str(value)])
def config_get(self, device_id: str, cfg_type: str,
key: str) -> Optional[str]:
"""Get a single config value."""
return self.send_command(device_id, "hp_config_get",
[cfg_type, key])
def config_list(self, device_id: str,
cfg_type: str = "") -> Optional[str]:
"""List all config values (optionally filtered by type)."""
args = [cfg_type] if cfg_type else []
return self.send_command(device_id, "hp_config_list", args)
def config_reset(self, device_id: str) -> Optional[str]:
"""Reset all runtime config to compile-time defaults."""
return self.send_command(device_id, "hp_config_reset")
def get_command_history(self, limit: int = 100) -> list:
"""Get recent command history (most recent first)."""
with self.lock:
records = list(self.history)
records.reverse()
return [
{
"request_id": r.request_id,
"device_id": r.device_id,
"command_name": r.command_name,
"argv": r.argv,
"sent_at": r.sent_at,
"status": r.status,
"response": r.response,
"completed_at": r.completed_at,
"duration": round(r.completed_at - r.sent_at, 3) if r.completed_at else 0,
}
for r in records[:limit]
]