"""
TunnelServer – SOCKS5 proxy via ESP32 channel multiplexing.

Architecture:
  - SOCKS5 server (asyncio, port 1080) accepts operator tools
    (proxychains, curl, nmap)
  - Tunnel listener (asyncio, port 2627) accepts ESP32 bot connections
  - Binary framing protocol maps SOCKS clients to numbered channels
  - ESP32 performs actual TCP connections on the target network

The ESP32 never sees SOCKS5 packets — only OPEN/DATA/CLOSE/ERROR frames.
"""

import asyncio
import struct
import logging
import time
import socket
from typing import Optional

from core.crypto import CryptoContext
from core.keystore import KeyStore

log = logging.getLogger("tunnel")

# ============================================================
# Protocol constants (must match tun_core.h)
# ============================================================
FRAME_HDR_SIZE = 5          # channel_id(2) + type(1) + data_len(2), big-endian
FRAME_MAX_DATA = 4096

FRAME_OPEN = 0x01
FRAME_OPEN_OK = 0x02
FRAME_DATA = 0x03
FRAME_CLOSE = 0x04
FRAME_ERROR = 0x05
FRAME_PING = 0x06
FRAME_PONG = 0x07

# Authentication
TUN_MAGIC = b"TUN\x01"
TUN_AUTH_TOKEN = b"espilon-tunnel-v1"
AUTH_TOKEN_ENC_SIZE = 45    # nonce(12) + ciphertext(17) + tag(16)
AUTH_MAX_ID_LEN = 64
AUTH_TIMEOUT_S = 10         # budget for reading the whole handshake
AUTH_STATUS_OK = b"\x00"
AUTH_STATUS_FAIL = b"\x01"

# SOCKS5
SOCKS5_VER = 0x05
SOCKS5_CMD_CONNECT = 0x01
SOCKS5_ATYP_IPV4 = 0x01
SOCKS5_ATYP_DOMAIN = 0x03
SOCKS5_ATYP_IPV6 = 0x04
SOCKS5_NEGOTIATION_TIMEOUT_S = 5   # per read during SOCKS5 negotiation

# Close reasons
CLOSE_NORMAL = 0
CLOSE_RESET = 1
CLOSE_TIMEOUT = 2

# Timeouts
OPEN_TIMEOUT_S = 10
SOCKS_READ_SIZE = 4096
TUNNEL_READ_SIZE = 8192


async def _read_exactly(reader: asyncio.StreamReader, n: int,
                        timeout: float) -> bytes:
    """Read exactly *n* bytes from *reader*, or raise.

    Raises asyncio.TimeoutError if the bytes do not arrive within *timeout*
    seconds and asyncio.IncompleteReadError if the peer closes first.
    """
    return await asyncio.wait_for(reader.readexactly(n), timeout=timeout)


# ============================================================
# TunnelChannel – one SOCKS5 client <-> one ESP32 channel
# ============================================================
class TunnelChannel:
    """State for one SOCKS5 client stream mapped onto one tunnel channel."""

    __slots__ = (
        "channel_id", "socks_reader", "socks_writer",
        "open_event", "open_success", "error_msg",
        "closed", "bytes_tx", "bytes_rx", "target",
    )

    def __init__(self, channel_id: int,
                 socks_reader: asyncio.StreamReader,
                 socks_writer: asyncio.StreamWriter):
        self.channel_id = channel_id
        self.socks_reader = socks_reader
        self.socks_writer = socks_writer
        # Set once the device answered the OPEN (OPEN_OK / ERROR / CLOSE)
        # or the tunnel went away; open_success tells which.
        self.open_event = asyncio.Event()
        self.open_success = False
        self.error_msg = ""
        self.closed = False
        self.bytes_tx = 0   # bytes read from the SOCKS client
        self.bytes_rx = 0   # bytes received in DATA frames
        self.target = ""    # "host:port", for logs and status


# ============================================================
# DeviceTunnel – binary-framed TCP connection to one ESP32
# ============================================================
class DeviceTunnel:
    """Framed connection to a single device plus its open channels."""

    def __init__(self, device_id: str,
                 reader: asyncio.StreamReader,
                 writer: asyncio.StreamWriter,
                 crypto: Optional[CryptoContext],
                 encrypted: bool = False,
                 max_channels: int = 8):
        self.device_id = device_id
        self.reader = reader
        self.writer = writer
        self.crypto = crypto
        self.encrypted = encrypted
        self.max_channels = max_channels
        self.channels: dict[int, TunnelChannel] = {}
        self._next_channel_id = 1
        self._lock = asyncio.Lock()
        self.connected = True

    def allocate_channel_id(self) -> Optional[int]:
        """Return next free channel id (1-based, cycles within max_channels).

        Returns None when every id in 1..max_channels is in use.
        """
        for _ in range(self.max_channels):
            cid = self._next_channel_id
            self._next_channel_id = (self._next_channel_id % self.max_channels) + 1
            if cid not in self.channels:
                return cid
        return None

    async def send_frame(self, channel_id: int, frame_type: int,
                         data: bytes = b""):
        """Encode and send a frame to the ESP32.

        Plain frames are header + data. When encryption is enabled the whole
        frame is encrypted and prefixed with its 2-byte big-endian length.
        A connection error while draining marks the tunnel as disconnected
        instead of raising.
        """
        frame = struct.pack(">HBH", channel_id, frame_type, len(data)) + data

        if self.encrypted and self.crypto:
            encrypted = self.crypto.encrypt(frame)
            self.writer.write(struct.pack(">H", len(encrypted)) + encrypted)
        else:
            self.writer.write(frame)

        try:
            await self.writer.drain()
        except (ConnectionError, OSError):
            self.connected = False

    async def read_frame(self) -> Optional[tuple[int, int, bytes]]:
        """Read one frame from ESP32.

        Returns (channel_id, type, data), or None on disconnect or when the
        frame is malformed (oversized, truncated, or failing decryption).
        """
        try:
            if self.encrypted and self.crypto:
                # Encrypted framing: 2-byte length prefix + encrypted frame
                len_bytes = await self.reader.readexactly(2)
                enc_len = struct.unpack(">H", len_bytes)[0]
                enc_data = await self.reader.readexactly(enc_len)
            else:
                # Plain framing: 5-byte header followed by data_len bytes
                hdr = await self.reader.readexactly(FRAME_HDR_SIZE)
                channel_id, frame_type, data_len = struct.unpack(">HBH", hdr)
                if data_len > FRAME_MAX_DATA:
                    log.warning(f"[{self.device_id}] frame too large: {data_len}")
                    return None
                payload = b""
                if data_len > 0:
                    payload = await self.reader.readexactly(data_len)
                return (channel_id, frame_type, payload)
        except (asyncio.IncompleteReadError, ConnectionError, OSError):
            self.connected = False
            return None

        # Encrypted path: decrypt, then parse the inner frame. A frame that
        # does not decrypt is treated like any other malformed frame rather
        # than being allowed to crash the RX loop.
        try:
            frame = self.crypto.decrypt(enc_data)
        except Exception as e:
            log.warning(f"[{self.device_id}] frame decrypt failed: {e}")
            return None

        if len(frame) < FRAME_HDR_SIZE:
            return None
        channel_id, frame_type, data_len = struct.unpack(
            ">HBH", frame[:FRAME_HDR_SIZE])
        if data_len > FRAME_MAX_DATA:
            log.warning(f"[{self.device_id}] frame too large: {data_len}")
            return None
        if data_len > len(frame) - FRAME_HDR_SIZE:
            # Header claims more data than was actually delivered.
            log.warning(f"[{self.device_id}] truncated frame: "
                        f"{data_len} > {len(frame) - FRAME_HDR_SIZE}")
            return None
        return (channel_id, frame_type,
                frame[FRAME_HDR_SIZE:FRAME_HDR_SIZE + data_len])

    async def run_rx_loop(self, server: 'TunnelServer'):
        """Read frames from ESP32, dispatch to channels.

        Runs until the connection drops or a malformed frame is read. On
        exit (including on an unexpected exception) every channel is marked
        closed, its waiter is released and its SOCKS socket is closed.
        """
        log.info(f"[{self.device_id}] RX loop started")
        try:
            while self.connected:
                result = await self.read_frame()
                if result is None:
                    break
                await self._dispatch_frame(*result)
        finally:
            # Connection lost — close all channels
            log.info(f"[{self.device_id}] RX loop ended, closing all channels")
            for ch in list(self.channels.values()):
                ch.closed = True
                ch.open_event.set()
                try:
                    ch.socks_writer.close()
                except Exception:
                    pass
            self.channels.clear()
            self.connected = False

    async def _dispatch_frame(self, channel_id: int, frame_type: int,
                              data: bytes):
        """Apply one received frame to the channel it addresses."""
        if frame_type == FRAME_OPEN_OK:
            ch = self.channels.get(channel_id)
            if ch:
                ch.open_success = True
                ch.open_event.set()

        elif frame_type == FRAME_DATA:
            ch = self.channels.get(channel_id)
            if ch and not ch.closed:
                ch.bytes_rx += len(data)
                try:
                    ch.socks_writer.write(data)
                    await ch.socks_writer.drain()
                except (ConnectionError, OSError):
                    # SOCKS client is gone: retire the channel here so the
                    # relay task does not send a second CLOSE for it.
                    ch.closed = True
                    if self.channels.get(channel_id) is ch:
                        del self.channels[channel_id]
                    try:
                        ch.socks_writer.close()
                    except Exception:
                        pass
                    await self.send_frame(channel_id, FRAME_CLOSE,
                                          bytes([CLOSE_RESET]))

        elif frame_type == FRAME_CLOSE:
            ch = self.channels.pop(channel_id, None)
            if ch:
                ch.closed = True
                ch.open_event.set()  # Unblock if waiting
                try:
                    # Only initiate the close; waiting for it to complete
                    # here would stall frame dispatch for every channel.
                    ch.socks_writer.close()
                except Exception:
                    pass
                log.info(f"[{self.device_id}] chan {channel_id} closed "
                         f"(tx={ch.bytes_tx} rx={ch.bytes_rx})")

        elif frame_type == FRAME_ERROR:
            ch = self.channels.get(channel_id)
            if ch:
                ch.error_msg = data.decode(errors="ignore")
                ch.open_success = False
                ch.open_event.set()
                log.warning(f"[{self.device_id}] chan {channel_id} error: "
                            f"{ch.error_msg}")

        elif frame_type == FRAME_PING:
            await self.send_frame(0, FRAME_PONG, data)

        elif frame_type == FRAME_PONG:
            pass  # Keepalive acknowledged


# ============================================================
# TunnelServer – SOCKS5 + ESP32 tunnel manager
# ============================================================
class TunnelServer:
    """Runs the SOCKS5 listener and the device tunnel listener."""

    def __init__(self, keystore: KeyStore,
                 socks_host: str = "127.0.0.1", socks_port: int = 1080,
                 tunnel_host: str = "0.0.0.0", tunnel_port: int = 2627):
        self.keystore = keystore
        self.socks_host = socks_host
        self.socks_port = socks_port
        self.tunnel_host = tunnel_host
        self.tunnel_port = tunnel_port

        self._device_tunnels: dict[str, DeviceTunnel] = {}
        self._active_device: Optional[str] = None
        self._socks_server: Optional[asyncio.Server] = None
        self._tunnel_server: Optional[asyncio.Server] = None
        self._crypto_cache: dict[str, CryptoContext] = {}
        # Only written by the legacy _authenticate_device() wrapper.
        self._last_auth_encrypted = False

    def _get_crypto(self, device_id: str) -> Optional[CryptoContext]:
        """Return the cached CryptoContext for a device, creating it on first use.

        Returns None when the keystore has no key for *device_id*.
        """
        if device_id in self._crypto_cache:
            return self._crypto_cache[device_id]
        master_key = self.keystore.get(device_id)
        if master_key is None:
            return None
        ctx = CryptoContext(master_key, device_id)
        self._crypto_cache[device_id] = ctx
        return ctx

    # ==========================================================
    # Start / Stop
    # ==========================================================
    async def start(self):
        """Start SOCKS5 and tunnel listener servers."""
        self._tunnel_server = await asyncio.start_server(
            self._handle_tunnel_connection,
            self.tunnel_host, self.tunnel_port,
        )
        log.info(f"Tunnel listener on {self.tunnel_host}:{self.tunnel_port}")

        self._socks_server = await asyncio.start_server(
            self._handle_socks_client,
            self.socks_host, self.socks_port,
        )
        log.info(f"SOCKS5 server on {self.socks_host}:{self.socks_port}")

    async def stop(self):
        """Shut down all servers and tunnels."""
        if self._socks_server:
            self._socks_server.close()
            await self._socks_server.wait_closed()
        if self._tunnel_server:
            self._tunnel_server.close()
            await self._tunnel_server.wait_closed()

        for tunnel in self._device_tunnels.values():
            tunnel.connected = False
            try:
                tunnel.writer.close()
            except Exception:
                pass
        self._device_tunnels.clear()

    # ==========================================================
    # ESP32 tunnel connection
    # ==========================================================
    async def _handle_tunnel_connection(self, reader: asyncio.StreamReader,
                                        writer: asyncio.StreamWriter):
        """Authenticate a device connection and serve it until it drops."""
        addr = writer.get_extra_info("peername")
        log.info(f"Tunnel connection from {addr}")

        auth = await self._authenticate(reader, writer)
        if auth is None:
            log.warning(f"Tunnel auth failed from {addr}")
            writer.close()
            return

        # The encryption flag comes straight from this connection's own
        # handshake, so concurrent handshakes cannot overwrite it.
        device_id, encrypted = auth
        crypto = self._get_crypto(device_id)
        tunnel = DeviceTunnel(device_id, reader, writer, crypto, encrypted)

        # Replace existing tunnel for this device
        old = self._device_tunnels.pop(device_id, None)
        if old:
            old.connected = False
            try:
                old.writer.close()
            except Exception:
                pass

        self._device_tunnels[device_id] = tunnel
        if self._active_device is None:
            self._active_device = device_id

        log.info(f"Device '{device_id}' tunnel authenticated "
                 f"(encrypted={'yes' if encrypted else 'no'})")

        try:
            # Run frame dispatch loop
            await tunnel.run_rx_loop(self)
        finally:
            # Cleanup on disconnect. If the device already reconnected, the
            # registry entry belongs to the newer tunnel and must be kept.
            if self._device_tunnels.get(device_id) is tunnel:
                del self._device_tunnels[device_id]
                if self._active_device == device_id:
                    # Pick another active device if available
                    self._active_device = next(iter(self._device_tunnels), None)
            try:
                writer.close()
            except Exception:
                pass
            log.info(f"Device '{device_id}' tunnel disconnected")

    async def _authenticate_device(self, reader: asyncio.StreamReader,
                                   writer: asyncio.StreamWriter
                                   ) -> Optional[str]:
        """
        Verify ESP32 handshake:
          magic[4] + flags[1] + device_id_len[1] + device_id[N]
          + encrypted_token[45]

        Returns the device id, or None when authentication fails. Kept for
        backward compatibility: the encryption flag is also stored in
        self._last_auth_encrypted. New code should call _authenticate(),
        which returns the flag instead of sharing it through the instance.
        """
        auth = await self._authenticate(reader, writer)
        if auth is None:
            return None
        device_id, encrypted = auth
        self._last_auth_encrypted = encrypted
        return device_id

    async def _authenticate(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter
                            ) -> Optional[tuple[str, bool]]:
        """Run the handshake and send the one-byte status to the device.

        Returns (device_id, encrypted) on success, None on any failure. The
        whole handshake read is bounded by AUTH_TIMEOUT_S.
        """
        auth: Optional[tuple[str, bool]] = None
        try:
            auth = await asyncio.wait_for(self._read_handshake(reader),
                                          timeout=AUTH_TIMEOUT_S)
        except asyncio.TimeoutError:
            log.warning("Tunnel auth error: handshake timed out")
        except Exception as e:
            # Connection boundary: any failure here is an auth failure.
            log.warning(f"Tunnel auth error: {e}")

        try:
            writer.write(AUTH_STATUS_OK if auth is not None
                         else AUTH_STATUS_FAIL)
            await writer.drain()
        except Exception as e:
            if auth is not None:
                log.warning(f"Tunnel auth error: {e}")
            return None
        return auth

    async def _read_handshake(self, reader: asyncio.StreamReader
                              ) -> Optional[tuple[str, bool]]:
        """Read and verify the handshake; no reply is written here.

        Returns (device_id, encrypted) when the decrypted token matches
        TUN_AUTH_TOKEN, otherwise None. Read errors propagate to the caller.
        """
        magic = await reader.readexactly(len(TUN_MAGIC))
        if magic != TUN_MAGIC:
            log.warning(f"Bad tunnel magic: {magic!r}")
            return None

        # flags + device_id_len
        flags, id_len = await reader.readexactly(2)
        if id_len == 0 or id_len > AUTH_MAX_ID_LEN:
            return None

        device_id = (await reader.readexactly(id_len)).decode(errors="ignore")

        crypto = self._get_crypto(device_id)
        if crypto is None:
            log.warning(f"Unknown device in tunnel auth: '{device_id}'")
            return None

        # The token "espilon-tunnel-v1" (17 bytes) encrypted =
        # nonce(12) + ciphertext(17) + tag(16) = 45 bytes
        enc_token = await reader.readexactly(AUTH_TOKEN_ENC_SIZE)

        try:
            plaintext = crypto.decrypt(enc_token)
        except Exception as e:
            log.warning(f"Tunnel auth decrypt failed for '{device_id}': {e}")
            return None

        if plaintext != TUN_AUTH_TOKEN:
            log.warning(f"Tunnel auth token mismatch for '{device_id}'")
            return None

        # Bit 0 of flags: device wants encrypted framing
        return (device_id, bool(flags & 0x01))

    # ==========================================================
    # SOCKS5 client handling
    # ==========================================================
    async def _handle_socks_client(self, reader: asyncio.StreamReader,
                                   writer: asyncio.StreamWriter):
        """Serve one SOCKS5 client: negotiate, open a channel, relay data."""
        addr = writer.get_extra_info("peername")
        tunnel: Optional[DeviceTunnel] = None
        channel: Optional[TunnelChannel] = None
        try:
            # 1) Find active tunnel
            tunnel = self._get_active_tunnel()
            if not tunnel:
                log.warning(f"SOCKS5 from {addr}: no active tunnel")
                writer.close()
                return

            # 2) SOCKS5 auth negotiation
            if not await self._socks5_handshake(reader, writer):
                return

            # 3) SOCKS5 CONNECT request
            target = await self._socks5_connect(reader, writer)
            if not target:
                return
            host, port, _atyp_data = target

            # The tunnel may have dropped while we were negotiating.
            if not tunnel.connected:
                log.warning(f"SOCKS5 from {addr}: no active tunnel")
                await self._socks5_reply(writer, 0x01)  # General failure
                return

            # 4) Allocate channel
            channel_id = tunnel.allocate_channel_id()
            if channel_id is None:
                log.warning(f"SOCKS5: no free channels for {host}:{port}")
                await self._socks5_reply(writer, 0x01)  # General failure
                return

            channel = TunnelChannel(channel_id, reader, writer)
            channel.target = f"{host}:{port}"
            tunnel.channels[channel_id] = channel

            # 5) Send OPEN frame to ESP32
            open_payload = self._build_open_payload(host, port)
            await tunnel.send_frame(channel_id, FRAME_OPEN, open_payload)

            # 6) Wait for OPEN_OK or ERROR
            try:
                await asyncio.wait_for(channel.open_event.wait(),
                                       timeout=OPEN_TIMEOUT_S)
            except asyncio.TimeoutError:
                log.warning(f"SOCKS5: OPEN timeout for {host}:{port}")
                if self._release_channel(tunnel, channel):
                    # Tell the device to give up on this channel so a late
                    # connect cannot attach to a reused channel id.
                    await tunnel.send_frame(channel_id, FRAME_CLOSE,
                                            bytes([CLOSE_TIMEOUT]))
                await self._socks5_reply(writer, 0x04)  # Host unreachable
                return

            if not channel.open_success:
                log.warning(f"SOCKS5: OPEN failed for {host}:{port}: "
                            f"{channel.error_msg}")
                self._release_channel(tunnel, channel)
                await self._socks5_reply(writer, 0x05)  # Connection refused
                return

            # 7) SOCKS5 success reply
            await self._socks5_reply(writer, 0x00)
            log.info(f"SOCKS5: chan {channel_id} -> {host}:{port}")

            # 8) Relay: SOCKS client -> DATA frames -> ESP32
            await self._relay_socks_to_tunnel(channel, tunnel)

        except Exception as e:
            log.error(f"SOCKS5 error from {addr}: {e}")
        finally:
            # Never leave a registered channel behind, whatever went wrong.
            if tunnel is not None and channel is not None:
                self._release_channel(tunnel, channel)
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:
                pass

    @staticmethod
    def _release_channel(tunnel: DeviceTunnel,
                         channel: TunnelChannel) -> bool:
        """Unregister *channel* from *tunnel* if it is still the one registered.

        The identity check matters because channel ids are recycled: the id
        may already belong to a newer channel. Returns True if removed.
        """
        if tunnel.channels.get(channel.channel_id) is channel:
            del tunnel.channels[channel.channel_id]
            return True
        return False

    def _get_active_tunnel(self) -> Optional[DeviceTunnel]:
        """Get the currently active device tunnel.

        Falls back to any connected tunnel (and makes it the active one)
        when the active device is unset or disconnected.
        """
        if self._active_device:
            tunnel = self._device_tunnels.get(self._active_device)
            if tunnel and tunnel.connected:
                return tunnel
        # Try any connected tunnel
        for did, tunnel in self._device_tunnels.items():
            if tunnel.connected:
                self._active_device = did
                return tunnel
        return None

    async def _socks5_handshake(self, reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> bool:
        """SOCKS5 auth negotiation — accept NO_AUTH.

        Reads exactly VER, NMETHODS and NMETHODS method bytes, so a partial
        greeting is waited for and a pipelined CONNECT request is left in
        the stream. Returns True once NO_AUTH has been selected.
        """
        try:
            ver, n_methods = await _read_exactly(
                reader, 2, SOCKS5_NEGOTIATION_TIMEOUT_S)
            if ver != SOCKS5_VER:
                writer.close()
                return False

            methods = b""
            if n_methods:
                methods = await _read_exactly(
                    reader, n_methods, SOCKS5_NEGOTIATION_TIMEOUT_S)

            # Accept NO_AUTH (0x00)
            if 0x00 in methods:
                writer.write(bytes([SOCKS5_VER, 0x00]))
                await writer.drain()
                return True

            # 0xFF: no acceptable methods
            writer.write(bytes([SOCKS5_VER, 0xFF]))
            await writer.drain()
            writer.close()
            return False
        except Exception:
            # Timeout, short read or connection error: negotiation failed.
            return False

    async def _socks5_connect(self, reader: asyncio.StreamReader,
                              writer: asyncio.StreamWriter
                              ) -> Optional[tuple[str, int, bytes]]:
        """Parse SOCKS5 CONNECT request.

        Returns (host, port, raw_atyp_data), where raw_atyp_data is the
        request from the ATYP byte through the port. Returns None after
        sending an error reply for unsupported or invalid requests, and
        None without a reply on timeout or connection error.
        """
        try:
            ver, cmd, _rsv, atyp = await _read_exactly(
                reader, 4, SOCKS5_NEGOTIATION_TIMEOUT_S)
            if ver != SOCKS5_VER:
                await self._socks5_reply(writer, 0x01)
                return None
            if cmd != SOCKS5_CMD_CONNECT:
                await self._socks5_reply(writer, 0x07)  # Command not supported
                return None

            if atyp == SOCKS5_ATYP_IPV4:
                body = await _read_exactly(
                    reader, 6, SOCKS5_NEGOTIATION_TIMEOUT_S)
                host = socket.inet_ntoa(body[:4])
                port = struct.unpack(">H", body[4:6])[0]
                return (host, port, bytes([atyp]) + body)

            if atyp == SOCKS5_ATYP_DOMAIN:
                domain_len = (await _read_exactly(
                    reader, 1, SOCKS5_NEGOTIATION_TIMEOUT_S))[0]
                body = await _read_exactly(
                    reader, domain_len + 2, SOCKS5_NEGOTIATION_TIMEOUT_S)
                if domain_len == 0:
                    # An empty name cannot be resolved by the device.
                    await self._socks5_reply(writer, 0x01)
                    return None
                host = body[:domain_len].decode(errors="ignore")
                port = struct.unpack(">H", body[domain_len:domain_len + 2])[0]
                return (host, port, bytes([atyp, domain_len]) + body)

            # IPv6 and unknown address types: the OPEN payload is IPv4-only.
            await self._socks5_reply(writer, 0x08)  # Atyp not supported
            return None
        except Exception:
            return None

    async def _socks5_reply(self, writer: asyncio.StreamWriter, status: int):
        """Send SOCKS5 reply with bound address 0.0.0.0:0 (best effort)."""
        reply = struct.pack("!BBBB4sH",
                            SOCKS5_VER, status, 0x00,
                            SOCKS5_ATYP_IPV4,
                            b"\x00\x00\x00\x00", 0)
        try:
            writer.write(reply)
            await writer.drain()
        except Exception:
            # The client is already gone; there is nobody left to tell.
            pass

    def _build_open_payload(self, host: str, port: int) -> bytes:
        """Build OPEN frame payload:
        [IPv4:4][port:2][domain_len:1][domain:0-255]

        A host that parses as IPv4 is sent as the address with an empty
        domain; anything else is sent as a domain with address 0.0.0.0.
        """
        # Try to parse as IPv4
        try:
            ipv4_bytes = socket.inet_aton(host)
            domain = b""
        except OSError:
            # It's a domain name — send 0.0.0.0 as fallback IP,
            # ESP32 will resolve via getaddrinfo
            ipv4_bytes = b"\x00\x00\x00\x00"
            domain = host.encode()[:255]

        return (ipv4_bytes
                + struct.pack(">H", port)
                + bytes([len(domain)])
                + domain)

    async def _relay_socks_to_tunnel(self, channel: TunnelChannel,
                                     tunnel: DeviceTunnel):
        """Read from SOCKS client, send DATA frames to ESP32.

        Runs until the client reaches EOF, the channel is closed, or the
        tunnel disconnects. If the channel is still registered at that
        point it is removed and a CLOSE frame is sent to the device.
        """
        cid = channel.channel_id
        try:
            while not channel.closed and tunnel.connected:
                data = await channel.socks_reader.read(SOCKS_READ_SIZE)
                if not data:
                    break
                channel.bytes_tx += len(data)

                # Split into frame-sized chunks
                for offset in range(0, len(data), FRAME_MAX_DATA):
                    await tunnel.send_frame(
                        cid, FRAME_DATA,
                        data[offset:offset + FRAME_MAX_DATA])
        except (ConnectionError, OSError):
            pass
        finally:
            # Only close the channel if the id still maps to *this* channel;
            # after a device-side CLOSE the id may already have been reused.
            if self._release_channel(tunnel, channel):
                await tunnel.send_frame(cid, FRAME_CLOSE,
                                        bytes([CLOSE_NORMAL]))
            log.info(f"SOCKS5: chan {channel.channel_id} done "
                     f"({channel.target} tx={channel.bytes_tx} "
                     f"rx={channel.bytes_rx})")

    # ==========================================================
    # Status API (for web/TUI)
    # ==========================================================
    def get_status(self) -> dict:
        """Return tunnel status for API/UI."""
        tunnels = []
        for did, t in self._device_tunnels.items():
            channels = []
            total_tx = 0
            total_rx = 0
            for cid, ch in t.channels.items():
                channels.append({
                    "id": cid,
                    "target": ch.target,
                    "state": "closed" if ch.closed else "open",
                    "bytes_tx": ch.bytes_tx,
                    "bytes_rx": ch.bytes_rx,
                })
                total_tx += ch.bytes_tx
                total_rx += ch.bytes_rx

            tunnels.append({
                "device_id": did,
                "connected": t.connected,
                "encrypted": t.encrypted,
                "active_channels": len(t.channels),
                "max_channels": t.max_channels,
                "channels": channels,
                "bytes_tx": total_tx,
                "bytes_rx": total_rx,
            })

        socks_running = (self._socks_server is not None
                         and self._socks_server.is_serving())
        return {
            "socks_running": socks_running,
            "socks_addr": f"{self.socks_host}:{self.socks_port}",
            "active_device": self._active_device,
            "tunnels": tunnels,
        }

    def set_active_device(self, device_id: str) -> bool:
        """Switch SOCKS traffic to a different device.

        Returns False (and changes nothing) if the device has no tunnel.
        """
        if device_id in self._device_tunnels:
            self._active_device = device_id
            return True
        return False