espilon-source/tools/deploy.py
Eun0us 12b851581a tools: replace flasher/provisioning with unified deploy system + espmon
Add deploy.py: multi-device build/flash/provision with JSON config.
Add espmon/: real-time ESP32 fleet monitoring daemon.
Remove tools/flasher/ and tools/provisioning/ (superseded).
2026-02-28 20:15:57 +01:00

995 lines
36 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Epsilon Deploy - Unified build, provision & flash pipeline for ESP32.
Combines firmware building, crypto key provisioning, and flashing
into a single automated workflow with correct partition offsets.
Usage:
python deploy.py # Interactive wizard
python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100 # Quick CLI deploy
python deploy.py --config deploy.json # Batch from config
python deploy.py --provision-only -p /dev/ttyUSB0 -d x # Keys only
"""
import argparse
import json
import os
import secrets
import shutil
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from glob import glob as _glob
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# ─── Paths ────────────────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_DIR = SCRIPT_DIR.parent / "espilon_bot"
KEYSTORE_DEFAULT = SCRIPT_DIR / "C3PO" / "keys.json"
# ─── Partition layout (must match partitions.csv) ─────────────────────────────
CHIP = "esp32"
BAUD = 460800
ADDR_BOOTLOADER = 0x1000
ADDR_PTABLE = 0x8000
ADDR_OTADATA = 0xD000
ADDR_FCTRY = 0x10000
ADDR_OTA_0 = 0x20000
FCTRY_SIZE = 0x6000 # 24 KB
NVS_NAMESPACE = "crypto"
NVS_KEY = "master_key"
KEY_LEN = 32 # 256-bit master key
# ─── Terminal helpers ─────────────────────────────────────────────────────────
class C:
RST = "\033[0m"
B = "\033[1m"
DIM = "\033[2m"
RED = "\033[91m"
GRN = "\033[92m"
YLW = "\033[93m"
BLU = "\033[94m"
CYN = "\033[96m"
def _tag(color: str, tag: str, msg: str):
print(f" {color}{C.B}[{tag}]{C.RST} {msg}")
def ok(m): _tag(C.GRN, " OK ", m)
def info(m): _tag(C.BLU, " .. ", m)
def warn(m): _tag(C.YLW, " !! ", m)
def err(m): _tag(C.RED, " XX ", m)
def step_header(n, m):
pad = max(0, 42 - len(m))
print(f"\n {C.CYN}{C.B}--- Step {n}: {m} {'-' * pad}{C.RST}\n")
def banner():
print(f"""\n{C.CYN}{C.B}\
+==================================================+
| E Epsilon Deploy Tool |
| Build - Provision - Flash - Verify |
+==================================================+{C.RST}\n""")
def box(lines: List[str]):
w = max((len(l) for l in lines), default=0) + 4
print(f" {C.DIM}+{'-' * w}+{C.RST}")
for l in lines:
print(f" {C.DIM}|{C.RST} {l}{' ' * (w - len(l) - 2)}{C.DIM}|{C.RST}")
print(f" {C.DIM}+{'-' * w}+{C.RST}")
# ─── Device Config ────────────────────────────────────────────────────────────
@dataclass
class DeviceConfig:
device_id: str = ""
port: str = ""
srv_ip: str = "192.168.1.100"
srv_port: int = 2626
network_mode: str = "wifi" # "wifi" or "gprs"
wifi_ssid: str = ""
wifi_pass: str = ""
gprs_apn: str = "sl2sfr"
hostname: str = ""
mod_network: bool = True
mod_recon: bool = False
mod_fakeap: bool = False
mod_honeypot: bool = False
mod_canbus: bool = False
mod_fallback: bool = False
mod_redteam: bool = False
recon_camera: bool = False
recon_ble_trilat: bool = False
ota_enabled: bool = True
ota_allow_http: bool = False
master_key_hex: str = "" # empty = generate random
# ══════════════════════════════════════════════════════════════════════════════
# Pipeline Steps
# ══════════════════════════════════════════════════════════════════════════════
# ─── 1. Environment Check ────────────────────────────────────────────────────
def check_environment() -> str:
"""Validate ESP-IDF and tools. Returns idf_path."""
# Find IDF
idf_path = os.environ.get("IDF_PATH", "")
if not idf_path or not os.path.isdir(idf_path):
for candidate in [os.path.expanduser("~/esp-idf"), "/opt/esp-idf"]:
if os.path.isdir(candidate):
idf_path = candidate
break
if not idf_path or not os.path.isdir(idf_path):
err("ESP-IDF not found. Set IDF_PATH or install ESP-IDF.")
sys.exit(1)
ok(f"ESP-IDF : {idf_path}")
# Find esptool
esptool = shutil.which("esptool.py") or shutil.which("esptool")
if esptool:
ok(f"esptool : {esptool}")
else:
warn("esptool.py not in PATH -- will try via IDF export.sh")
# Verify project directory
if not PROJECT_DIR.is_dir():
err(f"Project directory not found: {PROJECT_DIR}")
sys.exit(1)
ok(f"Project : {PROJECT_DIR}")
return idf_path
# ─── 2. sdkconfig Generation ─────────────────────────────────────────────────
def generate_sdkconfig(cfg: DeviceConfig) -> str:
"""Generate sdkconfig.defaults content for a device."""
lines = [
"# Generated by epsilon deploy -- do not edit manually",
"",
"# Device",
f'CONFIG_DEVICE_ID="{cfg.device_id}"',
"",
"# Network",
]
if cfg.network_mode == "wifi":
lines += [
"CONFIG_NETWORK_WIFI=y",
f'CONFIG_WIFI_SSID="{cfg.wifi_ssid}"',
f'CONFIG_WIFI_PASS="{cfg.wifi_pass}"',
]
else:
lines += [
"CONFIG_NETWORK_GPRS=y",
f'CONFIG_GPRS_APN="{cfg.gprs_apn}"',
]
lines += [
"",
"# C2 Server",
f'CONFIG_SERVER_IP="{cfg.srv_ip}"',
f"CONFIG_SERVER_PORT={cfg.srv_port}",
"",
"# Crypto -- ChaCha20-Poly1305 + HKDF (mbedtls, ESP-IDF v5.3)",
'CONFIG_CRYPTO_FCTRY_NS="crypto"',
'CONFIG_CRYPTO_FCTRY_KEY="master_key"',
"CONFIG_MBEDTLS_CHACHA20_C=y",
"CONFIG_MBEDTLS_POLY1305_C=y",
"CONFIG_MBEDTLS_CHACHAPOLY_C=y",
"CONFIG_MBEDTLS_HKDF_C=y",
"",
"# Flash & Partitions",
"CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y",
"CONFIG_PARTITION_TABLE_CUSTOM=y",
f'CONFIG_PARTITION_TABLE_CUSTOM_FILENAME='
f'"{("partitions.csv" if cfg.ota_enabled else "partitions_noota.csv")}"',
"",
"# LWIP",
"CONFIG_LWIP_IPV4_NAPT=y",
"CONFIG_LWIP_IPV4_NAPT_PORTMAP=y",
"CONFIG_LWIP_IP_FORWARD=y",
]
if cfg.hostname:
lines.append(f'CONFIG_LWIP_LOCAL_HOSTNAME="{cfg.hostname}"')
lines += [
"",
"# Modules",
f'CONFIG_MODULE_NETWORK={"y" if cfg.mod_network else "n"}',
f'CONFIG_MODULE_RECON={"y" if cfg.mod_recon else "n"}',
f'CONFIG_MODULE_FAKEAP={"y" if cfg.mod_fakeap else "n"}',
f'CONFIG_MODULE_HONEYPOT={"y" if cfg.mod_honeypot else "n"}',
f'CONFIG_MODULE_CANBUS={"y" if cfg.mod_canbus else "n"}',
f'CONFIG_MODULE_FALLBACK={"y" if cfg.mod_fallback else "n"}',
f'CONFIG_MODULE_REDTEAM={"y" if cfg.mod_redteam else "n"}',
]
if cfg.mod_recon:
lines += [
f'CONFIG_RECON_MODE_CAMERA={"y" if cfg.recon_camera else "n"}',
f'CONFIG_RECON_MODE_MLAT={"y" if cfg.recon_ble_trilat else "n"}',
]
if cfg.recon_camera:
lines += [
"",
"# PSRAM (required for camera frame buffers)",
"CONFIG_SPIRAM=y",
"CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY=y",
]
if cfg.recon_ble_trilat:
lines += ["", "# BLE", "CONFIG_BT_ENABLED=y",
"CONFIG_BT_BLUEDROID_ENABLED=y", "CONFIG_BT_BLE_ENABLED=y"]
lines += [
"",
"# OTA",
f'CONFIG_ESPILON_OTA_ENABLED={"y" if cfg.ota_enabled else "n"}',
]
if cfg.ota_enabled:
if cfg.ota_allow_http:
lines += [
f'CONFIG_ESPILON_OTA_ALLOW_HTTP=y',
"CONFIG_ESP_HTTPS_OTA_ALLOW_HTTP=y",
]
else:
lines += [
"CONFIG_MBEDTLS_CERTIFICATE_BUNDLE=y",
"CONFIG_MBEDTLS_CERTIFICATE_BUNDLE_DEFAULT_FULL=y",
]
lines += [
"",
"# Logging",
"CONFIG_ESPILON_LOG_LEVEL_INFO=y",
"CONFIG_ESPILON_LOG_BOOT_SUMMARY=y",
]
return "\n".join(lines) + "\n"
def write_sdkconfig(cfg: DeviceConfig):
"""Write sdkconfig.defaults, backing up the existing one."""
sdkconfig_path = PROJECT_DIR / "sdkconfig.defaults"
if sdkconfig_path.exists():
backup = sdkconfig_path.with_suffix(".defaults.bak")
shutil.copy2(sdkconfig_path, backup)
info(f"Backed up existing config -> {backup.name}")
content = generate_sdkconfig(cfg)
with open(sdkconfig_path, "w") as f:
f.write(content)
ok(f"Generated sdkconfig.defaults for {cfg.device_id}")
# ─── 3. Build Firmware ───────────────────────────────────────────────────────
def find_app_binary(build_dir: Path) -> str:
"""Find the application binary in the build directory."""
desc_file = build_dir / "project_description.json"
if desc_file.exists():
with open(desc_file) as f:
desc = json.load(f)
app_bin = desc.get("app_bin", "")
if app_bin and os.path.exists(app_bin):
return app_bin
name = desc.get("project_name", "bot-lwip")
candidate = str(build_dir / f"{name}.bin")
if os.path.exists(candidate):
return candidate
# Fallback: try known names
for name in ["bot-lwip.bin", "epsilon_bot.bin"]:
p = str(build_dir / name)
if os.path.exists(p):
return p
err("Cannot find application binary in build/")
sys.exit(1)
def build_firmware(idf_path: str) -> Dict[str, str]:
"""Build firmware via idf.py. Returns dict of binary paths."""
# Full clean: remove sdkconfig AND build dir to avoid stale CMake cache
# (required when switching between OTA/non-OTA configs)
sdkconfig = PROJECT_DIR / "sdkconfig"
build_dir = PROJECT_DIR / "build"
if sdkconfig.exists():
sdkconfig.unlink()
if build_dir.exists():
shutil.rmtree(build_dir)
info("Cleaned build directory")
info("Running idf.py build (this may take several minutes)...")
cmd = (
f". {idf_path}/export.sh > /dev/null 2>&1 && "
f"idf.py -C {PROJECT_DIR} "
f"-D SDKCONFIG_DEFAULTS=sdkconfig.defaults "
f"build 2>&1"
)
result = subprocess.run(
["bash", "-c", cmd],
capture_output=True, text=True, timeout=600,
)
if result.returncode != 0:
err("Build failed!")
lines = result.stdout.strip().split("\n")
for line in lines[-50:]:
print(f" {C.DIM}{line}{C.RST}")
sys.exit(1)
ok("Build succeeded")
return locate_binaries()
def locate_binaries() -> Dict[str, str]:
"""Locate all required binaries in build/."""
build_dir = PROJECT_DIR / "build"
bins: Dict[str, str] = {
"bootloader": str(build_dir / "bootloader" / "bootloader.bin"),
"partition_table": str(build_dir / "partition_table" / "partition-table.bin"),
"app": find_app_binary(build_dir),
}
ota_data = build_dir / "ota_data_initial.bin"
if ota_data.exists():
bins["otadata"] = str(ota_data)
for name, path in bins.items():
if not os.path.exists(path):
err(f"Missing build artifact: {name} -> {path}")
sys.exit(1)
size = os.path.getsize(path)
ok(f"{name:16s} {path} ({size:,} bytes)")
return bins
# ─── 4. Key Generation ───────────────────────────────────────────────────────
def generate_master_key(cfg: DeviceConfig) -> bytes:
"""Generate or parse the 32-byte master key."""
if cfg.master_key_hex:
try:
key = bytes.fromhex(cfg.master_key_hex)
except ValueError:
err("--key must be valid hex")
sys.exit(1)
if len(key) != KEY_LEN:
err(f"Master key must be {KEY_LEN} bytes ({KEY_LEN * 2} hex chars), "
f"got {len(key)}")
sys.exit(1)
info(f"Using provided master key: {key.hex()[:16]}...")
else:
key = secrets.token_bytes(KEY_LEN)
ok(f"Generated master key: {key.hex()[:16]}...")
return key
# ─── 5. NVS Binary Generation ────────────────────────────────────────────────
def generate_nvs_binary(master_key: bytes, tmpdir: str) -> str:
"""Generate factory NVS partition binary from master key."""
csv_path = os.path.join(tmpdir, "fctry.csv")
bin_path = os.path.join(tmpdir, "fctry.bin")
with open(csv_path, "w") as f:
f.write("key,type,encoding,value\n")
f.write(f"{NVS_NAMESPACE},namespace,,\n")
f.write(f"{NVS_KEY},data,hex2bin,{master_key.hex()}\n")
# Try the Python module (pip install esp-idf-nvs-partition-gen)
generated = False
try:
r = subprocess.run(
[sys.executable, "-m", "esp_idf_nvs_partition_gen",
"generate", csv_path, bin_path, hex(FCTRY_SIZE)],
capture_output=True, text=True, timeout=30,
)
generated = r.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Fallback: IDF script
if not generated:
idf_path = os.environ.get("IDF_PATH", os.path.expanduser("~/esp-idf"))
nvs_tool = os.path.join(
idf_path, "components", "nvs_flash",
"nvs_partition_generator", "nvs_partition_gen.py",
)
if os.path.exists(nvs_tool):
r = subprocess.run(
[sys.executable, nvs_tool, "generate",
csv_path, bin_path, hex(FCTRY_SIZE)],
capture_output=True, text=True, timeout=30,
)
generated = r.returncode == 0
if not generated:
err("Failed to generate NVS binary. "
"Install: pip install esp-idf-nvs-partition-gen")
sys.exit(1)
size = os.path.getsize(bin_path)
ok(f"NVS binary generated: {size:,} bytes")
return bin_path
# ─── 6. Flash ────────────────────────────────────────────────────────────────
def esptool_cmd(port: str) -> List[str]:
"""Base esptool command."""
return ["esptool.py", "--chip", CHIP, "--port", port, "--baud", str(BAUD)]
def flash_erase(port: str):
"""Full flash erase."""
info("Erasing entire flash...")
try:
subprocess.run(esptool_cmd(port) + ["erase_flash"],
check=True, timeout=60)
ok("Flash erased")
except subprocess.CalledProcessError as e:
err(f"Erase failed: {e}")
sys.exit(1)
def flash_all(cfg: DeviceConfig, binaries: Dict[str, str], nvs_bin: str,
erase: bool = False):
"""Flash bootloader + partition table + factory NVS + app (+ otadata)."""
if erase:
flash_erase(cfg.port)
# Build flash map
flash_map = [
(ADDR_BOOTLOADER, binaries["bootloader"], "Bootloader"),
(ADDR_PTABLE, binaries["partition_table"], "Partition table"),
(ADDR_FCTRY, nvs_bin, "Factory NVS (master key)"),
(ADDR_OTA_0, binaries["app"], "Application (ota_0)"),
]
if "otadata" in binaries:
flash_map.append((ADDR_OTADATA, binaries["otadata"], "OTA data"))
# Display map
box([f"{label:25s} @ {hex(addr)}" for addr, _, label in flash_map])
# Build esptool args
cmd = esptool_cmd(cfg.port) + ["write_flash", "-z"]
for addr, path, _ in flash_map:
cmd.extend([hex(addr), path])
info("Flashing all partitions...")
try:
subprocess.run(cmd, check=True, timeout=120)
ok("All partitions flashed successfully")
except subprocess.CalledProcessError as e:
err(f"Flash failed: {e}")
sys.exit(1)
except FileNotFoundError:
err("esptool.py not found. Install: pip install esptool")
sys.exit(1)
def flash_fctry_only(port: str, nvs_bin: str):
"""Flash only the factory NVS partition (provision-only mode)."""
info(f"Flashing factory NVS to {port} at {hex(ADDR_FCTRY)}...")
try:
subprocess.run(
esptool_cmd(port) + ["write_flash", hex(ADDR_FCTRY), nvs_bin],
check=True, timeout=60,
)
ok("Factory NVS flashed")
except subprocess.CalledProcessError as e:
err(f"Flash failed: {e}")
sys.exit(1)
# ─── 7. Keystore ─────────────────────────────────────────────────────────────
def update_keystore(device_id: str, master_key: bytes, keystore_path: str):
"""Add/update device key in the C2 keystore (keys.json)."""
keys: Dict[str, str] = {}
if os.path.exists(keystore_path):
try:
with open(keystore_path) as f:
keys = json.load(f)
except (json.JSONDecodeError, ValueError):
warn(f"Corrupted keystore, starting fresh: {keystore_path}")
keys[device_id] = master_key.hex()
os.makedirs(os.path.dirname(os.path.abspath(keystore_path)), exist_ok=True)
with open(keystore_path, "w") as f:
json.dump(keys, f, indent=2)
ok(f"Keystore updated: {keystore_path}")
ok(f" {device_id} -> {master_key.hex()[:16]}...")
# ─── 8. Serial Monitor ───────────────────────────────────────────────────────
def monitor_serial(port: str, idf_path: str):
"""Launch idf.py monitor for post-flash verification."""
info(f"Starting serial monitor on {port} (Ctrl+] to exit)")
try:
subprocess.run(
["bash", "-c",
f". {idf_path}/export.sh > /dev/null 2>&1 && "
f"idf.py -C {PROJECT_DIR} -p {port} monitor"],
)
except KeyboardInterrupt:
print()
ok("Monitor closed")
# ══════════════════════════════════════════════════════════════════════════════
# Interactive Wizard
# ══════════════════════════════════════════════════════════════════════════════
def detect_serial_ports() -> List[str]:
"""Detect available serial ports on Linux."""
return sorted(
_glob("/dev/ttyUSB*") + _glob("/dev/ttyACM*")
)
def ask(msg: str, default: str = "") -> str:
"""Prompt with optional default."""
if default:
val = input(f" {C.B}{msg}{C.RST} [{C.DIM}{default}{C.RST}]: ").strip()
return val or default
return input(f" {C.B}{msg}{C.RST}: ").strip()
def ask_yn(msg: str, default: bool = True) -> bool:
"""Yes/no prompt."""
hint = "Y/n" if default else "y/N"
val = input(f" {C.B}{msg}{C.RST} [{hint}]: ").strip().lower()
if not val:
return default
return val in ("y", "yes", "o", "oui")
def ask_choice(msg: str, choices: List[str], default: int = 0) -> int:
"""Numbered choice prompt."""
print(f"\n {C.B}{msg}{C.RST}")
for i, c in enumerate(choices):
marker = f"{C.GRN}>{C.RST}" if i == default else " "
print(f" {marker} {i + 1}) {c}")
val = input(f" Choice [{default + 1}]: ").strip()
if not val:
return default
try:
idx = int(val) - 1
if 0 <= idx < len(choices):
return idx
except ValueError:
pass
return default
def interactive_wizard() -> DeviceConfig:
"""Guided interactive setup."""
cfg = DeviceConfig()
print(f"\n {C.B}{C.CYN}--- Device ---{C.RST}\n")
# Port
ports = detect_serial_ports()
if ports:
info(f"Detected ports: {', '.join(ports)}")
cfg.port = ask("Serial port", ports[0])
else:
cfg.port = ask("Serial port (e.g. /dev/ttyUSB0)")
# Device ID
rand_id = secrets.token_hex(4)
cfg.device_id = ask("Device ID (8 hex chars)", rand_id)
# Network
mode = ask_choice("Network mode:", ["WiFi", "GPRS"], 0)
cfg.network_mode = "wifi" if mode == 0 else "gprs"
if cfg.network_mode == "wifi":
cfg.wifi_ssid = ask("WiFi SSID")
cfg.wifi_pass = ask("WiFi password")
else:
cfg.gprs_apn = ask("GPRS APN", "sl2sfr")
# Hostname
cfg.hostname = ask("Device hostname (empty = random)", "")
print(f"\n {C.B}{C.CYN}--- C2 Server ---{C.RST}\n")
cfg.srv_ip = ask("Server IP", "192.168.1.100")
cfg.srv_port = int(ask("Server port", "2626"))
print(f"\n {C.B}{C.CYN}--- Modules ---{C.RST}\n")
cfg.mod_network = ask_yn("Enable network module (ping, arp, proxy, dos)?", True)
cfg.mod_fakeap = ask_yn("Enable fakeAP module (captive portal, sniffer)?", False)
cfg.mod_honeypot = ask_yn("Enable honeypot module (SSH/Telnet/HTTP/FTP, WiFi/net monitor)?", False)
cfg.mod_recon = ask_yn("Enable recon module?", False)
if cfg.mod_recon:
cfg.recon_camera = ask_yn(" Enable camera?", False)
cfg.recon_ble_trilat = ask_yn(" Enable BLE trilateration?", False)
cfg.mod_fallback = ask_yn("Enable fallback module (autonomous WiFi recovery)?", False)
cfg.ota_enabled = ask_yn("Enable OTA updates?", True)
print(f"\n {C.B}{C.CYN}--- Security ---{C.RST}\n")
if ask_yn("Generate new random 256-bit master key?", True):
cfg.master_key_hex = ""
else:
cfg.master_key_hex = ask("Master key (64 hex chars)")
return cfg
# ══════════════════════════════════════════════════════════════════════════════
# Deploy Pipeline
# ══════════════════════════════════════════════════════════════════════════════
def deploy_one(cfg: DeviceConfig, *,
build: bool = True,
flash: bool = True,
provision_only: bool = False,
erase: bool = False,
monitor: bool = False,
keystore_path: str = ""):
"""Full deploy pipeline for a single device."""
ks_path = keystore_path or str(KEYSTORE_DEFAULT)
# Summary
net_info = (f"{cfg.network_mode.upper()} "
+ (f"({cfg.wifi_ssid})" if cfg.network_mode == "wifi"
else f"({cfg.gprs_apn})"))
mods = ", ".join(filter(None, [
"network" if cfg.mod_network else None,
"fakeAP" if cfg.mod_fakeap else None,
"honeypot" if cfg.mod_honeypot else None,
"recon" if cfg.mod_recon else None,
"fallback" if cfg.mod_fallback else None,
"redteam" if cfg.mod_redteam else None,
"canbus" if cfg.mod_canbus else None,
])) or "none"
box([
f"Device ID : {cfg.device_id}",
f"Port : {cfg.port}",
f"Network : {net_info}",
f"C2 Server : {cfg.srv_ip}:{cfg.srv_port}",
f"Modules : {mods}",
f"OTA : {'yes' if cfg.ota_enabled else 'no'}",
f"Erase : {'yes' if erase else 'no'}",
f"Mode : {'provision-only' if provision_only else 'full deploy'}",
])
with tempfile.TemporaryDirectory(prefix="epsilon-deploy-") as tmpdir:
# ── Step 1: Environment ──
step_header(1, "Environment Check")
idf_path = check_environment()
binaries: Dict[str, str] = {}
if not provision_only:
# ── Step 2: sdkconfig ──
step_header(2, "Generate Configuration")
write_sdkconfig(cfg)
# ── Step 3: Build or locate ──
if build:
step_header(3, "Build Firmware")
binaries = build_firmware(idf_path)
else:
step_header(3, "Locate Existing Binaries")
binaries = locate_binaries()
# ── Step 4: Crypto ──
step_header(4, "Master Key")
master_key = generate_master_key(cfg)
# ── Step 5: NVS ──
step_header(5, "Generate Factory NVS Partition")
nvs_bin = generate_nvs_binary(master_key, tmpdir)
# ── Step 6: Flash ──
if flash:
step_header(6, "Flash Device")
if provision_only:
flash_fctry_only(cfg.port, nvs_bin)
else:
flash_all(cfg, binaries, nvs_bin, erase=erase)
# ── Step 7: Keystore ──
step_header(7, "Update C2 Keystore")
update_keystore(cfg.device_id, master_key, ks_path)
# ── Done ──
print(f"\n {C.GRN}{C.B}{'=' * 50}")
print(f" Deploy complete for {cfg.device_id}")
print(f" {'=' * 50}{C.RST}\n")
# ── Optional monitor ──
if monitor and flash:
step_header("*", "Serial Monitor")
monitor_serial(cfg.port, idf_path)
# ══════════════════════════════════════════════════════════════════════════════
# CLI / Entry Point
# ══════════════════════════════════════════════════════════════════════════════
def config_from_dict(d: dict, defaults: Optional[dict] = None) -> DeviceConfig:
"""Create DeviceConfig from a JSON dict with optional defaults.
Supports both flat format (legacy) and nested format:
Flat: {"device_id": "x", "srv_ip": "...", "module_network": true, ...}
Nested: {"device_id": "x", "server": {"ip": "..."}, "modules": {"network": true}, ...}
When defaults is provided, device-level keys override defaults.
"""
dfl = defaults or {}
# Merge nested sections: defaults <- device overrides
def merged(section: str) -> dict:
base = dict(dfl.get(section, {}))
base.update(d.get(section, {}))
return base
server = merged("server")
network = merged("network")
modules = merged("modules")
ota = merged("ota")
return DeviceConfig(
device_id=d.get("device_id", secrets.token_hex(4)),
port=d.get("port", dfl.get("port", "")),
hostname=d.get("hostname", dfl.get("hostname", "")),
# Server
srv_ip=server.get("ip", d.get("srv_ip", "192.168.1.100")),
srv_port=server.get("port", d.get("srv_port", 2626)),
# Network
network_mode=network.get("mode", d.get("network_mode", "wifi")),
wifi_ssid=network.get("wifi_ssid", d.get("wifi_ssid", "")),
wifi_pass=network.get("wifi_pass", d.get("wifi_pass", "")),
gprs_apn=network.get("gprs_apn", d.get("gprs_apn", "sl2sfr")),
# Modules
mod_network=modules.get("network", d.get("module_network", True)),
mod_recon=modules.get("recon", d.get("module_recon", False)),
mod_fakeap=modules.get("fakeap", d.get("module_fakeap", False)),
mod_honeypot=modules.get("honeypot", d.get("module_honeypot", False)),
mod_canbus=modules.get("canbus", d.get("module_canbus", False)),
mod_fallback=modules.get("fallback", d.get("module_fallback", False)),
recon_camera=modules.get("recon_camera", d.get("recon_camera", False)),
recon_ble_trilat=modules.get("recon_ble_trilat", d.get("recon_ble_trilat", False)),
# OTA
ota_enabled=ota.get("enabled", d.get("ota_enabled", True)),
ota_allow_http=ota.get("allow_http", d.get("ota_allow_http", False)),
# Security
master_key_hex=d.get("master_key", dfl.get("master_key", "")),
)
def main() -> int:
parser = argparse.ArgumentParser(
description="Epsilon Deploy - Unified build, provision & flash",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Interactive wizard (recommended for first use)
python deploy.py
# Full deploy with WiFi
python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100
# Full deploy with GPRS
python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
--gprs sl2sfr --srv 203.0.113.10
# Provision only (generate key + flash NVS, no rebuild)
python deploy.py --provision-only -p /dev/ttyUSB0 -d abc12345
# Flash existing build (skip rebuild)
python deploy.py --flash-only -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100
# Clean install (erase flash first)
python deploy.py --erase -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100
# Deploy + watch serial output
python deploy.py --monitor -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100
# Batch deploy from config file
python deploy.py --config deploy.json
""",
)
# Device
parser.add_argument("-d", "--device-id",
help="Device ID (default: random 8 hex)")
parser.add_argument("-p", "--port",
help="Serial port (e.g. /dev/ttyUSB0)")
# Network (mutually exclusive)
net = parser.add_mutually_exclusive_group()
net.add_argument("--wifi", nargs=2, metavar=("SSID", "PASS"),
help="WiFi mode with SSID and password")
net.add_argument("--gprs", metavar="APN", nargs="?", const="sl2sfr",
help="GPRS mode (default APN: sl2sfr)")
# Server
parser.add_argument("--srv", metavar="IP", help="C2 server IP")
parser.add_argument("--srv-port", type=int, default=2626)
# Modules
parser.add_argument("--mod-fakeap", action="store_true", default=False,
help="Enable fakeAP module")
parser.add_argument("--mod-honeypot", action="store_true", default=False,
help="Enable honeypot module (TCP services, WiFi/net monitors)")
parser.add_argument("--mod-recon", action="store_true", default=False,
help="Enable recon module")
parser.add_argument("--recon-camera", action="store_true", default=False,
help="Enable camera")
parser.add_argument("--recon-ble", action="store_true", default=False,
help="Enable BLE trilateration")
parser.add_argument("--mod-fallback", action="store_true", default=False,
help="Enable fallback module (autonomous WiFi recovery)")
parser.add_argument("--mod-redteam", action="store_true", default=False,
help="Enable red team module (offensive operations)")
parser.add_argument("--no-network", action="store_true", default=False,
help="Disable network module")
parser.add_argument("--no-ota", action="store_true", default=False,
help="Disable OTA module")
parser.add_argument("--ota-http", action="store_true", default=False,
help="Allow OTA over plain HTTP (insecure, local networks)")
# Security
parser.add_argument("--key", metavar="HEX",
help="Master key as 64 hex chars (default: random)")
parser.add_argument("--keystore", type=Path, default=None,
help=f"C2 keystore path (default: {KEYSTORE_DEFAULT})")
# Operating modes
parser.add_argument("--provision-only", action="store_true",
help="Only generate key and flash factory NVS")
parser.add_argument("--flash-only", action="store_true",
help="Flash existing build (skip rebuild)")
parser.add_argument("--build-only", action="store_true",
help="Build only (no flash, no provision)")
parser.add_argument("--erase", action="store_true",
help="Erase entire flash before writing")
parser.add_argument("--monitor", action="store_true",
help="Start serial monitor after flash")
parser.add_argument("--hostname",
help="Device hostname (default: random)")
# Config file
parser.add_argument("--config", type=Path,
help="Load device configs from JSON file")
args = parser.parse_args()
banner()
# ── Config file mode ──
if args.config:
with open(args.config) as f:
data = json.load(f)
defaults = data.get("defaults", {})
devices = data.get("devices", [data])
total = len(devices)
ok_count = 0
for i, d in enumerate(devices, 1):
print(f"\n {C.B}### Device {i}/{total}: {d.get('device_id', '?')} ###{C.RST}")
cfg = config_from_dict(d, defaults)
try:
deploy_one(
cfg,
build=not args.flash_only,
flash=not args.build_only,
provision_only=args.provision_only,
erase=args.erase,
monitor=args.monitor,
keystore_path=str(args.keystore) if args.keystore else "",
)
ok_count += 1
except SystemExit:
err(f"Failed for {cfg.device_id}, continuing...")
print(f"\n {C.B}Results: {ok_count}/{total} succeeded{C.RST}")
return 0 if ok_count == total else 1
# ── Interactive mode (no args) ──
if not args.device_id and not args.port and not args.provision_only:
cfg = interactive_wizard()
print()
if not ask_yn("Proceed with deployment?", True):
print(" Aborted.")
return 1
deploy_one(
cfg,
build=not args.flash_only,
flash=not args.build_only,
erase=args.erase,
monitor=args.monitor,
keystore_path=str(args.keystore) if args.keystore else "",
)
return 0
# ── CLI mode ──
if not args.port:
err("--port / -p is required in CLI mode")
return 1
cfg = DeviceConfig(
device_id=args.device_id or secrets.token_hex(4),
port=args.port,
srv_ip=args.srv or "192.168.1.100",
srv_port=args.srv_port,
network_mode="gprs" if args.gprs else "wifi",
wifi_ssid=args.wifi[0] if args.wifi else "",
wifi_pass=args.wifi[1] if args.wifi else "",
gprs_apn=args.gprs if args.gprs else "sl2sfr",
hostname=args.hostname or "",
mod_network=not args.no_network,
mod_fakeap=args.mod_fakeap,
mod_honeypot=args.mod_honeypot,
mod_recon=args.mod_recon,
mod_fallback=args.mod_fallback,
mod_redteam=args.mod_redteam,
recon_camera=args.recon_camera,
recon_ble_trilat=args.recon_ble,
ota_enabled=not args.no_ota,
ota_allow_http=args.ota_http,
master_key_hex=args.key or "",
)
# Validate WiFi (unless provision-only which doesn't need it)
if (cfg.network_mode == "wifi"
and not args.provision_only
and not args.build_only
and (not cfg.wifi_ssid or not cfg.wifi_pass)):
err("WiFi mode requires --wifi SSID PASS")
return 1
deploy_one(
cfg,
build=not args.flash_only and not args.provision_only,
flash=not args.build_only,
provision_only=args.provision_only,
erase=args.erase,
monitor=args.monitor,
keystore_path=str(args.keystore) if args.keystore else "",
)
return 0
if __name__ == "__main__":
sys.exit(main())