Add deploy.py: multi-device build/flash/provision with JSON config. Add espmon/: real-time ESP32 fleet monitoring daemon. Remove tools/flasher/ and tools/provisioning/ (superseded).
104 lines
3.2 KiB
Python
104 lines
3.2 KiB
Python
"""Serial port monitor — read, display, and log ESP32 output."""
|
|
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from espmon.colors import info, err
|
|
from espmon.config import DEFAULT_BAUD, strip_ansi
|
|
|
|
|
|
class SerialMonitor:
|
|
"""Read a serial port, print to stdout with colors, log to file without ANSI."""
|
|
|
|
def __init__(self, port: str, baud: int = DEFAULT_BAUD, log_path: Path = None):
|
|
self.port = port
|
|
self.baud = baud
|
|
self.log_path = log_path
|
|
self._stop = threading.Event()
|
|
self._log_file = None
|
|
|
|
def run(self) -> int:
|
|
"""Run the monitor (blocking). Returns exit code."""
|
|
import serial
|
|
|
|
try:
|
|
# Open without toggling DTR/RTS to avoid resetting the ESP32
|
|
ser = serial.Serial()
|
|
ser.port = self.port
|
|
ser.baudrate = self.baud
|
|
ser.timeout = 0.5
|
|
ser.dtr = False
|
|
ser.rts = False
|
|
ser.open()
|
|
except serial.SerialException as e:
|
|
err(f"Cannot open {self.port}: {e}")
|
|
return 1
|
|
|
|
if self.log_path:
|
|
self.log_path.parent.mkdir(parents=True, exist_ok=True)
|
|
self._log_file = open(self.log_path, 'a', encoding='utf-8')
|
|
info(f"Logging to {self.log_path}")
|
|
|
|
info(f"Monitoring {self.port} @ {self.baud} baud (Ctrl+C to stop)")
|
|
|
|
if self._log_file:
|
|
self._log_file.write(
|
|
f"# espmon session started {datetime.now().isoformat()}\n"
|
|
)
|
|
self._log_file.write(
|
|
f"# port: {self.port} baud: {self.baud}\n"
|
|
)
|
|
self._log_file.flush()
|
|
|
|
buf = b""
|
|
try:
|
|
while not self._stop.is_set():
|
|
try:
|
|
chunk = ser.read(256)
|
|
except serial.SerialException:
|
|
if self._stop.is_set():
|
|
break
|
|
err(f"Serial read error on {self.port}")
|
|
time.sleep(0.5)
|
|
continue
|
|
|
|
if not chunk:
|
|
continue
|
|
|
|
buf += chunk
|
|
while b"\n" in buf:
|
|
line_bytes, buf = buf.split(b"\n", 1)
|
|
line = line_bytes.decode("utf-8", errors="replace").rstrip("\r")
|
|
|
|
# stdout with original ANSI colors
|
|
print(line, flush=True)
|
|
|
|
# log file: timestamped, ANSI stripped
|
|
if self._log_file:
|
|
ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
|
|
clean = strip_ansi(line)
|
|
self._log_file.write(f"[{ts}] {clean}\n")
|
|
self._log_file.flush()
|
|
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
try:
|
|
ser.close()
|
|
except Exception:
|
|
pass
|
|
if self._log_file:
|
|
self._log_file.write(
|
|
f"# espmon session ended {datetime.now().isoformat()}\n"
|
|
)
|
|
self._log_file.close()
|
|
info("Monitor stopped")
|
|
|
|
return 0
|
|
|
|
def stop(self):
|
|
"""Signal the monitor to stop."""
|
|
self._stop.set()
|