uORB: fix TSAN issues using atomics

This commit is contained in:
Julian Oes 2026-02-18 12:43:02 +13:00
parent 91e25df7c4
commit 22a32eaa55
No known key found for this signature in database
GPG Key ID: F0ED380FEA56DE41
13 changed files with 92 additions and 75 deletions

View File

@ -55,7 +55,7 @@ bool Subscription::subscribe()
if (node) {
_node = node;
_last_generation = initial_generation;
_last_generation.store(initial_generation);
return true;
}
}
@ -70,7 +70,7 @@ void Subscription::unsubscribe()
}
_node = nullptr;
_last_generation = 0;
_last_generation.store(0);
}
bool Subscription::ChangeInstance(uint8_t instance)

View File

@ -41,6 +41,7 @@
#include <uORB/uORB.h>
#include <uORB/topics/uORBTopics.hpp>
#include <px4_platform_common/atomic.h>
#include <px4_platform_common/defines.h>
#include <lib/mathlib/mathlib.h>
@ -131,7 +132,7 @@ public:
bool updated()
{
if (subscribe()) {
return Manager::updates_available(_node, _last_generation);
return Manager::updates_available(_node, _last_generation.load());
}
return false;
@ -144,7 +145,10 @@ public:
bool update(void *dst)
{
if (subscribe()) {
return Manager::orb_data_copy(_node, dst, _last_generation, true);
unsigned gen = _last_generation.load();
bool ret = Manager::orb_data_copy(_node, dst, gen, true);
_last_generation.store(gen);
return ret;
}
return false;
@ -157,7 +161,10 @@ public:
bool copy(void *dst)
{
if (subscribe()) {
return Manager::orb_data_copy(_node, dst, _last_generation, false);
unsigned gen = _last_generation.load();
bool ret = Manager::orb_data_copy(_node, dst, gen, false);
_last_generation.store(gen);
return ret;
}
return false;
@ -170,7 +177,7 @@ public:
bool ChangeInstance(uint8_t instance);
uint8_t get_instance() const { return _instance; }
unsigned get_last_generation() const { return _last_generation; }
unsigned get_last_generation() const { return _last_generation.load(); }
orb_id_t get_topic() const { return get_orb_meta(_orb_id); }
ORB_ID orb_id() const { return _orb_id; }
@ -184,7 +191,7 @@ protected:
void *_node{nullptr};
unsigned _last_generation{0}; /**< last generation the subscriber has seen */
px4::atomic<unsigned> _last_generation{0}; /**< last generation the subscriber has seen */
ORB_ID _orb_id{ORB_ID::INVALID};
uint8_t _instance{0};

View File

@ -67,7 +67,9 @@ public:
void call() override
{
// signal immediately if no interval, otherwise only if interval has elapsed
if ((_interval_us == 0) || (hrt_elapsed_time(&_last_update) >= _interval_us)) {
hrt_abstime last_update = _last_update.load();
if ((_interval_us == 0) || (hrt_elapsed_time(&last_update) >= _interval_us)) {
pthread_cond_signal(&_cv);
}
}

View File

@ -163,8 +163,10 @@ public:
void call() override
{
// schedule immediately if updated (queue depth or subscription interval)
if ((_required_updates == 0)
|| (Manager::updates_available(_subscription.get_node(), _subscription.get_last_generation()) >= _required_updates)) {
uint8_t req = _required_updates.load();
if ((req == 0)
|| (Manager::updates_available(_subscription.get_node(), _subscription.get_last_generation()) >= req)) {
if (updated()) {
_work_item->ScheduleNow();
}
@ -179,13 +181,13 @@ public:
void set_required_updates(uint8_t required_updates)
{
// TODO: constrain to queue depth?
_required_updates = required_updates;
_required_updates.store(required_updates);
}
private:
px4::WorkItem *_work_item;
uint8_t _required_updates{0};
px4::atomic<uint8_t> _required_updates{0};
};
} // namespace uORB

View File

@ -38,7 +38,9 @@ namespace uORB
bool SubscriptionInterval::updated()
{
if (advertised() && (hrt_elapsed_time(&_last_update) >= _interval_us)) {
hrt_abstime last_update = _last_update.load();
if (advertised() && (hrt_elapsed_time(&last_update) >= _interval_us)) {
return _subscription.updated();
}
@ -62,10 +64,10 @@ bool SubscriptionInterval::copy(void *dst)
// make sure we don't set a timestamp before the timer started counting (now - _interval_us would wrap because it's unsigned)
if (now > _interval_us) {
// shift last update time forward, but don't let it get further behind than the interval
_last_update = math::constrain(_last_update + _interval_us, now - _interval_us, now);
_last_update.store(math::constrain(_last_update.load() + _interval_us, now - _interval_us, now));
} else {
_last_update = now;
_last_update.store(now);
}
return true;

View File

@ -39,6 +39,7 @@
#pragma once
#include <uORB/uORB.h>
#include <px4_platform_common/atomic.h>
#include <px4_platform_common/defines.h>
#include "uORBDeviceNode.hpp"
@ -132,11 +133,11 @@ public:
* Set the last data update
* @param t should be in range [now, now - _interval_us]
*/
void set_last_update(hrt_abstime t) { _last_update = t; }
void set_last_update(hrt_abstime t) { _last_update.store(t); }
protected:
Subscription _subscription;
uint64_t _last_update{0}; // last subscription update in microseconds
px4::atomic<uint64_t> _last_update{0}; // last subscription update in microseconds
uint32_t _interval_us{0}; // maximum update interval in microseconds
};

View File

@ -57,7 +57,7 @@ uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const uint8_t inst
uORB::DeviceNode::~DeviceNode()
{
free(_data);
free(_data.load());
const char *devname = get_devname();
@ -148,7 +148,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
*
* Note that filp will usually be NULL.
*/
if (nullptr == _data) {
if (nullptr == _data.load()) {
#ifdef __PX4_NUTTX
@ -158,13 +158,15 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
lock();
/* re-check size */
if (nullptr == _data) {
if (nullptr == _data.load()) {
const size_t data_size = _meta->o_size * _meta->o_queue;
_data = (uint8_t *) px4_cache_aligned_alloc(data_size);
uint8_t *data = (uint8_t *) px4_cache_aligned_alloc(data_size);
if (_data) {
memset(_data, 0, data_size);
if (data) {
memset(data, 0, data_size);
}
_data.store(data);
}
unlock();
@ -175,7 +177,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
#endif /* __PX4_NUTTX */
/* failed or could not allocate */
if (nullptr == _data) {
if (nullptr == _data.load()) {
return -ENOMEM;
}
}
@ -190,7 +192,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
/* wrap-around happens after ~49 days, assuming a publisher rate of 1 kHz */
unsigned generation = _generation.fetch_add(1);
memcpy(_data + (_meta->o_size * (generation % _meta->o_queue)), buffer, _meta->o_size);
memcpy(_data.load() + (_meta->o_size * (generation % _meta->o_queue)), buffer, _meta->o_size);
// callbacks
for (auto item : _callbacks) {
@ -232,7 +234,7 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg)
return PX4_OK;
case ORBIOCISADVERTISED:
*(unsigned long *)arg = _advertised;
*(unsigned long *)arg = _advertised.load();
return PX4_OK;
@ -310,7 +312,7 @@ int uORB::DeviceNode::unadvertise(orb_advert_t handle)
* of subscribers and publishers. But we also do not have a leak since future
* publishers reuse the same DeviceNode object.
*/
devnode->_advertised = false;
devnode->_advertised.store(false);
return PX4_OK;
}
@ -347,7 +349,7 @@ uORB::DeviceNode::poll_notify_one(px4_pollfd_struct_t *fds, px4_pollevent_t even
bool
uORB::DeviceNode::print_statistics(int max_topic_length)
{
if (!_advertised) {
if (!_advertised.load()) {
return false;
}
@ -411,10 +413,10 @@ int16_t uORB::DeviceNode::process_add_subscription()
// send the data to the remote entity.
uORBCommunicator::IChannel *ch = uORB::Manager::get_instance()->get_uorb_communicator();
if (_data != nullptr && ch != nullptr) { // _data will not be null if there is a publisher.
if (_data.load() != nullptr && ch != nullptr) { // _data will not be null if there is a publisher.
// Only send the most recent data to initialize the remote end.
if (_data_valid) {
ch->send_message(_meta->o_name, _meta->o_size, _data + (_meta->o_size * ((_generation.load() - 1) % _meta->o_queue)));
ch->send_message(_meta->o_name, _meta->o_size, _data.load() + (_meta->o_size * ((_generation.load() - 1) % _meta->o_queue)));
}
}

View File

@ -175,9 +175,9 @@ public:
*
* This is used in the case of multi_pub/sub to check if it's valid to advertise
* and publish to this node or if another node should be tried. */
bool is_advertised() const { return _advertised; }
bool is_advertised() const { return _advertised.load(); }
void mark_as_advertised() { _advertised = true; }
void mark_as_advertised() { _advertised.store(true); }
/**
* Print statistics
@ -224,10 +224,10 @@ public:
*/
bool copy(void *dst, unsigned &generation)
{
if ((dst != nullptr) && (_data != nullptr)) {
if ((dst != nullptr) && (_data.load() != nullptr)) {
if (_meta->o_queue == 1) {
ATOMIC_ENTER;
memcpy(dst, _data, _meta->o_size);
memcpy(dst, _data.load(), _meta->o_size);
generation = _generation.load();
ATOMIC_LEAVE;
return true;
@ -249,7 +249,7 @@ public:
generation = current_generation - _meta->o_queue;
}
memcpy(dst, _data + (_meta->o_size * (generation % _meta->o_queue)), _meta->o_size);
memcpy(dst, _data.load() + (_meta->o_size * (generation % _meta->o_queue)), _meta->o_size);
ATOMIC_LEAVE;
++generation;
@ -279,13 +279,13 @@ private:
const orb_metadata *_meta; /**< object metadata information */
uint8_t *_data{nullptr}; /**< allocated object buffer */
px4::atomic<uint8_t *> _data{nullptr}; /**< allocated object buffer */
bool _data_valid{false}; /**< At least one valid data */
px4::atomic<unsigned> _generation{0}; /**< object generation count */
List<uORB::SubscriptionCallback *> _callbacks;
const uint8_t _instance; /**< orb multi instance identifier */
bool _advertised{false}; /**< has ever been advertised (not necessarily published data yet) */
px4::atomic_bool _advertised{false}; /**< has ever been advertised (not necessarily published data yet) */
int8_t _subscriber_count{0};

View File

@ -52,25 +52,25 @@
pthread_mutex_t uORB::Manager::_communicator_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
uORB::Manager *uORB::Manager::_Instance = nullptr;
px4::atomic<uORB::Manager *> uORB::Manager::_Instance{nullptr};
bool uORB::Manager::initialize()
{
if (_Instance == nullptr) {
_Instance = new uORB::Manager();
if (_Instance.load() == nullptr) {
_Instance.store(new uORB::Manager());
}
#if defined(__PX4_NUTTX) && !defined(CONFIG_BUILD_FLAT) && defined(__KERNEL__)
px4_register_boardct_ioctl(_ORBIOCDEVBASE, orb_ioctl);
#endif
return _Instance != nullptr;
return _Instance.load() != nullptr;
}
bool uORB::Manager::terminate()
{
if (_Instance != nullptr) {
delete _Instance;
_Instance = nullptr;
if (_Instance.load() != nullptr) {
delete _Instance.load();
_Instance.store(nullptr);
return true;
}
@ -277,14 +277,14 @@ orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta,
if (_publisher_rule.ignore_other_topics) {
if (!findTopic(_publisher_rule, meta->o_name)) {
PX4_DEBUG("not allowing %s to publish topic %s", prog_name, meta->o_name);
return (orb_advert_t)_Instance;
return (orb_advert_t)_Instance.load();
}
}
} else {
if (findTopic(_publisher_rule, meta->o_name)) {
PX4_DEBUG("not allowing %s to publish topic %s", prog_name, meta->o_name);
return (orb_advert_t)_Instance;
return (orb_advert_t)_Instance.load();
}
}
}
@ -337,7 +337,7 @@ int uORB::Manager::orb_unadvertise(orb_advert_t handle)
{
#ifdef ORB_USE_PUBLISHER_RULES
if (handle == _Instance) {
if (handle == _Instance.load()) {
return PX4_OK; //pretend success
}
@ -366,7 +366,7 @@ int uORB::Manager::orb_publish(const struct orb_metadata *meta, orb_advert_t han
{
#ifdef ORB_USE_PUBLISHER_RULES
if (handle == _Instance) {
if (handle == _Instance.load()) {
return PX4_OK; //pretend success
}

View File

@ -184,7 +184,7 @@ public:
* Make sure initialize() is called first.
* @return uORB::Manager*
*/
static uORB::Manager *get_instance() { return _Instance; }
static uORB::Manager *get_instance() { return _Instance.load(); }
/**
* Get the DeviceMaster. If it does not exist,
@ -489,7 +489,7 @@ private: // class methods
int node_open(const struct orb_metadata *meta, bool advertiser, int *instance = nullptr);
private: // data members
static Manager *_Instance;
static px4::atomic<Manager *> _Instance;
#ifdef CONFIG_ORB_COMMUNICATOR
// the communicator channel instance.

View File

@ -144,7 +144,7 @@ int uORBTest::UnitTest::pubsublatency_main()
PX4_INFO("max: %3i us", timing_max);
PX4_INFO("missed topic updates: %i", num_missed);
pubsubtest_passed = true;
pubsubtest_passed.store(true);
#if defined(CONFIG_ARCH_BOARD_PX4_SITL)
// relaxed on SITL (non-realtime)
@ -154,13 +154,13 @@ int uORBTest::UnitTest::pubsublatency_main()
#endif
if (mean > kMaxMeanUs) {
pubsubtest_res = PX4_ERROR;
pubsubtest_res.store(PX4_ERROR);
} else {
pubsubtest_res = PX4_OK;
pubsubtest_res.store(PX4_OK);
}
return pubsubtest_res;
return pubsubtest_res.load();
}
int uORBTest::UnitTest::test()
@ -412,7 +412,7 @@ int uORBTest::UnitTest::pub_test_multi2_main()
pub = orb_advertise_multi(ORB_ID(orb_test_medium_multi), &data_topic, &idx);
if (idx != i) {
_thread_should_exit = true;
_thread_should_exit.store(true);
PX4_ERR("Got wrong instance! should be: %i, but is %i", i, idx);
return -1;
}
@ -440,7 +440,7 @@ int uORBTest::UnitTest::pub_test_multi2_main()
}
px4_usleep(100 * 1000);
_thread_should_exit = true;
_thread_should_exit.store(true);
for (int i = 0; i < num_instances; ++i) {
orb_unadvertise(orb_pub[i]);
@ -454,7 +454,7 @@ int uORBTest::UnitTest::test_multi2()
test_note("Testing multi-topic 2 test (queue simulation)");
//test: first subscribe, then advertise
_thread_should_exit = false;
_thread_should_exit.store(false);
const int num_instances = 3;
int orb_data_fd[num_instances] {-1, -1, -1};
int orb_data_next = 0;
@ -478,7 +478,7 @@ int uORBTest::UnitTest::test_multi2()
hrt_abstime last_time[num_instances] {};
while (!_thread_should_exit) {
while (!_thread_should_exit.load()) {
px4_usleep(1000);
@ -945,7 +945,7 @@ int uORBTest::UnitTest::pub_test_queue_main()
const int queue_size = orb_get_queue_size(ORB_ID(orb_test_medium_queue_poll));
if ((ptopic = orb_advertise(ORB_ID(orb_test_medium_queue_poll), &t)) == nullptr) {
_thread_should_exit = true;
_thread_should_exit.store(true);
return test_fail("advertise failed: %d", errno);
}
@ -968,9 +968,9 @@ int uORBTest::UnitTest::pub_test_queue_main()
px4_usleep(20 * 1000); //give subscriber a chance to catch up
}
_num_messages_sent = t.val;
_num_messages_sent.store(t.val);
px4_usleep(100 * 1000);
_thread_should_exit = true;
_thread_should_exit.store(true);
orb_unadvertise(ptopic);
return 0;
@ -987,7 +987,7 @@ int uORBTest::UnitTest::test_queue_poll_notify()
return test_fail("subscribe failed: %d", errno);
}
_thread_should_exit = false;
_thread_should_exit.store(false);
char *const args[1] = { nullptr };
int pubsub_task = px4_task_spawn_cmd("uorb_test_queue",
@ -1006,12 +1006,12 @@ int uORBTest::UnitTest::test_queue_poll_notify()
fds[0].fd = sfd;
fds[0].events = POLLIN;
while (!_thread_should_exit) {
while (!_thread_should_exit.load()) {
int poll_ret = px4_poll(fds, 1, 500);
if (poll_ret == 0) {
if (_thread_should_exit) {
if (_thread_should_exit.load()) {
break;
}
@ -1032,9 +1032,9 @@ int uORBTest::UnitTest::test_queue_poll_notify()
}
}
if (_num_messages_sent != next_expected_val) {
if (_num_messages_sent.load() != next_expected_val) {
return test_fail("number of sent and received messages mismatch (sent: %i, received: %i)",
_num_messages_sent, next_expected_val);
_num_messages_sent.load(), next_expected_val);
}
return test_note("PASS orb queuing (poll & notify), got %i messages", next_expected_val);
@ -1056,7 +1056,7 @@ int uORBTest::UnitTest::latency_test(bool print)
char *const args[1] = { nullptr };
pubsubtest_print = print;
pubsubtest_passed = false;
pubsubtest_passed.store(false);
/* test pub / sub latency */
@ -1071,7 +1071,7 @@ int uORBTest::UnitTest::latency_test(bool print)
args);
/* give the test task some data */
while (!pubsubtest_passed) {
while (!pubsubtest_passed.load()) {
++t.val;
t.timestamp = hrt_absolute_time();
@ -1089,7 +1089,7 @@ int uORBTest::UnitTest::latency_test(bool print)
orb_unadvertise(pfd0);
return pubsubtest_res;
return pubsubtest_res.load();
}
int uORBTest::UnitTest::test_fail(const char *fmt, ...)

View File

@ -42,6 +42,7 @@
#include <uORB/topics/orb_test_medium.h>
#include <uORB/topics/orb_test_large.h>
#include <px4_platform_common/atomic.h>
#include <px4_platform_common/defines.h>
#include <px4_platform_common/posix.h>
#include <px4_platform_common/time.h>
@ -87,11 +88,11 @@ private:
static int pub_test_multi2_entry(int argc, char *argv[]);
int pub_test_multi2_main();
volatile bool _thread_should_exit;
px4::atomic_bool _thread_should_exit{false};
bool pubsubtest_passed{false};
px4::atomic_bool pubsubtest_passed{false};
bool pubsubtest_print{false};
int pubsubtest_res = OK;
px4::atomic<int> pubsubtest_res{OK};
orb_advert_t _pfd[4] {}; ///< used for test_multi and test_multi_reversed
@ -113,7 +114,7 @@ private:
static int pub_test_queue_entry(int argc, char *argv[]);
int pub_test_queue_main();
int test_queue_poll_notify();
volatile int _num_messages_sent = 0;
px4::atomic<int> _num_messages_sent{0};
int test_fail(const char *fmt, ...);
int test_note(const char *fmt, ...);

View File

@ -162,13 +162,13 @@ static void hrt_latency_update()
/* bounded buckets */
for (index = 0; index < LATENCY_BUCKET_COUNT; index++) {
if (latency <= latency_buckets[index]) {
latency_counters[index]++;
__atomic_fetch_add(&latency_counters[index], 1, __ATOMIC_RELAXED);
return;
}
}
/* catch-all at the end */
latency_counters[index]++;
__atomic_fetch_add(&latency_counters[index], 1, __ATOMIC_RELAXED);
}
/*