tools: replace flasher/provisioning with unified deploy system + espmon

Add deploy.py: multi-device build/flash/provision with JSON config.
Add espmon/: real-time ESP32 fleet monitoring daemon.
Remove tools/flasher/ and tools/provisioning/ (superseded).
This commit is contained in:
Eun0us 2026-02-28 20:15:57 +01:00
parent 79c2a4d4bf
commit 12b851581a
16 changed files with 2072 additions and 1376 deletions

View File

@ -1,197 +1,168 @@
# Epsilon Tools
This directory contains tools for managing and deploying Epsilon ESP32 agents.
## C2 Server (C3PO/)
The C2 (Command & Control) server manages communication with deployed ESP32 agents.
### C3PO - Main C2 Server
**C3PO** is the primary C2 server used to control Epsilon bots.
Features:
- Threaded TCP server (sockets + threads)
- Device registry and management with per-device crypto
- Group-based device organization
- Encrypted communications (ChaCha20-Poly1305 AEAD + HKDF key derivation)
- Per-device master key keystore (`keys.json`)
- Interactive CLI interface
- Optional TUI (Textual) and Web dashboard
- Camera UDP receiver + MLAT support
- Command dispatching to individual devices, groups, or all
See [C3PO/README.md](C3PO/README.md) for complete C2 documentation.
Quick start:
```bash
cd C3PO
python3 c3po.py
```
tools/
deploy.py Unified build, provision & flash pipeline
deploy.example.json Batch config template
C3PO/ C2 server (TCP + TUI + Web)
nanoPB/ Protobuf definitions (c2.proto)
```
Authors: **@off-path**, **@eun0us**
## Prerequisites
## Multi-Device Flasher (flasher/)
- Python 3.8+
- ESP-IDF v5.3.2 (`source ~/esp-idf/export.sh`)
- esptool (`pip install esptool`)
The **flasher** tool automates building and flashing multiple ESP32 devices with custom configurations.
---
### Features
## Deploy (`deploy.py`)
- Batch processing of multiple devices
- Support for WiFi and GPRS modes
- Per-device configuration (ID, network, modules)
- Automatic hostname randomization
- Build-only and flash-only modes
- Full module configuration (Network, Recon, FakeAP)
Single pipeline: **build** firmware, **generate** crypto keys, **provision** factory NVS, **flash** all partitions, **register** keys in C2 keystore.
### Quick Start
### Usage
1. Edit [flasher/devices.json](flasher/devices.json):
```bash
# Interactive wizard
python deploy.py
# Single device (WiFi + OTA)
python deploy.py -p /dev/ttyUSB0 -d ce4f626b \
--wifi MySSID MyPass --srv 192.168.1.51
# Single device (WiFi, no OTA)
python deploy.py -p /dev/ttyUSB0 -d a91dd021 --no-ota \
--wifi MySSID MyPass --srv 192.168.1.51
# Batch deploy
python deploy.py --config deploy.example.json
```
### Modes
| Flag | Effect |
|------|--------|
| *(default)* | Full pipeline: build + provision + flash |
| `--provision-only` | Generate key + flash factory NVS only |
| `--flash-only` | Flash existing build (skip rebuild) |
| `--build-only` | Build firmware only (no flash) |
| `--erase` | Erase entire flash before writing |
| `--monitor` | Open serial monitor after flash |
### OTA vs Non-OTA
| | OTA | Non-OTA (`--no-ota`) |
|---|---|---|
| Partition table | `partitions.csv` | `partitions_noota.csv` |
| App partitions | 2 x 1.875 MB (ota_0/ota_1) | 1 x 3.875 MB (factory) |
| Firmware updates | HTTPS OTA | Manual reflash |
| mbedTLS cert bundle | Yes | No |
### Flash Map
```
Offset OTA Non-OTA
------ --- -------
0x1000 bootloader bootloader
0x8000 partition table partition table
0xD000 ota data --
0x10000 factory NVS (key) factory NVS (key)
0x20000 app (ota_0) app (factory, 3.875 MB)
```
### Batch Config
`deploy.example.json`:
```json
{
"project": "/home/user/epsilon/espilon_bot",
"devices": [
{
"device_id": "ce4f626b",
"port": "/dev/ttyUSB0",
"srv_ip": "192.168.1.13",
"srv_port": 2626,
"network_mode": "wifi",
"wifi_ssid": "YourWiFi",
"wifi_pass": "YourPassword",
"module_network": true,
"module_recon": false,
"module_fakeap": false
}
]
}
{
"devices": [
{
"device_id": "ce4f626b",
"port": "/dev/ttyUSB0",
"srv_ip": "192.168.1.51",
"srv_port": 2626,
"network_mode": "wifi",
"wifi_ssid": "MyWiFi",
"wifi_pass": "MyPassword",
"module_network": true,
"ota_enabled": true
},
{
"device_id": "a91dd021",
"port": "/dev/ttyUSB1",
"srv_ip": "192.168.1.51",
"srv_port": 2626,
"network_mode": "wifi",
"wifi_ssid": "MyWiFi",
"wifi_pass": "MyPassword",
"module_network": true,
"module_fakeap": true,
"ota_enabled": false
}
]
}
```
2. Flash all devices:
### Config Fields
| Field | Default | Description |
|-------|---------|-------------|
| `device_id` | random | 8 hex chars unique ID |
| `port` | -- | Serial port |
| `srv_ip` | `192.168.1.100` | C2 server IP |
| `srv_port` | `2626` | C2 server port |
| `network_mode` | `wifi` | `wifi` or `gprs` |
| `wifi_ssid` | -- | WiFi SSID |
| `wifi_pass` | -- | WiFi password |
| `gprs_apn` | `sl2sfr` | GPRS APN |
| `hostname` | random | Device hostname on network |
| `module_network` | `true` | ping, arp, proxy, dos |
| `module_fakeap` | `false` | Fake AP, captive portal, sniffer |
| `module_recon` | `false` | Reconnaissance |
| `recon_camera` | `false` | ESP32-CAM |
| `recon_ble_trilat` | `false` | BLE trilateration |
| `ota_enabled` | `true` | OTA firmware updates |
| `master_key` | random | 64 hex chars (override auto-gen) |
### Crypto
Each deploy generates a **256-bit master key** per device:
1. Random 32-byte key generated (or provided via `--key`)
2. Written to factory NVS (`fctry` @ `0x10000`, namespace `crypto`, key `master_key`)
3. Registered in `C3PO/keys.json`
4. On boot, firmware derives encryption key via **HKDF-SHA256** (salt=device_id, info=`espilon-c2-v1`)
5. All C2 traffic encrypted with **ChaCha20-Poly1305 AEAD** (12-byte nonce + 16-byte tag)
---
## C2 Server (`C3PO/`)
Command & Control server for deployed ESP32 agents.
- Threaded TCP server with per-device encrypted communications
- ChaCha20-Poly1305 AEAD + HKDF key derivation
- Device registry + master key keystore (`keys.json`)
- TUI (Textual) + Web dashboard
- Camera UDP receiver + MLAT support
- Command dispatch: single device, group, or broadcast
```bash
cd flasher
python3 flash.py --config devices.json
cd C3PO && python3 c3po.py
```
### Configuration Options
See [C3PO/README.md](C3PO/README.md) for details.
Each device supports:
---
| Field | Description |
|-------|-------------|
| `device_id` | Unique device identifier (8 hex chars) |
| `port` | Serial port (e.g., `/dev/ttyUSB0`) |
| `srv_ip` | C2 server IP address |
| `srv_port` | C2 server port (default: 2626) |
| `network_mode` | `"wifi"` or `"gprs"` |
| `wifi_ssid` | WiFi SSID (WiFi mode) |
| `wifi_pass` | WiFi password (WiFi mode) |
| `gprs_apn` | GPRS APN (GPRS mode, default: "sl2sfr") |
| `hostname` | Network hostname (random if not set) |
| `module_network` | Enable network commands (default: true) |
| `module_recon` | Enable reconnaissance module |
| `module_fakeap` | Enable fake AP module |
| `recon_camera` | Enable camera reconnaissance (ESP32-CAM) |
| `recon_ble_trilat` | Enable BLE trilateration |
## Proto Definitions (`nanoPB/`)
> **Note**: Crypto keys are no longer configured here. Each device must be provisioned with a unique master key using `tools/provisioning/provision.py`.
nanoPB protobuf definitions for ESP32 <-> C2 wire protocol.
### Hostname Randomization
- `c2.proto` -- `Command` and `AgentMessage` messages
- `c2.options` -- nanoPB field size constraints
The flasher automatically randomizes device hostnames to blend in on networks:
---
- iPhone models (iPhone-15-pro-max, iPhone-14, etc.)
- Android devices (galaxy-s24-ultra, pixel-8-pro, xiaomi-14, etc.)
- Windows PCs (DESKTOP-XXXXXXX)
This helps devices appear as legitimate consumer electronics during authorized security testing.
### Manual Mode
Flash a single device without a config file:
```bash
# WiFi mode
python3 flash.py --manual \
--project /home/user/epsilon/espilon_bot \
--device-id abc12345 \
--port /dev/ttyUSB0 \
--srv-ip 192.168.1.100 \
--wifi-ssid MyWiFi \
--wifi-pass MyPassword
# GPRS mode
python3 flash.py --manual \
--project /home/user/epsilon/espilon_bot \
--device-id def67890 \
--port /dev/ttyUSB1 \
--srv-ip 203.0.113.10 \
--network-mode gprs \
--gprs-apn sl2sfr
```
### Build-Only Mode
Generate firmware without flashing:
```bash
python3 flash.py --config devices.json --build-only
```
Firmware saved to: `espilon_bot/firmware/<device_id>.bin`
### Flash-Only Mode
Flash pre-built firmware:
```bash
python3 flash.py --config devices.json --flash-only
```
See [flasher/README.md](flasher/README.md) for complete documentation.
## Device Provisioning (provisioning/)
The **provisioning** tool generates and flashes unique per-device master keys into factory NVS partitions.
### Features
- Generates 32-byte random master keys (cryptographically secure)
- Creates NVS binary for factory partition (`fctry` at offset 0x10000)
- Saves keys to C2 keystore (`keys.json`) for automatic lookup
- Supports flashing directly to connected ESP32
### Quick Start
```bash
cd provisioning
python3 provision.py --device-id my-device --port /dev/ttyUSB0
```
The master key is used by the firmware with HKDF-SHA256 to derive encryption keys for ChaCha20-Poly1305 AEAD.
## NanoPB Tools (nan/)
Tools for Protocol Buffers (nanoPB) code generation for the embedded communication protocol.
Used during development to regenerate Protocol Buffer bindings for ESP32 and Python.
## Additional Resources
- [Installation Guide](../docs/INSTALL.md) - Full Epsilon setup
- [Hardware Guide](../docs/HARDWARE.md) - Supported boards
- [Module API](../docs/MODULES.md) - Available commands
- [Protocol Specification](../docs/PROTOCOL.md) - C2 protocol details
- [Security](../docs/SECURITY.md) - Security best practices
## Contributing
See [CONTRIBUTING.md](../CONTRIBUTING.md) for guidelines on contributing to Epsilon tools.
## License
Part of the Epsilon project. See [LICENSE](../LICENSE) for details.
**Authors:** @off-path, @eun0us

994
tools/deploy.py Executable file
View File

@ -0,0 +1,994 @@
#!/usr/bin/env python3
"""
Epsilon Deploy - Unified build, provision & flash pipeline for ESP32.
Combines firmware building, crypto key provisioning, and flashing
into a single automated workflow with correct partition offsets.
Usage:
python deploy.py # Interactive wizard
python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100 # Quick CLI deploy
python deploy.py --config deploy.json # Batch from config
python deploy.py --provision-only -p /dev/ttyUSB0 -d x # Keys only
"""
import argparse
import json
import os
import secrets
import shutil
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from glob import glob as _glob
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# ─── Paths ────────────────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_DIR = SCRIPT_DIR.parent / "espilon_bot"
KEYSTORE_DEFAULT = SCRIPT_DIR / "C3PO" / "keys.json"
# ─── Partition layout (must match partitions.csv) ─────────────────────────────
CHIP = "esp32"
BAUD = 460800
ADDR_BOOTLOADER = 0x1000
ADDR_PTABLE = 0x8000
ADDR_OTADATA = 0xD000
ADDR_FCTRY = 0x10000
ADDR_OTA_0 = 0x20000
FCTRY_SIZE = 0x6000 # 24 KB
NVS_NAMESPACE = "crypto"
NVS_KEY = "master_key"
KEY_LEN = 32 # 256-bit master key
# ─── Terminal helpers ─────────────────────────────────────────────────────────
class C:
RST = "\033[0m"
B = "\033[1m"
DIM = "\033[2m"
RED = "\033[91m"
GRN = "\033[92m"
YLW = "\033[93m"
BLU = "\033[94m"
CYN = "\033[96m"
def _tag(color: str, tag: str, msg: str):
print(f" {color}{C.B}[{tag}]{C.RST} {msg}")
def ok(m): _tag(C.GRN, " OK ", m)
def info(m): _tag(C.BLU, " .. ", m)
def warn(m): _tag(C.YLW, " !! ", m)
def err(m): _tag(C.RED, " XX ", m)
def step_header(n, m):
pad = max(0, 42 - len(m))
print(f"\n {C.CYN}{C.B}--- Step {n}: {m} {'-' * pad}{C.RST}\n")
def banner():
print(f"""\n{C.CYN}{C.B}\
+==================================================+
| E Epsilon Deploy Tool |
| Build - Provision - Flash - Verify |
+==================================================+{C.RST}\n""")
def box(lines: List[str]):
w = max((len(l) for l in lines), default=0) + 4
print(f" {C.DIM}+{'-' * w}+{C.RST}")
for l in lines:
print(f" {C.DIM}|{C.RST} {l}{' ' * (w - len(l) - 2)}{C.DIM}|{C.RST}")
print(f" {C.DIM}+{'-' * w}+{C.RST}")
# ─── Device Config ────────────────────────────────────────────────────────────
@dataclass
class DeviceConfig:
device_id: str = ""
port: str = ""
srv_ip: str = "192.168.1.100"
srv_port: int = 2626
network_mode: str = "wifi" # "wifi" or "gprs"
wifi_ssid: str = ""
wifi_pass: str = ""
gprs_apn: str = "sl2sfr"
hostname: str = ""
mod_network: bool = True
mod_recon: bool = False
mod_fakeap: bool = False
mod_honeypot: bool = False
mod_canbus: bool = False
mod_fallback: bool = False
mod_redteam: bool = False
recon_camera: bool = False
recon_ble_trilat: bool = False
ota_enabled: bool = True
ota_allow_http: bool = False
master_key_hex: str = "" # empty = generate random
# ══════════════════════════════════════════════════════════════════════════════
# Pipeline Steps
# ══════════════════════════════════════════════════════════════════════════════
# ─── 1. Environment Check ────────────────────────────────────────────────────
def check_environment() -> str:
"""Validate ESP-IDF and tools. Returns idf_path."""
# Find IDF
idf_path = os.environ.get("IDF_PATH", "")
if not idf_path or not os.path.isdir(idf_path):
for candidate in [os.path.expanduser("~/esp-idf"), "/opt/esp-idf"]:
if os.path.isdir(candidate):
idf_path = candidate
break
if not idf_path or not os.path.isdir(idf_path):
err("ESP-IDF not found. Set IDF_PATH or install ESP-IDF.")
sys.exit(1)
ok(f"ESP-IDF : {idf_path}")
# Find esptool
esptool = shutil.which("esptool.py") or shutil.which("esptool")
if esptool:
ok(f"esptool : {esptool}")
else:
warn("esptool.py not in PATH -- will try via IDF export.sh")
# Verify project directory
if not PROJECT_DIR.is_dir():
err(f"Project directory not found: {PROJECT_DIR}")
sys.exit(1)
ok(f"Project : {PROJECT_DIR}")
return idf_path
# ─── 2. sdkconfig Generation ─────────────────────────────────────────────────
def generate_sdkconfig(cfg: DeviceConfig) -> str:
"""Generate sdkconfig.defaults content for a device."""
lines = [
"# Generated by epsilon deploy -- do not edit manually",
"",
"# Device",
f'CONFIG_DEVICE_ID="{cfg.device_id}"',
"",
"# Network",
]
if cfg.network_mode == "wifi":
lines += [
"CONFIG_NETWORK_WIFI=y",
f'CONFIG_WIFI_SSID="{cfg.wifi_ssid}"',
f'CONFIG_WIFI_PASS="{cfg.wifi_pass}"',
]
else:
lines += [
"CONFIG_NETWORK_GPRS=y",
f'CONFIG_GPRS_APN="{cfg.gprs_apn}"',
]
lines += [
"",
"# C2 Server",
f'CONFIG_SERVER_IP="{cfg.srv_ip}"',
f"CONFIG_SERVER_PORT={cfg.srv_port}",
"",
"# Crypto -- ChaCha20-Poly1305 + HKDF (mbedtls, ESP-IDF v5.3)",
'CONFIG_CRYPTO_FCTRY_NS="crypto"',
'CONFIG_CRYPTO_FCTRY_KEY="master_key"',
"CONFIG_MBEDTLS_CHACHA20_C=y",
"CONFIG_MBEDTLS_POLY1305_C=y",
"CONFIG_MBEDTLS_CHACHAPOLY_C=y",
"CONFIG_MBEDTLS_HKDF_C=y",
"",
"# Flash & Partitions",
"CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y",
"CONFIG_PARTITION_TABLE_CUSTOM=y",
f'CONFIG_PARTITION_TABLE_CUSTOM_FILENAME='
f'"{("partitions.csv" if cfg.ota_enabled else "partitions_noota.csv")}"',
"",
"# LWIP",
"CONFIG_LWIP_IPV4_NAPT=y",
"CONFIG_LWIP_IPV4_NAPT_PORTMAP=y",
"CONFIG_LWIP_IP_FORWARD=y",
]
if cfg.hostname:
lines.append(f'CONFIG_LWIP_LOCAL_HOSTNAME="{cfg.hostname}"')
lines += [
"",
"# Modules",
f'CONFIG_MODULE_NETWORK={"y" if cfg.mod_network else "n"}',
f'CONFIG_MODULE_RECON={"y" if cfg.mod_recon else "n"}',
f'CONFIG_MODULE_FAKEAP={"y" if cfg.mod_fakeap else "n"}',
f'CONFIG_MODULE_HONEYPOT={"y" if cfg.mod_honeypot else "n"}',
f'CONFIG_MODULE_CANBUS={"y" if cfg.mod_canbus else "n"}',
f'CONFIG_MODULE_FALLBACK={"y" if cfg.mod_fallback else "n"}',
f'CONFIG_MODULE_REDTEAM={"y" if cfg.mod_redteam else "n"}',
]
if cfg.mod_recon:
lines += [
f'CONFIG_RECON_MODE_CAMERA={"y" if cfg.recon_camera else "n"}',
f'CONFIG_RECON_MODE_MLAT={"y" if cfg.recon_ble_trilat else "n"}',
]
if cfg.recon_camera:
lines += [
"",
"# PSRAM (required for camera frame buffers)",
"CONFIG_SPIRAM=y",
"CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY=y",
]
if cfg.recon_ble_trilat:
lines += ["", "# BLE", "CONFIG_BT_ENABLED=y",
"CONFIG_BT_BLUEDROID_ENABLED=y", "CONFIG_BT_BLE_ENABLED=y"]
lines += [
"",
"# OTA",
f'CONFIG_ESPILON_OTA_ENABLED={"y" if cfg.ota_enabled else "n"}',
]
if cfg.ota_enabled:
if cfg.ota_allow_http:
lines += [
f'CONFIG_ESPILON_OTA_ALLOW_HTTP=y',
"CONFIG_ESP_HTTPS_OTA_ALLOW_HTTP=y",
]
else:
lines += [
"CONFIG_MBEDTLS_CERTIFICATE_BUNDLE=y",
"CONFIG_MBEDTLS_CERTIFICATE_BUNDLE_DEFAULT_FULL=y",
]
lines += [
"",
"# Logging",
"CONFIG_ESPILON_LOG_LEVEL_INFO=y",
"CONFIG_ESPILON_LOG_BOOT_SUMMARY=y",
]
return "\n".join(lines) + "\n"
def write_sdkconfig(cfg: DeviceConfig):
"""Write sdkconfig.defaults, backing up the existing one."""
sdkconfig_path = PROJECT_DIR / "sdkconfig.defaults"
if sdkconfig_path.exists():
backup = sdkconfig_path.with_suffix(".defaults.bak")
shutil.copy2(sdkconfig_path, backup)
info(f"Backed up existing config -> {backup.name}")
content = generate_sdkconfig(cfg)
with open(sdkconfig_path, "w") as f:
f.write(content)
ok(f"Generated sdkconfig.defaults for {cfg.device_id}")
# ─── 3. Build Firmware ───────────────────────────────────────────────────────
def find_app_binary(build_dir: Path) -> str:
"""Find the application binary in the build directory."""
desc_file = build_dir / "project_description.json"
if desc_file.exists():
with open(desc_file) as f:
desc = json.load(f)
app_bin = desc.get("app_bin", "")
if app_bin and os.path.exists(app_bin):
return app_bin
name = desc.get("project_name", "bot-lwip")
candidate = str(build_dir / f"{name}.bin")
if os.path.exists(candidate):
return candidate
# Fallback: try known names
for name in ["bot-lwip.bin", "epsilon_bot.bin"]:
p = str(build_dir / name)
if os.path.exists(p):
return p
err("Cannot find application binary in build/")
sys.exit(1)
def build_firmware(idf_path: str) -> Dict[str, str]:
"""Build firmware via idf.py. Returns dict of binary paths."""
# Full clean: remove sdkconfig AND build dir to avoid stale CMake cache
# (required when switching between OTA/non-OTA configs)
sdkconfig = PROJECT_DIR / "sdkconfig"
build_dir = PROJECT_DIR / "build"
if sdkconfig.exists():
sdkconfig.unlink()
if build_dir.exists():
shutil.rmtree(build_dir)
info("Cleaned build directory")
info("Running idf.py build (this may take several minutes)...")
cmd = (
f". {idf_path}/export.sh > /dev/null 2>&1 && "
f"idf.py -C {PROJECT_DIR} "
f"-D SDKCONFIG_DEFAULTS=sdkconfig.defaults "
f"build 2>&1"
)
result = subprocess.run(
["bash", "-c", cmd],
capture_output=True, text=True, timeout=600,
)
if result.returncode != 0:
err("Build failed!")
lines = result.stdout.strip().split("\n")
for line in lines[-50:]:
print(f" {C.DIM}{line}{C.RST}")
sys.exit(1)
ok("Build succeeded")
return locate_binaries()
def locate_binaries() -> Dict[str, str]:
"""Locate all required binaries in build/."""
build_dir = PROJECT_DIR / "build"
bins: Dict[str, str] = {
"bootloader": str(build_dir / "bootloader" / "bootloader.bin"),
"partition_table": str(build_dir / "partition_table" / "partition-table.bin"),
"app": find_app_binary(build_dir),
}
ota_data = build_dir / "ota_data_initial.bin"
if ota_data.exists():
bins["otadata"] = str(ota_data)
for name, path in bins.items():
if not os.path.exists(path):
err(f"Missing build artifact: {name} -> {path}")
sys.exit(1)
size = os.path.getsize(path)
ok(f"{name:16s} {path} ({size:,} bytes)")
return bins
# ─── 4. Key Generation ───────────────────────────────────────────────────────
def generate_master_key(cfg: DeviceConfig) -> bytes:
"""Generate or parse the 32-byte master key."""
if cfg.master_key_hex:
try:
key = bytes.fromhex(cfg.master_key_hex)
except ValueError:
err("--key must be valid hex")
sys.exit(1)
if len(key) != KEY_LEN:
err(f"Master key must be {KEY_LEN} bytes ({KEY_LEN * 2} hex chars), "
f"got {len(key)}")
sys.exit(1)
info(f"Using provided master key: {key.hex()[:16]}...")
else:
key = secrets.token_bytes(KEY_LEN)
ok(f"Generated master key: {key.hex()[:16]}...")
return key
# ─── 5. NVS Binary Generation ────────────────────────────────────────────────
def generate_nvs_binary(master_key: bytes, tmpdir: str) -> str:
"""Generate factory NVS partition binary from master key."""
csv_path = os.path.join(tmpdir, "fctry.csv")
bin_path = os.path.join(tmpdir, "fctry.bin")
with open(csv_path, "w") as f:
f.write("key,type,encoding,value\n")
f.write(f"{NVS_NAMESPACE},namespace,,\n")
f.write(f"{NVS_KEY},data,hex2bin,{master_key.hex()}\n")
# Try the Python module (pip install esp-idf-nvs-partition-gen)
generated = False
try:
r = subprocess.run(
[sys.executable, "-m", "esp_idf_nvs_partition_gen",
"generate", csv_path, bin_path, hex(FCTRY_SIZE)],
capture_output=True, text=True, timeout=30,
)
generated = r.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Fallback: IDF script
if not generated:
idf_path = os.environ.get("IDF_PATH", os.path.expanduser("~/esp-idf"))
nvs_tool = os.path.join(
idf_path, "components", "nvs_flash",
"nvs_partition_generator", "nvs_partition_gen.py",
)
if os.path.exists(nvs_tool):
r = subprocess.run(
[sys.executable, nvs_tool, "generate",
csv_path, bin_path, hex(FCTRY_SIZE)],
capture_output=True, text=True, timeout=30,
)
generated = r.returncode == 0
if not generated:
err("Failed to generate NVS binary. "
"Install: pip install esp-idf-nvs-partition-gen")
sys.exit(1)
size = os.path.getsize(bin_path)
ok(f"NVS binary generated: {size:,} bytes")
return bin_path
# ─── 6. Flash ────────────────────────────────────────────────────────────────
def esptool_cmd(port: str) -> List[str]:
"""Base esptool command."""
return ["esptool.py", "--chip", CHIP, "--port", port, "--baud", str(BAUD)]
def flash_erase(port: str):
"""Full flash erase."""
info("Erasing entire flash...")
try:
subprocess.run(esptool_cmd(port) + ["erase_flash"],
check=True, timeout=60)
ok("Flash erased")
except subprocess.CalledProcessError as e:
err(f"Erase failed: {e}")
sys.exit(1)
def flash_all(cfg: DeviceConfig, binaries: Dict[str, str], nvs_bin: str,
erase: bool = False):
"""Flash bootloader + partition table + factory NVS + app (+ otadata)."""
if erase:
flash_erase(cfg.port)
# Build flash map
flash_map = [
(ADDR_BOOTLOADER, binaries["bootloader"], "Bootloader"),
(ADDR_PTABLE, binaries["partition_table"], "Partition table"),
(ADDR_FCTRY, nvs_bin, "Factory NVS (master key)"),
(ADDR_OTA_0, binaries["app"], "Application (ota_0)"),
]
if "otadata" in binaries:
flash_map.append((ADDR_OTADATA, binaries["otadata"], "OTA data"))
# Display map
box([f"{label:25s} @ {hex(addr)}" for addr, _, label in flash_map])
# Build esptool args
cmd = esptool_cmd(cfg.port) + ["write_flash", "-z"]
for addr, path, _ in flash_map:
cmd.extend([hex(addr), path])
info("Flashing all partitions...")
try:
subprocess.run(cmd, check=True, timeout=120)
ok("All partitions flashed successfully")
except subprocess.CalledProcessError as e:
err(f"Flash failed: {e}")
sys.exit(1)
except FileNotFoundError:
err("esptool.py not found. Install: pip install esptool")
sys.exit(1)
def flash_fctry_only(port: str, nvs_bin: str):
"""Flash only the factory NVS partition (provision-only mode)."""
info(f"Flashing factory NVS to {port} at {hex(ADDR_FCTRY)}...")
try:
subprocess.run(
esptool_cmd(port) + ["write_flash", hex(ADDR_FCTRY), nvs_bin],
check=True, timeout=60,
)
ok("Factory NVS flashed")
except subprocess.CalledProcessError as e:
err(f"Flash failed: {e}")
sys.exit(1)
# ─── 7. Keystore ─────────────────────────────────────────────────────────────
def update_keystore(device_id: str, master_key: bytes, keystore_path: str):
"""Add/update device key in the C2 keystore (keys.json)."""
keys: Dict[str, str] = {}
if os.path.exists(keystore_path):
try:
with open(keystore_path) as f:
keys = json.load(f)
except (json.JSONDecodeError, ValueError):
warn(f"Corrupted keystore, starting fresh: {keystore_path}")
keys[device_id] = master_key.hex()
os.makedirs(os.path.dirname(os.path.abspath(keystore_path)), exist_ok=True)
with open(keystore_path, "w") as f:
json.dump(keys, f, indent=2)
ok(f"Keystore updated: {keystore_path}")
ok(f" {device_id} -> {master_key.hex()[:16]}...")
# ─── 8. Serial Monitor ───────────────────────────────────────────────────────
def monitor_serial(port: str, idf_path: str):
"""Launch idf.py monitor for post-flash verification."""
info(f"Starting serial monitor on {port} (Ctrl+] to exit)")
try:
subprocess.run(
["bash", "-c",
f". {idf_path}/export.sh > /dev/null 2>&1 && "
f"idf.py -C {PROJECT_DIR} -p {port} monitor"],
)
except KeyboardInterrupt:
print()
ok("Monitor closed")
# ══════════════════════════════════════════════════════════════════════════════
# Interactive Wizard
# ══════════════════════════════════════════════════════════════════════════════
def detect_serial_ports() -> List[str]:
"""Detect available serial ports on Linux."""
return sorted(
_glob("/dev/ttyUSB*") + _glob("/dev/ttyACM*")
)
def ask(msg: str, default: str = "") -> str:
"""Prompt with optional default."""
if default:
val = input(f" {C.B}{msg}{C.RST} [{C.DIM}{default}{C.RST}]: ").strip()
return val or default
return input(f" {C.B}{msg}{C.RST}: ").strip()
def ask_yn(msg: str, default: bool = True) -> bool:
"""Yes/no prompt."""
hint = "Y/n" if default else "y/N"
val = input(f" {C.B}{msg}{C.RST} [{hint}]: ").strip().lower()
if not val:
return default
return val in ("y", "yes", "o", "oui")
def ask_choice(msg: str, choices: List[str], default: int = 0) -> int:
"""Numbered choice prompt."""
print(f"\n {C.B}{msg}{C.RST}")
for i, c in enumerate(choices):
marker = f"{C.GRN}>{C.RST}" if i == default else " "
print(f" {marker} {i + 1}) {c}")
val = input(f" Choice [{default + 1}]: ").strip()
if not val:
return default
try:
idx = int(val) - 1
if 0 <= idx < len(choices):
return idx
except ValueError:
pass
return default
def interactive_wizard() -> DeviceConfig:
"""Guided interactive setup."""
cfg = DeviceConfig()
print(f"\n {C.B}{C.CYN}--- Device ---{C.RST}\n")
# Port
ports = detect_serial_ports()
if ports:
info(f"Detected ports: {', '.join(ports)}")
cfg.port = ask("Serial port", ports[0])
else:
cfg.port = ask("Serial port (e.g. /dev/ttyUSB0)")
# Device ID
rand_id = secrets.token_hex(4)
cfg.device_id = ask("Device ID (8 hex chars)", rand_id)
# Network
mode = ask_choice("Network mode:", ["WiFi", "GPRS"], 0)
cfg.network_mode = "wifi" if mode == 0 else "gprs"
if cfg.network_mode == "wifi":
cfg.wifi_ssid = ask("WiFi SSID")
cfg.wifi_pass = ask("WiFi password")
else:
cfg.gprs_apn = ask("GPRS APN", "sl2sfr")
# Hostname
cfg.hostname = ask("Device hostname (empty = random)", "")
print(f"\n {C.B}{C.CYN}--- C2 Server ---{C.RST}\n")
cfg.srv_ip = ask("Server IP", "192.168.1.100")
cfg.srv_port = int(ask("Server port", "2626"))
print(f"\n {C.B}{C.CYN}--- Modules ---{C.RST}\n")
cfg.mod_network = ask_yn("Enable network module (ping, arp, proxy, dos)?", True)
cfg.mod_fakeap = ask_yn("Enable fakeAP module (captive portal, sniffer)?", False)
cfg.mod_honeypot = ask_yn("Enable honeypot module (SSH/Telnet/HTTP/FTP, WiFi/net monitor)?", False)
cfg.mod_recon = ask_yn("Enable recon module?", False)
if cfg.mod_recon:
cfg.recon_camera = ask_yn(" Enable camera?", False)
cfg.recon_ble_trilat = ask_yn(" Enable BLE trilateration?", False)
cfg.mod_fallback = ask_yn("Enable fallback module (autonomous WiFi recovery)?", False)
cfg.ota_enabled = ask_yn("Enable OTA updates?", True)
print(f"\n {C.B}{C.CYN}--- Security ---{C.RST}\n")
if ask_yn("Generate new random 256-bit master key?", True):
cfg.master_key_hex = ""
else:
cfg.master_key_hex = ask("Master key (64 hex chars)")
return cfg
# ══════════════════════════════════════════════════════════════════════════════
# Deploy Pipeline
# ══════════════════════════════════════════════════════════════════════════════
def deploy_one(cfg: DeviceConfig, *,
build: bool = True,
flash: bool = True,
provision_only: bool = False,
erase: bool = False,
monitor: bool = False,
keystore_path: str = ""):
"""Full deploy pipeline for a single device."""
ks_path = keystore_path or str(KEYSTORE_DEFAULT)
# Summary
net_info = (f"{cfg.network_mode.upper()} "
+ (f"({cfg.wifi_ssid})" if cfg.network_mode == "wifi"
else f"({cfg.gprs_apn})"))
mods = ", ".join(filter(None, [
"network" if cfg.mod_network else None,
"fakeAP" if cfg.mod_fakeap else None,
"honeypot" if cfg.mod_honeypot else None,
"recon" if cfg.mod_recon else None,
"fallback" if cfg.mod_fallback else None,
"redteam" if cfg.mod_redteam else None,
"canbus" if cfg.mod_canbus else None,
])) or "none"
box([
f"Device ID : {cfg.device_id}",
f"Port : {cfg.port}",
f"Network : {net_info}",
f"C2 Server : {cfg.srv_ip}:{cfg.srv_port}",
f"Modules : {mods}",
f"OTA : {'yes' if cfg.ota_enabled else 'no'}",
f"Erase : {'yes' if erase else 'no'}",
f"Mode : {'provision-only' if provision_only else 'full deploy'}",
])
with tempfile.TemporaryDirectory(prefix="epsilon-deploy-") as tmpdir:
# ── Step 1: Environment ──
step_header(1, "Environment Check")
idf_path = check_environment()
binaries: Dict[str, str] = {}
if not provision_only:
# ── Step 2: sdkconfig ──
step_header(2, "Generate Configuration")
write_sdkconfig(cfg)
# ── Step 3: Build or locate ──
if build:
step_header(3, "Build Firmware")
binaries = build_firmware(idf_path)
else:
step_header(3, "Locate Existing Binaries")
binaries = locate_binaries()
# ── Step 4: Crypto ──
step_header(4, "Master Key")
master_key = generate_master_key(cfg)
# ── Step 5: NVS ──
step_header(5, "Generate Factory NVS Partition")
nvs_bin = generate_nvs_binary(master_key, tmpdir)
# ── Step 6: Flash ──
if flash:
step_header(6, "Flash Device")
if provision_only:
flash_fctry_only(cfg.port, nvs_bin)
else:
flash_all(cfg, binaries, nvs_bin, erase=erase)
# ── Step 7: Keystore ──
step_header(7, "Update C2 Keystore")
update_keystore(cfg.device_id, master_key, ks_path)
# ── Done ──
print(f"\n {C.GRN}{C.B}{'=' * 50}")
print(f" Deploy complete for {cfg.device_id}")
print(f" {'=' * 50}{C.RST}\n")
# ── Optional monitor ──
if monitor and flash:
step_header("*", "Serial Monitor")
monitor_serial(cfg.port, idf_path)
# ══════════════════════════════════════════════════════════════════════════════
# CLI / Entry Point
# ══════════════════════════════════════════════════════════════════════════════
def config_from_dict(d: dict, defaults: Optional[dict] = None) -> DeviceConfig:
"""Create DeviceConfig from a JSON dict with optional defaults.
Supports both flat format (legacy) and nested format:
Flat: {"device_id": "x", "srv_ip": "...", "module_network": true, ...}
Nested: {"device_id": "x", "server": {"ip": "..."}, "modules": {"network": true}, ...}
When defaults is provided, device-level keys override defaults.
"""
dfl = defaults or {}
# Merge nested sections: defaults <- device overrides
def merged(section: str) -> dict:
base = dict(dfl.get(section, {}))
base.update(d.get(section, {}))
return base
server = merged("server")
network = merged("network")
modules = merged("modules")
ota = merged("ota")
return DeviceConfig(
device_id=d.get("device_id", secrets.token_hex(4)),
port=d.get("port", dfl.get("port", "")),
hostname=d.get("hostname", dfl.get("hostname", "")),
# Server
srv_ip=server.get("ip", d.get("srv_ip", "192.168.1.100")),
srv_port=server.get("port", d.get("srv_port", 2626)),
# Network
network_mode=network.get("mode", d.get("network_mode", "wifi")),
wifi_ssid=network.get("wifi_ssid", d.get("wifi_ssid", "")),
wifi_pass=network.get("wifi_pass", d.get("wifi_pass", "")),
gprs_apn=network.get("gprs_apn", d.get("gprs_apn", "sl2sfr")),
# Modules
mod_network=modules.get("network", d.get("module_network", True)),
mod_recon=modules.get("recon", d.get("module_recon", False)),
mod_fakeap=modules.get("fakeap", d.get("module_fakeap", False)),
mod_honeypot=modules.get("honeypot", d.get("module_honeypot", False)),
mod_canbus=modules.get("canbus", d.get("module_canbus", False)),
mod_fallback=modules.get("fallback", d.get("module_fallback", False)),
recon_camera=modules.get("recon_camera", d.get("recon_camera", False)),
recon_ble_trilat=modules.get("recon_ble_trilat", d.get("recon_ble_trilat", False)),
# OTA
ota_enabled=ota.get("enabled", d.get("ota_enabled", True)),
ota_allow_http=ota.get("allow_http", d.get("ota_allow_http", False)),
# Security
master_key_hex=d.get("master_key", dfl.get("master_key", "")),
)
def main() -> int:
parser = argparse.ArgumentParser(
description="Epsilon Deploy - Unified build, provision & flash",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Interactive wizard (recommended for first use)
python deploy.py
# Full deploy with WiFi
python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100
# Full deploy with GPRS
python deploy.py -p /dev/ttyUSB0 -d abc12345 \\
--gprs sl2sfr --srv 203.0.113.10
# Provision only (generate key + flash NVS, no rebuild)
python deploy.py --provision-only -p /dev/ttyUSB0 -d abc12345
# Flash existing build (skip rebuild)
python deploy.py --flash-only -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100
# Clean install (erase flash first)
python deploy.py --erase -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100
# Deploy + watch serial output
python deploy.py --monitor -p /dev/ttyUSB0 -d abc12345 \\
--wifi MySSID MyPass --srv 192.168.1.100
# Batch deploy from config file
python deploy.py --config deploy.json
""",
)
# Device
parser.add_argument("-d", "--device-id",
help="Device ID (default: random 8 hex)")
parser.add_argument("-p", "--port",
help="Serial port (e.g. /dev/ttyUSB0)")
# Network (mutually exclusive)
net = parser.add_mutually_exclusive_group()
net.add_argument("--wifi", nargs=2, metavar=("SSID", "PASS"),
help="WiFi mode with SSID and password")
net.add_argument("--gprs", metavar="APN", nargs="?", const="sl2sfr",
help="GPRS mode (default APN: sl2sfr)")
# Server
parser.add_argument("--srv", metavar="IP", help="C2 server IP")
parser.add_argument("--srv-port", type=int, default=2626)
# Modules
parser.add_argument("--mod-fakeap", action="store_true", default=False,
help="Enable fakeAP module")
parser.add_argument("--mod-honeypot", action="store_true", default=False,
help="Enable honeypot module (TCP services, WiFi/net monitors)")
parser.add_argument("--mod-recon", action="store_true", default=False,
help="Enable recon module")
parser.add_argument("--recon-camera", action="store_true", default=False,
help="Enable camera")
parser.add_argument("--recon-ble", action="store_true", default=False,
help="Enable BLE trilateration")
parser.add_argument("--mod-fallback", action="store_true", default=False,
help="Enable fallback module (autonomous WiFi recovery)")
parser.add_argument("--mod-redteam", action="store_true", default=False,
help="Enable red team module (offensive operations)")
parser.add_argument("--no-network", action="store_true", default=False,
help="Disable network module")
parser.add_argument("--no-ota", action="store_true", default=False,
help="Disable OTA module")
parser.add_argument("--ota-http", action="store_true", default=False,
help="Allow OTA over plain HTTP (insecure, local networks)")
# Security
parser.add_argument("--key", metavar="HEX",
help="Master key as 64 hex chars (default: random)")
parser.add_argument("--keystore", type=Path, default=None,
help=f"C2 keystore path (default: {KEYSTORE_DEFAULT})")
# Operating modes
parser.add_argument("--provision-only", action="store_true",
help="Only generate key and flash factory NVS")
parser.add_argument("--flash-only", action="store_true",
help="Flash existing build (skip rebuild)")
parser.add_argument("--build-only", action="store_true",
help="Build only (no flash, no provision)")
parser.add_argument("--erase", action="store_true",
help="Erase entire flash before writing")
parser.add_argument("--monitor", action="store_true",
help="Start serial monitor after flash")
parser.add_argument("--hostname",
help="Device hostname (default: random)")
# Config file
parser.add_argument("--config", type=Path,
help="Load device configs from JSON file")
args = parser.parse_args()
banner()
# ── Config file mode ──
if args.config:
with open(args.config) as f:
data = json.load(f)
defaults = data.get("defaults", {})
devices = data.get("devices", [data])
total = len(devices)
ok_count = 0
for i, d in enumerate(devices, 1):
print(f"\n {C.B}### Device {i}/{total}: {d.get('device_id', '?')} ###{C.RST}")
cfg = config_from_dict(d, defaults)
try:
deploy_one(
cfg,
build=not args.flash_only,
flash=not args.build_only,
provision_only=args.provision_only,
erase=args.erase,
monitor=args.monitor,
keystore_path=str(args.keystore) if args.keystore else "",
)
ok_count += 1
except SystemExit:
err(f"Failed for {cfg.device_id}, continuing...")
print(f"\n {C.B}Results: {ok_count}/{total} succeeded{C.RST}")
return 0 if ok_count == total else 1
# ── Interactive mode (no args) ──
if not args.device_id and not args.port and not args.provision_only:
cfg = interactive_wizard()
print()
if not ask_yn("Proceed with deployment?", True):
print(" Aborted.")
return 1
deploy_one(
cfg,
build=not args.flash_only,
flash=not args.build_only,
erase=args.erase,
monitor=args.monitor,
keystore_path=str(args.keystore) if args.keystore else "",
)
return 0
# ── CLI mode ──
if not args.port:
err("--port / -p is required in CLI mode")
return 1
cfg = DeviceConfig(
device_id=args.device_id or secrets.token_hex(4),
port=args.port,
srv_ip=args.srv or "192.168.1.100",
srv_port=args.srv_port,
network_mode="gprs" if args.gprs else "wifi",
wifi_ssid=args.wifi[0] if args.wifi else "",
wifi_pass=args.wifi[1] if args.wifi else "",
gprs_apn=args.gprs if args.gprs else "sl2sfr",
hostname=args.hostname or "",
mod_network=not args.no_network,
mod_fakeap=args.mod_fakeap,
mod_honeypot=args.mod_honeypot,
mod_recon=args.mod_recon,
mod_fallback=args.mod_fallback,
mod_redteam=args.mod_redteam,
recon_camera=args.recon_camera,
recon_ble_trilat=args.recon_ble,
ota_enabled=not args.no_ota,
ota_allow_http=args.ota_http,
master_key_hex=args.key or "",
)
# Validate WiFi (unless provision-only which doesn't need it)
if (cfg.network_mode == "wifi"
and not args.provision_only
and not args.build_only
and (not cfg.wifi_ssid or not cfg.wifi_pass)):
err("WiFi mode requires --wifi SSID PASS")
return 1
deploy_one(
cfg,
build=not args.flash_only and not args.provision_only,
flash=not args.build_only,
provision_only=args.provision_only,
erase=args.erase,
monitor=args.monitor,
keystore_path=str(args.keystore) if args.keystore else "",
)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,89 @@
{
"defaults": {
"server": {
"ip": "192.168.1.100",
"port": 2626
},
"network": {
"mode": "wifi",
"wifi_ssid": "YOUR_SSID",
"wifi_pass": "YOUR_PASSWORD",
"gprs_apn": "sl2sfr"
},
"modules": {
"network": true,
"fakeap": false,
"honeypot": false,
"recon": false,
"canbus": false,
"recon_camera": false,
"recon_ble_trilat": false
},
"ota": {
"enabled": true,
"allow_http": false
}
},
"devices": [
{
"_comment": "Bot standard — commandes réseau uniquement",
"device_id": "aaaa0001",
"port": "/dev/ttyUSB0",
"hostname": "esp32-bot-01"
},
{
"_comment": "Honeypot — services TCP + monitoring WiFi/réseau",
"device_id": "bbbb0001",
"port": "/dev/ttyUSB1",
"hostname": "esp32-honeypot-01",
"modules": {
"network": true,
"honeypot": true
}
},
{
"_comment": "FakeAP — portail captif + sniffer WiFi",
"device_id": "cccc0001",
"port": "/dev/ttyUSB2",
"hostname": "esp32-fakeap-01",
"modules": {
"network": true,
"fakeap": true
}
},
{
"_comment": "Recon caméra — reconnaissance visuelle",
"device_id": "dddd0001",
"port": "/dev/ttyUSB3",
"hostname": "esp32-cam-01",
"modules": {
"network": false,
"recon": true,
"recon_camera": true
}
},
{
"_comment": "Bot GPRS — déployé hors réseau WiFi, clé pré-provisionnée",
"device_id": "eeee0001",
"port": "/dev/ttyUSB4",
"hostname": "esp32-gprs-01",
"master_key": "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
"network": {
"mode": "gprs",
"gprs_apn": "internet"
},
"server": {
"ip": "203.0.113.10"
},
"ota": {
"enabled": true,
"allow_http": true
}
}
]
}

3
tools/espmon/__init__.py Normal file
View File

@ -0,0 +1,3 @@
"""espmon - ESP32 serial monitor and C2 command interface for Espilon."""
__version__ = "1.0.0"

7
tools/espmon/__main__.py Normal file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env python3
"""Entry point for: python -m espmon"""
import sys
from espmon.cli import main
sys.exit(main())

346
tools/espmon/cli.py Normal file
View File

@ -0,0 +1,346 @@
"""CLI parser and subcommand handlers."""
import argparse
import os
import time
import sys
import requests
from espmon import __version__
from espmon.colors import C, ok, info, err, warn
from espmon.config import Config, DEFAULT_BAUD, POLL_INTERVAL, POLL_TIMEOUT
from espmon.client import C3POClient
from espmon.logs import LogManager
from espmon.monitor import SerialMonitor
from espmon.daemon import daemonize, stop_daemon
# ─── Helpers ─────────────────────────────────────────────────────────────────
def _format_duration(seconds: float) -> str:
"""Format seconds into a human-readable duration."""
s = int(seconds)
if s < 60:
return f"{s}s"
if s < 3600:
return f"{s // 60}m{s % 60:02d}s"
h = s // 3600
m = (s % 3600) // 60
return f"{h}h{m:02d}m"
def _format_size(size: int) -> str:
"""Format bytes into human-readable size."""
if size > 1024 * 1024:
return f"{size / (1024 * 1024):.1f} MB"
if size > 1024:
return f"{size / 1024:.1f} KB"
return f"{size} B"
def _detect_ports() -> list:
"""Detect available serial ports."""
from glob import glob
return sorted(glob("/dev/ttyUSB*") + glob("/dev/ttyACM*"))
# ─── Subcommand: monitor ────────────────────────────────────────────────────
def cmd_monitor(args) -> int:
"""Monitor a serial port, logging output to file."""
port = args.port
if not os.path.exists(port):
err(f"Port {port} not found")
available = _detect_ports()
if available:
info(f"Available ports: {', '.join(available)}")
return 1
LogManager.ensure_dirs()
if args.bg:
return daemonize(port, args.baud)
# Foreground mode
log_path = LogManager.session_path(port)
monitor = SerialMonitor(port, args.baud, log_path)
return monitor.run()
# ─── Subcommand: tail ────────────────────────────────────────────────────────
def cmd_tail(args) -> int:
"""Tail the most recent log file."""
log_path = LogManager.latest_log(port=args.port, device=args.device)
if not log_path:
err("No log files found")
info("Start monitoring first: espmon monitor /dev/ttyUSB0")
return 1
info(f"{log_path}")
with open(log_path, 'r') as f:
lines = f.readlines()
n = args.lines
for line in lines[-n:]:
print(line, end='')
if args.follow:
# Follow mode: keep watching for new lines
try:
with open(log_path, 'r') as f:
f.seek(0, 2) # seek to end
while True:
line = f.readline()
if line:
print(line, end='', flush=True)
else:
time.sleep(0.2)
except KeyboardInterrupt:
pass
return 0
# ─── Subcommand: cmd ─────────────────────────────────────────────────────────
def cmd_cmd(args) -> int:
"""Send a command to a device via C3PO API and wait for response."""
config = Config()
client = C3POClient(config)
try:
result = client.send_command(args.device_id, args.command, args.argv)
except requests.exceptions.ConnectionError:
err(f"Cannot connect to C3PO at {config.base_url}")
err("Is C3PO running? Start with: cd tools/C3PO && python c3po.py --headless")
return 1
except requests.exceptions.HTTPError as e:
err(f"API error: {e.response.status_code} {e.response.text}")
return 1
results = result.get("results", [])
if not results:
err("No results returned")
return 1
# Handle multiple devices (broadcast)
pending = []
for r in results:
if r.get("status") == "ok":
pending.append((r["device_id"], r["request_id"]))
info(f"Sent to {r['device_id']} (req: {r['request_id'][:20]}...)")
else:
err(f"{r.get('device_id', '?')}: {r.get('message', 'error')}")
if not pending:
return 1
# Poll for results
seen = {req_id: 0 for _, req_id in pending}
start = time.time()
completed = set()
timeout = args.timeout
while time.time() - start < timeout and len(completed) < len(pending):
for device_id, req_id in pending:
if req_id in completed:
continue
try:
status = client.poll_command(req_id)
except requests.exceptions.RequestException:
time.sleep(POLL_INTERVAL)
continue
output = status.get("output", [])
prefix = f"[{device_id}] " if len(pending) > 1 else ""
for line in output[seen[req_id]:]:
print(f"{prefix}{line}")
seen[req_id] = len(output)
if status.get("status") == "completed":
completed.add(req_id)
if len(completed) < len(pending):
time.sleep(POLL_INTERVAL)
if len(completed) == len(pending):
ok(f"Command completed ({len(completed)} device(s))")
return 0
warn(f"Timed out after {timeout}s ({len(completed)}/{len(pending)} completed)")
return 1
# ─── Subcommand: status ──────────────────────────────────────────────────────
def cmd_status(args) -> int:
"""List connected devices from C3PO."""
config = Config()
client = C3POClient(config)
try:
data = client.list_devices()
except requests.exceptions.ConnectionError:
err(f"Cannot connect to C3PO at {config.base_url}")
return 1
except requests.exceptions.HTTPError as e:
err(f"API error: {e.response.status_code}")
return 1
devices = data.get("devices", [])
if not devices:
info("No devices connected")
return 0
print(f"\n {'ID':<16} {'IP':<16} {'Status':<12} {'Chip':<10} "
f"{'Modules':<30} {'Uptime':<10}")
print(f" {'-'*16} {'-'*16} {'-'*12} {'-'*10} {'-'*30} {'-'*10}")
for d in devices:
status_color = C.GRN if d.get("status") == "Connected" else C.YLW
modules = d.get("modules", "") or "-"
uptime = _format_duration(d.get("connected_for_seconds", 0))
print(f" {d['id']:<16} "
f"{d.get('ip', '?'):<16} "
f"{status_color}{d.get('status', '?'):<12}{C.RST} "
f"{d.get('chip', '?'):<10} "
f"{modules:<30} "
f"{uptime:<10}")
print(f"\n {len(devices)} device(s)\n")
return 0
# ─── Subcommand: logs ────────────────────────────────────────────────────────
def cmd_logs(args) -> int:
"""List all log files."""
entries = LogManager.list_all()
if not entries:
info("No log files found")
return 0
print(f"\n {'File':<40} {'Size':<10} {'Last Modified'}")
print(f" {'-'*40} {'-'*10} {'-'*20}")
total_size = 0
for e in entries:
total_size += e["size"]
mtime_str = e["mtime"].strftime("%Y-%m-%d %H:%M")
print(f" {str(e['relative']):<40} "
f"{_format_size(e['size']):<10} "
f"{mtime_str}")
print(f"\n {len(entries)} file(s), {_format_size(total_size)} total\n")
return 0
# ─── Subcommand: stop ────────────────────────────────────────────────────────
def cmd_stop(args) -> int:
"""Stop the background monitor daemon."""
return stop_daemon()
# ─── Parser ──────────────────────────────────────────────────────────────────
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="espmon",
description="ESP32 serial monitor and C2 command interface",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Examples:
espmon monitor /dev/ttyUSB0 Monitor serial (foreground)
espmon monitor /dev/ttyUSB0 --bg Monitor as background daemon
espmon tail Tail most recent log
espmon tail -f -n 100 Follow mode, last 100 lines
espmon cmd 03d03f48 system_info Send command to device
espmon cmd all system_mem Broadcast to all devices
espmon status List connected devices
espmon logs List log files
espmon stop Stop background monitor
""",
)
parser.add_argument("--version", action="version",
version=f"espmon {__version__}")
sub = parser.add_subparsers(dest="subcommand")
# monitor
p_mon = sub.add_parser("monitor", aliases=["mon", "m"],
help="Monitor ESP32 serial output")
p_mon.add_argument("port", help="Serial port (e.g. /dev/ttyUSB0)")
p_mon.add_argument("--baud", "-b", type=int, default=DEFAULT_BAUD,
help=f"Baud rate (default: {DEFAULT_BAUD})")
p_mon.add_argument("--bg", action="store_true",
help="Run as background daemon")
p_mon.set_defaults(func=cmd_monitor)
# tail
p_tail = sub.add_parser("tail", aliases=["t"],
help="Tail a log file")
p_tail.add_argument("--port", "-p", help="Filter by serial port")
p_tail.add_argument("--device", "-d", help="Filter by device ID")
p_tail.add_argument("--lines", "-n", type=int, default=50,
help="Number of lines (default: 50)")
p_tail.add_argument("--follow", "-f", action="store_true",
help="Follow mode (keep watching)")
p_tail.set_defaults(func=cmd_tail)
# cmd
p_cmd = sub.add_parser("cmd", aliases=["c"],
help="Send command via C3PO API")
p_cmd.add_argument("device_id",
help="Target device ID (or 'all' for broadcast)")
p_cmd.add_argument("command", help="Command name")
p_cmd.add_argument("argv", nargs="*", default=[],
help="Command arguments")
p_cmd.add_argument("--timeout", "-t", type=int, default=POLL_TIMEOUT,
help=f"Poll timeout seconds (default: {POLL_TIMEOUT})")
p_cmd.set_defaults(func=cmd_cmd)
# status
p_status = sub.add_parser("status", aliases=["s"],
help="List connected devices")
p_status.set_defaults(func=cmd_status)
# logs
p_logs = sub.add_parser("logs", aliases=["l"],
help="List log files")
p_logs.set_defaults(func=cmd_logs)
# stop
p_stop = sub.add_parser("stop", help="Stop background monitor")
p_stop.set_defaults(func=cmd_stop)
return parser
# ─── Entry Point ──────────────────────────────────────────────────────────────
def main() -> int:
parser = build_parser()
args = parser.parse_args()
if not args.subcommand:
parser.print_help()
return 0
try:
return args.func(args)
except KeyboardInterrupt:
print()
return 130
except Exception as e:
err(f"Unexpected error: {e}")
return 1

78
tools/espmon/client.py Normal file
View File

@ -0,0 +1,78 @@
"""C3PO REST API client."""
import requests
from espmon.config import Config
class C3POClient:
"""HTTP client for the C3PO command & control API."""
def __init__(self, config: Config = None):
cfg = config or Config()
self.base_url = cfg.base_url
self.headers = {"Content-Type": "application/json"}
if cfg.token:
self.headers["Authorization"] = f"Bearer {cfg.token}"
def send_command(self, device_id: str, command: str, argv: list = None) -> dict:
"""Send a command to a device.
POST /api/commands
Returns: {"results": [{"device_id", "status", "request_id"}]}
"""
body = {
"device_ids": [device_id] if device_id != "all" else "all",
"command": command,
"argv": argv or [],
}
resp = requests.post(
f"{self.base_url}/api/commands",
json=body,
headers=self.headers,
timeout=10,
)
resp.raise_for_status()
return resp.json()
def poll_command(self, request_id: str) -> dict:
"""Poll a command's result.
GET /api/commands/<request_id>
Returns: {"status": "pending|completed", "output": [...]}
"""
resp = requests.get(
f"{self.base_url}/api/commands/{request_id}",
headers=self.headers,
timeout=10,
)
resp.raise_for_status()
return resp.json()
def list_devices(self) -> dict:
"""List connected devices.
GET /api/devices
Returns: {"devices": [...], "count": N}
"""
resp = requests.get(
f"{self.base_url}/api/devices",
headers=self.headers,
timeout=10,
)
resp.raise_for_status()
return resp.json()
def list_commands(self) -> dict:
"""List recent commands.
GET /api/commands
Returns: {"commands": [...]}
"""
resp = requests.get(
f"{self.base_url}/api/commands",
headers=self.headers,
timeout=10,
)
resp.raise_for_status()
return resp.json()

22
tools/espmon/colors.py Normal file
View File

@ -0,0 +1,22 @@
"""Terminal color helpers (same style as deploy.py)."""
class C:
RST = "\033[0m"
B = "\033[1m"
DIM = "\033[2m"
RED = "\033[91m"
GRN = "\033[92m"
YLW = "\033[93m"
BLU = "\033[94m"
CYN = "\033[96m"
def _tag(color: str, tag: str, msg: str):
print(f" {color}{C.B}[{tag}]{C.RST} {msg}")
def ok(m): _tag(C.GRN, " OK ", m)
def info(m): _tag(C.BLU, " .. ", m)
def warn(m): _tag(C.YLW, " !! ", m)
def err(m): _tag(C.RED, " XX ", m)

84
tools/espmon/config.py Normal file
View File

@ -0,0 +1,84 @@
"""Configuration, paths, and constants for espmon."""
import os
import re
from pathlib import Path
# Paths
SCRIPT_DIR = Path(__file__).resolve().parent # tools/espmon/
TOOLS_DIR = SCRIPT_DIR.parent # tools/
LOGS_DIR = TOOLS_DIR / "logs"
C3PO_DIR = TOOLS_DIR / "C3PO"
ENV_FILE = C3PO_DIR / ".env"
DEPLOY_JSON = TOOLS_DIR / "deploy.json"
PID_FILE = LOGS_DIR / ".espmon.pid"
# Serial
DEFAULT_BAUD = 115200
# API polling
POLL_INTERVAL = 0.5 # seconds between polls
POLL_TIMEOUT = 120 # max wait for command result
# ANSI escape codes
ANSI_RE = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]|\x1b\(B')
def strip_ansi(text: str) -> str:
"""Remove ANSI escape codes from text."""
return ANSI_RE.sub('', text)
class Config:
"""C3PO connection configuration.
Reads from (in order of priority):
1. Environment variables: ESPMON_HOST, ESPMON_PORT, ESPMON_TOKEN
2. tools/C3PO/.env file
3. Hardcoded defaults
"""
def __init__(self):
self.host = "localhost"
self.port = 8000
self.token = ""
self._load()
def _load(self):
env_vars = {}
if ENV_FILE.exists():
with open(ENV_FILE) as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, _, value = line.partition('=')
env_vars[key.strip()] = value.strip().strip('"').strip("'")
# Host
self.host = (
os.environ.get("ESPMON_HOST")
or env_vars.get("WEB_HOST", "").replace("0.0.0.0", "localhost")
or "localhost"
)
# Port
try:
self.port = int(
os.environ.get("ESPMON_PORT")
or env_vars.get("WEB_PORT", "8000")
)
except ValueError:
self.port = 8000
# Auth token
self.token = (
os.environ.get("ESPMON_TOKEN")
or env_vars.get("MULTILAT_AUTH_TOKEN", "")
)
@property
def base_url(self) -> str:
return f"http://{self.host}:{self.port}"

99
tools/espmon/daemon.py Normal file
View File

@ -0,0 +1,99 @@
"""Background daemon management — double-fork, PID file, stop."""
import os
import signal
import time
from espmon.colors import ok, info, err, warn
from espmon.config import PID_FILE, LOGS_DIR
from espmon.logs import LogManager
from espmon.monitor import SerialMonitor
def daemonize(port: str, baud: int) -> int:
"""Start the serial monitor as a background daemon.
Uses the Unix double-fork pattern to fully detach from the terminal.
Returns 0 to the parent process, never returns in the daemon.
"""
log_path = LogManager.session_path(port)
pid = os.fork()
if pid > 0:
# Parent: wait for first child
os.waitpid(pid, 0)
ok(f"Monitor started in background")
ok(f"Logging to {log_path}")
info(f"PID file: {PID_FILE}")
info(f"Stop with: python -m espmon stop")
return 0
# First child: new session
os.setsid()
pid2 = os.fork()
if pid2 > 0:
os._exit(0)
# Second child (daemon process)
devnull = os.open(os.devnull, os.O_RDWR)
os.dup2(devnull, 0)
os.dup2(devnull, 1)
os.dup2(devnull, 2)
os.close(devnull)
# Write PID file
LOGS_DIR.mkdir(parents=True, exist_ok=True)
PID_FILE.write_text(str(os.getpid()))
# Run monitor with signal handler for clean shutdown
monitor = SerialMonitor(port, baud, log_path)
signal.signal(signal.SIGTERM, lambda s, f: monitor.stop())
signal.signal(signal.SIGINT, lambda s, f: monitor.stop())
try:
monitor.run()
finally:
PID_FILE.unlink(missing_ok=True)
os._exit(0)
def stop_daemon() -> int:
"""Stop a running background monitor daemon."""
if not PID_FILE.exists():
err("No background monitor running (no PID file)")
return 1
try:
pid = int(PID_FILE.read_text().strip())
except (ValueError, OSError):
err(f"Invalid PID file: {PID_FILE}")
PID_FILE.unlink(missing_ok=True)
return 1
try:
os.kill(pid, signal.SIGTERM)
info(f"Sent SIGTERM to PID {pid}")
# Wait for process to exit (max 5 seconds)
for _ in range(50):
try:
os.kill(pid, 0)
time.sleep(0.1)
except OSError:
ok("Background monitor stopped")
PID_FILE.unlink(missing_ok=True)
return 0
warn(f"Process {pid} did not exit, sending SIGKILL")
os.kill(pid, signal.SIGKILL)
PID_FILE.unlink(missing_ok=True)
return 0
except ProcessLookupError:
info(f"Process {pid} already gone")
PID_FILE.unlink(missing_ok=True)
return 0
except PermissionError:
err(f"Permission denied to kill PID {pid}")
return 1

103
tools/espmon/logs.py Normal file
View File

@ -0,0 +1,103 @@
"""Log file management — session paths, listing, tail."""
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from espmon.config import LOGS_DIR, DEPLOY_JSON
class LogManager:
"""Manage serial monitor log files in tools/logs/."""
@staticmethod
def ensure_dirs():
"""Create the logs root directory if needed."""
LOGS_DIR.mkdir(exist_ok=True)
@staticmethod
def session_path(port: str) -> Path:
"""Create a new log file path for a monitor session.
Format: tools/logs/<port_name>/YYYY-MM-DD_HHhMM.log
"""
port_name = port.replace("/dev/", "").replace("/", "_")
port_dir = LOGS_DIR / port_name
port_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%Hh%M")
path = port_dir / f"{timestamp}.log"
# Avoid collision if multiple sessions in same minute
if path.exists():
secs = datetime.now().strftime("%S")
path = port_dir / f"{timestamp}_{secs}.log"
return path
@staticmethod
def latest_log(port: str = None, device: str = None) -> Optional[Path]:
"""Find the most recent log file.
Args:
port: Filter by serial port (e.g. /dev/ttyUSB0)
device: Filter by device ID (looks up port in deploy.json)
"""
if device:
mapped = LogManager._device_to_port(device)
if mapped:
port = mapped
if port:
port_name = port.replace("/dev/", "").replace("/", "_")
port_dir = LOGS_DIR / port_name
if not port_dir.exists():
return None
logs = sorted(port_dir.glob("*.log"), key=lambda p: p.stat().st_mtime)
return logs[-1] if logs else None
# No filter: most recent across all ports
if not LOGS_DIR.exists():
return None
all_logs = sorted(LOGS_DIR.rglob("*.log"), key=lambda p: p.stat().st_mtime)
return all_logs[-1] if all_logs else None
@staticmethod
def list_all() -> list:
"""List all log files with metadata.
Returns list of dicts with: path, relative, size, mtime
"""
if not LOGS_DIR.exists():
return []
entries = []
for log_file in sorted(
LOGS_DIR.rglob("*.log"),
key=lambda p: p.stat().st_mtime,
reverse=True,
):
stat = log_file.stat()
entries.append({
"path": log_file,
"relative": log_file.relative_to(LOGS_DIR),
"size": stat.st_size,
"mtime": datetime.fromtimestamp(stat.st_mtime),
})
return entries
@staticmethod
def _device_to_port(device_id: str) -> Optional[str]:
"""Look up serial port for a device ID from deploy.json."""
if not DEPLOY_JSON.exists():
return None
try:
with open(DEPLOY_JSON) as f:
data = json.load(f)
for dev in data.get("devices", []):
if dev.get("device_id") == device_id:
return dev.get("port")
except (json.JSONDecodeError, KeyError, OSError):
pass
return None

103
tools/espmon/monitor.py Normal file
View File

@ -0,0 +1,103 @@
"""Serial port monitor — read, display, and log ESP32 output."""
import threading
import time
from datetime import datetime
from pathlib import Path
from espmon.colors import info, err
from espmon.config import DEFAULT_BAUD, strip_ansi
class SerialMonitor:
"""Read a serial port, print to stdout with colors, log to file without ANSI."""
def __init__(self, port: str, baud: int = DEFAULT_BAUD, log_path: Path = None):
self.port = port
self.baud = baud
self.log_path = log_path
self._stop = threading.Event()
self._log_file = None
def run(self) -> int:
"""Run the monitor (blocking). Returns exit code."""
import serial
try:
# Open without toggling DTR/RTS to avoid resetting the ESP32
ser = serial.Serial()
ser.port = self.port
ser.baudrate = self.baud
ser.timeout = 0.5
ser.dtr = False
ser.rts = False
ser.open()
except serial.SerialException as e:
err(f"Cannot open {self.port}: {e}")
return 1
if self.log_path:
self.log_path.parent.mkdir(parents=True, exist_ok=True)
self._log_file = open(self.log_path, 'a', encoding='utf-8')
info(f"Logging to {self.log_path}")
info(f"Monitoring {self.port} @ {self.baud} baud (Ctrl+C to stop)")
if self._log_file:
self._log_file.write(
f"# espmon session started {datetime.now().isoformat()}\n"
)
self._log_file.write(
f"# port: {self.port} baud: {self.baud}\n"
)
self._log_file.flush()
buf = b""
try:
while not self._stop.is_set():
try:
chunk = ser.read(256)
except serial.SerialException:
if self._stop.is_set():
break
err(f"Serial read error on {self.port}")
time.sleep(0.5)
continue
if not chunk:
continue
buf += chunk
while b"\n" in buf:
line_bytes, buf = buf.split(b"\n", 1)
line = line_bytes.decode("utf-8", errors="replace").rstrip("\r")
# stdout with original ANSI colors
print(line, flush=True)
# log file: timestamped, ANSI stripped
if self._log_file:
ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
clean = strip_ansi(line)
self._log_file.write(f"[{ts}] {clean}\n")
self._log_file.flush()
except KeyboardInterrupt:
pass
finally:
try:
ser.close()
except Exception:
pass
if self._log_file:
self._log_file.write(
f"# espmon session ended {datetime.now().isoformat()}\n"
)
self._log_file.close()
info("Monitor stopped")
return 0
def stop(self):
"""Signal the monitor to stop."""
self._stop.set()

View File

@ -1,470 +0,0 @@
# Espilon Multi-Device Flasher
Automated tool for building and flashing multiple ESP32 devices with custom configurations for the Espilon project.
## Features
- Build firmware with custom configurations per device
- Support for WiFi and GPRS modes
- Configurable modules (Network, Recon, FakeAP)
- Multi-device batch processing
- Individual device manual configuration
- Automatic hostname generation
- Build-only and flash-only modes
## Prerequisites
- Python 3.8+
- ESP-IDF v5.3.2 (properly configured with `export.sh`)
- esptool.py (usually included with ESP-IDF)
- ESP32 development boards connected via USB
## Installation
No installation required. The script is standalone.
```bash
cd tools/flasher
chmod +x flash.py
```
## Configuration File
### Structure
The `devices.json` file contains:
- **project**: Path to the `espilon_bot` directory
- **devices**: Array of device configurations
### Example: devices.json
```json
{
"project": "/home/user/epsilon/espilon_bot",
"devices": [
## WiFi AGENT ##
{
"device_id": "ce4f626b",
"port": "/dev/ttyUSB0",
"srv_ip": "192.168.1.13",
"srv_port": 2626,
"network_mode": "wifi",
"wifi_ssid": "MyWiFi",
"wifi_pass": "MyPassword123",
"hostname": "pixel-8-pro",
"module_network": true,
"module_recon": false,
"module_fakeap": false,
"recon_camera": false,
"recon_ble_trilat": false,
"crypto_key": "testde32chars00000000000000000000",
"crypto_nonce": "noncenonceno"
},
## GPRS AGENT ##
{
"device_id": "a91dd021",
"port": "/dev/ttyUSB1",
"srv_ip": "203.0.113.10",
"srv_port": 2626,
"network_mode": "gprs",
"gprs_apn": "sl2sfr",
"hostname": "galaxy-s24-ultra",
"module_network": true,
"module_recon": false,
"module_fakeap": false
}
]
}
```
### Device Configuration Fields
| Field | Required | Default | Description |
|-------|----------|---------|-------------|
| `device_id` | Yes | - | Unique device identifier (8 hex chars) |
| `port` | Yes | - | Serial port (e.g., `/dev/ttyUSB0`, `COM3`) |
| `srv_ip` | Yes | - | C2 server IP address |
| `srv_port` | No | 2626 | C2 server port |
| `network_mode` | No | "wifi" | Network mode: `"wifi"` or `"gprs"` |
| `wifi_ssid` | WiFi only | - | WiFi network SSID |
| `wifi_pass` | WiFi only | - | WiFi network password |
| `gprs_apn` | GPRS only | "sl2sfr" | GPRS APN |
| `hostname` | No | Random | Device hostname for network identification |
| `module_network` | No | true | Enable network commands module |
| `module_recon` | No | false | Enable reconnaissance module |
| `module_fakeap` | No | false | Enable fake AP module |
| `recon_camera` | No | false | Enable camera reconnaissance (ESP32-CAM) |
| `recon_ble_trilat` | No | false | Enable BLE trilateration |
| `crypto_key` | No | Test key | ChaCha20 encryption key (32 chars) |
| `crypto_nonce` | No | Test nonce | ChaCha20 nonce (12 chars) |
## Usage
### 1. Flash All Devices from Config
```bash
python3 flash.py --config devices.json
```
This will:
1. Read device configurations from `devices.json`
2. Build firmware for each device
3. Flash each device sequentially
4. Display summary report
### 2. Manual Single Device (WiFi)
```bash
python3 flash.py --manual \
--project /home/user/epsilon/espilon_bot \
--device-id abc12345 \
--port /dev/ttyUSB0 \
--srv-ip 192.168.1.100 \
--wifi-ssid MyWiFi \
--wifi-pass MyPassword123
```
### 3. Manual Single Device (GPRS)
```bash
python3 flash.py --manual \
--project /home/user/epsilon/espilon_bot \
--device-id def67890 \
--port /dev/ttyUSB1 \
--srv-ip 203.0.113.10 \
--network-mode gprs \
--gprs-apn sl2sfr
```
### 4. Build Only (No Flash)
Useful for generating firmware files without flashing:
```bash
python3 flash.py --config devices.json --build-only
```
Firmware files are saved to: `espilon_bot/firmware/<device_id>.bin`
### 5. Flash Only (Skip Build)
Flash pre-built firmware:
```bash
python3 flash.py --config devices.json --flash-only
```
Requires firmware files in `espilon_bot/firmware/` directory.
### 6. Enable Modules
```bash
python3 flash.py --manual \
--project /home/user/epsilon/espilon_bot \
--device-id xyz98765 \
--port /dev/ttyUSB0 \
--srv-ip 192.168.1.100 \
--wifi-ssid MyWiFi \
--wifi-pass MyPassword123 \
--enable-recon \
--enable-fakeap \
--enable-ble-trilat
```
### 7. Custom Encryption Keys
```bash
python3 flash.py --manual \
--project /home/user/epsilon/espilon_bot \
--device-id secure01 \
--port /dev/ttyUSB0 \
--srv-ip 192.168.1.100 \
--wifi-ssid MyWiFi \
--wifi-pass MyPassword123 \
--crypto-key "your32charencryptionkeyhere!!" \
--crypto-nonce "yournonce12b"
```
## Advanced Features
### Hostname Generation
If `hostname` is not specified, the script generates a realistic device name:
- iPhone models (iPhone-15-pro-max, etc.)
- Android devices (galaxy-s24-ultra, pixel-8-pro, etc.)
- Windows PCs (DESKTOP-XXXXXXX)
This helps devices blend in on networks during authorized testing.
### Configuration Backup
Before building, the script automatically backs up the existing `sdkconfig.defaults` to `sdkconfig.defaults.bak`.
### Firmware Storage
Built firmware is saved in:
```
espilon_bot/firmware/<device_id>.bin
```
This allows:
- Reuse without rebuilding (--flash-only)
- Firmware version archival
- Quick reflashing of multiple devices
## Workflow Examples
### Scenario 1: Initial Setup (3 devices)
1. Edit `devices.json`:
```json
{
"project": "/home/user/epsilon/espilon_bot",
"devices": [
{"device_id": "dev00001", "port": "/dev/ttyUSB0", ...},
{"device_id": "dev00002", "port": "/dev/ttyUSB1", ...},
{"device_id": "dev00003", "port": "/dev/ttyUSB2", ...}
]
}
```
2. Flash all:
```bash
python3 flash.py --config devices.json
```
3. Devices are ready for deployment
### Scenario 2: Update Single Device
1. Modify configuration in `devices.json`
2. Flash only that device:
```bash
# Remove other devices from JSON or use manual mode
python3 flash.py --manual --device-id dev00002 --port /dev/ttyUSB1 ...
```
### Scenario 3: Quick Reflash
```bash
# Build once
python3 flash.py --config devices.json --build-only
# Flash multiple times (testing, replacement devices)
python3 flash.py --config devices.json --flash-only
```
### Scenario 4: WiFi + GPRS Mixed Fleet
```json
{
"project": "/home/user/epsilon/espilon_bot",
"devices": [
{
"device_id": "wifi001",
"network_mode": "wifi",
"wifi_ssid": "HomeNetwork",
"wifi_pass": "password123",
...
},
{
"device_id": "gprs001",
"network_mode": "gprs",
"gprs_apn": "sl2sfr",
...
}
]
}
```
## Troubleshooting
### Error: "Port not found"
```
❌ Flash failed: Serial port /dev/ttyUSB0 not found
```
**Solution**: Check device connection and port:
```bash
ls /dev/ttyUSB* /dev/ttyACM*
```
Add user to dialout group:
```bash
sudo usermod -a -G dialout $USER
# Log out and log back in
```
### Error: "Build failed"
```
❌ Build failed for device_id
```
**Solutions**:
1. Verify ESP-IDF is sourced:
```bash
. $HOME/esp-idf/export.sh
```
2. Check project path in `devices.json`
3. Manually test build:
```bash
cd espilon_bot
idf.py build
```
### Error: "Permission denied"
```
❌ Permission denied: /dev/ttyUSB0
```
**Solution**:
```bash
sudo chmod 666 /dev/ttyUSB0
# Or add user to dialout group (permanent)
sudo usermod -a -G dialout $USER
```
### Error: "Binary not found"
```
❌ Binary not found: epsilon_bot.bin
```
**Solution**: Check build output for compilation errors. The binary name should match the project configuration.
### WiFi Mode Missing Credentials
```
ValueError: WiFi mode requires wifi_ssid and wifi_pass
```
**Solution**: Ensure `wifi_ssid` and `wifi_pass` are set for devices with `network_mode: "wifi"`.
## Output Example
```
============================================================
# Device 1/3: ce4f626b
############################################################
============================================================
🔧 Building firmware for: ce4f626b (WIFI) on /dev/ttyUSB0
============================================================
✅ Generated sdkconfig.defaults for ce4f626b
🗑️ Removed old sdkconfig
⚙️ Running idf.py build...
✅ Firmware saved: espilon_bot/firmware/ce4f626b.bin
============================================================
🚀 Flashing: ce4f626b (WIFI) on /dev/ttyUSB0
============================================================
📁 Bootloader: espilon_bot/build/bootloader/bootloader.bin
📁 Partitions: espilon_bot/build/partition_table/partition-table.bin
📁 Application: espilon_bot/firmware/ce4f626b.bin
🔌 Port: /dev/ttyUSB0
✅ Successfully flashed ce4f626b
============================================================
📊 SUMMARY
============================================================
✅ Success: 3/3
❌ Failed: 0/3
============================================================
```
## Security Considerations
### Encryption Keys
**WARNING**: The default crypto keys are for TESTING ONLY:
- `crypto_key`: "testde32chars00000000000000000000"
- `crypto_nonce`: "noncenonceno"
For production use:
1. Generate secure keys:
```bash
# 32-byte key
openssl rand -hex 32
# 12-byte nonce
openssl rand -hex 12
```
2. Update in `devices.json` or use `--crypto-key` and `--crypto-nonce`
3. **Never commit real keys to version control**
### Device IDs
Generate random device IDs:
```bash
openssl rand -hex 4 # Generates 8-character hex ID
```
## Files Generated
During operation, the script creates:
```
espilon_bot/
├── sdkconfig # Generated during build (auto-deleted)
├── sdkconfig.defaults # Overwritten per device
├── sdkconfig.defaults.bak # Backup of previous config
├── build/ # ESP-IDF build artifacts
│ ├── bootloader/
│ ├── partition_table/
│ └── epsilon_bot.bin
└── firmware/ # Saved firmware binaries
├── ce4f626b.bin
├── a91dd021.bin
└── f34592e0.bin
```
## Tips
1. **Batch Processing**: Connect multiple ESP32s to different USB ports, configure all in `devices.json`, and flash them all at once.
2. **Parallel Builds**: For faster processing with many devices, consider building in parallel (future enhancement).
3. **Configuration Templates**: Keep multiple `devices.json` files for different deployment scenarios:
- `devices-wifi.json`
- `devices-gprs.json`
- `devices-production.json`
- `devices-testing.json`
4. **Firmware Archive**: Save firmware binaries with version tags:
```bash
mkdir -p firmware-archive/v1.0
cp espilon_bot/firmware/*.bin firmware-archive/v1.0/
```
5. **Serial Port Mapping**: Create udev rules for consistent port naming (Linux):
```bash
# /etc/udev/rules.d/99-esp32.rules
SUBSYSTEM=="tty", ATTRS{idVendor}=="1a86", ATTRS{idProduct}=="7523", SYMLINK+="esp32-dev1"
SUBSYSTEM=="tty", ATTRS{idVendor}=="10c4", ATTRS{idProduct}=="ea60", SYMLINK+="esp32-dev2"
```
## Help
```bash
python3 flash.py --help
```
Displays full usage information with examples.
## Related Documentation
- [Installation Guide](../../docs/INSTALL.md) - Full Espilon setup
- [Hardware Guide](../../docs/HARDWARE.md) - Supported boards and wiring
- [Module API](../../docs/MODULES.md) - Available commands and modules
- [Security](../../docs/SECURITY.md) - Security best practices
## License
Part of the Espilon project. See [LICENSE](../../LICENSE) for details.

View File

@ -1,518 +0,0 @@
#!/usr/bin/env python3
"""
Epsilon Multi-Device Flasher
Automates building and flashing ESP32 devices with custom configurations.
"""
import os
import json
import subprocess
import string
import random
import argparse
from typing import List, Optional
from dataclasses import dataclass
# Configuration
CONFIG_FILE = "devices.json"
SDKCONFIG_FILENAME = "sdkconfig.defaults"
@dataclass
class Device:
"""Represents an ESP32 device configuration"""
device_id: str
port: str
srv_ip: str
srv_port: int = 2626
# Network configuration
network_mode: str = "wifi" # "wifi" or "gprs"
wifi_ssid: Optional[str] = None
wifi_pass: Optional[str] = None
gprs_apn: Optional[str] = "sl2sfr"
# Optional settings
hostname: Optional[str] = None
# Modules
module_network: bool = True
module_recon: bool = False
module_fakeap: bool = False
# Recon settings
recon_camera: bool = False
recon_ble_trilat: bool = False
# Security (master key provisioned separately via provision.py)
def __post_init__(self):
"""Generate hostname if not provided"""
if not self.hostname:
self.hostname = self._generate_hostname()
# Validate network mode
if self.network_mode not in ["wifi", "gprs"]:
raise ValueError(f"Invalid network_mode: {self.network_mode}. Must be 'wifi' or 'gprs'")
# Validate WiFi mode has credentials
if self.network_mode == "wifi" and (not self.wifi_ssid or not self.wifi_pass):
raise ValueError("WiFi mode requires wifi_ssid and wifi_pass")
@staticmethod
def _generate_hostname() -> str:
"""Generate a realistic device hostname"""
hostnames = [
"iPhone",
"Android",
f"DESKTOP-{''.join([random.choice(string.digits + string.ascii_uppercase) for _ in range(7)])}",
# iPhones
"iPhone-15-pro-max", "iPhone-15-pro", "iPhone-15", "iPhone-15-plus",
"iPhone-14-pro-max", "iPhone-14-pro", "iPhone-14", "iPhone-14-plus",
"iPhone-se-3rd-gen", "iPhone-13-pro-max",
# Samsung
"galaxy-s24-ultra", "galaxy-s24", "galaxy-z-fold5", "galaxy-z-flip5", "galaxy-a55",
# Xiaomi
"xiaomi-14-ultra", "xiaomi-14", "redmi-note-13-pro-plus", "redmi-note-13-5g", "poco-f6-pro",
# OnePlus
"oneplus-12", "oneplus-12r", "oneplus-11", "oneplus-nord-3", "oneplus-nord-ce-3-lite",
# Google
"pixel-8-pro", "pixel-8", "pixel-7a", "pixel-fold", "pixel-6a",
# Motorola
"moto-edge-50-ultra", "moto-g-stylus-5g-2024", "moto-g-power-2024",
"razr-50-ultra", "moto-e32",
# Sony
"xperia-1-vi", "xperia-10-vi", "xperia-5-v", "xperia-l5", "xperia-pro-i",
# Oppo
"oppo-find-x6-pro", "oppo-reno9-pro", "oppo-a78", "oppo-f21-pro", "oppo-a17",
# Vivo
"vivo-x90-pro-plus", "vivo-x90-pro", "vivo-y35", "vivo-y75", "vivo-v29e",
# Realme
"realme-11-pro-plus", "realme-10x", "realme-9i", "realme-c33", "realme-11x",
# Asus
"rog-phone-8-pro", "zenfone-10", "rog-phone-7", "rog-phone-6d", "asus-zenfone-9",
# Lenovo
"lenovo-legion-y90", "lenovo-k14-note", "lenovo-k14-plus", "lenovo-tab-m10",
# Honor
"honor-90", "honor-x8a", "honor-70-pro", "honor-magic5-pro", "honor-x7a",
# Huawei
"huawei-p60-pro", "huawei-p50-pro", "huawei-mate-50-pro", "huawei-mate-xs-2", "huawei-nova-11",
# LG
"lg-wing", "lg-velvet", "lg-g8x-thinQ", "lg-v60-thinQ", "lg-k92-5g"
]
return random.choice(hostnames)
@classmethod
def from_dict(cls, data: dict):
"""Create Device from dictionary"""
return cls(
device_id=data["device_id"],
port=data["port"],
srv_ip=data["srv_ip"],
srv_port=data.get("srv_port", 2626),
network_mode=data.get("network_mode", "wifi"),
wifi_ssid=data.get("wifi_ssid"),
wifi_pass=data.get("wifi_pass"),
gprs_apn=data.get("gprs_apn", "sl2sfr"),
hostname=data.get("hostname"),
module_network=data.get("module_network", True),
module_recon=data.get("module_recon", False),
module_fakeap=data.get("module_fakeap", False),
recon_camera=data.get("recon_camera", False),
recon_ble_trilat=data.get("recon_ble_trilat", False)
)
def __str__(self):
return f"{self.device_id} ({self.network_mode.upper()}) on {self.port}"
class FirmwareBuilder:
"""Handles firmware building for ESP32 devices"""
def __init__(self, project_path: str):
self.project_path = project_path
self.sdkconfig_path = os.path.join(self.project_path, SDKCONFIG_FILENAME)
self.build_dir = os.path.join(self.project_path, "build")
self.firmware_dir = os.path.join(self.project_path, "firmware")
os.makedirs(self.firmware_dir, exist_ok=True)
def generate_sdkconfig(self, device: Device):
"""Generate sdkconfig.defaults for a specific device"""
# Backup existing config
if os.path.exists(self.sdkconfig_path):
backup_path = f"{self.sdkconfig_path}.bak"
with open(self.sdkconfig_path, "r") as src, open(backup_path, "w") as dst:
dst.write(src.read())
# Generate new config
config_lines = []
# Device ID
config_lines.append(f'CONFIG_DEVICE_ID="{device.device_id}"')
# Network Mode
if device.network_mode == "wifi":
config_lines.append("CONFIG_NETWORK_WIFI=y")
config_lines.append("CONFIG_NETWORK_GPRS=n")
config_lines.append(f'CONFIG_WIFI_SSID="{device.wifi_ssid}"')
config_lines.append(f'CONFIG_WIFI_PASS="{device.wifi_pass}"')
else: # gprs
config_lines.append("CONFIG_NETWORK_WIFI=n")
config_lines.append("CONFIG_NETWORK_GPRS=y")
config_lines.append(f'CONFIG_GPRS_APN="{device.gprs_apn}"')
# Server
config_lines.append(f'CONFIG_SERVER_IP="{device.srv_ip}"')
config_lines.append(f'CONFIG_SERVER_PORT={device.srv_port}')
# Security (master key provisioned via provision.py into factory NVS)
# Modules
config_lines.append(f'CONFIG_MODULE_NETWORK={"y" if device.module_network else "n"}')
config_lines.append(f'CONFIG_MODULE_RECON={"y" if device.module_recon else "n"}')
config_lines.append(f'CONFIG_MODULE_FAKEAP={"y" if device.module_fakeap else "n"}')
# Recon settings (only if module enabled)
if device.module_recon:
config_lines.append(f'CONFIG_RECON_MODE_CAMERA={"y" if device.recon_camera else "n"}')
config_lines.append(f'CONFIG_RECON_MODE_BLE_TRILAT={"y" if device.recon_ble_trilat else "n"}')
# Crypto: ChaCha20-Poly1305 + HKDF (mbedtls legacy API, ESP-IDF v5.3)
config_lines.append("CONFIG_MBEDTLS_CHACHA20_C=y")
config_lines.append("CONFIG_MBEDTLS_POLY1305_C=y")
config_lines.append("CONFIG_MBEDTLS_CHACHAPOLY_C=y")
config_lines.append("CONFIG_MBEDTLS_HKDF_C=y")
# Partition table (custom with factory NVS)
config_lines.append("CONFIG_PARTITION_TABLE_CUSTOM=y")
config_lines.append('CONFIG_PARTITION_TABLE_CUSTOM_FILENAME="partitions.csv"')
config_lines.append("CONFIG_LWIP_IPV4_NAPT=y")
config_lines.append("CONFIG_LWIP_IPV4_NAPT_PORTMAP=y")
config_lines.append("CONFIG_LWIP_IP_FORWARD=y")
config_lines.append(f'CONFIG_LWIP_LOCAL_HOSTNAME="{device.hostname}"')
# Bluetooth (for BLE trilateration)
if device.recon_ble_trilat:
config_lines.append("CONFIG_BT_ENABLED=y")
config_lines.append("CONFIG_BT_BLUEDROID_ENABLED=y")
config_lines.append("CONFIG_BT_BLE_ENABLED=y")
# Write config
with open(self.sdkconfig_path, "w") as f:
f.write("\n".join(config_lines) + "\n")
print(f"✅ Generated sdkconfig.defaults for {device.device_id}")
def build(self, device: Device) -> Optional[str]:
"""Build firmware for a specific device"""
print(f"\n{'='*60}")
print(f"🔧 Building firmware for: {device}")
print(f"{'='*60}")
# Generate config
self.generate_sdkconfig(device)
# Remove old sdkconfig to force reconfiguration
sdkconfig = os.path.join(self.project_path, "sdkconfig")
if os.path.exists(sdkconfig):
os.remove(sdkconfig)
print("🗑️ Removed old sdkconfig")
# Build
try:
print("⚙️ Running idf.py build...")
result = subprocess.run(
["bash", "-c", f". $HOME/esp-idf/export.sh > /dev/null 2>&1 && idf.py -C {self.project_path} -D SDKCONFIG_DEFAULTS={SDKCONFIG_FILENAME} build 2>&1"],
capture_output=True,
text=True,
timeout=300 # 5 minutes timeout
)
if result.returncode != 0:
print(f"❌ Build failed for {device.device_id}")
print("Error output:")
print(result.stdout[-2000:] if len(result.stdout) > 2000 else result.stdout)
return None
except subprocess.TimeoutExpired:
print(f"❌ Build timeout for {device.device_id}")
return None
except Exception as e:
print(f"❌ Build error for {device.device_id}: {e}")
return None
# Find binary
bin_name = "epsilon_bot.bin"
bin_path = os.path.join(self.build_dir, bin_name)
if not os.path.exists(bin_path):
print(f"❌ Binary not found: {bin_path}")
return None
# Copy to firmware directory with device ID
output_bin = os.path.join(self.firmware_dir, f"{device.device_id}.bin")
subprocess.run(["cp", bin_path, output_bin], check=True)
print(f"✅ Firmware saved: {output_bin}")
return output_bin
class Flasher:
"""Handles flashing ESP32 devices"""
def __init__(self, project_path: str):
self.project_path = project_path
self.build_dir = os.path.join(project_path, "build")
def flash(self, device: Device, bin_path: str):
"""Flash a device with compiled firmware"""
print(f"\n{'='*60}")
print(f"🚀 Flashing: {device}")
print(f"{'='*60}")
# Locate required files
bootloader = os.path.join(self.build_dir, "bootloader", "bootloader.bin")
partitions = os.path.join(self.build_dir, "partition_table", "partition-table.bin")
# Check all files exist
if not os.path.exists(bootloader):
print(f"❌ Missing bootloader: {bootloader}")
return False
if not os.path.exists(partitions):
print(f"❌ Missing partition table: {partitions}")
return False
if not os.path.exists(bin_path):
print(f"❌ Missing application binary: {bin_path}")
return False
print(f"📁 Bootloader: {bootloader}")
print(f"📁 Partitions: {partitions}")
print(f"📁 Application: {bin_path}")
print(f"🔌 Port: {device.port}")
# Flash
try:
subprocess.run([
"esptool.py",
"--chip", "esp32",
"--port", device.port,
"--baud", "460800",
"write_flash", "-z",
"0x1000", bootloader,
"0x8000", partitions,
"0x10000", bin_path
], check=True)
print(f"✅ Successfully flashed {device.device_id}")
return True
except subprocess.CalledProcessError as e:
print(f"❌ Flash failed for {device.device_id}: {e}")
return False
except Exception as e:
print(f"❌ Unexpected error flashing {device.device_id}: {e}")
return False
def load_devices_from_config(config_path: str) -> tuple[str, List[Device]]:
"""Load devices from JSON config file"""
if not os.path.exists(config_path):
raise FileNotFoundError(f"Config file not found: {config_path}")
with open(config_path, "r") as f:
data = json.load(f)
project_path = data.get("project")
if not project_path:
raise ValueError("Missing 'project' field in config")
devices_data = data.get("devices", [])
devices = [Device.from_dict(d) for d in devices_data]
return project_path, devices
def main():
parser = argparse.ArgumentParser(
description="Epsilon ESP32 Multi-Device Flasher",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Flash all devices from config file
python flash.py --config devices.json
# Flash a single device manually (WiFi)
python flash.py --manual \\
--project /home/user/epsilon/espilon_bot \\
--device-id abc12345 \\
--port /dev/ttyUSB0 \\
--srv-ip 192.168.1.100 \\
--wifi-ssid MyWiFi \\
--wifi-pass MyPassword
# Flash a single device manually (GPRS)
python flash.py --manual \\
--project /home/user/epsilon/espilon_bot \\
--device-id def67890 \\
--port /dev/ttyUSB1 \\
--srv-ip 203.0.113.10 \\
--network-mode gprs \\
--gprs-apn sl2sfr
# Build only (no flash)
python flash.py --config devices.json --build-only
"""
)
# Mode selection
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument("--config", type=str, help="Path to devices.json config file")
mode_group.add_argument("--manual", action="store_true", help="Manual device configuration")
# Manual device configuration
parser.add_argument("--project", type=str, help="Path to epsilon_bot project directory")
parser.add_argument("--device-id", type=str, help="Device ID (8 hex chars)")
parser.add_argument("--port", type=str, help="Serial port (e.g., /dev/ttyUSB0)")
parser.add_argument("--srv-ip", type=str, help="C2 server IP address")
parser.add_argument("--srv-port", type=int, default=2626, help="C2 server port (default: 2626)")
# Network configuration
parser.add_argument("--network-mode", choices=["wifi", "gprs"], default="wifi", help="Network mode")
parser.add_argument("--wifi-ssid", type=str, help="WiFi SSID (required for WiFi mode)")
parser.add_argument("--wifi-pass", type=str, help="WiFi password (required for WiFi mode)")
parser.add_argument("--gprs-apn", type=str, default="sl2sfr", help="GPRS APN (default: sl2sfr)")
# Optional settings
parser.add_argument("--hostname", type=str, help="Device hostname (random if not specified)")
# Crypto keys are now provisioned separately via provision.py
# Modules
parser.add_argument("--enable-recon", action="store_true", help="Enable recon module")
parser.add_argument("--enable-fakeap", action="store_true", help="Enable fake AP module")
parser.add_argument("--enable-camera", action="store_true", help="Enable camera reconnaissance")
parser.add_argument("--enable-ble-trilat", action="store_true", help="Enable BLE trilateration")
# Actions
parser.add_argument("--build-only", action="store_true", help="Build firmware without flashing")
parser.add_argument("--flash-only", action="store_true", help="Flash existing firmware without rebuilding")
args = parser.parse_args()
# Load devices
if args.config:
# Load from config file
project_path, devices = load_devices_from_config(args.config)
print(f"📋 Loaded {len(devices)} device(s) from {args.config}")
print(f"📂 Project: {project_path}")
else:
# Manual device
required = ["project", "device_id", "port", "srv_ip"]
missing = [f for f in required if not getattr(args, f.replace("-", "_"))]
if missing:
parser.error(f"--manual mode requires: {', '.join(f'--{f}' for f in missing)}")
# Validate WiFi requirements
if args.network_mode == "wifi" and (not args.wifi_ssid or not args.wifi_pass):
parser.error("WiFi mode requires --wifi-ssid and --wifi-pass")
project_path = args.project
device = Device(
device_id=args.device_id,
port=args.port,
srv_ip=args.srv_ip,
srv_port=args.srv_port,
network_mode=args.network_mode,
wifi_ssid=args.wifi_ssid,
wifi_pass=args.wifi_pass,
gprs_apn=args.gprs_apn,
hostname=args.hostname,
module_network=True,
module_recon=args.enable_recon,
module_fakeap=args.enable_fakeap,
recon_camera=args.enable_camera,
recon_ble_trilat=args.enable_ble_trilat
)
devices = [device]
print(f"📋 Manual device configuration: {device}")
# Validate project path
if not os.path.exists(project_path):
print(f"❌ Project path not found: {project_path}")
return 1
# Initialize tools
builder = FirmwareBuilder(project_path)
flasher = Flasher(project_path)
# Process each device
success_count = 0
fail_count = 0
for i, device in enumerate(devices, 1):
print(f"\n{'#'*60}")
print(f"# Device {i}/{len(devices)}: {device.device_id}")
print(f"{'#'*60}")
try:
# Build
if not args.flash_only:
bin_path = builder.build(device)
if not bin_path:
print(f"⚠️ Skipping flash for {device.device_id} (build failed)")
fail_count += 1
continue
else:
# Use existing binary
bin_path = os.path.join(builder.firmware_dir, f"{device.device_id}.bin")
if not os.path.exists(bin_path):
print(f"❌ Firmware not found: {bin_path}")
fail_count += 1
continue
# Flash
if not args.build_only:
if flasher.flash(device, bin_path):
success_count += 1
else:
fail_count += 1
else:
print(f" Build-only mode, skipping flash")
success_count += 1
except Exception as e:
print(f"❌ Error processing {device.device_id}: {e}")
fail_count += 1
# Summary
print(f"\n{'='*60}")
print(f"📊 SUMMARY")
print(f"{'='*60}")
print(f"✅ Success: {success_count}/{len(devices)}")
print(f"❌ Failed: {fail_count}/{len(devices)}")
print(f"{'='*60}\n")
return 0 if fail_count == 0 else 1
if __name__ == "__main__":
exit(main())

View File

@ -1,3 +0,0 @@
key,type,encoding,value
crypto,namespace,,
master_key,data,hex2bin,0d99c1e0e86eb289b51c7e11bf913feb5180b1d266aade18466a3ef591c4986c
1 key type encoding value
2 crypto namespace
3 master_key data hex2bin 0d99c1e0e86eb289b51c7e11bf913feb5180b1d266aade18466a3ef591c4986c

View File

@ -1,212 +0,0 @@
#!/usr/bin/env python3
"""
Epsilon Device Provisioning Tool
Generates a unique 32-byte master key for an ESP32 device,
flashes it into the factory NVS partition, and registers it
in the C2 keystore.
Usage:
python provision.py --device-id abc12345 --port /dev/ttyUSB0
python provision.py --device-id abc12345 --port /dev/ttyUSB0 --keystore ../C3PO/keys.json
"""
import argparse
import json
import os
import subprocess
import sys
import tempfile
FCTRY_PARTITION_OFFSET = 0x10000
FCTRY_PARTITION_SIZE = 0x6000
NVS_NAMESPACE = "crypto"
NVS_KEY = "master_key"
def generate_master_key() -> bytes:
"""Generate a cryptographically secure 32-byte master key."""
return os.urandom(32)
def create_nvs_csv(master_key: bytes, csv_path: str) -> None:
"""Create a CSV file for nvs_partition_gen with the master key."""
with open(csv_path, "w") as f:
f.write("key,type,encoding,value\n")
f.write(f"{NVS_NAMESPACE},namespace,,\n")
f.write(f"{NVS_KEY},data,hex2bin,{master_key.hex()}\n")
def generate_nvs_binary(csv_path: str, bin_path: str) -> bool:
"""Generate NVS partition binary from CSV using nvs_partition_gen."""
try:
result = subprocess.run(
[
sys.executable, "-m", "esp_idf_nvs_partition_gen",
"generate", csv_path, bin_path, hex(FCTRY_PARTITION_SIZE),
],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
# Fallback: try the script directly from ESP-IDF
idf_path = os.environ.get("IDF_PATH", os.path.expanduser("~/esp-idf"))
nvs_tool = os.path.join(
idf_path, "components", "nvs_flash", "nvs_partition_generator",
"nvs_partition_gen.py"
)
if os.path.exists(nvs_tool):
result = subprocess.run(
[sys.executable, nvs_tool, "generate",
csv_path, bin_path, hex(FCTRY_PARTITION_SIZE)],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
print(f"nvs_partition_gen failed:\n{result.stdout}\n{result.stderr}")
return False
return True
except FileNotFoundError:
print("Error: nvs_partition_gen not found. Ensure ESP-IDF is installed.")
return False
except subprocess.TimeoutExpired:
print("Error: nvs_partition_gen timed out")
return False
def flash_partition(port: str, bin_path: str, offset: int) -> bool:
"""Flash a binary to the specified partition offset."""
try:
subprocess.run(
[
"esptool.py",
"--chip", "esp32",
"--port", port,
"--baud", "460800",
"write_flash",
hex(offset), bin_path,
],
check=True,
timeout=60,
)
return True
except subprocess.CalledProcessError as e:
print(f"Flash failed: {e}")
return False
except FileNotFoundError:
print("Error: esptool.py not found. Install with: pip install esptool")
return False
except subprocess.TimeoutExpired:
print("Error: flash timed out")
return False
def update_keystore(keystore_path: str, device_id: str, master_key: bytes) -> None:
"""Add or update the device's master key in the C2 keystore."""
keys = {}
if os.path.exists(keystore_path):
try:
with open(keystore_path, "r") as f:
keys = json.load(f)
except (json.JSONDecodeError, ValueError):
pass
keys[device_id] = master_key.hex()
with open(keystore_path, "w") as f:
json.dump(keys, f, indent=2)
print(f"Keystore updated: {keystore_path}")
def main():
parser = argparse.ArgumentParser(
description="Epsilon ESP32 Device Provisioning",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Provision a device and register in default keystore
python provision.py --device-id abc12345 --port /dev/ttyUSB0
# Provision with custom keystore path
python provision.py --device-id abc12345 --port /dev/ttyUSB0 \\
--keystore ../C3PO/keys.json
# Generate key only (no flash)
python provision.py --device-id abc12345 --no-flash
""",
)
parser.add_argument("--device-id", required=True, help="Device ID")
parser.add_argument("--port", help="Serial port (e.g., /dev/ttyUSB0)")
parser.add_argument(
"--keystore",
default=os.path.join(os.path.dirname(__file__), "..", "C3PO", "keys.json"),
help="Path to C2 keystore JSON (default: ../C3PO/keys.json)",
)
parser.add_argument("--no-flash", action="store_true",
help="Generate key and update keystore without flashing")
parser.add_argument("--key", help="Use a specific hex-encoded 32-byte key instead of random")
args = parser.parse_args()
if not args.no_flash and not args.port:
parser.error("--port is required unless --no-flash is specified")
print(f"{'='*50}")
print(f" Epsilon Device Provisioning")
print(f"{'='*50}")
print(f" Device ID : {args.device_id}")
print(f" Port : {args.port or 'N/A (no-flash)'}")
print(f" Keystore : {args.keystore}")
print(f"{'='*50}")
# 1) Generate or parse master key
if args.key:
try:
master_key = bytes.fromhex(args.key)
if len(master_key) != 32:
print(f"Error: --key must be 32 bytes (64 hex chars), got {len(master_key)}")
return 1
except ValueError:
print("Error: --key must be valid hex")
return 1
print(f" Using provided key: {master_key.hex()[:16]}...")
else:
master_key = generate_master_key()
print(f" Generated key : {master_key.hex()[:16]}...")
# 2) Flash to device if requested
if not args.no_flash:
with tempfile.TemporaryDirectory() as tmpdir:
csv_path = os.path.join(tmpdir, "fctry.csv")
bin_path = os.path.join(tmpdir, "fctry.bin")
print("\nGenerating NVS binary...")
create_nvs_csv(master_key, csv_path)
if not generate_nvs_binary(csv_path, bin_path):
print("Failed to generate NVS binary")
return 1
print(f"Flashing factory NVS to {args.port} at {hex(FCTRY_PARTITION_OFFSET)}...")
if not flash_partition(args.port, bin_path, FCTRY_PARTITION_OFFSET):
print("Failed to flash factory NVS")
return 1
print("Factory NVS flashed successfully")
# 3) Update keystore
keystore_path = os.path.abspath(args.keystore)
update_keystore(keystore_path, args.device_id, master_key)
print(f"\n{'='*50}")
print(f" Provisioning complete for {args.device_id}")
print(f"{'='*50}")
return 0
if __name__ == "__main__":
sys.exit(main())