espilon-source/tools/espmon/monitor.py
Eun0us 12b851581a tools: replace flasher/provisioning with unified deploy system + espmon
Add deploy.py: multi-device build/flash/provision with JSON config.
Add espmon/: real-time ESP32 fleet monitoring daemon.
Remove tools/flasher/ and tools/provisioning/ (superseded).
2026-02-28 20:15:57 +01:00

104 lines
3.2 KiB
Python

"""Serial port monitor — read, display, and log ESP32 output."""
import threading
import time
from datetime import datetime
from pathlib import Path
from espmon.colors import info, err
from espmon.config import DEFAULT_BAUD, strip_ansi
class SerialMonitor:
    """Read a serial port, print to stdout with colors, log to file without ANSI.

    Args:
        port: Serial device name, assigned to ``serial.Serial().port``.
        baud: Baud rate; defaults to ``DEFAULT_BAUD``.
        log_path: Optional file that receives a timestamped copy of every
            line after it has been passed through ``strip_ansi``. ``None``
            (the default) disables file logging. The file is opened in
            append mode and its parent directory is created on demand.
    """

    def __init__(self, port: str, baud: int = DEFAULT_BAUD, log_path: Path = None):
        self.port = port
        self.baud = baud
        self.log_path = log_path
        # Set by stop(); polled by the read loop in run().
        self._stop = threading.Event()
        # Open log file handle while run() is active, otherwise None.
        self._log_file = None

    def run(self) -> int:
        """Run the monitor (blocking). Returns exit code.

        Reads the port in small chunks, splits the stream on ``\\n`` and
        emits each complete line to stdout (and to the log file, if any).
        The loop ends when ``stop()`` is called or on Ctrl+C.

        Returns:
            1 if the serial port could not be opened, 0 otherwise.

        Raises:
            OSError: If the log directory or file cannot be created/opened.
                The serial port is closed before the error propagates.
        """
        # Kept function-local as in the original (presumably so importing
        # this module does not require pyserial to be installed).
        import serial

        try:
            # Open without toggling DTR/RTS to avoid resetting the ESP32
            ser = serial.Serial()
            ser.port = self.port
            ser.baudrate = self.baud
            # Short read timeout so the stop flag is re-checked regularly.
            ser.timeout = 0.5
            ser.dtr = False
            ser.rts = False
            ser.open()
        except serial.SerialException as e:
            err(f"Cannot open {self.port}: {e}")
            return 1

        pending = b""  # bytes received since the last newline
        try:
            # Log setup lives inside the try/finally so that a failure here
            # (unwritable path, full disk, ...) cannot leak the open port.
            if self.log_path:
                self.log_path.parent.mkdir(parents=True, exist_ok=True)
                self._log_file = open(self.log_path, 'a', encoding='utf-8')
                info(f"Logging to {self.log_path}")
            info(f"Monitoring {self.port} @ {self.baud} baud (Ctrl+C to stop)")
            if self._log_file:
                self._log_file.write(
                    f"# espmon session started {datetime.now().isoformat()}\n"
                )
                self._log_file.write(
                    f"# port: {self.port} baud: {self.baud}\n"
                )
                self._log_file.flush()

            try:
                while not self._stop.is_set():
                    try:
                        chunk = ser.read(256)
                    except serial.SerialException:
                        if self._stop.is_set():
                            break
                        err(f"Serial read error on {self.port}")
                        # Back off briefly instead of spinning on the error.
                        time.sleep(0.5)
                        continue
                    if not chunk:
                        # Read timed out with no data; re-check the stop flag.
                        continue
                    pending += chunk
                    while b"\n" in pending:
                        line_bytes, pending = pending.split(b"\n", 1)
                        self._emit_line(line_bytes)
            except KeyboardInterrupt:
                pass

            # Flush output that never received its terminating newline so
            # the last partial line is not silently dropped at shutdown.
            for leftover in pending.split(b"\n"):
                if leftover:
                    self._emit_line(leftover)
        finally:
            try:
                ser.close()
            except Exception:
                # Best effort: a failing close must not mask the real outcome.
                pass
            self._close_log()
            info("Monitor stopped")
        return 0

    def _emit_line(self, line_bytes: bytes) -> None:
        """Print one received line and append it to the log file, if open."""
        line = line_bytes.decode("utf-8", errors="replace").rstrip("\r")
        # stdout with original ANSI colors
        print(line, flush=True)
        # log file: timestamped (millisecond precision), ANSI stripped
        if self._log_file:
            ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
            clean = strip_ansi(line)
            self._log_file.write(f"[{ts}] {clean}\n")
            self._log_file.flush()

    def _close_log(self) -> None:
        """Write the session footer and close the log file, if one is open."""
        log_file, self._log_file = self._log_file, None
        if log_file is None:
            return
        try:
            log_file.write(
                f"# espmon session ended {datetime.now().isoformat()}\n"
            )
        finally:
            # Close even when the footer write fails.
            log_file.close()

    def stop(self):
        """Signal the monitor to stop."""
        self._stop.set()