"""C3PO REST API client.""" import requests from espmon.config import Config class C3POClient: """HTTP client for the C3PO command & control API.""" def __init__(self, config: Config = None): cfg = config or Config() self.base_url = cfg.base_url self.headers = {"Content-Type": "application/json"} if cfg.token: self.headers["Authorization"] = f"Bearer {cfg.token}" def send_command(self, device_id: str, command: str, argv: list = None) -> dict: """Send a command to a device. POST /api/commands Returns: {"results": [{"device_id", "status", "request_id"}]} """ body = { "device_ids": [device_id] if device_id != "all" else "all", "command": command, "argv": argv or [], } resp = requests.post( f"{self.base_url}/api/commands", json=body, headers=self.headers, timeout=10, ) resp.raise_for_status() return resp.json() def poll_command(self, request_id: str) -> dict: """Poll a command's result. GET /api/commands/ Returns: {"status": "pending|completed", "output": [...]} """ resp = requests.get( f"{self.base_url}/api/commands/{request_id}", headers=self.headers, timeout=10, ) resp.raise_for_status() return resp.json() def list_devices(self) -> dict: """List connected devices. GET /api/devices Returns: {"devices": [...], "count": N} """ resp = requests.get( f"{self.base_url}/api/devices", headers=self.headers, timeout=10, ) resp.raise_for_status() return resp.json() def list_commands(self) -> dict: """List recent commands. GET /api/commands Returns: {"commands": [...]} """ resp = requests.get( f"{self.base_url}/api/commands", headers=self.headers, timeout=10, ) resp.raise_for_status() return resp.json()