"""Serial port monitor — read, display, and log ESP32 output."""

import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

from espmon.colors import info, err
from espmon.config import DEFAULT_BAUD, strip_ansi


class SerialMonitor:
    """Read a serial port, print to stdout with colors, log to file without ANSI.

    Each received line is printed unchanged (ANSI sequences included). When a
    ``log_path`` is given, the line is also appended to that file with a
    millisecond timestamp after being passed through ``strip_ansi``.
    """

    def __init__(
        self,
        port: str,
        baud: int = DEFAULT_BAUD,
        log_path: Optional[Path] = None,
    ):
        """Store the connection settings; nothing is opened until ``run()``.

        Args:
            port: Serial device name handed to pyserial.
            baud: Baud rate for the port.
            log_path: File to append the log to, or ``None`` to disable logging.
        """
        self.port = port
        self.baud = baud
        self.log_path = log_path
        self._stop = threading.Event()
        self._log_file = None

    def run(self) -> int:
        """Run the monitor (blocking). Returns exit code.

        Returns:
            ``0`` when the monitor was stopped via ``stop()`` or Ctrl+C,
            ``1`` when the serial port could not be opened.

        Raises:
            OSError: If the log file cannot be created or written. The serial
                port is closed before the exception propagates.
        """
        # Imported lazily so the module can be imported without pyserial.
        import serial

        try:
            # Open without toggling DTR/RTS to avoid resetting the ESP32
            ser = serial.Serial()
            ser.port = self.port
            ser.baudrate = self.baud
            ser.timeout = 0.5
            ser.dtr = False
            ser.rts = False
            ser.open()
        except serial.SerialException as e:
            err(f"Cannot open {self.port}: {e}")
            return 1

        try:
            self._open_log()
            info(f"Monitoring {self.port} @ {self.baud} baud (Ctrl+C to stop)")
            self._write_log_header()
        except BaseException:
            # Setup failed after the port was opened: release the port and
            # any half-opened log before letting the error propagate.
            self._close_port(ser)
            self._close_log(write_footer=False)
            raise

        try:
            self._read_loop(ser, serial.SerialException)
        except KeyboardInterrupt:
            # Ctrl+C while flushing the final partial line; still a clean exit.
            pass
        finally:
            self._close_port(ser)
            self._close_log()
            info("Monitor stopped")

        return 0

    def stop(self):
        """Signal the monitor to stop."""
        self._stop.set()

    def _open_log(self) -> None:
        """Open the log file in append mode, creating parent directories."""
        if not self.log_path:
            return
        log_path = Path(self.log_path)
        log_path.parent.mkdir(parents=True, exist_ok=True)
        self._log_file = open(log_path, 'a', encoding='utf-8')
        info(f"Logging to {self.log_path}")

    def _write_log_header(self) -> None:
        """Write the session-start banner to the log file, if logging."""
        if not self._log_file:
            return
        self._log_file.write(
            f"# espmon session started {datetime.now().isoformat()}\n"
        )
        self._log_file.write(
            f"# port: {self.port} baud: {self.baud}\n"
        )
        self._log_file.flush()

    def _read_loop(self, ser, read_error) -> None:
        """Read from ``ser`` until stopped, emitting one line per newline.

        Args:
            ser: The opened serial port object.
            read_error: Exception class raised by ``ser.read`` on I/O failure
                (passed in because pyserial is imported inside ``run``).
        """
        pending = b""  # bytes received since the last newline
        try:
            while not self._stop.is_set():
                try:
                    chunk = ser.read(256)
                except read_error:
                    if self._stop.is_set():
                        break
                    # Best effort: report and keep retrying until stopped.
                    err(f"Serial read error on {self.port}")
                    time.sleep(0.5)
                    continue

                if not chunk:
                    # Read timed out with no data; re-check the stop flag.
                    continue

                # Split on the raw bytes so a multi-byte UTF-8 character is
                # never decoded in halves; the last piece is the unfinished
                # line carried over to the next read.
                *complete, pending = (pending + chunk).split(b"\n")
                for raw_line in complete:
                    self._emit(raw_line)
        except KeyboardInterrupt:
            pass

        # Do not drop output that arrived without a terminating newline.
        if pending.rstrip(b"\r"):
            self._emit(pending)

    def _emit(self, raw_line: bytes) -> None:
        """Print one line to stdout and append it, timestamped, to the log."""
        line = raw_line.decode("utf-8", errors="replace").rstrip("\r")

        # stdout with original ANSI colors
        print(line, flush=True)

        # log file: timestamped, ANSI stripped
        if self._log_file:
            ts = datetime.now().isoformat(timespec="milliseconds")
            self._log_file.write(f"[{ts}] {strip_ansi(line)}\n")
            self._log_file.flush()

    def _close_log(self, write_footer: bool = True) -> None:
        """Close the log file, optionally writing the session-end banner."""
        log_file = self._log_file
        if not log_file:
            return
        # Drop the reference first so a closed handle is never reused.
        self._log_file = None
        try:
            if write_footer:
                log_file.write(
                    f"# espmon session ended {datetime.now().isoformat()}\n"
                )
        finally:
            log_file.close()

    @staticmethod
    def _close_port(ser) -> None:
        """Close the serial port, ignoring errors (best-effort cleanup)."""
        try:
            ser.close()
        except Exception:
            # The device may already be gone; nothing useful to do here.
            pass