Crypto: - Replace broken ChaCha20 (static nonce) with ChaCha20-Poly1305 AEAD - HKDF-SHA256 key derivation from per-device factory NVS master keys - Random 12-byte nonce per message (ESP32 hardware RNG) - crypto_init/encrypt/decrypt API with mbedtls legacy (ESP-IDF v5.3.2) - Custom partition table with factory NVS (fctry at 0x10000) Firmware: - crypto.c full rewrite, messages.c device_id prefix + AEAD encrypt - crypto_init() at boot with esp_restart() on failure - Fix command_t initializations across all modules (sub/help fields) - Clean CMakeLists dependencies for ESP-IDF v5.3.2 C3PO (C2): - Rename tools/c2 + tools/c3po -> tools/C3PO - Per-device CryptoContext with HKDF key derivation - KeyStore (keys.json) for master key management - Transport parses device_id:base64(...) wire format Tools: - New tools/provisioning/provision.py for factory NVS key generation - Updated flasher with mbedtls config for v5.3.2 Docs: - Update all READMEs for new crypto, C3PO paths, provisioning - Update roadmap, architecture diagrams, security sections - Update CONTRIBUTING.md project structure
216 lines
7.0 KiB
Python
216 lines
7.0 KiB
Python
"""
|
|
Command input widget with history and zsh-style tab completion.
|
|
"""
|
|
from textual.widgets import Input
|
|
from textual.message import Message
|
|
from typing import Callable, Optional
|
|
|
|
|
|
class CommandInput(Input):
|
|
"""Command input with history and zsh-style tab completion."""
|
|
|
|
DEFAULT_CSS = """
|
|
CommandInput {
|
|
dock: bottom;
|
|
height: 1;
|
|
border: none;
|
|
background: $surface;
|
|
padding: 0 1;
|
|
}
|
|
CommandInput:focus {
|
|
border: none;
|
|
}
|
|
"""
|
|
|
|
class CommandSubmitted(Message):
|
|
"""Posted when a command is submitted."""
|
|
def __init__(self, command: str):
|
|
self.command = command
|
|
super().__init__()
|
|
|
|
class CompletionsAvailable(Message):
|
|
"""Posted when multiple completions are available."""
|
|
def __init__(self, completions: list[str], word: str):
|
|
self.completions = completions
|
|
self.word = word
|
|
super().__init__()
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(
|
|
placeholder="c2:> Type command here...",
|
|
**kwargs
|
|
)
|
|
self._history: list[str] = []
|
|
self._history_index: int = -1
|
|
self._current_input: str = ""
|
|
self._completer: Optional[Callable[[str, int], Optional[str]]] = None
|
|
self._last_completion_text: str = ""
|
|
self._last_completions: list[str] = []
|
|
self._completion_cycle_index: int = 0
|
|
|
|
def set_completer(self, completer: Callable[[str, int], Optional[str]]):
|
|
"""Set the tab completion function (same signature as readline completer)."""
|
|
self._completer = completer
|
|
|
|
def on_key(self, event) -> None:
|
|
"""Handle special keys for history and completion."""
|
|
if event.key == "up":
|
|
event.prevent_default()
|
|
self._navigate_history(-1)
|
|
elif event.key == "down":
|
|
event.prevent_default()
|
|
self._navigate_history(1)
|
|
elif event.key == "tab":
|
|
event.prevent_default()
|
|
self._handle_tab_completion()
|
|
|
|
def _get_all_completions(self, word: str) -> list[str]:
|
|
"""Get all possible completions for a word."""
|
|
if not self._completer:
|
|
return []
|
|
|
|
completions = []
|
|
state = 0
|
|
while True:
|
|
completion = self._completer(word, state)
|
|
if completion is None:
|
|
break
|
|
completions.append(completion)
|
|
state += 1
|
|
return completions
|
|
|
|
def _find_common_prefix(self, strings: list[str]) -> str:
|
|
"""Find the longest common prefix among strings."""
|
|
if not strings:
|
|
return ""
|
|
if len(strings) == 1:
|
|
return strings[0]
|
|
|
|
prefix = strings[0]
|
|
for s in strings[1:]:
|
|
while not s.startswith(prefix):
|
|
prefix = prefix[:-1]
|
|
if not prefix:
|
|
return ""
|
|
return prefix
|
|
|
|
def _handle_tab_completion(self):
|
|
"""Handle zsh-style tab completion."""
|
|
if not self._completer:
|
|
return
|
|
|
|
current_text = self.value
|
|
cursor_pos = self.cursor_position
|
|
|
|
# Get the word being completed
|
|
text_before_cursor = current_text[:cursor_pos]
|
|
parts = text_before_cursor.split()
|
|
|
|
if not parts:
|
|
word_to_complete = ""
|
|
elif text_before_cursor.endswith(" "):
|
|
word_to_complete = ""
|
|
else:
|
|
word_to_complete = parts[-1]
|
|
|
|
# Check if context changed (new completion session)
|
|
context_changed = text_before_cursor != self._last_completion_text
|
|
|
|
if context_changed:
|
|
# New completion session - get all completions
|
|
self._last_completions = self._get_all_completions(word_to_complete)
|
|
self._completion_cycle_index = 0
|
|
self._last_completion_text = text_before_cursor
|
|
|
|
if not self._last_completions:
|
|
# No completions
|
|
return
|
|
|
|
if len(self._last_completions) == 1:
|
|
# Single match - complete directly
|
|
self._apply_completion(self._last_completions[0], word_to_complete, cursor_pos)
|
|
self._last_completions = []
|
|
return
|
|
|
|
# Multiple matches - complete to common prefix and show options
|
|
common_prefix = self._find_common_prefix(self._last_completions)
|
|
|
|
if common_prefix and len(common_prefix) > len(word_to_complete):
|
|
# Complete to common prefix
|
|
self._apply_completion(common_prefix, word_to_complete, cursor_pos)
|
|
|
|
# Show all completions
|
|
self.post_message(self.CompletionsAvailable(
|
|
self._last_completions.copy(),
|
|
word_to_complete
|
|
))
|
|
|
|
else:
|
|
# Same context - cycle through completions
|
|
if not self._last_completions:
|
|
return
|
|
|
|
# Get next completion in cycle
|
|
completion = self._last_completions[self._completion_cycle_index]
|
|
self._apply_completion(completion, word_to_complete, cursor_pos)
|
|
|
|
# Advance cycle
|
|
self._completion_cycle_index = (self._completion_cycle_index + 1) % len(self._last_completions)
|
|
|
|
def _apply_completion(self, completion: str, word_to_complete: str, cursor_pos: int):
|
|
"""Apply a completion to the input."""
|
|
current_text = self.value
|
|
text_before_cursor = current_text[:cursor_pos]
|
|
|
|
if word_to_complete:
|
|
prefix = text_before_cursor[:-len(word_to_complete)]
|
|
else:
|
|
prefix = text_before_cursor
|
|
|
|
new_text = prefix + completion + current_text[cursor_pos:]
|
|
new_cursor = len(prefix) + len(completion)
|
|
|
|
self.value = new_text
|
|
self.cursor_position = new_cursor
|
|
self._last_completion_text = new_text[:new_cursor]
|
|
|
|
def _navigate_history(self, direction: int):
|
|
"""Navigate through command history."""
|
|
if not self._history:
|
|
return
|
|
|
|
if self._history_index == -1:
|
|
self._current_input = self.value
|
|
|
|
new_index = self._history_index + direction
|
|
|
|
if new_index < -1:
|
|
new_index = -1
|
|
elif new_index >= len(self._history):
|
|
new_index = len(self._history) - 1
|
|
|
|
self._history_index = new_index
|
|
|
|
if self._history_index == -1:
|
|
self.value = self._current_input
|
|
else:
|
|
self.value = self._history[-(self._history_index + 1)]
|
|
|
|
self.cursor_position = len(self.value)
|
|
|
|
def action_submit(self) -> None:
|
|
"""Submit the current command."""
|
|
command = self.value.strip()
|
|
if command:
|
|
self._history.append(command)
|
|
if len(self._history) > 100:
|
|
self._history.pop(0)
|
|
self.post_message(self.CommandSubmitted(command))
|
|
|
|
self.value = ""
|
|
self._history_index = -1
|
|
self._current_input = ""
|
|
self._last_completions = []
|
|
self._completion_cycle_index = 0
|
|
self._last_completion_text = ""
|