/*
 * Asynchronous command execution.
 *
 * Jobs are copied by value into a FreeRTOS queue and consumed by
 * CONFIG_ASYNC_WORKER_COUNT worker tasks. A separate watchdog task reports
 * any command that has been running longer than WATCHDOG_TIMEOUT_US.
 */
#include "utils.h"
#include "esp_log.h"
#include "esp_timer.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/queue.h"

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

static const char *TAG = "CMD_ASYNC";

/* =========================================================
 * Configuration
 * ========================================================= */
#ifndef CONFIG_ASYNC_WORKER_COUNT
#define CONFIG_ASYNC_WORKER_COUNT 2
#endif

#ifndef CONFIG_ASYNC_QUEUE_DEPTH
#define CONFIG_ASYNC_QUEUE_DEPTH 8
#endif

#define ASYNC_WORKER_STACK   4096
#define WATCHDOG_INTERVAL_MS 5000
#define WATCHDOG_TIMEOUT_US  (60 * 1000000LL) /* 60s */

/* =========================================================
 * Async job structure
 * ========================================================= */
typedef struct {
    const command_t *cmd;
    int argc;
    char argv[MAX_ASYNC_ARGS][MAX_ASYNC_ARG_LEN];
    /* Pointers into argv[]; only valid for the copy they were computed on. */
    char *argv_ptrs[MAX_ASYNC_ARGS];
    char request_id[64];
} async_job_t;

/* =========================================================
 * Per-worker state (watchdog tracking)
 * ========================================================= */
typedef struct {
    volatile int64_t start_us;  /* 0 = idle */
    volatile bool alerted;      /* already reported to C2 */
    const char *cmd_name;       /* current command name */
    char request_id[64];
} worker_state_t;

static QueueHandle_t async_queue;
static worker_state_t worker_states[CONFIG_ASYNC_WORKER_COUNT];

/* =========================================================
 * Watchdog task — monitors workers for stuck commands
 * ========================================================= */

/*
 * Every WATCHDOG_INTERVAL_MS, scan all worker slots and report (once per
 * job) any command whose run time exceeds WATCHDOG_TIMEOUT_US.
 *
 * arg: unused.
 */
static void watchdog_task(void *arg)
{
    (void)arg;

    while (1) {
        vTaskDelay(pdMS_TO_TICKS(WATCHDOG_INTERVAL_MS));
        int64_t now = esp_timer_get_time();

        for (int i = 0; i < CONFIG_ASYNC_WORKER_COUNT; i++) {
            worker_state_t *ws = &worker_states[i];

            /* start_us is written by the worker task. Read it exactly once
             * so the idle check and the subtraction see the same value;
             * reading it twice lets a worker that finishes in between turn
             * `elapsed` into `now - 0`, i.e. a false "stuck" report.
             * NOTE(review): this 64-bit read is not protected by a lock;
             * confirm it cannot tear on the target. */
            int64_t started = ws->start_us;
            if (started == 0 || ws->alerted) {
                continue;
            }

            int64_t elapsed = now - started;
            if (elapsed > WATCHDOG_TIMEOUT_US) {
                int secs = (int)(elapsed / 1000000LL);
                char buf[128];
                snprintf(buf, sizeof(buf),
                         "Worker %d stuck: '%s' running for %ds",
                         i, ws->cmd_name ? ws->cmd_name : "?", secs);
                ESP_LOGW(TAG, "%s", buf);
                msg_error("cmd_async", buf,
                          ws->request_id[0] ? ws->request_id : NULL);
                /* Report each job at most once; the worker clears this
                 * flag when it picks up its next job. */
                ws->alerted = true;
            }
        }
    }
}

/* =========================================================
 * Worker task (multiple instances share the same queue)
 * ========================================================= */

/*
 * Block on the shared queue, run each received job's handler, and keep the
 * worker's watchdog slot up to date around the call.
 *
 * arg: worker index (0 .. CONFIG_ASYNC_WORKER_COUNT-1) passed as a pointer
 *      sized integer.
 */
static void async_worker(void *arg)
{
    int worker_id = (int)(intptr_t)arg;
    worker_state_t *ws = &worker_states[worker_id];
    async_job_t job;

    while (1) {
        if (xQueueReceive(async_queue, &job, portMAX_DELAY)) {
            /* Recompute argv_ptrs to point into THIS copy's argv buffers.
             * xQueueReceive copies the struct by value, so the old
             * pointers (set at enqueue time) are now dangling. */
            for (int i = 0; i < job.argc; i++) {
                job.argv_ptrs[i] = job.argv[i];
            }

            /* Mark worker as busy for watchdog. start_us is written last
             * so the slot only looks "busy" once the other fields are set. */
            ws->cmd_name = job.cmd->name;
            strncpy(ws->request_id, job.request_id,
                    sizeof(ws->request_id) - 1);
            ws->request_id[sizeof(ws->request_id) - 1] = '\0';
            ws->alerted = false;
            ws->start_us = esp_timer_get_time();

            ESP_LOGI(TAG, "Worker %d exec: %s", worker_id, job.cmd->name);

            job.cmd->handler(
                job.argc,
                job.argv_ptrs,
                job.request_id[0] ? job.request_id : NULL,
                job.cmd->ctx
            );

            /* Mark worker as idle */
            ws->start_us = 0;
        }
    }
}

/* =========================================================
 * Init async system
 * ========================================================= */

/*
 * Create the job queue, the worker tasks (pinned to core 1) and the
 * watchdog task (pinned to core 0).
 *
 * If the queue cannot be created nothing else is started. A task that
 * fails to start is logged and initialization continues with the rest.
 */
void command_async_init(void)
{
    memset(worker_states, 0, sizeof(worker_states));

    async_queue = xQueueCreate(CONFIG_ASYNC_QUEUE_DEPTH, sizeof(async_job_t));
    if (!async_queue) {
        ESP_LOGE(TAG, "Failed to create async queue");
        return;
    }

    for (int i = 0; i < CONFIG_ASYNC_WORKER_COUNT; i++) {
        char name[16];
        snprintf(name, sizeof(name), "cmd_async_%d", i);

        BaseType_t ret = xTaskCreatePinnedToCore(
            async_worker,
            name,
            ASYNC_WORKER_STACK,
            (void *)(intptr_t)i,
            5,
            NULL,
            1 /* Core 1 */
        );
        if (ret != pdPASS) {
            ESP_LOGE(TAG, "Failed to create worker %d", i);
        }
    }

    /* Watchdog: low priority, small stack, Core 0 */
    BaseType_t wd_ret = xTaskCreatePinnedToCore(watchdog_task, "cmd_wdog",
                                                2048, NULL, 2, NULL, 0);
    if (wd_ret != pdPASS) {
        ESP_LOGE(TAG, "Failed to create watchdog task");
    }

    ESPILON_LOGI_PURPLE(TAG,
        "Async command system ready (%d workers, watchdog on)",
        CONFIG_ASYNC_WORKER_COUNT);
}

/* =========================================================
 * Enqueue async command
 * ========================================================= */

/*
 * Copy a command and its arguments into a job and post it to the worker
 * queue without blocking.
 *
 * cmd:         command descriptor to run; ignored if NULL.
 * pb_cmd:      decoded request carrying argv and request_id; ignored if NULL.
 * argv_offset: index of the first element of pb_cmd->argv to forward.
 *              Negative values are treated as 0.
 *
 * At most MAX_ASYNC_ARGS arguments are forwarded and each is truncated to
 * MAX_ASYNC_ARG_LEN - 1 characters. If the queue does not exist or is
 * full, the job is dropped and the failure is reported via msg_error().
 */
void command_async_enqueue(const command_t *cmd,
                           const c2_Command *pb_cmd,
                           int argv_offset)
{
    if (!cmd || !pb_cmd) {
        return;
    }

    /* Same convention as the other msg_error()/handler call sites in this
     * file: an empty request id is passed as NULL. */
    const char *req_id = pb_cmd->request_id[0] ? pb_cmd->request_id : NULL;

    /* The queue is only valid after a successful command_async_init(). */
    if (!async_queue) {
        char buf[128];
        snprintf(buf, sizeof(buf),
                 "Async system not initialized, dropped '%s'", cmd->name);
        ESP_LOGE(TAG, "%s", buf);
        msg_error("cmd_async", buf, req_id);
        return;
    }

    /* A negative offset would index before the start of pb_cmd->argv. */
    if (argv_offset < 0) {
        argv_offset = 0;
    }

    async_job_t job = {0};
    job.cmd = cmd;

    job.argc = (int)pb_cmd->argv_count - argv_offset;
    if (job.argc > MAX_ASYNC_ARGS) {
        job.argc = MAX_ASYNC_ARGS;
    }
    if (job.argc < 0) {
        job.argc = 0;
    }

    for (int i = 0; i < job.argc; i++) {
        strncpy(job.argv[i],
                pb_cmd->argv[i + argv_offset],
                MAX_ASYNC_ARG_LEN - 1);
        job.argv[i][MAX_ASYNC_ARG_LEN - 1] = '\0';
        /* Recomputed by the worker after the by-value queue copy. */
        job.argv_ptrs[i] = job.argv[i];
    }

    if (pb_cmd->request_id[0]) {
        strncpy(job.request_id, pb_cmd->request_id,
                sizeof(job.request_id) - 1);
        job.request_id[sizeof(job.request_id) - 1] = '\0';
    }

    /* Zero timeout: never block the caller; drop the job if the queue is full. */
    if (xQueueSend(async_queue, &job, 0) != pdTRUE) {
        char buf[128];
        snprintf(buf, sizeof(buf),
                 "Async queue full, dropped '%s'", cmd->name);
        ESP_LOGE(TAG, "%s", buf);
        msg_error("cmd_async", buf, req_id);
    }
}