espilon-source/tools/C3PO/log/manager.py
Eun0us 8b6c1cd53d ε - ChaCha20-Poly1305 AEAD + HKDF crypto upgrade + C3PO rewrite + docs
Crypto:
- Replace broken ChaCha20 (static nonce) with ChaCha20-Poly1305 AEAD
- HKDF-SHA256 key derivation from per-device factory NVS master keys
- Random 12-byte nonce per message (ESP32 hardware RNG)
- crypto_init/encrypt/decrypt API with mbedtls legacy (ESP-IDF v5.3.2)
- Custom partition table with factory NVS (fctry at 0x10000)

Firmware:
- crypto.c full rewrite, messages.c device_id prefix + AEAD encrypt
- crypto_init() at boot with esp_restart() on failure
- Fix command_t initializations across all modules (sub/help fields)
- Clean CMakeLists dependencies for ESP-IDF v5.3.2

C3PO (C2):
- Rename tools/c2 + tools/c3po -> tools/C3PO
- Per-device CryptoContext with HKDF key derivation
- KeyStore (keys.json) for master key management
- Transport parses device_id:base64(...) wire format

Tools:
- New tools/provisioning/provision.py for factory NVS key generation
- Updated flasher with mbedtls config for v5.3.2

Docs:
- Update all READMEs for new crypto, C3PO paths, provisioning
- Update roadmap, architecture diagrams, security sections
- Update CONTRIBUTING.md project structure
2026-02-10 21:28:45 +01:00

"""Log manager for storing device messages."""

import time
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class LogEntry:
    """A single log entry from a device."""

    timestamp: float
    device_id: str
    msg_type: str
    source: str
    payload: str
    request_id: Optional[str] = None
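

class LogManager:
    """Manages log storage for device messages."""

    def __init__(self, max_entries_per_device: int = 1000):
        self.max_entries = max_entries_per_device
        # device_id -> entries in arrival order (oldest first).
        self._logs: Dict[str, List[LogEntry]] = {}

    def add(self, device_id: str, msg_type: str, source: str, payload: str,
            request_id: Optional[str] = None) -> None:
        """Append an entry for device_id, dropping the oldest past the cap."""
        if device_id not in self._logs:
            self._logs[device_id] = []

        entry = LogEntry(
            timestamp=time.time(),
            device_id=device_id,
            msg_type=msg_type,
            source=source,
            payload=payload,
            request_id=request_id,
        )
        self._logs[device_id].append(entry)

        # Keep only the newest max_entries entries for this device.
        if len(self._logs[device_id]) > self.max_entries:
            self._logs[device_id] = self._logs[device_id][-self.max_entries:]

    def get_logs(self, device_id: str, limit: int = 100) -> List[LogEntry]:
        """Return up to limit of the newest entries for device_id, oldest first."""
        # Guard limit <= 0: a [-0:] slice would return the whole list.
        if limit <= 0 or device_id not in self._logs:
            return []
        return self._logs[device_id][-limit:]

    def get_all_logs(self, limit: int = 100) -> List[LogEntry]:
        """Return up to limit of the newest entries across all devices."""
        if limit <= 0:
            return []
        all_entries: List[LogEntry] = []
        for entries in self._logs.values():
            all_entries.extend(entries)
        # Merge the per-device lists into one timeline, oldest first.
        all_entries.sort(key=lambda e: e.timestamp)
        return all_entries[-limit:]

    def clear(self, device_id: Optional[str] = None) -> None:
        """Clear logs for one device, or for all devices if none is given."""
        if device_id:
            self._logs.pop(device_id, None)
        else:
            self._logs.clear()

    def device_count(self) -> int:
        """Number of devices that currently have stored logs."""
        return len(self._logs)

    def total_entries(self) -> int:
        """Total number of stored entries across all devices."""
        return sum(len(entries) for entries in self._logs.values())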
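
The per-device cap in `LogManager.add` works by appending and then slicing off the oldest entries. As a point of comparison only, here is a minimal sketch of the same bounded-history idea built on `collections.deque(maxlen=...)`. The class name `BoundedLog` and its reduced `add(device_id, payload)` signature are hypothetical and not part of `manager.py`; it stores plain strings instead of `LogEntry` objects to keep the example short.

```python
from collections import deque
from typing import Deque, Dict, List


class BoundedLog:
    """Hypothetical per-device ring buffer (illustration, not manager.py)."""

    def __init__(self, max_entries_per_device: int = 1000):
        self.max_entries = max_entries_per_device
        self._logs: Dict[str, Deque[str]] = {}

    def add(self, device_id: str, payload: str) -> None:
        if device_id not in self._logs:
            # A deque with maxlen discards its oldest item when full.
            self._logs[device_id] = deque(maxlen=self.max_entries)
        self._logs[device_id].append(payload)

    def get_logs(self, device_id: str, limit: int = 100) -> List[str]:
        if limit <= 0 or device_id not in self._logs:
            return []
        # Copy to a list so callers get a sliceable snapshot, oldest first.
        return list(self._logs[device_id])[-limit:]


log = BoundedLog(max_entries_per_device=3)
for i in range(5):
    log.add("dev-1", f"msg-{i}")

recent = log.get_logs("dev-1")
print(recent)
```

With a cap of 3 and five messages added, only the last three remain. Whether this is preferable to the list-and-slice approach depends on how often the cap is hit; the slice version copies the list on every overflow, while the deque evicts in place.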