Replace monolithic CLI and web server with route-based Flask API. New routes: api_commands, api_build, api_can, api_monitor, api_ota, api_tunnel. Add honeypot security dashboard with real-time SSE, MITRE ATT&CK mapping, kill chain analysis. New TUI with commander/help modules. Add session management, tunnel proxy core, CAN bus data store. Docker support.
214 lines
7.8 KiB
Python
214 lines
7.8 KiB
Python
"""Background firmware build manager for the OTA web interface."""
|
|
|
|
import json
import os
import re
import shlex
import shutil
import subprocess
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import List, Optional
|
|
|
|
# Import deploy.py as a library (one level up from C3PO/)
# The directory three ``.parent`` hops above this file is put at the front of
# ``sys.path`` so that ``deploy`` resolves as a top-level module.
import sys as _sys

_TOOLS_DIR = str(Path(__file__).resolve().parent.parent.parent)
if _TOOLS_DIR not in _sys.path:
    _sys.path.insert(0, _TOOLS_DIR)

# Must come after the sys.path adjustment above, hence not at the top of file.
from deploy import (
    DeviceConfig,
    generate_sdkconfig,
    find_app_binary,
    config_from_dict,
    PROJECT_DIR,
)
|
|
|
|
|
|
class BuildStatus(str, Enum):
    """Lifecycle states of a firmware build.

    The ``str`` mix-in makes every member compare equal to its plain string
    value, and ``.value`` yields the string that is reported to clients.
    """

    IDLE = "idle"          # default state of a freshly created BuildState
    BUILDING = "building"  # set by BuildManager.start_build
    SUCCESS = "success"    # set by the build worker after the binary is copied
    FAILED = "failed"      # set by the build worker when any step raises
|
|
|
|
|
|
@dataclass
class BuildState:
    """Mutable record describing the current (or most recent) build."""

    # Current lifecycle state; starts out idle.
    status: BuildStatus = BuildStatus.IDLE
    # Device identifier taken from the submitted configuration.
    device_id: str = ""
    # time.time() timestamps; 0.0 means "not set yet".
    started_at: float = 0.0
    finished_at: float = 0.0
    # Name of the produced .bin file, filled in on success.
    output_filename: str = ""
    # Accumulated build log, one entry per line.
    log_lines: List[str] = field(default_factory=list)
    # Text of the exception that made the build fail, if any.
    error: str = ""
    # Short human-readable description of the step currently running.
    progress_hint: str = ""
|
|
|
|
|
|
class BuildManager:
    """Manages one firmware build at a time in a background thread.

    A build is launched with :meth:`start_build`, runs in a daemon thread and
    records its progress in a ``BuildState`` object that callers poll through
    :attr:`state` and :meth:`get_log`.
    """

    # Every character outside this set is replaced before a device id becomes
    # part of an output file name, so the id cannot carry path separators.
    _UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9._-]")

    # Lower-cased substrings that mark a build output line as a progress hint.
    _PROGRESS_KEYWORDS = ("compiling", "building", "linking", "generating")

    def __init__(self, firmware_dir: str, deploy_json_path: str):
        """Create an idle manager.

        Args:
            firmware_dir: Directory that receives the built ``.bin`` files.
            deploy_json_path: Path of the JSON file whose ``"defaults"``
                object provides default configuration values.
        """
        self.firmware_dir = firmware_dir
        self.deploy_json_path = deploy_json_path
        self._lock = threading.Lock()
        self._state = BuildState()

    @property
    def state(self) -> dict:
        """Return a plain-dict snapshot of the current build state.

        ``elapsed`` is the running time of an active build, or the total
        duration of a finished one, rounded to one decimal place.
        """
        s = self._state
        if s.status == BuildStatus.BUILDING and s.started_at:
            elapsed = time.time() - s.started_at
        elif s.finished_at and s.started_at:
            elapsed = s.finished_at - s.started_at
        else:
            elapsed = 0.0
        return {
            "status": s.status.value,
            "device_id": s.device_id,
            "started_at": s.started_at,
            "finished_at": s.finished_at,
            "elapsed": round(elapsed, 1),
            "output_filename": s.output_filename,
            "error": s.error,
            "progress_hint": s.progress_hint,
            "log_line_count": len(s.log_lines),
        }

    def get_log(self, offset: int = 0) -> List[str]:
        """Return a copy of the log lines starting at index ``offset``."""
        return self._state.log_lines[offset:]

    def get_defaults(self) -> dict:
        """Load the ``"defaults"`` object from the deploy JSON file.

        Returns:
            The defaults mapping, or an empty dict when the file is missing,
            cannot be decoded or parsed, or does not hold a JSON object with
            an object-valued ``"defaults"`` entry.
        """
        try:
            with open(self.deploy_json_path, encoding="utf-8") as f:
                data = json.load(f)
        except (FileNotFoundError, ValueError):
            # ValueError covers json.JSONDecodeError and undecodable bytes.
            return {}
        if not isinstance(data, dict):
            return {}
        defaults = data.get("defaults", {})
        return defaults if isinstance(defaults, dict) else {}

    def start_build(self, config_dict: dict) -> tuple:
        """Start a background build. Returns (success, message).

        Only one build may run at a time; a second request while the status
        is ``BUILDING`` is rejected without touching the running build.
        """
        with self._lock:
            if self._state.status == BuildStatus.BUILDING:
                return False, "A build is already in progress"

            # A fresh state object discards the previous build's log/result.
            self._state = BuildState(
                status=BuildStatus.BUILDING,
                device_id=config_dict.get("device_id", "unknown"),
                started_at=time.time(),
            )

        t = threading.Thread(target=self._build_worker, args=(config_dict,), daemon=True)
        t.start()
        return True, "Build started"

    def _build_worker(self, config_dict: dict):
        """Run the whole build pipeline; executed in the background thread.

        Any exception ends the build: its text is stored in ``error`` and the
        status becomes ``FAILED``. Nothing is re-raised because there is no
        caller to receive it.
        """
        s = self._state
        try:
            # 1. Find ESP-IDF
            s.progress_hint = "Checking ESP-IDF..."
            s.log_lines.append("[build] Checking ESP-IDF environment")
            idf_path = self._find_idf()
            s.log_lines.append(f"[build] ESP-IDF: {idf_path}")

            # 2. Build DeviceConfig from dict + deploy.json defaults
            s.progress_hint = "Preparing configuration..."
            defaults = self.get_defaults()
            cfg = config_from_dict(config_dict, defaults)
            s.log_lines.append(f"[build] Device: {cfg.device_id}")
            module_flags = (
                ("network", cfg.mod_network),
                ("fakeap", cfg.mod_fakeap),
                ("honeypot", cfg.mod_honeypot),
                ("recon", cfg.mod_recon),
                ("camera", cfg.recon_camera),
            )
            mods = [name for name, enabled in module_flags if enabled]
            s.log_lines.append(f"[build] Modules: {', '.join(mods) or 'none'}")

            # 3. Write sdkconfig.defaults (keeping a .bak of the previous one)
            s.progress_hint = "Writing sdkconfig..."
            sdkconfig_path = PROJECT_DIR / "sdkconfig.defaults"
            if sdkconfig_path.exists():
                shutil.copy2(sdkconfig_path, sdkconfig_path.with_suffix(".defaults.bak"))
            content = generate_sdkconfig(cfg)
            with open(sdkconfig_path, "w") as f:
                f.write(content)
            s.log_lines.append("[build] sdkconfig.defaults written")

            # 4. Clean build directory
            s.progress_hint = "Cleaning build directory..."
            sdkconfig = PROJECT_DIR / "sdkconfig"
            build_dir = PROJECT_DIR / "build"
            if sdkconfig.exists():
                sdkconfig.unlink()
            if build_dir.exists():
                shutil.rmtree(build_dir)
            s.log_lines.append("[build] Build directory cleaned")

            # 5. Run idf.py build with streaming output
            s.progress_hint = "Building firmware (this takes 3-8 minutes)..."
            s.log_lines.append("[build] Starting idf.py build...")
            self._run_idf_build(idf_path, s)

            # 6. Copy binary to firmware directory
            s.progress_hint = "Copying binary..."
            app_bin = find_app_binary(build_dir)
            bin_size = os.path.getsize(app_bin)
            # The device id originates from the submitted configuration, so
            # it is sanitised before being used as part of a path.
            safe_id = self._safe_filename_component(cfg.device_id)
            filename = f"{safe_id}-{int(time.time())}.bin"
            os.makedirs(self.firmware_dir, exist_ok=True)
            dest = os.path.join(self.firmware_dir, filename)
            shutil.copy2(app_bin, dest)

            s.log_lines.append(f"[build] Binary: {filename} ({bin_size:,} bytes)")
            s.output_filename = filename
            s.progress_hint = "Build complete"
            # Publish the terminal status last so a poller that sees it also
            # sees the final timestamp and file name.
            s.finished_at = time.time()
            s.status = BuildStatus.SUCCESS

        except Exception as e:
            s.error = str(e)
            s.progress_hint = "Build failed"
            s.log_lines.append(f"[error] {e}")
            s.finished_at = time.time()
            s.status = BuildStatus.FAILED

    def _run_idf_build(self, idf_path: str, build: "BuildState") -> None:
        """Run ``idf.py build`` and stream its output into ``build.log_lines``.

        Raises:
            RuntimeError: If ``export.sh`` is missing from ``idf_path`` or the
                build exits with a non-zero status.
        """
        idf_export = os.path.join(idf_path, "export.sh")
        if not os.path.isfile(idf_export):
            raise RuntimeError(f"export.sh not found in {idf_path}")
        cmd = (
            f". {shlex.quote(idf_export)} > /dev/null 2>&1 && "
            f"idf.py -C {shlex.quote(str(PROJECT_DIR))} "
            f"-D SDKCONFIG_DEFAULTS=sdkconfig.defaults "
            f"build 2>&1"
        )
        # The context manager closes the pipe and reaps the child even when
        # reading the output raises. errors="replace" keeps one undecodable
        # byte in the tool output from aborting the build.
        with subprocess.Popen(
            ["bash", "-c", cmd],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors="replace",
        ) as process:
            for raw_line in process.stdout:
                line = raw_line.rstrip()
                if not line:
                    continue
                build.log_lines.append(line)
                # Extract progress hints
                lower = line.lower()
                if any(word in lower for word in self._PROGRESS_KEYWORDS):
                    build.progress_hint = line[:100]

        if process.returncode != 0:
            raise RuntimeError(f"idf.py build failed (exit code {process.returncode})")

    @classmethod
    def _safe_filename_component(cls, value) -> str:
        """Return ``value`` reduced to characters that are safe in a file name.

        Characters other than ASCII letters, digits, ``.``, ``_`` and ``-``
        become ``_`` and leading dots are dropped; an empty result falls back
        to ``"unknown"``.
        """
        cleaned = cls._UNSAFE_FILENAME_CHARS.sub("_", str(value)).lstrip(".")
        return cleaned or "unknown"

    @staticmethod
    def _find_idf() -> str:
        """Find and validate the ESP-IDF installation path.

        Tries ``$IDF_PATH``, then ``~/esp-idf``, then ``/opt/esp-idf`` and
        returns the real path of the first directory containing ``export.sh``.

        Raises:
            RuntimeError: If none of the candidates qualifies.
        """
        candidates = [
            os.environ.get("IDF_PATH", ""),
            os.path.expanduser("~/esp-idf"),
            "/opt/esp-idf",
        ]
        for candidate in candidates:
            if not candidate or not os.path.isdir(candidate):
                continue
            if os.path.isfile(os.path.join(candidate, "export.sh")):
                return os.path.realpath(candidate)
        raise RuntimeError("ESP-IDF not found. Set IDF_PATH or install ESP-IDF.")
|