Replace monolithic CLI and web server with route-based Flask API. New routes: api_commands, api_build, api_can, api_monitor, api_ota, api_tunnel. Add honeypot security dashboard with real-time SSE, MITRE ATT&CK mapping, kill chain analysis. New TUI with commander/help modules. Add session management, tunnel proxy core, CAN bus data store. Docker support.
337 lines
12 KiB
Python
337 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
import asyncio
|
||
import logging
|
||
import socket
|
||
import threading
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
import re
|
||
import os
|
||
import sys
|
||
import time
|
||
|
||
from core.registry import DeviceRegistry
|
||
from core.keystore import KeyStore
|
||
from core.transport import Transport
|
||
from core.session import Session
|
||
from log.manager import LogManager
|
||
from commands.registry import CommandRegistry
|
||
from commands.reboot import RebootCommand
|
||
from core.groups import GroupRegistry
|
||
from tui.commander import Commander
|
||
from utils.constant import HOST, PORT
|
||
from utils.display import Display
|
||
|
||
# Wire format: one frame per line, "device_id:BASE64" terminated by '\n'.
# The pattern is anchored with \A / \Z rather than ^ / $ because '$' also
# matches just before a trailing newline, which would let b"id:AAAA\n" pass
# validation. Callers are expected to hand over a single, already split line.
FRAME_RE = re.compile(br'\A[A-Za-z0-9_-]+:[A-Za-z0-9+/=]+\Z')

RX_BUF_SIZE = 4096  # Maximum number of bytes requested per recv() call
MAX_BUFFER_SIZE = 1024 * 1024  # 1MB max buffer to prevent memory exhaustion
DEVICE_TIMEOUT_SECONDS = 300  # Devices are considered inactive after 5 minutes without a heartbeat
PROBE_THRESHOLD_SECONDS = 240  # Probe devices after 4 minutes of inactivity
HEARTBEAT_CHECK_INTERVAL = 10  # Check every 10 seconds
|
||
|
||
|
||
# ============================================================
|
||
# HELLO handshake (server identity verification)
|
||
# ============================================================
|
||
def _handle_hello(sock: socket.socket, device_id: str, transport: Transport):
    """Answer a HELLO frame with a freshly encrypted random challenge.

    The crypto context for ``device_id`` is looked up through ``transport``.
    When none exists, an error is reported and nothing is sent. Otherwise
    32 random bytes are encrypted, base64-encoded and written to ``sock``
    as one newline-terminated line.

    Per the original design note, the device authenticates the server by
    decrypting the challenge and checking the AEAD tag, reusing the
    existing ChaCha20-Poly1305 context instead of adding new primitives.

    Args:
        sock: Connected client socket the challenge is written to.
        device_id: Identifier announced by the client in its HELLO frame.
        transport: Transport used to obtain the per-device crypto context.
    """
    cipher = transport._get_crypto(device_id)

    if cipher is not None:
        # Fresh randomness per handshake so a challenge is never repeated.
        nonce_material = os.urandom(32)
        wire_payload = cipher.b64_encode(cipher.encrypt(nonce_material))

        sock.sendall(wire_payload + b"\n")
        Display.system_message(f"HELLO handshake: challenge sent to {device_id}")
        return

    Display.error(f"HELLO from unknown device '{device_id}' – no key in keystore")
|
||
|
||
|
||
# ============================================================
|
||
# Client handler
|
||
# ============================================================
|
||
# Device ids use the same alphabet FRAME_RE accepts before the ':' separator.
_HELLO_ID_RE = re.compile(br'[A-Za-z0-9_-]+')


def client_thread(sock: socket.socket, addr, transport: Transport, registry: DeviceRegistry):
    """Serve one client connection until it closes or fails.

    Bytes are accumulated and split on '\\n'. Each non-empty line is either
    a HELLO handshake (only accepted as the first frame) or a data frame,
    which must match ``FRAME_RE`` before being passed to
    ``transport.handle_incoming``. On exit the socket is closed and, if a
    device was associated with it, that device is removed from ``registry``.

    Args:
        sock: Accepted client socket.
        addr: Peer address, used only in log messages.
        transport: Transport that decodes and dispatches frames.
        registry: Device registry, queried to learn which device owns ``sock``.
    """
    Display.system_message(f"Client connected from {addr}")
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    buffer = b""
    device_id = None  # Set once the registry associates a device with this socket
    first_frame = True  # HELLO is only honoured as the very first frame

    try:
        while True:
            data = sock.recv(RX_BUF_SIZE)
            if not data:
                # Peer closed the connection.
                break

            buffer += data

            # Prevent memory exhaustion from clients that never send '\n'.
            if len(buffer) > MAX_BUFFER_SIZE:
                Display.error(f"Buffer overflow from {addr}, dropping connection")
                break

            # Strict framing by '\n' (ESP behavior)
            while b"\n" in buffer:
                line, buffer = buffer.split(b"\n", 1)
                line = line.strip()

                if not line:
                    continue

                # HELLO handshake: server identity verification
                if first_frame and line.startswith(b"HELLO:"):
                    first_frame = False
                    raw_id = line[6:].strip()
                    if not raw_id:
                        Display.error(f"HELLO with empty device_id from {addr}")
                    elif _HELLO_ID_RE.fullmatch(raw_id) is None:
                        # The id is attacker-controlled and ends up in log
                        # output; refuse anything outside the frame alphabet
                        # instead of lossy-decoding and echoing it.
                        Display.error(f"HELLO with invalid device_id from {addr}")
                    else:
                        _handle_hello(sock, raw_id.decode("ascii"), transport)
                    continue

                first_frame = False

                # Validate frame format: device_id:base64
                if not FRAME_RE.match(line):
                    Display.system_message(f"Ignoring invalid frame from {addr}")
                    continue

                try:
                    transport.handle_incoming(sock, addr, line)
                    # After successful handling, learn which device owns this
                    # socket (single lookup so the result cannot change
                    # between the check and the attribute access).
                    if not device_id:
                        device = registry.get_device_by_sock(sock)
                        if device:
                            device_id = device.id
                except Exception as e:
                    # A bad frame must not tear down the whole connection.
                    Display.error(f"Transport error from {addr}: {e}")

    except Exception as e:
        Display.error(f"Client error from {addr}: {e}")

    finally:
        try:
            sock.close()
        except Exception:
            pass
        if device_id:
            Display.device_event(device_id, "Disconnected")
            registry.remove(device_id)  # Remove device from registry on disconnect
        else:
            Display.system_message(f"Client disconnected from {addr}")
|
||
|
||
|
||
# ============================================================
|
||
# Main server
|
||
# ============================================================
|
||
def main():
    """Start the C2 server and run it in TUI or headless mode.

    Steps, in order:
      1. Validate configuration; exit with status 1 on insecure defaults
         unless ESPILON_ALLOW_DEFAULTS is set.
      2. Build the registry, log manager, keystore, command/group
         registries, transport, session and commander.
      3. Bind and listen on (HOST, PORT); exit with status 1 if bind fails.
      4. Start daemon threads for device status checking, accepting
         clients, and the tunnel / SOCKS5 server.
      5. With ``--headless`` in ``sys.argv``: start the web server (exit
         with status 1 if it fails) and sleep until Ctrl+C. Otherwise run
         the TUI (exit with status 1 if it cannot be imported).
      6. Close the listening socket.
    """
    # ============================
    # Security check
    # ============================
    from streams.config import validate_config
    if not validate_config():
        print("[C3PO] FATAL: Insecure default values detected.")
        print("[C3PO] Copy .env.example to .env and change the defaults.")
        print("[C3PO] To bypass (dev only): set ESPILON_ALLOW_DEFAULTS=1")
        # NOTE: any non-empty value of the variable enables the bypass.
        if not os.environ.get("ESPILON_ALLOW_DEFAULTS"):
            sys.exit(1)

    # ============================
    # Core components
    # ============================
    registry = DeviceRegistry()
    logger = LogManager()
    keystore = KeyStore(os.path.join(os.path.dirname(os.path.abspath(__file__)), "keys.json"))

    commands = CommandRegistry()
    commands.register(RebootCommand())
    groups = GroupRegistry()

    # Create Transport, then Session, then wire them together
    transport = Transport(registry, logger, keystore)
    session = Session(registry, commands, groups, transport)
    transport.set_session(session)

    commander = Commander(session)

    # ============================
    # TCP server
    # ============================
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    try:
        server.bind((HOST, PORT))
    except OSError as e:
        print(f"Failed to bind server to {HOST}:{PORT}: {e}")
        sys.exit(1)

    server.listen()

    # Periodically check device status and probe inactive devices
    def device_status_checker():
        probed = set()  # Devices already probed since their last activity
        while True:
            now = time.time()
            for device in registry.all():
                elapsed = now - device.last_seen
                if elapsed > DEVICE_TIMEOUT_SECONDS:
                    if device.status != "Inactive":
                        device.status = "Inactive"
                        Display.device_event(device.id, "Status changed to Inactive (timeout)")
                    probed.discard(device.id)
                elif elapsed > PROBE_THRESHOLD_SECONDS and device.id not in probed:
                    try:
                        transport.probe_device(device)
                        probed.add(device.id)
                        # Derived from the constant so the message cannot
                        # drift from the actual threshold.
                        Display.device_event(
                            device.id,
                            f"Probing (no activity for {PROBE_THRESHOLD_SECONDS // 60}min)")
                    except Exception as e:
                        # Best effort: the device stays un-probed and is
                        # retried on the next cycle, but the failure is no
                        # longer invisible.
                        Display.error(f"Probe failed for {device.id}: {e}")
                elif elapsed <= PROBE_THRESHOLD_SECONDS:
                    probed.discard(device.id)
                    if device.status == "Inactive":
                        device.status = "Connected"
                        Display.device_event(device.id, "Status changed to Connected")
            time.sleep(HEARTBEAT_CHECK_INTERVAL)

    # Accept client connections (bounded thread pool)
    client_pool = ThreadPoolExecutor(max_workers=50, thread_name_prefix="c2-client")

    def accept_loop():
        while True:
            try:
                sock, addr = server.accept()
            except OSError as e:
                # fileno() is -1 once the listener has been closed: that is
                # the shutdown path. Any other OSError is transient and must
                # not stop the server from accepting clients.
                if server.fileno() == -1:
                    break
                Display.error(f"Accept failed: {e}")
                time.sleep(0.1)  # Avoid a busy loop while the condition persists
                continue

            try:
                sock.settimeout(300)  # 5 min idle timeout per connection
                client_pool.submit(client_thread, sock, addr, transport, registry)
            except Exception as e:
                Display.error(f"Server error: {e}")
                # Nobody owns the socket if hand-off failed; do not leak it.
                try:
                    sock.close()
                except OSError:
                    pass

    # Device status checker thread
    threading.Thread(target=device_status_checker, daemon=True).start()
    # Accept loop thread
    threading.Thread(target=accept_loop, daemon=True).start()

    # ============================
    # Tunnel / SOCKS5 proxy server
    # ============================
    from core.tunnel import TunnelServer
    from streams.config import TUNNEL_SOCKS_HOST, TUNNEL_SOCKS_PORT, TUNNEL_LISTEN_PORT

    tunnel_server = TunnelServer(
        keystore=keystore,
        socks_host=TUNNEL_SOCKS_HOST,
        socks_port=TUNNEL_SOCKS_PORT,
        tunnel_port=TUNNEL_LISTEN_PORT,
    )
    session.tunnel_server = tunnel_server

    def run_tunnel():
        # The tunnel runs on its own event loop in a dedicated thread.
        logging.basicConfig(level=logging.INFO,
                            format="[TUNNEL] %(levelname)s %(message)s")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(tunnel_server.start())
        loop.run_forever()

    threading.Thread(target=run_tunnel, daemon=True, name="tunnel-server").start()
    print(f"[C3PO] SOCKS5 proxy on {TUNNEL_SOCKS_HOST}:{TUNNEL_SOCKS_PORT}, "
          f"tunnel listener on 0.0.0.0:{TUNNEL_LISTEN_PORT}")

    # ============================
    # Mode selection: TUI or headless
    # ============================
    headless = "--headless" in sys.argv

    if headless:
        # Headless mode: auto-start web server, no TUI
        from streams.config import (
            WEB_HOST, WEB_PORT, IMAGE_DIR, MULTILAT_AUTH_TOKEN,
            DEFAULT_USERNAME, DEFAULT_PASSWORD, FLASK_SECRET_KEY
        )
        from web.server import UnifiedWebServer

        # Initialize honeypot dashboard components (optional package)
        try:
            from hp_dashboard import HpStore, HpCommander, HpAlertEngine, HpGeoLookup
            _c3po_root = os.path.dirname(os.path.abspath(__file__))
            _data_dir = os.path.join(_c3po_root, "data")
            os.makedirs(_data_dir, exist_ok=True)

            session.hp_geo = HpGeoLookup(
                db_path=os.path.join(_data_dir, "honeypot_geo.db"))
            session.hp_store = HpStore(
                db_path=os.path.join(_data_dir, "honeypot_events.db"),
                geo_lookup=session.hp_geo)
            session.hp_alerts = HpAlertEngine(
                db_path=os.path.join(_data_dir, "honeypot_alerts.db"))
            # Store and alert engine reference each other.
            session.hp_alerts.set_store(session.hp_store)
            session.hp_store.set_alert_engine(session.hp_alerts)
            session.hp_commander = HpCommander(
                get_transport=lambda: transport,
                get_registry=lambda: registry,
            )
            transport.hp_store = session.hp_store
            transport.hp_commander = session.hp_commander
            print("[C3PO] Honeypot dashboard enabled")
        except ImportError:
            print("[C3PO] Honeypot dashboard not available (hp_dashboard not found)")

        # getattr(): the hp_* attributes are only assigned above when the
        # hp_dashboard import succeeded.
        session.web_server = UnifiedWebServer(
            host=WEB_HOST,
            port=WEB_PORT,
            image_dir=IMAGE_DIR,
            username=DEFAULT_USERNAME,
            password=DEFAULT_PASSWORD,
            secret_key=FLASK_SECRET_KEY,
            device_registry=registry,
            transport=transport,
            session=session,
            multilat_token=MULTILAT_AUTH_TOKEN,
            hp_store=getattr(session, 'hp_store', None),
            hp_commander=getattr(session, 'hp_commander', None),
            hp_alerts=getattr(session, 'hp_alerts', None),
            hp_geo=getattr(session, 'hp_geo', None),
        )

        if session.web_server.start():
            print(f"[C3PO] Web server started at {session.web_server.get_url()}")
        else:
            print("[C3PO] Web server failed to start")
            sys.exit(1)

        print(f"[C3PO] C2 listening on {HOST}:{PORT}")
        print("[C3PO] Running in headless mode (Ctrl+C to stop)")

        try:
            # All work happens in daemon threads; keep the main thread alive.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n[C3PO] Shutting down...")
            session.web_server.stop()
    else:
        # TUI mode
        try:
            from tui.app import C3POApp
            Display.enable_tui_mode()
            app = C3POApp(registry=registry, session=session, commander=commander)
            app.run()
        except ImportError as e:
            print(f"TUI not available: {e}")
            print("Install textual: pip install textual")
            sys.exit(1)
        except KeyboardInterrupt:
            pass

    server.close()
|
||
|
||
|
||
# Script entry point: start the server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|