"""
CAN frame storage for C3PO server.

Stores CAN frames received from agents via AGENT_DATA messages.

Frame format: "CAN|<timestamp_ms>|<can_id_hex>|<dlc>|<data_hex>"
"""

import collections
import csv
import io
import threading
import time
from typing import List, Optional, Dict


class CanFrame:
    """Parsed CAN frame from agent.

    Attributes:
        device_id: Identifier of the agent/device that reported the frame.
        timestamp_ms: Timestamp taken from the frame line (integer).
        can_id: CAN identifier as an integer (parsed from hexadecimal).
        dlc: Length field taken from the frame line.
        data_hex: Payload text exactly as it appeared in the frame line.
        received_at: Server-side ``time.time()`` at construction.
    """

    __slots__ = ("device_id", "timestamp_ms", "can_id", "dlc", "data_hex",
                 "received_at")

    def __init__(self, device_id: str, timestamp_ms: int, can_id: int,
                 dlc: int, data_hex: str):
        self.device_id = device_id
        self.timestamp_ms = timestamp_ms
        self.can_id = can_id
        self.dlc = dlc
        self.data_hex = data_hex
        self.received_at = time.time()

    def to_dict(self) -> dict:
        """Return the frame as a plain dict; ``can_id`` is rendered as hex."""
        return {
            "device_id": self.device_id,
            "timestamp_ms": self.timestamp_ms,
            "can_id": f"0x{self.can_id:03X}",
            "dlc": self.dlc,
            "data": self.data_hex,
            "received_at": self.received_at,
        }


class CanStore:
    """Thread-safe ring buffer for CAN frames from all devices.

    At most ``max_frames`` frames are retained; once full, the oldest frame
    is discarded for every new one. ``total_count`` keeps counting every
    accepted frame, including those that have since been evicted.
    """

    def __init__(self, max_frames: int = 10000):
        self.frames: collections.deque = collections.deque(maxlen=max_frames)
        self.lock = threading.Lock()
        self.total_count = 0

    def _snapshot(self, device_id: Optional[str] = None) -> List[CanFrame]:
        """Copy the buffer (oldest first), optionally keeping one device."""
        # Copy under the lock, filter outside it so writers are not blocked.
        with self.lock:
            frames = list(self.frames)
        if device_id:
            frames = [f for f in frames if f.device_id == device_id]
        return frames

    def store_frame(self, device_id: str, raw_line: str) -> bool:
        """Parse 'CAN|ts|id|dlc|data' and store.

        Surrounding whitespace (e.g. a trailing newline) is ignored and any
        fields after the fifth are dropped. Malformed lines are discarded.

        Args:
            device_id: Identifier of the device the line came from.
            raw_line: One frame line as sent by the agent.

        Returns:
            True if the frame was stored, False if the line was rejected.
        """
        # Strip first so a trailing "\r\n" does not end up inside data_hex.
        parts = raw_line.strip().split("|")
        if len(parts) < 5 or parts[0] != "CAN":
            return False
        try:
            ts_ms = int(parts[1])
            can_id = int(parts[2], 16)
            dlc = int(parts[3])
        except ValueError:
            return False
        # Negative identifiers/lengths cannot be valid and would otherwise be
        # rendered as e.g. "0x-01".
        if can_id < 0 or dlc < 0:
            return False

        frame = CanFrame(device_id, ts_ms, can_id, dlc, parts[4])
        with self.lock:
            self.frames.append(frame)
            self.total_count += 1
        return True

    def get_frames(self, device_id: Optional[str] = None,
                   can_id: Optional[int] = None,
                   limit: int = 100, offset: int = 0) -> List[Dict]:
        """Query frames with optional filters.

        Args:
            device_id: Only return frames from this device (falsy = all).
            can_id: Only return frames with this CAN identifier.
            limit: Maximum number of frames to return; negatives act as 0.
            offset: Number of most-recent frames to skip; negatives act as 0.

        Returns:
            Frame dicts (see ``CanFrame.to_dict``), most recent first.
        """
        frames = self._snapshot(device_id)
        if can_id is not None:
            frames = [f for f in frames if f.can_id == can_id]

        # Most recent first
        frames.reverse()

        # Paginate; clamp so negative values cannot slice from the far end.
        start = max(offset, 0)
        stop = start + max(limit, 0)
        return [f.to_dict() for f in frames[start:stop]]

    def get_stats(self, device_id: Optional[str] = None) -> Dict:
        """Get frame count stats.

        Args:
            device_id: Restrict the stored-frame stats to this device
                (falsy = all devices).

        Returns:
            Dict with ``total_stored`` (frames currently buffered that match
            the filter), ``total_received`` (frames accepted since creation,
            across ALL devices regardless of the filter), ``unique_can_ids``
            and ``can_ids`` (hex strings in ascending numeric order).
        """
        frames = self._snapshot(device_id)
        unique_ids = {f.can_id for f in frames}
        return {
            "total_stored": len(frames),
            "total_received": self.total_count,
            "unique_can_ids": len(unique_ids),
            # Sort numerically before formatting: sorting the strings would
            # put "0x1FFFFFFF" ahead of "0x200".
            "can_ids": [f"0x{cid:03X}" for cid in sorted(unique_ids)],
        }

    def export_csv(self, device_id: Optional[str] = None) -> str:
        """Export frames as CSV string.

        Args:
            device_id: Only export frames from this device (falsy = all).

        Returns:
            CSV text with a header row, oldest frame first, rows separated
            by "\\n" and no trailing newline. Fields containing commas,
            quotes or newlines are quoted.
        """
        frames = self._snapshot(device_id)
        buf = io.StringIO()
        writer = csv.writer(buf, lineterminator="\n")
        writer.writerow(["device_id", "timestamp_ms", "can_id", "dlc", "data"])
        for f in frames:
            writer.writerow([f.device_id, f.timestamp_ms,
                             f"0x{f.can_id:03X}", f.dlc, f.data_hex])
        # The writer terminates every row; drop the final terminator so the
        # result has no trailing newline.
        return buf.getvalue()[:-1]