Replace monolithic CLI and web server with route-based Flask API. New routes: api_commands, api_build, api_can, api_monitor, api_ota, api_tunnel. Add honeypot security dashboard with real-time SSE, MITRE ATT&CK mapping, kill chain analysis. New TUI with commander/help modules. Add session management, tunnel proxy core, CAN bus data store. Docker support.
685 lines
25 KiB
Python
685 lines
25 KiB
Python
"""
|
||
TunnelServer – SOCKS5 proxy via ESP32 channel multiplexing.
|
||
|
||
Architecture:
|
||
- SOCKS5 server (asyncio, port 1080) accepts operator tools (proxychains, curl, nmap)
|
||
- Tunnel listener (asyncio, port 2627) accepts ESP32 bot connections
|
||
- Binary framing protocol maps SOCKS clients to numbered channels
|
||
- ESP32 performs actual TCP connections on the target network
|
||
|
||
The ESP32 never sees SOCKS5 packets — only OPEN/DATA/CLOSE/ERROR frames.
|
||
"""
|
||
|
||
import asyncio
|
||
import struct
|
||
import logging
|
||
import time
|
||
import socket
|
||
from typing import Optional
|
||
|
||
from core.crypto import CryptoContext
|
||
from core.keystore import KeyStore
|
||
|
||
# Module-wide logger; every class in this file logs through it.
log = logging.getLogger("tunnel")

# ============================================================
# Protocol constants (must match tun_core.h)
# ============================================================
# Frame header, big-endian: channel_id (u16) + frame_type (u8) + data_len (u16).
FRAME_HDR_SIZE = 5
# Largest payload carried by one frame; longer SOCKS reads are split into
# several DATA frames and larger incoming frames are rejected.
FRAME_MAX_DATA = 4096

# Frame types
FRAME_OPEN = 0x01      # server -> ESP32: open a TCP connection for a channel
FRAME_OPEN_OK = 0x02   # ESP32 -> server: the connection is established
FRAME_DATA = 0x03      # payload bytes, either direction
FRAME_CLOSE = 0x04     # channel closed; payload is one close-reason byte
FRAME_ERROR = 0x05     # ESP32 -> server: open failed; payload is a message
FRAME_PING = 0x06      # keepalive request (answered with PONG on channel 0)
FRAME_PONG = 0x07      # keepalive reply

# Authentication
TUN_MAGIC = b"TUN\x01"                  # first four bytes of the handshake
TUN_AUTH_TOKEN = b"espilon-tunnel-v1"   # expected plaintext of the auth token

# SOCKS5
SOCKS5_VER = 0x05
SOCKS5_CMD_CONNECT = 0x01
SOCKS5_ATYP_IPV4 = 0x01
SOCKS5_ATYP_DOMAIN = 0x03
SOCKS5_ATYP_IPV6 = 0x04

# Close reasons (payload byte of a CLOSE frame)
CLOSE_NORMAL = 0
CLOSE_RESET = 1
CLOSE_TIMEOUT = 2

# Timeouts
OPEN_TIMEOUT_S = 10        # seconds to wait for OPEN_OK / ERROR after OPEN
SOCKS_READ_SIZE = 4096     # bytes per read from a SOCKS client
TUNNEL_READ_SIZE = 8192    # not referenced by the code visible in this file
|
||
|
||
|
||
# ============================================================
|
||
# TunnelChannel – one SOCKS5 client <-> one ESP32 channel
|
||
# ============================================================
|
||
class TunnelChannel:
    """Bookkeeping for one SOCKS5 client mapped onto one tunnel channel.

    The object is a plain record: it holds the client's stream pair, the
    event used to signal the outcome of the OPEN request, and byte counters.
    It performs no I/O itself.
    """

    __slots__ = (
        "channel_id", "socks_reader", "socks_writer",
        "open_event", "open_success", "error_msg",
        "closed", "bytes_tx", "bytes_rx", "target",
    )

    def __init__(self, channel_id: int,
                 socks_reader: asyncio.StreamReader,
                 socks_writer: asyncio.StreamWriter):
        """Create a channel in the "opening" state.

        Args:
            channel_id: Channel number used in frame headers.
            socks_reader: Stream carrying bytes from the SOCKS client.
            socks_writer: Stream carrying bytes back to the SOCKS client.
        """
        self.channel_id = channel_id
        self.socks_reader = socks_reader
        self.socks_writer = socks_writer
        # Set once the OPEN request is answered (OPEN_OK / ERROR), the channel
        # is closed, or the tunnel drops; open_success tells which happened.
        self.open_event = asyncio.Event()
        self.open_success = False
        self.error_msg = ""   # text of the ERROR frame, if one arrived
        self.closed = False
        self.bytes_tx = 0     # bytes read from the SOCKS client
        self.bytes_rx = 0     # bytes received in DATA frames for the client
        self.target = ""      # "host:port", filled in by the SOCKS handler
|
||
|
||
|
||
# ============================================================
|
||
# DeviceTunnel – binary-framed TCP connection to one ESP32
|
||
# ============================================================
|
||
class DeviceTunnel:
    """Binary-framed connection to one ESP32, multiplexing numbered channels.

    A frame is a 5-byte big-endian header (channel_id u16, frame_type u8,
    data_len u16) followed by ``data_len`` payload bytes.  When ``encrypted``
    is set and a crypto context is present, each frame is encrypted as a
    whole and sent behind a 2-byte big-endian length prefix.
    """

    def __init__(self, device_id: str,
                 reader: asyncio.StreamReader,
                 writer: asyncio.StreamWriter,
                 crypto: Optional["CryptoContext"],
                 encrypted: bool = False,
                 max_channels: int = 8):
        """Wrap an authenticated device connection.

        Args:
            device_id: Identifier the device presented during authentication.
            reader: Stream delivering bytes from the device.
            writer: Stream delivering bytes to the device.
            crypto: Per-device crypto context, or None if unavailable.
            encrypted: Whether frames are encrypted on the wire.
            max_channels: Number of channel ids handed out (1..max_channels).
        """
        self.device_id = device_id
        self.reader = reader
        self.writer = writer
        self.crypto = crypto
        self.encrypted = encrypted
        self.max_channels = max_channels
        self.channels: dict[int, TunnelChannel] = {}
        self._next_channel_id = 1
        # Serialises write+drain so concurrent senders never overlap in drain().
        self._lock = asyncio.Lock()
        self.connected = True

    def allocate_channel_id(self) -> Optional[int]:
        """Return the next free channel id, or None when all are in use.

        Ids run from 1 to ``max_channels`` and are handed out round-robin.
        The id is not reserved: the caller must register the channel in
        ``self.channels`` itself.
        """
        for _ in range(self.max_channels):
            candidate = self._next_channel_id
            self._next_channel_id = (candidate % self.max_channels) + 1
            if candidate not in self.channels:
                return candidate
        return None

    async def send_frame(self, channel_id: int, frame_type: int,
                         data: bytes = b""):
        """Encode one frame and send it to the ESP32.

        Connection errors are not raised; they mark the tunnel as
        disconnected (``self.connected = False``).

        Args:
            channel_id: Channel number (0 is used for keepalive replies).
            frame_type: One of the FRAME_* constants.
            data: Frame payload.
        """
        frame = struct.pack(">HBH", channel_id, frame_type, len(data)) + data

        async with self._lock:
            if self.encrypted and self.crypto:
                # Encrypt under the lock so ciphertexts hit the wire in the
                # same order they were produced.
                sealed = self.crypto.encrypt(frame)
                wire = struct.pack(">H", len(sealed)) + sealed
            else:
                wire = frame

            try:
                self.writer.write(wire)
                await self.writer.drain()
            except (ConnectionError, OSError):
                self.connected = False

    async def read_frame(self) -> Optional[tuple[int, int, bytes]]:
        """Read one frame from the ESP32.

        Returns:
            ``(channel_id, frame_type, data)``, or None when the connection
            dropped or the frame was malformed (oversized, truncated, or
            failed to decrypt).
        """
        try:
            if self.encrypted and self.crypto:
                # 2-byte length prefix, then the encrypted frame.
                (enc_len,) = struct.unpack(
                    ">H", await self.reader.readexactly(2))
                enc_data = await self.reader.readexactly(enc_len)
            else:
                hdr = await self.reader.readexactly(FRAME_HDR_SIZE)
                channel_id, frame_type, data_len = struct.unpack(">HBH", hdr)
                if data_len > FRAME_MAX_DATA:
                    log.warning(f"[{self.device_id}] frame too large: {data_len}")
                    return None
                payload = b""
                if data_len > 0:
                    payload = await self.reader.readexactly(data_len)
                return (channel_id, frame_type, payload)
        except (asyncio.IncompleteReadError, ConnectionError, OSError):
            self.connected = False
            return None

        # Encrypted path: decrypt, then parse header + payload from the result.
        try:
            frame = self.crypto.decrypt(enc_data)
        except Exception as e:
            # A frame that fails to decrypt ends the tunnel instead of
            # escaping the RX loop and skipping its cleanup.
            log.warning(f"[{self.device_id}] frame decrypt failed: {e}")
            return None

        if len(frame) < FRAME_HDR_SIZE:
            return None
        channel_id, frame_type, data_len = struct.unpack(
            ">HBH", frame[:FRAME_HDR_SIZE])
        if data_len > FRAME_MAX_DATA:
            log.warning(f"[{self.device_id}] frame too large: {data_len}")
            return None
        if len(frame) < FRAME_HDR_SIZE + data_len:
            # Header claims more payload than was actually delivered.
            log.warning(f"[{self.device_id}] truncated frame: "
                        f"{len(frame) - FRAME_HDR_SIZE} < {data_len}")
            return None
        return (channel_id, frame_type,
                frame[FRAME_HDR_SIZE:FRAME_HDR_SIZE + data_len])

    async def run_rx_loop(self, server: 'TunnelServer'):
        """Read frames from the ESP32 and dispatch them until disconnect.

        When the loop ends for any reason, every remaining channel is marked
        closed, its waiter is released and its SOCKS writer is closed.

        Args:
            server: Owning server (currently unused by the loop).
        """
        log.info(f"[{self.device_id}] RX loop started")
        try:
            while self.connected:
                result = await self.read_frame()
                if result is None:
                    break
                await self._dispatch_frame(*result)
        finally:
            # Connection lost — close all channels
            log.info(f"[{self.device_id}] RX loop ended, closing all channels")
            for ch in list(self.channels.values()):
                ch.closed = True
                ch.open_event.set()
                try:
                    ch.socks_writer.close()
                except Exception:
                    pass
            self.channels.clear()
            self.connected = False

    async def _dispatch_frame(self, channel_id: int, frame_type: int,
                              data: bytes):
        """Apply one received frame to the channel it addresses."""
        if frame_type == FRAME_OPEN_OK:
            ch = self.channels.get(channel_id)
            if ch:
                ch.open_success = True
                ch.open_event.set()

        elif frame_type == FRAME_DATA:
            ch = self.channels.get(channel_id)
            if ch and not ch.closed:
                ch.bytes_rx += len(data)
                try:
                    ch.socks_writer.write(data)
                    await ch.socks_writer.drain()
                except (ConnectionError, OSError):
                    # SOCKS client went away: tell the ESP32 to drop the peer.
                    ch.closed = True
                    await self.send_frame(channel_id, FRAME_CLOSE,
                                          bytes([CLOSE_RESET]))

        elif frame_type == FRAME_CLOSE:
            ch = self.channels.pop(channel_id, None)
            if ch:
                ch.closed = True
                ch.open_event.set()  # Unblock if waiting
                try:
                    ch.socks_writer.close()
                    await ch.socks_writer.wait_closed()
                except Exception:
                    pass
                log.info(f"[{self.device_id}] chan {channel_id} closed "
                         f"(tx={ch.bytes_tx} rx={ch.bytes_rx})")

        elif frame_type == FRAME_ERROR:
            ch = self.channels.get(channel_id)
            if ch:
                ch.error_msg = data.decode(errors="ignore")
                ch.open_success = False
                ch.open_event.set()
                log.warning(f"[{self.device_id}] chan {channel_id} error: "
                            f"{ch.error_msg}")

        elif frame_type == FRAME_PING:
            await self.send_frame(0, FRAME_PONG, data)

        elif frame_type == FRAME_PONG:
            pass  # Keepalive acknowledged
|
||
|
||
|
||
# ============================================================
|
||
# TunnelServer – SOCKS5 + ESP32 tunnel manager
|
||
# ============================================================
|
||
class TunnelServer:
    """SOCKS5 front end plus listener for ESP32 tunnel connections.

    SOCKS5 clients are mapped onto channels of the currently active device
    tunnel; the device performs the actual TCP connections.
    """

    # Seconds allowed for the complete device handshake.
    _AUTH_TIMEOUT_S = 10
    # Seconds allowed for each SOCKS5 negotiation step.
    _SOCKS_TIMEOUT_S = 5

    def __init__(self, keystore: "KeyStore",
                 socks_host: str = "127.0.0.1",
                 socks_port: int = 1080,
                 tunnel_host: str = "0.0.0.0",
                 tunnel_port: int = 2627):
        """Store configuration; no sockets are opened until ``start()``.

        Args:
            keystore: Source of per-device master keys (``get(device_id)``).
            socks_host: Bind address of the SOCKS5 server.
            socks_port: Bind port of the SOCKS5 server.
            tunnel_host: Bind address of the device tunnel listener.
            tunnel_port: Bind port of the device tunnel listener.
        """
        self.keystore = keystore
        self.socks_host = socks_host
        self.socks_port = socks_port
        self.tunnel_host = tunnel_host
        self.tunnel_port = tunnel_port
        self._device_tunnels: dict[str, DeviceTunnel] = {}
        self._active_device: Optional[str] = None
        self._socks_server: Optional[asyncio.Server] = None
        self._tunnel_server: Optional[asyncio.Server] = None
        self._crypto_cache: dict[str, CryptoContext] = {}

    def _get_crypto(self, device_id: str) -> Optional["CryptoContext"]:
        """Return the cached crypto context for a device, creating it once.

        Returns None when the keystore has no key for ``device_id``.
        """
        cached = self._crypto_cache.get(device_id)
        if cached is not None:
            return cached
        master_key = self.keystore.get(device_id)
        if master_key is None:
            return None
        ctx = CryptoContext(master_key, device_id)
        self._crypto_cache[device_id] = ctx
        return ctx

    # ==========================================================
    # Start / Stop
    # ==========================================================
    async def start(self):
        """Start SOCKS5 and tunnel listener servers.

        Raises:
            OSError: If either listener cannot be bound.  The tunnel listener
                is closed again when the SOCKS5 bind fails.
        """
        self._tunnel_server = await asyncio.start_server(
            self._handle_tunnel_connection,
            self.tunnel_host, self.tunnel_port,
        )
        log.info(f"Tunnel listener on {self.tunnel_host}:{self.tunnel_port}")

        try:
            self._socks_server = await asyncio.start_server(
                self._handle_socks_client,
                self.socks_host, self.socks_port,
            )
        except Exception:
            # Do not leave a half-started server listening for devices.
            self._tunnel_server.close()
            await self._tunnel_server.wait_closed()
            self._tunnel_server = None
            raise
        log.info(f"SOCKS5 server on {self.socks_host}:{self.socks_port}")

    async def stop(self):
        """Shut down all servers and tunnels."""
        servers = [s for s in (self._socks_server, self._tunnel_server) if s]

        # Stop accepting first, then drop the device connections so running
        # handlers can finish; only then wait for the listeners.  Waiting
        # before closing the tunnels can block while connections are active.
        for srv in servers:
            srv.close()

        for tunnel in list(self._device_tunnels.values()):
            tunnel.connected = False
            try:
                tunnel.writer.close()
            except Exception:
                pass
        self._device_tunnels.clear()
        self._active_device = None

        for srv in servers:
            await srv.wait_closed()

    # ==========================================================
    # ESP32 tunnel connection
    # ==========================================================
    async def _handle_tunnel_connection(self, reader: asyncio.StreamReader,
                                        writer: asyncio.StreamWriter):
        """Authenticate a device, register its tunnel and run its RX loop."""
        addr = writer.get_extra_info("peername")
        log.info(f"Tunnel connection from {addr}")

        auth = await self._authenticate(reader, writer)
        if not auth:
            log.warning(f"Tunnel auth failed from {addr}")
            writer.close()
            return

        # The encryption flag travels with this connection's result, not via
        # shared server state, so concurrent handshakes cannot mix it up.
        device_id, encrypted = auth

        crypto = self._get_crypto(device_id)
        tunnel = DeviceTunnel(device_id, reader, writer, crypto, encrypted)

        # Replace existing tunnel for this device
        old = self._device_tunnels.pop(device_id, None)
        if old:
            old.connected = False
            try:
                old.writer.close()
            except Exception:
                pass

        self._device_tunnels[device_id] = tunnel
        if self._active_device is None:
            self._active_device = device_id

        log.info(f"Device '{device_id}' tunnel authenticated "
                 f"(encrypted={'yes' if encrypted else 'no'})")

        try:
            # Run frame dispatch loop
            await tunnel.run_rx_loop(self)
        finally:
            # Cleanup on disconnect.  Only unregister if this tunnel is still
            # the registered one: after a reconnect the entry belongs to the
            # replacement and must stay.
            if self._device_tunnels.get(device_id) is tunnel:
                del self._device_tunnels[device_id]
                if self._active_device == device_id:
                    # Pick another active device if available
                    self._active_device = next(iter(self._device_tunnels), None)
            try:
                writer.close()
            except Exception:
                pass
            log.info(f"Device '{device_id}' tunnel disconnected")

    async def _authenticate_device(self, reader: asyncio.StreamReader,
                                   writer: asyncio.StreamWriter) -> Optional[str]:
        """
        Verify ESP32 handshake:
        magic[4] + flags[1] + device_id_len[1] + device_id[N] + encrypted_token[45]

        Returns the device id on success, None otherwise.  The encryption
        flag of the last successful handshake is also stored in
        ``self._last_auth_encrypted`` for callers that still read it; new
        code should use ``_authenticate`` and take the flag from its result.
        """
        auth = await self._authenticate(reader, writer)
        if auth is None:
            return None
        device_id, self._last_auth_encrypted = auth
        return device_id

    async def _authenticate(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter
                            ) -> Optional[tuple[str, bool]]:
        """Run the handshake and send the one-byte verdict to the device.

        Returns:
            ``(device_id, encrypted)`` on success, None on any failure.  The
            device receives ``b"\\x00"`` on success and ``b"\\x01"`` otherwise.
        """
        try:
            # One deadline for the whole handshake, so a peer cannot stall
            # after sending only the magic bytes.
            auth = await asyncio.wait_for(self._verify_handshake(reader),
                                          timeout=self._AUTH_TIMEOUT_S)
        except Exception as e:
            log.warning(f"Tunnel auth error: {e}")
            auth = None

        try:
            writer.write(b"\x00" if auth else b"\x01")
            await writer.drain()
        except Exception as e:
            if auth:
                log.warning(f"Tunnel auth error: {e}")
            return None
        return auth

    async def _verify_handshake(self, reader: asyncio.StreamReader
                                ) -> Optional[tuple[str, bool]]:
        """Read and check the handshake fields; None means "reject"."""
        magic = await reader.readexactly(4)
        if magic != TUN_MAGIC:
            log.warning(f"Bad tunnel magic: {magic!r}")
            return None

        # flags + device_id_len
        flags, id_len = await reader.readexactly(2)
        if id_len == 0 or id_len > 64:
            return None

        device_id = (await reader.readexactly(id_len)).decode(errors="ignore")

        crypto = self._get_crypto(device_id)
        if crypto is None:
            log.warning(f"Unknown device in tunnel auth: '{device_id}'")
            return None

        # The token "espilon-tunnel-v1" (17 bytes) encrypted =
        # nonce(12) + ciphertext(17) + tag(16) = 45 bytes
        # NOTE(review): the plaintext is constant; whether a captured
        # handshake can be replayed depends on CryptoContext — confirm.
        enc_token = await reader.readexactly(45)

        try:
            plaintext = crypto.decrypt(enc_token)
        except Exception as e:
            log.warning(f"Tunnel auth decrypt failed for '{device_id}': {e}")
            return None

        if plaintext != TUN_AUTH_TOKEN:
            log.warning(f"Tunnel auth token mismatch for '{device_id}'")
            return None

        # Bit 0 of the flags byte requests encrypted framing.
        return device_id, bool(flags & 0x01)

    # ==========================================================
    # SOCKS5 client handling
    # ==========================================================
    async def _handle_socks_client(self, reader: asyncio.StreamReader,
                                   writer: asyncio.StreamWriter):
        """Serve one SOCKS5 client: negotiate, open a channel, relay data."""
        addr = writer.get_extra_info("peername")
        tunnel = None
        channel = None

        try:
            # 1) Find active tunnel
            tunnel = self._get_active_tunnel()
            if not tunnel:
                log.warning(f"SOCKS5 from {addr}: no active tunnel")
                return

            # 2) SOCKS5 auth negotiation
            if not await self._socks5_handshake(reader, writer):
                return

            # 3) SOCKS5 CONNECT request
            target = await self._socks5_connect(reader, writer)
            if not target:
                return
            host, port, _atyp_data = target

            # 4) Allocate channel
            channel_id = tunnel.allocate_channel_id()
            if channel_id is None:
                log.warning(f"SOCKS5: no free channels for {host}:{port}")
                await self._socks5_reply(writer, 0x01)  # General failure
                return

            channel = TunnelChannel(channel_id, reader, writer)
            channel.target = f"{host}:{port}"
            tunnel.channels[channel_id] = channel

            # 5) Send OPEN frame to ESP32
            open_payload = self._build_open_payload(host, port)
            await tunnel.send_frame(channel_id, FRAME_OPEN, open_payload)

            # 6) Wait for OPEN_OK or ERROR
            try:
                await asyncio.wait_for(channel.open_event.wait(),
                                       timeout=OPEN_TIMEOUT_S)
            except asyncio.TimeoutError:
                log.warning(f"SOCKS5: OPEN timeout for {host}:{port}")
                # NOTE(review): the ESP32 is not told that the server gave up
                # on this channel — confirm the firmware times out on its own.
                self._release_channel(tunnel, channel)
                await self._socks5_reply(writer, 0x04)  # Host unreachable
                return

            if not channel.open_success:
                log.warning(f"SOCKS5: OPEN failed for {host}:{port}: "
                            f"{channel.error_msg}")
                self._release_channel(tunnel, channel)
                await self._socks5_reply(writer, 0x05)  # Connection refused
                return

            # 7) SOCKS5 success reply
            await self._socks5_reply(writer, 0x00)
            log.info(f"SOCKS5: chan {channel_id} -> {host}:{port}")

            # 8) Relay: SOCKS client -> DATA frames -> ESP32
            await self._relay_socks_to_tunnel(channel, tunnel)

        except Exception as e:
            log.error(f"SOCKS5 error from {addr}: {e}")
        finally:
            # Never leave the channel registered after the handler is gone.
            if channel is not None:
                self._release_channel(tunnel, channel)
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:
                pass

    @staticmethod
    def _release_channel(tunnel: "DeviceTunnel",
                         channel: "TunnelChannel") -> bool:
        """Unregister ``channel`` if it is still the one stored under its id.

        The identity check keeps a finished handler from removing a newer
        channel that has since been given the same id.

        Returns:
            True if the channel was removed, False if it was already gone.
        """
        if tunnel.channels.get(channel.channel_id) is channel:
            del tunnel.channels[channel.channel_id]
            return True
        return False

    def _get_active_tunnel(self) -> Optional["DeviceTunnel"]:
        """Get the currently active device tunnel.

        Falls back to any connected tunnel (and makes it the active one)
        when the active device is missing or disconnected.
        """
        if self._active_device:
            current = self._device_tunnels.get(self._active_device)
            if current and current.connected:
                return current
        # Try any connected tunnel
        for did, candidate in self._device_tunnels.items():
            if candidate.connected:
                self._active_device = did
                return candidate
        return None

    async def _socks5_handshake(self, reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> bool:
        """SOCKS5 auth negotiation — accept NO_AUTH.

        Reads exactly the greeting (VER, NMETHODS, METHODS) so that bytes of
        a request sent right behind it stay in the stream.

        Returns:
            True when NO_AUTH was offered and accepted, False otherwise.
        """
        async def read_greeting():
            ver, n_methods = await reader.readexactly(2)
            if ver != SOCKS5_VER or n_methods == 0:
                return None
            return await reader.readexactly(n_methods)

        try:
            methods = await asyncio.wait_for(read_greeting(),
                                             timeout=self._SOCKS_TIMEOUT_S)
            if methods is None:
                writer.close()
                return False

            # Accept NO_AUTH (0x00)
            if 0x00 in methods:
                writer.write(bytes([SOCKS5_VER, 0x00]))
                await writer.drain()
                return True

            writer.write(bytes([SOCKS5_VER, 0xFF]))
            await writer.drain()
            writer.close()
            return False
        except Exception:
            return False

    async def _socks5_connect(self, reader: asyncio.StreamReader,
                              writer: asyncio.StreamWriter
                              ) -> Optional[tuple[str, int, bytes]]:
        """Parse SOCKS5 CONNECT request. Returns (host, port, raw_atyp_data).

        On a malformed or unsupported request the matching SOCKS5 error
        reply is sent and None is returned.  A timeout or short read returns
        None without a reply.
        """
        try:
            parsed = await asyncio.wait_for(self._read_socks5_request(reader),
                                            timeout=self._SOCKS_TIMEOUT_S)
        except Exception:
            return None

        if isinstance(parsed, int):
            await self._socks5_reply(writer, parsed)
            return None
        return parsed

    async def _read_socks5_request(self, reader: asyncio.StreamReader):
        """Read one request; return (host, port, raw) or a reply code (int)."""
        ver, cmd, _rsv, atyp = await reader.readexactly(4)
        if ver != SOCKS5_VER:
            return 0x01  # General failure
        if cmd != SOCKS5_CMD_CONNECT:
            return 0x07  # Command not supported

        if atyp == SOCKS5_ATYP_IPV4:
            raw = await reader.readexactly(6)  # 4 address bytes + 2 port bytes
            host = socket.inet_ntoa(raw[:4])
            (port,) = struct.unpack(">H", raw[4:])
            return (host, port, bytes([atyp]) + raw)

        if atyp == SOCKS5_ATYP_DOMAIN:
            (domain_len,) = await reader.readexactly(1)
            raw = await reader.readexactly(domain_len + 2)
            host = raw[:domain_len].decode(errors="ignore")
            if not host:
                # An empty name would be sent to the device as 0.0.0.0.
                return 0x01
            (port,) = struct.unpack(">H", raw[domain_len:])
            return (host, port, bytes([atyp, domain_len]) + raw)

        # IPv6 and unknown address types
        return 0x08  # Atyp not supported

    async def _socks5_reply(self, writer: asyncio.StreamWriter, status: int):
        """Send SOCKS5 reply with bound address 0.0.0.0:0; errors are ignored."""
        reply = struct.pack("!BBBB4sH",
                            SOCKS5_VER, status, 0x00,
                            SOCKS5_ATYP_IPV4,
                            b"\x00\x00\x00\x00", 0)
        try:
            writer.write(reply)
            await writer.drain()
        except Exception:
            pass

    def _build_open_payload(self, host: str, port: int) -> bytes:
        """Build OPEN frame payload: [IPv4:4][port:2][domain_len:1][domain:0-255]"""
        try:
            ipv4_bytes = socket.inet_aton(host)
            domain = b""
        except OSError:
            # It's a domain name — send 0.0.0.0 as fallback IP,
            # ESP32 will resolve via getaddrinfo
            ipv4_bytes = b"\x00\x00\x00\x00"
            domain = host.encode()[:255]  # length must fit in one byte

        return b"".join((ipv4_bytes,
                         struct.pack(">H", port),
                         bytes([len(domain)]),
                         domain))

    async def _relay_socks_to_tunnel(self, channel: "TunnelChannel",
                                     tunnel: "DeviceTunnel"):
        """Read from SOCKS client, send DATA frames to ESP32.

        Runs until the client reaches EOF, the channel is closed or the
        tunnel drops.  If the channel is still registered at that point it
        is removed and a CLOSE frame is sent to the device.
        """
        try:
            while not channel.closed and tunnel.connected:
                data = await channel.socks_reader.read(SOCKS_READ_SIZE)
                if not data:
                    break
                channel.bytes_tx += len(data)

                # Split into frame-sized chunks
                for start in range(0, len(data), FRAME_MAX_DATA):
                    await tunnel.send_frame(channel.channel_id, FRAME_DATA,
                                            data[start:start + FRAME_MAX_DATA])

        except (ConnectionError, OSError):
            pass
        finally:
            if self._release_channel(tunnel, channel):
                await tunnel.send_frame(channel.channel_id, FRAME_CLOSE,
                                        bytes([CLOSE_NORMAL]))
            log.info(f"SOCKS5: chan {channel.channel_id} done "
                     f"({channel.target} tx={channel.bytes_tx} "
                     f"rx={channel.bytes_rx})")

    # ==========================================================
    # Status API (for web/TUI)
    # ==========================================================
    def get_status(self) -> dict:
        """Return tunnel status for API/UI.

        The result lists every registered device tunnel with its channels
        and per-tunnel byte totals, plus the SOCKS5 listener state.
        """
        tunnels = []
        for did, t in self._device_tunnels.items():
            channels = [
                {
                    "id": cid,
                    "target": ch.target,
                    "state": "closed" if ch.closed else "open",
                    "bytes_tx": ch.bytes_tx,
                    "bytes_rx": ch.bytes_rx,
                }
                for cid, ch in t.channels.items()
            ]
            tunnels.append({
                "device_id": did,
                "connected": t.connected,
                "encrypted": t.encrypted,
                "active_channels": len(t.channels),
                "max_channels": t.max_channels,
                "channels": channels,
                "bytes_tx": sum(c["bytes_tx"] for c in channels),
                "bytes_rx": sum(c["bytes_rx"] for c in channels),
            })

        socks_running = (self._socks_server is not None
                         and self._socks_server.is_serving())
        return {
            "socks_running": socks_running,
            "socks_addr": f"{self.socks_host}:{self.socks_port}",
            "active_device": self._active_device,
            "tunnels": tunnels,
        }

    def set_active_device(self, device_id: str) -> bool:
        """Switch SOCKS traffic to a different device.

        Returns:
            True if the device has a registered tunnel, False otherwise.
        """
        if device_id not in self._device_tunnels:
            return False
        self._active_device = device_id
        return True
|