espilon-source/tools/espmon/config.py
Eun0us 12b851581a tools: replace flasher/provisioning with unified deploy system + espmon
Add deploy.py: multi-device build/flash/provision with JSON config.
Add espmon/: real-time ESP32 fleet monitoring daemon.
Remove tools/flasher/ and tools/provisioning/ (superseded).
2026-02-28 20:15:57 +01:00

85 lines
2.2 KiB
Python

"""Configuration, paths, and constants for espmon."""
import os
import re
from pathlib import Path
# Paths
SCRIPT_DIR = Path(__file__).resolve().parent # tools/espmon/
TOOLS_DIR = SCRIPT_DIR.parent # tools/
LOGS_DIR = TOOLS_DIR / "logs"
C3PO_DIR = TOOLS_DIR / "C3PO"
ENV_FILE = C3PO_DIR / ".env"
DEPLOY_JSON = TOOLS_DIR / "deploy.json"
PID_FILE = LOGS_DIR / ".espmon.pid"
# Serial
DEFAULT_BAUD = 115200
# API polling
POLL_INTERVAL = 0.5 # seconds between polls
POLL_TIMEOUT = 120 # max wait for command result
# ANSI escape codes
ANSI_RE = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]|\x1b\(B')
def strip_ansi(text: str) -> str:
"""Remove ANSI escape codes from text."""
return ANSI_RE.sub('', text)
class Config:
"""C3PO connection configuration.
Reads from (in order of priority):
1. Environment variables: ESPMON_HOST, ESPMON_PORT, ESPMON_TOKEN
2. tools/C3PO/.env file
3. Hardcoded defaults
"""
def __init__(self):
self.host = "localhost"
self.port = 8000
self.token = ""
self._load()
def _load(self):
env_vars = {}
if ENV_FILE.exists():
with open(ENV_FILE) as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, _, value = line.partition('=')
env_vars[key.strip()] = value.strip().strip('"').strip("'")
# Host
self.host = (
os.environ.get("ESPMON_HOST")
or env_vars.get("WEB_HOST", "").replace("0.0.0.0", "localhost")
or "localhost"
)
# Port
try:
self.port = int(
os.environ.get("ESPMON_PORT")
or env_vars.get("WEB_PORT", "8000")
)
except ValueError:
self.port = 8000
# Auth token
self.token = (
os.environ.get("ESPMON_TOKEN")
or env_vars.get("MULTILAT_AUTH_TOKEN", "")
)
@property
def base_url(self) -> str:
return f"http://{self.host}:{self.port}"