espilon-source/tools/C3PO/web/routes/api_can.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

82 lines
2.5 KiB
Python

"""CAN bus frame API routes."""
from flask import Blueprint, jsonify, request, Response
def _parse_can_id(raw):
    """Parse a CAN identifier given as decimal or ``0x``/``0X``-prefixed hex.

    Args:
        raw: Non-empty string taken from the ``can_id`` query parameter.

    Returns:
        The identifier as an int.

    Raises:
        ValueError: If the text is not a valid integer in the detected base.
    """
    text = raw.strip()
    # Lower-case only for the prefix test so "0X1A" is treated as hex too;
    # int(..., 16) itself accepts either prefix case.
    base = 16 if text.lower().startswith("0x") else 10
    return int(text, base)


def create_can_blueprint(server_config):
    """
    Create the CAN bus API blueprint.

    Args:
        server_config: Dict with keys:
            - get_can_store: Callable returning CanStore instance
            - require_api_auth: Auth decorator

    Returns:
        A Flask ``Blueprint`` named ``api_can`` with URL prefix ``/api/can``
        exposing ``/frames``, ``/stats`` and ``/frames/export`` (all GET).

    Raises:
        KeyError: If ``server_config`` lacks one of the required keys.
    """
    bp = Blueprint("api_can", __name__, url_prefix="/api/can")
    get_can_store = server_config["get_can_store"]
    require_api_auth = server_config["require_api_auth"]

    def _store_unavailable():
        """Build the shared 503 response used when no store is configured."""
        return jsonify({"error": "CAN store not available"}), 503

    @bp.route("/frames", methods=["GET"])
    @require_api_auth
    def get_frames():
        """List CAN frames with optional filters.

        Query parameters:
            device_id: Optional device filter, passed through to the store.
            can_id: Optional CAN ID filter, decimal or ``0x``-prefixed hex.
            limit: Page size; defaults to 100 and is clamped to 1..1000.
            offset: Number of frames to skip; defaults to 0, never negative.

        Returns:
            JSON with ``frames``, ``count``, ``offset`` and ``limit``;
            400 if ``can_id`` cannot be parsed; 503 if no store is available.
        """
        store = get_can_store()
        # Explicit None check: a store object that happens to be falsy
        # (e.g. one defining __len__ while empty) must not look "missing".
        if store is None:
            return _store_unavailable()

        device_id = request.args.get("device_id")
        can_id_str = request.args.get("can_id")
        # With type=int, values that fail conversion fall back to the default.
        limit = request.args.get("limit", 100, type=int)
        offset = request.args.get("offset", 0, type=int)

        # Clamp pagination so a client cannot request unbounded pages
        # or a negative offset.
        limit = max(1, min(limit, 1000))
        offset = max(0, offset)

        can_id = None
        if can_id_str:
            try:
                can_id = _parse_can_id(can_id_str)
            except ValueError:
                return jsonify({"error": "Invalid can_id format"}), 400

        frames = store.get_frames(
            device_id=device_id,
            can_id=can_id,
            limit=limit,
            offset=offset,
        )
        return jsonify({
            "frames": frames,
            "count": len(frames),
            "offset": offset,
            "limit": limit,
        })

    @bp.route("/stats", methods=["GET"])
    @require_api_auth
    def get_stats():
        """Get CAN frame statistics.

        Query parameters:
            device_id: Optional device filter, passed through to the store.

        Returns:
            JSON of whatever ``store.get_stats`` returns; 503 if no store
            is available.
        """
        store = get_can_store()
        if store is None:
            return _store_unavailable()

        device_id = request.args.get("device_id")
        return jsonify(store.get_stats(device_id=device_id))

    @bp.route("/frames/export", methods=["GET"])
    @require_api_auth
    def export_csv():
        """Export CAN frames as CSV.

        Query parameters:
            device_id: Optional device filter, passed through to the store.

        Returns:
            A ``text/csv`` attachment named ``can_frames.csv`` containing the
            output of ``store.export_csv``; 503 if no store is available.
        """
        store = get_can_store()
        if store is None:
            return _store_unavailable()

        device_id = request.args.get("device_id")
        csv_data = store.export_csv(device_id=device_id)
        return Response(
            csv_data,
            mimetype="text/csv",
            headers={"Content-Disposition": "attachment; filename=can_frames.csv"},
        )

    return bp