px4_work_queue: properly clean up

Fixes TSAN issues in unit tets.
This commit is contained in:
Julian Oes
2026-02-18 20:21:04 +13:00
parent 2ad0b297ca
commit d6c8514fea
3 changed files with 47 additions and 28 deletions
@@ -62,6 +62,14 @@ static BlockingQueue<const wq_config_t *, 1> *_wq_manager_create_queue{nullptr};
static px4::atomic_bool _wq_manager_should_exit{true};
static px4::atomic_bool _wq_manager_running{false};
static px4_task_t _wq_manager_task_id{-1};
#ifdef __PX4_POSIX
#include <pthread.h>
static constexpr int WQ_MAX_THREADS = 16;
static pthread_t _wq_threads[WQ_MAX_THREADS] {};
static int _wq_thread_count = 0;
#endif
static WorkQueue *
@@ -326,6 +334,14 @@ WorkQueueManagerRun(int, char **)
if (ret_create == 0) {
PX4_DEBUG("starting: %s, priority: %d, stack: %zu bytes", wq->name, param.sched_priority, stacksize);
#ifdef __PX4_POSIX
if (_wq_thread_count < WQ_MAX_THREADS) {
_wq_threads[_wq_thread_count++] = thread;
}
#endif
} else {
PX4_ERR("failed to create thread for %s (%i): %s", wq->name, ret_create, strerror(ret_create));
}
@@ -375,16 +391,16 @@ WorkQueueManagerStart()
_wq_manager_should_exit.store(false);
int task_id = px4_task_spawn_cmd("wq:manager",
SCHED_DEFAULT,
SCHED_PRIORITY_MAX,
PX4_STACK_ADJUSTED(1280),
(px4_main_t)&WorkQueueManagerRun,
nullptr);
_wq_manager_task_id = px4_task_spawn_cmd("wq:manager",
SCHED_DEFAULT,
SCHED_PRIORITY_MAX,
PX4_STACK_ADJUSTED(1280),
(px4_main_t)&WorkQueueManagerRun,
nullptr);
if (task_id < 0) {
if (_wq_manager_task_id < 0) {
_wq_manager_should_exit.store(true);
PX4_ERR("task start failed (%i)", task_id);
PX4_ERR("task start failed (%i)", _wq_manager_task_id);
return -errno;
}
@@ -413,20 +429,12 @@ WorkQueueManagerStop()
{
if (!_wq_manager_should_exit.load()) {
// error can't shutdown until all WorkItems are removed/stopped
if (_wq_manager_running.load() && (_wq_manager_wqs_list->size() > 0)) {
PX4_ERR("can't shutdown with active WQs");
WorkQueueManagerStatus();
return PX4_ERROR;
}
// first ask all WQs to stop
if (_wq_manager_wqs_list != nullptr) {
{
LockGuard lg{_wq_manager_wqs_list->mutex()};
// ask all work queues (threads) to stop
// NOTE: not currently safe without all WorkItems stopping first
for (WorkQueue *wq : *_wq_manager_wqs_list) {
wq->request_stop();
}
@@ -441,18 +449,31 @@ WorkQueueManagerStop()
_wq_manager_wqs_list = nullptr;
}
// signal wq:manager thread to exit
_wq_manager_should_exit.store(true);
if (_wq_manager_create_queue != nullptr) {
// push nullptr to wake the wq manager task
_wq_manager_create_queue->push(nullptr);
px4_usleep(10000);
// wait for wq:manager thread to finish
px4_task_join(_wq_manager_task_id);
_wq_manager_task_id = -1;
delete _wq_manager_create_queue;
_wq_manager_create_queue = nullptr;
}
#ifdef __PX4_POSIX
// join all work queue threads (safe now that wq:manager has exited)
for (int i = 0; i < _wq_thread_count; i++) {
pthread_join(_wq_threads[i], nullptr);
}
_wq_thread_count = 0;
#endif
} else {
PX4_WARN("not running");
return PX4_ERROR;