diff --git a/tools/README.md b/tools/README.md index 8922bf1..3f78f6e 100644 --- a/tools/README.md +++ b/tools/README.md @@ -1,197 +1,168 @@ # Epsilon Tools -This directory contains tools for managing and deploying Epsilon ESP32 agents. - -## C2 Server (C3PO/) - -The C2 (Command & Control) server manages communication with deployed ESP32 agents. - -### C3PO - Main C2 Server - -**C3PO** is the primary C2 server used to control Epsilon bots. - -Features: - -- Threaded TCP server (sockets + threads) -- Device registry and management with per-device crypto -- Group-based device organization -- Encrypted communications (ChaCha20-Poly1305 AEAD + HKDF key derivation) -- Per-device master key keystore (`keys.json`) -- Interactive CLI interface -- Optional TUI (Textual) and Web dashboard -- Camera UDP receiver + MLAT support -- Command dispatching to individual devices, groups, or all - -See [C3PO/README.md](C3PO/README.md) for complete C2 documentation. - -Quick start: - -```bash -cd C3PO -python3 c3po.py +``` +tools/ + deploy.py Unified build, provision & flash pipeline + deploy.example.json Batch config template + C3PO/ C2 server (TCP + TUI + Web) + nanoPB/ Protobuf definitions (c2.proto) ``` -Authors: **@off-path**, **@eun0us** +## Prerequisites -## Multi-Device Flasher (flasher/) +- Python 3.8+ +- ESP-IDF v5.3.2 (`source ~/esp-idf/export.sh`) +- esptool (`pip install esptool`) -The **flasher** tool automates building and flashing multiple ESP32 devices with custom configurations. +--- -### Features +## Deploy (`deploy.py`) -- Batch processing of multiple devices -- Support for WiFi and GPRS modes -- Per-device configuration (ID, network, modules) -- Automatic hostname randomization -- Build-only and flash-only modes -- Full module configuration (Network, Recon, FakeAP) +Single pipeline: **build** firmware, **generate** crypto keys, **provision** factory NVS, **flash** all partitions, **register** keys in C2 keystore. -### Quick Start +### Usage -1. Edit [flasher/devices.json](flasher/devices.json): +```bash +# Interactive wizard +python deploy.py + +# Single device (WiFi + OTA) +python deploy.py -p /dev/ttyUSB0 -d ce4f626b \ + --wifi MySSID MyPass --srv 192.168.1.51 + +# Single device (WiFi, no OTA) +python deploy.py -p /dev/ttyUSB0 -d a91dd021 --no-ota \ + --wifi MySSID MyPass --srv 192.168.1.51 + +# Batch deploy +python deploy.py --config deploy.example.json +``` + +### Modes + +| Flag | Effect | +|------|--------| +| *(default)* | Full pipeline: build + provision + flash | +| `--provision-only` | Generate key + flash factory NVS only | +| `--flash-only` | Flash existing build (skip rebuild) | +| `--build-only` | Build firmware only (no flash) | +| `--erase` | Erase entire flash before writing | +| `--monitor` | Open serial monitor after flash | + +### OTA vs Non-OTA + +| | OTA | Non-OTA (`--no-ota`) | +|---|---|---| +| Partition table | `partitions.csv` | `partitions_noota.csv` | +| App partitions | 2 x 1.875 MB (ota_0/ota_1) | 1 x 3.875 MB (factory) | +| Firmware updates | HTTPS OTA | Manual reflash | +| mbedTLS cert bundle | Yes | No | + +### Flash Map + +``` +Offset OTA Non-OTA +------ --- ------- +0x1000 bootloader bootloader +0x8000 partition table partition table +0xD000 ota data -- +0x10000 factory NVS (key) factory NVS (key) +0x20000 app (ota_0) app (factory, 3.875 MB) +``` + +### Batch Config + +`deploy.example.json`: ```json - { - "project": "/home/user/epsilon/espilon_bot", - "devices": [ - { - "device_id": "ce4f626b", - "port": "/dev/ttyUSB0", - "srv_ip": "192.168.1.13", - "srv_port": 2626, - "network_mode": "wifi", - "wifi_ssid": "YourWiFi", - "wifi_pass": "YourPassword", - "module_network": true, - "module_recon": false, - "module_fakeap": false - } - ] - } +{ + "devices": [ + { + "device_id": "ce4f626b", + "port": "/dev/ttyUSB0", + "srv_ip": "192.168.1.51", + "srv_port": 2626, + "network_mode": "wifi", + "wifi_ssid": "MyWiFi", + "wifi_pass": "MyPassword", + "module_network": true, + "ota_enabled": true + }, + { + "device_id": "a91dd021", + "port": "/dev/ttyUSB1", + "srv_ip": "192.168.1.51", + "srv_port": 2626, + "network_mode": "wifi", + "wifi_ssid": "MyWiFi", + "wifi_pass": "MyPassword", + "module_network": true, + "module_fakeap": true, + "ota_enabled": false + } + ] +} ``` -2. Flash all devices: +### Config Fields + +| Field | Default | Description | +|-------|---------|-------------| +| `device_id` | random | 8 hex chars unique ID | +| `port` | -- | Serial port | +| `srv_ip` | `192.168.1.100` | C2 server IP | +| `srv_port` | `2626` | C2 server port | +| `network_mode` | `wifi` | `wifi` or `gprs` | +| `wifi_ssid` | -- | WiFi SSID | +| `wifi_pass` | -- | WiFi password | +| `gprs_apn` | `sl2sfr` | GPRS APN | +| `hostname` | random | Device hostname on network | +| `module_network` | `true` | ping, arp, proxy, dos | +| `module_fakeap` | `false` | Fake AP, captive portal, sniffer | +| `module_recon` | `false` | Reconnaissance | +| `recon_camera` | `false` | ESP32-CAM | +| `recon_ble_trilat` | `false` | BLE trilateration | +| `ota_enabled` | `true` | OTA firmware updates | +| `master_key` | random | 64 hex chars (override auto-gen) | + +### Crypto + +Each deploy generates a **256-bit master key** per device: + +1. Random 32-byte key generated (or provided via `--key`) +2. Written to factory NVS (`fctry` @ `0x10000`, namespace `crypto`, key `master_key`) +3. Registered in `C3PO/keys.json` +4. On boot, firmware derives encryption key via **HKDF-SHA256** (salt=device_id, info=`espilon-c2-v1`) +5. All C2 traffic encrypted with **ChaCha20-Poly1305 AEAD** (12-byte nonce + 16-byte tag) + +--- + +## C2 Server (`C3PO/`) + +Command & Control server for deployed ESP32 agents. + +- Threaded TCP server with per-device encrypted communications +- ChaCha20-Poly1305 AEAD + HKDF key derivation +- Device registry + master key keystore (`keys.json`) +- TUI (Textual) + Web dashboard +- Camera UDP receiver + MLAT support +- Command dispatch: single device, group, or broadcast ```bash -cd flasher -python3 flash.py --config devices.json +cd C3PO && python3 c3po.py ``` -### Configuration Options +See [C3PO/README.md](C3PO/README.md) for details. -Each device supports: +--- -| Field | Description | -|-------|-------------| -| `device_id` | Unique device identifier (8 hex chars) | -| `port` | Serial port (e.g., `/dev/ttyUSB0`) | -| `srv_ip` | C2 server IP address | -| `srv_port` | C2 server port (default: 2626) | -| `network_mode` | `"wifi"` or `"gprs"` | -| `wifi_ssid` | WiFi SSID (WiFi mode) | -| `wifi_pass` | WiFi password (WiFi mode) | -| `gprs_apn` | GPRS APN (GPRS mode, default: "sl2sfr") | -| `hostname` | Network hostname (random if not set) | -| `module_network` | Enable network commands (default: true) | -| `module_recon` | Enable reconnaissance module | -| `module_fakeap` | Enable fake AP module | -| `recon_camera` | Enable camera reconnaissance (ESP32-CAM) | -| `recon_ble_trilat` | Enable BLE trilateration | +## Proto Definitions (`nanoPB/`) -> **Note**: Crypto keys are no longer configured here. Each device must be provisioned with a unique master key using `tools/provisioning/provision.py`. +nanoPB protobuf definitions for ESP32 <-> C2 wire protocol. -### Hostname Randomization +- `c2.proto` -- `Command` and `AgentMessage` messages +- `c2.options` -- nanoPB field size constraints -The flasher automatically randomizes device hostnames to blend in on networks: +--- -- iPhone models (iPhone-15-pro-max, iPhone-14, etc.) -- Android devices (galaxy-s24-ultra, pixel-8-pro, xiaomi-14, etc.) -- Windows PCs (DESKTOP-XXXXXXX) - -This helps devices appear as legitimate consumer electronics during authorized security testing. - -### Manual Mode - -Flash a single device without a config file: - -```bash -# WiFi mode -python3 flash.py --manual \ - --project /home/user/epsilon/espilon_bot \ - --device-id abc12345 \ - --port /dev/ttyUSB0 \ - --srv-ip 192.168.1.100 \ - --wifi-ssid MyWiFi \ - --wifi-pass MyPassword - -# GPRS mode -python3 flash.py --manual \ - --project /home/user/epsilon/espilon_bot \ - --device-id def67890 \ - --port /dev/ttyUSB1 \ - --srv-ip 203.0.113.10 \ - --network-mode gprs \ - --gprs-apn sl2sfr -``` - -### Build-Only Mode - -Generate firmware without flashing: - -```bash -python3 flash.py --config devices.json --build-only -``` - -Firmware saved to: `espilon_bot/firmware/.bin` - -### Flash-Only Mode - -Flash pre-built firmware: - -```bash -python3 flash.py --config devices.json --flash-only -``` - -See [flasher/README.md](flasher/README.md) for complete documentation. - -## Device Provisioning (provisioning/) - -The **provisioning** tool generates and flashes unique per-device master keys into factory NVS partitions. - -### Features - -- Generates 32-byte random master keys (cryptographically secure) -- Creates NVS binary for factory partition (`fctry` at offset 0x10000) -- Saves keys to C2 keystore (`keys.json`) for automatic lookup -- Supports flashing directly to connected ESP32 - -### Quick Start - -```bash -cd provisioning -python3 provision.py --device-id my-device --port /dev/ttyUSB0 -``` - -The master key is used by the firmware with HKDF-SHA256 to derive encryption keys for ChaCha20-Poly1305 AEAD. - -## NanoPB Tools (nan/) - -Tools for Protocol Buffers (nanoPB) code generation for the embedded communication protocol. - -Used during development to regenerate Protocol Buffer bindings for ESP32 and Python. - -## Additional Resources - -- [Installation Guide](../docs/INSTALL.md) - Full Epsilon setup -- [Hardware Guide](../docs/HARDWARE.md) - Supported boards -- [Module API](../docs/MODULES.md) - Available commands -- [Protocol Specification](../docs/PROTOCOL.md) - C2 protocol details -- [Security](../docs/SECURITY.md) - Security best practices - -## Contributing - -See [CONTRIBUTING.md](../CONTRIBUTING.md) for guidelines on contributing to Epsilon tools. - -## License - -Part of the Epsilon project. See [LICENSE](../LICENSE) for details. +**Authors:** @off-path, @eun0us diff --git a/tools/deploy.py b/tools/deploy.py new file mode 100755 index 0000000..be47e97 --- /dev/null +++ b/tools/deploy.py @@ -0,0 +1,994 @@ +#!/usr/bin/env python3 +""" +Epsilon Deploy - Unified build, provision & flash pipeline for ESP32. + +Combines firmware building, crypto key provisioning, and flashing +into a single automated workflow with correct partition offsets. + +Usage: + python deploy.py # Interactive wizard + python deploy.py -p /dev/ttyUSB0 -d abc12345 \\ + --wifi MySSID MyPass --srv 192.168.1.100 # Quick CLI deploy + python deploy.py --config deploy.json # Batch from config + python deploy.py --provision-only -p /dev/ttyUSB0 -d x # Keys only +""" + +import argparse +import json +import os +import secrets +import shutil +import subprocess +import sys +import tempfile +from dataclasses import dataclass +from glob import glob as _glob +from pathlib import Path +from typing import Dict, List, Optional, Tuple + + +# ─── Paths ──────────────────────────────────────────────────────────────────── +SCRIPT_DIR = Path(__file__).resolve().parent +PROJECT_DIR = SCRIPT_DIR.parent / "espilon_bot" +KEYSTORE_DEFAULT = SCRIPT_DIR / "C3PO" / "keys.json" + +# ─── Partition layout (must match partitions.csv) ───────────────────────────── +CHIP = "esp32" +BAUD = 460800 +ADDR_BOOTLOADER = 0x1000 +ADDR_PTABLE = 0x8000 +ADDR_OTADATA = 0xD000 +ADDR_FCTRY = 0x10000 +ADDR_OTA_0 = 0x20000 + +FCTRY_SIZE = 0x6000 # 24 KB +NVS_NAMESPACE = "crypto" +NVS_KEY = "master_key" +KEY_LEN = 32 # 256-bit master key + + +# ─── Terminal helpers ───────────────────────────────────────────────────────── +class C: + RST = "\033[0m" + B = "\033[1m" + DIM = "\033[2m" + RED = "\033[91m" + GRN = "\033[92m" + YLW = "\033[93m" + BLU = "\033[94m" + CYN = "\033[96m" + + +def _tag(color: str, tag: str, msg: str): + print(f" {color}{C.B}[{tag}]{C.RST} {msg}") + + +def ok(m): _tag(C.GRN, " OK ", m) +def info(m): _tag(C.BLU, " .. ", m) +def warn(m): _tag(C.YLW, " !! ", m) +def err(m): _tag(C.RED, " XX ", m) + + +def step_header(n, m): + pad = max(0, 42 - len(m)) + print(f"\n {C.CYN}{C.B}--- Step {n}: {m} {'-' * pad}{C.RST}\n") + + +def banner(): + print(f"""\n{C.CYN}{C.B}\ + +==================================================+ + | E Epsilon Deploy Tool | + | Build - Provision - Flash - Verify | + +==================================================+{C.RST}\n""") + + +def box(lines: List[str]): + w = max((len(l) for l in lines), default=0) + 4 + print(f" {C.DIM}+{'-' * w}+{C.RST}") + for l in lines: + print(f" {C.DIM}|{C.RST} {l}{' ' * (w - len(l) - 2)}{C.DIM}|{C.RST}") + print(f" {C.DIM}+{'-' * w}+{C.RST}") + + +# ─── Device Config ──────────────────────────────────────────────────────────── +@dataclass +class DeviceConfig: + device_id: str = "" + port: str = "" + srv_ip: str = "192.168.1.100" + srv_port: int = 2626 + network_mode: str = "wifi" # "wifi" or "gprs" + wifi_ssid: str = "" + wifi_pass: str = "" + gprs_apn: str = "sl2sfr" + hostname: str = "" + mod_network: bool = True + mod_recon: bool = False + mod_fakeap: bool = False + mod_honeypot: bool = False + mod_canbus: bool = False + mod_fallback: bool = False + mod_redteam: bool = False + recon_camera: bool = False + recon_ble_trilat: bool = False + ota_enabled: bool = True + ota_allow_http: bool = False + master_key_hex: str = "" # empty = generate random + + +# ══════════════════════════════════════════════════════════════════════════════ +# Pipeline Steps +# ══════════════════════════════════════════════════════════════════════════════ + +# ─── 1. Environment Check ──────────────────────────────────────────────────── + +def check_environment() -> str: + """Validate ESP-IDF and tools. Returns idf_path.""" + # Find IDF + idf_path = os.environ.get("IDF_PATH", "") + if not idf_path or not os.path.isdir(idf_path): + for candidate in [os.path.expanduser("~/esp-idf"), "/opt/esp-idf"]: + if os.path.isdir(candidate): + idf_path = candidate + break + if not idf_path or not os.path.isdir(idf_path): + err("ESP-IDF not found. Set IDF_PATH or install ESP-IDF.") + sys.exit(1) + ok(f"ESP-IDF : {idf_path}") + + # Find esptool + esptool = shutil.which("esptool.py") or shutil.which("esptool") + if esptool: + ok(f"esptool : {esptool}") + else: + warn("esptool.py not in PATH -- will try via IDF export.sh") + + # Verify project directory + if not PROJECT_DIR.is_dir(): + err(f"Project directory not found: {PROJECT_DIR}") + sys.exit(1) + ok(f"Project : {PROJECT_DIR}") + + return idf_path + + +# ─── 2. sdkconfig Generation ───────────────────────────────────────────────── + +def generate_sdkconfig(cfg: DeviceConfig) -> str: + """Generate sdkconfig.defaults content for a device.""" + lines = [ + "# Generated by epsilon deploy -- do not edit manually", + "", + "# Device", + f'CONFIG_DEVICE_ID="{cfg.device_id}"', + "", + "# Network", + ] + + if cfg.network_mode == "wifi": + lines += [ + "CONFIG_NETWORK_WIFI=y", + f'CONFIG_WIFI_SSID="{cfg.wifi_ssid}"', + f'CONFIG_WIFI_PASS="{cfg.wifi_pass}"', + ] + else: + lines += [ + "CONFIG_NETWORK_GPRS=y", + f'CONFIG_GPRS_APN="{cfg.gprs_apn}"', + ] + + lines += [ + "", + "# C2 Server", + f'CONFIG_SERVER_IP="{cfg.srv_ip}"', + f"CONFIG_SERVER_PORT={cfg.srv_port}", + "", + "# Crypto -- ChaCha20-Poly1305 + HKDF (mbedtls, ESP-IDF v5.3)", + 'CONFIG_CRYPTO_FCTRY_NS="crypto"', + 'CONFIG_CRYPTO_FCTRY_KEY="master_key"', + "CONFIG_MBEDTLS_CHACHA20_C=y", + "CONFIG_MBEDTLS_POLY1305_C=y", + "CONFIG_MBEDTLS_CHACHAPOLY_C=y", + "CONFIG_MBEDTLS_HKDF_C=y", + "", + "# Flash & Partitions", + "CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y", + "CONFIG_PARTITION_TABLE_CUSTOM=y", + f'CONFIG_PARTITION_TABLE_CUSTOM_FILENAME=' + f'"{("partitions.csv" if cfg.ota_enabled else "partitions_noota.csv")}"', + "", + "# LWIP", + "CONFIG_LWIP_IPV4_NAPT=y", + "CONFIG_LWIP_IPV4_NAPT_PORTMAP=y", + "CONFIG_LWIP_IP_FORWARD=y", + ] + + if cfg.hostname: + lines.append(f'CONFIG_LWIP_LOCAL_HOSTNAME="{cfg.hostname}"') + + lines += [ + "", + "# Modules", + f'CONFIG_MODULE_NETWORK={"y" if cfg.mod_network else "n"}', + f'CONFIG_MODULE_RECON={"y" if cfg.mod_recon else "n"}', + f'CONFIG_MODULE_FAKEAP={"y" if cfg.mod_fakeap else "n"}', + f'CONFIG_MODULE_HONEYPOT={"y" if cfg.mod_honeypot else "n"}', + f'CONFIG_MODULE_CANBUS={"y" if cfg.mod_canbus else "n"}', + f'CONFIG_MODULE_FALLBACK={"y" if cfg.mod_fallback else "n"}', + f'CONFIG_MODULE_REDTEAM={"y" if cfg.mod_redteam else "n"}', + ] + + if cfg.mod_recon: + lines += [ + f'CONFIG_RECON_MODE_CAMERA={"y" if cfg.recon_camera else "n"}', + f'CONFIG_RECON_MODE_MLAT={"y" if cfg.recon_ble_trilat else "n"}', + ] + + if cfg.recon_camera: + lines += [ + "", + "# PSRAM (required for camera frame buffers)", + "CONFIG_SPIRAM=y", + "CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY=y", + ] + + if cfg.recon_ble_trilat: + lines += ["", "# BLE", "CONFIG_BT_ENABLED=y", + "CONFIG_BT_BLUEDROID_ENABLED=y", "CONFIG_BT_BLE_ENABLED=y"] + + lines += [ + "", + "# OTA", + f'CONFIG_ESPILON_OTA_ENABLED={"y" if cfg.ota_enabled else "n"}', + ] + + if cfg.ota_enabled: + if cfg.ota_allow_http: + lines += [ + f'CONFIG_ESPILON_OTA_ALLOW_HTTP=y', + "CONFIG_ESP_HTTPS_OTA_ALLOW_HTTP=y", + ] + else: + lines += [ + "CONFIG_MBEDTLS_CERTIFICATE_BUNDLE=y", + "CONFIG_MBEDTLS_CERTIFICATE_BUNDLE_DEFAULT_FULL=y", + ] + + lines += [ + "", + "# Logging", + "CONFIG_ESPILON_LOG_LEVEL_INFO=y", + "CONFIG_ESPILON_LOG_BOOT_SUMMARY=y", + ] + + return "\n".join(lines) + "\n" + + +def write_sdkconfig(cfg: DeviceConfig): + """Write sdkconfig.defaults, backing up the existing one.""" + sdkconfig_path = PROJECT_DIR / "sdkconfig.defaults" + + if sdkconfig_path.exists(): + backup = sdkconfig_path.with_suffix(".defaults.bak") + shutil.copy2(sdkconfig_path, backup) + info(f"Backed up existing config -> {backup.name}") + + content = generate_sdkconfig(cfg) + with open(sdkconfig_path, "w") as f: + f.write(content) + ok(f"Generated sdkconfig.defaults for {cfg.device_id}") + + +# ─── 3. Build Firmware ─────────────────────────────────────────────────────── + +def find_app_binary(build_dir: Path) -> str: + """Find the application binary in the build directory.""" + desc_file = build_dir / "project_description.json" + if desc_file.exists(): + with open(desc_file) as f: + desc = json.load(f) + app_bin = desc.get("app_bin", "") + if app_bin and os.path.exists(app_bin): + return app_bin + name = desc.get("project_name", "bot-lwip") + candidate = str(build_dir / f"{name}.bin") + if os.path.exists(candidate): + return candidate + + # Fallback: try known names + for name in ["bot-lwip.bin", "epsilon_bot.bin"]: + p = str(build_dir / name) + if os.path.exists(p): + return p + + err("Cannot find application binary in build/") + sys.exit(1) + + +def build_firmware(idf_path: str) -> Dict[str, str]: + """Build firmware via idf.py. Returns dict of binary paths.""" + # Full clean: remove sdkconfig AND build dir to avoid stale CMake cache + # (required when switching between OTA/non-OTA configs) + sdkconfig = PROJECT_DIR / "sdkconfig" + build_dir = PROJECT_DIR / "build" + if sdkconfig.exists(): + sdkconfig.unlink() + if build_dir.exists(): + shutil.rmtree(build_dir) + info("Cleaned build directory") + + info("Running idf.py build (this may take several minutes)...") + cmd = ( + f". {idf_path}/export.sh > /dev/null 2>&1 && " + f"idf.py -C {PROJECT_DIR} " + f"-D SDKCONFIG_DEFAULTS=sdkconfig.defaults " + f"build 2>&1" + ) + + result = subprocess.run( + ["bash", "-c", cmd], + capture_output=True, text=True, timeout=600, + ) + + if result.returncode != 0: + err("Build failed!") + lines = result.stdout.strip().split("\n") + for line in lines[-50:]: + print(f" {C.DIM}{line}{C.RST}") + sys.exit(1) + + ok("Build succeeded") + return locate_binaries() + + +def locate_binaries() -> Dict[str, str]: + """Locate all required binaries in build/.""" + build_dir = PROJECT_DIR / "build" + bins: Dict[str, str] = { + "bootloader": str(build_dir / "bootloader" / "bootloader.bin"), + "partition_table": str(build_dir / "partition_table" / "partition-table.bin"), + "app": find_app_binary(build_dir), + } + + ota_data = build_dir / "ota_data_initial.bin" + if ota_data.exists(): + bins["otadata"] = str(ota_data) + + for name, path in bins.items(): + if not os.path.exists(path): + err(f"Missing build artifact: {name} -> {path}") + sys.exit(1) + size = os.path.getsize(path) + ok(f"{name:16s} {path} ({size:,} bytes)") + + return bins + + +# ─── 4. Key Generation ─────────────────────────────────────────────────────── + +def generate_master_key(cfg: DeviceConfig) -> bytes: + """Generate or parse the 32-byte master key.""" + if cfg.master_key_hex: + try: + key = bytes.fromhex(cfg.master_key_hex) + except ValueError: + err("--key must be valid hex") + sys.exit(1) + if len(key) != KEY_LEN: + err(f"Master key must be {KEY_LEN} bytes ({KEY_LEN * 2} hex chars), " + f"got {len(key)}") + sys.exit(1) + info(f"Using provided master key: {key.hex()[:16]}...") + else: + key = secrets.token_bytes(KEY_LEN) + ok(f"Generated master key: {key.hex()[:16]}...") + + return key + + +# ─── 5. NVS Binary Generation ──────────────────────────────────────────────── + +def generate_nvs_binary(master_key: bytes, tmpdir: str) -> str: + """Generate factory NVS partition binary from master key.""" + csv_path = os.path.join(tmpdir, "fctry.csv") + bin_path = os.path.join(tmpdir, "fctry.bin") + + with open(csv_path, "w") as f: + f.write("key,type,encoding,value\n") + f.write(f"{NVS_NAMESPACE},namespace,,\n") + f.write(f"{NVS_KEY},data,hex2bin,{master_key.hex()}\n") + + # Try the Python module (pip install esp-idf-nvs-partition-gen) + generated = False + try: + r = subprocess.run( + [sys.executable, "-m", "esp_idf_nvs_partition_gen", + "generate", csv_path, bin_path, hex(FCTRY_SIZE)], + capture_output=True, text=True, timeout=30, + ) + generated = r.returncode == 0 + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + + # Fallback: IDF script + if not generated: + idf_path = os.environ.get("IDF_PATH", os.path.expanduser("~/esp-idf")) + nvs_tool = os.path.join( + idf_path, "components", "nvs_flash", + "nvs_partition_generator", "nvs_partition_gen.py", + ) + if os.path.exists(nvs_tool): + r = subprocess.run( + [sys.executable, nvs_tool, "generate", + csv_path, bin_path, hex(FCTRY_SIZE)], + capture_output=True, text=True, timeout=30, + ) + generated = r.returncode == 0 + + if not generated: + err("Failed to generate NVS binary. " + "Install: pip install esp-idf-nvs-partition-gen") + sys.exit(1) + + size = os.path.getsize(bin_path) + ok(f"NVS binary generated: {size:,} bytes") + return bin_path + + +# ─── 6. Flash ──────────────────────────────────────────────────────────────── + +def esptool_cmd(port: str) -> List[str]: + """Base esptool command.""" + return ["esptool.py", "--chip", CHIP, "--port", port, "--baud", str(BAUD)] + + +def flash_erase(port: str): + """Full flash erase.""" + info("Erasing entire flash...") + try: + subprocess.run(esptool_cmd(port) + ["erase_flash"], + check=True, timeout=60) + ok("Flash erased") + except subprocess.CalledProcessError as e: + err(f"Erase failed: {e}") + sys.exit(1) + + +def flash_all(cfg: DeviceConfig, binaries: Dict[str, str], nvs_bin: str, + erase: bool = False): + """Flash bootloader + partition table + factory NVS + app (+ otadata).""" + if erase: + flash_erase(cfg.port) + + # Build flash map + flash_map = [ + (ADDR_BOOTLOADER, binaries["bootloader"], "Bootloader"), + (ADDR_PTABLE, binaries["partition_table"], "Partition table"), + (ADDR_FCTRY, nvs_bin, "Factory NVS (master key)"), + (ADDR_OTA_0, binaries["app"], "Application (ota_0)"), + ] + if "otadata" in binaries: + flash_map.append((ADDR_OTADATA, binaries["otadata"], "OTA data")) + + # Display map + box([f"{label:25s} @ {hex(addr)}" for addr, _, label in flash_map]) + + # Build esptool args + cmd = esptool_cmd(cfg.port) + ["write_flash", "-z"] + for addr, path, _ in flash_map: + cmd.extend([hex(addr), path]) + + info("Flashing all partitions...") + try: + subprocess.run(cmd, check=True, timeout=120) + ok("All partitions flashed successfully") + except subprocess.CalledProcessError as e: + err(f"Flash failed: {e}") + sys.exit(1) + except FileNotFoundError: + err("esptool.py not found. Install: pip install esptool") + sys.exit(1) + + +def flash_fctry_only(port: str, nvs_bin: str): + """Flash only the factory NVS partition (provision-only mode).""" + info(f"Flashing factory NVS to {port} at {hex(ADDR_FCTRY)}...") + try: + subprocess.run( + esptool_cmd(port) + ["write_flash", hex(ADDR_FCTRY), nvs_bin], + check=True, timeout=60, + ) + ok("Factory NVS flashed") + except subprocess.CalledProcessError as e: + err(f"Flash failed: {e}") + sys.exit(1) + + +# ─── 7. Keystore ───────────────────────────────────────────────────────────── + +def update_keystore(device_id: str, master_key: bytes, keystore_path: str): + """Add/update device key in the C2 keystore (keys.json).""" + keys: Dict[str, str] = {} + if os.path.exists(keystore_path): + try: + with open(keystore_path) as f: + keys = json.load(f) + except (json.JSONDecodeError, ValueError): + warn(f"Corrupted keystore, starting fresh: {keystore_path}") + + keys[device_id] = master_key.hex() + + os.makedirs(os.path.dirname(os.path.abspath(keystore_path)), exist_ok=True) + with open(keystore_path, "w") as f: + json.dump(keys, f, indent=2) + ok(f"Keystore updated: {keystore_path}") + ok(f" {device_id} -> {master_key.hex()[:16]}...") + + +# ─── 8. Serial Monitor ─────────────────────────────────────────────────────── + +def monitor_serial(port: str, idf_path: str): + """Launch idf.py monitor for post-flash verification.""" + info(f"Starting serial monitor on {port} (Ctrl+] to exit)") + try: + subprocess.run( + ["bash", "-c", + f". {idf_path}/export.sh > /dev/null 2>&1 && " + f"idf.py -C {PROJECT_DIR} -p {port} monitor"], + ) + except KeyboardInterrupt: + print() + ok("Monitor closed") + + +# ══════════════════════════════════════════════════════════════════════════════ +# Interactive Wizard +# ══════════════════════════════════════════════════════════════════════════════ + +def detect_serial_ports() -> List[str]: + """Detect available serial ports on Linux.""" + return sorted( + _glob("/dev/ttyUSB*") + _glob("/dev/ttyACM*") + ) + + +def ask(msg: str, default: str = "") -> str: + """Prompt with optional default.""" + if default: + val = input(f" {C.B}{msg}{C.RST} [{C.DIM}{default}{C.RST}]: ").strip() + return val or default + return input(f" {C.B}{msg}{C.RST}: ").strip() + + +def ask_yn(msg: str, default: bool = True) -> bool: + """Yes/no prompt.""" + hint = "Y/n" if default else "y/N" + val = input(f" {C.B}{msg}{C.RST} [{hint}]: ").strip().lower() + if not val: + return default + return val in ("y", "yes", "o", "oui") + + +def ask_choice(msg: str, choices: List[str], default: int = 0) -> int: + """Numbered choice prompt.""" + print(f"\n {C.B}{msg}{C.RST}") + for i, c in enumerate(choices): + marker = f"{C.GRN}>{C.RST}" if i == default else " " + print(f" {marker} {i + 1}) {c}") + val = input(f" Choice [{default + 1}]: ").strip() + if not val: + return default + try: + idx = int(val) - 1 + if 0 <= idx < len(choices): + return idx + except ValueError: + pass + return default + + +def interactive_wizard() -> DeviceConfig: + """Guided interactive setup.""" + cfg = DeviceConfig() + + print(f"\n {C.B}{C.CYN}--- Device ---{C.RST}\n") + + # Port + ports = detect_serial_ports() + if ports: + info(f"Detected ports: {', '.join(ports)}") + cfg.port = ask("Serial port", ports[0]) + else: + cfg.port = ask("Serial port (e.g. /dev/ttyUSB0)") + + # Device ID + rand_id = secrets.token_hex(4) + cfg.device_id = ask("Device ID (8 hex chars)", rand_id) + + # Network + mode = ask_choice("Network mode:", ["WiFi", "GPRS"], 0) + cfg.network_mode = "wifi" if mode == 0 else "gprs" + + if cfg.network_mode == "wifi": + cfg.wifi_ssid = ask("WiFi SSID") + cfg.wifi_pass = ask("WiFi password") + else: + cfg.gprs_apn = ask("GPRS APN", "sl2sfr") + + # Hostname + cfg.hostname = ask("Device hostname (empty = random)", "") + + print(f"\n {C.B}{C.CYN}--- C2 Server ---{C.RST}\n") + + cfg.srv_ip = ask("Server IP", "192.168.1.100") + cfg.srv_port = int(ask("Server port", "2626")) + + print(f"\n {C.B}{C.CYN}--- Modules ---{C.RST}\n") + + cfg.mod_network = ask_yn("Enable network module (ping, arp, proxy, dos)?", True) + cfg.mod_fakeap = ask_yn("Enable fakeAP module (captive portal, sniffer)?", False) + cfg.mod_honeypot = ask_yn("Enable honeypot module (SSH/Telnet/HTTP/FTP, WiFi/net monitor)?", False) + cfg.mod_recon = ask_yn("Enable recon module?", False) + + if cfg.mod_recon: + cfg.recon_camera = ask_yn(" Enable camera?", False) + cfg.recon_ble_trilat = ask_yn(" Enable BLE trilateration?", False) + + cfg.mod_fallback = ask_yn("Enable fallback module (autonomous WiFi recovery)?", False) + cfg.ota_enabled = ask_yn("Enable OTA updates?", True) + + print(f"\n {C.B}{C.CYN}--- Security ---{C.RST}\n") + + if ask_yn("Generate new random 256-bit master key?", True): + cfg.master_key_hex = "" + else: + cfg.master_key_hex = ask("Master key (64 hex chars)") + + return cfg + + +# ══════════════════════════════════════════════════════════════════════════════ +# Deploy Pipeline +# ══════════════════════════════════════════════════════════════════════════════ + +def deploy_one(cfg: DeviceConfig, *, + build: bool = True, + flash: bool = True, + provision_only: bool = False, + erase: bool = False, + monitor: bool = False, + keystore_path: str = ""): + """Full deploy pipeline for a single device.""" + + ks_path = keystore_path or str(KEYSTORE_DEFAULT) + + # Summary + net_info = (f"{cfg.network_mode.upper()} " + + (f"({cfg.wifi_ssid})" if cfg.network_mode == "wifi" + else f"({cfg.gprs_apn})")) + mods = ", ".join(filter(None, [ + "network" if cfg.mod_network else None, + "fakeAP" if cfg.mod_fakeap else None, + "honeypot" if cfg.mod_honeypot else None, + "recon" if cfg.mod_recon else None, + "fallback" if cfg.mod_fallback else None, + "redteam" if cfg.mod_redteam else None, + "canbus" if cfg.mod_canbus else None, + ])) or "none" + + box([ + f"Device ID : {cfg.device_id}", + f"Port : {cfg.port}", + f"Network : {net_info}", + f"C2 Server : {cfg.srv_ip}:{cfg.srv_port}", + f"Modules : {mods}", + f"OTA : {'yes' if cfg.ota_enabled else 'no'}", + f"Erase : {'yes' if erase else 'no'}", + f"Mode : {'provision-only' if provision_only else 'full deploy'}", + ]) + + with tempfile.TemporaryDirectory(prefix="epsilon-deploy-") as tmpdir: + + # ── Step 1: Environment ── + step_header(1, "Environment Check") + idf_path = check_environment() + + binaries: Dict[str, str] = {} + + if not provision_only: + # ── Step 2: sdkconfig ── + step_header(2, "Generate Configuration") + write_sdkconfig(cfg) + + # ── Step 3: Build or locate ── + if build: + step_header(3, "Build Firmware") + binaries = build_firmware(idf_path) + else: + step_header(3, "Locate Existing Binaries") + binaries = locate_binaries() + + # ── Step 4: Crypto ── + step_header(4, "Master Key") + master_key = generate_master_key(cfg) + + # ── Step 5: NVS ── + step_header(5, "Generate Factory NVS Partition") + nvs_bin = generate_nvs_binary(master_key, tmpdir) + + # ── Step 6: Flash ── + if flash: + step_header(6, "Flash Device") + if provision_only: + flash_fctry_only(cfg.port, nvs_bin) + else: + flash_all(cfg, binaries, nvs_bin, erase=erase) + + # ── Step 7: Keystore ── + step_header(7, "Update C2 Keystore") + update_keystore(cfg.device_id, master_key, ks_path) + + # ── Done ── + print(f"\n {C.GRN}{C.B}{'=' * 50}") + print(f" Deploy complete for {cfg.device_id}") + print(f" {'=' * 50}{C.RST}\n") + + # ── Optional monitor ── + if monitor and flash: + step_header("*", "Serial Monitor") + monitor_serial(cfg.port, idf_path) + + +# ══════════════════════════════════════════════════════════════════════════════ +# CLI / Entry Point +# ══════════════════════════════════════════════════════════════════════════════ + +def config_from_dict(d: dict, defaults: Optional[dict] = None) -> DeviceConfig: + """Create DeviceConfig from a JSON dict with optional defaults. + + Supports both flat format (legacy) and nested format: + Flat: {"device_id": "x", "srv_ip": "...", "module_network": true, ...} + Nested: {"device_id": "x", "server": {"ip": "..."}, "modules": {"network": true}, ...} + + When defaults is provided, device-level keys override defaults. + """ + dfl = defaults or {} + + # Merge nested sections: defaults <- device overrides + def merged(section: str) -> dict: + base = dict(dfl.get(section, {})) + base.update(d.get(section, {})) + return base + + server = merged("server") + network = merged("network") + modules = merged("modules") + ota = merged("ota") + + return DeviceConfig( + device_id=d.get("device_id", secrets.token_hex(4)), + port=d.get("port", dfl.get("port", "")), + hostname=d.get("hostname", dfl.get("hostname", "")), + # Server + srv_ip=server.get("ip", d.get("srv_ip", "192.168.1.100")), + srv_port=server.get("port", d.get("srv_port", 2626)), + # Network + network_mode=network.get("mode", d.get("network_mode", "wifi")), + wifi_ssid=network.get("wifi_ssid", d.get("wifi_ssid", "")), + wifi_pass=network.get("wifi_pass", d.get("wifi_pass", "")), + gprs_apn=network.get("gprs_apn", d.get("gprs_apn", "sl2sfr")), + # Modules + mod_network=modules.get("network", d.get("module_network", True)), + mod_recon=modules.get("recon", d.get("module_recon", False)), + mod_fakeap=modules.get("fakeap", d.get("module_fakeap", False)), + mod_honeypot=modules.get("honeypot", d.get("module_honeypot", False)), + mod_canbus=modules.get("canbus", d.get("module_canbus", False)), + mod_fallback=modules.get("fallback", d.get("module_fallback", False)), + recon_camera=modules.get("recon_camera", d.get("recon_camera", False)), + recon_ble_trilat=modules.get("recon_ble_trilat", d.get("recon_ble_trilat", False)), + # OTA + ota_enabled=ota.get("enabled", d.get("ota_enabled", True)), + ota_allow_http=ota.get("allow_http", d.get("ota_allow_http", False)), + # Security + master_key_hex=d.get("master_key", dfl.get("master_key", "")), + ) + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Epsilon Deploy - Unified build, provision & flash", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Interactive wizard (recommended for first use) + python deploy.py + + # Full deploy with WiFi + python deploy.py -p /dev/ttyUSB0 -d abc12345 \\ + --wifi MySSID MyPass --srv 192.168.1.100 + + # Full deploy with GPRS + python deploy.py -p /dev/ttyUSB0 -d abc12345 \\ + --gprs sl2sfr --srv 203.0.113.10 + + # Provision only (generate key + flash NVS, no rebuild) + python deploy.py --provision-only -p /dev/ttyUSB0 -d abc12345 + + # Flash existing build (skip rebuild) + python deploy.py --flash-only -p /dev/ttyUSB0 -d abc12345 \\ + --wifi MySSID MyPass --srv 192.168.1.100 + + # Clean install (erase flash first) + python deploy.py --erase -p /dev/ttyUSB0 -d abc12345 \\ + --wifi MySSID MyPass --srv 192.168.1.100 + + # Deploy + watch serial output + python deploy.py --monitor -p /dev/ttyUSB0 -d abc12345 \\ + --wifi MySSID MyPass --srv 192.168.1.100 + + # Batch deploy from config file + python deploy.py --config deploy.json +""", + ) + + # Device + parser.add_argument("-d", "--device-id", + help="Device ID (default: random 8 hex)") + parser.add_argument("-p", "--port", + help="Serial port (e.g. /dev/ttyUSB0)") + + # Network (mutually exclusive) + net = parser.add_mutually_exclusive_group() + net.add_argument("--wifi", nargs=2, metavar=("SSID", "PASS"), + help="WiFi mode with SSID and password") + net.add_argument("--gprs", metavar="APN", nargs="?", const="sl2sfr", + help="GPRS mode (default APN: sl2sfr)") + + # Server + parser.add_argument("--srv", metavar="IP", help="C2 server IP") + parser.add_argument("--srv-port", type=int, default=2626) + + # Modules + parser.add_argument("--mod-fakeap", action="store_true", default=False, + help="Enable fakeAP module") + parser.add_argument("--mod-honeypot", action="store_true", default=False, + help="Enable honeypot module (TCP services, WiFi/net monitors)") + parser.add_argument("--mod-recon", action="store_true", default=False, + help="Enable recon module") + parser.add_argument("--recon-camera", action="store_true", default=False, + help="Enable camera") + parser.add_argument("--recon-ble", action="store_true", default=False, + help="Enable BLE trilateration") + parser.add_argument("--mod-fallback", action="store_true", default=False, + help="Enable fallback module (autonomous WiFi recovery)") + parser.add_argument("--mod-redteam", action="store_true", default=False, + help="Enable red team module (offensive operations)") + parser.add_argument("--no-network", action="store_true", default=False, + help="Disable network module") + parser.add_argument("--no-ota", action="store_true", default=False, + help="Disable OTA module") + parser.add_argument("--ota-http", action="store_true", default=False, + help="Allow OTA over plain HTTP (insecure, local networks)") + + # Security + parser.add_argument("--key", metavar="HEX", + help="Master key as 64 hex chars (default: random)") + parser.add_argument("--keystore", type=Path, default=None, + help=f"C2 keystore path (default: {KEYSTORE_DEFAULT})") + + # Operating modes + parser.add_argument("--provision-only", action="store_true", + help="Only generate key and flash factory NVS") + parser.add_argument("--flash-only", action="store_true", + help="Flash existing build (skip rebuild)") + parser.add_argument("--build-only", action="store_true", + help="Build only (no flash, no provision)") + parser.add_argument("--erase", action="store_true", + help="Erase entire flash before writing") + parser.add_argument("--monitor", action="store_true", + help="Start serial monitor after flash") + parser.add_argument("--hostname", + help="Device hostname (default: random)") + + # Config file + parser.add_argument("--config", type=Path, + help="Load device configs from JSON file") + + args = parser.parse_args() + + banner() + + # ── Config file mode ── + if args.config: + with open(args.config) as f: + data = json.load(f) + defaults = data.get("defaults", {}) + devices = data.get("devices", [data]) + total = len(devices) + ok_count = 0 + for i, d in enumerate(devices, 1): + print(f"\n {C.B}### Device {i}/{total}: {d.get('device_id', '?')} ###{C.RST}") + cfg = config_from_dict(d, defaults) + try: + deploy_one( + cfg, + build=not args.flash_only, + flash=not args.build_only, + provision_only=args.provision_only, + erase=args.erase, + monitor=args.monitor, + keystore_path=str(args.keystore) if args.keystore else "", + ) + ok_count += 1 + except SystemExit: + err(f"Failed for {cfg.device_id}, continuing...") + print(f"\n {C.B}Results: {ok_count}/{total} succeeded{C.RST}") + return 0 if ok_count == total else 1 + + # ── Interactive mode (no args) ── + if not args.device_id and not args.port and not args.provision_only: + cfg = interactive_wizard() + print() + if not ask_yn("Proceed with deployment?", True): + print(" Aborted.") + return 1 + deploy_one( + cfg, + build=not args.flash_only, + flash=not args.build_only, + erase=args.erase, + monitor=args.monitor, + keystore_path=str(args.keystore) if args.keystore else "", + ) + return 0 + + # ── CLI mode ── + if not args.port: + err("--port / -p is required in CLI mode") + return 1 + + cfg = DeviceConfig( + device_id=args.device_id or secrets.token_hex(4), + port=args.port, + srv_ip=args.srv or "192.168.1.100", + srv_port=args.srv_port, + network_mode="gprs" if args.gprs else "wifi", + wifi_ssid=args.wifi[0] if args.wifi else "", + wifi_pass=args.wifi[1] if args.wifi else "", + gprs_apn=args.gprs if args.gprs else "sl2sfr", + hostname=args.hostname or "", + mod_network=not args.no_network, + mod_fakeap=args.mod_fakeap, + mod_honeypot=args.mod_honeypot, + mod_recon=args.mod_recon, + mod_fallback=args.mod_fallback, + mod_redteam=args.mod_redteam, + recon_camera=args.recon_camera, + recon_ble_trilat=args.recon_ble, + ota_enabled=not args.no_ota, + ota_allow_http=args.ota_http, + master_key_hex=args.key or "", + ) + + # Validate WiFi (unless provision-only which doesn't need it) + if (cfg.network_mode == "wifi" + and not args.provision_only + and not args.build_only + and (not cfg.wifi_ssid or not cfg.wifi_pass)): + err("WiFi mode requires --wifi SSID PASS") + return 1 + + deploy_one( + cfg, + build=not args.flash_only and not args.provision_only, + flash=not args.build_only, + provision_only=args.provision_only, + erase=args.erase, + monitor=args.monitor, + keystore_path=str(args.keystore) if args.keystore else "", + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tools/deploy.template.json b/tools/deploy.template.json new file mode 100644 index 0000000..bca11a8 --- /dev/null +++ b/tools/deploy.template.json @@ -0,0 +1,89 @@ +{ + "defaults": { + "server": { + "ip": "192.168.1.100", + "port": 2626 + }, + "network": { + "mode": "wifi", + "wifi_ssid": "YOUR_SSID", + "wifi_pass": "YOUR_PASSWORD", + "gprs_apn": "sl2sfr" + }, + "modules": { + "network": true, + "fakeap": false, + "honeypot": false, + "recon": false, + "canbus": false, + "recon_camera": false, + "recon_ble_trilat": false + }, + "ota": { + "enabled": true, + "allow_http": false + } + }, + + "devices": [ + { + "_comment": "Bot standard — commandes réseau uniquement", + "device_id": "aaaa0001", + "port": "/dev/ttyUSB0", + "hostname": "esp32-bot-01" + }, + + { + "_comment": "Honeypot — services TCP + monitoring WiFi/réseau", + "device_id": "bbbb0001", + "port": "/dev/ttyUSB1", + "hostname": "esp32-honeypot-01", + "modules": { + "network": true, + "honeypot": true + } + }, + + { + "_comment": "FakeAP — portail captif + sniffer WiFi", + "device_id": "cccc0001", + "port": "/dev/ttyUSB2", + "hostname": "esp32-fakeap-01", + "modules": { + "network": true, + "fakeap": true + } + }, + + { + "_comment": "Recon caméra — reconnaissance visuelle", + "device_id": "dddd0001", + "port": "/dev/ttyUSB3", + "hostname": "esp32-cam-01", + "modules": { + "network": false, + "recon": true, + "recon_camera": true + } + }, + + { + "_comment": "Bot GPRS — déployé hors réseau WiFi, clé pré-provisionnée", + "device_id": "eeee0001", + "port": "/dev/ttyUSB4", + "hostname": "esp32-gprs-01", + "master_key": "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", + "network": { + "mode": "gprs", + "gprs_apn": "internet" + }, + "server": { + "ip": "203.0.113.10" + }, + "ota": { + "enabled": true, + "allow_http": true + } + } + ] +} diff --git a/tools/espmon/__init__.py b/tools/espmon/__init__.py new file mode 100644 index 0000000..374f79d --- /dev/null +++ b/tools/espmon/__init__.py @@ -0,0 +1,3 @@ +"""espmon - ESP32 serial monitor and C2 command interface for Espilon.""" + +__version__ = "1.0.0" diff --git a/tools/espmon/__main__.py b/tools/espmon/__main__.py new file mode 100644 index 0000000..210e933 --- /dev/null +++ b/tools/espmon/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +"""Entry point for: python -m espmon""" + +import sys +from espmon.cli import main + +sys.exit(main()) diff --git a/tools/espmon/cli.py b/tools/espmon/cli.py new file mode 100644 index 0000000..f9d94de --- /dev/null +++ b/tools/espmon/cli.py @@ -0,0 +1,346 @@ +"""CLI parser and subcommand handlers.""" + +import argparse +import os +import time +import sys + +import requests + +from espmon import __version__ +from espmon.colors import C, ok, info, err, warn +from espmon.config import Config, DEFAULT_BAUD, POLL_INTERVAL, POLL_TIMEOUT +from espmon.client import C3POClient +from espmon.logs import LogManager +from espmon.monitor import SerialMonitor +from espmon.daemon import daemonize, stop_daemon + + +# ─── Helpers ───────────────────────────────────────────────────────────────── + +def _format_duration(seconds: float) -> str: + """Format seconds into a human-readable duration.""" + s = int(seconds) + if s < 60: + return f"{s}s" + if s < 3600: + return f"{s // 60}m{s % 60:02d}s" + h = s // 3600 + m = (s % 3600) // 60 + return f"{h}h{m:02d}m" + + +def _format_size(size: int) -> str: + """Format bytes into human-readable size.""" + if size > 1024 * 1024: + return f"{size / (1024 * 1024):.1f} MB" + if size > 1024: + return f"{size / 1024:.1f} KB" + return f"{size} B" + + +def _detect_ports() -> list: + """Detect available serial ports.""" + from glob import glob + return sorted(glob("/dev/ttyUSB*") + glob("/dev/ttyACM*")) + + +# ─── Subcommand: monitor ──────────────────────────────────────────────────── + +def cmd_monitor(args) -> int: + """Monitor a serial port, logging output to file.""" + port = args.port + + if not os.path.exists(port): + err(f"Port {port} not found") + available = _detect_ports() + if available: + info(f"Available ports: {', '.join(available)}") + return 1 + + LogManager.ensure_dirs() + + if args.bg: + return daemonize(port, args.baud) + + # Foreground mode + log_path = LogManager.session_path(port) + monitor = SerialMonitor(port, args.baud, log_path) + return monitor.run() + + +# ─── Subcommand: tail ──────────────────────────────────────────────────────── + +def cmd_tail(args) -> int: + """Tail the most recent log file.""" + log_path = LogManager.latest_log(port=args.port, device=args.device) + + if not log_path: + err("No log files found") + info("Start monitoring first: espmon monitor /dev/ttyUSB0") + return 1 + + info(f"{log_path}") + + with open(log_path, 'r') as f: + lines = f.readlines() + + n = args.lines + for line in lines[-n:]: + print(line, end='') + + if args.follow: + # Follow mode: keep watching for new lines + try: + with open(log_path, 'r') as f: + f.seek(0, 2) # seek to end + while True: + line = f.readline() + if line: + print(line, end='', flush=True) + else: + time.sleep(0.2) + except KeyboardInterrupt: + pass + + return 0 + + +# ─── Subcommand: cmd ───────────────────────────────────────────────────────── + +def cmd_cmd(args) -> int: + """Send a command to a device via C3PO API and wait for response.""" + config = Config() + client = C3POClient(config) + + try: + result = client.send_command(args.device_id, args.command, args.argv) + except requests.exceptions.ConnectionError: + err(f"Cannot connect to C3PO at {config.base_url}") + err("Is C3PO running? Start with: cd tools/C3PO && python c3po.py --headless") + return 1 + except requests.exceptions.HTTPError as e: + err(f"API error: {e.response.status_code} {e.response.text}") + return 1 + + results = result.get("results", []) + if not results: + err("No results returned") + return 1 + + # Handle multiple devices (broadcast) + pending = [] + for r in results: + if r.get("status") == "ok": + pending.append((r["device_id"], r["request_id"])) + info(f"Sent to {r['device_id']} (req: {r['request_id'][:20]}...)") + else: + err(f"{r.get('device_id', '?')}: {r.get('message', 'error')}") + + if not pending: + return 1 + + # Poll for results + seen = {req_id: 0 for _, req_id in pending} + start = time.time() + completed = set() + timeout = args.timeout + + while time.time() - start < timeout and len(completed) < len(pending): + for device_id, req_id in pending: + if req_id in completed: + continue + + try: + status = client.poll_command(req_id) + except requests.exceptions.RequestException: + time.sleep(POLL_INTERVAL) + continue + + output = status.get("output", []) + prefix = f"[{device_id}] " if len(pending) > 1 else "" + + for line in output[seen[req_id]:]: + print(f"{prefix}{line}") + seen[req_id] = len(output) + + if status.get("status") == "completed": + completed.add(req_id) + + if len(completed) < len(pending): + time.sleep(POLL_INTERVAL) + + if len(completed) == len(pending): + ok(f"Command completed ({len(completed)} device(s))") + return 0 + + warn(f"Timed out after {timeout}s ({len(completed)}/{len(pending)} completed)") + return 1 + + +# ─── Subcommand: status ────────────────────────────────────────────────────── + +def cmd_status(args) -> int: + """List connected devices from C3PO.""" + config = Config() + client = C3POClient(config) + + try: + data = client.list_devices() + except requests.exceptions.ConnectionError: + err(f"Cannot connect to C3PO at {config.base_url}") + return 1 + except requests.exceptions.HTTPError as e: + err(f"API error: {e.response.status_code}") + return 1 + + devices = data.get("devices", []) + if not devices: + info("No devices connected") + return 0 + + print(f"\n {'ID':<16} {'IP':<16} {'Status':<12} {'Chip':<10} " + f"{'Modules':<30} {'Uptime':<10}") + print(f" {'-'*16} {'-'*16} {'-'*12} {'-'*10} {'-'*30} {'-'*10}") + + for d in devices: + status_color = C.GRN if d.get("status") == "Connected" else C.YLW + modules = d.get("modules", "") or "-" + uptime = _format_duration(d.get("connected_for_seconds", 0)) + + print(f" {d['id']:<16} " + f"{d.get('ip', '?'):<16} " + f"{status_color}{d.get('status', '?'):<12}{C.RST} " + f"{d.get('chip', '?'):<10} " + f"{modules:<30} " + f"{uptime:<10}") + + print(f"\n {len(devices)} device(s)\n") + return 0 + + +# ─── Subcommand: logs ──────────────────────────────────────────────────────── + +def cmd_logs(args) -> int: + """List all log files.""" + entries = LogManager.list_all() + + if not entries: + info("No log files found") + return 0 + + print(f"\n {'File':<40} {'Size':<10} {'Last Modified'}") + print(f" {'-'*40} {'-'*10} {'-'*20}") + + total_size = 0 + for e in entries: + total_size += e["size"] + mtime_str = e["mtime"].strftime("%Y-%m-%d %H:%M") + print(f" {str(e['relative']):<40} " + f"{_format_size(e['size']):<10} " + f"{mtime_str}") + + print(f"\n {len(entries)} file(s), {_format_size(total_size)} total\n") + return 0 + + +# ─── Subcommand: stop ──────────────────────────────────────────────────────── + +def cmd_stop(args) -> int: + """Stop the background monitor daemon.""" + return stop_daemon() + + +# ─── Parser ────────────────────────────────────────────────────────────────── + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="espmon", + description="ESP32 serial monitor and C2 command interface", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""\ +Examples: + espmon monitor /dev/ttyUSB0 Monitor serial (foreground) + espmon monitor /dev/ttyUSB0 --bg Monitor as background daemon + espmon tail Tail most recent log + espmon tail -f -n 100 Follow mode, last 100 lines + espmon cmd 03d03f48 system_info Send command to device + espmon cmd all system_mem Broadcast to all devices + espmon status List connected devices + espmon logs List log files + espmon stop Stop background monitor +""", + ) + parser.add_argument("--version", action="version", + version=f"espmon {__version__}") + + sub = parser.add_subparsers(dest="subcommand") + + # monitor + p_mon = sub.add_parser("monitor", aliases=["mon", "m"], + help="Monitor ESP32 serial output") + p_mon.add_argument("port", help="Serial port (e.g. /dev/ttyUSB0)") + p_mon.add_argument("--baud", "-b", type=int, default=DEFAULT_BAUD, + help=f"Baud rate (default: {DEFAULT_BAUD})") + p_mon.add_argument("--bg", action="store_true", + help="Run as background daemon") + p_mon.set_defaults(func=cmd_monitor) + + # tail + p_tail = sub.add_parser("tail", aliases=["t"], + help="Tail a log file") + p_tail.add_argument("--port", "-p", help="Filter by serial port") + p_tail.add_argument("--device", "-d", help="Filter by device ID") + p_tail.add_argument("--lines", "-n", type=int, default=50, + help="Number of lines (default: 50)") + p_tail.add_argument("--follow", "-f", action="store_true", + help="Follow mode (keep watching)") + p_tail.set_defaults(func=cmd_tail) + + # cmd + p_cmd = sub.add_parser("cmd", aliases=["c"], + help="Send command via C3PO API") + p_cmd.add_argument("device_id", + help="Target device ID (or 'all' for broadcast)") + p_cmd.add_argument("command", help="Command name") + p_cmd.add_argument("argv", nargs="*", default=[], + help="Command arguments") + p_cmd.add_argument("--timeout", "-t", type=int, default=POLL_TIMEOUT, + help=f"Poll timeout seconds (default: {POLL_TIMEOUT})") + p_cmd.set_defaults(func=cmd_cmd) + + # status + p_status = sub.add_parser("status", aliases=["s"], + help="List connected devices") + p_status.set_defaults(func=cmd_status) + + # logs + p_logs = sub.add_parser("logs", aliases=["l"], + help="List log files") + p_logs.set_defaults(func=cmd_logs) + + # stop + p_stop = sub.add_parser("stop", help="Stop background monitor") + p_stop.set_defaults(func=cmd_stop) + + return parser + + +# ─── Entry Point ────────────────────────────────────────────────────────────── + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + + if not args.subcommand: + parser.print_help() + return 0 + + try: + return args.func(args) + except KeyboardInterrupt: + print() + return 130 + except Exception as e: + err(f"Unexpected error: {e}") + return 1 diff --git a/tools/espmon/client.py b/tools/espmon/client.py new file mode 100644 index 0000000..d3be0ff --- /dev/null +++ b/tools/espmon/client.py @@ -0,0 +1,78 @@ +"""C3PO REST API client.""" + +import requests + +from espmon.config import Config + + +class C3POClient: + """HTTP client for the C3PO command & control API.""" + + def __init__(self, config: Config = None): + cfg = config or Config() + self.base_url = cfg.base_url + self.headers = {"Content-Type": "application/json"} + if cfg.token: + self.headers["Authorization"] = f"Bearer {cfg.token}" + + def send_command(self, device_id: str, command: str, argv: list = None) -> dict: + """Send a command to a device. + + POST /api/commands + Returns: {"results": [{"device_id", "status", "request_id"}]} + """ + body = { + "device_ids": [device_id] if device_id != "all" else "all", + "command": command, + "argv": argv or [], + } + resp = requests.post( + f"{self.base_url}/api/commands", + json=body, + headers=self.headers, + timeout=10, + ) + resp.raise_for_status() + return resp.json() + + def poll_command(self, request_id: str) -> dict: + """Poll a command's result. + + GET /api/commands/ + Returns: {"status": "pending|completed", "output": [...]} + """ + resp = requests.get( + f"{self.base_url}/api/commands/{request_id}", + headers=self.headers, + timeout=10, + ) + resp.raise_for_status() + return resp.json() + + def list_devices(self) -> dict: + """List connected devices. + + GET /api/devices + Returns: {"devices": [...], "count": N} + """ + resp = requests.get( + f"{self.base_url}/api/devices", + headers=self.headers, + timeout=10, + ) + resp.raise_for_status() + return resp.json() + + def list_commands(self) -> dict: + """List recent commands. + + GET /api/commands + Returns: {"commands": [...]} + """ + resp = requests.get( + f"{self.base_url}/api/commands", + headers=self.headers, + timeout=10, + ) + resp.raise_for_status() + return resp.json() diff --git a/tools/espmon/colors.py b/tools/espmon/colors.py new file mode 100644 index 0000000..654a161 --- /dev/null +++ b/tools/espmon/colors.py @@ -0,0 +1,22 @@ +"""Terminal color helpers (same style as deploy.py).""" + + +class C: + RST = "\033[0m" + B = "\033[1m" + DIM = "\033[2m" + RED = "\033[91m" + GRN = "\033[92m" + YLW = "\033[93m" + BLU = "\033[94m" + CYN = "\033[96m" + + +def _tag(color: str, tag: str, msg: str): + print(f" {color}{C.B}[{tag}]{C.RST} {msg}") + + +def ok(m): _tag(C.GRN, " OK ", m) +def info(m): _tag(C.BLU, " .. ", m) +def warn(m): _tag(C.YLW, " !! ", m) +def err(m): _tag(C.RED, " XX ", m) diff --git a/tools/espmon/config.py b/tools/espmon/config.py new file mode 100644 index 0000000..3d26678 --- /dev/null +++ b/tools/espmon/config.py @@ -0,0 +1,84 @@ +"""Configuration, paths, and constants for espmon.""" + +import os +import re +from pathlib import Path + + +# Paths +SCRIPT_DIR = Path(__file__).resolve().parent # tools/espmon/ +TOOLS_DIR = SCRIPT_DIR.parent # tools/ +LOGS_DIR = TOOLS_DIR / "logs" +C3PO_DIR = TOOLS_DIR / "C3PO" +ENV_FILE = C3PO_DIR / ".env" +DEPLOY_JSON = TOOLS_DIR / "deploy.json" +PID_FILE = LOGS_DIR / ".espmon.pid" + +# Serial +DEFAULT_BAUD = 115200 + +# API polling +POLL_INTERVAL = 0.5 # seconds between polls +POLL_TIMEOUT = 120 # max wait for command result + +# ANSI escape codes +ANSI_RE = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]|\x1b\(B') + + +def strip_ansi(text: str) -> str: + """Remove ANSI escape codes from text.""" + return ANSI_RE.sub('', text) + + +class Config: + """C3PO connection configuration. + + Reads from (in order of priority): + 1. Environment variables: ESPMON_HOST, ESPMON_PORT, ESPMON_TOKEN + 2. tools/C3PO/.env file + 3. Hardcoded defaults + """ + + def __init__(self): + self.host = "localhost" + self.port = 8000 + self.token = "" + self._load() + + def _load(self): + env_vars = {} + if ENV_FILE.exists(): + with open(ENV_FILE) as f: + for line in f: + line = line.strip() + if not line or line.startswith('#'): + continue + if '=' in line: + key, _, value = line.partition('=') + env_vars[key.strip()] = value.strip().strip('"').strip("'") + + # Host + self.host = ( + os.environ.get("ESPMON_HOST") + or env_vars.get("WEB_HOST", "").replace("0.0.0.0", "localhost") + or "localhost" + ) + + # Port + try: + self.port = int( + os.environ.get("ESPMON_PORT") + or env_vars.get("WEB_PORT", "8000") + ) + except ValueError: + self.port = 8000 + + # Auth token + self.token = ( + os.environ.get("ESPMON_TOKEN") + or env_vars.get("MULTILAT_AUTH_TOKEN", "") + ) + + @property + def base_url(self) -> str: + return f"http://{self.host}:{self.port}" diff --git a/tools/espmon/daemon.py b/tools/espmon/daemon.py new file mode 100644 index 0000000..10130da --- /dev/null +++ b/tools/espmon/daemon.py @@ -0,0 +1,99 @@ +"""Background daemon management — double-fork, PID file, stop.""" + +import os +import signal +import time + +from espmon.colors import ok, info, err, warn +from espmon.config import PID_FILE, LOGS_DIR +from espmon.logs import LogManager +from espmon.monitor import SerialMonitor + + +def daemonize(port: str, baud: int) -> int: + """Start the serial monitor as a background daemon. + + Uses the Unix double-fork pattern to fully detach from the terminal. + Returns 0 to the parent process, never returns in the daemon. + """ + log_path = LogManager.session_path(port) + + pid = os.fork() + if pid > 0: + # Parent: wait for first child + os.waitpid(pid, 0) + ok(f"Monitor started in background") + ok(f"Logging to {log_path}") + info(f"PID file: {PID_FILE}") + info(f"Stop with: python -m espmon stop") + return 0 + + # First child: new session + os.setsid() + pid2 = os.fork() + if pid2 > 0: + os._exit(0) + + # Second child (daemon process) + devnull = os.open(os.devnull, os.O_RDWR) + os.dup2(devnull, 0) + os.dup2(devnull, 1) + os.dup2(devnull, 2) + os.close(devnull) + + # Write PID file + LOGS_DIR.mkdir(parents=True, exist_ok=True) + PID_FILE.write_text(str(os.getpid())) + + # Run monitor with signal handler for clean shutdown + monitor = SerialMonitor(port, baud, log_path) + signal.signal(signal.SIGTERM, lambda s, f: monitor.stop()) + signal.signal(signal.SIGINT, lambda s, f: monitor.stop()) + + try: + monitor.run() + finally: + PID_FILE.unlink(missing_ok=True) + + os._exit(0) + + +def stop_daemon() -> int: + """Stop a running background monitor daemon.""" + if not PID_FILE.exists(): + err("No background monitor running (no PID file)") + return 1 + + try: + pid = int(PID_FILE.read_text().strip()) + except (ValueError, OSError): + err(f"Invalid PID file: {PID_FILE}") + PID_FILE.unlink(missing_ok=True) + return 1 + + try: + os.kill(pid, signal.SIGTERM) + info(f"Sent SIGTERM to PID {pid}") + + # Wait for process to exit (max 5 seconds) + for _ in range(50): + try: + os.kill(pid, 0) + time.sleep(0.1) + except OSError: + ok("Background monitor stopped") + PID_FILE.unlink(missing_ok=True) + return 0 + + warn(f"Process {pid} did not exit, sending SIGKILL") + os.kill(pid, signal.SIGKILL) + PID_FILE.unlink(missing_ok=True) + return 0 + + except ProcessLookupError: + info(f"Process {pid} already gone") + PID_FILE.unlink(missing_ok=True) + return 0 + except PermissionError: + err(f"Permission denied to kill PID {pid}") + return 1 diff --git a/tools/espmon/logs.py b/tools/espmon/logs.py new file mode 100644 index 0000000..9212d43 --- /dev/null +++ b/tools/espmon/logs.py @@ -0,0 +1,103 @@ +"""Log file management — session paths, listing, tail.""" + +import json +from datetime import datetime +from pathlib import Path +from typing import Optional + +from espmon.config import LOGS_DIR, DEPLOY_JSON + + +class LogManager: + """Manage serial monitor log files in tools/logs/.""" + + @staticmethod + def ensure_dirs(): + """Create the logs root directory if needed.""" + LOGS_DIR.mkdir(exist_ok=True) + + @staticmethod + def session_path(port: str) -> Path: + """Create a new log file path for a monitor session. + + Format: tools/logs//YYYY-MM-DD_HHhMM.log + """ + port_name = port.replace("/dev/", "").replace("/", "_") + port_dir = LOGS_DIR / port_name + port_dir.mkdir(parents=True, exist_ok=True) + + timestamp = datetime.now().strftime("%Y-%m-%d_%Hh%M") + path = port_dir / f"{timestamp}.log" + + # Avoid collision if multiple sessions in same minute + if path.exists(): + secs = datetime.now().strftime("%S") + path = port_dir / f"{timestamp}_{secs}.log" + + return path + + @staticmethod + def latest_log(port: str = None, device: str = None) -> Optional[Path]: + """Find the most recent log file. + + Args: + port: Filter by serial port (e.g. /dev/ttyUSB0) + device: Filter by device ID (looks up port in deploy.json) + """ + if device: + mapped = LogManager._device_to_port(device) + if mapped: + port = mapped + + if port: + port_name = port.replace("/dev/", "").replace("/", "_") + port_dir = LOGS_DIR / port_name + if not port_dir.exists(): + return None + logs = sorted(port_dir.glob("*.log"), key=lambda p: p.stat().st_mtime) + return logs[-1] if logs else None + + # No filter: most recent across all ports + if not LOGS_DIR.exists(): + return None + all_logs = sorted(LOGS_DIR.rglob("*.log"), key=lambda p: p.stat().st_mtime) + return all_logs[-1] if all_logs else None + + @staticmethod + def list_all() -> list: + """List all log files with metadata. + + Returns list of dicts with: path, relative, size, mtime + """ + if not LOGS_DIR.exists(): + return [] + + entries = [] + for log_file in sorted( + LOGS_DIR.rglob("*.log"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ): + stat = log_file.stat() + entries.append({ + "path": log_file, + "relative": log_file.relative_to(LOGS_DIR), + "size": stat.st_size, + "mtime": datetime.fromtimestamp(stat.st_mtime), + }) + return entries + + @staticmethod + def _device_to_port(device_id: str) -> Optional[str]: + """Look up serial port for a device ID from deploy.json.""" + if not DEPLOY_JSON.exists(): + return None + try: + with open(DEPLOY_JSON) as f: + data = json.load(f) + for dev in data.get("devices", []): + if dev.get("device_id") == device_id: + return dev.get("port") + except (json.JSONDecodeError, KeyError, OSError): + pass + return None diff --git a/tools/espmon/monitor.py b/tools/espmon/monitor.py new file mode 100644 index 0000000..2e2448e --- /dev/null +++ b/tools/espmon/monitor.py @@ -0,0 +1,103 @@ +"""Serial port monitor — read, display, and log ESP32 output.""" + +import threading +import time +from datetime import datetime +from pathlib import Path + +from espmon.colors import info, err +from espmon.config import DEFAULT_BAUD, strip_ansi + + +class SerialMonitor: + """Read a serial port, print to stdout with colors, log to file without ANSI.""" + + def __init__(self, port: str, baud: int = DEFAULT_BAUD, log_path: Path = None): + self.port = port + self.baud = baud + self.log_path = log_path + self._stop = threading.Event() + self._log_file = None + + def run(self) -> int: + """Run the monitor (blocking). Returns exit code.""" + import serial + + try: + # Open without toggling DTR/RTS to avoid resetting the ESP32 + ser = serial.Serial() + ser.port = self.port + ser.baudrate = self.baud + ser.timeout = 0.5 + ser.dtr = False + ser.rts = False + ser.open() + except serial.SerialException as e: + err(f"Cannot open {self.port}: {e}") + return 1 + + if self.log_path: + self.log_path.parent.mkdir(parents=True, exist_ok=True) + self._log_file = open(self.log_path, 'a', encoding='utf-8') + info(f"Logging to {self.log_path}") + + info(f"Monitoring {self.port} @ {self.baud} baud (Ctrl+C to stop)") + + if self._log_file: + self._log_file.write( + f"# espmon session started {datetime.now().isoformat()}\n" + ) + self._log_file.write( + f"# port: {self.port} baud: {self.baud}\n" + ) + self._log_file.flush() + + buf = b"" + try: + while not self._stop.is_set(): + try: + chunk = ser.read(256) + except serial.SerialException: + if self._stop.is_set(): + break + err(f"Serial read error on {self.port}") + time.sleep(0.5) + continue + + if not chunk: + continue + + buf += chunk + while b"\n" in buf: + line_bytes, buf = buf.split(b"\n", 1) + line = line_bytes.decode("utf-8", errors="replace").rstrip("\r") + + # stdout with original ANSI colors + print(line, flush=True) + + # log file: timestamped, ANSI stripped + if self._log_file: + ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + clean = strip_ansi(line) + self._log_file.write(f"[{ts}] {clean}\n") + self._log_file.flush() + + except KeyboardInterrupt: + pass + finally: + try: + ser.close() + except Exception: + pass + if self._log_file: + self._log_file.write( + f"# espmon session ended {datetime.now().isoformat()}\n" + ) + self._log_file.close() + info("Monitor stopped") + + return 0 + + def stop(self): + """Signal the monitor to stop.""" + self._stop.set() diff --git a/tools/flasher/README.md b/tools/flasher/README.md deleted file mode 100644 index df0283e..0000000 --- a/tools/flasher/README.md +++ /dev/null @@ -1,470 +0,0 @@ -# Espilon Multi-Device Flasher - -Automated tool for building and flashing multiple ESP32 devices with custom configurations for the Espilon project. - -## Features - -- Build firmware with custom configurations per device -- Support for WiFi and GPRS modes -- Configurable modules (Network, Recon, FakeAP) -- Multi-device batch processing -- Individual device manual configuration -- Automatic hostname generation -- Build-only and flash-only modes - -## Prerequisites - -- Python 3.8+ -- ESP-IDF v5.3.2 (properly configured with `export.sh`) -- esptool.py (usually included with ESP-IDF) -- ESP32 development boards connected via USB - -## Installation - -No installation required. The script is standalone. - -```bash -cd tools/flasher -chmod +x flash.py -``` - -## Configuration File - -### Structure - -The `devices.json` file contains: -- **project**: Path to the `espilon_bot` directory -- **devices**: Array of device configurations - -### Example: devices.json - -```json -{ - "project": "/home/user/epsilon/espilon_bot", - "devices": [ - ## WiFi AGENT ## - { - "device_id": "ce4f626b", - "port": "/dev/ttyUSB0", - "srv_ip": "192.168.1.13", - "srv_port": 2626, - "network_mode": "wifi", - "wifi_ssid": "MyWiFi", - "wifi_pass": "MyPassword123", - "hostname": "pixel-8-pro", - "module_network": true, - "module_recon": false, - "module_fakeap": false, - "recon_camera": false, - "recon_ble_trilat": false, - "crypto_key": "testde32chars00000000000000000000", - "crypto_nonce": "noncenonceno" - }, - ## GPRS AGENT ## - { - "device_id": "a91dd021", - "port": "/dev/ttyUSB1", - "srv_ip": "203.0.113.10", - "srv_port": 2626, - "network_mode": "gprs", - "gprs_apn": "sl2sfr", - "hostname": "galaxy-s24-ultra", - "module_network": true, - "module_recon": false, - "module_fakeap": false - } - ] -} -``` - -### Device Configuration Fields - -| Field | Required | Default | Description | -|-------|----------|---------|-------------| -| `device_id` | Yes | - | Unique device identifier (8 hex chars) | -| `port` | Yes | - | Serial port (e.g., `/dev/ttyUSB0`, `COM3`) | -| `srv_ip` | Yes | - | C2 server IP address | -| `srv_port` | No | 2626 | C2 server port | -| `network_mode` | No | "wifi" | Network mode: `"wifi"` or `"gprs"` | -| `wifi_ssid` | WiFi only | - | WiFi network SSID | -| `wifi_pass` | WiFi only | - | WiFi network password | -| `gprs_apn` | GPRS only | "sl2sfr" | GPRS APN | -| `hostname` | No | Random | Device hostname for network identification | -| `module_network` | No | true | Enable network commands module | -| `module_recon` | No | false | Enable reconnaissance module | -| `module_fakeap` | No | false | Enable fake AP module | -| `recon_camera` | No | false | Enable camera reconnaissance (ESP32-CAM) | -| `recon_ble_trilat` | No | false | Enable BLE trilateration | -| `crypto_key` | No | Test key | ChaCha20 encryption key (32 chars) | -| `crypto_nonce` | No | Test nonce | ChaCha20 nonce (12 chars) | - -## Usage - -### 1. Flash All Devices from Config - -```bash -python3 flash.py --config devices.json -``` - -This will: -1. Read device configurations from `devices.json` -2. Build firmware for each device -3. Flash each device sequentially -4. Display summary report - -### 2. Manual Single Device (WiFi) - -```bash -python3 flash.py --manual \ - --project /home/user/epsilon/espilon_bot \ - --device-id abc12345 \ - --port /dev/ttyUSB0 \ - --srv-ip 192.168.1.100 \ - --wifi-ssid MyWiFi \ - --wifi-pass MyPassword123 -``` - -### 3. Manual Single Device (GPRS) - -```bash -python3 flash.py --manual \ - --project /home/user/epsilon/espilon_bot \ - --device-id def67890 \ - --port /dev/ttyUSB1 \ - --srv-ip 203.0.113.10 \ - --network-mode gprs \ - --gprs-apn sl2sfr -``` - -### 4. Build Only (No Flash) - -Useful for generating firmware files without flashing: - -```bash -python3 flash.py --config devices.json --build-only -``` - -Firmware files are saved to: `espilon_bot/firmware/.bin` - -### 5. Flash Only (Skip Build) - -Flash pre-built firmware: - -```bash -python3 flash.py --config devices.json --flash-only -``` - -Requires firmware files in `espilon_bot/firmware/` directory. - -### 6. Enable Modules - -```bash -python3 flash.py --manual \ - --project /home/user/epsilon/espilon_bot \ - --device-id xyz98765 \ - --port /dev/ttyUSB0 \ - --srv-ip 192.168.1.100 \ - --wifi-ssid MyWiFi \ - --wifi-pass MyPassword123 \ - --enable-recon \ - --enable-fakeap \ - --enable-ble-trilat -``` - -### 7. Custom Encryption Keys - -```bash -python3 flash.py --manual \ - --project /home/user/epsilon/espilon_bot \ - --device-id secure01 \ - --port /dev/ttyUSB0 \ - --srv-ip 192.168.1.100 \ - --wifi-ssid MyWiFi \ - --wifi-pass MyPassword123 \ - --crypto-key "your32charencryptionkeyhere!!" \ - --crypto-nonce "yournonce12b" -``` - -## Advanced Features - -### Hostname Generation - -If `hostname` is not specified, the script generates a realistic device name: -- iPhone models (iPhone-15-pro-max, etc.) -- Android devices (galaxy-s24-ultra, pixel-8-pro, etc.) -- Windows PCs (DESKTOP-XXXXXXX) - -This helps devices blend in on networks during authorized testing. - -### Configuration Backup - -Before building, the script automatically backs up the existing `sdkconfig.defaults` to `sdkconfig.defaults.bak`. - -### Firmware Storage - -Built firmware is saved in: -``` -espilon_bot/firmware/.bin -``` - -This allows: -- Reuse without rebuilding (--flash-only) -- Firmware version archival -- Quick reflashing of multiple devices - -## Workflow Examples - -### Scenario 1: Initial Setup (3 devices) - -1. Edit `devices.json`: - ```json - { - "project": "/home/user/epsilon/espilon_bot", - "devices": [ - {"device_id": "dev00001", "port": "/dev/ttyUSB0", ...}, - {"device_id": "dev00002", "port": "/dev/ttyUSB1", ...}, - {"device_id": "dev00003", "port": "/dev/ttyUSB2", ...} - ] - } - ``` - -2. Flash all: - ```bash - python3 flash.py --config devices.json - ``` - -3. Devices are ready for deployment - -### Scenario 2: Update Single Device - -1. Modify configuration in `devices.json` -2. Flash only that device: - ```bash - # Remove other devices from JSON or use manual mode - python3 flash.py --manual --device-id dev00002 --port /dev/ttyUSB1 ... - ``` - -### Scenario 3: Quick Reflash - -```bash -# Build once -python3 flash.py --config devices.json --build-only - -# Flash multiple times (testing, replacement devices) -python3 flash.py --config devices.json --flash-only -``` - -### Scenario 4: WiFi + GPRS Mixed Fleet - -```json -{ - "project": "/home/user/epsilon/espilon_bot", - "devices": [ - { - "device_id": "wifi001", - "network_mode": "wifi", - "wifi_ssid": "HomeNetwork", - "wifi_pass": "password123", - ... - }, - { - "device_id": "gprs001", - "network_mode": "gprs", - "gprs_apn": "sl2sfr", - ... - } - ] -} -``` - -## Troubleshooting - -### Error: "Port not found" - -``` -❌ Flash failed: Serial port /dev/ttyUSB0 not found -``` - -**Solution**: Check device connection and port: -```bash -ls /dev/ttyUSB* /dev/ttyACM* -``` - -Add user to dialout group: -```bash -sudo usermod -a -G dialout $USER -# Log out and log back in -``` - -### Error: "Build failed" - -``` -❌ Build failed for device_id -``` - -**Solutions**: -1. Verify ESP-IDF is sourced: - ```bash - . $HOME/esp-idf/export.sh - ``` - -2. Check project path in `devices.json` - -3. Manually test build: - ```bash - cd espilon_bot - idf.py build - ``` - -### Error: "Permission denied" - -``` -❌ Permission denied: /dev/ttyUSB0 -``` - -**Solution**: -```bash -sudo chmod 666 /dev/ttyUSB0 -# Or add user to dialout group (permanent) -sudo usermod -a -G dialout $USER -``` - -### Error: "Binary not found" - -``` -❌ Binary not found: epsilon_bot.bin -``` - -**Solution**: Check build output for compilation errors. The binary name should match the project configuration. - -### WiFi Mode Missing Credentials - -``` -ValueError: WiFi mode requires wifi_ssid and wifi_pass -``` - -**Solution**: Ensure `wifi_ssid` and `wifi_pass` are set for devices with `network_mode: "wifi"`. - -## Output Example - -``` -============================================================ -# Device 1/3: ce4f626b -############################################################ - -============================================================ -🔧 Building firmware for: ce4f626b (WIFI) on /dev/ttyUSB0 -============================================================ -✅ Generated sdkconfig.defaults for ce4f626b -🗑️ Removed old sdkconfig -⚙️ Running idf.py build... -✅ Firmware saved: espilon_bot/firmware/ce4f626b.bin - -============================================================ -🚀 Flashing: ce4f626b (WIFI) on /dev/ttyUSB0 -============================================================ -📁 Bootloader: espilon_bot/build/bootloader/bootloader.bin -📁 Partitions: espilon_bot/build/partition_table/partition-table.bin -📁 Application: espilon_bot/firmware/ce4f626b.bin -🔌 Port: /dev/ttyUSB0 -✅ Successfully flashed ce4f626b - -============================================================ -📊 SUMMARY -============================================================ -✅ Success: 3/3 -❌ Failed: 0/3 -============================================================ -``` - -## Security Considerations - -### Encryption Keys - -**WARNING**: The default crypto keys are for TESTING ONLY: -- `crypto_key`: "testde32chars00000000000000000000" -- `crypto_nonce`: "noncenonceno" - -For production use: -1. Generate secure keys: - ```bash - # 32-byte key - openssl rand -hex 32 - - # 12-byte nonce - openssl rand -hex 12 - ``` - -2. Update in `devices.json` or use `--crypto-key` and `--crypto-nonce` - -3. **Never commit real keys to version control** - -### Device IDs - -Generate random device IDs: -```bash -openssl rand -hex 4 # Generates 8-character hex ID -``` - -## Files Generated - -During operation, the script creates: - -``` -espilon_bot/ -├── sdkconfig # Generated during build (auto-deleted) -├── sdkconfig.defaults # Overwritten per device -├── sdkconfig.defaults.bak # Backup of previous config -├── build/ # ESP-IDF build artifacts -│ ├── bootloader/ -│ ├── partition_table/ -│ └── epsilon_bot.bin -└── firmware/ # Saved firmware binaries - ├── ce4f626b.bin - ├── a91dd021.bin - └── f34592e0.bin -``` - -## Tips - -1. **Batch Processing**: Connect multiple ESP32s to different USB ports, configure all in `devices.json`, and flash them all at once. - -2. **Parallel Builds**: For faster processing with many devices, consider building in parallel (future enhancement). - -3. **Configuration Templates**: Keep multiple `devices.json` files for different deployment scenarios: - - `devices-wifi.json` - - `devices-gprs.json` - - `devices-production.json` - - `devices-testing.json` - -4. **Firmware Archive**: Save firmware binaries with version tags: - ```bash - mkdir -p firmware-archive/v1.0 - cp espilon_bot/firmware/*.bin firmware-archive/v1.0/ - ``` - -5. **Serial Port Mapping**: Create udev rules for consistent port naming (Linux): - ```bash - # /etc/udev/rules.d/99-esp32.rules - SUBSYSTEM=="tty", ATTRS{idVendor}=="1a86", ATTRS{idProduct}=="7523", SYMLINK+="esp32-dev1" - SUBSYSTEM=="tty", ATTRS{idVendor}=="10c4", ATTRS{idProduct}=="ea60", SYMLINK+="esp32-dev2" - ``` - -## Help - -```bash -python3 flash.py --help -``` - -Displays full usage information with examples. - -## Related Documentation - -- [Installation Guide](../../docs/INSTALL.md) - Full Espilon setup -- [Hardware Guide](../../docs/HARDWARE.md) - Supported boards and wiring -- [Module API](../../docs/MODULES.md) - Available commands and modules -- [Security](../../docs/SECURITY.md) - Security best practices - -## License - -Part of the Espilon project. See [LICENSE](../../LICENSE) for details. diff --git a/tools/flasher/flash.py b/tools/flasher/flash.py deleted file mode 100644 index fc85e89..0000000 --- a/tools/flasher/flash.py +++ /dev/null @@ -1,518 +0,0 @@ -#!/usr/bin/env python3 -""" -Epsilon Multi-Device Flasher -Automates building and flashing ESP32 devices with custom configurations. -""" - -import os -import json -import subprocess -import string -import random -import argparse -from typing import List, Optional -from dataclasses import dataclass - - -# Configuration -CONFIG_FILE = "devices.json" -SDKCONFIG_FILENAME = "sdkconfig.defaults" - - -@dataclass -class Device: - """Represents an ESP32 device configuration""" - device_id: str - port: str - srv_ip: str - srv_port: int = 2626 - - # Network configuration - network_mode: str = "wifi" # "wifi" or "gprs" - wifi_ssid: Optional[str] = None - wifi_pass: Optional[str] = None - gprs_apn: Optional[str] = "sl2sfr" - - # Optional settings - hostname: Optional[str] = None - - # Modules - module_network: bool = True - module_recon: bool = False - module_fakeap: bool = False - - # Recon settings - recon_camera: bool = False - recon_ble_trilat: bool = False - - # Security (master key provisioned separately via provision.py) - - def __post_init__(self): - """Generate hostname if not provided""" - if not self.hostname: - self.hostname = self._generate_hostname() - - # Validate network mode - if self.network_mode not in ["wifi", "gprs"]: - raise ValueError(f"Invalid network_mode: {self.network_mode}. Must be 'wifi' or 'gprs'") - - # Validate WiFi mode has credentials - if self.network_mode == "wifi" and (not self.wifi_ssid or not self.wifi_pass): - raise ValueError("WiFi mode requires wifi_ssid and wifi_pass") - - @staticmethod - def _generate_hostname() -> str: - """Generate a realistic device hostname""" - hostnames = [ - "iPhone", - "Android", - f"DESKTOP-{''.join([random.choice(string.digits + string.ascii_uppercase) for _ in range(7)])}", - - # iPhones - "iPhone-15-pro-max", "iPhone-15-pro", "iPhone-15", "iPhone-15-plus", - "iPhone-14-pro-max", "iPhone-14-pro", "iPhone-14", "iPhone-14-plus", - "iPhone-se-3rd-gen", "iPhone-13-pro-max", - - # Samsung - "galaxy-s24-ultra", "galaxy-s24", "galaxy-z-fold5", "galaxy-z-flip5", "galaxy-a55", - - # Xiaomi - "xiaomi-14-ultra", "xiaomi-14", "redmi-note-13-pro-plus", "redmi-note-13-5g", "poco-f6-pro", - - # OnePlus - "oneplus-12", "oneplus-12r", "oneplus-11", "oneplus-nord-3", "oneplus-nord-ce-3-lite", - - # Google - "pixel-8-pro", "pixel-8", "pixel-7a", "pixel-fold", "pixel-6a", - - # Motorola - "moto-edge-50-ultra", "moto-g-stylus-5g-2024", "moto-g-power-2024", - "razr-50-ultra", "moto-e32", - - # Sony - "xperia-1-vi", "xperia-10-vi", "xperia-5-v", "xperia-l5", "xperia-pro-i", - - # Oppo - "oppo-find-x6-pro", "oppo-reno9-pro", "oppo-a78", "oppo-f21-pro", "oppo-a17", - - # Vivo - "vivo-x90-pro-plus", "vivo-x90-pro", "vivo-y35", "vivo-y75", "vivo-v29e", - - # Realme - "realme-11-pro-plus", "realme-10x", "realme-9i", "realme-c33", "realme-11x", - - # Asus - "rog-phone-8-pro", "zenfone-10", "rog-phone-7", "rog-phone-6d", "asus-zenfone-9", - - # Lenovo - "lenovo-legion-y90", "lenovo-k14-note", "lenovo-k14-plus", "lenovo-tab-m10", - - # Honor - "honor-90", "honor-x8a", "honor-70-pro", "honor-magic5-pro", "honor-x7a", - - # Huawei - "huawei-p60-pro", "huawei-p50-pro", "huawei-mate-50-pro", "huawei-mate-xs-2", "huawei-nova-11", - - # LG - "lg-wing", "lg-velvet", "lg-g8x-thinQ", "lg-v60-thinQ", "lg-k92-5g" - ] - return random.choice(hostnames) - - @classmethod - def from_dict(cls, data: dict): - """Create Device from dictionary""" - return cls( - device_id=data["device_id"], - port=data["port"], - srv_ip=data["srv_ip"], - srv_port=data.get("srv_port", 2626), - network_mode=data.get("network_mode", "wifi"), - wifi_ssid=data.get("wifi_ssid"), - wifi_pass=data.get("wifi_pass"), - gprs_apn=data.get("gprs_apn", "sl2sfr"), - hostname=data.get("hostname"), - module_network=data.get("module_network", True), - module_recon=data.get("module_recon", False), - module_fakeap=data.get("module_fakeap", False), - recon_camera=data.get("recon_camera", False), - recon_ble_trilat=data.get("recon_ble_trilat", False) - ) - - def __str__(self): - return f"{self.device_id} ({self.network_mode.upper()}) on {self.port}" - - -class FirmwareBuilder: - """Handles firmware building for ESP32 devices""" - - def __init__(self, project_path: str): - self.project_path = project_path - self.sdkconfig_path = os.path.join(self.project_path, SDKCONFIG_FILENAME) - self.build_dir = os.path.join(self.project_path, "build") - self.firmware_dir = os.path.join(self.project_path, "firmware") - os.makedirs(self.firmware_dir, exist_ok=True) - - def generate_sdkconfig(self, device: Device): - """Generate sdkconfig.defaults for a specific device""" - - # Backup existing config - if os.path.exists(self.sdkconfig_path): - backup_path = f"{self.sdkconfig_path}.bak" - with open(self.sdkconfig_path, "r") as src, open(backup_path, "w") as dst: - dst.write(src.read()) - - # Generate new config - config_lines = [] - - # Device ID - config_lines.append(f'CONFIG_DEVICE_ID="{device.device_id}"') - - # Network Mode - if device.network_mode == "wifi": - config_lines.append("CONFIG_NETWORK_WIFI=y") - config_lines.append("CONFIG_NETWORK_GPRS=n") - config_lines.append(f'CONFIG_WIFI_SSID="{device.wifi_ssid}"') - config_lines.append(f'CONFIG_WIFI_PASS="{device.wifi_pass}"') - else: # gprs - config_lines.append("CONFIG_NETWORK_WIFI=n") - config_lines.append("CONFIG_NETWORK_GPRS=y") - config_lines.append(f'CONFIG_GPRS_APN="{device.gprs_apn}"') - - # Server - config_lines.append(f'CONFIG_SERVER_IP="{device.srv_ip}"') - config_lines.append(f'CONFIG_SERVER_PORT={device.srv_port}') - - # Security (master key provisioned via provision.py into factory NVS) - - # Modules - config_lines.append(f'CONFIG_MODULE_NETWORK={"y" if device.module_network else "n"}') - config_lines.append(f'CONFIG_MODULE_RECON={"y" if device.module_recon else "n"}') - config_lines.append(f'CONFIG_MODULE_FAKEAP={"y" if device.module_fakeap else "n"}') - - # Recon settings (only if module enabled) - if device.module_recon: - config_lines.append(f'CONFIG_RECON_MODE_CAMERA={"y" if device.recon_camera else "n"}') - config_lines.append(f'CONFIG_RECON_MODE_BLE_TRILAT={"y" if device.recon_ble_trilat else "n"}') - - # Crypto: ChaCha20-Poly1305 + HKDF (mbedtls legacy API, ESP-IDF v5.3) - config_lines.append("CONFIG_MBEDTLS_CHACHA20_C=y") - config_lines.append("CONFIG_MBEDTLS_POLY1305_C=y") - config_lines.append("CONFIG_MBEDTLS_CHACHAPOLY_C=y") - config_lines.append("CONFIG_MBEDTLS_HKDF_C=y") - - # Partition table (custom with factory NVS) - config_lines.append("CONFIG_PARTITION_TABLE_CUSTOM=y") - config_lines.append('CONFIG_PARTITION_TABLE_CUSTOM_FILENAME="partitions.csv"') - config_lines.append("CONFIG_LWIP_IPV4_NAPT=y") - config_lines.append("CONFIG_LWIP_IPV4_NAPT_PORTMAP=y") - config_lines.append("CONFIG_LWIP_IP_FORWARD=y") - config_lines.append(f'CONFIG_LWIP_LOCAL_HOSTNAME="{device.hostname}"') - - # Bluetooth (for BLE trilateration) - if device.recon_ble_trilat: - config_lines.append("CONFIG_BT_ENABLED=y") - config_lines.append("CONFIG_BT_BLUEDROID_ENABLED=y") - config_lines.append("CONFIG_BT_BLE_ENABLED=y") - - # Write config - with open(self.sdkconfig_path, "w") as f: - f.write("\n".join(config_lines) + "\n") - - print(f"✅ Generated sdkconfig.defaults for {device.device_id}") - - def build(self, device: Device) -> Optional[str]: - """Build firmware for a specific device""" - print(f"\n{'='*60}") - print(f"🔧 Building firmware for: {device}") - print(f"{'='*60}") - - # Generate config - self.generate_sdkconfig(device) - - # Remove old sdkconfig to force reconfiguration - sdkconfig = os.path.join(self.project_path, "sdkconfig") - if os.path.exists(sdkconfig): - os.remove(sdkconfig) - print("🗑️ Removed old sdkconfig") - - # Build - try: - print("⚙️ Running idf.py build...") - result = subprocess.run( - ["bash", "-c", f". $HOME/esp-idf/export.sh > /dev/null 2>&1 && idf.py -C {self.project_path} -D SDKCONFIG_DEFAULTS={SDKCONFIG_FILENAME} build 2>&1"], - capture_output=True, - text=True, - timeout=300 # 5 minutes timeout - ) - - if result.returncode != 0: - print(f"❌ Build failed for {device.device_id}") - print("Error output:") - print(result.stdout[-2000:] if len(result.stdout) > 2000 else result.stdout) - return None - - except subprocess.TimeoutExpired: - print(f"❌ Build timeout for {device.device_id}") - return None - except Exception as e: - print(f"❌ Build error for {device.device_id}: {e}") - return None - - # Find binary - bin_name = "epsilon_bot.bin" - bin_path = os.path.join(self.build_dir, bin_name) - - if not os.path.exists(bin_path): - print(f"❌ Binary not found: {bin_path}") - return None - - # Copy to firmware directory with device ID - output_bin = os.path.join(self.firmware_dir, f"{device.device_id}.bin") - subprocess.run(["cp", bin_path, output_bin], check=True) - - print(f"✅ Firmware saved: {output_bin}") - return output_bin - - -class Flasher: - """Handles flashing ESP32 devices""" - - def __init__(self, project_path: str): - self.project_path = project_path - self.build_dir = os.path.join(project_path, "build") - - def flash(self, device: Device, bin_path: str): - """Flash a device with compiled firmware""" - print(f"\n{'='*60}") - print(f"🚀 Flashing: {device}") - print(f"{'='*60}") - - # Locate required files - bootloader = os.path.join(self.build_dir, "bootloader", "bootloader.bin") - partitions = os.path.join(self.build_dir, "partition_table", "partition-table.bin") - - # Check all files exist - if not os.path.exists(bootloader): - print(f"❌ Missing bootloader: {bootloader}") - return False - - if not os.path.exists(partitions): - print(f"❌ Missing partition table: {partitions}") - return False - - if not os.path.exists(bin_path): - print(f"❌ Missing application binary: {bin_path}") - return False - - print(f"📁 Bootloader: {bootloader}") - print(f"📁 Partitions: {partitions}") - print(f"📁 Application: {bin_path}") - print(f"🔌 Port: {device.port}") - - # Flash - try: - subprocess.run([ - "esptool.py", - "--chip", "esp32", - "--port", device.port, - "--baud", "460800", - "write_flash", "-z", - "0x1000", bootloader, - "0x8000", partitions, - "0x10000", bin_path - ], check=True) - - print(f"✅ Successfully flashed {device.device_id}") - return True - - except subprocess.CalledProcessError as e: - print(f"❌ Flash failed for {device.device_id}: {e}") - return False - except Exception as e: - print(f"❌ Unexpected error flashing {device.device_id}: {e}") - return False - - -def load_devices_from_config(config_path: str) -> tuple[str, List[Device]]: - """Load devices from JSON config file""" - if not os.path.exists(config_path): - raise FileNotFoundError(f"Config file not found: {config_path}") - - with open(config_path, "r") as f: - data = json.load(f) - - project_path = data.get("project") - if not project_path: - raise ValueError("Missing 'project' field in config") - - devices_data = data.get("devices", []) - devices = [Device.from_dict(d) for d in devices_data] - - return project_path, devices - - -def main(): - parser = argparse.ArgumentParser( - description="Epsilon ESP32 Multi-Device Flasher", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Flash all devices from config file - python flash.py --config devices.json - - # Flash a single device manually (WiFi) - python flash.py --manual \\ - --project /home/user/epsilon/espilon_bot \\ - --device-id abc12345 \\ - --port /dev/ttyUSB0 \\ - --srv-ip 192.168.1.100 \\ - --wifi-ssid MyWiFi \\ - --wifi-pass MyPassword - - # Flash a single device manually (GPRS) - python flash.py --manual \\ - --project /home/user/epsilon/espilon_bot \\ - --device-id def67890 \\ - --port /dev/ttyUSB1 \\ - --srv-ip 203.0.113.10 \\ - --network-mode gprs \\ - --gprs-apn sl2sfr - - # Build only (no flash) - python flash.py --config devices.json --build-only - """ - ) - - # Mode selection - mode_group = parser.add_mutually_exclusive_group(required=True) - mode_group.add_argument("--config", type=str, help="Path to devices.json config file") - mode_group.add_argument("--manual", action="store_true", help="Manual device configuration") - - # Manual device configuration - parser.add_argument("--project", type=str, help="Path to epsilon_bot project directory") - parser.add_argument("--device-id", type=str, help="Device ID (8 hex chars)") - parser.add_argument("--port", type=str, help="Serial port (e.g., /dev/ttyUSB0)") - parser.add_argument("--srv-ip", type=str, help="C2 server IP address") - parser.add_argument("--srv-port", type=int, default=2626, help="C2 server port (default: 2626)") - - # Network configuration - parser.add_argument("--network-mode", choices=["wifi", "gprs"], default="wifi", help="Network mode") - parser.add_argument("--wifi-ssid", type=str, help="WiFi SSID (required for WiFi mode)") - parser.add_argument("--wifi-pass", type=str, help="WiFi password (required for WiFi mode)") - parser.add_argument("--gprs-apn", type=str, default="sl2sfr", help="GPRS APN (default: sl2sfr)") - - # Optional settings - parser.add_argument("--hostname", type=str, help="Device hostname (random if not specified)") - # Crypto keys are now provisioned separately via provision.py - - # Modules - parser.add_argument("--enable-recon", action="store_true", help="Enable recon module") - parser.add_argument("--enable-fakeap", action="store_true", help="Enable fake AP module") - parser.add_argument("--enable-camera", action="store_true", help="Enable camera reconnaissance") - parser.add_argument("--enable-ble-trilat", action="store_true", help="Enable BLE trilateration") - - # Actions - parser.add_argument("--build-only", action="store_true", help="Build firmware without flashing") - parser.add_argument("--flash-only", action="store_true", help="Flash existing firmware without rebuilding") - - args = parser.parse_args() - - # Load devices - if args.config: - # Load from config file - project_path, devices = load_devices_from_config(args.config) - print(f"📋 Loaded {len(devices)} device(s) from {args.config}") - print(f"📂 Project: {project_path}") - - else: - # Manual device - required = ["project", "device_id", "port", "srv_ip"] - missing = [f for f in required if not getattr(args, f.replace("-", "_"))] - if missing: - parser.error(f"--manual mode requires: {', '.join(f'--{f}' for f in missing)}") - - # Validate WiFi requirements - if args.network_mode == "wifi" and (not args.wifi_ssid or not args.wifi_pass): - parser.error("WiFi mode requires --wifi-ssid and --wifi-pass") - - project_path = args.project - device = Device( - device_id=args.device_id, - port=args.port, - srv_ip=args.srv_ip, - srv_port=args.srv_port, - network_mode=args.network_mode, - wifi_ssid=args.wifi_ssid, - wifi_pass=args.wifi_pass, - gprs_apn=args.gprs_apn, - hostname=args.hostname, - module_network=True, - module_recon=args.enable_recon, - module_fakeap=args.enable_fakeap, - recon_camera=args.enable_camera, - recon_ble_trilat=args.enable_ble_trilat - ) - devices = [device] - print(f"📋 Manual device configuration: {device}") - - # Validate project path - if not os.path.exists(project_path): - print(f"❌ Project path not found: {project_path}") - return 1 - - # Initialize tools - builder = FirmwareBuilder(project_path) - flasher = Flasher(project_path) - - # Process each device - success_count = 0 - fail_count = 0 - - for i, device in enumerate(devices, 1): - print(f"\n{'#'*60}") - print(f"# Device {i}/{len(devices)}: {device.device_id}") - print(f"{'#'*60}") - - try: - # Build - if not args.flash_only: - bin_path = builder.build(device) - if not bin_path: - print(f"⚠️ Skipping flash for {device.device_id} (build failed)") - fail_count += 1 - continue - else: - # Use existing binary - bin_path = os.path.join(builder.firmware_dir, f"{device.device_id}.bin") - if not os.path.exists(bin_path): - print(f"❌ Firmware not found: {bin_path}") - fail_count += 1 - continue - - # Flash - if not args.build_only: - if flasher.flash(device, bin_path): - success_count += 1 - else: - fail_count += 1 - else: - print(f"ℹ️ Build-only mode, skipping flash") - success_count += 1 - - except Exception as e: - print(f"❌ Error processing {device.device_id}: {e}") - fail_count += 1 - - # Summary - print(f"\n{'='*60}") - print(f"📊 SUMMARY") - print(f"{'='*60}") - print(f"✅ Success: {success_count}/{len(devices)}") - print(f"❌ Failed: {fail_count}/{len(devices)}") - print(f"{'='*60}\n") - - return 0 if fail_count == 0 else 1 - - -if __name__ == "__main__": - exit(main()) diff --git a/tools/provisioning/devices/espilon-demo.csv b/tools/provisioning/devices/espilon-demo.csv deleted file mode 100644 index 5131ef3..0000000 --- a/tools/provisioning/devices/espilon-demo.csv +++ /dev/null @@ -1,3 +0,0 @@ -key,type,encoding,value -crypto,namespace,, -master_key,data,hex2bin,0d99c1e0e86eb289b51c7e11bf913feb5180b1d266aade18466a3ef591c4986c diff --git a/tools/provisioning/provision.py b/tools/provisioning/provision.py deleted file mode 100644 index 4afef39..0000000 --- a/tools/provisioning/provision.py +++ /dev/null @@ -1,212 +0,0 @@ -#!/usr/bin/env python3 -""" -Epsilon Device Provisioning Tool - -Generates a unique 32-byte master key for an ESP32 device, -flashes it into the factory NVS partition, and registers it -in the C2 keystore. - -Usage: - python provision.py --device-id abc12345 --port /dev/ttyUSB0 - python provision.py --device-id abc12345 --port /dev/ttyUSB0 --keystore ../C3PO/keys.json -""" - -import argparse -import json -import os -import subprocess -import sys -import tempfile - -FCTRY_PARTITION_OFFSET = 0x10000 -FCTRY_PARTITION_SIZE = 0x6000 -NVS_NAMESPACE = "crypto" -NVS_KEY = "master_key" - - -def generate_master_key() -> bytes: - """Generate a cryptographically secure 32-byte master key.""" - return os.urandom(32) - - -def create_nvs_csv(master_key: bytes, csv_path: str) -> None: - """Create a CSV file for nvs_partition_gen with the master key.""" - with open(csv_path, "w") as f: - f.write("key,type,encoding,value\n") - f.write(f"{NVS_NAMESPACE},namespace,,\n") - f.write(f"{NVS_KEY},data,hex2bin,{master_key.hex()}\n") - - -def generate_nvs_binary(csv_path: str, bin_path: str) -> bool: - """Generate NVS partition binary from CSV using nvs_partition_gen.""" - try: - result = subprocess.run( - [ - sys.executable, "-m", "esp_idf_nvs_partition_gen", - "generate", csv_path, bin_path, hex(FCTRY_PARTITION_SIZE), - ], - capture_output=True, - text=True, - timeout=30, - ) - if result.returncode != 0: - # Fallback: try the script directly from ESP-IDF - idf_path = os.environ.get("IDF_PATH", os.path.expanduser("~/esp-idf")) - nvs_tool = os.path.join( - idf_path, "components", "nvs_flash", "nvs_partition_generator", - "nvs_partition_gen.py" - ) - if os.path.exists(nvs_tool): - result = subprocess.run( - [sys.executable, nvs_tool, "generate", - csv_path, bin_path, hex(FCTRY_PARTITION_SIZE)], - capture_output=True, - text=True, - timeout=30, - ) - if result.returncode != 0: - print(f"nvs_partition_gen failed:\n{result.stdout}\n{result.stderr}") - return False - return True - except FileNotFoundError: - print("Error: nvs_partition_gen not found. Ensure ESP-IDF is installed.") - return False - except subprocess.TimeoutExpired: - print("Error: nvs_partition_gen timed out") - return False - - -def flash_partition(port: str, bin_path: str, offset: int) -> bool: - """Flash a binary to the specified partition offset.""" - try: - subprocess.run( - [ - "esptool.py", - "--chip", "esp32", - "--port", port, - "--baud", "460800", - "write_flash", - hex(offset), bin_path, - ], - check=True, - timeout=60, - ) - return True - except subprocess.CalledProcessError as e: - print(f"Flash failed: {e}") - return False - except FileNotFoundError: - print("Error: esptool.py not found. Install with: pip install esptool") - return False - except subprocess.TimeoutExpired: - print("Error: flash timed out") - return False - - -def update_keystore(keystore_path: str, device_id: str, master_key: bytes) -> None: - """Add or update the device's master key in the C2 keystore.""" - keys = {} - if os.path.exists(keystore_path): - try: - with open(keystore_path, "r") as f: - keys = json.load(f) - except (json.JSONDecodeError, ValueError): - pass - - keys[device_id] = master_key.hex() - - with open(keystore_path, "w") as f: - json.dump(keys, f, indent=2) - - print(f"Keystore updated: {keystore_path}") - - -def main(): - parser = argparse.ArgumentParser( - description="Epsilon ESP32 Device Provisioning", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Provision a device and register in default keystore - python provision.py --device-id abc12345 --port /dev/ttyUSB0 - - # Provision with custom keystore path - python provision.py --device-id abc12345 --port /dev/ttyUSB0 \\ - --keystore ../C3PO/keys.json - - # Generate key only (no flash) - python provision.py --device-id abc12345 --no-flash - """, - ) - - parser.add_argument("--device-id", required=True, help="Device ID") - parser.add_argument("--port", help="Serial port (e.g., /dev/ttyUSB0)") - parser.add_argument( - "--keystore", - default=os.path.join(os.path.dirname(__file__), "..", "C3PO", "keys.json"), - help="Path to C2 keystore JSON (default: ../C3PO/keys.json)", - ) - parser.add_argument("--no-flash", action="store_true", - help="Generate key and update keystore without flashing") - parser.add_argument("--key", help="Use a specific hex-encoded 32-byte key instead of random") - - args = parser.parse_args() - - if not args.no_flash and not args.port: - parser.error("--port is required unless --no-flash is specified") - - print(f"{'='*50}") - print(f" Epsilon Device Provisioning") - print(f"{'='*50}") - print(f" Device ID : {args.device_id}") - print(f" Port : {args.port or 'N/A (no-flash)'}") - print(f" Keystore : {args.keystore}") - print(f"{'='*50}") - - # 1) Generate or parse master key - if args.key: - try: - master_key = bytes.fromhex(args.key) - if len(master_key) != 32: - print(f"Error: --key must be 32 bytes (64 hex chars), got {len(master_key)}") - return 1 - except ValueError: - print("Error: --key must be valid hex") - return 1 - print(f" Using provided key: {master_key.hex()[:16]}...") - else: - master_key = generate_master_key() - print(f" Generated key : {master_key.hex()[:16]}...") - - # 2) Flash to device if requested - if not args.no_flash: - with tempfile.TemporaryDirectory() as tmpdir: - csv_path = os.path.join(tmpdir, "fctry.csv") - bin_path = os.path.join(tmpdir, "fctry.bin") - - print("\nGenerating NVS binary...") - create_nvs_csv(master_key, csv_path) - - if not generate_nvs_binary(csv_path, bin_path): - print("Failed to generate NVS binary") - return 1 - - print(f"Flashing factory NVS to {args.port} at {hex(FCTRY_PARTITION_OFFSET)}...") - if not flash_partition(args.port, bin_path, FCTRY_PARTITION_OFFSET): - print("Failed to flash factory NVS") - return 1 - - print("Factory NVS flashed successfully") - - # 3) Update keystore - keystore_path = os.path.abspath(args.keystore) - update_keystore(keystore_path, args.device_id, master_key) - - print(f"\n{'='*50}") - print(f" Provisioning complete for {args.device_id}") - print(f"{'='*50}") - return 0 - - -if __name__ == "__main__": - sys.exit(main())