Replace monolithic CLI and web server with route-based Flask API. New routes: api_commands, api_build, api_can, api_monitor, api_ota, api_tunnel. Add honeypot security dashboard with real-time SSE, MITRE ATT&CK mapping, kill chain analysis. New TUI with commander/help modules. Add session management, tunnel proxy core, CAN bus data store. Docker support.
147 lines
4.6 KiB
Python
147 lines
4.6 KiB
Python
"""Page routes (login, dashboard, cameras, mlat)."""
|
|
|
|
import hmac
|
|
import os
|
|
import secrets
|
|
from flask import Blueprint, render_template, redirect, url_for, request, session
|
|
|
|
|
|
def _constant_time_equals(supplied, expected):
    """Return True when two strings are equal, using a constant-time compare.

    Both values are UTF-8 encoded before the comparison because
    ``hmac.compare_digest`` raises ``TypeError`` for ``str`` arguments that
    contain non-ASCII characters. A missing (``None``) value never matches.
    """
    if supplied is None or expected is None:
        return False
    return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


def create_pages_blueprint(server_config):
    """
    Create the pages blueprint.

    Args:
        server_config: Dict with keys:
            - username: Login username
            - password: Login password
            - image_dir: Camera images directory (joined onto c2_root)
            - c2_root: C2 root directory path
            - require_login: Auth decorator applied to every protected page
            - limiter: Object whose ``limit(...)`` method returns a decorator
              (presumably a Flask-Limiter ``Limiter`` -- confirm with caller)
            - rate_limit_login: Limit value handed to ``limiter.limit`` for
              POST requests to ``/login``

    Returns:
        The configured ``Blueprint`` named ``"pages"``.

    Raises:
        KeyError: If any of the keys listed above is missing.
    """
    bp = Blueprint("pages", __name__)

    username = server_config["username"]
    password = server_config["password"]
    image_dir = server_config["image_dir"]
    c2_root = server_config["c2_root"]
    require_login = server_config["require_login"]
    limiter = server_config["limiter"]
    rate_limit_login = server_config["rate_limit_login"]

    @bp.route("/login", methods=["GET", "POST"])
    @limiter.limit(rate_limit_login, methods=["POST"])
    def login():
        """Render the login form and process credential submissions."""
        error = None
        if request.method == "POST":
            # CSRF validation. The session token must exist: otherwise a
            # request with no session and no form token would compare
            # "" against "" and be accepted.
            submitted_token = request.form.get("csrf_token", "")
            expected_token = session.get("csrf_token", "")
            if not expected_token or not _constant_time_equals(
                submitted_token, expected_token
            ):
                error = "Invalid request. Please try again."
            else:
                # Default to "" so a missing field is a failed login, not a
                # TypeError inside the comparison.
                form_user = request.form.get("username", "")
                form_pass = request.form.get("password", "")
                # Evaluate both comparisons before combining them so a wrong
                # username does not skip the password check.
                user_ok = _constant_time_equals(form_user, username)
                pass_ok = _constant_time_equals(form_pass, password)
                if user_ok and pass_ok:
                    session.clear()  # Prevent session fixation
                    session["logged_in"] = True
                    return redirect(url_for("pages.dashboard"))
                error = "Invalid credentials."

        # Generate a fresh CSRF token for the form on every render
        session["csrf_token"] = secrets.token_hex(32)
        return render_template(
            "login.html", error=error, csrf_token=session["csrf_token"]
        )

    @bp.route("/logout")
    def logout():
        """Drop the logged-in flag and send the user back to the login page."""
        session.pop("logged_in", None)
        return redirect(url_for("pages.login"))

    @bp.route("/")
    @require_login
    def index():
        """Redirect the site root to the dashboard."""
        return redirect(url_for("pages.dashboard"))

    @bp.route("/dashboard")
    @require_login
    def dashboard():
        """Render the dashboard page."""
        return render_template("dashboard.html", active_page="dashboard")

    @bp.route("/cameras")
    @require_login
    def cameras():
        """Render the cameras page with the sorted ``.jpg`` files in the image directory."""
        full_image_dir = os.path.join(c2_root, image_dir)
        try:
            image_files = sorted(
                f for f in os.listdir(full_image_dir) if f.endswith(".jpg")
            )
        except FileNotFoundError:
            # A missing image directory is shown as an empty list
            image_files = []

        return render_template(
            "cameras.html", active_page="cameras", image_files=image_files
        )

    @bp.route("/mlat")
    @require_login
    def mlat():
        """Render the MLAT page."""
        return render_template("mlat.html", active_page="mlat")

    @bp.route("/ota")
    @require_login
    def ota():
        """Render the OTA page."""
        return render_template("ota.html", active_page="ota")

    @bp.route("/device/<device_id>")
    @require_login
    def device_detail(device_id):
        """Render the device page for ``device_id`` (highlighted under the dashboard tab)."""
        return render_template(
            "device.html", active_page="dashboard", device_id=device_id
        )

    @bp.route("/terminal")
    @require_login
    def terminal():
        """Render the terminal page."""
        return render_template("terminal.html", active_page="terminal")

    @bp.route("/canbus")
    @require_login
    def canbus():
        """Render the CAN bus page."""
        return render_template("canbus.html", active_page="canbus")

    @bp.route("/redteam")
    @require_login
    def redteam():
        """Render the red team page."""
        return render_template("redteam.html", active_page="redteam")

    @bp.route("/network")
    @require_login
    def network():
        """Render the network page."""
        return render_template("network.html", active_page="network")

    @bp.route("/fakeap")
    @require_login
    def fakeap():
        """Render the fake AP page."""
        return render_template("fakeap.html", active_page="fakeap")

    @bp.route("/system")
    @require_login
    def system():
        """Render the system page."""
        return render_template("system.html", active_page="system")

    @bp.route("/tunnel")
    @require_login
    def tunnel():
        """Render the tunnel page."""
        return render_template("tunnel.html", active_page="tunnel")

    @bp.route("/streams/<filename>")
    @require_login
    def stream_image(filename):
        """Serve ``filename`` from the camera image directory."""
        from flask import send_from_directory

        full_image_dir = os.path.join(c2_root, image_dir)
        return send_from_directory(full_image_dir, filename)

    @bp.route("/recordings/<filename>")
    @require_login
    def download_recording(filename):
        """Serve ``filename`` from ``<c2_root>/static/recordings`` as a download."""
        from flask import send_from_directory

        recordings_dir = os.path.join(c2_root, "static", "recordings")
        return send_from_directory(recordings_dir, filename, as_attachment=True)

    return bp
|