Replace monolithic CLI and web server with route-based Flask API. New routes: api_commands, api_build, api_can, api_monitor, api_ota, api_tunnel. Add honeypot security dashboard with real-time SSE, MITRE ATT&CK mapping, kill chain analysis. New TUI with commander/help modules. Add session management, tunnel proxy core, CAN bus data store. Docker support.
114 lines
3.5 KiB
Python
114 lines
3.5 KiB
Python
"""
|
|
CAN frame storage for C3PO server.
|
|
|
|
Stores CAN frames received from agents via AGENT_DATA messages.
|
|
Frame format: "CAN|<timestamp_ms>|<id_hex>|<dlc>|<data_hex>"
|
|
"""
|
|
|
|
import threading
|
|
import time
|
|
import collections
|
|
from typing import List, Optional, Dict
|
|
|
|
|
|
class CanFrame:
|
|
"""Parsed CAN frame from agent."""
|
|
__slots__ = ("device_id", "timestamp_ms", "can_id", "dlc", "data_hex", "received_at")
|
|
|
|
def __init__(self, device_id: str, timestamp_ms: int, can_id: int,
|
|
dlc: int, data_hex: str):
|
|
self.device_id = device_id
|
|
self.timestamp_ms = timestamp_ms
|
|
self.can_id = can_id
|
|
self.dlc = dlc
|
|
self.data_hex = data_hex
|
|
self.received_at = time.time()
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"device_id": self.device_id,
|
|
"timestamp_ms": self.timestamp_ms,
|
|
"can_id": f"0x{self.can_id:03X}",
|
|
"dlc": self.dlc,
|
|
"data": self.data_hex,
|
|
"received_at": self.received_at,
|
|
}
|
|
|
|
|
|
class CanStore:
|
|
"""Thread-safe ring buffer for CAN frames from all devices."""
|
|
|
|
def __init__(self, max_frames: int = 10000):
|
|
self.frames: collections.deque = collections.deque(maxlen=max_frames)
|
|
self.lock = threading.Lock()
|
|
self.total_count = 0
|
|
|
|
def store_frame(self, device_id: str, raw_line: str):
|
|
"""Parse 'CAN|ts|id|dlc|data' and store."""
|
|
parts = raw_line.split("|")
|
|
if len(parts) < 5 or parts[0] != "CAN":
|
|
return
|
|
|
|
try:
|
|
ts_ms = int(parts[1])
|
|
can_id = int(parts[2], 16)
|
|
dlc = int(parts[3])
|
|
data_hex = parts[4]
|
|
except (ValueError, IndexError):
|
|
return
|
|
|
|
frame = CanFrame(device_id, ts_ms, can_id, dlc, data_hex)
|
|
with self.lock:
|
|
self.frames.append(frame)
|
|
self.total_count += 1
|
|
|
|
def get_frames(self, device_id: Optional[str] = None,
|
|
can_id: Optional[int] = None,
|
|
limit: int = 100, offset: int = 0) -> List[Dict]:
|
|
"""Query frames with optional filters."""
|
|
with self.lock:
|
|
filtered = list(self.frames)
|
|
|
|
if device_id:
|
|
filtered = [f for f in filtered if f.device_id == device_id]
|
|
if can_id is not None:
|
|
filtered = [f for f in filtered if f.can_id == can_id]
|
|
|
|
# Most recent first
|
|
filtered.reverse()
|
|
|
|
# Paginate
|
|
page = filtered[offset:offset + limit]
|
|
return [f.to_dict() for f in page]
|
|
|
|
def get_stats(self, device_id: Optional[str] = None) -> Dict:
|
|
"""Get frame count stats."""
|
|
with self.lock:
|
|
frames = list(self.frames)
|
|
|
|
if device_id:
|
|
frames = [f for f in frames if f.device_id == device_id]
|
|
|
|
unique_ids = set(f.can_id for f in frames)
|
|
|
|
return {
|
|
"total_stored": len(frames),
|
|
"total_received": self.total_count,
|
|
"unique_can_ids": len(unique_ids),
|
|
"can_ids": sorted([f"0x{cid:03X}" for cid in unique_ids]),
|
|
}
|
|
|
|
def export_csv(self, device_id: Optional[str] = None) -> str:
|
|
"""Export frames as CSV string."""
|
|
with self.lock:
|
|
frames = list(self.frames)
|
|
|
|
if device_id:
|
|
frames = [f for f in frames if f.device_id == device_id]
|
|
|
|
lines = ["device_id,timestamp_ms,can_id,dlc,data"]
|
|
for f in frames:
|
|
lines.append(f"{f.device_id},{f.timestamp_ms},0x{f.can_id:03X},{f.dlc},{f.data_hex}")
|
|
|
|
return "\n".join(lines)
|