espilon-source/tools/C3PO/c3po.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

337 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import asyncio
import logging
import socket
import threading
from concurrent.futures import ThreadPoolExecutor
import re
import os
import sys
import time
from core.registry import DeviceRegistry
from core.keystore import KeyStore
from core.transport import Transport
from core.session import Session
from log.manager import LogManager
from commands.registry import CommandRegistry
from commands.reboot import RebootCommand
from core.groups import GroupRegistry
from tui.commander import Commander
from utils.constant import HOST, PORT
from utils.display import Display
# New wire format: device_id:BASE64 + '\n'
FRAME_RE = re.compile(br'^[A-Za-z0-9_-]+:[A-Za-z0-9+/=]+$')
RX_BUF_SIZE = 4096
MAX_BUFFER_SIZE = 1024 * 1024 # 1MB max buffer to prevent memory exhaustion
DEVICE_TIMEOUT_SECONDS = 300 # Devices are considered inactive after 5 minutes without a heartbeat
PROBE_THRESHOLD_SECONDS = 240 # Probe devices after 4 minutes of inactivity
HEARTBEAT_CHECK_INTERVAL = 10 # Check every 10 seconds
# ============================================================
# HELLO handshake (server identity verification)
# ============================================================
def _handle_hello(sock: socket.socket, device_id: str, transport: Transport):
"""Respond to HELLO handshake with an AEAD-encrypted challenge.
The device verifies the server possesses the shared key by
decrypting and checking the AEAD tag. No new crypto primitives
are needed — this reuses the existing ChaCha20-Poly1305 context.
"""
crypto = transport._get_crypto(device_id)
if crypto is None:
Display.error(f"HELLO from unknown device '{device_id}' no key in keystore")
return
challenge = os.urandom(32)
encrypted = crypto.encrypt(challenge)
b64_challenge = crypto.b64_encode(encrypted)
sock.sendall(b64_challenge + b"\n")
Display.system_message(f"HELLO handshake: challenge sent to {device_id}")
# ============================================================
# Client handler
# ============================================================
def client_thread(sock: socket.socket, addr, transport: Transport, registry: DeviceRegistry):
Display.system_message(f"Client connected from {addr}")
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
buffer = b""
device_id = None # To track which device disconnected
first_frame = True # For HELLO handshake detection
try:
while True:
data = sock.recv(RX_BUF_SIZE)
if not data:
break
buffer += data
# Prevent memory exhaustion from malicious clients
if len(buffer) > MAX_BUFFER_SIZE:
Display.error(f"Buffer overflow from {addr}, dropping connection")
break
# Strict framing by '\n' (ESP behavior)
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
line = line.strip()
if not line:
continue
# HELLO handshake: server identity verification
if first_frame and line.startswith(b"HELLO:"):
first_frame = False
hello_device_id = line[6:].decode(errors="ignore").strip()
if hello_device_id:
_handle_hello(sock, hello_device_id, transport)
else:
Display.error(f"HELLO with empty device_id from {addr}")
continue
first_frame = False
# Validate frame format: device_id:base64
if not FRAME_RE.match(line):
Display.system_message(f"Ignoring invalid frame from {addr}")
continue
try:
# Pass registry to handle_incoming to update device status
transport.handle_incoming(sock, addr, line)
# After successful handling, try to get device_id
if not device_id and registry.get_device_by_sock(sock):
device_id = registry.get_device_by_sock(sock).id
except Exception as e:
Display.error(f"Transport error from {addr}: {e}")
except Exception as e:
Display.error(f"Client error from {addr}: {e}")
finally:
try:
sock.close()
except Exception:
pass
if device_id:
Display.device_event(device_id, "Disconnected")
registry.remove(device_id) # Remove device from registry on disconnect
else:
Display.system_message(f"Client disconnected from {addr}")
# ============================================================
# Main server
# ============================================================
def main():
# ============================
# Security check
# ============================
from streams.config import validate_config
if not validate_config():
print("[C3PO] FATAL: Insecure default values detected.")
print("[C3PO] Copy .env.example to .env and change the defaults.")
print("[C3PO] To bypass (dev only): set ESPILON_ALLOW_DEFAULTS=1")
if not os.environ.get("ESPILON_ALLOW_DEFAULTS"):
sys.exit(1)
# ============================
# Core components
# ============================
registry = DeviceRegistry()
logger = LogManager()
keystore = KeyStore(os.path.join(os.path.dirname(os.path.abspath(__file__)), "keys.json"))
commands = CommandRegistry()
commands.register(RebootCommand())
groups = GroupRegistry()
# Create Transport, then Session, then wire them together
transport = Transport(registry, logger, keystore)
session = Session(registry, commands, groups, transport)
transport.set_session(session)
commander = Commander(session)
# ============================
# TCP server
# ============================
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
server.bind((HOST, PORT))
except OSError as e:
print(f"Failed to bind server to {HOST}:{PORT}: {e}")
sys.exit(1)
server.listen()
# Function to periodically check device status and probe inactive devices
def device_status_checker():
probed = set() # Track devices already probed this cycle
while True:
now = time.time()
for device in registry.all():
elapsed = now - device.last_seen
if elapsed > DEVICE_TIMEOUT_SECONDS:
if device.status != "Inactive":
device.status = "Inactive"
Display.device_event(device.id, "Status changed to Inactive (timeout)")
probed.discard(device.id)
elif elapsed > PROBE_THRESHOLD_SECONDS and device.id not in probed:
try:
transport.probe_device(device)
probed.add(device.id)
Display.device_event(device.id, "Probing (no activity for 4min)")
except Exception:
pass
elif elapsed <= PROBE_THRESHOLD_SECONDS:
probed.discard(device.id)
if device.status == "Inactive":
device.status = "Connected"
Display.device_event(device.id, "Status changed to Connected")
time.sleep(HEARTBEAT_CHECK_INTERVAL)
# Function to accept client connections (bounded thread pool)
client_pool = ThreadPoolExecutor(max_workers=50, thread_name_prefix="c2-client")
def accept_loop():
while True:
try:
sock, addr = server.accept()
sock.settimeout(300) # 5 min idle timeout per connection
client_pool.submit(client_thread, sock, addr, transport, registry)
except OSError:
break
except Exception as e:
Display.error(f"Server error: {e}")
# Device status checker thread
threading.Thread(target=device_status_checker, daemon=True).start()
# Accept loop thread
threading.Thread(target=accept_loop, daemon=True).start()
# ============================
# Tunnel / SOCKS5 proxy server
# ============================
from core.tunnel import TunnelServer
from streams.config import TUNNEL_SOCKS_HOST, TUNNEL_SOCKS_PORT, TUNNEL_LISTEN_PORT
tunnel_server = TunnelServer(
keystore=keystore,
socks_host=TUNNEL_SOCKS_HOST,
socks_port=TUNNEL_SOCKS_PORT,
tunnel_port=TUNNEL_LISTEN_PORT,
)
session.tunnel_server = tunnel_server
def run_tunnel():
logging.basicConfig(level=logging.INFO,
format="[TUNNEL] %(levelname)s %(message)s")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(tunnel_server.start())
loop.run_forever()
threading.Thread(target=run_tunnel, daemon=True, name="tunnel-server").start()
print(f"[C3PO] SOCKS5 proxy on {TUNNEL_SOCKS_HOST}:{TUNNEL_SOCKS_PORT}, "
f"tunnel listener on 0.0.0.0:{TUNNEL_LISTEN_PORT}")
# ============================
# Mode selection: TUI or headless
# ============================
headless = "--headless" in sys.argv
if headless:
# Headless mode: auto-start web server, no TUI
from streams.config import (
WEB_HOST, WEB_PORT, IMAGE_DIR, MULTILAT_AUTH_TOKEN,
DEFAULT_USERNAME, DEFAULT_PASSWORD, FLASK_SECRET_KEY
)
from web.server import UnifiedWebServer
# Initialize honeypot dashboard components
try:
from hp_dashboard import HpStore, HpCommander, HpAlertEngine, HpGeoLookup
_c3po_root = os.path.dirname(os.path.abspath(__file__))
_data_dir = os.path.join(_c3po_root, "data")
os.makedirs(_data_dir, exist_ok=True)
session.hp_geo = HpGeoLookup(
db_path=os.path.join(_data_dir, "honeypot_geo.db"))
session.hp_store = HpStore(
db_path=os.path.join(_data_dir, "honeypot_events.db"),
geo_lookup=session.hp_geo)
session.hp_alerts = HpAlertEngine(
db_path=os.path.join(_data_dir, "honeypot_alerts.db"))
session.hp_alerts.set_store(session.hp_store)
session.hp_store.set_alert_engine(session.hp_alerts)
session.hp_commander = HpCommander(
get_transport=lambda: transport,
get_registry=lambda: registry,
)
transport.hp_store = session.hp_store
transport.hp_commander = session.hp_commander
print("[C3PO] Honeypot dashboard enabled")
except ImportError:
print("[C3PO] Honeypot dashboard not available (hp_dashboard not found)")
session.web_server = UnifiedWebServer(
host=WEB_HOST,
port=WEB_PORT,
image_dir=IMAGE_DIR,
username=DEFAULT_USERNAME,
password=DEFAULT_PASSWORD,
secret_key=FLASK_SECRET_KEY,
device_registry=registry,
transport=transport,
session=session,
multilat_token=MULTILAT_AUTH_TOKEN,
hp_store=getattr(session, 'hp_store', None),
hp_commander=getattr(session, 'hp_commander', None),
hp_alerts=getattr(session, 'hp_alerts', None),
hp_geo=getattr(session, 'hp_geo', None),
)
if session.web_server.start():
print(f"[C3PO] Web server started at {session.web_server.get_url()}")
else:
print("[C3PO] Web server failed to start")
sys.exit(1)
print(f"[C3PO] C2 listening on {HOST}:{PORT}")
print(f"[C3PO] Running in headless mode (Ctrl+C to stop)")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[C3PO] Shutting down...")
session.web_server.stop()
else:
# TUI mode
try:
from tui.app import C3POApp
Display.enable_tui_mode()
app = C3POApp(registry=registry, session=session, commander=commander)
app.run()
except ImportError as e:
print(f"TUI not available: {e}")
print("Install textual: pip install textual")
sys.exit(1)
except KeyboardInterrupt:
pass
server.close()
if __name__ == "__main__":
main()