Replace monolithic CLI and web server with route-based Flask API. New routes: api_commands, api_build, api_can, api_monitor, api_ota, api_tunnel. Add honeypot security dashboard with real-time SSE, MITRE ATT&CK mapping, kill chain analysis. New TUI with commander/help modules. Add session management, tunnel proxy core, CAN bus data store. Docker support.
82 lines
2.5 KiB
Python
82 lines
2.5 KiB
Python
"""CAN bus frame API routes."""
|
|
|
|
from flask import Blueprint, jsonify, request, Response
|
|
|
|
|
|
def _parse_can_id(raw):
    """Parse a CAN identifier given as decimal or ``0x``-prefixed hex text.

    Args:
        raw: Query-string value such as ``"291"``, ``"0x123"`` or ``"0X123"``.

    Returns:
        The identifier as a non-negative ``int``.

    Raises:
        ValueError: If the text is not a valid number in the detected base,
            or if the parsed value is negative.
    """
    text = raw.strip()
    # Match the hex prefix case-insensitively so "0X1F" is treated like "0x1f";
    # int(..., 16) itself accepts either spelling of the prefix.
    base = 16 if text.lower().startswith("0x") else 10
    value = int(text, base)
    if value < 0:
        # A negative identifier can never match a frame; reject it up front
        # instead of forwarding a meaningless filter to the store.
        raise ValueError("can_id must be non-negative")
    return value


def create_can_blueprint(server_config):
    """
    Create the CAN bus API blueprint.

    Registers three GET routes under ``/api/can``: ``/frames``, ``/stats``
    and ``/frames/export``. Each route is wrapped with the supplied auth
    decorator and answers 503 when no CAN store is available.

    Args:
        server_config: Dict with keys:
            - get_can_store: Callable returning CanStore instance
            - require_api_auth: Auth decorator

    Returns:
        The configured ``Blueprint`` named ``"api_can"``.

    Raises:
        KeyError: If ``server_config`` lacks one of the required keys.
    """
    bp = Blueprint("api_can", __name__, url_prefix="/api/can")

    get_can_store = server_config["get_can_store"]
    require_api_auth = server_config["require_api_auth"]

    def store_unavailable():
        """Build the shared 503 response used when no store is available."""
        return jsonify({"error": "CAN store not available"}), 503

    @bp.route("/frames", methods=["GET"])
    @require_api_auth
    def get_frames():
        """List CAN frames with optional filters.

        Query parameters:
            device_id: Optional device filter, passed to the store as-is.
            can_id: Optional frame identifier, decimal or ``0x``/``0X`` hex.
            limit: Page size, clamped to the range 1..1000 (default 100).
            offset: Number of frames to skip, floored at 0 (default 0).

        Returns:
            JSON with ``frames``, ``count``, ``offset`` and ``limit``;
            400 if ``can_id`` cannot be parsed; 503 if the store is missing.
        """
        store = get_can_store()
        # NOTE(review): this is a truthiness test, so a store object that
        # evaluates falsy (e.g. defines __len__ and is empty) would also be
        # reported as unavailable - confirm against CanStore.
        if not store:
            return store_unavailable()

        device_id = request.args.get("device_id")
        can_id_str = request.args.get("can_id")
        # With type=int, values that fail conversion fall back to the default.
        limit = request.args.get("limit", 100, type=int)
        offset = request.args.get("offset", 0, type=int)

        # Clamp paging values so a client cannot request an unbounded page
        # or a negative offset.
        limit = max(1, min(limit, 1000))
        offset = max(0, offset)

        can_id = None
        if can_id_str:
            try:
                can_id = _parse_can_id(can_id_str)
            except ValueError:
                return jsonify({"error": "Invalid can_id format"}), 400

        frames = store.get_frames(
            device_id=device_id,
            can_id=can_id,
            limit=limit,
            offset=offset,
        )
        return jsonify({
            "frames": frames,
            "count": len(frames),
            "offset": offset,
            "limit": limit,
        })

    @bp.route("/stats", methods=["GET"])
    @require_api_auth
    def get_stats():
        """Get CAN frame statistics.

        Query parameters:
            device_id: Optional device filter, passed to the store as-is.

        Returns:
            JSON of whatever ``store.get_stats`` returns; 503 if the store
            is missing.
        """
        store = get_can_store()
        if not store:
            return store_unavailable()

        device_id = request.args.get("device_id")
        return jsonify(store.get_stats(device_id=device_id))

    @bp.route("/frames/export", methods=["GET"])
    @require_api_auth
    def export_csv():
        """Export CAN frames as CSV.

        Query parameters:
            device_id: Optional device filter, passed to the store as-is.

        Returns:
            A ``text/csv`` response served as the attachment
            ``can_frames.csv``; 503 if the store is missing.
        """
        store = get_can_store()
        if not store:
            return store_unavailable()

        device_id = request.args.get("device_id")
        csv_data = store.export_csv(device_id=device_id)

        return Response(
            csv_data,
            mimetype="text/csv",
            headers={"Content-Disposition": "attachment; filename=can_frames.csv"},
        )

    return bp
|