espilon-source/tools/C3PO/core/tunnel.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

685 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
TunnelServer SOCKS5 proxy via ESP32 channel multiplexing.
Architecture:
- SOCKS5 server (asyncio, port 1080) accepts operator tools (proxychains, curl, nmap)
- Tunnel listener (asyncio, port 2627) accepts ESP32 bot connections
- Binary framing protocol maps SOCKS clients to numbered channels
- ESP32 performs actual TCP connections on the target network
The ESP32 never sees SOCKS5 packets — only OPEN/DATA/CLOSE/ERROR frames.
"""
import asyncio
import struct
import logging
import time
import socket
from typing import Optional
from core.crypto import CryptoContext
from core.keystore import KeyStore
log = logging.getLogger("tunnel")
# ============================================================
# Protocol constants (must match tun_core.h)
# ============================================================
FRAME_HDR_SIZE = 5
FRAME_MAX_DATA = 4096
FRAME_OPEN = 0x01
FRAME_OPEN_OK = 0x02
FRAME_DATA = 0x03
FRAME_CLOSE = 0x04
FRAME_ERROR = 0x05
FRAME_PING = 0x06
FRAME_PONG = 0x07
# Authentication
TUN_MAGIC = b"TUN\x01"
TUN_AUTH_TOKEN = b"espilon-tunnel-v1"
# SOCKS5
SOCKS5_VER = 0x05
SOCKS5_CMD_CONNECT = 0x01
SOCKS5_ATYP_IPV4 = 0x01
SOCKS5_ATYP_DOMAIN = 0x03
SOCKS5_ATYP_IPV6 = 0x04
# Close reasons
CLOSE_NORMAL = 0
CLOSE_RESET = 1
CLOSE_TIMEOUT = 2
# Timeouts
OPEN_TIMEOUT_S = 10
SOCKS_READ_SIZE = 4096
TUNNEL_READ_SIZE = 8192
# ============================================================
# TunnelChannel one SOCKS5 client <-> one ESP32 channel
# ============================================================
class TunnelChannel:
__slots__ = (
"channel_id", "socks_reader", "socks_writer",
"open_event", "open_success", "error_msg",
"closed", "bytes_tx", "bytes_rx", "target",
)
def __init__(self, channel_id: int,
socks_reader: asyncio.StreamReader,
socks_writer: asyncio.StreamWriter):
self.channel_id = channel_id
self.socks_reader = socks_reader
self.socks_writer = socks_writer
self.open_event = asyncio.Event()
self.open_success = False
self.error_msg = ""
self.closed = False
self.bytes_tx = 0
self.bytes_rx = 0
self.target = ""
# ============================================================
# DeviceTunnel binary-framed TCP connection to one ESP32
# ============================================================
class DeviceTunnel:
def __init__(self, device_id: str,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
crypto: Optional[CryptoContext],
encrypted: bool = False,
max_channels: int = 8):
self.device_id = device_id
self.reader = reader
self.writer = writer
self.crypto = crypto
self.encrypted = encrypted
self.max_channels = max_channels
self.channels: dict[int, TunnelChannel] = {}
self._next_channel_id = 1
self._lock = asyncio.Lock()
self.connected = True
def allocate_channel_id(self) -> Optional[int]:
"""Return next free channel id (1-based, cycles within max_channels)."""
for _ in range(self.max_channels):
cid = self._next_channel_id
self._next_channel_id = (self._next_channel_id % self.max_channels) + 1
if cid not in self.channels:
return cid
return None
async def send_frame(self, channel_id: int, frame_type: int,
data: bytes = b""):
"""Encode and send a frame to the ESP32."""
hdr = struct.pack("!BHB H", channel_id >> 8, channel_id & 0xFF,
frame_type, len(data))
# Simpler: pack as big-endian
hdr = struct.pack(">HBH", channel_id, frame_type, len(data))
frame = hdr + data
if self.encrypted and self.crypto:
encrypted = self.crypto.encrypt(frame)
length_prefix = struct.pack(">H", len(encrypted))
self.writer.write(length_prefix + encrypted)
else:
self.writer.write(frame)
try:
await self.writer.drain()
except (ConnectionError, OSError):
self.connected = False
async def read_frame(self) -> Optional[tuple[int, int, bytes]]:
"""Read one frame from ESP32. Returns (channel_id, type, data) or None on disconnect."""
try:
if self.encrypted and self.crypto:
# Read 2-byte length prefix
len_bytes = await self.reader.readexactly(2)
enc_len = struct.unpack(">H", len_bytes)[0]
enc_data = await self.reader.readexactly(enc_len)
frame = self.crypto.decrypt(enc_data)
else:
# Read 5-byte header
hdr = await self.reader.readexactly(FRAME_HDR_SIZE)
frame = hdr
channel_id, frame_type, data_len = struct.unpack(">HBH", hdr)
if data_len > FRAME_MAX_DATA:
log.warning(f"[{self.device_id}] frame too large: {data_len}")
return None
if data_len > 0:
payload = await self.reader.readexactly(data_len)
return (channel_id, frame_type, payload)
return (channel_id, frame_type, b"")
except (asyncio.IncompleteReadError, ConnectionError, OSError):
self.connected = False
return None
# Encrypted path: parse decrypted frame
if len(frame) < FRAME_HDR_SIZE:
return None
channel_id, frame_type, data_len = struct.unpack(">HBH", frame[:5])
data = frame[5:5 + data_len]
return (channel_id, frame_type, data)
async def run_rx_loop(self, server: 'TunnelServer'):
"""Read frames from ESP32, dispatch to channels."""
log.info(f"[{self.device_id}] RX loop started")
while self.connected:
result = await self.read_frame()
if result is None:
break
channel_id, frame_type, data = result
if frame_type == FRAME_OPEN_OK:
ch = self.channels.get(channel_id)
if ch:
ch.open_success = True
ch.open_event.set()
elif frame_type == FRAME_DATA:
ch = self.channels.get(channel_id)
if ch and not ch.closed:
ch.bytes_rx += len(data)
try:
ch.socks_writer.write(data)
await ch.socks_writer.drain()
except (ConnectionError, OSError):
ch.closed = True
await self.send_frame(channel_id, FRAME_CLOSE,
bytes([CLOSE_RESET]))
elif frame_type == FRAME_CLOSE:
ch = self.channels.pop(channel_id, None)
if ch:
ch.closed = True
ch.open_event.set() # Unblock if waiting
try:
ch.socks_writer.close()
await ch.socks_writer.wait_closed()
except Exception:
pass
log.info(f"[{self.device_id}] chan {channel_id} closed "
f"(tx={ch.bytes_tx} rx={ch.bytes_rx})")
elif frame_type == FRAME_ERROR:
ch = self.channels.get(channel_id)
if ch:
ch.error_msg = data.decode(errors="ignore")
ch.open_success = False
ch.open_event.set()
log.warning(f"[{self.device_id}] chan {channel_id} error: "
f"{ch.error_msg}")
elif frame_type == FRAME_PING:
await self.send_frame(0, FRAME_PONG, data)
elif frame_type == FRAME_PONG:
pass # Keepalive acknowledged
# Connection lost — close all channels
log.info(f"[{self.device_id}] RX loop ended, closing all channels")
for cid, ch in list(self.channels.items()):
ch.closed = True
ch.open_event.set()
try:
ch.socks_writer.close()
except Exception:
pass
self.channels.clear()
self.connected = False
# ============================================================
# TunnelServer SOCKS5 + ESP32 tunnel manager
# ============================================================
class TunnelServer:
def __init__(self, keystore: KeyStore,
socks_host: str = "127.0.0.1",
socks_port: int = 1080,
tunnel_host: str = "0.0.0.0",
tunnel_port: int = 2627):
self.keystore = keystore
self.socks_host = socks_host
self.socks_port = socks_port
self.tunnel_host = tunnel_host
self.tunnel_port = tunnel_port
self._device_tunnels: dict[str, DeviceTunnel] = {}
self._active_device: Optional[str] = None
self._socks_server: Optional[asyncio.Server] = None
self._tunnel_server: Optional[asyncio.Server] = None
self._crypto_cache: dict[str, CryptoContext] = {}
def _get_crypto(self, device_id: str) -> Optional[CryptoContext]:
if device_id in self._crypto_cache:
return self._crypto_cache[device_id]
master_key = self.keystore.get(device_id)
if master_key is None:
return None
ctx = CryptoContext(master_key, device_id)
self._crypto_cache[device_id] = ctx
return ctx
# ==========================================================
# Start / Stop
# ==========================================================
async def start(self):
"""Start SOCKS5 and tunnel listener servers."""
self._tunnel_server = await asyncio.start_server(
self._handle_tunnel_connection,
self.tunnel_host, self.tunnel_port,
)
log.info(f"Tunnel listener on {self.tunnel_host}:{self.tunnel_port}")
self._socks_server = await asyncio.start_server(
self._handle_socks_client,
self.socks_host, self.socks_port,
)
log.info(f"SOCKS5 server on {self.socks_host}:{self.socks_port}")
async def stop(self):
"""Shut down all servers and tunnels."""
if self._socks_server:
self._socks_server.close()
await self._socks_server.wait_closed()
if self._tunnel_server:
self._tunnel_server.close()
await self._tunnel_server.wait_closed()
for tunnel in self._device_tunnels.values():
tunnel.connected = False
try:
tunnel.writer.close()
except Exception:
pass
self._device_tunnels.clear()
# ==========================================================
# ESP32 tunnel connection
# ==========================================================
async def _handle_tunnel_connection(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
addr = writer.get_extra_info("peername")
log.info(f"Tunnel connection from {addr}")
device_id = await self._authenticate_device(reader, writer)
if not device_id:
log.warning(f"Tunnel auth failed from {addr}")
writer.close()
return
# Check encryption flag from handshake (stored during auth)
encrypted = getattr(self, "_last_auth_encrypted", False)
crypto = self._get_crypto(device_id)
tunnel = DeviceTunnel(device_id, reader, writer, crypto, encrypted)
# Replace existing tunnel for this device
old = self._device_tunnels.pop(device_id, None)
if old:
old.connected = False
try:
old.writer.close()
except Exception:
pass
self._device_tunnels[device_id] = tunnel
if self._active_device is None:
self._active_device = device_id
log.info(f"Device '{device_id}' tunnel authenticated "
f"(encrypted={'yes' if encrypted else 'no'})")
# Run frame dispatch loop
await tunnel.run_rx_loop(self)
# Cleanup on disconnect
self._device_tunnels.pop(device_id, None)
if self._active_device == device_id:
# Pick another active device if available
if self._device_tunnels:
self._active_device = next(iter(self._device_tunnels))
else:
self._active_device = None
log.info(f"Device '{device_id}' tunnel disconnected")
async def _authenticate_device(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> Optional[str]:
"""
Verify ESP32 handshake:
magic[4] + flags[1] + device_id_len[1] + device_id[N] + encrypted_token[45]
"""
try:
# Read magic
magic = await asyncio.wait_for(reader.readexactly(4), timeout=10)
if magic != TUN_MAGIC:
log.warning(f"Bad tunnel magic: {magic!r}")
writer.write(b"\x01")
await writer.drain()
return None
# Read flags + device_id_len
meta = await reader.readexactly(2)
flags = meta[0]
id_len = meta[1]
if id_len == 0 or id_len > 64:
writer.write(b"\x01")
await writer.drain()
return None
device_id = (await reader.readexactly(id_len)).decode(errors="ignore")
# Get crypto for this device
crypto = self._get_crypto(device_id)
if crypto is None:
log.warning(f"Unknown device in tunnel auth: '{device_id}'")
writer.write(b"\x01")
await writer.drain()
return None
# Read encrypted auth token
# The token "espilon-tunnel-v1" (17 bytes) encrypted =
# nonce(12) + ciphertext(17) + tag(16) = 45 bytes
enc_token = await reader.readexactly(45)
# Decrypt and verify
try:
plaintext = crypto.decrypt(enc_token)
except Exception as e:
log.warning(f"Tunnel auth decrypt failed for '{device_id}': {e}")
writer.write(b"\x01")
await writer.drain()
return None
if plaintext != TUN_AUTH_TOKEN:
log.warning(f"Tunnel auth token mismatch for '{device_id}'")
writer.write(b"\x01")
await writer.drain()
return None
# Store encryption preference
self._last_auth_encrypted = bool(flags & 0x01)
# Auth OK
writer.write(b"\x00")
await writer.drain()
return device_id
except (asyncio.TimeoutError, asyncio.IncompleteReadError, Exception) as e:
log.warning(f"Tunnel auth error: {e}")
try:
writer.write(b"\x01")
await writer.drain()
except Exception:
pass
return None
# ==========================================================
# SOCKS5 client handling
# ==========================================================
async def _handle_socks_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
addr = writer.get_extra_info("peername")
try:
# 1) Find active tunnel
tunnel = self._get_active_tunnel()
if not tunnel:
log.warning(f"SOCKS5 from {addr}: no active tunnel")
writer.close()
return
# 2) SOCKS5 auth negotiation
if not await self._socks5_handshake(reader, writer):
return
# 3) SOCKS5 CONNECT request
target = await self._socks5_connect(reader, writer)
if not target:
return
host, port, atyp_data = target
# 4) Allocate channel
channel_id = tunnel.allocate_channel_id()
if channel_id is None:
log.warning(f"SOCKS5: no free channels for {host}:{port}")
await self._socks5_reply(writer, 0x01) # General failure
return
channel = TunnelChannel(channel_id, reader, writer)
channel.target = f"{host}:{port}"
tunnel.channels[channel_id] = channel
# 5) Send OPEN frame to ESP32
open_payload = self._build_open_payload(host, port)
await tunnel.send_frame(channel_id, FRAME_OPEN, open_payload)
# 6) Wait for OPEN_OK or ERROR
try:
await asyncio.wait_for(channel.open_event.wait(),
timeout=OPEN_TIMEOUT_S)
except asyncio.TimeoutError:
log.warning(f"SOCKS5: OPEN timeout for {host}:{port}")
tunnel.channels.pop(channel_id, None)
await self._socks5_reply(writer, 0x04) # Host unreachable
return
if not channel.open_success:
log.warning(f"SOCKS5: OPEN failed for {host}:{port}: "
f"{channel.error_msg}")
tunnel.channels.pop(channel_id, None)
await self._socks5_reply(writer, 0x05) # Connection refused
return
# 7) SOCKS5 success reply
await self._socks5_reply(writer, 0x00)
log.info(f"SOCKS5: chan {channel_id} -> {host}:{port}")
# 8) Relay: SOCKS client -> DATA frames -> ESP32
await self._relay_socks_to_tunnel(channel, tunnel)
except Exception as e:
log.error(f"SOCKS5 error from {addr}: {e}")
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
def _get_active_tunnel(self) -> Optional[DeviceTunnel]:
"""Get the currently active device tunnel."""
if self._active_device:
tunnel = self._device_tunnels.get(self._active_device)
if tunnel and tunnel.connected:
return tunnel
# Try any connected tunnel
for did, tunnel in self._device_tunnels.items():
if tunnel.connected:
self._active_device = did
return tunnel
return None
async def _socks5_handshake(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> bool:
"""SOCKS5 auth negotiation — accept NO_AUTH."""
try:
data = await asyncio.wait_for(reader.read(257), timeout=5)
if len(data) < 3 or data[0] != SOCKS5_VER:
writer.close()
return False
# Accept NO_AUTH (0x00)
n_methods = data[1]
methods = data[2:2 + n_methods]
if 0x00 in methods:
writer.write(bytes([SOCKS5_VER, 0x00]))
await writer.drain()
return True
else:
writer.write(bytes([SOCKS5_VER, 0xFF]))
await writer.drain()
writer.close()
return False
except Exception:
return False
async def _socks5_connect(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter
) -> Optional[tuple[str, int, bytes]]:
"""Parse SOCKS5 CONNECT request. Returns (host, port, raw_atyp_data)."""
try:
data = await asyncio.wait_for(reader.read(262), timeout=5)
if len(data) < 7 or data[0] != SOCKS5_VER:
await self._socks5_reply(writer, 0x01)
return None
cmd = data[1]
if cmd != SOCKS5_CMD_CONNECT:
await self._socks5_reply(writer, 0x07) # Command not supported
return None
atyp = data[3]
if atyp == SOCKS5_ATYP_IPV4:
if len(data) < 10:
await self._socks5_reply(writer, 0x01)
return None
host = socket.inet_ntoa(data[4:8])
port = struct.unpack(">H", data[8:10])[0]
return (host, port, data[3:10])
elif atyp == SOCKS5_ATYP_DOMAIN:
domain_len = data[4]
if len(data) < 5 + domain_len + 2:
await self._socks5_reply(writer, 0x01)
return None
host = data[5:5 + domain_len].decode(errors="ignore")
port = struct.unpack(">H",
data[5 + domain_len:7 + domain_len])[0]
return (host, port, data[3:7 + domain_len])
elif atyp == SOCKS5_ATYP_IPV6:
await self._socks5_reply(writer, 0x08) # Atyp not supported
return None
else:
await self._socks5_reply(writer, 0x08)
return None
except Exception:
return None
async def _socks5_reply(self, writer: asyncio.StreamWriter, status: int):
"""Send SOCKS5 reply."""
reply = struct.pack("!BBBB4sH",
SOCKS5_VER, status, 0x00,
SOCKS5_ATYP_IPV4,
b"\x00\x00\x00\x00", 0)
try:
writer.write(reply)
await writer.drain()
except Exception:
pass
def _build_open_payload(self, host: str, port: int) -> bytes:
"""Build OPEN frame payload: [IPv4:4][port:2][domain_len:1][domain:0-255]"""
# Try to parse as IPv4
try:
ipv4_bytes = socket.inet_aton(host)
domain = b""
except OSError:
# It's a domain name — send 0.0.0.0 as fallback IP,
# ESP32 will resolve via getaddrinfo
ipv4_bytes = b"\x00\x00\x00\x00"
domain = host.encode()[:255]
return (ipv4_bytes
+ struct.pack(">H", port)
+ bytes([len(domain)])
+ domain)
async def _relay_socks_to_tunnel(self, channel: TunnelChannel,
tunnel: DeviceTunnel):
"""Read from SOCKS client, send DATA frames to ESP32."""
try:
while not channel.closed and tunnel.connected:
data = await channel.socks_reader.read(SOCKS_READ_SIZE)
if not data:
break
channel.bytes_tx += len(data)
# Split into frame-sized chunks
offset = 0
while offset < len(data):
chunk = data[offset:offset + FRAME_MAX_DATA]
await tunnel.send_frame(channel.channel_id,
FRAME_DATA, chunk)
offset += len(chunk)
except (ConnectionError, OSError):
pass
finally:
if channel.channel_id in tunnel.channels:
tunnel.channels.pop(channel.channel_id, None)
await tunnel.send_frame(channel.channel_id, FRAME_CLOSE,
bytes([CLOSE_NORMAL]))
log.info(f"SOCKS5: chan {channel.channel_id} done "
f"({channel.target} tx={channel.bytes_tx} "
f"rx={channel.bytes_rx})")
# ==========================================================
# Status API (for web/TUI)
# ==========================================================
def get_status(self) -> dict:
"""Return tunnel status for API/UI."""
tunnels = []
for did, t in self._device_tunnels.items():
channels = []
total_tx = 0
total_rx = 0
for cid, ch in t.channels.items():
channels.append({
"id": cid,
"target": ch.target,
"state": "closed" if ch.closed else "open",
"bytes_tx": ch.bytes_tx,
"bytes_rx": ch.bytes_rx,
})
total_tx += ch.bytes_tx
total_rx += ch.bytes_rx
tunnels.append({
"device_id": did,
"connected": t.connected,
"encrypted": t.encrypted,
"active_channels": len(t.channels),
"max_channels": t.max_channels,
"channels": channels,
"bytes_tx": total_tx,
"bytes_rx": total_rx,
})
socks_running = self._socks_server is not None and self._socks_server.is_serving()
return {
"socks_running": socks_running,
"socks_addr": f"{self.socks_host}:{self.socks_port}",
"active_device": self._active_device,
"tunnels": tunnels,
}
def set_active_device(self, device_id: str) -> bool:
"""Switch SOCKS traffic to a different device."""
if device_id in self._device_tunnels:
self._active_device = device_id
return True
return False