espilon-source/tools/C3PO/tui/app.py
Eun0us 8b6c1cd53d ε - ChaCha20-Poly1305 AEAD + HKDF crypto upgrade + C3PO rewrite + docs
Crypto:
- Replace broken ChaCha20 (static nonce) with ChaCha20-Poly1305 AEAD
- HKDF-SHA256 key derivation from per-device factory NVS master keys
- Random 12-byte nonce per message (ESP32 hardware RNG)
- crypto_init/encrypt/decrypt API with mbedtls legacy (ESP-IDF v5.3.2)
- Custom partition table with factory NVS (fctry at 0x10000)

Firmware:
- crypto.c full rewrite, messages.c device_id prefix + AEAD encrypt
- crypto_init() at boot with esp_restart() on failure
- Fix command_t initializations across all modules (sub/help fields)
- Clean CMakeLists dependencies for ESP-IDF v5.3.2

C3PO (C2):
- Rename tools/c2 + tools/c3po -> tools/C3PO
- Per-device CryptoContext with HKDF key derivation
- KeyStore (keys.json) for master key management
- Transport parses device_id:base64(...) wire format

Tools:
- New tools/provisioning/provision.py for factory NVS key generation
- Updated flasher with mbedtls config for v5.3.2

Docs:
- Update all READMEs for new crypto, C3PO paths, provisioning
- Update roadmap, architecture diagrams, security sections
- Update CONTRIBUTING.md project structure
2026-02-10 21:28:45 +01:00

296 lines
12 KiB
Python

"""
Main C3PO TUI Application using Textual.
Multi-device view: all connected devices visible simultaneously.
"""
import time
from pathlib import Path
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical, Container, ScrollableContainer
from textual.widgets import Static
from tui.bridge import tui_bridge, TUIMessage, MessageType
from tui.widgets.log_pane import GlobalLogPane, DeviceLogPane
from tui.widgets.command_input import CommandInput
from tui.widgets.device_tabs import DeviceTabs
class DeviceContainer(Container):
    """Bordered container dedicated to one device.

    The border title displays the identifier of the device it belongs to.
    """

    DEFAULT_CSS = """
DeviceContainer {
height: 1fr;
min-height: 6;
border: solid $secondary;
border-title-color: $text;
border-title-style: bold;
}
"""

    def __init__(self, device_id: str, **kwargs) -> None:
        """Create the container for *device_id*.

        Args:
            device_id: Identifier stored on the instance and shown in the
                border title.
            **kwargs: Forwarded unchanged to ``Container.__init__``.
        """
        title = f"DEVICE: {device_id}"
        # The base class must be initialised before attributes are assigned.
        super().__init__(**kwargs)
        self.device_id = device_id
        self.border_title = title
class C3POApp(App):
    """C3PO Command & Control TUI Application.

    The screen is made of a device tab bar, a panel where every connected
    device gets its own bordered log pane, a global log pane and a command
    input. Messages queued on ``tui_bridge`` are polled on a timer and
    dispatched to the global log and, when one exists, to the log pane of
    the device they concern.
    """

    CSS_PATH = Path(__file__).parent / "styles" / "c2.tcss"

    BINDINGS = [
        Binding("alt+g", "toggle_global", "Global", show=True),
        Binding("ctrl+l", "clear_global", "Clear", show=True),
        Binding("ctrl+q", "quit", "Quit", show=True),
        Binding("escape", "focus_input", "Input", show=False),
        Binding("tab", "tab_complete", show=False, priority=True),
    ]

    # Words offered while the first word of the command line is being typed.
    _TOP_LEVEL_COMMANDS = (
        "send", "list", "modules", "group", "help", "clear", "exit",
        "active_commands", "web", "camera",
    )
    # Sub-commands shared by the "web" and "camera" commands.
    _SERVICE_ACTIONS = ("start", "stop", "status")
    # Sub-commands of the "group" command.
    _GROUP_ACTIONS = ("add", "remove", "list", "show")
    # Device-side commands offered after a "send" target.
    _ESP_COMMANDS = (
        "system_reboot", "system_mem", "system_uptime", "system_info",
        "ping", "arp_scan", "proxy_start", "proxy_stop", "dos_tcp",
        "fakeap_start", "fakeap_stop", "fakeap_status", "fakeap_clients",
        "fakeap_portal_start", "fakeap_portal_stop",
        "fakeap_sniffer_on", "fakeap_sniffer_off",
        "cam_start", "cam_stop", "mlat", "trilat",
    )
    # Prefix (upper-case) -> event type used to tag device events.
    _EVENT_PREFIXES = (
        ("INFO:", "info"),
        ("LOG:", "log"),
        ("ERROR:", "error"),
        ("DATA:", "data"),
    )

    def __init__(self, registry=None, cli=None, **kwargs):
        """Store collaborators and initialise per-device bookkeeping.

        Args:
            registry: Stored on the instance as ``self.registry``.
            cli: Object used to execute commands and to feed completion
                (``execute_command``, ``registry.ids()``,
                ``groups.all_groups()``). May be ``None``.
            **kwargs: Forwarded unchanged to ``App.__init__``.
        """
        super().__init__(**kwargs)
        self.registry = registry
        self.cli = cli
        self._device_panes: dict[str, DeviceLogPane] = {}
        self._device_containers: dict[str, DeviceContainer] = {}
        self._device_modules: dict[str, str] = {}

    # ------------------------------------------------------------------
    # Layout / lifecycle
    # ------------------------------------------------------------------

    def compose(self) -> ComposeResult:
        """Yield the static widget tree of the application."""
        # NOTE(review): nesting reconstructed from the "left side/right side"
        # comments - confirm against styles/c2.tcss.
        yield DeviceTabs(id="tab-bar")
        with Horizontal(id="main-content"):
            # Left side: all devices stacked vertically
            with Vertical(id="devices-panel"):
                yield Static("Waiting for devices...", id="no-device-placeholder")
            # Right side: global logs
            with Container(id="global-log-container") as global_container:
                global_container.border_title = "GLOBAL LOGS"
                yield GlobalLogPane(id="global-log")
        with Vertical(id="input-container"):
            yield Static(
                "Alt+G:Toggle Global ^L:Clear Logs ^Q:Quit Tab:Complete",
                id="shortcuts-bar"
            )
            yield CommandInput(id="command-input")

    def on_mount(self) -> None:
        """Called when app is mounted.

        Registers the app with the bridge, starts polling the bridge queue,
        installs the completer when a CLI is available and focuses the input.
        """
        tui_bridge.set_app(self)
        self.set_interval(0.1, self.process_bridge_queue)

        cmd_input = self.query_one("#command-input", CommandInput)
        if self.cli:
            cmd_input.set_completer(self._make_completer())
        # NOTE(review): focus is assumed to be unconditional - confirm.
        cmd_input.focus()

        global_log = self.query_one("#global-log", GlobalLogPane)
        global_log.add_system(self._timestamp(), "C3PO TUI initialized - Multi-device view")

    # ------------------------------------------------------------------
    # Tab completion
    # ------------------------------------------------------------------

    def _make_completer(self):
        """Create a completer function that works without readline.

        Returns:
            A callable ``completer(text, state)``. It reads the current value
            of the command input, selects the candidate words for the cursor
            position, keeps those starting with ``text`` and returns the one
            at index ``state``, or ``None`` when ``state`` is past the last
            match or no CLI is attached.
        """
        def completer(text: str, state: int) -> str | None:
            if not self.cli:
                return None
            buffer = self.query_one("#command-input", CommandInput).value
            matches = [
                option
                for option in self._completion_options(buffer)
                if option.startswith(text)
            ]
            return matches[state] if state < len(matches) else None

        return completer

    def _completion_options(self, buffer: str) -> list[str]:
        """Return the candidate words for the current command line."""
        parts = buffer.split()
        trailing_space = buffer.endswith(" ")

        # Empty/blank input, or the first word is still being typed.
        # (A blank buffer yields no parts, so it must be handled before any
        # parts[0] access.)
        if not parts or (len(parts) == 1 and not trailing_space):
            return list(self._TOP_LEVEL_COMMANDS)

        command = parts[0]
        if command == "send":
            return self._send_options(parts, trailing_space)
        if command in ("web", "camera"):
            return list(self._SERVICE_ACTIONS) if len(parts) <= 2 else []
        if command == "group":
            return self._group_options(parts)
        return []

    def _send_options(self, parts: list[str], trailing_space: bool) -> list[str]:
        """Return candidates for a line starting with ``send``."""
        # "send", "send <partial>" or "send <target>": offer targets.
        if len(parts) <= 2:
            return ["all", "group", *self.cli.registry.ids()]

        if parts[1] == "group":
            # "send group <partial>": the group name is still being typed.
            if len(parts) == 3 and not trailing_space:
                return list(self.cli.groups.all_groups().keys())
            # "send group <name> " or "send group <name> <partial>".
            if len(parts) <= 4:
                return list(self._ESP_COMMANDS)
            return []

        # "send <target> <partial>".
        return list(self._ESP_COMMANDS) if len(parts) == 3 else []

    def _group_options(self, parts: list[str]) -> list[str]:
        """Return candidates for a line starting with ``group``."""
        # "group" or "group <partial>": offer the sub-commands. Checking the
        # length first keeps parts[1] from being read when it does not exist.
        if len(parts) <= 2:
            return list(self._GROUP_ACTIONS)

        action = parts[1]
        if action in ("remove", "show"):
            return list(self.cli.groups.all_groups().keys())
        if action == "add":
            return list(self.cli.registry.ids())
        return []

    # ------------------------------------------------------------------
    # Bridge message handling
    # ------------------------------------------------------------------

    def _timestamp(self) -> str:
        """Return the current local time formatted as ``HH:MM:SS``."""
        return time.strftime("%H:%M:%S")

    def process_bridge_queue(self) -> None:
        """Handle every message currently pending on the bridge."""
        for msg in tui_bridge.get_pending_messages():
            self._handle_tui_message(msg)

    def _handle_tui_message(self, msg: TUIMessage) -> None:
        """Dispatch one bridge message to the global log and device panes."""
        global_log = self.query_one("#global-log", GlobalLogPane)
        timestamp = time.strftime("%H:%M:%S", time.localtime(msg.timestamp))
        kind = msg.msg_type

        if kind == MessageType.SYSTEM_MESSAGE:
            global_log.add_system(timestamp, msg.payload)
        elif kind == MessageType.DEVICE_CONNECTED:
            global_log.add_system(timestamp, f"{msg.device_id} connected")
            self._add_device_pane(msg.device_id)
            self.query_one("#tab-bar", DeviceTabs).add_device(msg.device_id)
        elif kind == MessageType.DEVICE_RECONNECTED:
            global_log.add_system(timestamp, f"{msg.device_id} reconnected")
        elif kind == MessageType.DEVICE_INFO_UPDATED:
            self._device_modules[msg.device_id] = msg.payload
            global_log.add_system(timestamp, f"{msg.device_id} modules: {msg.payload}")
            self._update_device_title(msg.device_id)
        elif kind == MessageType.DEVICE_DISCONNECTED:
            global_log.add_system(timestamp, f"{msg.device_id} disconnected")
            self._remove_device_pane(msg.device_id)
            self.query_one("#tab-bar", DeviceTabs).remove_device(msg.device_id)
        elif kind == MessageType.DEVICE_EVENT:
            global_log.add_device_event(timestamp, msg.device_id, msg.payload)
            if msg.device_id in self._device_panes:
                event_type = self._detect_event_type(msg.payload)
                self._mirror_to_device_pane(msg.device_id, timestamp, msg.payload, event_type)
        elif kind == MessageType.COMMAND_SENT:
            global_log.add_command_sent(timestamp, msg.device_id, msg.payload, msg.request_id)
            self._mirror_to_device_pane(msg.device_id, timestamp, msg.payload, "cmd_sent")
        elif kind == MessageType.COMMAND_RESPONSE:
            global_log.add_command_response(timestamp, msg.device_id, msg.payload, msg.request_id)
            self._mirror_to_device_pane(msg.device_id, timestamp, msg.payload, "cmd_resp")
        elif kind == MessageType.ERROR:
            global_log.add_error(timestamp, msg.payload)

    def _mirror_to_device_pane(self, device_id: str, timestamp: str, payload, event_type: str) -> None:
        """Add an event to the device's own pane, if that pane exists."""
        pane = self._device_panes.get(device_id)
        if pane is not None:
            pane.add_event(timestamp, payload, event_type)

    def _detect_event_type(self, payload: str) -> str:
        """Classify a device event from its case-insensitive prefix.

        Returns ``"info"``, ``"log"``, ``"error"`` or ``"data"``; payloads
        without a known prefix are reported as ``"info"``.
        """
        payload_upper = payload.upper()
        for prefix, event_type in self._EVENT_PREFIXES:
            if payload_upper.startswith(prefix):
                return event_type
        return "info"

    # ------------------------------------------------------------------
    # Device panes
    # ------------------------------------------------------------------

    @staticmethod
    def _widget_id(prefix: str, device_id: str) -> str:
        """Build a widget id that Textual accepts for *device_id*.

        Textual only allows letters, digits, underscores and hyphens in ids;
        any other character is replaced by ``_``. Identifiers that are
        already valid are left untouched.
        """
        safe = "".join(
            ch if (ch.isascii() and ch.isalnum()) or ch in "_-" else "_"
            for ch in str(device_id)
        )
        return f"{prefix}-{safe}"

    def _add_device_pane(self, device_id: str) -> None:
        """Add a new device pane (visible immediately)."""
        if device_id in self._device_panes:
            return

        # Hide placeholder
        self.query_one("#no-device-placeholder", Static).display = False

        # Create container with border for this device
        container = DeviceContainer(
            device_id, id=self._widget_id("device-container", device_id)
        )
        pane = DeviceLogPane(device_id, id=self._widget_id("device-pane", device_id))
        self._device_containers[device_id] = container
        self._device_panes[device_id] = pane

        # Mount in the devices panel; the container must be attached before
        # the pane is mounted inside it.
        devices_panel = self.query_one("#devices-panel", Vertical)
        devices_panel.mount(container)
        container.mount(pane)

    def _remove_device_pane(self, device_id: str) -> None:
        """Remove a device pane and forget everything stored for the device."""
        container = self._device_containers.pop(device_id, None)
        if container is not None:
            container.remove()
        # Dropped unconditionally so module info recorded for a device that
        # never had a pane does not linger.
        self._device_panes.pop(device_id, None)
        self._device_modules.pop(device_id, None)

        # Show placeholder if no devices
        if not self._device_containers:
            self.query_one("#no-device-placeholder", Static).display = True

    def _update_device_title(self, device_id: str) -> None:
        """Update device container title with modules info."""
        container = self._device_containers.get(device_id)
        if container is None:
            return
        modules = self._device_modules.get(device_id, "")
        # NOTE(review): Textual may interpret "[...]" in border titles as
        # markup - confirm the module list renders literally.
        if modules:
            container.border_title = f"DEVICE: {device_id} [{modules}]"
        else:
            container.border_title = f"DEVICE: {device_id}"

    # ------------------------------------------------------------------
    # Command input events
    # ------------------------------------------------------------------

    def on_command_input_completions_available(self, event: CommandInput.CompletionsAvailable) -> None:
        """Print the available completions in the global log."""
        global_log = self.query_one("#global-log", GlobalLogPane)
        completions_str = " ".join(event.completions)
        global_log.add_system(self._timestamp(), f"Completions: {completions_str}")

    def on_command_input_command_submitted(self, event: CommandInput.CommandSubmitted) -> None:
        """Log the submitted command and run it through the CLI, if any.

        Any exception raised by the CLI is reported in the global log so a
        failing command does not take the UI down.
        """
        command = event.command
        global_log = self.query_one("#global-log", GlobalLogPane)
        global_log.add_system(self._timestamp(), f"Executing: {command}")
        if not self.cli:
            return
        try:
            self.cli.execute_command(command)
        except Exception as e:
            global_log.add_error(self._timestamp(), f"Command error: {e}")

    # ------------------------------------------------------------------
    # Key-binding actions
    # ------------------------------------------------------------------

    def action_toggle_global(self) -> None:
        """Toggle global logs pane visibility."""
        global_container = self.query_one("#global-log-container", Container)
        global_container.display = not global_container.display

    def action_clear_global(self) -> None:
        """Clear global logs pane only."""
        self.query_one("#global-log", GlobalLogPane).clear()

    def action_focus_input(self) -> None:
        """Move keyboard focus to the command input."""
        self.query_one("#command-input", CommandInput).focus()

    def action_tab_complete(self) -> None:
        """Focus the command input and trigger its tab completion."""
        cmd_input = self.query_one("#command-input", CommandInput)
        cmd_input.focus()
        cmd_input._handle_tab_completion()