From 22a32eaa5549835dd5aaa25b5f6b8a05dd5b65cf Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Wed, 18 Feb 2026 12:43:02 +1300 Subject: [PATCH] uORB: fix TSAN issues using atomics --- platforms/common/uORB/Subscription.cpp | 4 +- platforms/common/uORB/Subscription.hpp | 17 ++++++--- .../common/uORB/SubscriptionBlocking.hpp | 4 +- .../common/uORB/SubscriptionCallback.hpp | 10 +++-- .../common/uORB/SubscriptionInterval.cpp | 8 ++-- .../common/uORB/SubscriptionInterval.hpp | 5 ++- platforms/common/uORB/uORBDeviceNode.cpp | 28 +++++++------- platforms/common/uORB/uORBDeviceNode.hpp | 14 +++---- platforms/common/uORB/uORBManager.cpp | 22 +++++------ platforms/common/uORB/uORBManager.hpp | 4 +- .../uORB/uORB_tests/uORBTest_UnitTest.cpp | 38 +++++++++---------- .../uORB/uORB_tests/uORBTest_UnitTest.hpp | 9 +++-- platforms/posix/src/px4/common/drv_hrt.cpp | 4 +- 13 files changed, 92 insertions(+), 75 deletions(-) diff --git a/platforms/common/uORB/Subscription.cpp b/platforms/common/uORB/Subscription.cpp index 473c554866..63ec4700ed 100644 --- a/platforms/common/uORB/Subscription.cpp +++ b/platforms/common/uORB/Subscription.cpp @@ -55,7 +55,7 @@ bool Subscription::subscribe() if (node) { _node = node; - _last_generation = initial_generation; + _last_generation.store(initial_generation); return true; } } @@ -70,7 +70,7 @@ void Subscription::unsubscribe() } _node = nullptr; - _last_generation = 0; + _last_generation.store(0); } bool Subscription::ChangeInstance(uint8_t instance) diff --git a/platforms/common/uORB/Subscription.hpp b/platforms/common/uORB/Subscription.hpp index 723427d840..e9bf197137 100644 --- a/platforms/common/uORB/Subscription.hpp +++ b/platforms/common/uORB/Subscription.hpp @@ -41,6 +41,7 @@ #include #include +#include #include #include @@ -131,7 +132,7 @@ public: bool updated() { if (subscribe()) { - return Manager::updates_available(_node, _last_generation); + return Manager::updates_available(_node, _last_generation.load()); } return false; @@ -144,7 +145,10 @@ public: bool update(void *dst) { if (subscribe()) { - return Manager::orb_data_copy(_node, dst, _last_generation, true); + unsigned gen = _last_generation.load(); + bool ret = Manager::orb_data_copy(_node, dst, gen, true); + _last_generation.store(gen); + return ret; } return false; @@ -157,7 +161,10 @@ public: bool copy(void *dst) { if (subscribe()) { - return Manager::orb_data_copy(_node, dst, _last_generation, false); + unsigned gen = _last_generation.load(); + bool ret = Manager::orb_data_copy(_node, dst, gen, false); + _last_generation.store(gen); + return ret; } return false; @@ -170,7 +177,7 @@ public: bool ChangeInstance(uint8_t instance); uint8_t get_instance() const { return _instance; } - unsigned get_last_generation() const { return _last_generation; } + unsigned get_last_generation() const { return _last_generation.load(); } orb_id_t get_topic() const { return get_orb_meta(_orb_id); } ORB_ID orb_id() const { return _orb_id; } @@ -184,7 +191,7 @@ protected: void *_node{nullptr}; - unsigned _last_generation{0}; /**< last generation the subscriber has seen */ + px4::atomic _last_generation{0}; /**< last generation the subscriber has seen */ ORB_ID _orb_id{ORB_ID::INVALID}; uint8_t _instance{0}; diff --git a/platforms/common/uORB/SubscriptionBlocking.hpp b/platforms/common/uORB/SubscriptionBlocking.hpp index 5c5580d855..0df895bc01 100644 --- a/platforms/common/uORB/SubscriptionBlocking.hpp +++ b/platforms/common/uORB/SubscriptionBlocking.hpp @@ -67,7 +67,9 @@ public: void call() override { // signal immediately if no interval, otherwise only if interval has elapsed - if ((_interval_us == 0) || (hrt_elapsed_time(&_last_update) >= _interval_us)) { + hrt_abstime last_update = _last_update.load(); + + if ((_interval_us == 0) || (hrt_elapsed_time(&last_update) >= _interval_us)) { pthread_cond_signal(&_cv); } } diff --git a/platforms/common/uORB/SubscriptionCallback.hpp b/platforms/common/uORB/SubscriptionCallback.hpp index 942a9fbe17..b04eb42f5b 100644 --- a/platforms/common/uORB/SubscriptionCallback.hpp +++ b/platforms/common/uORB/SubscriptionCallback.hpp @@ -163,8 +163,10 @@ public: void call() override { // schedule immediately if updated (queue depth or subscription interval) - if ((_required_updates == 0) - || (Manager::updates_available(_subscription.get_node(), _subscription.get_last_generation()) >= _required_updates)) { + uint8_t req = _required_updates.load(); + + if ((req == 0) + || (Manager::updates_available(_subscription.get_node(), _subscription.get_last_generation()) >= req)) { if (updated()) { _work_item->ScheduleNow(); } @@ -179,13 +181,13 @@ public: void set_required_updates(uint8_t required_updates) { // TODO: constrain to queue depth? - _required_updates = required_updates; + _required_updates.store(required_updates); } private: px4::WorkItem *_work_item; - uint8_t _required_updates{0}; + px4::atomic _required_updates{0}; }; } // namespace uORB diff --git a/platforms/common/uORB/SubscriptionInterval.cpp b/platforms/common/uORB/SubscriptionInterval.cpp index 8f8af333f2..934775b7e8 100644 --- a/platforms/common/uORB/SubscriptionInterval.cpp +++ b/platforms/common/uORB/SubscriptionInterval.cpp @@ -38,7 +38,9 @@ namespace uORB bool SubscriptionInterval::updated() { - if (advertised() && (hrt_elapsed_time(&_last_update) >= _interval_us)) { + hrt_abstime last_update = _last_update.load(); + + if (advertised() && (hrt_elapsed_time(&last_update) >= _interval_us)) { return _subscription.updated(); } @@ -62,10 +64,10 @@ bool SubscriptionInterval::copy(void *dst) // make sure we don't set a timestamp before the timer started counting (now - _interval_us would wrap because it's unsigned) if (now > _interval_us) { // shift last update time forward, but don't let it get further behind than the interval - _last_update = math::constrain(_last_update + _interval_us, now - _interval_us, now); + _last_update.store(math::constrain(_last_update.load() + _interval_us, now - _interval_us, now)); } else { - _last_update = now; + _last_update.store(now); } return true; diff --git a/platforms/common/uORB/SubscriptionInterval.hpp b/platforms/common/uORB/SubscriptionInterval.hpp index fd6ccdfb61..9a024cd6bd 100644 --- a/platforms/common/uORB/SubscriptionInterval.hpp +++ b/platforms/common/uORB/SubscriptionInterval.hpp @@ -39,6 +39,7 @@ #pragma once #include +#include #include #include "uORBDeviceNode.hpp" @@ -132,11 +133,11 @@ public: * Set the last data update * @param t should be in range [now, now - _interval_us] */ - void set_last_update(hrt_abstime t) { _last_update = t; } + void set_last_update(hrt_abstime t) { _last_update.store(t); } protected: Subscription _subscription; - uint64_t _last_update{0}; // last subscription update in microseconds + px4::atomic _last_update{0}; // last subscription update in microseconds uint32_t _interval_us{0}; // maximum update interval in microseconds }; diff --git a/platforms/common/uORB/uORBDeviceNode.cpp b/platforms/common/uORB/uORBDeviceNode.cpp index fd4ef60e63..306c6fba66 100644 --- a/platforms/common/uORB/uORBDeviceNode.cpp +++ b/platforms/common/uORB/uORBDeviceNode.cpp @@ -57,7 +57,7 @@ uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const uint8_t inst uORB::DeviceNode::~DeviceNode() { - free(_data); + free(_data.load()); const char *devname = get_devname(); @@ -148,7 +148,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen) * * Note that filp will usually be NULL. */ - if (nullptr == _data) { + if (nullptr == _data.load()) { #ifdef __PX4_NUTTX @@ -158,13 +158,15 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen) lock(); /* re-check size */ - if (nullptr == _data) { + if (nullptr == _data.load()) { const size_t data_size = _meta->o_size * _meta->o_queue; - _data = (uint8_t *) px4_cache_aligned_alloc(data_size); + uint8_t *data = (uint8_t *) px4_cache_aligned_alloc(data_size); - if (_data) { - memset(_data, 0, data_size); + if (data) { + memset(data, 0, data_size); } + + _data.store(data); } unlock(); @@ -175,7 +177,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen) #endif /* __PX4_NUTTX */ /* failed or could not allocate */ - if (nullptr == _data) { + if (nullptr == _data.load()) { return -ENOMEM; } } @@ -190,7 +192,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen) /* wrap-around happens after ~49 days, assuming a publisher rate of 1 kHz */ unsigned generation = _generation.fetch_add(1); - memcpy(_data + (_meta->o_size * (generation % _meta->o_queue)), buffer, _meta->o_size); + memcpy(_data.load() + (_meta->o_size * (generation % _meta->o_queue)), buffer, _meta->o_size); // callbacks for (auto item : _callbacks) { @@ -232,7 +234,7 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg) return PX4_OK; case ORBIOCISADVERTISED: - *(unsigned long *)arg = _advertised; + *(unsigned long *)arg = _advertised.load(); return PX4_OK; @@ -310,7 +312,7 @@ int uORB::DeviceNode::unadvertise(orb_advert_t handle) * of subscribers and publishers. But we also do not have a leak since future * publishers reuse the same DeviceNode object. */ - devnode->_advertised = false; + devnode->_advertised.store(false); return PX4_OK; } @@ -347,7 +349,7 @@ uORB::DeviceNode::poll_notify_one(px4_pollfd_struct_t *fds, px4_pollevent_t even bool uORB::DeviceNode::print_statistics(int max_topic_length) { - if (!_advertised) { + if (!_advertised.load()) { return false; } @@ -411,10 +413,10 @@ int16_t uORB::DeviceNode::process_add_subscription() // send the data to the remote entity. uORBCommunicator::IChannel *ch = uORB::Manager::get_instance()->get_uorb_communicator(); - if (_data != nullptr && ch != nullptr) { // _data will not be null if there is a publisher. + if (_data.load() != nullptr && ch != nullptr) { // _data will not be null if there is a publisher. // Only send the most recent data to initialize the remote end. if (_data_valid) { - ch->send_message(_meta->o_name, _meta->o_size, _data + (_meta->o_size * ((_generation.load() - 1) % _meta->o_queue))); + ch->send_message(_meta->o_name, _meta->o_size, _data.load() + (_meta->o_size * ((_generation.load() - 1) % _meta->o_queue))); } } diff --git a/platforms/common/uORB/uORBDeviceNode.hpp b/platforms/common/uORB/uORBDeviceNode.hpp index 527c5ddf2b..45f5f6b38a 100644 --- a/platforms/common/uORB/uORBDeviceNode.hpp +++ b/platforms/common/uORB/uORBDeviceNode.hpp @@ -175,9 +175,9 @@ public: * * This is used in the case of multi_pub/sub to check if it's valid to advertise * and publish to this node or if another node should be tried. */ - bool is_advertised() const { return _advertised; } + bool is_advertised() const { return _advertised.load(); } - void mark_as_advertised() { _advertised = true; } + void mark_as_advertised() { _advertised.store(true); } /** * Print statistics @@ -224,10 +224,10 @@ public: */ bool copy(void *dst, unsigned &generation) { - if ((dst != nullptr) && (_data != nullptr)) { + if ((dst != nullptr) && (_data.load() != nullptr)) { if (_meta->o_queue == 1) { ATOMIC_ENTER; - memcpy(dst, _data, _meta->o_size); + memcpy(dst, _data.load(), _meta->o_size); generation = _generation.load(); ATOMIC_LEAVE; return true; @@ -249,7 +249,7 @@ public: generation = current_generation - _meta->o_queue; } - memcpy(dst, _data + (_meta->o_size * (generation % _meta->o_queue)), _meta->o_size); + memcpy(dst, _data.load() + (_meta->o_size * (generation % _meta->o_queue)), _meta->o_size); ATOMIC_LEAVE; ++generation; @@ -279,13 +279,13 @@ private: const orb_metadata *_meta; /**< object metadata information */ - uint8_t *_data{nullptr}; /**< allocated object buffer */ + px4::atomic _data{nullptr}; /**< allocated object buffer */ bool _data_valid{false}; /**< At least one valid data */ px4::atomic _generation{0}; /**< object generation count */ List _callbacks; const uint8_t _instance; /**< orb multi instance identifier */ - bool _advertised{false}; /**< has ever been advertised (not necessarily published data yet) */ + px4::atomic_bool _advertised{false}; /**< has ever been advertised (not necessarily published data yet) */ int8_t _subscriber_count{0}; diff --git a/platforms/common/uORB/uORBManager.cpp b/platforms/common/uORB/uORBManager.cpp index 3fa75a3454..2e8d8e6385 100644 --- a/platforms/common/uORB/uORBManager.cpp +++ b/platforms/common/uORB/uORBManager.cpp @@ -52,25 +52,25 @@ pthread_mutex_t uORB::Manager::_communicator_mutex = PTHREAD_MUTEX_INITIALIZER; #endif -uORB::Manager *uORB::Manager::_Instance = nullptr; +px4::atomic uORB::Manager::_Instance{nullptr}; bool uORB::Manager::initialize() { - if (_Instance == nullptr) { - _Instance = new uORB::Manager(); + if (_Instance.load() == nullptr) { + _Instance.store(new uORB::Manager()); } #if defined(__PX4_NUTTX) && !defined(CONFIG_BUILD_FLAT) && defined(__KERNEL__) px4_register_boardct_ioctl(_ORBIOCDEVBASE, orb_ioctl); #endif - return _Instance != nullptr; + return _Instance.load() != nullptr; } bool uORB::Manager::terminate() { - if (_Instance != nullptr) { - delete _Instance; - _Instance = nullptr; + if (_Instance.load() != nullptr) { + delete _Instance.load(); + _Instance.store(nullptr); return true; } @@ -277,14 +277,14 @@ orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta, if (_publisher_rule.ignore_other_topics) { if (!findTopic(_publisher_rule, meta->o_name)) { PX4_DEBUG("not allowing %s to publish topic %s", prog_name, meta->o_name); - return (orb_advert_t)_Instance; + return (orb_advert_t)_Instance.load(); } } } else { if (findTopic(_publisher_rule, meta->o_name)) { PX4_DEBUG("not allowing %s to publish topic %s", prog_name, meta->o_name); - return (orb_advert_t)_Instance; + return (orb_advert_t)_Instance.load(); } } } @@ -337,7 +337,7 @@ int uORB::Manager::orb_unadvertise(orb_advert_t handle) { #ifdef ORB_USE_PUBLISHER_RULES - if (handle == _Instance) { + if (handle == _Instance.load()) { return PX4_OK; //pretend success } @@ -366,7 +366,7 @@ int uORB::Manager::orb_publish(const struct orb_metadata *meta, orb_advert_t han { #ifdef ORB_USE_PUBLISHER_RULES - if (handle == _Instance) { + if (handle == _Instance.load()) { return PX4_OK; //pretend success } diff --git a/platforms/common/uORB/uORBManager.hpp b/platforms/common/uORB/uORBManager.hpp index d5202b9236..4244fe208e 100644 --- a/platforms/common/uORB/uORBManager.hpp +++ b/platforms/common/uORB/uORBManager.hpp @@ -184,7 +184,7 @@ public: * Make sure initialize() is called first. * @return uORB::Manager* */ - static uORB::Manager *get_instance() { return _Instance; } + static uORB::Manager *get_instance() { return _Instance.load(); } /** * Get the DeviceMaster. If it does not exist, @@ -489,7 +489,7 @@ private: // class methods int node_open(const struct orb_metadata *meta, bool advertiser, int *instance = nullptr); private: // data members - static Manager *_Instance; + static px4::atomic _Instance; #ifdef CONFIG_ORB_COMMUNICATOR // the communicator channel instance. diff --git a/platforms/common/uORB/uORB_tests/uORBTest_UnitTest.cpp b/platforms/common/uORB/uORB_tests/uORBTest_UnitTest.cpp index 035d2f9ac5..8550a03cf8 100644 --- a/platforms/common/uORB/uORB_tests/uORBTest_UnitTest.cpp +++ b/platforms/common/uORB/uORB_tests/uORBTest_UnitTest.cpp @@ -144,7 +144,7 @@ int uORBTest::UnitTest::pubsublatency_main() PX4_INFO("max: %3i us", timing_max); PX4_INFO("missed topic updates: %i", num_missed); - pubsubtest_passed = true; + pubsubtest_passed.store(true); #if defined(CONFIG_ARCH_BOARD_PX4_SITL) // relaxed on SITL (non-realtime) @@ -154,13 +154,13 @@ int uORBTest::UnitTest::pubsublatency_main() #endif if (mean > kMaxMeanUs) { - pubsubtest_res = PX4_ERROR; + pubsubtest_res.store(PX4_ERROR); } else { - pubsubtest_res = PX4_OK; + pubsubtest_res.store(PX4_OK); } - return pubsubtest_res; + return pubsubtest_res.load(); } int uORBTest::UnitTest::test() @@ -412,7 +412,7 @@ int uORBTest::UnitTest::pub_test_multi2_main() pub = orb_advertise_multi(ORB_ID(orb_test_medium_multi), &data_topic, &idx); if (idx != i) { - _thread_should_exit = true; + _thread_should_exit.store(true); PX4_ERR("Got wrong instance! should be: %i, but is %i", i, idx); return -1; } @@ -440,7 +440,7 @@ int uORBTest::UnitTest::pub_test_multi2_main() } px4_usleep(100 * 1000); - _thread_should_exit = true; + _thread_should_exit.store(true); for (int i = 0; i < num_instances; ++i) { orb_unadvertise(orb_pub[i]); @@ -454,7 +454,7 @@ int uORBTest::UnitTest::test_multi2() test_note("Testing multi-topic 2 test (queue simulation)"); //test: first subscribe, then advertise - _thread_should_exit = false; + _thread_should_exit.store(false); const int num_instances = 3; int orb_data_fd[num_instances] {-1, -1, -1}; int orb_data_next = 0; @@ -478,7 +478,7 @@ int uORBTest::UnitTest::test_multi2() hrt_abstime last_time[num_instances] {}; - while (!_thread_should_exit) { + while (!_thread_should_exit.load()) { px4_usleep(1000); @@ -945,7 +945,7 @@ int uORBTest::UnitTest::pub_test_queue_main() const int queue_size = orb_get_queue_size(ORB_ID(orb_test_medium_queue_poll)); if ((ptopic = orb_advertise(ORB_ID(orb_test_medium_queue_poll), &t)) == nullptr) { - _thread_should_exit = true; + _thread_should_exit.store(true); return test_fail("advertise failed: %d", errno); } @@ -968,9 +968,9 @@ int uORBTest::UnitTest::pub_test_queue_main() px4_usleep(20 * 1000); //give subscriber a chance to catch up } - _num_messages_sent = t.val; + _num_messages_sent.store(t.val); px4_usleep(100 * 1000); - _thread_should_exit = true; + _thread_should_exit.store(true); orb_unadvertise(ptopic); return 0; @@ -987,7 +987,7 @@ int uORBTest::UnitTest::test_queue_poll_notify() return test_fail("subscribe failed: %d", errno); } - _thread_should_exit = false; + _thread_should_exit.store(false); char *const args[1] = { nullptr }; int pubsub_task = px4_task_spawn_cmd("uorb_test_queue", @@ -1006,12 +1006,12 @@ int uORBTest::UnitTest::test_queue_poll_notify() fds[0].fd = sfd; fds[0].events = POLLIN; - while (!_thread_should_exit) { + while (!_thread_should_exit.load()) { int poll_ret = px4_poll(fds, 1, 500); if (poll_ret == 0) { - if (_thread_should_exit) { + if (_thread_should_exit.load()) { break; } @@ -1032,9 +1032,9 @@ int uORBTest::UnitTest::test_queue_poll_notify() } } - if (_num_messages_sent != next_expected_val) { + if (_num_messages_sent.load() != next_expected_val) { return test_fail("number of sent and received messages mismatch (sent: %i, received: %i)", - _num_messages_sent, next_expected_val); + _num_messages_sent.load(), next_expected_val); } return test_note("PASS orb queuing (poll & notify), got %i messages", next_expected_val); @@ -1056,7 +1056,7 @@ int uORBTest::UnitTest::latency_test(bool print) char *const args[1] = { nullptr }; pubsubtest_print = print; - pubsubtest_passed = false; + pubsubtest_passed.store(false); /* test pub / sub latency */ @@ -1071,7 +1071,7 @@ int uORBTest::UnitTest::latency_test(bool print) args); /* give the test task some data */ - while (!pubsubtest_passed) { + while (!pubsubtest_passed.load()) { ++t.val; t.timestamp = hrt_absolute_time(); @@ -1089,7 +1089,7 @@ int uORBTest::UnitTest::latency_test(bool print) orb_unadvertise(pfd0); - return pubsubtest_res; + return pubsubtest_res.load(); } int uORBTest::UnitTest::test_fail(const char *fmt, ...) diff --git a/platforms/common/uORB/uORB_tests/uORBTest_UnitTest.hpp b/platforms/common/uORB/uORB_tests/uORBTest_UnitTest.hpp index 908eeca9c7..120afcce9c 100644 --- a/platforms/common/uORB/uORB_tests/uORBTest_UnitTest.hpp +++ b/platforms/common/uORB/uORB_tests/uORBTest_UnitTest.hpp @@ -42,6 +42,7 @@ #include #include +#include #include #include #include @@ -87,11 +88,11 @@ private: static int pub_test_multi2_entry(int argc, char *argv[]); int pub_test_multi2_main(); - volatile bool _thread_should_exit; + px4::atomic_bool _thread_should_exit{false}; - bool pubsubtest_passed{false}; + px4::atomic_bool pubsubtest_passed{false}; bool pubsubtest_print{false}; - int pubsubtest_res = OK; + px4::atomic pubsubtest_res{OK}; orb_advert_t _pfd[4] {}; ///< used for test_multi and test_multi_reversed @@ -113,7 +114,7 @@ private: static int pub_test_queue_entry(int argc, char *argv[]); int pub_test_queue_main(); int test_queue_poll_notify(); - volatile int _num_messages_sent = 0; + px4::atomic _num_messages_sent{0}; int test_fail(const char *fmt, ...); int test_note(const char *fmt, ...); diff --git a/platforms/posix/src/px4/common/drv_hrt.cpp b/platforms/posix/src/px4/common/drv_hrt.cpp index 1163f3fd8e..18157d0cfa 100644 --- a/platforms/posix/src/px4/common/drv_hrt.cpp +++ b/platforms/posix/src/px4/common/drv_hrt.cpp @@ -162,13 +162,13 @@ static void hrt_latency_update() /* bounded buckets */ for (index = 0; index < LATENCY_BUCKET_COUNT; index++) { if (latency <= latency_buckets[index]) { - latency_counters[index]++; + __atomic_fetch_add(&latency_counters[index], 1, __ATOMIC_RELAXED); return; } } /* catch-all at the end */ - latency_counters[index]++; + __atomic_fetch_add(&latency_counters[index], 1, __ATOMIC_RELAXED); } /*