Crypto: - Replace broken ChaCha20 (static nonce) with ChaCha20-Poly1305 AEAD - HKDF-SHA256 key derivation from per-device factory NVS master keys - Random 12-byte nonce per message (ESP32 hardware RNG) - crypto_init/encrypt/decrypt API with mbedtls legacy (ESP-IDF v5.3.2) - Custom partition table with factory NVS (fctry at 0x10000) Firmware: - crypto.c full rewrite, messages.c device_id prefix + AEAD encrypt - crypto_init() at boot with esp_restart() on failure - Fix command_t initializations across all modules (sub/help fields) - Clean CMakeLists dependencies for ESP-IDF v5.3.2 C3PO (C2): - Rename tools/c2 + tools/c3po -> tools/C3PO - Per-device CryptoContext with HKDF key derivation - KeyStore (keys.json) for master key management - Transport parses device_id:base64(...) wire format Tools: - New tools/provisioning/provision.py for factory NVS key generation - Updated flasher with mbedtls config for v5.3.2 Docs: - Update all READMEs for new crypto, C3PO paths, provisioning - Update roadmap, architecture diagrams, security sections - Update CONTRIBUTING.md project structure
213 lines
7.0 KiB
Python
213 lines
7.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Epsilon Device Provisioning Tool
|
|
|
|
Generates a unique 32-byte master key for an ESP32 device,
|
|
flashes it into the factory NVS partition, and registers it
|
|
in the C2 keystore.
|
|
|
|
Usage:
|
|
python provision.py --device-id abc12345 --port /dev/ttyUSB0
|
|
python provision.py --device-id abc12345 --port /dev/ttyUSB0 --keystore ../C3PO/keys.json
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
FCTRY_PARTITION_OFFSET = 0x10000
|
|
FCTRY_PARTITION_SIZE = 0x6000
|
|
NVS_NAMESPACE = "crypto"
|
|
NVS_KEY = "master_key"
|
|
|
|
|
|
def generate_master_key() -> bytes:
|
|
"""Generate a cryptographically secure 32-byte master key."""
|
|
return os.urandom(32)
|
|
|
|
|
|
def create_nvs_csv(master_key: bytes, csv_path: str) -> None:
|
|
"""Create a CSV file for nvs_partition_gen with the master key."""
|
|
with open(csv_path, "w") as f:
|
|
f.write("key,type,encoding,value\n")
|
|
f.write(f"{NVS_NAMESPACE},namespace,,\n")
|
|
f.write(f"{NVS_KEY},data,hex2bin,{master_key.hex()}\n")
|
|
|
|
|
|
def generate_nvs_binary(csv_path: str, bin_path: str) -> bool:
|
|
"""Generate NVS partition binary from CSV using nvs_partition_gen."""
|
|
try:
|
|
result = subprocess.run(
|
|
[
|
|
sys.executable, "-m", "esp_idf_nvs_partition_gen",
|
|
"generate", csv_path, bin_path, hex(FCTRY_PARTITION_SIZE),
|
|
],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30,
|
|
)
|
|
if result.returncode != 0:
|
|
# Fallback: try the script directly from ESP-IDF
|
|
idf_path = os.environ.get("IDF_PATH", os.path.expanduser("~/esp-idf"))
|
|
nvs_tool = os.path.join(
|
|
idf_path, "components", "nvs_flash", "nvs_partition_generator",
|
|
"nvs_partition_gen.py"
|
|
)
|
|
if os.path.exists(nvs_tool):
|
|
result = subprocess.run(
|
|
[sys.executable, nvs_tool, "generate",
|
|
csv_path, bin_path, hex(FCTRY_PARTITION_SIZE)],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30,
|
|
)
|
|
if result.returncode != 0:
|
|
print(f"nvs_partition_gen failed:\n{result.stdout}\n{result.stderr}")
|
|
return False
|
|
return True
|
|
except FileNotFoundError:
|
|
print("Error: nvs_partition_gen not found. Ensure ESP-IDF is installed.")
|
|
return False
|
|
except subprocess.TimeoutExpired:
|
|
print("Error: nvs_partition_gen timed out")
|
|
return False
|
|
|
|
|
|
def flash_partition(port: str, bin_path: str, offset: int) -> bool:
|
|
"""Flash a binary to the specified partition offset."""
|
|
try:
|
|
subprocess.run(
|
|
[
|
|
"esptool.py",
|
|
"--chip", "esp32",
|
|
"--port", port,
|
|
"--baud", "460800",
|
|
"write_flash",
|
|
hex(offset), bin_path,
|
|
],
|
|
check=True,
|
|
timeout=60,
|
|
)
|
|
return True
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Flash failed: {e}")
|
|
return False
|
|
except FileNotFoundError:
|
|
print("Error: esptool.py not found. Install with: pip install esptool")
|
|
return False
|
|
except subprocess.TimeoutExpired:
|
|
print("Error: flash timed out")
|
|
return False
|
|
|
|
|
|
def update_keystore(keystore_path: str, device_id: str, master_key: bytes) -> None:
|
|
"""Add or update the device's master key in the C2 keystore."""
|
|
keys = {}
|
|
if os.path.exists(keystore_path):
|
|
try:
|
|
with open(keystore_path, "r") as f:
|
|
keys = json.load(f)
|
|
except (json.JSONDecodeError, ValueError):
|
|
pass
|
|
|
|
keys[device_id] = master_key.hex()
|
|
|
|
with open(keystore_path, "w") as f:
|
|
json.dump(keys, f, indent=2)
|
|
|
|
print(f"Keystore updated: {keystore_path}")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Epsilon ESP32 Device Provisioning",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
# Provision a device and register in default keystore
|
|
python provision.py --device-id abc12345 --port /dev/ttyUSB0
|
|
|
|
# Provision with custom keystore path
|
|
python provision.py --device-id abc12345 --port /dev/ttyUSB0 \\
|
|
--keystore ../C3PO/keys.json
|
|
|
|
# Generate key only (no flash)
|
|
python provision.py --device-id abc12345 --no-flash
|
|
""",
|
|
)
|
|
|
|
parser.add_argument("--device-id", required=True, help="Device ID")
|
|
parser.add_argument("--port", help="Serial port (e.g., /dev/ttyUSB0)")
|
|
parser.add_argument(
|
|
"--keystore",
|
|
default=os.path.join(os.path.dirname(__file__), "..", "C3PO", "keys.json"),
|
|
help="Path to C2 keystore JSON (default: ../C3PO/keys.json)",
|
|
)
|
|
parser.add_argument("--no-flash", action="store_true",
|
|
help="Generate key and update keystore without flashing")
|
|
parser.add_argument("--key", help="Use a specific hex-encoded 32-byte key instead of random")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not args.no_flash and not args.port:
|
|
parser.error("--port is required unless --no-flash is specified")
|
|
|
|
print(f"{'='*50}")
|
|
print(f" Epsilon Device Provisioning")
|
|
print(f"{'='*50}")
|
|
print(f" Device ID : {args.device_id}")
|
|
print(f" Port : {args.port or 'N/A (no-flash)'}")
|
|
print(f" Keystore : {args.keystore}")
|
|
print(f"{'='*50}")
|
|
|
|
# 1) Generate or parse master key
|
|
if args.key:
|
|
try:
|
|
master_key = bytes.fromhex(args.key)
|
|
if len(master_key) != 32:
|
|
print(f"Error: --key must be 32 bytes (64 hex chars), got {len(master_key)}")
|
|
return 1
|
|
except ValueError:
|
|
print("Error: --key must be valid hex")
|
|
return 1
|
|
print(f" Using provided key: {master_key.hex()[:16]}...")
|
|
else:
|
|
master_key = generate_master_key()
|
|
print(f" Generated key : {master_key.hex()[:16]}...")
|
|
|
|
# 2) Flash to device if requested
|
|
if not args.no_flash:
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
csv_path = os.path.join(tmpdir, "fctry.csv")
|
|
bin_path = os.path.join(tmpdir, "fctry.bin")
|
|
|
|
print("\nGenerating NVS binary...")
|
|
create_nvs_csv(master_key, csv_path)
|
|
|
|
if not generate_nvs_binary(csv_path, bin_path):
|
|
print("Failed to generate NVS binary")
|
|
return 1
|
|
|
|
print(f"Flashing factory NVS to {args.port} at {hex(FCTRY_PARTITION_OFFSET)}...")
|
|
if not flash_partition(args.port, bin_path, FCTRY_PARTITION_OFFSET):
|
|
print("Failed to flash factory NVS")
|
|
return 1
|
|
|
|
print("Factory NVS flashed successfully")
|
|
|
|
# 3) Update keystore
|
|
keystore_path = os.path.abspath(args.keystore)
|
|
update_keystore(keystore_path, args.device_id, master_key)
|
|
|
|
print(f"\n{'='*50}")
|
|
print(f" Provisioning complete for {args.device_id}")
|
|
print(f"{'='*50}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|