Crypto: - Replace broken ChaCha20 (static nonce) with ChaCha20-Poly1305 AEAD - HKDF-SHA256 key derivation from per-device factory NVS master keys - Random 12-byte nonce per message (ESP32 hardware RNG) - crypto_init/encrypt/decrypt API with mbedtls legacy (ESP-IDF v5.3.2) - Custom partition table with factory NVS (fctry at 0x10000) Firmware: - crypto.c full rewrite, messages.c device_id prefix + AEAD encrypt - crypto_init() at boot with esp_restart() on failure - Fix command_t initializations across all modules (sub/help fields) - Clean CMakeLists dependencies for ESP-IDF v5.3.2 C3PO (C2): - Rename tools/c2 + tools/c3po -> tools/C3PO - Per-device CryptoContext with HKDF key derivation - KeyStore (keys.json) for master key management - Transport parses device_id:base64(...) wire format Tools: - New tools/provisioning/provision.py for factory NVS key generation - Updated flasher with mbedtls config for v5.3.2 Docs: - Update all READMEs for new crypto, C3PO paths, provisioning - Update roadmap, architecture diagrams, security sections - Update CONTRIBUTING.md project structure
152 lines
4.9 KiB
Python
152 lines
4.9 KiB
Python
import time
|
|
from utils.constant import _color
|
|
|
|
# TUI bridge handle, resolved lazily to avoid circular imports.
# Tri-state: None = not looked up yet, False = import failed (do not retry),
# anything else = the imported bridge object.
_tui_bridge = None


def _get_bridge():
    """Return the TUI bridge object, importing it on first use.

    The import result is cached in the module-level ``_tui_bridge``. A failed
    import is remembered as ``False`` so later calls do not retry it.

    Returns:
        The ``tui_bridge`` object from ``tui.bridge``, or ``None`` when the
        module cannot be imported (or the cached value is falsy).
    """
    global _tui_bridge

    # Already resolved (successfully or not): answer from the cache.
    if _tui_bridge is not None:
        return _tui_bridge or None

    try:
        from tui.bridge import tui_bridge as bridge
    except ImportError:
        _tui_bridge = False
    else:
        _tui_bridge = bridge

    return _tui_bridge or None
|
|
|
|
|
|
class Display:
    """Static helpers that format and emit operator-facing output.

    Every message method first tries to hand the message to the TUI bridge
    (when TUI mode is enabled and the bridge is importable); otherwise it
    prints a timestamped, colorized line to stdout.
    """

    # When True, message methods route output to the TUI bridge.
    _tui_mode = False

    # strftime layout shared by every timestamp rendered by this class.
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    # Width, in characters, of each left-aligned table column.
    _COLUMN_WIDTH = 18

    @classmethod
    def enable_tui_mode(cls):
        """Enable TUI mode - routes output to TUI bridge instead of print."""
        cls._tui_mode = True

    @classmethod
    def disable_tui_mode(cls):
        """Disable TUI mode - back to print output."""
        cls._tui_mode = False

    @staticmethod
    def _timestamp() -> str:
        """Return the current local time formatted with ``_TIME_FORMAT``."""
        return time.strftime(Display._TIME_FORMAT, time.localtime())

    @staticmethod
    def _post_to_tui(type_name: str, payload, **fields) -> bool:
        """Try to deliver one message through the TUI bridge.

        Args:
            type_name: Name of the ``MessageType`` member to use.
            payload: Value passed as the message ``payload``.
            **fields: Extra ``TUIMessage`` keyword arguments
                (``device_id``, ``request_id``).

        Returns:
            True when the message was posted to the bridge. False when TUI
            mode is off or the bridge is unavailable, in which case the
            caller falls back to ``print``.
        """
        if not Display._tui_mode:
            return False
        bridge = _get_bridge()
        if not bridge:
            return False
        # Imported here, not at module level, to avoid circular imports.
        from tui.bridge import TUIMessage, MessageType
        bridge.post_message(TUIMessage(
            msg_type=getattr(MessageType, type_name),
            payload=payload,
            **fields,
        ))
        return True

    @staticmethod
    def _classify_device_event(event: str) -> str:
        """Map a device event string to the matching ``MessageType`` name."""
        # Matching is case-sensitive, so "Reconnected from" does not trip the
        # "Connected from" check.
        if "Connected from" in event:
            return "DEVICE_CONNECTED"
        if "Reconnected from" in event:
            return "DEVICE_RECONNECTED"
        if event == "Disconnected":
            return "DEVICE_DISCONNECTED"
        return "DEVICE_EVENT"

    @staticmethod
    def system_message(message: str) -> None:
        """Emit a system-level message."""
        if Display._post_to_tui("SYSTEM_MESSAGE", message):
            return
        print(f"{Display._timestamp()} {_color('CYAN')}[SYSTEM]{_color('RESET')} {message}")

    @staticmethod
    def device_event(device_id: str, event: str) -> None:
        """Emit an event for ``device_id``.

        In TUI mode, connect / reconnect / disconnect events are sent with
        their own message types; anything else is a generic device event.
        """
        type_name = Display._classify_device_event(event)
        if Display._post_to_tui(type_name, event, device_id=device_id):
            return
        print(f"{Display._timestamp()} {_color('YELLOW')}[DEVICE:{device_id}]{_color('RESET')} {event}")

    @staticmethod
    def command_sent(device_id: str, command_name: str, request_id: str) -> None:
        """Emit a notice that ``command_name`` was sent to ``device_id``."""
        if Display._post_to_tui(
            "COMMAND_SENT",
            command_name,
            device_id=device_id,
            request_id=request_id,
        ):
            return
        print(f"{Display._timestamp()} {_color('BLUE')}[CMD_SENT:{request_id}]{_color('RESET')} To {device_id}: {command_name}")

    @staticmethod
    def command_response(request_id: str, device_id: str, response: str) -> None:
        """Emit the response received from ``device_id`` for ``request_id``."""
        if Display._post_to_tui(
            "COMMAND_RESPONSE",
            response,
            device_id=device_id,
            request_id=request_id,
        ):
            return
        print(f"{Display._timestamp()} {_color('GREEN')}[CMD_RESP:{request_id}]{_color('RESET')} From {device_id}: {response}")

    @staticmethod
    def error(message: str) -> None:
        """Emit an error message."""
        if Display._post_to_tui("ERROR", message):
            return
        print(f"{Display._timestamp()} {_color('RED')}[ERROR]{_color('RESET')} {message}")

    @staticmethod
    def cli_prompt() -> str:
        """Return the colorized CLI prompt string."""
        return f"\n{_color('BLUE')}c2:> {_color('RESET')}"

    @staticmethod
    def format_duration(seconds: float) -> str:
        """Render a duration in seconds as a compact ``d/h/m/s`` string.

        Fractions are truncated. Durations of a day or more omit seconds.
        Negative input is clamped to zero, because floor-based ``divmod``
        would otherwise turn e.g. ``-5`` into ``"23h 59m 55s"``.
        """
        total = max(int(seconds), 0)
        minutes, secs = divmod(total, 60)
        hours, minutes = divmod(minutes, 60)
        days, hours = divmod(hours, 24)

        if days > 0:
            return f"{days}d {hours}h {minutes}m"
        if hours > 0:
            return f"{hours}h {minutes}m {secs}s"
        if minutes > 0:
            return f"{minutes}m {secs}s"
        return f"{secs}s"

    @staticmethod
    def format_timestamp(timestamp: float) -> str:
        """Format an epoch ``timestamp`` as local time using ``_TIME_FORMAT``."""
        return time.strftime(Display._TIME_FORMAT, time.localtime(timestamp))

    @staticmethod
    def print_table_header(headers: list) -> None:
        """Print a header row and a dashed rule spanning all columns.

        Headers are passed through ``str()`` so non-string values (such as
        ``None``) are handled the same way as in ``print_table_row``.
        """
        width = Display._COLUMN_WIDTH
        print("".join(f"{str(header):<{width}}" for header in headers))
        print("-" * (len(headers) * width))

    @staticmethod
    def print_table_row(columns: list) -> None:
        """Print one table row, each value left-aligned in a fixed-width column."""
        width = Display._COLUMN_WIDTH
        print("".join(f"{str(col):<{width}}" for col in columns))
|