espilon-source/tools/C3PO/core/can_store.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

114 lines
3.5 KiB
Python

"""
CAN frame storage for C3PO server.
Stores CAN frames received from agents via AGENT_DATA messages.
Frame format: "CAN|<timestamp_ms>|<id_hex>|<dlc>|<data_hex>"
"""
import collections
import csv
import io
import threading
import time
from typing import List, Optional, Dict
class CanFrame:
    """Parsed CAN frame from agent.

    A small slotted record; instances carry no per-instance ``__dict__``.

    Attributes:
        device_id: Identifier of the device the frame is attributed to.
        timestamp_ms: Timestamp value supplied by the caller.
        can_id: CAN identifier as an integer.
        dlc: Data length code as supplied by the caller.
        data_hex: Payload string as supplied by the caller (not validated here).
        received_at: ``time.time()`` captured when the object was constructed.
    """

    __slots__ = ("device_id", "timestamp_ms", "can_id", "dlc", "data_hex", "received_at")

    def __init__(self, device_id: str, timestamp_ms: int, can_id: int,
                 dlc: int, data_hex: str):
        self.device_id = device_id
        self.timestamp_ms = timestamp_ms
        self.can_id = can_id
        self.dlc = dlc
        self.data_hex = data_hex
        # Server-side arrival time, independent of the supplied timestamp.
        self.received_at = time.time()

    def __repr__(self) -> str:
        """Return a debugging representation showing the frame's fields."""
        return (
            f"{type(self).__name__}(device_id={self.device_id!r}, "
            f"timestamp_ms={self.timestamp_ms!r}, can_id=0x{self.can_id:03X}, "
            f"dlc={self.dlc!r}, data_hex={self.data_hex!r})"
        )

    def to_dict(self) -> dict:
        """Return the frame as a plain dict.

        ``can_id`` is rendered as an upper-case hex string padded to at least
        three digits (e.g. ``0x01A``); the payload is exposed under ``data``.
        """
        return {
            "device_id": self.device_id,
            "timestamp_ms": self.timestamp_ms,
            "can_id": f"0x{self.can_id:03X}",
            "dlc": self.dlc,
            "data": self.data_hex,
            "received_at": self.received_at,
        }
class CanStore:
    """Thread-safe ring buffer for CAN frames from all devices.

    Frames live in a bounded ``collections.deque``; once ``max_frames`` is
    reached, appending a new frame discards the oldest one. ``total_count``
    counts every frame accepted by ``store_frame``, including frames that
    have since been discarded from the buffer.
    """

    def __init__(self, max_frames: int = 10000):
        """Create an empty store holding at most ``max_frames`` frames."""
        self.frames: collections.deque = collections.deque(maxlen=max_frames)
        self.lock = threading.Lock()
        self.total_count = 0

    def store_frame(self, device_id: str, raw_line: str) -> None:
        """Parse 'CAN|ts|id|dlc|data' and store.

        Malformed lines are dropped silently (best-effort ingestion): wrong
        prefix, fewer than five fields, non-numeric timestamp/id/dlc, or a
        negative id/dlc. Fields beyond the fifth are ignored.

        Args:
            device_id: Identifier of the device the line came from.
            raw_line: One frame line; surrounding whitespace is ignored.
        """
        # Strip first so a trailing "\r\n" does not end up inside data_hex.
        parts = raw_line.strip().split("|")
        if len(parts) < 5 or parts[0] != "CAN":
            return
        try:
            ts_ms = int(parts[1])
            can_id = int(parts[2], 16)
            dlc = int(parts[3])
        except ValueError:
            return
        # int() accepts a leading '-'; a negative id/dlc is never valid and
        # would be rendered as e.g. "0x-01" downstream.
        if can_id < 0 or dlc < 0:
            return
        data_hex = parts[4].strip()

        frame = CanFrame(device_id, ts_ms, can_id, dlc, data_hex)
        with self.lock:
            self.frames.append(frame)
            self.total_count += 1

    def get_frames(self, device_id: Optional[str] = None,
                   can_id: Optional[int] = None,
                   limit: int = 100, offset: int = 0) -> List[Dict]:
        """Query frames with optional filters.

        Args:
            device_id: Only return frames from this device (falsy = all).
            can_id: Only return frames with this CAN id (``None`` = all).
            limit: Maximum number of frames to return; negatives act as 0.
            offset: Number of most-recent matches to skip; negatives act as 0.

        Returns:
            Frame dicts (see ``to_dict`` on the stored frames), newest first.
        """
        # Negative values would otherwise be interpreted as from-the-end
        # slice indices and return a surprising page.
        offset = max(offset, 0)
        limit = max(limit, 0)

        # Copy under the lock; filter and serialize outside it.
        with self.lock:
            matches = list(self.frames)

        if device_id:
            matches = [f for f in matches if f.device_id == device_id]
        if can_id is not None:
            matches = [f for f in matches if f.can_id == can_id]

        # Most recent first
        matches.reverse()

        # Paginate
        return [f.to_dict() for f in matches[offset:offset + limit]]

    def get_stats(self, device_id: Optional[str] = None) -> Dict:
        """Get frame count stats.

        Args:
            device_id: Restrict the stored-frame stats to this device
                (falsy = all devices).

        Returns:
            Dict with ``total_stored`` (frames currently buffered that match
            the filter), ``total_received`` (all frames ever accepted, for
            every device, regardless of the filter), ``unique_can_ids`` and
            ``can_ids`` (hex strings in ascending numeric order).
        """
        with self.lock:
            frames = list(self.frames)
            # Read under the same lock so it is consistent with the snapshot.
            total_received = self.total_count

        if device_id:
            frames = [f for f in frames if f.device_id == device_id]

        unique_ids = {f.can_id for f in frames}
        return {
            "total_stored": len(frames),
            "total_received": total_received,
            "unique_can_ids": len(unique_ids),
            # Sort numerically before formatting: sorting the hex strings
            # would put "0x18FF0000" ahead of "0x7FF".
            "can_ids": [f"0x{cid:03X}" for cid in sorted(unique_ids)],
        }

    def export_csv(self, device_id: Optional[str] = None) -> str:
        """Export frames as CSV string.

        Rows are in buffer order (oldest first), lines are separated by
        ``"\\n"`` and there is no trailing newline. Fields containing commas,
        quotes or line breaks are quoted by the ``csv`` module.

        Args:
            device_id: Only export frames from this device (falsy = all).
        """
        with self.lock:
            frames = list(self.frames)

        if device_id:
            frames = [f for f in frames if f.device_id == device_id]

        buf = io.StringIO()
        writer = csv.writer(buf, lineterminator="\n")
        writer.writerow(["device_id", "timestamp_ms", "can_id", "dlc", "data"])
        writer.writerows(
            [f.device_id, f.timestamp_ms, f"0x{f.can_id:03X}", f.dlc, f.data_hex]
            for f in frames
        )
        # csv.writer terminates every row; drop the final terminator so the
        # result has no trailing newline.
        return buf.getvalue().rstrip("\n")