diff --git a/platforms/common/include/px4_platform_common/px4_work_queue/WorkQueue.hpp b/platforms/common/include/px4_platform_common/px4_work_queue/WorkQueue.hpp index f2a66bb898..d5d58ca209 100644 --- a/platforms/common/include/px4_platform_common/px4_work_queue/WorkQueue.hpp +++ b/platforms/common/include/px4_platform_common/px4_work_queue/WorkQueue.hpp @@ -69,7 +69,7 @@ public: void Run(); - void request_stop() { _should_exit.store(true); } + void request_stop() { _should_exit.store(true); SignalWorkerThread(); } void print_status(bool last = false); @@ -80,7 +80,14 @@ private: bool should_exit() const { return _should_exit.load(); } - inline void SignalWorkerThread(); + inline void SignalWorkerThread() + { + int sem_val; + + if (px4_sem_getvalue(&_process_lock, &sem_val) == 0 && sem_val <= 0) { + px4_sem_post(&_process_lock); + } + } #ifdef __PX4_NUTTX // In NuttX work can be enqueued from an ISR diff --git a/platforms/common/px4_work_queue/WorkQueue.cpp b/platforms/common/px4_work_queue/WorkQueue.cpp index c8aa15995b..d3f8f684f5 100644 --- a/platforms/common/px4_work_queue/WorkQueue.cpp +++ b/platforms/common/px4_work_queue/WorkQueue.cpp @@ -144,15 +144,6 @@ void WorkQueue::Add(WorkItem *item) SignalWorkerThread(); } -void WorkQueue::SignalWorkerThread() -{ - int sem_val; - - if (px4_sem_getvalue(&_process_lock, &sem_val) == 0 && sem_val <= 0) { - px4_sem_post(&_process_lock); - } -} - void WorkQueue::Remove(WorkItem *item) { work_lock(); diff --git a/platforms/common/px4_work_queue/WorkQueueManager.cpp b/platforms/common/px4_work_queue/WorkQueueManager.cpp index 8d22e32cb2..4bb808adda 100644 --- a/platforms/common/px4_work_queue/WorkQueueManager.cpp +++ b/platforms/common/px4_work_queue/WorkQueueManager.cpp @@ -62,6 +62,14 @@ static BlockingQueue *_wq_manager_create_queue{nullptr}; static px4::atomic_bool _wq_manager_should_exit{true}; static px4::atomic_bool _wq_manager_running{false}; +static px4_task_t _wq_manager_task_id{-1}; + +#ifdef __PX4_POSIX +#include +static constexpr int WQ_MAX_THREADS = 16; +static pthread_t _wq_threads[WQ_MAX_THREADS] {}; +static int _wq_thread_count = 0; +#endif static WorkQueue * @@ -326,6 +334,14 @@ WorkQueueManagerRun(int, char **) if (ret_create == 0) { PX4_DEBUG("starting: %s, priority: %d, stack: %zu bytes", wq->name, param.sched_priority, stacksize); +#ifdef __PX4_POSIX + + if (_wq_thread_count < WQ_MAX_THREADS) { + _wq_threads[_wq_thread_count++] = thread; + } + +#endif + } else { PX4_ERR("failed to create thread for %s (%i): %s", wq->name, ret_create, strerror(ret_create)); } @@ -375,16 +391,16 @@ WorkQueueManagerStart() _wq_manager_should_exit.store(false); - int task_id = px4_task_spawn_cmd("wq:manager", - SCHED_DEFAULT, - SCHED_PRIORITY_MAX, - PX4_STACK_ADJUSTED(1280), - (px4_main_t)&WorkQueueManagerRun, - nullptr); + _wq_manager_task_id = px4_task_spawn_cmd("wq:manager", + SCHED_DEFAULT, + SCHED_PRIORITY_MAX, + PX4_STACK_ADJUSTED(1280), + (px4_main_t)&WorkQueueManagerRun, + nullptr); - if (task_id < 0) { + if (_wq_manager_task_id < 0) { _wq_manager_should_exit.store(true); - PX4_ERR("task start failed (%i)", task_id); + PX4_ERR("task start failed (%i)", _wq_manager_task_id); return -errno; } @@ -413,20 +429,12 @@ WorkQueueManagerStop() { if (!_wq_manager_should_exit.load()) { - // error can't shutdown until all WorkItems are removed/stopped - if (_wq_manager_running.load() && (_wq_manager_wqs_list->size() > 0)) { - PX4_ERR("can't shutdown with active WQs"); - WorkQueueManagerStatus(); - return PX4_ERROR; - } - // first ask all WQs to stop if (_wq_manager_wqs_list != nullptr) { { LockGuard lg{_wq_manager_wqs_list->mutex()}; // ask all work queues (threads) to stop - // NOTE: not currently safe without all WorkItems stopping first for (WorkQueue *wq : *_wq_manager_wqs_list) { wq->request_stop(); } @@ -441,18 +449,31 @@ WorkQueueManagerStop() _wq_manager_wqs_list = nullptr; } + // signal wq:manager thread to exit _wq_manager_should_exit.store(true); if (_wq_manager_create_queue != nullptr) { // push nullptr to wake the wq manager task _wq_manager_create_queue->push(nullptr); - px4_usleep(10000); + // wait for wq:manager thread to finish + px4_task_join(_wq_manager_task_id); + _wq_manager_task_id = -1; delete _wq_manager_create_queue; _wq_manager_create_queue = nullptr; } +#ifdef __PX4_POSIX + + // join all work queue threads (safe now that wq:manager has exited) + for (int i = 0; i < _wq_thread_count; i++) { + pthread_join(_wq_threads[i], nullptr); + } + + _wq_thread_count = 0; +#endif + } else { PX4_WARN("not running"); return PX4_ERROR;