#!/usr/bin/env python3
"""
Epsilon Deploy - Unified build, provision & flash pipeline for ESP32.

Combines firmware building, crypto key provisioning, and flashing
into a single automated workflow with correct partition offsets.

Usage:
    python deploy.py                                        # Interactive wizard
    python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
        --wifi MySSID MyPass --srv 192.168.1.100            # Quick CLI deploy
    python deploy.py --config deploy.json                   # Batch from config
    python deploy.py --provision-only -p /dev/ttyUSB0 -d x  # Keys only
"""

import argparse
import json
import os
import secrets
import shutil
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from glob import glob as _glob
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# ─── Paths ────────────────────────────────────────────────────────────────────

SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_DIR = SCRIPT_DIR.parent / "espilon_bot"
KEYSTORE_DEFAULT = SCRIPT_DIR / "C3PO" / "keys.json"

# ─── Partition layout (must match partitions.csv) ─────────────────────────────

CHIP = "esp32"
BAUD = 460800

ADDR_BOOTLOADER = 0x1000
ADDR_PTABLE = 0x8000
ADDR_OTADATA = 0xD000
ADDR_FCTRY = 0x10000
ADDR_OTA_0 = 0x20000

FCTRY_SIZE = 0x6000  # 24 KB
NVS_NAMESPACE = "crypto"
NVS_KEY = "master_key"
KEY_LEN = 32  # 256-bit master key


# ─── Terminal helpers ─────────────────────────────────────────────────────────

class C:
    """ANSI escape sequences used to colour terminal output."""

    RST = "\033[0m"
    B = "\033[1m"
    DIM = "\033[2m"
    RED = "\033[91m"
    GRN = "\033[92m"
    YLW = "\033[93m"
    BLU = "\033[94m"
    CYN = "\033[96m"


def _tag(color: str, tag: str, msg: str):
    """Print *msg* prefixed with a coloured ``[tag]`` marker."""
    print(f" {color}{C.B}[{tag}]{C.RST} {msg}")


def ok(m):
    """Print a success message."""
    _tag(C.GRN, " OK ", m)


def info(m):
    """Print an informational message."""
    _tag(C.BLU, " .. ", m)


def warn(m):
    """Print a warning message."""
    _tag(C.YLW, " !! ", m)


def err(m):
    """Print an error message."""
    _tag(C.RED, " XX ", m)


def step_header(n, m):
    """Print the heading for pipeline step *n* with title *m*."""
    pad = max(0, 42 - len(m))
    print(f"\n {C.CYN}{C.B}--- Step {n}: {m} {'-' * pad}{C.RST}\n")


def banner():
    """Print the tool banner."""
    width = 50
    rule = f" +{'=' * width}+"
    rows = ["E Epsilon Deploy Tool", "Build - Provision - Flash - Verify"]
    # Rows are centred in code so the frame stays aligned whatever the text.
    framed = [rule] + [f" |{row.center(width)}|" for row in rows] + [rule]
    print(f"\n{C.CYN}{C.B}" + "\n".join(framed) + f"{C.RST}\n")


def box(lines: List[str]):
    """Print *lines* inside a dim ASCII frame sized to the longest line."""
    w = max((len(l) for l in lines), default=0) + 4
    border = f" {C.DIM}+{'-' * w}+{C.RST}"
    print(border)
    for l in lines:
        # Two leading spaces plus ljust(w - 2) fills exactly w columns, so the
        # right-hand bar lines up with the border.
        print(f" {C.DIM}|{C.RST}  {l.ljust(w - 2)}{C.DIM}|{C.RST}")
    print(border)


# ─── Device Config ────────────────────────────────────────────────────────────

@dataclass
class DeviceConfig:
    """Everything needed to build, provision and flash one device."""

    device_id: str = ""
    port: str = ""
    srv_ip: str = "192.168.1.100"
    srv_port: int = 2626
    network_mode: str = "wifi"  # "wifi" or "gprs"
    wifi_ssid: str = ""
    wifi_pass: str = ""
    gprs_apn: str = "sl2sfr"
    hostname: str = ""
    mod_network: bool = True
    mod_recon: bool = False
    mod_fakeap: bool = False
    mod_honeypot: bool = False
    mod_canbus: bool = False
    mod_fallback: bool = False
    mod_redteam: bool = False
    recon_camera: bool = False
    recon_ble_trilat: bool = False
    ota_enabled: bool = True
    ota_allow_http: bool = False
    master_key_hex: str = ""  # empty = generate random


# ══════════════════════════════════════════════════════════════════════════════
#  Pipeline Steps
# ══════════════════════════════════════════════════════════════════════════════

# ─── 1. Environment Check ────────────────────────────────────────────────────

def check_environment() -> str:
    """Validate ESP-IDF and tools. Returns idf_path.

    Exits the process with status 1 when ESP-IDF or the project directory
    cannot be found. A missing esptool only produces a warning.
    """
    # Find IDF: $IDF_PATH first, then the conventional install locations.
    idf_path = os.environ.get("IDF_PATH", "")
    if not idf_path or not os.path.isdir(idf_path):
        for candidate in [os.path.expanduser("~/esp-idf"), "/opt/esp-idf"]:
            if os.path.isdir(candidate):
                idf_path = candidate
                break

    if not idf_path or not os.path.isdir(idf_path):
        err("ESP-IDF not found. Set IDF_PATH or install ESP-IDF.")
        sys.exit(1)
    ok(f"ESP-IDF : {idf_path}")

    # Find esptool
    esptool = shutil.which("esptool.py") or shutil.which("esptool")
    if esptool:
        ok(f"esptool : {esptool}")
    else:
        warn("esptool.py not in PATH -- will try via IDF export.sh")

    # Verify project directory
    if not PROJECT_DIR.is_dir():
        err(f"Project directory not found: {PROJECT_DIR}")
        sys.exit(1)
    ok(f"Project : {PROJECT_DIR}")

    return idf_path


# ─── 2. sdkconfig Generation ─────────────────────────────────────────────────

def _kconfig_str(value) -> str:
    """Return *value* as a double-quoted Kconfig string literal.

    Backslashes and double quotes are backslash-escaped so that values such
    as a Wi-Fi password containing ``"`` cannot terminate the literal early.
    """
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _kconfig_bool(flag: bool) -> str:
    """Return the Kconfig spelling (``y`` / ``n``) of a boolean."""
    return "y" if flag else "n"


def generate_sdkconfig(cfg: DeviceConfig) -> str:
    """Generate sdkconfig.defaults content for a device.

    Args:
        cfg: Device settings to render.

    Returns:
        The full file content, newline-terminated.
    """
    partition_csv = "partitions.csv" if cfg.ota_enabled else "partitions_noota.csv"

    lines = [
        "# Generated by epsilon deploy -- do not edit manually",
        "",
        "# Device",
        f"CONFIG_DEVICE_ID={_kconfig_str(cfg.device_id)}",
        "",
        "# Network",
    ]

    if cfg.network_mode == "wifi":
        lines += [
            "CONFIG_NETWORK_WIFI=y",
            f"CONFIG_WIFI_SSID={_kconfig_str(cfg.wifi_ssid)}",
            f"CONFIG_WIFI_PASS={_kconfig_str(cfg.wifi_pass)}",
        ]
    else:
        lines += [
            "CONFIG_NETWORK_GPRS=y",
            f"CONFIG_GPRS_APN={_kconfig_str(cfg.gprs_apn)}",
        ]

    lines += [
        "",
        "# C2 Server",
        f"CONFIG_SERVER_IP={_kconfig_str(cfg.srv_ip)}",
        f"CONFIG_SERVER_PORT={cfg.srv_port}",
        "",
        "# Crypto -- ChaCha20-Poly1305 + HKDF (mbedtls, ESP-IDF v5.3)",
        # Same constants the factory NVS image is generated from, so the
        # firmware always looks the key up where it was provisioned.
        f"CONFIG_CRYPTO_FCTRY_NS={_kconfig_str(NVS_NAMESPACE)}",
        f"CONFIG_CRYPTO_FCTRY_KEY={_kconfig_str(NVS_KEY)}",
        "CONFIG_MBEDTLS_CHACHA20_C=y",
        "CONFIG_MBEDTLS_POLY1305_C=y",
        "CONFIG_MBEDTLS_CHACHAPOLY_C=y",
        "CONFIG_MBEDTLS_HKDF_C=y",
        "",
        "# Flash & Partitions",
        "CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y",
        "CONFIG_PARTITION_TABLE_CUSTOM=y",
        f"CONFIG_PARTITION_TABLE_CUSTOM_FILENAME={_kconfig_str(partition_csv)}",
        "",
        "# LWIP",
        "CONFIG_LWIP_IPV4_NAPT=y",
        "CONFIG_LWIP_IPV4_NAPT_PORTMAP=y",
        "CONFIG_LWIP_IP_FORWARD=y",
    ]

    if cfg.hostname:
        lines.append(f"CONFIG_LWIP_LOCAL_HOSTNAME={_kconfig_str(cfg.hostname)}")

    lines += [
        "",
        "# Modules",
        f"CONFIG_MODULE_NETWORK={_kconfig_bool(cfg.mod_network)}",
        f"CONFIG_MODULE_RECON={_kconfig_bool(cfg.mod_recon)}",
        f"CONFIG_MODULE_FAKEAP={_kconfig_bool(cfg.mod_fakeap)}",
        f"CONFIG_MODULE_HONEYPOT={_kconfig_bool(cfg.mod_honeypot)}",
        f"CONFIG_MODULE_CANBUS={_kconfig_bool(cfg.mod_canbus)}",
        f"CONFIG_MODULE_FALLBACK={_kconfig_bool(cfg.mod_fallback)}",
        f"CONFIG_MODULE_REDTEAM={_kconfig_bool(cfg.mod_redteam)}",
    ]

    # Recon sub-options (and the PSRAM / BLE support they need) are only
    # emitted when the recon module itself is enabled.
    if cfg.mod_recon:
        lines += [
            f"CONFIG_RECON_MODE_CAMERA={_kconfig_bool(cfg.recon_camera)}",
            f"CONFIG_RECON_MODE_MLAT={_kconfig_bool(cfg.recon_ble_trilat)}",
        ]
        if cfg.recon_camera:
            lines += [
                "",
                "# PSRAM (required for camera frame buffers)",
                "CONFIG_SPIRAM=y",
                "CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY=y",
            ]
        if cfg.recon_ble_trilat:
            lines += [
                "",
                "# BLE",
                "CONFIG_BT_ENABLED=y",
                "CONFIG_BT_BLUEDROID_ENABLED=y",
                "CONFIG_BT_BLE_ENABLED=y",
            ]

    lines += [
        "",
        "# OTA",
        f"CONFIG_ESPILON_OTA_ENABLED={_kconfig_bool(cfg.ota_enabled)}",
    ]
    if cfg.ota_enabled:
        if cfg.ota_allow_http:
            lines += [
                "CONFIG_ESPILON_OTA_ALLOW_HTTP=y",
                "CONFIG_ESP_HTTPS_OTA_ALLOW_HTTP=y",
            ]
        else:
            # HTTPS-only OTA needs the CA bundle to verify the server.
            lines += [
                "CONFIG_MBEDTLS_CERTIFICATE_BUNDLE=y",
                "CONFIG_MBEDTLS_CERTIFICATE_BUNDLE_DEFAULT_FULL=y",
            ]

    lines += [
        "",
        "# Logging",
        "CONFIG_ESPILON_LOG_LEVEL_INFO=y",
        "CONFIG_ESPILON_LOG_BOOT_SUMMARY=y",
    ]

    return "\n".join(lines) + "\n"


def write_sdkconfig(cfg: DeviceConfig):
    """Write sdkconfig.defaults, backing up the existing one."""
    sdkconfig_path = PROJECT_DIR / "sdkconfig.defaults"

    if sdkconfig_path.exists():
        # with_suffix replaces ".defaults", giving "sdkconfig.defaults.bak".
        backup = sdkconfig_path.with_suffix(".defaults.bak")
        shutil.copy2(sdkconfig_path, backup)
        info(f"Backed up existing config -> {backup.name}")

    content = generate_sdkconfig(cfg)
    # Explicit encoding: SSIDs and passwords may contain non-ASCII characters.
    with open(sdkconfig_path, "w", encoding="utf-8") as f:
        f.write(content)

    ok(f"Generated sdkconfig.defaults for {cfg.device_id}")


# ─── 3. Build Firmware ───────────────────────────────────────────────────────

def find_app_binary(build_dir: Path) -> str:
    """Find the application binary in the build directory.

    Candidates are tried in order: the ``app_bin`` entry of
    ``project_description.json``, ``<project_name>.bin`` from the same file,
    then the known historical names. Exits with status 1 when none exists.
    """
    candidates: List[Path] = []

    desc_file = build_dir / "project_description.json"
    if desc_file.exists():
        try:
            with open(desc_file, encoding="utf-8") as f:
                desc = json.load(f)
        except (OSError, ValueError) as exc:
            # A damaged description file should not abort the deploy while
            # the fallback names can still locate the binary.
            warn(f"Cannot read {desc_file.name}: {exc}")
            desc = {}
        if not isinstance(desc, dict):
            desc = {}

        app_bin = desc.get("app_bin", "")
        if app_bin:
            # Resolve against build_dir, not the current working directory;
            # an absolute app_bin is left unchanged by the join.
            candidates.append(build_dir / app_bin)
        name = desc.get("project_name", "bot-lwip")
        candidates.append(build_dir / f"{name}.bin")

    # Fallback: try known names
    candidates += [build_dir / name for name in ("bot-lwip.bin", "epsilon_bot.bin")]

    for candidate in candidates:
        if candidate.is_file():
            return str(candidate)

    err("Cannot find application binary in build/")
    sys.exit(1)


def build_firmware(idf_path: str) -> Dict[str, str]:
    """Build firmware via idf.py. Returns dict of binary paths.

    Exits with status 1 when the build fails or exceeds the timeout.
    """
    import shlex  # local import: only needed to quote paths for bash

    build_timeout = 600  # seconds

    # Full clean: remove sdkconfig AND build dir to avoid stale CMake cache
    # (required when switching between OTA/non-OTA configs)
    sdkconfig = PROJECT_DIR / "sdkconfig"
    build_dir = PROJECT_DIR / "build"
    if sdkconfig.exists():
        sdkconfig.unlink()
    if build_dir.exists():
        shutil.rmtree(build_dir)
        info("Cleaned build directory")

    info("Running idf.py build (this may take several minutes)...")

    # export.sh must be sourced in the same shell that runs idf.py, hence the
    # bash -c string; both paths are quoted so spaces cannot split the command.
    export_sh = shlex.quote(os.path.join(idf_path, "export.sh"))
    project = shlex.quote(str(PROJECT_DIR))
    cmd = (
        f". {export_sh} > /dev/null 2>&1 && "
        f"idf.py -C {project} "
        f"-D SDKCONFIG_DEFAULTS=sdkconfig.defaults "
        f"build 2>&1"
    )

    try:
        result = subprocess.run(
            ["bash", "-c", cmd],
            capture_output=True, text=True, timeout=build_timeout,
        )
    except subprocess.TimeoutExpired:
        err(f"Build timed out after {build_timeout} s")
        sys.exit(1)
    except FileNotFoundError:
        err("bash not found -- required to source ESP-IDF export.sh")
        sys.exit(1)

    if result.returncode != 0:
        err("Build failed!")
        output = result.stdout.strip()
        if output:
            for line in output.split("\n")[-50:]:
                print(f" {C.DIM}{line}{C.RST}")
        else:
            # export.sh output is discarded, so an empty log means the
            # environment setup itself failed before idf.py ran.
            warn(f"No build output -- check that {idf_path}/export.sh works")
        sys.exit(1)

    ok("Build succeeded")
    return locate_binaries()


def locate_binaries() -> Dict[str, str]:
    """Locate all required binaries in build/.

    Returns a mapping with keys ``bootloader``, ``partition_table``, ``app``
    and, when present, ``otadata``. Exits with status 1 if one is missing.
    """
    build_dir = PROJECT_DIR / "build"

    bins: Dict[str, str] = {
        "bootloader": str(build_dir / "bootloader" / "bootloader.bin"),
        "partition_table": str(build_dir / "partition_table" / "partition-table.bin"),
        "app": find_app_binary(build_dir),
    }

    # ota_data_initial.bin is optional; include it only when it was built.
    ota_data = build_dir / "ota_data_initial.bin"
    if ota_data.exists():
        bins["otadata"] = str(ota_data)

    for name, path in bins.items():
        if not os.path.exists(path):
            err(f"Missing build artifact: {name} -> {path}")
            sys.exit(1)
        size = os.path.getsize(path)
        ok(f"{name:16s} {path} ({size:,} bytes)")

    return bins


# ─── 4. Key Generation ───────────────────────────────────────────────────────

def generate_master_key(cfg: "DeviceConfig") -> bytes:
    """Generate or parse the 32-byte master key.

    Uses ``cfg.master_key_hex`` when set (exiting with status 1 if it is not
    valid hex of the right length); otherwise draws a random key.
    """
    if cfg.master_key_hex:
        try:
            key = bytes.fromhex(cfg.master_key_hex)
        except ValueError:
            err("--key must be valid hex")
            sys.exit(1)
        if len(key) != KEY_LEN:
            err(f"Master key must be {KEY_LEN} bytes ({KEY_LEN * 2} hex chars), "
                f"got {len(key)}")
            sys.exit(1)
        info(f"Using provided master key: {key.hex()[:16]}...")
    else:
        key = secrets.token_bytes(KEY_LEN)
        ok(f"Generated master key: {key.hex()[:16]}...")
    return key


# ─── 5. NVS Binary Generation ────────────────────────────────────────────────

def _run_nvs_generator(cmd: List[str]) -> Tuple[bool, str]:
    """Run one NVS generator command; return (succeeded, diagnostic text)."""
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except (FileNotFoundError, subprocess.TimeoutExpired) as exc:
        return False, str(exc)
    return r.returncode == 0, (r.stderr or r.stdout or "").strip()


def generate_nvs_binary(master_key: bytes, tmpdir: str) -> str:
    """Generate factory NVS partition binary from master key.

    Args:
        master_key: Raw key bytes stored under NVS_NAMESPACE / NVS_KEY.
        tmpdir: Existing directory that receives the CSV and the binary.

    Returns:
        Path of the generated ``fctry.bin``. Exits with status 1 when no
        generator succeeds.
    """
    csv_path = os.path.join(tmpdir, "fctry.csv")
    bin_path = os.path.join(tmpdir, "fctry.bin")

    with open(csv_path, "w", encoding="utf-8") as f:
        f.write("key,type,encoding,value\n")
        f.write(f"{NVS_NAMESPACE},namespace,,\n")
        f.write(f"{NVS_KEY},data,hex2bin,{master_key.hex()}\n")

    gen_args = ["generate", csv_path, bin_path, hex(FCTRY_SIZE)]

    # Preferred: the Python module (pip install esp-idf-nvs-partition-gen)
    attempts: List[List[str]] = [
        [sys.executable, "-m", "esp_idf_nvs_partition_gen"] + gen_args,
    ]

    # Fallback: the script shipped inside the ESP-IDF tree
    idf_path = os.environ.get("IDF_PATH", os.path.expanduser("~/esp-idf"))
    nvs_tool = os.path.join(
        idf_path, "components", "nvs_flash",
        "nvs_partition_generator", "nvs_partition_gen.py",
    )
    if os.path.exists(nvs_tool):
        attempts.append([sys.executable, nvs_tool] + gen_args)

    generated = False
    diagnostic = ""
    for cmd in attempts:
        succeeded, diagnostic = _run_nvs_generator(cmd)
        # A zero exit status without an output file is still a failure.
        if succeeded and os.path.isfile(bin_path):
            generated = True
            break

    if not generated:
        err("Failed to generate NVS binary. "
            "Install: pip install esp-idf-nvs-partition-gen")
        # Show the tail of the last tool's output so the cause is visible.
        for line in diagnostic.splitlines()[-10:]:
            print(f" {C.DIM}{line}{C.RST}")
        sys.exit(1)

    size = os.path.getsize(bin_path)
    ok(f"NVS binary generated: {size:,} bytes")
    return bin_path


# ─── 6. Flash ────────────────────────────────────────────────────────────────

def esptool_cmd(port: str) -> List[str]:
    """Base esptool command.

    Uses whichever of ``esptool.py`` / ``esptool`` is on PATH (the same pair
    the environment check accepts), falling back to the name ``esptool.py``
    so a missing tool still surfaces as FileNotFoundError at run time.
    """
    exe = shutil.which("esptool.py") or shutil.which("esptool") or "esptool.py"
    return [exe, "--chip", CHIP, "--port", port, "--baud", str(BAUD)]


def _run_esptool(cmd: List[str], *, timeout: int, action: str):
    """Run an esptool command; report any failure and exit with status 1."""
    try:
        subprocess.run(cmd, check=True, timeout=timeout)
    except subprocess.CalledProcessError as e:
        err(f"{action} failed: {e}")
    except subprocess.TimeoutExpired:
        err(f"{action} timed out after {timeout} s")
    except FileNotFoundError:
        err("esptool.py not found. Install: pip install esptool")
    else:
        return
    sys.exit(1)


def flash_erase(port: str):
    """Full flash erase."""
    info("Erasing entire flash...")
    _run_esptool(esptool_cmd(port) + ["erase_flash"], timeout=60, action="Erase")
    ok("Flash erased")


def flash_all(cfg: DeviceConfig, binaries: Dict[str, str], nvs_bin: str,
              erase: bool = False):
    """Flash bootloader + partition table + factory NVS + app (+ otadata).

    Args:
        cfg: Device settings; only ``cfg.port`` is used here.
        binaries: Mapping with ``bootloader``, ``partition_table``, ``app``
            and optionally ``otadata`` paths.
        nvs_bin: Path of the factory NVS image.
        erase: Erase the whole flash before writing.
    """
    if erase:
        flash_erase(cfg.port)

    # Build flash map
    flash_map = [
        (ADDR_BOOTLOADER, binaries["bootloader"], "Bootloader"),
        (ADDR_PTABLE, binaries["partition_table"], "Partition table"),
        (ADDR_FCTRY, nvs_bin, "Factory NVS (master key)"),
        (ADDR_OTA_0, binaries["app"], "Application (ota_0)"),
    ]
    if "otadata" in binaries:
        flash_map.append((ADDR_OTADATA, binaries["otadata"], "OTA data"))

    # Display map
    box([f"{label:25s} @ {hex(addr)}" for addr, _, label in flash_map])

    # Build esptool args: one "<address> <file>" pair per partition
    cmd = esptool_cmd(cfg.port) + ["write_flash", "-z"]
    for addr, path, _ in flash_map:
        cmd.extend([hex(addr), path])

    info("Flashing all partitions...")
    _run_esptool(cmd, timeout=120, action="Flash")
    ok("All partitions flashed successfully")


def flash_fctry_only(port: str, nvs_bin: str):
    """Flash only the factory NVS partition (provision-only mode)."""
    info(f"Flashing factory NVS to {port} at {hex(ADDR_FCTRY)}...")
    _run_esptool(
        esptool_cmd(port) + ["write_flash", hex(ADDR_FCTRY), nvs_bin],
        timeout=60, action="Flash",
    )
    ok("Factory NVS flashed")


# ─── 7. Keystore ─────────────────────────────────────────────────────────────

def update_keystore(device_id: str, master_key: bytes, keystore_path: str):
    """Add/update device key in the C2 keystore (keys.json).

    An unreadable keystore is copied to ``<path>.corrupt`` before a fresh one
    is started, and the new content is written to a temporary file and
    renamed into place so an interrupted run cannot truncate the keystore.
    """
    target = os.path.abspath(keystore_path)
    folder = os.path.dirname(target)

    keys: Dict[str, str] = {}
    if os.path.exists(target):
        try:
            with open(target, encoding="utf-8") as f:
                loaded = json.load(f)
            if not isinstance(loaded, dict):
                raise ValueError("keystore root is not a JSON object")
            keys = loaded
        except ValueError:  # includes json.JSONDecodeError
            # Keep the damaged file: it may still hold recoverable keys.
            backup = target + ".corrupt"
            shutil.copy2(target, backup)
            warn(f"Corrupted keystore, starting fresh: {keystore_path} "
                 f"(old file kept as {backup})")

    keys[device_id] = master_key.hex()

    os.makedirs(folder, exist_ok=True)
    # mkstemp creates the file readable by the owner only, which is the
    # right default for a brand-new file holding master keys.
    fd, tmp_path = tempfile.mkstemp(dir=folder, prefix=".keys-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(keys, f, indent=2)
        if os.path.exists(target):
            # Preserve the mode of an existing keystore so whoever reads it
            # today can still read it after the rename.
            shutil.copymode(target, tmp_path)
        os.replace(tmp_path, target)
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise

    ok(f"Keystore updated: {keystore_path}")
    ok(f" {device_id} -> {master_key.hex()[:16]}...")


# ─── 8. Serial Monitor ───────────────────────────────────────────────────────

def monitor_serial(port: str, idf_path: str):
    """Launch idf.py monitor for post-flash verification."""
    import shlex  # local import: only needed to quote values for bash

    info(f"Starting serial monitor on {port} (Ctrl+] to exit)")
    export_sh = shlex.quote(os.path.join(idf_path, "export.sh"))
    project = shlex.quote(str(PROJECT_DIR))
    try:
        subprocess.run(
            ["bash", "-c",
             f". {export_sh} > /dev/null 2>&1 && "
             f"idf.py -C {project} -p {shlex.quote(port)} monitor"],
        )
    except KeyboardInterrupt:
        print()
    ok("Monitor closed")


# ══════════════════════════════════════════════════════════════════════════════
#  Interactive Wizard
# ══════════════════════════════════════════════════════════════════════════════

def detect_serial_ports() -> List[str]:
    """Detect available serial ports on Linux."""
    return sorted(
        _glob("/dev/ttyUSB*") + _glob("/dev/ttyACM*")
    )


def ask(msg: str, default: str = "") -> str:
    """Prompt with optional default."""
    if default:
        val = input(f" {C.B}{msg}{C.RST} [{C.DIM}{default}{C.RST}]: ").strip()
        return val or default
    return input(f" {C.B}{msg}{C.RST}: ").strip()


def ask_yn(msg: str, default: bool = True) -> bool:
    """Yes/no prompt. Empty input selects *default*."""
    hint = "Y/n" if default else "y/N"
    val = input(f" {C.B}{msg}{C.RST} [{hint}]: ").strip().lower()
    if not val:
        return default
    return val in ("y", "yes", "o", "oui")


def ask_choice(msg: str, choices: List[str], default: int = 0) -> int:
    """Numbered choice prompt.

    Returns the zero-based index of the selected entry; empty, non-numeric
    or out-of-range input selects *default*.
    """
    print(f"\n {C.B}{msg}{C.RST}")
    for i, c in enumerate(choices):
        marker = f"{C.GRN}>{C.RST}" if i == default else " "
        print(f" {marker} {i + 1}) {c}")
    val = input(f" Choice [{default + 1}]: ").strip()
    if not val:
        return default
    try:
        idx = int(val) - 1
        if 0 <= idx < len(choices):
            return idx
    except ValueError:
        pass
    return default


def interactive_wizard() -> DeviceConfig:
    """Guided interactive setup."""
    cfg = DeviceConfig()

    print(f"\n {C.B}{C.CYN}--- Device ---{C.RST}\n")

    # Port
    ports = detect_serial_ports()
    if ports:
        info(f"Detected ports: {', '.join(ports)}")
        cfg.port = ask("Serial port", ports[0])
    else:
        cfg.port = ask("Serial port (e.g. /dev/ttyUSB0)")

    # Device ID
    rand_id = secrets.token_hex(4)
    cfg.device_id = ask("Device ID (8 hex chars)", rand_id)

    # Network
    mode = ask_choice("Network mode:", ["WiFi", "GPRS"], 0)
    cfg.network_mode = "wifi" if mode == 0 else "gprs"

    if cfg.network_mode == "wifi":
        cfg.wifi_ssid = ask("WiFi SSID")
        cfg.wifi_pass = ask("WiFi password")
    else:
        cfg.gprs_apn = ask("GPRS APN", "sl2sfr")

    # Hostname
    cfg.hostname = ask("Device hostname (empty = random)", "")

    print(f"\n {C.B}{C.CYN}--- C2 Server ---{C.RST}\n")
    cfg.srv_ip = ask("Server IP", "192.168.1.100")
    # Re-prompt instead of crashing on a typo in the port number.
    while True:
        raw_port = ask("Server port", "2626")
        if raw_port.isdigit() and 0 < int(raw_port) < 65536:
            cfg.srv_port = int(raw_port)
            break
        warn("Server port must be a number between 1 and 65535")

    print(f"\n {C.B}{C.CYN}--- Modules ---{C.RST}\n")
    cfg.mod_network = ask_yn("Enable network module (ping, arp, proxy, dos)?", True)
    cfg.mod_fakeap = ask_yn("Enable fakeAP module (captive portal, sniffer)?", False)
    cfg.mod_honeypot = ask_yn("Enable honeypot module (SSH/Telnet/HTTP/FTP, WiFi/net monitor)?", False)
    cfg.mod_recon = ask_yn("Enable recon module?", False)
    if cfg.mod_recon:
        cfg.recon_camera = ask_yn(" Enable camera?", False)
        cfg.recon_ble_trilat = ask_yn(" Enable BLE trilateration?", False)
    cfg.mod_fallback = ask_yn("Enable fallback module (autonomous WiFi recovery)?", False)
    cfg.ota_enabled = ask_yn("Enable OTA updates?", True)

    print(f"\n {C.B}{C.CYN}--- Security ---{C.RST}\n")
    if ask_yn("Generate new random 256-bit master key?", True):
        cfg.master_key_hex = ""
    else:
        cfg.master_key_hex = ask("Master key (64 hex chars)")

    return cfg


# ══════════════════════════════════════════════════════════════════════════════
#  Deploy Pipeline
# ══════════════════════════════════════════════════════════════════════════════

def deploy_one(cfg: DeviceConfig, *,
               build: bool = True,
               flash: bool = True,
               provision_only: bool = False,
               erase: bool = False,
               monitor: bool = False,
               keystore_path: str = ""):
    """Full deploy pipeline for a single device.

    Args:
        cfg: Device settings.
        build: Rebuild the firmware; when False, existing binaries are used.
        flash: Provision and flash the device. When False (build-only) no
            key is generated and the keystore is left untouched.
        provision_only: Skip configuration and build; flash only the factory
            NVS image.
        erase: Erase the whole flash before a full write.
        monitor: Open the serial monitor after flashing.
        keystore_path: Keystore file; empty selects KEYSTORE_DEFAULT.

    Failures in any step exit the process via ``sys.exit(1)``.
    """
    ks_path = keystore_path or str(KEYSTORE_DEFAULT)

    # Summary
    net_detail = cfg.wifi_ssid if cfg.network_mode == "wifi" else cfg.gprs_apn
    net_info = f"{cfg.network_mode.upper()} ({net_detail})"
    enabled = [
        ("network", cfg.mod_network),
        ("fakeAP", cfg.mod_fakeap),
        ("honeypot", cfg.mod_honeypot),
        ("recon", cfg.mod_recon),
        ("fallback", cfg.mod_fallback),
        ("redteam", cfg.mod_redteam),
        ("canbus", cfg.mod_canbus),
    ]
    mods = ", ".join(name for name, on in enabled if on) or "none"

    box([
        f"Device ID : {cfg.device_id}",
        f"Port : {cfg.port}",
        f"Network : {net_info}",
        f"C2 Server : {cfg.srv_ip}:{cfg.srv_port}",
        f"Modules : {mods}",
        f"OTA : {'yes' if cfg.ota_enabled else 'no'}",
        f"Erase : {'yes' if erase else 'no'}",
        f"Mode : {'provision-only' if provision_only else 'full deploy'}",
    ])

    with tempfile.TemporaryDirectory(prefix="epsilon-deploy-") as tmpdir:
        # ── Step 1: Environment ──
        step_header(1, "Environment Check")
        idf_path = check_environment()

        binaries: Dict[str, str] = {}
        if not provision_only:
            # ── Step 2: sdkconfig ──
            step_header(2, "Generate Configuration")
            write_sdkconfig(cfg)

            # ── Step 3: Build or locate ──
            if build:
                step_header(3, "Build Firmware")
                binaries = build_firmware(idf_path)
            else:
                step_header(3, "Locate Existing Binaries")
                binaries = locate_binaries()

        if flash:
            # ── Step 4: Crypto ──
            step_header(4, "Master Key")
            master_key = generate_master_key(cfg)

            # ── Step 5: NVS ──
            step_header(5, "Generate Factory NVS Partition")
            nvs_bin = generate_nvs_binary(master_key, tmpdir)

            # ── Step 6: Flash ──
            step_header(6, "Flash Device")
            if provision_only:
                flash_fctry_only(cfg.port, nvs_bin)
            else:
                flash_all(cfg, binaries, nvs_bin, erase=erase)

            # ── Step 7: Keystore ──
            # Only record a key that was actually written to the device;
            # otherwise the C2 would hold a key the device does not have.
            step_header(7, "Update C2 Keystore")
            update_keystore(cfg.device_id, master_key, ks_path)
        else:
            info("Build-only: skipping key provisioning, flash and keystore update")

    # ── Done ──
    print(f"\n {C.GRN}{C.B}{'=' * 50}")
    print(f" Deploy complete for {cfg.device_id}")
    print(f" {'=' * 50}{C.RST}\n")

    # ── Optional monitor ──
    if monitor and flash:
        step_header("*", "Serial Monitor")
        monitor_serial(cfg.port, idf_path)


# ══════════════════════════════════════════════════════════════════════════════
#  CLI / Entry Point
# ══════════════════════════════════════════════════════════════════════════════

def config_from_dict(d: dict, defaults: Optional[dict] = None) -> DeviceConfig:
    """Create DeviceConfig from a JSON dict with optional defaults.

    Supports both flat format (legacy) and nested format:

        Flat:   {"device_id": "x", "srv_ip": "...", "module_network": true, ...}
        Nested: {"device_id": "x", "server": {"ip": "..."},
                 "modules": {"network": true}, ...}

    When defaults is provided, device-level keys override defaults. Each
    setting is resolved in this order: device nested key, device flat key,
    defaults nested key, defaults flat key, built-in default.
    """
    dfl = defaults or {}

    def pick(section: str, key: str, flat_key: str, fallback):
        """Resolve one setting following the documented precedence."""
        for source in (d, dfl):
            nested = source.get(section)
            if isinstance(nested, dict) and key in nested:
                return nested[key]
            if flat_key in source:
                return source[flat_key]
        return fallback

    return DeviceConfig(
        # "or" also covers an empty or null device_id in the file.
        device_id=d.get("device_id") or secrets.token_hex(4),
        port=d.get("port", dfl.get("port", "")),
        hostname=d.get("hostname", dfl.get("hostname", "")),
        # Server
        srv_ip=pick("server", "ip", "srv_ip", "192.168.1.100"),
        srv_port=pick("server", "port", "srv_port", 2626),
        # Network
        network_mode=pick("network", "mode", "network_mode", "wifi"),
        wifi_ssid=pick("network", "wifi_ssid", "wifi_ssid", ""),
        wifi_pass=pick("network", "wifi_pass", "wifi_pass", ""),
        gprs_apn=pick("network", "gprs_apn", "gprs_apn", "sl2sfr"),
        # Modules
        mod_network=pick("modules", "network", "module_network", True),
        mod_recon=pick("modules", "recon", "module_recon", False),
        mod_fakeap=pick("modules", "fakeap", "module_fakeap", False),
        mod_honeypot=pick("modules", "honeypot", "module_honeypot", False),
        mod_canbus=pick("modules", "canbus", "module_canbus", False),
        mod_fallback=pick("modules", "fallback", "module_fallback", False),
        mod_redteam=pick("modules", "redteam", "module_redteam", False),
        recon_camera=pick("modules", "recon_camera", "recon_camera", False),
        recon_ble_trilat=pick("modules", "recon_ble_trilat", "recon_ble_trilat", False),
        # OTA
        ota_enabled=pick("ota", "enabled", "ota_enabled", True),
        ota_allow_http=pick("ota", "allow_http", "ota_allow_http", False),
        # Security
        master_key_hex=d.get("master_key", dfl.get("master_key", "")),
    )


def main() -> int:
    """Parse the command line and run the selected mode.

    Returns the process exit status: 0 on success, 1 on failure or abort.
    """
    parser = argparse.ArgumentParser(
        description="Epsilon Deploy - Unified build, provision & flash",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Interactive wizard (recommended for first use)
  python deploy.py

  # Full deploy with WiFi
  python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
      --wifi MySSID MyPass --srv 192.168.1.100

  # Full deploy with GPRS
  python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
      --gprs sl2sfr --srv 203.0.113.10

  # Provision only (generate key + flash NVS, no rebuild)
  python deploy.py --provision-only -p /dev/ttyUSB0 -d abc12345

  # Flash existing build (skip rebuild)
  python deploy.py --flash-only -p /dev/ttyUSB0 -d abc12345 \\
      --wifi MySSID MyPass --srv 192.168.1.100

  # Clean install (erase flash first)
  python deploy.py --erase -p /dev/ttyUSB0 -d abc12345 \\
      --wifi MySSID MyPass --srv 192.168.1.100

  # Deploy + watch serial output
  python deploy.py --monitor -p /dev/ttyUSB0 -d abc12345 \\
      --wifi MySSID MyPass --srv 192.168.1.100

  # Batch deploy from config file
  python deploy.py --config deploy.json
""",
    )

    # Device
    parser.add_argument("-d", "--device-id",
                        help="Device ID (default: random 8 hex)")
    parser.add_argument("-p", "--port",
                        help="Serial port (e.g. /dev/ttyUSB0)")

    # Network (mutually exclusive)
    net = parser.add_mutually_exclusive_group()
    net.add_argument("--wifi", nargs=2, metavar=("SSID", "PASS"),
                     help="WiFi mode with SSID and password")
    net.add_argument("--gprs", metavar="APN", nargs="?", const="sl2sfr",
                     help="GPRS mode (default APN: sl2sfr)")

    # Server
    parser.add_argument("--srv", metavar="IP", help="C2 server IP")
    parser.add_argument("--srv-port", type=int, default=2626)

    # Modules
    parser.add_argument("--mod-fakeap", action="store_true", default=False,
                        help="Enable fakeAP module")
    parser.add_argument("--mod-honeypot", action="store_true", default=False,
                        help="Enable honeypot module (TCP services, WiFi/net monitors)")
    parser.add_argument("--mod-recon", action="store_true", default=False,
                        help="Enable recon module")
    parser.add_argument("--recon-camera", action="store_true", default=False,
                        help="Enable camera")
    parser.add_argument("--recon-ble", action="store_true", default=False,
                        help="Enable BLE trilateration")
    parser.add_argument("--mod-fallback", action="store_true", default=False,
                        help="Enable fallback module (autonomous WiFi recovery)")
    parser.add_argument("--mod-redteam", action="store_true", default=False,
                        help="Enable red team module (offensive operations)")
    parser.add_argument("--no-network", action="store_true", default=False,
                        help="Disable network module")
    parser.add_argument("--no-ota", action="store_true", default=False,
                        help="Disable OTA module")
    parser.add_argument("--ota-http", action="store_true", default=False,
                        help="Allow OTA over plain HTTP (insecure, local networks)")

    # Security
    parser.add_argument("--key", metavar="HEX",
                        help="Master key as 64 hex chars (default: random)")
    parser.add_argument("--keystore", type=Path, default=None,
                        help=f"C2 keystore path (default: {KEYSTORE_DEFAULT})")

    # Operating modes
    parser.add_argument("--provision-only", action="store_true",
                        help="Only generate key and flash factory NVS")
    parser.add_argument("--flash-only", action="store_true",
                        help="Flash existing build (skip rebuild)")
    parser.add_argument("--build-only", action="store_true",
                        help="Build only (no flash, no provision)")
    parser.add_argument("--erase", action="store_true",
                        help="Erase entire flash before writing")
    parser.add_argument("--monitor", action="store_true",
                        help="Start serial monitor after flash")
    parser.add_argument("--hostname",
                        help="Device hostname (default: random)")

    # Config file
    parser.add_argument("--config", type=Path,
                        help="Load device configs from JSON file")

    args = parser.parse_args()
    keystore_arg = str(args.keystore) if args.keystore else ""

    banner()

    # ── Config file mode ──
    if args.config:
        try:
            with open(args.config, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as exc:
            err(f"Cannot load config {args.config}: {exc}")
            return 1
        if not isinstance(data, dict):
            err(f"Config {args.config} must contain a JSON object")
            return 1

        defaults = data.get("defaults", {})
        # A file without a "devices" list is treated as one device entry.
        devices = data.get("devices", [data])
        total = len(devices)
        ok_count = 0

        for i, d in enumerate(devices, 1):
            print(f"\n {C.B}### Device {i}/{total}: {d.get('device_id', '?')} ###{C.RST}")
            cfg = config_from_dict(d, defaults)
            try:
                deploy_one(
                    cfg,
                    build=not args.flash_only,
                    flash=not args.build_only,
                    provision_only=args.provision_only,
                    erase=args.erase,
                    monitor=args.monitor,
                    keystore_path=keystore_arg,
                )
                ok_count += 1
            except SystemExit:
                # Pipeline steps abort via sys.exit; in batch mode one failed
                # device must not stop the remaining ones.
                err(f"Failed for {cfg.device_id}, continuing...")

        print(f"\n {C.B}Results: {ok_count}/{total} succeeded{C.RST}")
        return 0 if ok_count == total else 1

    # ── Interactive mode (no args) ──
    if not args.device_id and not args.port and not args.provision_only:
        cfg = interactive_wizard()
        print()
        if not ask_yn("Proceed with deployment?", True):
            print(" Aborted.")
            return 1
        deploy_one(
            cfg,
            build=not args.flash_only,
            flash=not args.build_only,
            erase=args.erase,
            monitor=args.monitor,
            keystore_path=keystore_arg,
        )
        return 0

    # ── CLI mode ──
    if not args.port:
        err("--port / -p is required in CLI mode")
        return 1

    cfg = DeviceConfig(
        device_id=args.device_id or secrets.token_hex(4),
        port=args.port,
        srv_ip=args.srv or "192.168.1.100",
        srv_port=args.srv_port,
        network_mode="gprs" if args.gprs else "wifi",
        wifi_ssid=args.wifi[0] if args.wifi else "",
        wifi_pass=args.wifi[1] if args.wifi else "",
        gprs_apn=args.gprs if args.gprs else "sl2sfr",
        hostname=args.hostname or "",
        mod_network=not args.no_network,
        mod_fakeap=args.mod_fakeap,
        mod_honeypot=args.mod_honeypot,
        mod_recon=args.mod_recon,
        mod_fallback=args.mod_fallback,
        mod_redteam=args.mod_redteam,
        recon_camera=args.recon_camera,
        recon_ble_trilat=args.recon_ble,
        ota_enabled=not args.no_ota,
        ota_allow_http=args.ota_http,
        master_key_hex=args.key or "",
    )

    # Validate WiFi (unless provision-only which doesn't need it)
    if (cfg.network_mode == "wifi"
            and not args.provision_only
            and not args.build_only
            and (not cfg.wifi_ssid or not cfg.wifi_pass)):
        err("WiFi mode requires --wifi SSID PASS")
        return 1

    deploy_one(
        cfg,
        build=not args.flash_only and not args.provision_only,
        flash=not args.build_only,
        provision_only=args.provision_only,
        erase=args.erase,
        monitor=args.monitor,
        keystore_path=keystore_arg,
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())