Add deploy.py: multi-device build/flash/provision with JSON config. Add espmon/: real-time ESP32 fleet monitoring daemon. Remove tools/flasher/ and tools/provisioning/ (superseded).
85 lines
2.2 KiB
Python
85 lines
2.2 KiB
Python
"""Configuration, paths, and constants for espmon."""
|
|
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
|
|
|
|
# Paths
|
|
SCRIPT_DIR = Path(__file__).resolve().parent # tools/espmon/
|
|
TOOLS_DIR = SCRIPT_DIR.parent # tools/
|
|
LOGS_DIR = TOOLS_DIR / "logs"
|
|
C3PO_DIR = TOOLS_DIR / "C3PO"
|
|
ENV_FILE = C3PO_DIR / ".env"
|
|
DEPLOY_JSON = TOOLS_DIR / "deploy.json"
|
|
PID_FILE = LOGS_DIR / ".espmon.pid"
|
|
|
|
# Serial
|
|
DEFAULT_BAUD = 115200
|
|
|
|
# API polling
|
|
POLL_INTERVAL = 0.5 # seconds between polls
|
|
POLL_TIMEOUT = 120 # max wait for command result
|
|
|
|
# ANSI escape codes
|
|
ANSI_RE = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]|\x1b\(B')
|
|
|
|
|
|
def strip_ansi(text: str) -> str:
|
|
"""Remove ANSI escape codes from text."""
|
|
return ANSI_RE.sub('', text)
|
|
|
|
|
|
class Config:
|
|
"""C3PO connection configuration.
|
|
|
|
Reads from (in order of priority):
|
|
1. Environment variables: ESPMON_HOST, ESPMON_PORT, ESPMON_TOKEN
|
|
2. tools/C3PO/.env file
|
|
3. Hardcoded defaults
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.host = "localhost"
|
|
self.port = 8000
|
|
self.token = ""
|
|
self._load()
|
|
|
|
def _load(self):
|
|
env_vars = {}
|
|
if ENV_FILE.exists():
|
|
with open(ENV_FILE) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line or line.startswith('#'):
|
|
continue
|
|
if '=' in line:
|
|
key, _, value = line.partition('=')
|
|
env_vars[key.strip()] = value.strip().strip('"').strip("'")
|
|
|
|
# Host
|
|
self.host = (
|
|
os.environ.get("ESPMON_HOST")
|
|
or env_vars.get("WEB_HOST", "").replace("0.0.0.0", "localhost")
|
|
or "localhost"
|
|
)
|
|
|
|
# Port
|
|
try:
|
|
self.port = int(
|
|
os.environ.get("ESPMON_PORT")
|
|
or env_vars.get("WEB_PORT", "8000")
|
|
)
|
|
except ValueError:
|
|
self.port = 8000
|
|
|
|
# Auth token
|
|
self.token = (
|
|
os.environ.get("ESPMON_TOKEN")
|
|
or env_vars.get("MULTILAT_AUTH_TOKEN", "")
|
|
)
|
|
|
|
@property
|
|
def base_url(self) -> str:
|
|
return f"http://{self.host}:{self.port}"
|