Add deploy.py: multi-device build/flash/provision with JSON config. Add espmon/: real-time ESP32 fleet monitoring daemon. Remove tools/flasher/ and tools/provisioning/ (superseded).
104 lines
3.2 KiB
Python
104 lines
3.2 KiB
Python
"""Log file management — session paths, listing, tail."""
|
|
|
|
import json
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from espmon.config import LOGS_DIR, DEPLOY_JSON
|
|
|
|
|
|
class LogManager:
|
|
"""Manage serial monitor log files in tools/logs/."""
|
|
|
|
@staticmethod
|
|
def ensure_dirs():
|
|
"""Create the logs root directory if needed."""
|
|
LOGS_DIR.mkdir(exist_ok=True)
|
|
|
|
@staticmethod
|
|
def session_path(port: str) -> Path:
|
|
"""Create a new log file path for a monitor session.
|
|
|
|
Format: tools/logs/<port_name>/YYYY-MM-DD_HHhMM.log
|
|
"""
|
|
port_name = port.replace("/dev/", "").replace("/", "_")
|
|
port_dir = LOGS_DIR / port_name
|
|
port_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
timestamp = datetime.now().strftime("%Y-%m-%d_%Hh%M")
|
|
path = port_dir / f"{timestamp}.log"
|
|
|
|
# Avoid collision if multiple sessions in same minute
|
|
if path.exists():
|
|
secs = datetime.now().strftime("%S")
|
|
path = port_dir / f"{timestamp}_{secs}.log"
|
|
|
|
return path
|
|
|
|
@staticmethod
|
|
def latest_log(port: str = None, device: str = None) -> Optional[Path]:
|
|
"""Find the most recent log file.
|
|
|
|
Args:
|
|
port: Filter by serial port (e.g. /dev/ttyUSB0)
|
|
device: Filter by device ID (looks up port in deploy.json)
|
|
"""
|
|
if device:
|
|
mapped = LogManager._device_to_port(device)
|
|
if mapped:
|
|
port = mapped
|
|
|
|
if port:
|
|
port_name = port.replace("/dev/", "").replace("/", "_")
|
|
port_dir = LOGS_DIR / port_name
|
|
if not port_dir.exists():
|
|
return None
|
|
logs = sorted(port_dir.glob("*.log"), key=lambda p: p.stat().st_mtime)
|
|
return logs[-1] if logs else None
|
|
|
|
# No filter: most recent across all ports
|
|
if not LOGS_DIR.exists():
|
|
return None
|
|
all_logs = sorted(LOGS_DIR.rglob("*.log"), key=lambda p: p.stat().st_mtime)
|
|
return all_logs[-1] if all_logs else None
|
|
|
|
@staticmethod
|
|
def list_all() -> list:
|
|
"""List all log files with metadata.
|
|
|
|
Returns list of dicts with: path, relative, size, mtime
|
|
"""
|
|
if not LOGS_DIR.exists():
|
|
return []
|
|
|
|
entries = []
|
|
for log_file in sorted(
|
|
LOGS_DIR.rglob("*.log"),
|
|
key=lambda p: p.stat().st_mtime,
|
|
reverse=True,
|
|
):
|
|
stat = log_file.stat()
|
|
entries.append({
|
|
"path": log_file,
|
|
"relative": log_file.relative_to(LOGS_DIR),
|
|
"size": stat.st_size,
|
|
"mtime": datetime.fromtimestamp(stat.st_mtime),
|
|
})
|
|
return entries
|
|
|
|
@staticmethod
|
|
def _device_to_port(device_id: str) -> Optional[str]:
|
|
"""Look up serial port for a device ID from deploy.json."""
|
|
if not DEPLOY_JSON.exists():
|
|
return None
|
|
try:
|
|
with open(DEPLOY_JSON) as f:
|
|
data = json.load(f)
|
|
for dev in data.get("devices", []):
|
|
if dev.get("device_id") == device_id:
|
|
return dev.get("port")
|
|
except (json.JSONDecodeError, KeyError, OSError):
|
|
pass
|
|
return None
|