espilon-source/tools/espmon/cli.py
Eun0us 12b851581a tools: replace flasher/provisioning with unified deploy system + espmon
Add deploy.py: multi-device build/flash/provision with JSON config.
Add espmon/: real-time ESP32 fleet monitoring daemon.
Remove tools/flasher/ and tools/provisioning/ (superseded).
2026-02-28 20:15:57 +01:00

347 lines
12 KiB
Python

"""CLI parser and subcommand handlers."""
import argparse
import os
import time
import sys
import requests
from espmon import __version__
from espmon.colors import C, ok, info, err, warn
from espmon.config import Config, DEFAULT_BAUD, POLL_INTERVAL, POLL_TIMEOUT
from espmon.client import C3POClient
from espmon.logs import LogManager
from espmon.monitor import SerialMonitor
from espmon.daemon import daemonize, stop_daemon
# ─── Helpers ─────────────────────────────────────────────────────────────────
def _format_duration(seconds: float) -> str:
"""Format seconds into a human-readable duration."""
s = int(seconds)
if s < 60:
return f"{s}s"
if s < 3600:
return f"{s // 60}m{s % 60:02d}s"
h = s // 3600
m = (s % 3600) // 60
return f"{h}h{m:02d}m"
def _format_size(size: int) -> str:
"""Format bytes into human-readable size."""
if size > 1024 * 1024:
return f"{size / (1024 * 1024):.1f} MB"
if size > 1024:
return f"{size / 1024:.1f} KB"
return f"{size} B"
def _detect_ports() -> list:
"""Detect available serial ports."""
from glob import glob
return sorted(glob("/dev/ttyUSB*") + glob("/dev/ttyACM*"))
# ─── Subcommand: monitor ────────────────────────────────────────────────────
def cmd_monitor(args) -> int:
    """Handle ``espmon monitor``: watch a serial port and log what it emits.

    Reads ``args.port``, ``args.baud`` and ``args.bg``. If the port path does
    not exist, an error is reported (together with any ports found by
    ``_detect_ports``) and 1 is returned. Otherwise the result of
    ``daemonize`` (when ``args.bg`` is set) or of ``SerialMonitor.run`` is
    returned.
    """
    serial_port = args.port

    if not os.path.exists(serial_port):
        err(f"Port {serial_port} not found")
        candidates = _detect_ports()
        if candidates:
            info(f"Available ports: {', '.join(candidates)}")
        return 1

    LogManager.ensure_dirs()

    # Background mode hands the port over to the daemon helper.
    if args.bg:
        return daemonize(serial_port, args.baud)

    # Foreground mode: run the monitor in this process with a session log.
    session_log = LogManager.session_path(serial_port)
    return SerialMonitor(serial_port, args.baud, session_log).run()
# ─── Subcommand: tail ────────────────────────────────────────────────────────
def cmd_tail(args) -> int:
    """Print the tail of the most recent log file, optionally following it.

    Reads from ``args``:
      * ``port`` / ``device`` -- passed to ``LogManager.latest_log`` as filters
      * ``lines`` -- number of trailing lines to print; zero or a negative
        value prints nothing
      * ``follow`` -- keep printing data appended to the file until
        interrupted with Ctrl-C

    Returns 1 when no log file is found, 0 otherwise.
    """
    from collections import deque

    log_path = LogManager.latest_log(port=args.port, device=args.device)
    if not log_path:
        err("No log files found")
        info("Start monitoring first: espmon monitor /dev/ttyUSB0")
        return 1

    info(f"{log_path}")

    # Clamp the count: slicing a list with ``[-0:]`` selects everything, so a
    # request for 0 lines must be handled explicitly rather than via slicing.
    count = max(args.lines, 0)

    # errors="replace" keeps an undecodable byte in the log from aborting the
    # tail with UnicodeDecodeError.
    with open(log_path, 'r', errors='replace') as f:
        # A bounded deque holds only the last ``count`` lines in memory while
        # the file is scanned, instead of loading the whole log.
        for line in deque(f, maxlen=count):
            print(line, end='')

        if args.follow:
            # Keep reading from the same handle, which now sits at end of
            # file, so nothing appended since the initial dump is skipped.
            try:
                while True:
                    line = f.readline()
                    if line:
                        print(line, end='', flush=True)
                    else:
                        time.sleep(0.2)
            except KeyboardInterrupt:
                pass
    return 0
# ─── Subcommand: cmd ─────────────────────────────────────────────────────────
def cmd_cmd(args) -> int:
    """Send a command to a device via C3PO API and wait for response.

    Reads ``args.device_id``, ``args.command``, ``args.argv`` and
    ``args.timeout``. The command is submitted with
    ``C3POClient.send_command``; every entry of the reply's ``"results"``
    list whose ``"status"`` is ``"ok"`` is then polled with
    ``C3POClient.poll_command`` and its new ``"output"`` lines are printed
    as they appear.

    Returns 0 when every accepted request reports ``"completed"`` before the
    timeout. Returns 1 on a connection/HTTP error while sending, when no
    results come back, when no device accepted the command, or on timeout.
    """
    config = Config()
    client = C3POClient(config)
    try:
        result = client.send_command(args.device_id, args.command, args.argv)
    except requests.exceptions.ConnectionError:
        err(f"Cannot connect to C3PO at {config.base_url}")
        err("Is C3PO running? Start with: cd tools/C3PO && python c3po.py --headless")
        return 1
    except requests.exceptions.HTTPError as e:
        err(f"API error: {e.response.status_code} {e.response.text}")
        return 1

    results = result.get("results", [])
    if not results:
        err("No results returned")
        return 1

    # Handle multiple devices (broadcast): keep only accepted requests.
    pending = []
    for r in results:
        if r.get("status") == "ok":
            pending.append((r["device_id"], r["request_id"]))
            info(f"Sent to {r['device_id']} (req: {r['request_id'][:20]}...)")
        else:
            err(f"{r.get('device_id', '?')}: {r.get('message', 'error')}")
    if not pending:
        return 1

    # Number of output lines already printed, per request.
    seen = {req_id: 0 for _, req_id in pending}
    completed = set()
    timeout = args.timeout
    # Output is tagged with the device ID only when several devices answer.
    tag_output = len(pending) > 1

    # A monotonic clock keeps the timeout correct even if the system
    # wall-clock time is adjusted while polling.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline and len(completed) < len(pending):
        for device_id, req_id in pending:
            if req_id in completed:
                continue
            try:
                status = client.poll_command(req_id)
            except requests.exceptions.RequestException:
                # Transient poll failure: retry on the next pass. The single
                # sleep at the end of the pass paces the retries, so a
                # failing device does not add an extra delay of its own.
                continue

            output = status.get("output", [])
            prefix = f"[{device_id}] " if tag_output else ""
            for line in output[seen[req_id]:]:
                print(f"{prefix}{line}")
            seen[req_id] = len(output)

            if status.get("status") == "completed":
                completed.add(req_id)

        if len(completed) < len(pending):
            time.sleep(POLL_INTERVAL)

    if len(completed) == len(pending):
        ok(f"Command completed ({len(completed)} device(s))")
        return 0
    warn(f"Timed out after {timeout}s ({len(completed)}/{len(pending)} completed)")
    return 1
# ─── Subcommand: status ──────────────────────────────────────────────────────
def cmd_status(args) -> int:
    """Handle ``espmon status``: print a table of devices reported by C3PO.

    Returns 1 when the device list cannot be fetched (connection or HTTP
    error) and 0 otherwise, including when no devices are connected.
    """
    config = Config()
    api = C3POClient(config)

    try:
        payload = api.list_devices()
    except requests.exceptions.ConnectionError:
        err(f"Cannot connect to C3PO at {config.base_url}")
        return 1
    except requests.exceptions.HTTPError as http_error:
        err(f"API error: {http_error.response.status_code}")
        return 1

    fleet = payload.get("devices", [])
    if not fleet:
        info("No devices connected")
        return 0

    # Table header followed by a dashed rule of matching column widths.
    header = (f"\n {'ID':<16} {'IP':<16} {'Status':<12} {'Chip':<10} "
              f"{'Modules':<30} {'Uptime':<10}")
    print(header)
    print(f" {'-'*16} {'-'*16} {'-'*12} {'-'*10} {'-'*30} {'-'*10}")

    for device in fleet:
        # Green for devices whose status is "Connected", yellow otherwise.
        if device.get("status") == "Connected":
            status_color = C.GRN
        else:
            status_color = C.YLW
        modules = device.get("modules", "") or "-"
        uptime = _format_duration(device.get("connected_for_seconds", 0))
        row = (f" {device['id']:<16} "
               f"{device.get('ip', '?'):<16} "
               f"{status_color}{device.get('status', '?'):<12}{C.RST} "
               f"{device.get('chip', '?'):<10} "
               f"{modules:<30} "
               f"{uptime:<10}")
        print(row)

    print(f"\n {len(fleet)} device(s)\n")
    return 0
# ─── Subcommand: logs ────────────────────────────────────────────────────────
def cmd_logs(args) -> int:
    """Handle ``espmon logs``: print a table of the known log files.

    Each entry from ``LogManager.list_all`` is shown with its relative path,
    formatted size and modification time, followed by a summary line with
    the file count and combined size. Always returns 0.
    """
    log_entries = LogManager.list_all()
    if not log_entries:
        info("No log files found")
        return 0

    print(f"\n {'File':<40} {'Size':<10} {'Last Modified'}")
    print(f" {'-'*40} {'-'*10} {'-'*20}")

    sizes = []
    for entry in log_entries:
        sizes.append(entry["size"])
        modified = entry["mtime"].strftime("%Y-%m-%d %H:%M")
        print(f" {str(entry['relative']):<40} "
              f"{_format_size(entry['size']):<10} "
              f"{modified}")

    combined = sum(sizes)
    print(f"\n {len(log_entries)} file(s), {_format_size(combined)} total\n")
    return 0
# ─── Subcommand: stop ────────────────────────────────────────────────────────
def cmd_stop(args) -> int:
    """Handle ``espmon stop`` by delegating to ``stop_daemon``.

    ``args`` is accepted to match the other subcommand handlers and is not
    used. The return value is whatever ``stop_daemon()`` returns.
    """
    exit_code = stop_daemon()
    return exit_code
# ─── Parser ──────────────────────────────────────────────────────────────────
def build_parser() -> argparse.ArgumentParser:
    """Build the ``espmon`` argument parser with all of its subcommands.

    Every subparser stores its handler in ``func`` via ``set_defaults``, and
    the chosen subcommand name is stored in ``subcommand``.
    """
    examples = """\
Examples:
espmon monitor /dev/ttyUSB0 Monitor serial (foreground)
espmon monitor /dev/ttyUSB0 --bg Monitor as background daemon
espmon tail Tail most recent log
espmon tail -f -n 100 Follow mode, last 100 lines
espmon cmd 03d03f48 system_info Send command to device
espmon cmd all system_mem Broadcast to all devices
espmon status List connected devices
espmon logs List log files
espmon stop Stop background monitor
"""
    root = argparse.ArgumentParser(
        prog="espmon",
        description="ESP32 serial monitor and C2 command interface",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    root.add_argument(
        "--version",
        action="version",
        version=f"espmon {__version__}",
    )
    subcommands = root.add_subparsers(dest="subcommand")

    # monitor: watch a serial port, in the foreground or as a daemon.
    monitor_parser = subcommands.add_parser(
        "monitor", aliases=["mon", "m"], help="Monitor ESP32 serial output"
    )
    monitor_parser.add_argument("port", help="Serial port (e.g. /dev/ttyUSB0)")
    monitor_parser.add_argument(
        "--baud", "-b",
        type=int,
        default=DEFAULT_BAUD,
        help=f"Baud rate (default: {DEFAULT_BAUD})",
    )
    monitor_parser.add_argument(
        "--bg", action="store_true", help="Run as background daemon"
    )
    monitor_parser.set_defaults(func=cmd_monitor)

    # tail: show the end of a log file.
    tail_parser = subcommands.add_parser(
        "tail", aliases=["t"], help="Tail a log file"
    )
    tail_parser.add_argument("--port", "-p", help="Filter by serial port")
    tail_parser.add_argument("--device", "-d", help="Filter by device ID")
    tail_parser.add_argument(
        "--lines", "-n",
        type=int,
        default=50,
        help="Number of lines (default: 50)",
    )
    tail_parser.add_argument(
        "--follow", "-f",
        action="store_true",
        help="Follow mode (keep watching)",
    )
    tail_parser.set_defaults(func=cmd_tail)

    # cmd: send a command through the C3PO API.
    command_parser = subcommands.add_parser(
        "cmd", aliases=["c"], help="Send command via C3PO API"
    )
    command_parser.add_argument(
        "device_id", help="Target device ID (or 'all' for broadcast)"
    )
    command_parser.add_argument("command", help="Command name")
    command_parser.add_argument(
        "argv", nargs="*", default=[], help="Command arguments"
    )
    command_parser.add_argument(
        "--timeout", "-t",
        type=int,
        default=POLL_TIMEOUT,
        help=f"Poll timeout seconds (default: {POLL_TIMEOUT})",
    )
    command_parser.set_defaults(func=cmd_cmd)

    # status: list connected devices.
    status_parser = subcommands.add_parser(
        "status", aliases=["s"], help="List connected devices"
    )
    status_parser.set_defaults(func=cmd_status)

    # logs: list log files.
    logs_parser = subcommands.add_parser(
        "logs", aliases=["l"], help="List log files"
    )
    logs_parser.set_defaults(func=cmd_logs)

    # stop: stop the background monitor.
    stop_parser = subcommands.add_parser("stop", help="Stop background monitor")
    stop_parser.set_defaults(func=cmd_stop)

    return root
# ─── Entry Point ──────────────────────────────────────────────────────────────
def main() -> int:
    """Parse the command line, run the selected subcommand, return its status.

    With no subcommand the help text is printed and 0 is returned. A
    ``KeyboardInterrupt`` raised by the handler yields 130 (after printing a
    newline); any other exception is reported with ``err`` and yields 1.
    """
    cli = build_parser()
    parsed = cli.parse_args()

    if parsed.subcommand:
        try:
            exit_code = parsed.func(parsed)
        except KeyboardInterrupt:
            print()
            exit_code = 130
        except Exception as exc:
            err(f"Unexpected error: {exc}")
            exit_code = 1
        return exit_code

    cli.print_help()
    return 0