espilon-source/tools/C3PO/tui/app.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

297 lines
12 KiB
Python

"""
Main C3PO TUI Application using Textual.
Multi-device view: all connected devices visible simultaneously.
"""
import time
from pathlib import Path
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical, Container, ScrollableContainer
from textual.widgets import Static
from tui.bridge import tui_bridge, TUIMessage, MessageType
from tui.widgets.log_pane import GlobalLogPane, DeviceLogPane
from tui.widgets.command_input import CommandInput
from tui.widgets.device_tabs import DeviceTabs
class DeviceContainer(Container):
"""Container for a single device with border and title."""
DEFAULT_CSS = """
DeviceContainer {
height: 1fr;
min-height: 6;
border: solid $secondary;
border-title-color: $text;
border-title-style: bold;
}
"""
def __init__(self, device_id: str, **kwargs):
super().__init__(**kwargs)
self.device_id = device_id
self.border_title = f"DEVICE: {device_id}"
class C3POApp(App):
"""C3PO Command & Control TUI Application."""
CSS_PATH = Path(__file__).parent / "styles" / "c2.tcss"
BINDINGS = [
Binding("alt+g", "toggle_global", "Global", show=True),
Binding("ctrl+l", "clear_global", "Clear", show=True),
Binding("ctrl+q", "quit", "Quit", show=True),
Binding("escape", "focus_input", "Input", show=False),
Binding("tab", "tab_complete", show=False, priority=True),
]
def __init__(self, registry=None, session=None, commander=None, **kwargs):
super().__init__(**kwargs)
self.registry = registry
self.session = session
self.commander = commander
self._device_panes: dict[str, DeviceLogPane] = {}
self._device_containers: dict[str, DeviceContainer] = {}
self._device_modules: dict[str, str] = {}
def compose(self) -> ComposeResult:
yield DeviceTabs(id="tab-bar")
with Horizontal(id="main-content"):
# Left side: all devices stacked vertically
with Vertical(id="devices-panel"):
yield Static("Waiting for devices...", id="no-device-placeholder")
# Right side: global logs
with Container(id="global-log-container") as global_container:
global_container.border_title = "GLOBAL LOGS"
yield GlobalLogPane(id="global-log")
with Vertical(id="input-container"):
yield Static(
"Alt+G:Toggle Global ^L:Clear Logs ^Q:Quit Tab:Complete",
id="shortcuts-bar"
)
yield CommandInput(id="command-input")
def on_mount(self) -> None:
"""Called when app is mounted."""
tui_bridge.set_app(self)
self.set_interval(0.1, self.process_bridge_queue)
cmd_input = self.query_one("#command-input", CommandInput)
if self.session:
cmd_input.set_completer(self._make_completer())
cmd_input.focus()
global_log = self.query_one("#global-log", GlobalLogPane)
global_log.add_system(self._timestamp(), "C3PO TUI initialized - Multi-device view")
def _make_completer(self):
"""Create a completer function that works without readline."""
ESP_COMMANDS = [
"system_reboot", "system_mem", "system_uptime", "system_info",
"ping", "arp_scan", "proxy_start", "proxy_stop", "dos_tcp",
"fakeap_start", "fakeap_stop", "fakeap_status", "fakeap_clients",
"fakeap_portal_start", "fakeap_portal_stop",
"fakeap_sniffer_on", "fakeap_sniffer_off",
"cam_start", "cam_stop", "mlat", "trilat",
]
def completer(text: str, state: int) -> str | None:
if not self.session:
return None
cmd_input = self.query_one("#command-input", CommandInput)
buffer = cmd_input.value
parts = buffer.split()
options = []
if len(parts) <= 1 and not buffer.endswith(" "):
options = ["send", "list", "modules", "group", "help",
"active_commands", "web", "camera"]
elif parts[0] == "send":
if len(parts) == 2 and not buffer.endswith(" "):
options = ["all", "group"] + self.session.registry.ids()
elif len(parts) == 2 and buffer.endswith(" "):
options = ["all", "group"] + self.session.registry.ids()
elif len(parts) == 3 and parts[1] == "group" and not buffer.endswith(" "):
options = list(self.session.groups.all_groups().keys())
elif len(parts) == 3 and parts[1] == "group" and buffer.endswith(" "):
options = ESP_COMMANDS
elif len(parts) == 3 and parts[1] != "group":
options = ESP_COMMANDS
elif len(parts) == 4 and parts[1] == "group":
options = ESP_COMMANDS
elif parts[0] == "web":
if len(parts) <= 2:
options = ["start", "stop", "status"]
elif parts[0] == "camera":
if len(parts) <= 2:
options = ["start", "stop", "status"]
elif parts[0] == "group":
if len(parts) == 2 and not buffer.endswith(" "):
options = ["add", "remove", "list", "show"]
elif len(parts) == 2 and buffer.endswith(" "):
options = ["add", "remove", "list", "show"]
elif parts[1] in ("remove", "show") and len(parts) >= 3:
options = list(self.session.groups.all_groups().keys())
elif parts[1] == "add" and len(parts) >= 3:
options = self.session.registry.ids()
matches = [o for o in options if o.startswith(text)]
return matches[state] if state < len(matches) else None
return completer
def _timestamp(self) -> str:
return time.strftime("%H:%M:%S")
def process_bridge_queue(self) -> None:
for msg in tui_bridge.get_pending_messages():
self._handle_tui_message(msg)
def _handle_tui_message(self, msg: TUIMessage) -> None:
global_log = self.query_one("#global-log", GlobalLogPane)
timestamp = time.strftime("%H:%M:%S", time.localtime(msg.timestamp))
if msg.msg_type == MessageType.SYSTEM_MESSAGE:
global_log.add_system(timestamp, msg.payload)
elif msg.msg_type == MessageType.DEVICE_CONNECTED:
global_log.add_system(timestamp, f"{msg.device_id} connected")
self._add_device_pane(msg.device_id)
tabs = self.query_one("#tab-bar", DeviceTabs)
tabs.add_device(msg.device_id)
elif msg.msg_type == MessageType.DEVICE_RECONNECTED:
global_log.add_system(timestamp, f"{msg.device_id} reconnected")
elif msg.msg_type == MessageType.DEVICE_INFO_UPDATED:
self._device_modules[msg.device_id] = msg.payload
global_log.add_system(timestamp, f"{msg.device_id} modules: {msg.payload}")
self._update_device_title(msg.device_id)
elif msg.msg_type == MessageType.DEVICE_DISCONNECTED:
global_log.add_system(timestamp, f"{msg.device_id} disconnected")
self._remove_device_pane(msg.device_id)
tabs = self.query_one("#tab-bar", DeviceTabs)
tabs.remove_device(msg.device_id)
elif msg.msg_type == MessageType.DEVICE_EVENT:
global_log.add_device_event(timestamp, msg.device_id, msg.payload)
if msg.device_id in self._device_panes:
event_type = self._detect_event_type(msg.payload)
self._device_panes[msg.device_id].add_event(timestamp, msg.payload, event_type)
elif msg.msg_type == MessageType.COMMAND_SENT:
global_log.add_command_sent(timestamp, msg.device_id, msg.payload, msg.request_id)
if msg.device_id in self._device_panes:
self._device_panes[msg.device_id].add_event(timestamp, msg.payload, "cmd_sent")
elif msg.msg_type == MessageType.COMMAND_RESPONSE:
global_log.add_command_response(timestamp, msg.device_id, msg.payload, msg.request_id)
if msg.device_id in self._device_panes:
self._device_panes[msg.device_id].add_event(timestamp, msg.payload, "cmd_resp")
elif msg.msg_type == MessageType.ERROR:
global_log.add_error(timestamp, msg.payload)
def _detect_event_type(self, payload: str) -> str:
payload_upper = payload.upper()
if payload_upper.startswith("INFO:"):
return "info"
elif payload_upper.startswith("LOG:"):
return "log"
elif payload_upper.startswith("ERROR:"):
return "error"
elif payload_upper.startswith("DATA:"):
return "data"
return "info"
def _add_device_pane(self, device_id: str) -> None:
"""Add a new device pane (visible immediately)."""
if device_id in self._device_panes:
return
# Hide placeholder
placeholder = self.query_one("#no-device-placeholder", Static)
placeholder.display = False
# Create container with border for this device
container = DeviceContainer(device_id, id=f"device-container-{device_id}")
pane = DeviceLogPane(device_id, id=f"device-pane-{device_id}")
self._device_containers[device_id] = container
self._device_panes[device_id] = pane
# Mount in the devices panel
devices_panel = self.query_one("#devices-panel", Vertical)
devices_panel.mount(container)
container.mount(pane)
def _remove_device_pane(self, device_id: str) -> None:
"""Remove a device pane."""
if device_id in self._device_containers:
container = self._device_containers.pop(device_id)
container.remove()
self._device_panes.pop(device_id, None)
self._device_modules.pop(device_id, None)
# Show placeholder if no devices
if not self._device_containers:
placeholder = self.query_one("#no-device-placeholder", Static)
placeholder.display = True
def _update_device_title(self, device_id: str) -> None:
"""Update device container title with modules info."""
if device_id in self._device_containers:
modules = self._device_modules.get(device_id, "")
container = self._device_containers[device_id]
if modules:
container.border_title = f"DEVICE: {device_id} [{modules}]"
else:
container.border_title = f"DEVICE: {device_id}"
def on_command_input_completions_available(self, event: CommandInput.CompletionsAvailable) -> None:
global_log = self.query_one("#global-log", GlobalLogPane)
completions_str = " ".join(event.completions)
global_log.add_system(self._timestamp(), f"Completions: {completions_str}")
def on_command_input_command_submitted(self, event: CommandInput.CommandSubmitted) -> None:
command = event.command
global_log = self.query_one("#global-log", GlobalLogPane)
global_log.add_system(self._timestamp(), f"Executing: {command}")
if self.commander:
try:
self.commander.execute(command)
except Exception as e:
global_log.add_error(self._timestamp(), f"Command error: {e}")
def action_toggle_global(self) -> None:
"""Toggle global logs pane visibility."""
global_container = self.query_one("#global-log-container", Container)
global_container.display = not global_container.display
def action_clear_global(self) -> None:
"""Clear global logs pane only."""
global_log = self.query_one("#global-log", GlobalLogPane)
global_log.clear()
def action_focus_input(self) -> None:
self.query_one("#command-input", CommandInput).focus()
def action_tab_complete(self) -> None:
cmd_input = self.query_one("#command-input", CommandInput)
cmd_input.focus()
cmd_input._handle_tab_completion()