Add deploy.py: multi-device build/flash/provision with JSON config. Add espmon/: real-time ESP32 fleet monitoring daemon. Remove tools/flasher/ and tools/provisioning/ (superseded).
995 lines
36 KiB
Python
Executable File
995 lines
36 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Epsilon Deploy - Unified build, provision & flash pipeline for ESP32.
|
|
|
|
Combines firmware building, crypto key provisioning, and flashing
|
|
into a single automated workflow with correct partition offsets.
|
|
|
|
Usage:
|
|
python deploy.py # Interactive wizard
|
|
python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
|
|
--wifi MySSID MyPass --srv 192.168.1.100 # Quick CLI deploy
|
|
python deploy.py --config deploy.json # Batch from config
|
|
python deploy.py --provision-only -p /dev/ttyUSB0 -d x # Keys only
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import secrets
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from dataclasses import dataclass
|
|
from glob import glob as _glob
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
|
|
# ─── Paths ────────────────────────────────────────────────────────────────────
# Everything is resolved relative to the directory that holds this script.
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_DIR = SCRIPT_DIR.parent / "espilon_bot"      # firmware project handed to idf.py
KEYSTORE_DEFAULT = SCRIPT_DIR / "C3PO" / "keys.json"  # default keystore written by update_keystore()

# ─── Partition layout (must match partitions.csv) ─────────────────────────────
# Target chip and serial speed passed to esptool.
CHIP = "esp32"
BAUD = 460800

# Flash offsets used when writing each image.
ADDR_BOOTLOADER = 0x1000
ADDR_PTABLE = 0x8000
ADDR_OTADATA = 0xD000
ADDR_FCTRY = 0x10000
ADDR_OTA_0 = 0x20000

# Factory NVS partition: size handed to the NVS generator, plus the
# namespace/key under which the master key is stored.
FCTRY_SIZE = 0x6000  # 24 KB
NVS_NAMESPACE = "crypto"
NVS_KEY = "master_key"
KEY_LEN = 32  # 256-bit master key
|
|
|
|
|
|
# ─── Terminal helpers ─────────────────────────────────────────────────────────
|
|
class C:
    """ANSI escape sequences used to colour terminal output."""

    # Attribute reset / emphasis
    RST = "\033[0m"
    B = "\033[1m"
    DIM = "\033[2m"

    # Bright foreground colours
    RED = "\033[91m"
    GRN = "\033[92m"
    YLW = "\033[93m"
    BLU = "\033[94m"
    CYN = "\033[96m"
|
|
|
|
|
|
def _tag(color: str, tag: str, msg: str):
    """Print *msg* prefixed with a bold, coloured ``[tag]`` marker."""
    prefix = f"{color}{C.B}[{tag}]{C.RST}"
    print(f" {prefix} {msg}")
|
|
|
|
|
|
def ok(m):
    """Report a successful step (green tag)."""
    _tag(C.GRN, " OK ", m)


def info(m):
    """Report progress or neutral information (blue tag)."""
    _tag(C.BLU, " .. ", m)


def warn(m):
    """Report a non-fatal problem (yellow tag)."""
    _tag(C.YLW, " !! ", m)


def err(m):
    """Report an error (red tag); callers decide whether to exit."""
    _tag(C.RED, " XX ", m)
|
|
|
|
|
|
def step_header(n, m):
    """Print a section rule of the form ``--- Step n: m -----``.

    The trailing dashes shrink as the title grows and disappear once the
    title reaches 42 characters.
    """
    rule = "-" * max(0, 42 - len(m))
    title = f"--- Step {n}: {m} {rule}"
    print(f"\n {C.CYN}{C.B}{title}{C.RST}\n")
|
|
|
|
|
|
def banner():
    """Print the tool's start-up banner."""
    rule = "+==================================================+"
    rows = [
        rule,
        "| E Epsilon Deploy Tool |",
        "| Build - Provision - Flash - Verify |",
        rule,
    ]
    framed = "\n".join(rows)
    print(f"\n{C.CYN}{C.B}{framed}{C.RST}\n")
|
|
|
|
|
|
def box(lines: List[str]):
    """Print *lines* inside a dim ASCII frame sized to the longest line."""
    width = max(map(len, lines), default=0) + 4
    border = f" {C.DIM}+{'-' * width}+{C.RST}"

    print(border)
    for text in lines:
        # Pad every row so the right-hand edge lines up.
        padding = " " * (width - len(text) - 2)
        print(f" {C.DIM}|{C.RST} {text}{padding}{C.DIM}|{C.RST}")
    print(border)
|
|
|
|
|
|
# ─── Device Config ────────────────────────────────────────────────────────────
|
|
@dataclass
class DeviceConfig:
    """Everything needed to build, provision and flash one device.

    Field order is part of the interface (positional construction), so new
    fields must only ever be appended.
    """

    # Identity and serial connection
    device_id: str = ""
    port: str = ""

    # C2 server endpoint
    srv_ip: str = "192.168.1.100"
    srv_port: int = 2626

    # Uplink
    network_mode: str = "wifi"  # "wifi" or "gprs"
    wifi_ssid: str = ""
    wifi_pass: str = ""
    gprs_apn: str = "sl2sfr"
    hostname: str = ""

    # Firmware modules compiled in
    mod_network: bool = True
    mod_recon: bool = False
    mod_fakeap: bool = False
    mod_honeypot: bool = False
    mod_canbus: bool = False
    mod_fallback: bool = False
    mod_redteam: bool = False

    # Recon sub-features
    recon_camera: bool = False
    recon_ble_trilat: bool = False

    # Over-the-air updates
    ota_enabled: bool = True
    ota_allow_http: bool = False

    # Security
    master_key_hex: str = ""  # empty = generate random
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# Pipeline Steps
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
# ─── 1. Environment Check ────────────────────────────────────────────────────
|
|
|
|
def check_environment() -> str:
    """Check that ESP-IDF, esptool and the firmware project are available.

    Returns:
        The ESP-IDF directory, taken from ``IDF_PATH`` or, failing that,
        from ``~/esp-idf`` or ``/opt/esp-idf``.

    Exits the process with status 1 when ESP-IDF or the project directory
    cannot be found. A missing esptool only produces a warning.
    """
    def _usable(path: str) -> bool:
        return bool(path) and os.path.isdir(path)

    # Locate ESP-IDF: environment first, then the conventional locations.
    idf_path = os.environ.get("IDF_PATH", "")
    if not _usable(idf_path):
        fallbacks = (os.path.expanduser("~/esp-idf"), "/opt/esp-idf")
        idf_path = next((p for p in fallbacks if os.path.isdir(p)), idf_path)
    if not _usable(idf_path):
        err("ESP-IDF not found. Set IDF_PATH or install ESP-IDF.")
        sys.exit(1)
    ok(f"ESP-IDF : {idf_path}")

    # Locate esptool under either of its executable names.
    esptool = next(
        filter(None, map(shutil.which, ("esptool.py", "esptool"))), None
    )
    if not esptool:
        warn("esptool.py not in PATH -- will try via IDF export.sh")
    else:
        ok(f"esptool : {esptool}")

    # The firmware sources must sit next to this tool.
    if not PROJECT_DIR.is_dir():
        err(f"Project directory not found: {PROJECT_DIR}")
        sys.exit(1)
    ok(f"Project : {PROJECT_DIR}")

    return idf_path
|
|
|
|
|
|
# ─── 2. sdkconfig Generation ─────────────────────────────────────────────────
|
|
|
|
def _kconfig_quote(value) -> str:
    """Return *value* as a double-quoted Kconfig string literal.

    Backslashes and double quotes are escaped so that user-supplied values
    (SSID, password, APN, hostname, ...) cannot terminate the string early
    and corrupt the generated file.
    """
    text = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{text}"'


def generate_sdkconfig(cfg: "DeviceConfig") -> str:
    """Generate sdkconfig.defaults content for a device.

    Args:
        cfg: Device settings; only its attributes are read.

    Returns:
        The full file content, one option per line, ending with a newline.
    """
    def yn(flag) -> str:
        return "y" if flag else "n"

    lines = [
        "# Generated by epsilon deploy -- do not edit manually",
        "",
        "# Device",
        f"CONFIG_DEVICE_ID={_kconfig_quote(cfg.device_id)}",
        "",
        "# Network",
    ]

    if cfg.network_mode == "wifi":
        lines += [
            "CONFIG_NETWORK_WIFI=y",
            f"CONFIG_WIFI_SSID={_kconfig_quote(cfg.wifi_ssid)}",
            f"CONFIG_WIFI_PASS={_kconfig_quote(cfg.wifi_pass)}",
        ]
    else:
        # Any mode other than "wifi" is treated as GPRS.
        lines += [
            "CONFIG_NETWORK_GPRS=y",
            f"CONFIG_GPRS_APN={_kconfig_quote(cfg.gprs_apn)}",
        ]

    # OTA builds and non-OTA builds use different partition tables.
    partition_csv = "partitions.csv" if cfg.ota_enabled else "partitions_noota.csv"

    lines += [
        "",
        "# C2 Server",
        f"CONFIG_SERVER_IP={_kconfig_quote(cfg.srv_ip)}",
        f"CONFIG_SERVER_PORT={cfg.srv_port}",
        "",
        "# Crypto -- ChaCha20-Poly1305 + HKDF (mbedtls, ESP-IDF v5.3)",
        'CONFIG_CRYPTO_FCTRY_NS="crypto"',
        'CONFIG_CRYPTO_FCTRY_KEY="master_key"',
        "CONFIG_MBEDTLS_CHACHA20_C=y",
        "CONFIG_MBEDTLS_POLY1305_C=y",
        "CONFIG_MBEDTLS_CHACHAPOLY_C=y",
        "CONFIG_MBEDTLS_HKDF_C=y",
        "",
        "# Flash & Partitions",
        "CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y",
        "CONFIG_PARTITION_TABLE_CUSTOM=y",
        f"CONFIG_PARTITION_TABLE_CUSTOM_FILENAME={_kconfig_quote(partition_csv)}",
        "",
        "# LWIP",
        "CONFIG_LWIP_IPV4_NAPT=y",
        "CONFIG_LWIP_IPV4_NAPT_PORTMAP=y",
        "CONFIG_LWIP_IP_FORWARD=y",
    ]

    if cfg.hostname:
        lines.append(f"CONFIG_LWIP_LOCAL_HOSTNAME={_kconfig_quote(cfg.hostname)}")

    lines += [
        "",
        "# Modules",
        f"CONFIG_MODULE_NETWORK={yn(cfg.mod_network)}",
        f"CONFIG_MODULE_RECON={yn(cfg.mod_recon)}",
        f"CONFIG_MODULE_FAKEAP={yn(cfg.mod_fakeap)}",
        f"CONFIG_MODULE_HONEYPOT={yn(cfg.mod_honeypot)}",
        f"CONFIG_MODULE_CANBUS={yn(cfg.mod_canbus)}",
        f"CONFIG_MODULE_FALLBACK={yn(cfg.mod_fallback)}",
        f"CONFIG_MODULE_REDTEAM={yn(cfg.mod_redteam)}",
    ]

    if cfg.mod_recon:
        lines += [
            f"CONFIG_RECON_MODE_CAMERA={yn(cfg.recon_camera)}",
            f"CONFIG_RECON_MODE_MLAT={yn(cfg.recon_ble_trilat)}",
        ]

    # NOTE(review): the two blocks below are applied whenever the flag is
    # set, independent of mod_recon -- confirm this matches the intended
    # nesting of the original file.
    if cfg.recon_camera:
        lines += [
            "",
            "# PSRAM (required for camera frame buffers)",
            "CONFIG_SPIRAM=y",
            "CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY=y",
        ]

    if cfg.recon_ble_trilat:
        lines += [
            "",
            "# BLE",
            "CONFIG_BT_ENABLED=y",
            "CONFIG_BT_BLUEDROID_ENABLED=y",
            "CONFIG_BT_BLE_ENABLED=y",
        ]

    lines += [
        "",
        "# OTA",
        f"CONFIG_ESPILON_OTA_ENABLED={yn(cfg.ota_enabled)}",
    ]

    if cfg.ota_enabled:
        if cfg.ota_allow_http:
            lines += [
                "CONFIG_ESPILON_OTA_ALLOW_HTTP=y",
                "CONFIG_ESP_HTTPS_OTA_ALLOW_HTTP=y",
            ]
        else:
            # HTTPS-only OTA: enable the full certificate bundle.
            lines += [
                "CONFIG_MBEDTLS_CERTIFICATE_BUNDLE=y",
                "CONFIG_MBEDTLS_CERTIFICATE_BUNDLE_DEFAULT_FULL=y",
            ]

    lines += [
        "",
        "# Logging",
        "CONFIG_ESPILON_LOG_LEVEL_INFO=y",
        "CONFIG_ESPILON_LOG_BOOT_SUMMARY=y",
    ]

    return "\n".join(lines) + "\n"
|
|
|
|
|
|
def write_sdkconfig(cfg: DeviceConfig):
    """Write sdkconfig.defaults into the project, backing up the existing one.

    The backup (``sdkconfig.defaults.bak``) is only refreshed when the
    existing file was NOT produced by this tool. Otherwise the second run
    (or the second device of a batch) would replace the backup of a
    hand-written file with a previously generated one.
    """
    generated_marker = "# Generated by epsilon deploy"
    sdkconfig_path = PROJECT_DIR / "sdkconfig.defaults"

    if sdkconfig_path.exists():
        try:
            with open(sdkconfig_path, errors="replace") as f:
                first_line = f.readline()
        except OSError:
            # Unreadable file: fall through and attempt the backup anyway.
            first_line = ""

        if first_line.startswith(generated_marker):
            info("Existing config was generated by this tool -> backup kept as is")
        else:
            backup = sdkconfig_path.with_suffix(".defaults.bak")
            shutil.copy2(sdkconfig_path, backup)
            info(f"Backed up existing config -> {backup.name}")

    content = generate_sdkconfig(cfg)
    with open(sdkconfig_path, "w") as f:
        f.write(content)
    ok(f"Generated sdkconfig.defaults for {cfg.device_id}")
|
|
|
|
|
|
# ─── 3. Build Firmware ───────────────────────────────────────────────────────
|
|
|
|
def find_app_binary(build_dir: Path) -> str:
    """Find the application binary in the build directory.

    Lookup order:
      1. ``app_bin`` from ``project_description.json`` (resolved against
         *build_dir* when relative; an absolute path is used as is),
      2. ``<project_name>.bin`` from the same file,
      3. the known names ``bot-lwip.bin`` and ``epsilon_bot.bin``.

    Returns:
        Path of the first candidate that exists.

    Exits the process with status 1 when nothing is found.
    """
    desc_file = build_dir / "project_description.json"
    if desc_file.exists():
        try:
            with open(desc_file) as f:
                desc = json.load(f)
        except (OSError, ValueError):
            # Unreadable or corrupt description: rely on the known names.
            desc = {}
        if not isinstance(desc, dict):
            desc = {}

        app_bin = desc.get("app_bin", "")
        if app_bin:
            # Joining with an absolute path yields that absolute path, so
            # both relative and absolute "app_bin" values are handled.
            resolved = build_dir / app_bin
            if resolved.exists():
                return str(resolved)

        name = desc.get("project_name", "bot-lwip")
        candidate = build_dir / f"{name}.bin"
        if candidate.exists():
            return str(candidate)

    # Fallback: try known names
    for name in ["bot-lwip.bin", "epsilon_bot.bin"]:
        p = build_dir / name
        if p.exists():
            return str(p)

    err("Cannot find application binary in build/")
    sys.exit(1)
|
|
|
|
|
|
def build_firmware(idf_path: str) -> Dict[str, str]:
    """Build firmware via idf.py. Returns dict of binary paths.

    Args:
        idf_path: ESP-IDF directory whose ``export.sh`` is sourced first.

    Returns:
        The mapping produced by ``locate_binaries()``.

    Exits the process with status 1 when the build fails, times out or
    bash is unavailable.
    """
    import shlex  # local import: only needed to quote paths for bash

    build_timeout = 600  # seconds

    # Full clean: remove sdkconfig AND build dir to avoid stale CMake cache
    # (required when switching between OTA/non-OTA configs)
    sdkconfig = PROJECT_DIR / "sdkconfig"
    build_dir = PROJECT_DIR / "build"
    if sdkconfig.exists():
        sdkconfig.unlink()
    if build_dir.exists():
        shutil.rmtree(build_dir)
        info("Cleaned build directory")

    info("Running idf.py build (this may take several minutes)...")
    # Paths are quoted so directories containing spaces or shell
    # metacharacters cannot break (or inject into) the command line.
    export_sh = shlex.quote(f"{idf_path}/export.sh")
    project = shlex.quote(str(PROJECT_DIR))
    cmd = (
        f". {export_sh} > /dev/null 2>&1 && "
        f"idf.py -C {project} "
        f"-D SDKCONFIG_DEFAULTS=sdkconfig.defaults "
        f"build 2>&1"
    )

    try:
        result = subprocess.run(
            ["bash", "-c", cmd],
            capture_output=True, text=True, timeout=build_timeout,
        )
    except subprocess.TimeoutExpired:
        err(f"Build timed out after {build_timeout} seconds")
        sys.exit(1)
    except FileNotFoundError:
        err("bash not found -- it is required to source ESP-IDF export.sh")
        sys.exit(1)

    if result.returncode != 0:
        err("Build failed!")
        output = result.stdout.strip()
        if output:
            # Show only the tail: that is where the compiler errors are.
            for line in output.split("\n")[-50:]:
                print(f" {C.DIM}{line}{C.RST}")
        else:
            # export.sh output is discarded, so an empty log means the
            # environment script itself failed before idf.py ran.
            warn(f"No build output -- check that {idf_path}/export.sh works")
        sys.exit(1)

    ok("Build succeeded")
    return locate_binaries()
|
|
|
|
|
|
def locate_binaries() -> Dict[str, str]:
    """Collect the build artifacts that have to be flashed.

    Returns:
        Mapping with the keys ``bootloader``, ``partition_table`` and
        ``app``, plus ``otadata`` when ``ota_data_initial.bin`` was built.

    Exits the process with status 1 if any listed artifact is missing.
    """
    build_dir = PROJECT_DIR / "build"

    bins: Dict[str, str] = {}
    bins["bootloader"] = str(build_dir / "bootloader" / "bootloader.bin")
    bins["partition_table"] = str(
        build_dir / "partition_table" / "partition-table.bin"
    )
    bins["app"] = find_app_binary(build_dir)

    # The OTA data image only exists for OTA-enabled builds.
    ota_data = build_dir / "ota_data_initial.bin"
    if ota_data.exists():
        bins["otadata"] = str(ota_data)

    for name in bins:
        path = bins[name]
        if os.path.exists(path):
            size = os.path.getsize(path)
            ok(f"{name:16s} {path} ({size:,} bytes)")
            continue
        err(f"Missing build artifact: {name} -> {path}")
        sys.exit(1)

    return bins
|
|
|
|
|
|
# ─── 4. Key Generation ───────────────────────────────────────────────────────
|
|
|
|
def generate_master_key(cfg: DeviceConfig) -> bytes:
    """Return the device master key as raw bytes.

    Uses ``cfg.master_key_hex`` when it is set, otherwise draws a fresh
    random key of ``KEY_LEN`` bytes. Exits the process with status 1 when
    the supplied value is not valid hex or has the wrong length.
    """
    supplied = cfg.master_key_hex

    if not supplied:
        key = secrets.token_bytes(KEY_LEN)
        ok(f"Generated master key: {key.hex()[:16]}...")
        return key

    try:
        key = bytes.fromhex(supplied)
    except ValueError:
        err("--key must be valid hex")
        sys.exit(1)

    if len(key) != KEY_LEN:
        err(f"Master key must be {KEY_LEN} bytes ({KEY_LEN * 2} hex chars), "
            f"got {len(key)}")
        sys.exit(1)

    # Only a prefix is echoed so the full key never lands in the scrollback.
    info(f"Using provided master key: {key.hex()[:16]}...")
    return key
|
|
|
|
|
|
# ─── 5. NVS Binary Generation ────────────────────────────────────────────────
|
|
|
|
def generate_nvs_binary(master_key: bytes, tmpdir: str) -> str:
    """Generate factory NVS partition binary from master key.

    Writes ``fctry.csv`` into *tmpdir* and converts it to ``fctry.bin``
    with the ``esp_idf_nvs_partition_gen`` module, falling back to the
    generator script shipped inside ESP-IDF.

    Args:
        master_key: Raw key bytes stored under NVS_NAMESPACE / NVS_KEY.
        tmpdir: Directory that receives the CSV and the binary.

    Returns:
        Path of the generated binary.

    Exits the process with status 1 when no generator succeeds.
    """
    csv_path = os.path.join(tmpdir, "fctry.csv")
    bin_path = os.path.join(tmpdir, "fctry.bin")

    with open(csv_path, "w") as f:
        f.write("key,type,encoding,value\n")
        f.write(f"{NVS_NAMESPACE},namespace,,\n")
        f.write(f"{NVS_KEY},data,hex2bin,{master_key.hex()}\n")

    gen_args = ["generate", csv_path, bin_path, hex(FCTRY_SIZE)]

    # Preferred: the Python module (pip install esp-idf-nvs-partition-gen)
    generators = [[sys.executable, "-m", "esp_idf_nvs_partition_gen"]]

    # Fallback: IDF script
    idf_path = os.environ.get("IDF_PATH", os.path.expanduser("~/esp-idf"))
    nvs_tool = os.path.join(
        idf_path, "components", "nvs_flash",
        "nvs_partition_generator", "nvs_partition_gen.py",
    )
    if os.path.exists(nvs_tool):
        generators.append([sys.executable, nvs_tool])

    generated = False
    for base_cmd in generators:
        try:
            r = subprocess.run(
                base_cmd + gen_args,
                capture_output=True, text=True, timeout=30,
            )
        except (OSError, subprocess.TimeoutExpired):
            # A hung or unlaunchable generator must not abort the run:
            # try the next one instead.
            continue
        # Trust the exit code only if the output file is really there;
        # otherwise os.path.getsize() below would raise.
        if r.returncode == 0 and os.path.exists(bin_path):
            generated = True
            break

    if not generated:
        err("Failed to generate NVS binary. "
            "Install: pip install esp-idf-nvs-partition-gen")
        sys.exit(1)

    size = os.path.getsize(bin_path)
    ok(f"NVS binary generated: {size:,} bytes")
    return bin_path
|
|
|
|
|
|
# ─── 6. Flash ────────────────────────────────────────────────────────────────
|
|
|
|
def esptool_cmd(port: str) -> List[str]:
    """Base esptool command.

    The executable is resolved the same way the environment check does it:
    ``esptool.py`` first, then ``esptool``. When neither is on PATH the
    historical name ``esptool.py`` is kept, so callers still get the usual
    FileNotFoundError when they try to run it.
    """
    executable = (shutil.which("esptool.py")
                  or shutil.which("esptool")
                  or "esptool.py")
    return [executable, "--chip", CHIP, "--port", port, "--baud", str(BAUD)]
|
|
|
|
|
|
def flash_erase(port: str):
    """Full flash erase.

    Exits the process with status 1 when esptool fails, is missing or does
    not finish within the timeout.
    """
    erase_timeout = 60  # seconds

    info("Erasing entire flash...")
    try:
        subprocess.run(esptool_cmd(port) + ["erase_flash"],
                       check=True, timeout=erase_timeout)
        ok("Flash erased")
    except subprocess.CalledProcessError as e:
        err(f"Erase failed: {e}")
        sys.exit(1)
    except subprocess.TimeoutExpired:
        # Without this a stuck adapter surfaces as a raw traceback.
        err(f"Erase timed out after {erase_timeout} seconds")
        sys.exit(1)
    except FileNotFoundError:
        err("esptool.py not found. Install: pip install esptool")
        sys.exit(1)
|
|
|
|
|
|
def flash_all(cfg: DeviceConfig, binaries: Dict[str, str], nvs_bin: str,
              erase: bool = False):
    """Flash bootloader + partition table + factory NVS + app (+ otadata).

    Args:
        cfg: Device settings; only ``cfg.port`` is used here.
        binaries: Artifact paths with the keys ``bootloader``,
            ``partition_table``, ``app`` and optionally ``otadata``.
        nvs_bin: Path of the factory NVS image holding the master key.
        erase: Erase the whole flash before writing.

    Exits the process with status 1 when esptool fails, is missing or does
    not finish within the timeout.
    """
    flash_timeout = 120  # seconds

    if erase:
        flash_erase(cfg.port)

    # Build flash map
    flash_map = [
        (ADDR_BOOTLOADER, binaries["bootloader"], "Bootloader"),
        (ADDR_PTABLE, binaries["partition_table"], "Partition table"),
        (ADDR_FCTRY, nvs_bin, "Factory NVS (master key)"),
        (ADDR_OTA_0, binaries["app"], "Application (ota_0)"),
    ]
    if "otadata" in binaries:
        flash_map.append((ADDR_OTADATA, binaries["otadata"], "OTA data"))

    # Display map
    box([f"{label:25s} @ {hex(addr)}" for addr, _, label in flash_map])

    # Build esptool args: one "<offset> <file>" pair per image
    cmd = esptool_cmd(cfg.port) + ["write_flash", "-z"]
    for addr, path, _ in flash_map:
        cmd.extend([hex(addr), path])

    info("Flashing all partitions...")
    try:
        subprocess.run(cmd, check=True, timeout=flash_timeout)
        ok("All partitions flashed successfully")
    except subprocess.CalledProcessError as e:
        err(f"Flash failed: {e}")
        sys.exit(1)
    except subprocess.TimeoutExpired:
        # Converted to SystemExit so that batch mode can move on to the
        # next device instead of dying with a traceback.
        err(f"Flash timed out after {flash_timeout} seconds")
        sys.exit(1)
    except FileNotFoundError:
        err("esptool.py not found. Install: pip install esptool")
        sys.exit(1)
|
|
|
|
|
|
def flash_fctry_only(port: str, nvs_bin: str):
    """Flash only the factory NVS partition (provision-only mode).

    Exits the process with status 1 when esptool fails, is missing or does
    not finish within the timeout.
    """
    flash_timeout = 60  # seconds

    info(f"Flashing factory NVS to {port} at {hex(ADDR_FCTRY)}...")
    try:
        subprocess.run(
            esptool_cmd(port) + ["write_flash", hex(ADDR_FCTRY), nvs_bin],
            check=True, timeout=flash_timeout,
        )
        ok("Factory NVS flashed")
    except subprocess.CalledProcessError as e:
        err(f"Flash failed: {e}")
        sys.exit(1)
    except subprocess.TimeoutExpired:
        err(f"Flash timed out after {flash_timeout} seconds")
        sys.exit(1)
    except FileNotFoundError:
        err("esptool.py not found. Install: pip install esptool")
        sys.exit(1)
|
|
|
|
|
|
# ─── 7. Keystore ─────────────────────────────────────────────────────────────
|
|
|
|
def update_keystore(device_id: str, master_key: bytes, keystore_path: str):
    """Add/update device key in the C2 keystore (keys.json).

    The keystore is a JSON object mapping device IDs to hex-encoded keys.

    * An unreadable or malformed keystore is copied to ``<path>.corrupt``
      before a fresh one is started, so existing keys are not silently lost.
    * The new content is written to a temporary file in the same directory
      and moved into place with ``os.replace``, so an interrupted run never
      leaves a truncated keystore behind.
    * The temporary file comes from ``tempfile.mkstemp``, which creates it
      readable and writable by the owner only; the keystore therefore ends
      up with those permissions.

    Args:
        device_id: Key under which the master key is stored.
        master_key: Raw key bytes; stored hex-encoded.
        keystore_path: Location of the keystore file.
    """
    keys: Dict[str, str] = {}
    if os.path.exists(keystore_path):
        loaded = None
        try:
            with open(keystore_path) as f:
                loaded = json.load(f)
        except (json.JSONDecodeError, ValueError):
            pass

        if isinstance(loaded, dict):
            keys = loaded
        else:
            # Covers both invalid JSON and valid JSON of the wrong shape
            # (e.g. a list), which would otherwise crash on assignment.
            warn(f"Corrupted keystore, starting fresh: {keystore_path}")
            try:
                shutil.copy2(keystore_path, keystore_path + ".corrupt")
                info(f"Previous content saved -> {keystore_path}.corrupt")
            except OSError as e:
                warn(f"Could not save a copy of the corrupted keystore: {e}")

    keys[device_id] = master_key.hex()

    ks_dir = os.path.dirname(os.path.abspath(keystore_path))
    os.makedirs(ks_dir, exist_ok=True)

    fd, tmp_path = tempfile.mkstemp(prefix=".keys-", suffix=".tmp", dir=ks_dir)
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(keys, f, indent=2)
        os.replace(tmp_path, keystore_path)
    except BaseException:
        # Never leave a stray temp file containing key material behind.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    ok(f"Keystore updated: {keystore_path}")
    ok(f" {device_id} -> {master_key.hex()[:16]}...")
|
|
|
|
|
|
# ─── 8. Serial Monitor ───────────────────────────────────────────────────────
|
|
|
|
def monitor_serial(port: str, idf_path: str):
    """Launch idf.py monitor for post-flash verification.

    Blocks until the monitor exits. Ctrl+C is swallowed so that it ends
    the monitor rather than the whole tool.

    Args:
        port: Serial port to attach to.
        idf_path: ESP-IDF directory whose ``export.sh`` is sourced first.
    """
    import shlex  # local import: only needed to quote values for bash

    info(f"Starting serial monitor on {port} (Ctrl+] to exit)")

    # The port comes from the CLI or a JSON config; quote everything that
    # is interpolated into the shell command line.
    export_sh = shlex.quote(f"{idf_path}/export.sh")
    project = shlex.quote(str(PROJECT_DIR))
    shell_cmd = (
        f". {export_sh} > /dev/null 2>&1 && "
        f"idf.py -C {project} -p {shlex.quote(port)} monitor"
    )

    try:
        subprocess.run(["bash", "-c", shell_cmd])
    except KeyboardInterrupt:
        print()
    ok("Monitor closed")
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# Interactive Wizard
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def detect_serial_ports() -> List[str]:
    """Return the USB serial device nodes present on a Linux host, sorted."""
    found: List[str] = []
    for pattern in ("/dev/ttyUSB*", "/dev/ttyACM*"):
        found.extend(_glob(pattern))
    found.sort()
    return found
|
|
|
|
|
|
def ask(msg: str, default: str = "") -> str:
    """Read one line from the user.

    When *default* is non-empty it is shown in brackets and returned for an
    empty answer; otherwise the stripped answer is returned as typed.
    """
    if not default:
        return input(f" {C.B}{msg}{C.RST}: ").strip()

    answer = input(f" {C.B}{msg}{C.RST} [{C.DIM}{default}{C.RST}]: ").strip()
    return answer if answer else default
|
|
|
|
|
|
def ask_yn(msg: str, default: bool = True) -> bool:
    """Ask a yes/no question.

    An empty answer yields *default*. "y", "yes", "o" and "oui" (any case)
    mean yes; every other answer means no.
    """
    affirmative = {"y", "yes", "o", "oui"}
    hint = "y/N" if not default else "Y/n"

    answer = input(f" {C.B}{msg}{C.RST} [{hint}]: ").strip().lower()
    if answer:
        return answer in affirmative
    return default
|
|
|
|
|
|
def ask_choice(msg: str, choices: List[str], default: int = 0) -> int:
    """Show a numbered menu and return the 0-based index that was picked.

    The default entry is marked with ``>``. An empty, non-numeric or
    out-of-range answer selects *default*.
    """
    print(f"\n {C.B}{msg}{C.RST}")
    for position, label in enumerate(choices):
        if position == default:
            marker = f"{C.GRN}>{C.RST}"
        else:
            marker = " "
        print(f" {marker} {position + 1}) {label}")

    answer = input(f" Choice [{default + 1}]: ").strip()
    if answer:
        try:
            picked = int(answer) - 1
        except ValueError:
            picked = -1  # falls outside the valid range below
        if 0 <= picked < len(choices):
            return picked
    return default
|
|
|
|
|
|
def interactive_wizard() -> DeviceConfig:
    """Guided interactive setup.

    Prompts for the device, network, C2 server, module and key settings and
    returns them as a DeviceConfig. The server port is asked again until a
    number in the range 1-65535 is entered, instead of crashing on bad input.
    """
    cfg = DeviceConfig()

    print(f"\n {C.B}{C.CYN}--- Device ---{C.RST}\n")

    # Port: offer the first detected one as the default
    ports = detect_serial_ports()
    if ports:
        info(f"Detected ports: {', '.join(ports)}")
        cfg.port = ask("Serial port", ports[0])
    else:
        cfg.port = ask("Serial port (e.g. /dev/ttyUSB0)")

    # Device ID: a random one is proposed
    rand_id = secrets.token_hex(4)
    cfg.device_id = ask("Device ID (8 hex chars)", rand_id)

    # Network
    mode = ask_choice("Network mode:", ["WiFi", "GPRS"], 0)
    cfg.network_mode = "wifi" if mode == 0 else "gprs"

    if cfg.network_mode == "wifi":
        cfg.wifi_ssid = ask("WiFi SSID")
        cfg.wifi_pass = ask("WiFi password")
    else:
        cfg.gprs_apn = ask("GPRS APN", "sl2sfr")

    # Hostname
    cfg.hostname = ask("Device hostname (empty = random)", "")

    print(f"\n {C.B}{C.CYN}--- C2 Server ---{C.RST}\n")

    cfg.srv_ip = ask("Server IP", "192.168.1.100")
    while True:
        raw_port = ask("Server port", "2626")
        try:
            port_number = int(raw_port)
        except ValueError:
            port_number = 0  # rejected by the range check below
        if 1 <= port_number <= 65535:
            cfg.srv_port = port_number
            break
        warn(f"Invalid port '{raw_port}' -- enter a number between 1 and 65535")

    print(f"\n {C.B}{C.CYN}--- Modules ---{C.RST}\n")

    cfg.mod_network = ask_yn("Enable network module (ping, arp, proxy, dos)?", True)
    cfg.mod_fakeap = ask_yn("Enable fakeAP module (captive portal, sniffer)?", False)
    cfg.mod_honeypot = ask_yn("Enable honeypot module (SSH/Telnet/HTTP/FTP, WiFi/net monitor)?", False)
    cfg.mod_recon = ask_yn("Enable recon module?", False)

    if cfg.mod_recon:
        # Sub-features are only offered when the recon module is enabled.
        cfg.recon_camera = ask_yn(" Enable camera?", False)
        cfg.recon_ble_trilat = ask_yn(" Enable BLE trilateration?", False)

    cfg.mod_fallback = ask_yn("Enable fallback module (autonomous WiFi recovery)?", False)
    cfg.ota_enabled = ask_yn("Enable OTA updates?", True)

    print(f"\n {C.B}{C.CYN}--- Security ---{C.RST}\n")

    if ask_yn("Generate new random 256-bit master key?", True):
        cfg.master_key_hex = ""  # empty = generate at deploy time
    else:
        cfg.master_key_hex = ask("Master key (64 hex chars)")

    return cfg
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# Deploy Pipeline
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def deploy_one(cfg: DeviceConfig, *,
               build: bool = True,
               flash: bool = True,
               provision_only: bool = False,
               erase: bool = False,
               monitor: bool = False,
               keystore_path: str = ""):
    """Full deploy pipeline for a single device.

    Args:
        cfg: Device settings.
        build: Rebuild the firmware; when False existing binaries are used.
        flash: Write to the device. When False (build-only) no key is
            generated and the keystore is left untouched: recording a key
            that was never flashed would overwrite the entry of a device
            that may already be deployed under this ID.
        provision_only: Skip config generation and build; flash only the
            factory NVS partition.
        erase: Erase the whole flash before a full flash.
        monitor: Open the serial monitor after flashing.
        keystore_path: Keystore file; empty selects KEYSTORE_DEFAULT.

    Any failing step exits the process via SystemExit.
    """
    ks_path = keystore_path or str(KEYSTORE_DEFAULT)

    # Summary
    net_info = (f"{cfg.network_mode.upper()} "
                + (f"({cfg.wifi_ssid})" if cfg.network_mode == "wifi"
                   else f"({cfg.gprs_apn})"))
    mods = ", ".join(filter(None, [
        "network" if cfg.mod_network else None,
        "fakeAP" if cfg.mod_fakeap else None,
        "honeypot" if cfg.mod_honeypot else None,
        "recon" if cfg.mod_recon else None,
        "fallback" if cfg.mod_fallback else None,
        "redteam" if cfg.mod_redteam else None,
        "canbus" if cfg.mod_canbus else None,
    ])) or "none"

    box([
        f"Device ID : {cfg.device_id}",
        f"Port : {cfg.port}",
        f"Network : {net_info}",
        f"C2 Server : {cfg.srv_ip}:{cfg.srv_port}",
        f"Modules : {mods}",
        f"OTA : {'yes' if cfg.ota_enabled else 'no'}",
        f"Erase : {'yes' if erase else 'no'}",
        f"Mode : {'provision-only' if provision_only else 'full deploy'}",
    ])

    # The NVS image is written to a throw-away directory so the key
    # material does not outlive this call.
    with tempfile.TemporaryDirectory(prefix="epsilon-deploy-") as tmpdir:

        # ── Step 1: Environment ──
        step_header(1, "Environment Check")
        idf_path = check_environment()

        binaries: Dict[str, str] = {}

        if not provision_only:
            # ── Step 2: sdkconfig ──
            step_header(2, "Generate Configuration")
            write_sdkconfig(cfg)

            # ── Step 3: Build or locate ──
            if build:
                step_header(3, "Build Firmware")
                binaries = build_firmware(idf_path)
            else:
                step_header(3, "Locate Existing Binaries")
                binaries = locate_binaries()

        if flash:
            # ── Step 4: Crypto ──
            step_header(4, "Master Key")
            master_key = generate_master_key(cfg)

            # ── Step 5: NVS ──
            step_header(5, "Generate Factory NVS Partition")
            nvs_bin = generate_nvs_binary(master_key, tmpdir)

            # ── Step 6: Flash ──
            step_header(6, "Flash Device")
            if provision_only:
                flash_fctry_only(cfg.port, nvs_bin)
            else:
                flash_all(cfg, binaries, nvs_bin, erase=erase)

            # ── Step 7: Keystore ──
            # Only reached after a successful flash, so the keystore always
            # reflects what is actually on the device.
            step_header(7, "Update C2 Keystore")
            update_keystore(cfg.device_id, master_key, ks_path)
        else:
            info("Not flashing: key generation and keystore update skipped")

        # ── Done ──
        print(f"\n {C.GRN}{C.B}{'=' * 50}")
        print(f" Deploy complete for {cfg.device_id}")
        print(f" {'=' * 50}{C.RST}\n")

        # ── Optional monitor ──
        if monitor and flash:
            step_header("*", "Serial Monitor")
            monitor_serial(cfg.port, idf_path)
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# CLI / Entry Point
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def config_from_dict(d: dict, defaults: Optional[dict] = None) -> DeviceConfig:
    """Create DeviceConfig from a JSON dict with optional defaults.

    Supports both flat format (legacy) and nested format:
    Flat: {"device_id": "x", "srv_ip": "...", "module_network": true, ...}
    Nested: {"device_id": "x", "server": {"ip": "..."}, "modules": {"network": true}, ...}

    When defaults is provided, device-level keys override defaults. Each
    setting is resolved in this order, first match wins:

      1. device, nested   (d["server"]["ip"])
      2. device, flat     (d["srv_ip"])
      3. defaults, nested (defaults["server"]["ip"])
      4. defaults, flat   (defaults["srv_ip"])
      5. built-in default

    ``device_id`` is never taken from defaults; a missing or empty one is
    replaced by a random 8-hex-character ID.
    """
    dfl = defaults or {}

    def section_of(source: dict, section: str) -> dict:
        # Tolerate a missing section or an explicit null / wrong type.
        value = source.get(section)
        return value if isinstance(value, dict) else {}

    def pick(section: str, nested_key: str, flat_key: str, fallback):
        lookups = (
            (section_of(d, section), nested_key),
            (d, flat_key),
            (section_of(dfl, section), nested_key),
            (dfl, flat_key),
        )
        for source, key in lookups:
            if key in source:
                return source[key]
        return fallback

    def top_level(key: str, fallback):
        return d.get(key, dfl.get(key, fallback))

    return DeviceConfig(
        device_id=d.get("device_id") or secrets.token_hex(4),
        port=top_level("port", ""),
        hostname=top_level("hostname", ""),
        # Server
        srv_ip=pick("server", "ip", "srv_ip", "192.168.1.100"),
        srv_port=pick("server", "port", "srv_port", 2626),
        # Network
        network_mode=pick("network", "mode", "network_mode", "wifi"),
        wifi_ssid=pick("network", "wifi_ssid", "wifi_ssid", ""),
        wifi_pass=pick("network", "wifi_pass", "wifi_pass", ""),
        gprs_apn=pick("network", "gprs_apn", "gprs_apn", "sl2sfr"),
        # Modules
        mod_network=pick("modules", "network", "module_network", True),
        mod_recon=pick("modules", "recon", "module_recon", False),
        mod_fakeap=pick("modules", "fakeap", "module_fakeap", False),
        mod_honeypot=pick("modules", "honeypot", "module_honeypot", False),
        mod_canbus=pick("modules", "canbus", "module_canbus", False),
        mod_fallback=pick("modules", "fallback", "module_fallback", False),
        mod_redteam=pick("modules", "redteam", "module_redteam", False),
        recon_camera=pick("modules", "recon_camera", "recon_camera", False),
        recon_ble_trilat=pick("modules", "recon_ble_trilat", "recon_ble_trilat", False),
        # OTA
        ota_enabled=pick("ota", "enabled", "ota_enabled", True),
        ota_allow_http=pick("ota", "allow_http", "ota_allow_http", False),
        # Security
        master_key_hex=top_level("master_key", ""),
    )
|
|
|
|
|
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the deploy tool."""
    parser = argparse.ArgumentParser(
        description="Epsilon Deploy - Unified build, provision & flash",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Interactive wizard (recommended for first use)
python deploy.py

# Full deploy with WiFi
python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100

# Full deploy with GPRS
python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
--gprs sl2sfr --srv 203.0.113.10

# Provision only (generate key + flash NVS, no rebuild)
python deploy.py --provision-only -p /dev/ttyUSB0 -d abc12345

# Flash existing build (skip rebuild)
python deploy.py --flash-only -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100

# Clean install (erase flash first)
python deploy.py --erase -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100

# Deploy + watch serial output
python deploy.py --monitor -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100

# Batch deploy from config file
python deploy.py --config deploy.json
""",
    )

    # Device
    parser.add_argument("-d", "--device-id",
                        help="Device ID (default: random 8 hex)")
    parser.add_argument("-p", "--port",
                        help="Serial port (e.g. /dev/ttyUSB0)")

    # Network (mutually exclusive)
    net = parser.add_mutually_exclusive_group()
    net.add_argument("--wifi", nargs=2, metavar=("SSID", "PASS"),
                     help="WiFi mode with SSID and password")
    net.add_argument("--gprs", metavar="APN", nargs="?", const="sl2sfr",
                     help="GPRS mode (default APN: sl2sfr)")

    # Server
    parser.add_argument("--srv", metavar="IP", help="C2 server IP")
    parser.add_argument("--srv-port", type=int, default=2626)

    # Modules
    parser.add_argument("--mod-fakeap", action="store_true", default=False,
                        help="Enable fakeAP module")
    parser.add_argument("--mod-honeypot", action="store_true", default=False,
                        help="Enable honeypot module (TCP services, WiFi/net monitors)")
    parser.add_argument("--mod-recon", action="store_true", default=False,
                        help="Enable recon module")
    parser.add_argument("--recon-camera", action="store_true", default=False,
                        help="Enable camera")
    parser.add_argument("--recon-ble", action="store_true", default=False,
                        help="Enable BLE trilateration")
    parser.add_argument("--mod-fallback", action="store_true", default=False,
                        help="Enable fallback module (autonomous WiFi recovery)")
    parser.add_argument("--mod-redteam", action="store_true", default=False,
                        help="Enable red team module (offensive operations)")
    parser.add_argument("--no-network", action="store_true", default=False,
                        help="Disable network module")
    parser.add_argument("--no-ota", action="store_true", default=False,
                        help="Disable OTA module")
    parser.add_argument("--ota-http", action="store_true", default=False,
                        help="Allow OTA over plain HTTP (insecure, local networks)")

    # Security
    parser.add_argument("--key", metavar="HEX",
                        help="Master key as 64 hex chars (default: random)")
    parser.add_argument("--keystore", type=Path, default=None,
                        help=f"C2 keystore path (default: {KEYSTORE_DEFAULT})")

    # Operating modes
    parser.add_argument("--provision-only", action="store_true",
                        help="Only generate key and flash factory NVS")
    parser.add_argument("--flash-only", action="store_true",
                        help="Flash existing build (skip rebuild)")
    parser.add_argument("--build-only", action="store_true",
                        help="Build only (no flash, no provision)")
    parser.add_argument("--erase", action="store_true",
                        help="Erase entire flash before writing")
    parser.add_argument("--monitor", action="store_true",
                        help="Start serial monitor after flash")
    parser.add_argument("--hostname",
                        help="Device hostname (default: random)")

    # Config file
    parser.add_argument("--config", type=Path,
                        help="Load device configs from JSON file")

    return parser


def main() -> int:
    """Entry point: parse arguments and run one of three modes.

    * ``--config FILE``: batch mode, one deploy per device in the file; a
      failing device is reported and the batch continues.
    * no device/port/provision flag: interactive wizard.
    * otherwise: a single deploy described by the command-line flags.

    Returns:
        0 on success, 1 on a usage error, an unreadable config file, an
        aborted wizard or when any batch device failed. A failing step of a
        single (non-batch) deploy exits the process directly.
    """
    args = _build_arg_parser().parse_args()
    keystore_path = str(args.keystore) if args.keystore else ""

    banner()

    # ── Config file mode ──
    if args.config:
        try:
            with open(args.config) as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            err(f"Cannot load config file {args.config}: {e}")
            return 1

        if isinstance(data, list):
            # A bare list is accepted as the list of devices.
            defaults, devices = {}, data
        elif isinstance(data, dict):
            defaults = data.get("defaults", {})
            # Without a "devices" list the file itself describes one device.
            devices = data.get("devices", [data])
        else:
            err(f"Config file {args.config} must contain a JSON object or list")
            return 1

        total = len(devices)
        ok_count = 0
        for i, d in enumerate(devices, 1):
            print(f"\n {C.B}### Device {i}/{total}: {d.get('device_id', '?')} ###{C.RST}")
            cfg = config_from_dict(d, defaults)
            try:
                deploy_one(
                    cfg,
                    build=not args.flash_only,
                    flash=not args.build_only,
                    provision_only=args.provision_only,
                    erase=args.erase,
                    monitor=args.monitor,
                    keystore_path=keystore_path,
                )
                ok_count += 1
            except SystemExit:
                # Pipeline steps abort via sys.exit(); keep going with the
                # remaining devices.
                err(f"Failed for {cfg.device_id}, continuing...")
        print(f"\n {C.B}Results: {ok_count}/{total} succeeded{C.RST}")
        return 0 if ok_count == total else 1

    # ── Interactive mode (no args) ──
    if not args.device_id and not args.port and not args.provision_only:
        cfg = interactive_wizard()
        print()
        if not ask_yn("Proceed with deployment?", True):
            print(" Aborted.")
            return 1
        deploy_one(
            cfg,
            build=not args.flash_only,
            flash=not args.build_only,
            erase=args.erase,
            monitor=args.monitor,
            keystore_path=keystore_path,
        )
        return 0

    # ── CLI mode ──
    # A build-only run never talks to a device, so it needs no port.
    if not args.port and not args.build_only:
        err("--port / -p is required in CLI mode")
        return 1

    cfg = DeviceConfig(
        device_id=args.device_id or secrets.token_hex(4),
        port=args.port or "",
        srv_ip=args.srv or "192.168.1.100",
        srv_port=args.srv_port,
        network_mode="gprs" if args.gprs else "wifi",
        wifi_ssid=args.wifi[0] if args.wifi else "",
        wifi_pass=args.wifi[1] if args.wifi else "",
        gprs_apn=args.gprs if args.gprs else "sl2sfr",
        hostname=args.hostname or "",
        mod_network=not args.no_network,
        mod_fakeap=args.mod_fakeap,
        mod_honeypot=args.mod_honeypot,
        mod_recon=args.mod_recon,
        mod_fallback=args.mod_fallback,
        mod_redteam=args.mod_redteam,
        recon_camera=args.recon_camera,
        recon_ble_trilat=args.recon_ble,
        ota_enabled=not args.no_ota,
        ota_allow_http=args.ota_http,
        master_key_hex=args.key or "",
    )

    # Validate WiFi (unless provision-only which doesn't need it)
    if (cfg.network_mode == "wifi"
            and not args.provision_only
            and not args.build_only
            and (not cfg.wifi_ssid or not cfg.wifi_pass)):
        err("WiFi mode requires --wifi SSID PASS")
        return 1

    deploy_one(
        cfg,
        build=not args.flash_only and not args.provision_only,
        flash=not args.build_only,
        provision_only=args.provision_only,
        erase=args.erase,
        monitor=args.monitor,
        keystore_path=keystore_path,
    )
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|