espilon-source/tools/C3PO/web/server.py
Eun0us 79c2a4d4bf c3po: full server rewrite with modular routes and honeypot dashboard
Replace monolithic CLI and web server with route-based Flask API.
New routes: api_commands, api_build, api_can, api_monitor, api_ota,
api_tunnel. Add honeypot security dashboard with real-time SSE,
MITRE ATT&CK mapping, kill chain analysis.

New TUI with commander/help modules. Add session management,
tunnel proxy core, CAN bus data store. Docker support.
2026-02-28 20:12:27 +01:00

276 lines
9.6 KiB
Python

"""Unified Flask web server for ESPILON C2 dashboard."""
import os
import logging
import threading
from typing import Optional
from flask import Flask, jsonify, request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from werkzeug.serving import make_server
from .mlat import MlatEngine
from .auth import create_auth_decorators
from .routes import (
create_pages_blueprint,
create_devices_blueprint,
create_cameras_blueprint,
create_mlat_blueprint,
create_stats_blueprint,
create_ota_blueprint,
create_build_blueprint,
create_commands_blueprint,
create_monitor_blueprint,
create_can_blueprint,
create_tunnel_blueprint,
)
from .build_manager import BuildManager
# Disable Flask/Werkzeug request logging
logging.getLogger('werkzeug').setLevel(logging.ERROR)
class UnifiedWebServer:
"""
Unified Flask-based web server for ESPILON C2.
Provides:
- Dashboard: View connected ESP32 devices
- Cameras: View live camera streams with recording
- MLAT: Visualize multilateration positioning
"""
def __init__(self,
host: str = "0.0.0.0",
port: int = 8000,
image_dir: str = "static/streams",
username: str = "admin",
password: str = "admin",
secret_key: str = "change_this_for_prod",
multilat_token: str = "multilat_secret_token",
device_registry=None,
transport=None,
session=None,
mlat_engine: Optional[MlatEngine] = None,
camera_receiver=None,
hp_store=None,
hp_commander=None,
hp_alerts=None,
hp_geo=None):
"""
Initialize the unified web server.
Args:
host: Host to bind the server
port: Port for the web server
image_dir: Directory containing camera frame images
username: Login username
password: Login password
secret_key: Flask session secret key
multilat_token: Bearer token for MLAT API
device_registry: DeviceRegistry instance for device listing
mlat_engine: MlatEngine instance (created if None)
camera_receiver: UDPReceiver instance for camera control
hp_store: HpStore instance for honeypot event storage
hp_commander: HpCommander instance for honeypot command dispatch
hp_alerts: HpAlertEngine instance for honeypot alert rules
hp_geo: HpGeoLookup instance for geo-IP enrichment
"""
self.host = host
self.port = port
self.image_dir = image_dir
self.username = username
self.password = password
self.secret_key = secret_key
self.multilat_token = multilat_token
self.device_registry = device_registry
self.transport = transport
self._session = session
self.mlat = mlat_engine or MlatEngine()
self.camera_receiver = camera_receiver
self.hp_store = hp_store
self.hp_commander = hp_commander
self.hp_alerts = hp_alerts
self.hp_geo = hp_geo
# C2 root directory
self.c2_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Ensure image directory exists
full_image_dir = os.path.join(self.c2_root, self.image_dir)
os.makedirs(full_image_dir, exist_ok=True)
self._app = self._create_app()
self._server = None
self._thread = None
def set_camera_receiver(self, receiver):
"""Set the camera receiver after initialization."""
self.camera_receiver = receiver
@property
def is_running(self) -> bool:
return self._thread is not None and self._thread.is_alive()
def _create_app(self) -> Flask:
"""Create and configure the Flask application."""
template_dir = os.path.join(self.c2_root, "templates")
static_dir = os.path.join(self.c2_root, "static")
app = Flask(__name__,
template_folder=template_dir,
static_folder=static_dir)
app.secret_key = self.secret_key
# CORS: allow cross-origin requests from whitelisted origins only
from streams.config import CORS_ALLOWED_ORIGINS
@app.after_request
def add_cors_headers(response):
origin = request.headers.get("Origin", "")
if origin and CORS_ALLOWED_ORIGINS and origin in CORS_ALLOWED_ORIGINS:
response.headers["Access-Control-Allow-Origin"] = origin
response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization"
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS"
response.headers["Access-Control-Allow-Credentials"] = "true"
return response
@app.route("/api/<path:path>", methods=["OPTIONS"])
@app.route("/api/", defaults={"path": ""}, methods=["OPTIONS"])
def handle_preflight(path):
return "", 204
# Rate limiter (in-memory, per-IP)
from streams.config import RATE_LIMIT_DEFAULT, RATE_LIMIT_LOGIN
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=[RATE_LIMIT_DEFAULT],
storage_uri="memory://",
)
# Return JSON for API errors
@app.errorhandler(429)
def handle_429(e):
return jsonify({"error": "Rate limit exceeded", "detail": str(e.description)}), 429
@app.errorhandler(500)
def handle_500(e):
return jsonify({"error": "Internal server error", "detail": str(e)}), 500
# Create auth decorators
require_login, require_api_auth = create_auth_decorators(
lambda: self.multilat_token
)
# Shared config for blueprints
base_config = {
"c2_root": self.c2_root,
"image_dir": self.image_dir,
"require_login": require_login,
"require_api_auth": require_api_auth,
"limiter": limiter,
"rate_limit_login": RATE_LIMIT_LOGIN,
}
# Register blueprints
app.register_blueprint(create_pages_blueprint({
**base_config,
"username": self.username,
"password": self.password,
}))
app.register_blueprint(create_devices_blueprint({
**base_config,
"get_device_registry": lambda: self.device_registry,
}))
app.register_blueprint(create_cameras_blueprint({
**base_config,
"get_camera_receiver": lambda: self.camera_receiver,
}))
app.register_blueprint(create_mlat_blueprint({
**base_config,
"get_mlat_engine": lambda: self.mlat,
}))
app.register_blueprint(create_stats_blueprint({
**base_config,
"get_device_registry": lambda: self.device_registry,
"get_mlat_engine": lambda: self.mlat,
}))
app.register_blueprint(create_ota_blueprint({
**base_config,
"get_device_registry": lambda: self.device_registry,
"get_transport": lambda: self.transport,
}))
app.register_blueprint(create_commands_blueprint({
**base_config,
"get_device_registry": lambda: self.device_registry,
"get_transport": lambda: self.transport,
"get_session": lambda: self._session,
}))
app.register_blueprint(create_monitor_blueprint({
**base_config,
"get_device_registry": lambda: self.device_registry,
}))
# Build firmware from web (uses deploy.py as library)
deploy_json = os.path.join(os.path.dirname(self.c2_root), "deploy.json")
firmware_dir = os.path.join(self.c2_root, "firmware")
build_mgr = BuildManager(firmware_dir, deploy_json)
app.register_blueprint(create_build_blueprint({
**base_config,
"build_manager": build_mgr,
}))
# CAN bus frame API (always available — store is created in session)
app.register_blueprint(create_can_blueprint({
**base_config,
"get_can_store": lambda: self._session.can_store if self._session else None,
}))
# Tunnel / SOCKS5 proxy API
app.register_blueprint(create_tunnel_blueprint(self._session))
# Honeypot dashboard (optional — only if hp_store is provided)
if self.hp_store and self.hp_commander:
try:
from hp_dashboard import create_hp_blueprint
app.register_blueprint(create_hp_blueprint({
**base_config,
"hp_store": self.hp_store,
"hp_commander": self.hp_commander,
"hp_alerts": self.hp_alerts,
"hp_geo": self.hp_geo,
}))
except ImportError:
pass # hp_dashboard not available
return app
def start(self) -> bool:
"""Start the web server in a background thread."""
if self.is_running:
return False
self._server = make_server(self.host, self.port, self._app, threaded=True)
self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)
self._thread.start()
return True
def stop(self):
"""Stop the web server."""
if self._server:
self._server.shutdown()
self._server = None
self._thread = None
def get_url(self) -> str:
"""Get the server URL."""
return f"http://{self.host}:{self.port}"