"""CLI parser and subcommand handlers."""

import argparse
import os
import sys
import time
from collections import deque
from glob import glob

import requests

from espmon import __version__
from espmon.colors import C, ok, info, err, warn
from espmon.config import Config, DEFAULT_BAUD, POLL_INTERVAL, POLL_TIMEOUT
from espmon.client import C3POClient
from espmon.logs import LogManager
from espmon.monitor import SerialMonitor
from espmon.daemon import daemonize, stop_daemon


# ─── Helpers ────────────────────────────────────────────────────────────────

def _format_duration(seconds: float) -> str:
    """Format a number of seconds as a short human-readable duration.

    Returns ``"42s"`` below one minute, ``"3m07s"`` below one hour and
    ``"2h05m"`` otherwise. Fractions of a second are truncated.
    """
    s = int(seconds)
    if s < 60:
        return f"{s}s"
    if s < 3600:
        return f"{s // 60}m{s % 60:02d}s"
    h = s // 3600
    m = (s % 3600) // 60
    return f"{h}h{m:02d}m"


def _format_size(size: int) -> str:
    """Format a byte count as ``B``, ``KB`` or ``MB`` (1024-based).

    The thresholds are inclusive so that exactly 1024 bytes is shown as
    ``1.0 KB`` (not ``1024 B``) and exactly 1 MiB as ``1.0 MB``.
    """
    if size >= 1024 * 1024:
        return f"{size / (1024 * 1024):.1f} MB"
    if size >= 1024:
        return f"{size / 1024:.1f} KB"
    return f"{size} B"


def _detect_ports() -> list:
    """Return the sorted ``/dev/ttyUSB*`` and ``/dev/ttyACM*`` device paths."""
    return sorted(glob("/dev/ttyUSB*") + glob("/dev/ttyACM*"))


def _cell(value, default: str = "?") -> str:
    """Return *value* as text for a table cell.

    ``None`` and empty values become *default*; lists/tuples are joined with
    commas. Everything is coerced to ``str`` so that a fixed-width format
    spec never fails on an unexpected type.
    """
    if value is None or value == "":
        return default
    if isinstance(value, (list, tuple)):
        return ", ".join(str(item) for item in value) or default
    return str(value)


# ─── Subcommand: monitor ────────────────────────────────────────────────────

def cmd_monitor(args) -> int:
    """Monitor a serial port, logging output to file.

    Reads ``args.port``, ``args.baud`` and ``args.bg``. Returns 1 when the
    port path does not exist (after listing any detected ports), otherwise
    the exit code of the daemonizer (``--bg``) or of the foreground monitor.
    """
    port = args.port

    if not os.path.exists(port):
        err(f"Port {port} not found")
        available = _detect_ports()
        if available:
            info(f"Available ports: {', '.join(available)}")
        return 1

    LogManager.ensure_dirs()

    if args.bg:
        return daemonize(port, args.baud)

    # Foreground mode
    log_path = LogManager.session_path(port)
    monitor = SerialMonitor(port, args.baud, log_path)
    return monitor.run()


# ─── Subcommand: tail ───────────────────────────────────────────────────────

def cmd_tail(args) -> int:
    """Print the last ``args.lines`` lines of the most recent log file.

    The log is selected through ``LogManager.latest_log`` using the optional
    ``args.port`` / ``args.device`` filters. With ``args.follow`` the command
    then keeps printing lines as they are appended until interrupted with
    Ctrl-C. Returns 1 when no log file is found, 0 otherwise.
    """
    log_path = LogManager.latest_log(port=args.port, device=args.device)
    if not log_path:
        err("No log files found")
        info("Start monitoring first: espmon monitor /dev/ttyUSB0")
        return 1

    info(f"{log_path}")

    # A zero or negative count prints nothing; slicing with ``[-0:]`` would
    # have printed the whole file instead.
    count = max(args.lines, 0)

    # errors="replace": the log is assumed to hold captured serial output,
    # which can contain bytes that do not decode cleanly; a bad byte should
    # not abort the command.
    with open(log_path, "r", errors="replace") as f:
        # The bounded deque keeps only the last ``count`` lines in memory
        # while streaming through the file.
        for line in deque(f, maxlen=count):
            print(line, end="")

        if not args.follow:
            return 0

        # Follow mode: keep reading from the same handle, which now sits at
        # end-of-file. Reopening and seeking to the end could skip lines
        # written in between.
        try:
            while True:
                line = f.readline()
                if line:
                    print(line, end="", flush=True)
                else:
                    time.sleep(0.2)
        except KeyboardInterrupt:
            pass

    return 0


# ─── Subcommand: cmd ────────────────────────────────────────────────────────

def cmd_cmd(args) -> int:
    """Send a command to a device via the C3PO API and wait for the result.

    Reads ``args.device_id``, ``args.command``, ``args.argv`` and
    ``args.timeout``. Every accepted request is polled until it reports
    ``"completed"`` or the timeout elapses; new output lines are printed as
    they arrive. Returns 0 when all accepted requests completed, 1 on a
    connection/API error, when nothing was accepted, or on timeout.
    """
    config = Config()
    client = C3POClient(config)

    try:
        result = client.send_command(args.device_id, args.command, args.argv)
    except requests.exceptions.ConnectionError:
        err(f"Cannot connect to C3PO at {config.base_url}")
        err("Is C3PO running? Start with: "
            "cd tools/C3PO && python c3po.py --headless")
        return 1
    except requests.exceptions.HTTPError as e:
        err(f"API error: {e.response.status_code} {e.response.text}")
        return 1

    results = result.get("results", [])
    if not results:
        err("No results returned")
        return 1

    # Handle multiple devices (broadcast)
    pending = []
    for r in results:
        if r.get("status") == "ok":
            pending.append((r["device_id"], r["request_id"]))
            info(f"Sent to {r['device_id']} (req: {r['request_id'][:20]}...)")
        else:
            err(f"{r.get('device_id', '?')}: {r.get('message', 'error')}")

    if not pending:
        return 1

    # Poll for results. ``seen`` tracks how many output lines were already
    # printed for each request so only new lines are shown.
    seen = {req_id: 0 for _, req_id in pending}
    completed = set()
    timeout = args.timeout
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline and len(completed) < len(pending):
        for device_id, req_id in pending:
            if req_id in completed:
                continue

            try:
                status = client.poll_command(req_id)
            except requests.exceptions.RequestException:
                # Transient failure: retry on the next round. The sleep at
                # the end of the round already paces the retries.
                continue

            output = status.get("output") or []
            prefix = f"[{device_id}] " if len(pending) > 1 else ""
            for line in output[seen[req_id]:]:
                print(f"{prefix}{line}")
            seen[req_id] = len(output)

            if status.get("status") == "completed":
                completed.add(req_id)

        if len(completed) < len(pending):
            time.sleep(POLL_INTERVAL)

    if len(completed) == len(pending):
        ok(f"Command completed ({len(completed)} device(s))")
        return 0

    warn(f"Timed out after {timeout}s "
         f"({len(completed)}/{len(pending)} completed)")
    return 1


# ─── Subcommand: status ─────────────────────────────────────────────────────

def cmd_status(args) -> int:
    """List connected devices from C3PO as a table.

    Returns 1 when the API cannot be reached or answers with an HTTP error,
    0 otherwise (including when no device is connected).
    """
    config = Config()
    client = C3POClient(config)

    try:
        data = client.list_devices()
    except requests.exceptions.ConnectionError:
        err(f"Cannot connect to C3PO at {config.base_url}")
        return 1
    except requests.exceptions.HTTPError as e:
        err(f"API error: {e.response.status_code}")
        return 1

    devices = data.get("devices", [])
    if not devices:
        info("No devices connected")
        return 0

    print(f"\n {'ID':<16} {'IP':<16} {'Status':<12} {'Chip':<10} "
          f"{'Modules':<30} {'Uptime':<10}")
    print(f" {'-'*16} {'-'*16} {'-'*12} {'-'*10} {'-'*30} {'-'*10}")

    for d in devices:
        status_color = C.GRN if d.get("status") == "Connected" else C.YLW
        # The API schema is not visible from this module, so each field is
        # normalized defensively: missing or None values get a placeholder
        # instead of raising while formatting.
        modules = _cell(d.get("modules"), "-")
        uptime = _format_duration(d.get("connected_for_seconds") or 0)
        print(f" {_cell(d.get('id')):<16} "
              f"{_cell(d.get('ip')):<16} "
              f"{status_color}{_cell(d.get('status')):<12}{C.RST} "
              f"{_cell(d.get('chip')):<10} "
              f"{modules:<30} "
              f"{uptime:<10}")

    print(f"\n {len(devices)} device(s)\n")
    return 0


# ─── Subcommand: logs ───────────────────────────────────────────────────────

def cmd_logs(args) -> int:
    """List all log files with their size and modification time.

    Uses the ``relative``, ``size`` and ``mtime`` fields of each entry
    returned by ``LogManager.list_all``. Always returns 0.
    """
    entries = LogManager.list_all()
    if not entries:
        info("No log files found")
        return 0

    print(f"\n {'File':<40} {'Size':<10} {'Last Modified'}")
    print(f" {'-'*40} {'-'*10} {'-'*20}")

    total_size = 0
    for e in entries:
        total_size += e["size"]
        mtime_str = e["mtime"].strftime("%Y-%m-%d %H:%M")
        print(f" {str(e['relative']):<40} "
              f"{_format_size(e['size']):<10} "
              f"{mtime_str}")

    print(f"\n {len(entries)} file(s), {_format_size(total_size)} total\n")
    return 0


# ─── Subcommand: stop ───────────────────────────────────────────────────────

def cmd_stop(args) -> int:
    """Stop the background monitor daemon and return its exit code."""
    return stop_daemon()


# ─── Parser ─────────────────────────────────────────────────────────────────

def build_parser() -> argparse.ArgumentParser:
    """Build the ``espmon`` argument parser with all subcommands.

    Each subparser stores its handler in ``func`` via ``set_defaults`` so
    that ``main`` can dispatch with ``args.func(args)``.
    """
    parser = argparse.ArgumentParser(
        prog="espmon",
        description="ESP32 serial monitor and C2 command interface",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
Examples:
  espmon monitor /dev/ttyUSB0        Monitor serial (foreground)
  espmon monitor /dev/ttyUSB0 --bg   Monitor as background daemon
  espmon tail                        Tail most recent log
  espmon tail -f -n 100              Follow mode, last 100 lines
  espmon cmd 03d03f48 system_info    Send command to device
  espmon cmd all system_mem          Broadcast to all devices
  espmon status                      List connected devices
  espmon logs                        List log files
  espmon stop                        Stop background monitor
""",
    )
    parser.add_argument("--version", action="version",
                        version=f"espmon {__version__}")

    sub = parser.add_subparsers(dest="subcommand")

    # monitor
    p_mon = sub.add_parser("monitor", aliases=["mon", "m"],
                           help="Monitor ESP32 serial output")
    p_mon.add_argument("port", help="Serial port (e.g. /dev/ttyUSB0)")
    p_mon.add_argument("--baud", "-b", type=int, default=DEFAULT_BAUD,
                       help=f"Baud rate (default: {DEFAULT_BAUD})")
    p_mon.add_argument("--bg", action="store_true",
                       help="Run as background daemon")
    p_mon.set_defaults(func=cmd_monitor)

    # tail
    p_tail = sub.add_parser("tail", aliases=["t"], help="Tail a log file")
    p_tail.add_argument("--port", "-p", help="Filter by serial port")
    p_tail.add_argument("--device", "-d", help="Filter by device ID")
    p_tail.add_argument("--lines", "-n", type=int, default=50,
                        help="Number of lines (default: 50)")
    p_tail.add_argument("--follow", "-f", action="store_true",
                        help="Follow mode (keep watching)")
    p_tail.set_defaults(func=cmd_tail)

    # cmd
    p_cmd = sub.add_parser("cmd", aliases=["c"],
                           help="Send command via C3PO API")
    p_cmd.add_argument("device_id",
                       help="Target device ID (or 'all' for broadcast)")
    p_cmd.add_argument("command", help="Command name")
    p_cmd.add_argument("argv", nargs="*", default=[],
                       help="Command arguments")
    p_cmd.add_argument("--timeout", "-t", type=int, default=POLL_TIMEOUT,
                       help=f"Poll timeout seconds (default: {POLL_TIMEOUT})")
    p_cmd.set_defaults(func=cmd_cmd)

    # status
    p_status = sub.add_parser("status", aliases=["s"],
                              help="List connected devices")
    p_status.set_defaults(func=cmd_status)

    # logs
    p_logs = sub.add_parser("logs", aliases=["l"], help="List log files")
    p_logs.set_defaults(func=cmd_logs)

    # stop
    p_stop = sub.add_parser("stop", help="Stop background monitor")
    p_stop.set_defaults(func=cmd_stop)

    return parser


# ─── Entry Point ────────────────────────────────────────────────────────────

def main(argv=None) -> int:
    """Parse the command line and dispatch to the selected subcommand.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The handler's exit code; 0 after printing help when no subcommand
        was given; 130 on Ctrl-C; 1 on any unexpected exception.
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    if not args.subcommand:
        parser.print_help()
        return 0

    try:
        return args.func(args)
    except KeyboardInterrupt:
        print()
        return 130
    except Exception as e:
        # Top-level boundary: report the error instead of a traceback.
        err(f"Unexpected error: {e}")
        return 1