#!/usr/bin/env python3 import asyncio import logging import socket import threading from concurrent.futures import ThreadPoolExecutor import re import os import sys import time from core.registry import DeviceRegistry from core.keystore import KeyStore from core.transport import Transport from core.session import Session from log.manager import LogManager from commands.registry import CommandRegistry from commands.reboot import RebootCommand from core.groups import GroupRegistry from tui.commander import Commander from utils.constant import HOST, PORT from utils.display import Display # New wire format: device_id:BASE64 + '\n' FRAME_RE = re.compile(br'^[A-Za-z0-9_-]+:[A-Za-z0-9+/=]+$') RX_BUF_SIZE = 4096 MAX_BUFFER_SIZE = 1024 * 1024 # 1MB max buffer to prevent memory exhaustion DEVICE_TIMEOUT_SECONDS = 300 # Devices are considered inactive after 5 minutes without a heartbeat PROBE_THRESHOLD_SECONDS = 240 # Probe devices after 4 minutes of inactivity HEARTBEAT_CHECK_INTERVAL = 10 # Check every 10 seconds # ============================================================ # HELLO handshake (server identity verification) # ============================================================ def _handle_hello(sock: socket.socket, device_id: str, transport: Transport): """Respond to HELLO handshake with an AEAD-encrypted challenge. The device verifies the server possesses the shared key by decrypting and checking the AEAD tag. No new crypto primitives are needed — this reuses the existing ChaCha20-Poly1305 context. """ crypto = transport._get_crypto(device_id) if crypto is None: Display.error(f"HELLO from unknown device '{device_id}' – no key in keystore") return challenge = os.urandom(32) encrypted = crypto.encrypt(challenge) b64_challenge = crypto.b64_encode(encrypted) sock.sendall(b64_challenge + b"\n") Display.system_message(f"HELLO handshake: challenge sent to {device_id}") # ============================================================ # Client handler # ============================================================ def client_thread(sock: socket.socket, addr, transport: Transport, registry: DeviceRegistry): Display.system_message(f"Client connected from {addr}") sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) buffer = b"" device_id = None # To track which device disconnected first_frame = True # For HELLO handshake detection try: while True: data = sock.recv(RX_BUF_SIZE) if not data: break buffer += data # Prevent memory exhaustion from malicious clients if len(buffer) > MAX_BUFFER_SIZE: Display.error(f"Buffer overflow from {addr}, dropping connection") break # Strict framing by '\n' (ESP behavior) while b"\n" in buffer: line, buffer = buffer.split(b"\n", 1) line = line.strip() if not line: continue # HELLO handshake: server identity verification if first_frame and line.startswith(b"HELLO:"): first_frame = False hello_device_id = line[6:].decode(errors="ignore").strip() if hello_device_id: _handle_hello(sock, hello_device_id, transport) else: Display.error(f"HELLO with empty device_id from {addr}") continue first_frame = False # Validate frame format: device_id:base64 if not FRAME_RE.match(line): Display.system_message(f"Ignoring invalid frame from {addr}") continue try: # Pass registry to handle_incoming to update device status transport.handle_incoming(sock, addr, line) # After successful handling, try to get device_id if not device_id and registry.get_device_by_sock(sock): device_id = registry.get_device_by_sock(sock).id except Exception as e: Display.error(f"Transport error from {addr}: {e}") except Exception as e: Display.error(f"Client error from {addr}: {e}") finally: try: sock.close() except Exception: pass if device_id: Display.device_event(device_id, "Disconnected") registry.remove(device_id) # Remove device from registry on disconnect else: Display.system_message(f"Client disconnected from {addr}") # ============================================================ # Main server # ============================================================ def main(): # ============================ # Security check # ============================ from streams.config import validate_config if not validate_config(): print("[C3PO] FATAL: Insecure default values detected.") print("[C3PO] Copy .env.example to .env and change the defaults.") print("[C3PO] To bypass (dev only): set ESPILON_ALLOW_DEFAULTS=1") if not os.environ.get("ESPILON_ALLOW_DEFAULTS"): sys.exit(1) # ============================ # Core components # ============================ registry = DeviceRegistry() logger = LogManager() keystore = KeyStore(os.path.join(os.path.dirname(os.path.abspath(__file__)), "keys.json")) commands = CommandRegistry() commands.register(RebootCommand()) groups = GroupRegistry() # Create Transport, then Session, then wire them together transport = Transport(registry, logger, keystore) session = Session(registry, commands, groups, transport) transport.set_session(session) commander = Commander(session) # ============================ # TCP server # ============================ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: server.bind((HOST, PORT)) except OSError as e: print(f"Failed to bind server to {HOST}:{PORT}: {e}") sys.exit(1) server.listen() # Function to periodically check device status and probe inactive devices def device_status_checker(): probed = set() # Track devices already probed this cycle while True: now = time.time() for device in registry.all(): elapsed = now - device.last_seen if elapsed > DEVICE_TIMEOUT_SECONDS: if device.status != "Inactive": device.status = "Inactive" Display.device_event(device.id, "Status changed to Inactive (timeout)") probed.discard(device.id) elif elapsed > PROBE_THRESHOLD_SECONDS and device.id not in probed: try: transport.probe_device(device) probed.add(device.id) Display.device_event(device.id, "Probing (no activity for 4min)") except Exception: pass elif elapsed <= PROBE_THRESHOLD_SECONDS: probed.discard(device.id) if device.status == "Inactive": device.status = "Connected" Display.device_event(device.id, "Status changed to Connected") time.sleep(HEARTBEAT_CHECK_INTERVAL) # Function to accept client connections (bounded thread pool) client_pool = ThreadPoolExecutor(max_workers=50, thread_name_prefix="c2-client") def accept_loop(): while True: try: sock, addr = server.accept() sock.settimeout(300) # 5 min idle timeout per connection client_pool.submit(client_thread, sock, addr, transport, registry) except OSError: break except Exception as e: Display.error(f"Server error: {e}") # Device status checker thread threading.Thread(target=device_status_checker, daemon=True).start() # Accept loop thread threading.Thread(target=accept_loop, daemon=True).start() # ============================ # Tunnel / SOCKS5 proxy server # ============================ from core.tunnel import TunnelServer from streams.config import TUNNEL_SOCKS_HOST, TUNNEL_SOCKS_PORT, TUNNEL_LISTEN_PORT tunnel_server = TunnelServer( keystore=keystore, socks_host=TUNNEL_SOCKS_HOST, socks_port=TUNNEL_SOCKS_PORT, tunnel_port=TUNNEL_LISTEN_PORT, ) session.tunnel_server = tunnel_server def run_tunnel(): logging.basicConfig(level=logging.INFO, format="[TUNNEL] %(levelname)s %(message)s") loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(tunnel_server.start()) loop.run_forever() threading.Thread(target=run_tunnel, daemon=True, name="tunnel-server").start() print(f"[C3PO] SOCKS5 proxy on {TUNNEL_SOCKS_HOST}:{TUNNEL_SOCKS_PORT}, " f"tunnel listener on 0.0.0.0:{TUNNEL_LISTEN_PORT}") # ============================ # Mode selection: TUI or headless # ============================ headless = "--headless" in sys.argv if headless: # Headless mode: auto-start web server, no TUI from streams.config import ( WEB_HOST, WEB_PORT, IMAGE_DIR, MULTILAT_AUTH_TOKEN, DEFAULT_USERNAME, DEFAULT_PASSWORD, FLASK_SECRET_KEY ) from web.server import UnifiedWebServer # Initialize honeypot dashboard components try: from hp_dashboard import HpStore, HpCommander, HpAlertEngine, HpGeoLookup _c3po_root = os.path.dirname(os.path.abspath(__file__)) _data_dir = os.path.join(_c3po_root, "data") os.makedirs(_data_dir, exist_ok=True) session.hp_geo = HpGeoLookup( db_path=os.path.join(_data_dir, "honeypot_geo.db")) session.hp_store = HpStore( db_path=os.path.join(_data_dir, "honeypot_events.db"), geo_lookup=session.hp_geo) session.hp_alerts = HpAlertEngine( db_path=os.path.join(_data_dir, "honeypot_alerts.db")) session.hp_alerts.set_store(session.hp_store) session.hp_store.set_alert_engine(session.hp_alerts) session.hp_commander = HpCommander( get_transport=lambda: transport, get_registry=lambda: registry, ) transport.hp_store = session.hp_store transport.hp_commander = session.hp_commander print("[C3PO] Honeypot dashboard enabled") except ImportError: print("[C3PO] Honeypot dashboard not available (hp_dashboard not found)") session.web_server = UnifiedWebServer( host=WEB_HOST, port=WEB_PORT, image_dir=IMAGE_DIR, username=DEFAULT_USERNAME, password=DEFAULT_PASSWORD, secret_key=FLASK_SECRET_KEY, device_registry=registry, transport=transport, session=session, multilat_token=MULTILAT_AUTH_TOKEN, hp_store=getattr(session, 'hp_store', None), hp_commander=getattr(session, 'hp_commander', None), hp_alerts=getattr(session, 'hp_alerts', None), hp_geo=getattr(session, 'hp_geo', None), ) if session.web_server.start(): print(f"[C3PO] Web server started at {session.web_server.get_url()}") else: print("[C3PO] Web server failed to start") sys.exit(1) print(f"[C3PO] C2 listening on {HOST}:{PORT}") print(f"[C3PO] Running in headless mode (Ctrl+C to stop)") try: while True: time.sleep(1) except KeyboardInterrupt: print("\n[C3PO] Shutting down...") session.web_server.stop() else: # TUI mode try: from tui.app import C3POApp Display.enable_tui_mode() app = C3POApp(registry=registry, session=session, commander=commander) app.run() except ImportError as e: print(f"TUI not available: {e}") print("Install textual: pip install textual") sys.exit(1) except KeyboardInterrupt: pass server.close() if __name__ == "__main__": main()