Compare commits

...

21 Commits

Author SHA1 Message Date
Julian Oes 1520aa5220 microbench: fixup stack warning 2026-02-19 06:07:18 +13:00
Julian Oes 30a774b77d test: avoid triggering TSAN printing status 2026-02-19 06:07:17 +13:00
Julian Oes 5dbf62cd11 build: disable fuzztest when building with TSAN
fuzztest's coverage instrumentation is incompatible with Thread
Sanitizer. Add px4_setup_gtest_without_fuzztest() macro to
cmake/px4_add_gtest.cmake that fetches GTest standalone and stubs out
fuzztest cmake functions. Guard all fuzztest-specific code on
TARGET fuzztest::fuzztest so it compiles cleanly without fuzztest.
2026-02-19 06:07:16 +13:00
Julian Oes 64ddfebfc6 lockstep_scheduler: fix TSAN lock-order-inversion and test correctness
Split set_absolute_time() into two phases: iterate the waiter list under
_timed_waits_mutex, then broadcast under a separate _broadcast_mutex.
This eliminates the lock-order-inversion cycle between cond_timedwait()
(holds passed_lock -> acquires _timed_waits_mutex) and set_absolute_time()
(held _timed_waits_mutex -> acquired passed_lock).

Fix tests to have each thread lock its own mutex before calling
cond_timedwait, as required by POSIX (the calling thread must own the
mutex passed to pthread_cond_wait). The previous cross-thread ownership
caused TSAN's deadlock detector to overflow its 64-entry limit.
2026-02-19 06:07:15 +13:00
Julian Oes ecc789fc05 boards: add 6X test target 2026-02-19 06:07:14 +13:00
Julian Oes 3cd9a56f1b simulation: fix TSAN issue 2026-02-18 21:03:45 +13:00
Julian Oes 98287f2b11 sensors: fix data race in VehicleAngularVelocity sensor selection
Move registerCallback() after all member variable writes in
SensorSelectionUpdate() and reorder Start() to complete initial
sensor selection before registering the sensor_selection callback.
This prevents Run() from reading partially-written members like
_selected_sensor_device_id and _filter_sample_rate_hz on the work
queue thread while Start() is still initializing them.

This fixes TSAN issues in unit tests.
2026-02-18 21:03:44 +13:00
Julian Oes bc0bdb8a84 SITL: properly shutdown SITL unit tests 2026-02-18 21:03:43 +13:00
Julian Oes e8937a98a5 dataman: fix TSAN issues 2026-02-18 21:03:42 +13:00
Julian Oes d6c8514fea px4_work_queue: properly clean up
Fixes TSAN issues in unit tets.
2026-02-18 21:03:41 +13:00
Julian Oes 2ad0b297ca mavlink: fix TSAN issues 2026-02-18 21:03:40 +13:00
Julian Oes 17ac592cd2 logger: fix TSAN issue 2026-02-18 21:03:39 +13:00
Julian Oes 2c833f5ae2 parameters: fix TSAN issue with AtomicTransaction
This fixes TSAN issues on destruction of AtomicTransaction happening
during the unit tests.

The static _MutexHolder instance wraps a pthread_mutex_t in a C++ class,
creating a destructor ordering problem during process exit when worker
threads may still hold the lock. Replace with a raw pthread_mutex_t
initialized via pthread_once, eliminating the C++ destructor entirely.
2026-02-18 21:03:38 +13:00
Julian Oes b56f2dbcbd perf: use atomics to prevent TSAN issues 2026-02-18 21:03:37 +13:00
Julian Oes a757a7c29d posix: you can't join yourself 2026-02-18 21:03:35 +13:00
Julian Oes 20669aba6f uORB: join threads in unit tests 2026-02-18 21:03:34 +13:00
Julian Oes 261cc16077 drv_hrt: fix TSAN issues 2026-02-18 21:03:33 +13:00
Julian Oes 9074c81838 px4_log: fix TSAN issue using atomic 2026-02-18 21:03:32 +13:00
Julian Oes 3680775e5c px4_platform: fix TSAN issues 2026-02-18 21:03:31 +13:00
Julian Oes 22a32eaa55 uORB: fix TSAN issues using atomics 2026-02-18 12:43:02 +13:00
Julian Oes 91e25df7c4 parameters: fix data races in DynamicSparseLayer
Fix two concurrency bugs found with ThreadSanitizer:

1. size() and byteSize() read _next_slot/_n_slots without the mutex,
   racing with store(). Add AtomicTransaction to both.

2. _grow() with concurrent writers causes buffer overflow: _n_slots is
   read outside the lock for malloc sizing but inside the lock for
   memcpy, so another thread's growth can make memcpy exceed the
   allocation. On POSIX, hold the lock throughout _grow() since
   malloc/free work fine under a mutex. Keep the CAS-based approach
   on NuttX where malloc can't be called with IRQs disabled.

Add DynamicSparseLayerTest with concurrent stress tests that reproduce
both issues.

This issue was originally found while running the new SIH based CI
tests.
2026-02-16 20:59:12 +13:00
51 changed files with 1005 additions and 395 deletions
+22
View File
@@ -0,0 +1,22 @@
CONFIG_COMMON_DISTANCE_SENSOR=n
CONFIG_COMMON_TELEMETRY=n
CONFIG_DRIVERS_ADC_ADS1115=n
CONFIG_DRIVERS_CAMERA_CAPTURE=n
CONFIG_DRIVERS_CAMERA_TRIGGER=n
CONFIG_DRIVERS_IMU_ANALOG_DEVICES_ADIS16448=n
CONFIG_DRIVERS_IRLOCK=n
CONFIG_MODULES_AIRSPEED_SELECTOR=n
CONFIG_MODULES_GIMBAL=n
CONFIG_MODULES_VTOL_ATT_CONTROL=n
CONFIG_MODULES_FW_ATT_CONTROL=n
CONFIG_MODULES_FW_AUTOTUNE_ATTITUDE_CONTROL=n
CONFIG_MODULES_FW_MODE_MANAGER=n
CONFIG_MODULES_FW_LATERAL_LONGITUDINAL_CONTROL=n
CONFIG_MODULES_FW_RATE_CONTROL=n
CONFIG_MODULES_FW_AUTOTUNE_ATTITUDE_CONTROL=n
CONFIG_MODULES_MC_AUTOTUNE_ATTITUDE_CONTROL=n
CONFIG_MODULES_PAYLOAD_DELIVERER=n
CONFIG_MODULES_SIMULATION_SIMULATOR_SIH=n
CONFIG_BOARD_TESTING=y
CONFIG_DRIVERS_TEST_PPM=y
CONFIG_SYSTEMCMDS_MICROBENCH=y
+46 -17
View File
@@ -31,6 +31,30 @@
#
############################################################################
#=============================================================================
#
# px4_setup_gtest_without_fuzztest
#
# Fetches GTest standalone and stubs out fuzztest cmake functions.
# Used when fuzztest is not available (e.g. TSAN builds where fuzztest's
# coverage instrumentation is incompatible).
#
macro(px4_setup_gtest_without_fuzztest)
include(FetchContent)
FetchContent_Declare(
googletest
GIT_REPOSITORY https://github.com/google/googletest.git
GIT_TAG v1.16.0
)
FetchContent_MakeAvailable(googletest)
function(link_fuzztest name)
target_link_libraries(${name} PRIVATE gtest gtest_main)
endfunction()
macro(fuzztest_setup_fuzzing_flags)
endmacro()
endmacro()
#=============================================================================
#
# px4_add_unit_gtest
@@ -98,23 +122,28 @@ function(px4_add_functional_gtest)
add_executable(${TESTNAME} EXCLUDE_FROM_ALL ${SRC} ${EXTRA_SRCS})
# link the libary to test and gtest
target_link_libraries(${TESTNAME} PRIVATE ${LINKLIBS} gtest_functional_main
px4_layer
px4_platform
uORB
systemlib
cdev
px4_work_queue
px4_daemon
work_queue
parameters
events
perf
tinybson
uorb_msgs
fuzztest::fuzztest # Do not use link_fuzztest() here because that
# also links to fuzztest_gtest_main
test_stubs) # put test_stubs last
set(_FUNCTIONAL_GTEST_LIBS ${LINKLIBS} gtest_functional_main
px4_layer
px4_platform
uORB
systemlib
cdev
px4_work_queue
px4_daemon
work_queue
parameters
events
perf
tinybson
uorb_msgs)
if(TARGET fuzztest::fuzztest)
list(APPEND _FUNCTIONAL_GTEST_LIBS fuzztest::fuzztest) # Do not use link_fuzztest() here because that
# also links to fuzztest_gtest_main
else()
list(APPEND _FUNCTIONAL_GTEST_LIBS gtest)
endif()
list(APPEND _FUNCTIONAL_GTEST_LIBS test_stubs) # put test_stubs last
target_link_libraries(${TESTNAME} PRIVATE ${_FUNCTIONAL_GTEST_LIBS})
target_compile_definitions(${TESTNAME} PRIVATE MODULE_NAME="${TESTNAME}")
@@ -40,6 +40,8 @@
#pragma once
#include <px4_platform_common/atomic.h>
namespace px4
{
@@ -48,17 +50,17 @@ class AppState
public:
~AppState() {}
AppState() : _exitRequested(false), _isRunning(false) {}
AppState() {}
bool exitRequested() { return _exitRequested; }
void requestExit() { _exitRequested = true; }
bool exitRequested() { return _exitRequested.load(); }
void requestExit() { _exitRequested.store(true); }
bool isRunning() { return _isRunning; }
void setRunning(bool running) { _isRunning = running; }
bool isRunning() { return _isRunning.load(); }
void setRunning(bool running) { _isRunning.store(running); }
protected:
bool _exitRequested;
bool _isRunning;
px4::atomic_bool _exitRequested{false};
px4::atomic_bool _isRunning{false};
private:
AppState(const AppState &);
const AppState &operator=(const AppState &);
@@ -95,7 +95,9 @@ public:
} else
#endif // __PX4_NUTTX
{
return __atomic_load_n(&_value, __ATOMIC_SEQ_CST);
T val;
__atomic_load(&_value, &val, __ATOMIC_SEQ_CST);
return val;
}
}
@@ -69,7 +69,7 @@ public:
void Run();
void request_stop() { _should_exit.store(true); }
void request_stop() { _should_exit.store(true); SignalWorkerThread(); }
void print_status(bool last = false);
@@ -80,7 +80,14 @@ private:
bool should_exit() const { return _should_exit.load(); }
inline void SignalWorkerThread();
inline void SignalWorkerThread()
{
int sem_val;
if (px4_sem_getvalue(&_process_lock, &sem_val) == 0 && sem_val <= 0) {
px4_sem_post(&_process_lock);
}
}
#ifdef __PX4_NUTTX
// In NuttX work can be enqueued from an ISR
@@ -167,6 +167,9 @@ __EXPORT px4_task_t px4_task_spawn_cmd(const char *name,
/** Deletes a task - does not do resource cleanup **/
__EXPORT int px4_task_delete(px4_task_t pid);
/** Wait for a task to exit. Returns 0 on success. Only supported on POSIX. **/
__EXPORT int px4_task_join(px4_task_t pid);
/** Send a signal to a task **/
__EXPORT int px4_task_kill(px4_task_t pid, int sig);
+7 -4
View File
@@ -50,12 +50,13 @@
#include <uORB/uORB.h>
#include <uORB/topics/log_message.h>
#include <drivers/drv_hrt.h>
#include <px4_platform_common/atomic.h>
#if defined(BOARD_ENABLE_LOG_HISTORY)
static LogHistory g_log_history;
#endif
static orb_advert_t orb_log_message_pub = nullptr;
static px4::atomic<orb_advert_t> orb_log_message_pub{nullptr};
__EXPORT const char *__px4_log_level_str[_PX4_LOG_LEVEL_PANIC + 1] = { "DEBUG", "INFO", "WARN", "ERROR", "PANIC" };
@@ -74,7 +75,7 @@ void px4_log_initialize(void)
log_message.severity = 6; // info
strcpy((char *)log_message.text, "initialized uORB logging");
log_message.timestamp = hrt_absolute_time();
orb_log_message_pub = orb_advertise(ORB_ID(log_message), &log_message);
orb_log_message_pub.store(orb_advertise(ORB_ID(log_message), &log_message));
}
__EXPORT void px4_log_modulename(int level, const char *module_name, const char *fmt, ...)
@@ -178,7 +179,9 @@ __EXPORT void px4_log_modulename(int level, const char *module_name, const char
}
/* publish an orb log message */
if (level >= _PX4_LOG_LEVEL_INFO && orb_log_message_pub) { //publish all messages
orb_advert_t orb_log_pub = orb_log_message_pub.load();
if (level >= _PX4_LOG_LEVEL_INFO && orb_log_pub) { //publish all messages
log_message_s log_message;
@@ -199,7 +202,7 @@ __EXPORT void px4_log_modulename(int level, const char *module_name, const char
va_end(argptr);
log_message.text[max_length - 1] = 0; //ensure 0-termination
log_message.timestamp = hrt_absolute_time();
orb_publish(ORB_ID(log_message), orb_log_message_pub, &log_message);
orb_publish(ORB_ID(log_message), orb_log_pub, &log_message);
}
}
@@ -144,15 +144,6 @@ void WorkQueue::Add(WorkItem *item)
SignalWorkerThread();
}
void WorkQueue::SignalWorkerThread()
{
int sem_val;
if (px4_sem_getvalue(&_process_lock, &sem_val) == 0 && sem_val <= 0) {
px4_sem_post(&_process_lock);
}
}
void WorkQueue::Remove(WorkItem *item)
{
work_lock();
@@ -62,6 +62,14 @@ static BlockingQueue<const wq_config_t *, 1> *_wq_manager_create_queue{nullptr};
static px4::atomic_bool _wq_manager_should_exit{true};
static px4::atomic_bool _wq_manager_running{false};
static px4_task_t _wq_manager_task_id{-1};
#ifdef __PX4_POSIX
#include <pthread.h>
static constexpr int WQ_MAX_THREADS = 16;
static pthread_t _wq_threads[WQ_MAX_THREADS] {};
static int _wq_thread_count = 0;
#endif
static WorkQueue *
@@ -326,6 +334,14 @@ WorkQueueManagerRun(int, char **)
if (ret_create == 0) {
PX4_DEBUG("starting: %s, priority: %d, stack: %zu bytes", wq->name, param.sched_priority, stacksize);
#ifdef __PX4_POSIX
if (_wq_thread_count < WQ_MAX_THREADS) {
_wq_threads[_wq_thread_count++] = thread;
}
#endif
} else {
PX4_ERR("failed to create thread for %s (%i): %s", wq->name, ret_create, strerror(ret_create));
}
@@ -375,16 +391,16 @@ WorkQueueManagerStart()
_wq_manager_should_exit.store(false);
int task_id = px4_task_spawn_cmd("wq:manager",
SCHED_DEFAULT,
SCHED_PRIORITY_MAX,
PX4_STACK_ADJUSTED(1280),
(px4_main_t)&WorkQueueManagerRun,
nullptr);
_wq_manager_task_id = px4_task_spawn_cmd("wq:manager",
SCHED_DEFAULT,
SCHED_PRIORITY_MAX,
PX4_STACK_ADJUSTED(1280),
(px4_main_t)&WorkQueueManagerRun,
nullptr);
if (task_id < 0) {
if (_wq_manager_task_id < 0) {
_wq_manager_should_exit.store(true);
PX4_ERR("task start failed (%i)", task_id);
PX4_ERR("task start failed (%i)", _wq_manager_task_id);
return -errno;
}
@@ -413,20 +429,12 @@ WorkQueueManagerStop()
{
if (!_wq_manager_should_exit.load()) {
// error can't shutdown until all WorkItems are removed/stopped
if (_wq_manager_running.load() && (_wq_manager_wqs_list->size() > 0)) {
PX4_ERR("can't shutdown with active WQs");
WorkQueueManagerStatus();
return PX4_ERROR;
}
// first ask all WQs to stop
if (_wq_manager_wqs_list != nullptr) {
{
LockGuard lg{_wq_manager_wqs_list->mutex()};
// ask all work queues (threads) to stop
// NOTE: not currently safe without all WorkItems stopping first
for (WorkQueue *wq : *_wq_manager_wqs_list) {
wq->request_stop();
}
@@ -441,18 +449,31 @@ WorkQueueManagerStop()
_wq_manager_wqs_list = nullptr;
}
// signal wq:manager thread to exit
_wq_manager_should_exit.store(true);
if (_wq_manager_create_queue != nullptr) {
// push nullptr to wake the wq manager task
_wq_manager_create_queue->push(nullptr);
px4_usleep(10000);
// wait for wq:manager thread to finish
px4_task_join(_wq_manager_task_id);
_wq_manager_task_id = -1;
delete _wq_manager_create_queue;
_wq_manager_create_queue = nullptr;
}
#ifdef __PX4_POSIX
// join all work queue threads (safe now that wq:manager has exited)
for (int i = 0; i < _wq_thread_count; i++) {
pthread_join(_wq_threads[i], nullptr);
}
_wq_thread_count = 0;
#endif
} else {
PX4_WARN("not running");
return PX4_ERROR;
+2 -2
View File
@@ -55,7 +55,7 @@ bool Subscription::subscribe()
if (node) {
_node = node;
_last_generation = initial_generation;
_last_generation.store(initial_generation);
return true;
}
}
@@ -70,7 +70,7 @@ void Subscription::unsubscribe()
}
_node = nullptr;
_last_generation = 0;
_last_generation.store(0);
}
bool Subscription::ChangeInstance(uint8_t instance)
+12 -5
View File
@@ -41,6 +41,7 @@
#include <uORB/uORB.h>
#include <uORB/topics/uORBTopics.hpp>
#include <px4_platform_common/atomic.h>
#include <px4_platform_common/defines.h>
#include <lib/mathlib/mathlib.h>
@@ -131,7 +132,7 @@ public:
bool updated()
{
if (subscribe()) {
return Manager::updates_available(_node, _last_generation);
return Manager::updates_available(_node, _last_generation.load());
}
return false;
@@ -144,7 +145,10 @@ public:
bool update(void *dst)
{
if (subscribe()) {
return Manager::orb_data_copy(_node, dst, _last_generation, true);
unsigned gen = _last_generation.load();
bool ret = Manager::orb_data_copy(_node, dst, gen, true);
_last_generation.store(gen);
return ret;
}
return false;
@@ -157,7 +161,10 @@ public:
bool copy(void *dst)
{
if (subscribe()) {
return Manager::orb_data_copy(_node, dst, _last_generation, false);
unsigned gen = _last_generation.load();
bool ret = Manager::orb_data_copy(_node, dst, gen, false);
_last_generation.store(gen);
return ret;
}
return false;
@@ -170,7 +177,7 @@ public:
bool ChangeInstance(uint8_t instance);
uint8_t get_instance() const { return _instance; }
unsigned get_last_generation() const { return _last_generation; }
unsigned get_last_generation() const { return _last_generation.load(); }
orb_id_t get_topic() const { return get_orb_meta(_orb_id); }
ORB_ID orb_id() const { return _orb_id; }
@@ -184,7 +191,7 @@ protected:
void *_node{nullptr};
unsigned _last_generation{0}; /**< last generation the subscriber has seen */
px4::atomic<unsigned> _last_generation{0}; /**< last generation the subscriber has seen */
ORB_ID _orb_id{ORB_ID::INVALID};
uint8_t _instance{0};
@@ -67,7 +67,9 @@ public:
void call() override
{
// signal immediately if no interval, otherwise only if interval has elapsed
if ((_interval_us == 0) || (hrt_elapsed_time(&_last_update) >= _interval_us)) {
hrt_abstime last_update = _last_update.load();
if ((_interval_us == 0) || (hrt_elapsed_time(&last_update) >= _interval_us)) {
pthread_cond_signal(&_cv);
}
}
@@ -163,8 +163,10 @@ public:
void call() override
{
// schedule immediately if updated (queue depth or subscription interval)
if ((_required_updates == 0)
|| (Manager::updates_available(_subscription.get_node(), _subscription.get_last_generation()) >= _required_updates)) {
uint8_t req = _required_updates.load();
if ((req == 0)
|| (Manager::updates_available(_subscription.get_node(), _subscription.get_last_generation()) >= req)) {
if (updated()) {
_work_item->ScheduleNow();
}
@@ -179,13 +181,13 @@ public:
void set_required_updates(uint8_t required_updates)
{
// TODO: constrain to queue depth?
_required_updates = required_updates;
_required_updates.store(required_updates);
}
private:
px4::WorkItem *_work_item;
uint8_t _required_updates{0};
px4::atomic<uint8_t> _required_updates{0};
};
} // namespace uORB
@@ -38,7 +38,9 @@ namespace uORB
bool SubscriptionInterval::updated()
{
if (advertised() && (hrt_elapsed_time(&_last_update) >= _interval_us)) {
hrt_abstime last_update = _last_update.load();
if (advertised() && (hrt_elapsed_time(&last_update) >= _interval_us)) {
return _subscription.updated();
}
@@ -62,10 +64,10 @@ bool SubscriptionInterval::copy(void *dst)
// make sure we don't set a timestamp before the timer started counting (now - _interval_us would wrap because it's unsigned)
if (now > _interval_us) {
// shift last update time forward, but don't let it get further behind than the interval
_last_update = math::constrain(_last_update + _interval_us, now - _interval_us, now);
_last_update.store(math::constrain(_last_update.load() + _interval_us, now - _interval_us, now));
} else {
_last_update = now;
_last_update.store(now);
}
return true;
@@ -39,6 +39,7 @@
#pragma once
#include <uORB/uORB.h>
#include <px4_platform_common/atomic.h>
#include <px4_platform_common/defines.h>
#include "uORBDeviceNode.hpp"
@@ -132,11 +133,11 @@ public:
* Set the last data update
* @param t should be in range [now, now - _interval_us]
*/
void set_last_update(hrt_abstime t) { _last_update = t; }
void set_last_update(hrt_abstime t) { _last_update.store(t); }
protected:
Subscription _subscription;
uint64_t _last_update{0}; // last subscription update in microseconds
px4::atomic<uint64_t> _last_update{0}; // last subscription update in microseconds
uint32_t _interval_us{0}; // maximum update interval in microseconds
};
+15 -13
View File
@@ -57,7 +57,7 @@ uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const uint8_t inst
uORB::DeviceNode::~DeviceNode()
{
free(_data);
free(_data.load());
const char *devname = get_devname();
@@ -148,7 +148,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
*
* Note that filp will usually be NULL.
*/
if (nullptr == _data) {
if (nullptr == _data.load()) {
#ifdef __PX4_NUTTX
@@ -158,13 +158,15 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
lock();
/* re-check size */
if (nullptr == _data) {
if (nullptr == _data.load()) {
const size_t data_size = _meta->o_size * _meta->o_queue;
_data = (uint8_t *) px4_cache_aligned_alloc(data_size);
uint8_t *data = (uint8_t *) px4_cache_aligned_alloc(data_size);
if (_data) {
memset(_data, 0, data_size);
if (data) {
memset(data, 0, data_size);
}
_data.store(data);
}
unlock();
@@ -175,7 +177,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
#endif /* __PX4_NUTTX */
/* failed or could not allocate */
if (nullptr == _data) {
if (nullptr == _data.load()) {
return -ENOMEM;
}
}
@@ -190,7 +192,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
/* wrap-around happens after ~49 days, assuming a publisher rate of 1 kHz */
unsigned generation = _generation.fetch_add(1);
memcpy(_data + (_meta->o_size * (generation % _meta->o_queue)), buffer, _meta->o_size);
memcpy(_data.load() + (_meta->o_size * (generation % _meta->o_queue)), buffer, _meta->o_size);
// callbacks
for (auto item : _callbacks) {
@@ -232,7 +234,7 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg)
return PX4_OK;
case ORBIOCISADVERTISED:
*(unsigned long *)arg = _advertised;
*(unsigned long *)arg = _advertised.load();
return PX4_OK;
@@ -310,7 +312,7 @@ int uORB::DeviceNode::unadvertise(orb_advert_t handle)
* of subscribers and publishers. But we also do not have a leak since future
* publishers reuse the same DeviceNode object.
*/
devnode->_advertised = false;
devnode->_advertised.store(false);
return PX4_OK;
}
@@ -347,7 +349,7 @@ uORB::DeviceNode::poll_notify_one(px4_pollfd_struct_t *fds, px4_pollevent_t even
bool
uORB::DeviceNode::print_statistics(int max_topic_length)
{
if (!_advertised) {
if (!_advertised.load()) {
return false;
}
@@ -411,10 +413,10 @@ int16_t uORB::DeviceNode::process_add_subscription()
// send the data to the remote entity.
uORBCommunicator::IChannel *ch = uORB::Manager::get_instance()->get_uorb_communicator();
if (_data != nullptr && ch != nullptr) { // _data will not be null if there is a publisher.
if (_data.load() != nullptr && ch != nullptr) { // _data will not be null if there is a publisher.
// Only send the most recent data to initialize the remote end.
if (_data_valid) {
ch->send_message(_meta->o_name, _meta->o_size, _data + (_meta->o_size * ((_generation.load() - 1) % _meta->o_queue)));
ch->send_message(_meta->o_name, _meta->o_size, _data.load() + (_meta->o_size * ((_generation.load() - 1) % _meta->o_queue)));
}
}
+7 -7
View File
@@ -175,9 +175,9 @@ public:
*
* This is used in the case of multi_pub/sub to check if it's valid to advertise
* and publish to this node or if another node should be tried. */
bool is_advertised() const { return _advertised; }
bool is_advertised() const { return _advertised.load(); }
void mark_as_advertised() { _advertised = true; }
void mark_as_advertised() { _advertised.store(true); }
/**
* Print statistics
@@ -224,10 +224,10 @@ public:
*/
bool copy(void *dst, unsigned &generation)
{
if ((dst != nullptr) && (_data != nullptr)) {
if ((dst != nullptr) && (_data.load() != nullptr)) {
if (_meta->o_queue == 1) {
ATOMIC_ENTER;
memcpy(dst, _data, _meta->o_size);
memcpy(dst, _data.load(), _meta->o_size);
generation = _generation.load();
ATOMIC_LEAVE;
return true;
@@ -249,7 +249,7 @@ public:
generation = current_generation - _meta->o_queue;
}
memcpy(dst, _data + (_meta->o_size * (generation % _meta->o_queue)), _meta->o_size);
memcpy(dst, _data.load() + (_meta->o_size * (generation % _meta->o_queue)), _meta->o_size);
ATOMIC_LEAVE;
++generation;
@@ -279,13 +279,13 @@ private:
const orb_metadata *_meta; /**< object metadata information */
uint8_t *_data{nullptr}; /**< allocated object buffer */
px4::atomic<uint8_t *> _data{nullptr}; /**< allocated object buffer */
bool _data_valid{false}; /**< At least one valid data */
px4::atomic<unsigned> _generation{0}; /**< object generation count */
List<uORB::SubscriptionCallback *> _callbacks;
const uint8_t _instance; /**< orb multi instance identifier */
bool _advertised{false}; /**< has ever been advertised (not necessarily published data yet) */
px4::atomic_bool _advertised{false}; /**< has ever been advertised (not necessarily published data yet) */
int8_t _subscriber_count{0};
+11 -11
View File
@@ -52,25 +52,25 @@
pthread_mutex_t uORB::Manager::_communicator_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
uORB::Manager *uORB::Manager::_Instance = nullptr;
px4::atomic<uORB::Manager *> uORB::Manager::_Instance{nullptr};
bool uORB::Manager::initialize()
{
if (_Instance == nullptr) {
_Instance = new uORB::Manager();
if (_Instance.load() == nullptr) {
_Instance.store(new uORB::Manager());
}
#if defined(__PX4_NUTTX) && !defined(CONFIG_BUILD_FLAT) && defined(__KERNEL__)
px4_register_boardct_ioctl(_ORBIOCDEVBASE, orb_ioctl);
#endif
return _Instance != nullptr;
return _Instance.load() != nullptr;
}
bool uORB::Manager::terminate()
{
if (_Instance != nullptr) {
delete _Instance;
_Instance = nullptr;
if (_Instance.load() != nullptr) {
delete _Instance.load();
_Instance.store(nullptr);
return true;
}
@@ -277,14 +277,14 @@ orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta,
if (_publisher_rule.ignore_other_topics) {
if (!findTopic(_publisher_rule, meta->o_name)) {
PX4_DEBUG("not allowing %s to publish topic %s", prog_name, meta->o_name);
return (orb_advert_t)_Instance;
return (orb_advert_t)_Instance.load();
}
}
} else {
if (findTopic(_publisher_rule, meta->o_name)) {
PX4_DEBUG("not allowing %s to publish topic %s", prog_name, meta->o_name);
return (orb_advert_t)_Instance;
return (orb_advert_t)_Instance.load();
}
}
}
@@ -337,7 +337,7 @@ int uORB::Manager::orb_unadvertise(orb_advert_t handle)
{
#ifdef ORB_USE_PUBLISHER_RULES
if (handle == _Instance) {
if (handle == _Instance.load()) {
return PX4_OK; //pretend success
}
@@ -366,7 +366,7 @@ int uORB::Manager::orb_publish(const struct orb_metadata *meta, orb_advert_t han
{
#ifdef ORB_USE_PUBLISHER_RULES
if (handle == _Instance) {
if (handle == _Instance.load()) {
return PX4_OK; //pretend success
}
+2 -2
View File
@@ -184,7 +184,7 @@ public:
* Make sure initialize() is called first.
* @return uORB::Manager*
*/
static uORB::Manager *get_instance() { return _Instance; }
static uORB::Manager *get_instance() { return _Instance.load(); }
/**
* Get the DeviceMaster. If it does not exist,
@@ -489,7 +489,7 @@ private: // class methods
int node_open(const struct orb_metadata *meta, bool advertiser, int *instance = nullptr);
private: // data members
static Manager *_Instance;
static px4::atomic<Manager *> _Instance;
#ifdef CONFIG_ORB_COMMUNICATOR
// the communicator channel instance.
@@ -144,7 +144,7 @@ int uORBTest::UnitTest::pubsublatency_main()
PX4_INFO("max: %3i us", timing_max);
PX4_INFO("missed topic updates: %i", num_missed);
pubsubtest_passed = true;
pubsubtest_passed.store(true);
#if defined(CONFIG_ARCH_BOARD_PX4_SITL)
// relaxed on SITL (non-realtime)
@@ -154,13 +154,13 @@ int uORBTest::UnitTest::pubsublatency_main()
#endif
if (mean > kMaxMeanUs) {
pubsubtest_res = PX4_ERROR;
pubsubtest_res.store(PX4_ERROR);
} else {
pubsubtest_res = PX4_OK;
pubsubtest_res.store(PX4_OK);
}
return pubsubtest_res;
return pubsubtest_res.load();
}
int uORBTest::UnitTest::test()
@@ -412,7 +412,7 @@ int uORBTest::UnitTest::pub_test_multi2_main()
pub = orb_advertise_multi(ORB_ID(orb_test_medium_multi), &data_topic, &idx);
if (idx != i) {
_thread_should_exit = true;
_thread_should_exit.store(true);
PX4_ERR("Got wrong instance! should be: %i, but is %i", i, idx);
return -1;
}
@@ -440,7 +440,7 @@ int uORBTest::UnitTest::pub_test_multi2_main()
}
px4_usleep(100 * 1000);
_thread_should_exit = true;
_thread_should_exit.store(true);
for (int i = 0; i < num_instances; ++i) {
orb_unadvertise(orb_pub[i]);
@@ -454,7 +454,7 @@ int uORBTest::UnitTest::test_multi2()
test_note("Testing multi-topic 2 test (queue simulation)");
//test: first subscribe, then advertise
_thread_should_exit = false;
_thread_should_exit.store(false);
const int num_instances = 3;
int orb_data_fd[num_instances] {-1, -1, -1};
int orb_data_next = 0;
@@ -478,7 +478,7 @@ int uORBTest::UnitTest::test_multi2()
hrt_abstime last_time[num_instances] {};
while (!_thread_should_exit) {
while (!_thread_should_exit.load()) {
px4_usleep(1000);
@@ -501,6 +501,8 @@ int uORBTest::UnitTest::test_multi2()
}
}
px4_task_join(pubsub_task);
for (int i = 0; i < num_instances; ++i) {
orb_unsubscribe(orb_data_fd[i]);
}
@@ -945,7 +947,7 @@ int uORBTest::UnitTest::pub_test_queue_main()
const int queue_size = orb_get_queue_size(ORB_ID(orb_test_medium_queue_poll));
if ((ptopic = orb_advertise(ORB_ID(orb_test_medium_queue_poll), &t)) == nullptr) {
_thread_should_exit = true;
_thread_should_exit.store(true);
return test_fail("advertise failed: %d", errno);
}
@@ -968,9 +970,9 @@ int uORBTest::UnitTest::pub_test_queue_main()
px4_usleep(20 * 1000); //give subscriber a chance to catch up
}
_num_messages_sent = t.val;
_num_messages_sent.store(t.val);
px4_usleep(100 * 1000);
_thread_should_exit = true;
_thread_should_exit.store(true);
orb_unadvertise(ptopic);
return 0;
@@ -987,7 +989,7 @@ int uORBTest::UnitTest::test_queue_poll_notify()
return test_fail("subscribe failed: %d", errno);
}
_thread_should_exit = false;
_thread_should_exit.store(false);
char *const args[1] = { nullptr };
int pubsub_task = px4_task_spawn_cmd("uorb_test_queue",
@@ -1006,12 +1008,12 @@ int uORBTest::UnitTest::test_queue_poll_notify()
fds[0].fd = sfd;
fds[0].events = POLLIN;
while (!_thread_should_exit) {
while (!_thread_should_exit.load()) {
int poll_ret = px4_poll(fds, 1, 500);
if (poll_ret == 0) {
if (_thread_should_exit) {
if (_thread_should_exit.load()) {
break;
}
@@ -1032,9 +1034,11 @@ int uORBTest::UnitTest::test_queue_poll_notify()
}
}
if (_num_messages_sent != next_expected_val) {
px4_task_join(pubsub_task);
if (_num_messages_sent.load() != next_expected_val) {
return test_fail("number of sent and received messages mismatch (sent: %i, received: %i)",
_num_messages_sent, next_expected_val);
_num_messages_sent.load(), next_expected_val);
}
return test_note("PASS orb queuing (poll & notify), got %i messages", next_expected_val);
@@ -1056,7 +1060,7 @@ int uORBTest::UnitTest::latency_test(bool print)
char *const args[1] = { nullptr };
pubsubtest_print = print;
pubsubtest_passed = false;
pubsubtest_passed.store(false);
/* test pub / sub latency */
@@ -1071,7 +1075,7 @@ int uORBTest::UnitTest::latency_test(bool print)
args);
/* give the test task some data */
while (!pubsubtest_passed) {
while (!pubsubtest_passed.load()) {
++t.val;
t.timestamp = hrt_absolute_time();
@@ -1087,9 +1091,10 @@ int uORBTest::UnitTest::latency_test(bool print)
return test_fail("failed launching task");
}
px4_task_join(pubsub_task);
orb_unadvertise(pfd0);
return pubsubtest_res;
return pubsubtest_res.load();
}
int uORBTest::UnitTest::test_fail(const char *fmt, ...)
@@ -42,6 +42,7 @@
#include <uORB/topics/orb_test_medium.h>
#include <uORB/topics/orb_test_large.h>
#include <px4_platform_common/atomic.h>
#include <px4_platform_common/defines.h>
#include <px4_platform_common/posix.h>
#include <px4_platform_common/time.h>
@@ -87,11 +88,11 @@ private:
static int pub_test_multi2_entry(int argc, char *argv[]);
int pub_test_multi2_main();
volatile bool _thread_should_exit;
px4::atomic_bool _thread_should_exit{false};
bool pubsubtest_passed{false};
px4::atomic_bool pubsubtest_passed{false};
bool pubsubtest_print{false};
int pubsubtest_res = OK;
px4::atomic<int> pubsubtest_res{OK};
orb_advert_t _pfd[4] {}; ///< used for test_multi and test_multi_reversed
@@ -113,7 +114,7 @@ private:
static int pub_test_queue_entry(int argc, char *argv[]);
int pub_test_queue_main();
int test_queue_poll_notify();
volatile int _num_messages_sent = 0;
px4::atomic<int> _num_messages_sent{0};
int test_fail(const char *fmt, ...);
int test_note(const char *fmt, ...);
+11
View File
@@ -40,6 +40,7 @@
#include <px4_platform_common/px4_config.h>
#include <px4_platform_common/log.h>
#include <px4_platform_common/tasks.h>
#include <px4_platform_common/time.h>
#include <nuttx/board.h>
#include <nuttx/kthread.h>
@@ -91,6 +92,16 @@ int px4_task_delete(int pid)
return task_delete(pid);
}
int px4_task_join(int pid)
{
// Wait for the task to exit by polling whether it still exists
while (kill(pid, 0) == 0) {
px4_usleep(10000);
}
return 0;
}
const char *px4_get_taskname(void)
{
#if CONFIG_TASK_NAME_SIZE > 0 && (defined(__KERNEL__) || defined(CONFIG_BUILD_FLAT))
+1 -1
View File
@@ -28,7 +28,7 @@ add_executable(px4
apps.cpp
)
if (BUILD_TESTING)
if (BUILD_TESTING AND TARGET fuzztest::fuzztest)
# Build mavlink fuzz tests. These run other modules and thus cannot be a functional/unit test
add_executable(mavlink_fuzz_tests EXCLUDE_FROM_ALL
src/px4/common/mavlink_fuzz_tests.cpp
+12 -6
View File
@@ -134,7 +134,10 @@ void hrt_store_absolute_time(volatile hrt_abstime *t)
*/
bool hrt_called(struct hrt_call *entry)
{
return (entry->deadline == 0);
hrt_lock();
bool result = (entry->deadline == 0);
hrt_unlock();
return result;
}
/*
@@ -162,13 +165,13 @@ static void hrt_latency_update()
/* bounded buckets */
for (index = 0; index < LATENCY_BUCKET_COUNT; index++) {
if (latency <= latency_buckets[index]) {
latency_counters[index]++;
__atomic_fetch_add(&latency_counters[index], 1, __ATOMIC_RELAXED);
return;
}
}
/* catch-all at the end */
latency_counters[index]++;
__atomic_fetch_add(&latency_counters[index], 1, __ATOMIC_RELAXED);
}
/*
@@ -416,12 +419,15 @@ hrt_call_invoke()
call->deadline = 0;
/* invoke the callout (if there is one) */
if (call->callout) {
hrt_callout callout = call->callout;
void *arg = call->arg;
if (callout) {
// Unlock so we don't deadlock in callback
hrt_unlock();
//PX4_INFO("call %p: %p(%p)", call, call->callout, call->arg);
call->callout(call->arg);
//PX4_INFO("call %p: %p(%p)", call, callout, arg);
callout(arg);
hrt_lock();
}
@@ -36,4 +36,9 @@ set(SRCS
)
px4_add_library(gtest_functional_main ${SRCS})
target_link_libraries(gtest_functional_main PUBLIC gtest fuzztest::init_fuzztest)
if(TARGET fuzztest::init_fuzztest)
target_link_libraries(gtest_functional_main PUBLIC gtest fuzztest::init_fuzztest)
target_compile_definitions(gtest_functional_main PRIVATE HAVE_FUZZTEST)
else()
target_link_libraries(gtest_functional_main PUBLIC gtest)
endif()
@@ -32,7 +32,9 @@
****************************************************************************/
#include <gtest/gtest.h>
#ifdef HAVE_FUZZTEST
#include <fuzztest/init_fuzztest.h>
#endif
#include <uORB/Subscription.hpp>
@@ -42,8 +44,10 @@ int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
#ifdef HAVE_FUZZTEST
fuzztest::ParseAbslFlags(argc, argv);
fuzztest::InitFuzzTest(&argc, &argv);
#endif
uORB::Manager::initialize();
param_init();
@@ -86,7 +86,7 @@ private:
pthread_cond_t *passed_cond{nullptr};
pthread_mutex_t *passed_lock{nullptr};
uint64_t time_us{0};
bool timeout{false};
std::atomic<bool> timeout{false};
std::atomic<bool> done{false};
std::atomic<bool> removed{true};
@@ -98,6 +98,7 @@ private:
std::atomic<uint64_t> _time_us{0};
TimedWait *_timed_waits{nullptr}; ///< head of linked list
std::mutex _timed_waits_mutex;
std::mutex _timed_waits_mutex; ///< protects _timed_waits linked list
std::mutex _broadcast_mutex; ///< protects the broadcast phase in set_absolute_time()
std::atomic<bool> _setting_time{false}; ///< true if set_absolute_time() is currently being executed
};
@@ -55,6 +55,18 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)
_time_us = time_us;
// Collect entries that need wakeup while holding _timed_waits_mutex,
// then broadcast outside of it to avoid lock-order-inversion with
// cond_timedwait() (which holds passed_lock while acquiring _timed_waits_mutex).
struct WakeUp {
pthread_cond_t *cond;
pthread_mutex_t *lock;
};
static constexpr int MAX_WAKEUPS = 32;
WakeUp wakeups[MAX_WAKEUPS];
int num_wakeups = 0;
{
std::unique_lock<std::mutex> lock_timed_waits(_timed_waits_mutex);
_setting_time = true;
@@ -81,17 +93,29 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)
if (timed_wait->time_us <= time_us &&
!timed_wait->timeout) {
// We are abusing the condition here to signal that the time
// has passed.
pthread_mutex_lock(timed_wait->passed_lock);
// Mark as timed out and collect for broadcast
timed_wait->timeout = true;
pthread_cond_broadcast(timed_wait->passed_cond);
pthread_mutex_unlock(timed_wait->passed_lock);
if (num_wakeups < MAX_WAKEUPS) {
wakeups[num_wakeups++] = {timed_wait->passed_cond, timed_wait->passed_lock};
}
}
timed_wait_prev = timed_wait;
timed_wait = timed_wait->next;
}
}
// Broadcast under _broadcast_mutex (not _timed_waits_mutex) so that
// locking passed_lock here doesn't create an ordering cycle.
{
std::lock_guard<std::mutex> lock_broadcast(_broadcast_mutex);
for (int i = 0; i < num_wakeups; ++i) {
pthread_mutex_lock(wakeups[i].lock);
pthread_cond_broadcast(wakeups[i].cond);
pthread_mutex_unlock(wakeups[i].lock);
}
_setting_time = false;
}
@@ -101,7 +125,7 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *loc
{
// A TimedWait object might still be in timed_waits_ after we return, so its lifetime needs to be
// longer. And using thread_local is more efficient than malloc.
static thread_local TimedWait timed_wait;
static thread_local TimedWait timed_wait{};
{
std::lock_guard<std::mutex> lock_timed_waits(_timed_waits_mutex);
@@ -144,9 +168,13 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *loc
// deadlock due to a different locking order in set_absolute_time().
// Note that this case does not happen too frequently, and thus can be
// a bit more expensive.
// We wait for both the iteration phase (_timed_waits_mutex) and the
// broadcast phase (_broadcast_mutex) to complete.
pthread_mutex_unlock(lock);
_timed_waits_mutex.lock();
_timed_waits_mutex.unlock();
_broadcast_mutex.lock();
_broadcast_mutex.unlock();
pthread_mutex_lock(lock);
}
@@ -65,11 +65,12 @@ void test_condition_timing_out()
ls.set_absolute_time(some_time_us);
std::atomic<bool> should_have_timed_out{false};
pthread_mutex_lock(&lock);
// Use a thread to wait for condition while we already have the lock.
// This ensures the synchronization happens in the right order.
// The thread that calls cond_timedwait must also lock the mutex,
// otherwise set_absolute_time() would re-lock the main thread's
// own mutex in its broadcast phase, causing a deadlock.
TestThread thread([&ls, &cond, &lock, &should_have_timed_out]() {
pthread_mutex_lock(&lock);
EXPECT_EQ(ls.cond_timedwait(&cond, &lock, some_time_us + 1000), ETIMEDOUT);
EXPECT_TRUE(should_have_timed_out);
// It should be re-locked afterwards, so we should be able to unlock it.
@@ -100,17 +101,25 @@ void test_locked_semaphore_getting_unlocked()
LockstepScheduler ls;
ls.set_absolute_time(some_time_us);
pthread_mutex_lock(&lock);
// Use a thread to wait for condition while we already have the lock.
// This ensures the synchronization happens in the right order.
TestThread thread([&ls, &cond, &lock]() {
std::atomic<bool> thread_locked{false};
// The thread locks the mutex itself (POSIX requires the calling thread
// to own the mutex passed to pthread_cond_wait).
TestThread thread([&ls, &cond, &lock, &thread_locked]() {
pthread_mutex_lock(&lock);
thread_locked = true;
ls.set_absolute_time(some_time_us + 500);
EXPECT_EQ(ls.cond_timedwait(&cond, &lock, some_time_us + 1000), 0);
// It should be re-locked afterwards, so we should be able to unlock it.
EXPECT_EQ(pthread_mutex_unlock(&lock), 0);
});
// Wait until thread has locked the mutex. Our pthread_mutex_lock below
// will then block until cond_timedwait releases it via pthread_cond_wait.
while (!thread_locked) {
std::this_thread::yield();
}
pthread_mutex_lock(&lock);
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lock);
@@ -142,8 +151,9 @@ public:
void run()
{
pthread_mutex_lock(&_lock);
_thread = std::make_shared<TestThread>([this]() {
pthread_mutex_lock(&_lock);
_thread_ready = true;
_result = _ls.cond_timedwait(&_cond, &_lock, _timeout);
pthread_mutex_unlock(&_lock);
});
@@ -151,7 +161,7 @@ public:
void check()
{
if (_is_done) {
if (_is_done || !_thread_ready) {
return;
}
@@ -186,6 +196,7 @@ private:
pthread_mutex_t _lock;
LockstepScheduler &_ls;
std::atomic<bool> _is_done{false};
std::atomic<bool> _thread_ready{false};
std::atomic<int> _result {INITIAL_RESULT};
std::shared_ptr<TestThread> _thread{};
};
@@ -310,7 +321,7 @@ void test_usleep()
TEST(LockstepScheduler, All)
{
for (unsigned iteration = 1; iteration <= 100; ++iteration) {
//std::cout << "Test iteration: " << iteration << "\n";
std::cout << "Test iteration: " << iteration << "\n";
test_absolute_time();
test_condition_timing_out();
test_locked_semaphore_getting_unlocked();
+15 -1
View File
@@ -274,6 +274,21 @@ px4_task_t px4_task_spawn_cmd(const char *name, int scheduler, int priority, int
return taskid;
}
int px4_task_join(px4_task_t id)
{
if (id < PX4_MAX_TASKS) {
pthread_mutex_lock(&task_mutex);
pthread_t pid = taskmap[id].pid;
pthread_mutex_unlock(&task_mutex);
if (pid != 0) {
return pthread_join(pid, nullptr);
}
}
return -EINVAL;
}
int px4_task_delete(px4_task_t id)
{
int rv = 0;
@@ -291,7 +306,6 @@ int px4_task_delete(px4_task_t id)
// If current thread then exit, otherwise cancel
if (pthread_self() == pid) {
pthread_join(pid, nullptr);
taskmap[id].isused = false;
pthread_mutex_unlock(&task_mutex);
pthread_exit(nullptr);
@@ -51,7 +51,7 @@ sleep 10
logger off
sensors status
#sensors status # triggers TSAN
listener sensor_gyro
listener sensor_gyro_fifo
listener sensor_gyro_fft
@@ -13,4 +13,10 @@ pwm_out_sim start
tests @test_name@
pwm_out_sim stop
tone_alarm stop
work_queue stop
simulator_mavlink stop
dataman stop
shutdown
+2 -2
View File
@@ -272,14 +272,14 @@ static inline uint16_t get_latency_bucket_count(void) { return LATENCY_BUCKET_CO
static inline latency_info_t get_latency(uint16_t bucket_idx, uint16_t counter_idx)
{
latency_info_t ret = {latency_buckets[bucket_idx], latency_counters[counter_idx]};
latency_info_t ret = {latency_buckets[bucket_idx], __atomic_load_n(&latency_counters[counter_idx], __ATOMIC_RELAXED)};
return ret;
}
static inline void reset_latency_counters(void)
{
for (int i = 0; i <= get_latency_bucket_count(); i++) {
latency_counters[i] = 0;
__atomic_store_n(&latency_counters[i], 0, __ATOMIC_RELAXED);
}
}
+13 -11
View File
@@ -49,14 +49,16 @@ px4_add_module(
module.yaml
)
px4_add_functional_gtest(SRC septentrio_fuzz_tests.cpp
LINKLIBS
driver__septentrio
COMPILE_FLAGS
# There warnings come from within fuzztest
-Wno-float-equal
-Wno-sign-compare
-Wno-shadow
-Wno-extra
-Wno-non-template-friend
)
if(TARGET fuzztest::fuzztest)
px4_add_functional_gtest(SRC septentrio_fuzz_tests.cpp
LINKLIBS
driver__septentrio
COMPILE_FLAGS
# There warnings come from within fuzztest
-Wno-float-equal
-Wno-sign-compare
-Wno-shadow
-Wno-extra
-Wno-non-template-friend
)
endif()
+1
View File
@@ -223,3 +223,4 @@ if(${PX4_PLATFORM} STREQUAL "posix" OR ${PX4_PLATFORM} STREQUAL "ros2")
endif()
px4_add_functional_gtest(SRC ParameterTest.cpp LINKLIBS parameters)
px4_add_functional_gtest(SRC DynamicSparseLayerTest.cpp LINKLIBS parameters)
+33 -4
View File
@@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (c) 2023 PX4 Development Team. All rights reserved.
* Copyright (c) 2023-2026 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -144,11 +144,13 @@ public:
int size() const override
{
const AtomicTransaction transaction;
return _next_slot;
}
int byteSize() const override
{
const AtomicTransaction transaction;
return _n_slots * sizeof(Slot);
}
@@ -197,23 +199,25 @@ private:
return false;
}
#ifdef __PX4_NUTTX
int max_retries = 5;
// As malloc uses locking, so we need to re-enable IRQ's during malloc/free and
// then atomically exchange the buffer
// On NuttX, malloc/free can't be called with IRQs disabled,
// so we unlock around them and use CAS to handle races.
while (_next_slot >= _n_slots && max_retries-- > 0) {
Slot *previous_slots = nullptr;
Slot *new_slots = nullptr;
do {
previous_slots = _slots.load();
int alloc_n_slots = _n_slots;
transaction.unlock();
if (new_slots) {
free(new_slots);
}
new_slots = (Slot *) malloc(sizeof(Slot) * (_n_slots + _n_grow));
new_slots = (Slot *) malloc(sizeof(Slot) * (alloc_n_slots + _n_grow));
transaction.lock();
if (new_slots == nullptr) {
@@ -235,6 +239,31 @@ private:
transaction.lock();
}
#else
// On POSIX, malloc/free work fine under the mutex, so we can
// hold the lock throughout and avoid CAS/ABA race conditions.
if (_next_slot >= _n_slots) {
Slot *old = _slots.load();
Slot *new_slots = (Slot *)malloc(sizeof(Slot) * (_n_slots + _n_grow));
if (new_slots == nullptr) {
return false;
}
memcpy(new_slots, old, sizeof(Slot) * _n_slots);
for (int i = _n_slots; i < _n_slots + _n_grow; i++) {
new_slots[i] = {UINT16_MAX, param_value_u{}};
}
_slots.store(new_slots);
_n_slots += _n_grow;
free(old);
}
#endif
return _next_slot < _n_slots;
}
@@ -0,0 +1,245 @@
/****************************************************************************
*
* Copyright (c) 2026 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name PX4 nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
/**
* @file DynamicSparseLayerTest.cpp
*
* Concurrent stress tests for DynamicSparseLayer.
*
* Validates thread safety of store/get/contains under concurrent access,
* especially during _grow() operations. Designed to be run with ASAN/TSan
* to catch use-after-free or data races.
*/
#include <gtest/gtest.h>
#include <parameters/px4_parameters.hpp>
#include "DynamicSparseLayer.h"
#include <atomic>
#include <thread>
#include <vector>
/**
* Minimal ParamLayer parent that returns a default zero value for all params.
*/
class StubParamLayer : public ParamLayer
{
public:
StubParamLayer() : ParamLayer(nullptr) {}
bool store(param_t, param_value_u) override { return false; }
bool contains(param_t) const override { return false; }
px4::AtomicBitset<PARAM_COUNT> containedAsBitset() const override
{
return px4::AtomicBitset<PARAM_COUNT>();
}
param_value_u get(param_t) const override
{
param_value_u v{};
v.i = 0;
return v;
}
void reset(param_t) override {}
void refresh(param_t) override {}
int size() const override { return 0; }
int byteSize() const override { return 0; }
};
class DynamicSparseLayerConcurrentTest : public ::testing::Test
{
protected:
StubParamLayer stub;
};
// Single writer forces frequent _grow() while readers concurrently access
// stored params. With only one writer there is no concurrent growth, so
// this safely stresses the read-side during buffer reallocation.
TEST_F(DynamicSparseLayerConcurrentTest, ConcurrentStoreGetRace)
{
for (int rep = 0; rep < 50; rep++) {
DynamicSparseLayer layer(&stub, /*n_prealloc=*/2, /*n_grow=*/1);
constexpr int NUM_PARAMS = 100;
std::atomic<bool> stop{false};
std::thread writer([&]() {
for (int i = 0; i < NUM_PARAMS; i++) {
param_value_u v{};
v.i = i * 10;
layer.store(static_cast<param_t>(i), v);
}
stop.store(true, std::memory_order_release);
});
std::vector<std::thread> readers;
for (int r = 0; r < 4; r++) {
readers.emplace_back([&]() {
while (!stop.load(std::memory_order_acquire)) {
int current_size = layer.size();
for (int i = 0; i < current_size && i < NUM_PARAMS; i++) {
layer.contains(static_cast<param_t>(i));
layer.get(static_cast<param_t>(i));
}
}
});
}
writer.join();
for (auto &t : readers) {
t.join();
}
for (int i = 0; i < NUM_PARAMS; i++) {
ASSERT_TRUE(layer.contains(static_cast<param_t>(i)))
<< "rep=" << rep << " param=" << i;
param_value_u v = layer.get(static_cast<param_t>(i));
ASSERT_EQ(v.i, i * 10)
<< "rep=" << rep << " param=" << i;
}
ASSERT_EQ(layer.size(), NUM_PARAMS);
}
}
// Multiple writers store to disjoint param ranges concurrently.
// Pre-allocated to avoid concurrent _grow() which has a known ABA
// limitation in the CAS retry loop (see _grow() implementation).
// This test validates concurrent store correctness.
TEST_F(DynamicSparseLayerConcurrentTest, ConcurrentMultipleWriters)
{
constexpr int PARAMS_PER_WRITER = 50;
constexpr int NUM_WRITERS = 4;
constexpr int TOTAL = NUM_WRITERS * PARAMS_PER_WRITER;
for (int rep = 0; rep < 20; rep++) {
DynamicSparseLayer layer(&stub, /*n_prealloc=*/2, /*n_grow=*/1);
std::vector<std::thread> writers;
for (int w = 0; w < NUM_WRITERS; w++) {
writers.emplace_back([&layer, w]() {
int base = w * PARAMS_PER_WRITER;
for (int i = 0; i < PARAMS_PER_WRITER; i++) {
param_value_u v{};
v.i = (base + i) * 10;
layer.store(static_cast<param_t>(base + i), v);
}
});
}
for (auto &t : writers) {
t.join();
}
ASSERT_EQ(layer.size(), TOTAL) << "rep=" << rep;
for (int i = 0; i < TOTAL; i++) {
ASSERT_TRUE(layer.contains(static_cast<param_t>(i)))
<< "rep=" << rep << " param=" << i;
param_value_u v = layer.get(static_cast<param_t>(i));
ASSERT_EQ(v.i, i * 10)
<< "rep=" << rep << " param=" << i;
}
}
}
// Combined stress: writers store while readers access concurrently.
TEST_F(DynamicSparseLayerConcurrentTest, ConcurrentWritersAndReaders)
{
constexpr int PARAMS_PER_WRITER = 50;
constexpr int NUM_WRITERS = 2;
constexpr int TOTAL = NUM_WRITERS * PARAMS_PER_WRITER;
for (int rep = 0; rep < 20; rep++) {
DynamicSparseLayer layer(&stub, /*n_prealloc=*/2, /*n_grow=*/1);
std::atomic<bool> stop{false};
std::vector<std::thread> writers;
for (int w = 0; w < NUM_WRITERS; w++) {
writers.emplace_back([&layer, w]() {
int base = w * PARAMS_PER_WRITER;
for (int i = 0; i < PARAMS_PER_WRITER; i++) {
param_value_u v{};
v.i = (base + i) * 10;
layer.store(static_cast<param_t>(base + i), v);
}
});
}
std::vector<std::thread> readers;
for (int r = 0; r < 4; r++) {
readers.emplace_back([&]() {
while (!stop.load(std::memory_order_acquire)) {
int current_size = layer.size();
for (int i = 0; i < current_size && i < TOTAL; i++) {
layer.contains(static_cast<param_t>(i));
layer.get(static_cast<param_t>(i));
}
}
});
}
for (auto &t : writers) {
t.join();
}
stop.store(true, std::memory_order_release);
for (auto &t : readers) {
t.join();
}
ASSERT_EQ(layer.size(), TOTAL) << "rep=" << rep;
for (int i = 0; i < TOTAL; i++) {
ASSERT_TRUE(layer.contains(static_cast<param_t>(i)))
<< "rep=" << rep << " param=" << i;
param_value_u v = layer.get(static_cast<param_t>(i));
ASSERT_EQ(v.i, i * 10)
<< "rep=" << rep << " param=" << i;
}
}
}
+17 -1
View File
@@ -34,5 +34,21 @@
#include "atomic_transaction.h"
#ifdef __PX4_POSIX
_MutexHolder AtomicTransaction::_mutex_holder = _MutexHolder {};
static pthread_mutex_t _param_mutex;
static pthread_once_t _param_mutex_once = PTHREAD_ONCE_INIT;
static void _init_param_mutex()
{
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&_param_mutex, &attr);
pthread_mutexattr_destroy(&attr);
}
pthread_mutex_t *AtomicTransaction::_get_mutex()
{
pthread_once(&_param_mutex_once, _init_param_mutex);
return &_param_mutex;
}
#endif
+3 -22
View File
@@ -39,25 +39,6 @@
#ifdef __PX4_POSIX
#include <pthread.h>
class _MutexHolder
{
public:
pthread_mutex_t _mutex;
pthread_mutexattr_t _mutex_attr;
_MutexHolder()
{
pthread_mutexattr_init(&_mutex_attr);
pthread_mutexattr_settype(&_mutex_attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&_mutex, &_mutex_attr);
}
~_MutexHolder()
{
pthread_mutex_destroy(&_mutex);
}
};
#endif
@@ -69,7 +50,7 @@ private:
#endif
#ifdef __PX4_POSIX
static _MutexHolder _mutex_holder;
static pthread_mutex_t *_get_mutex();
#endif
public:
@@ -89,7 +70,7 @@ public:
_irq_state = px4_enter_critical_section();
#endif
#ifdef __PX4_POSIX
pthread_mutex_lock(&_mutex_holder._mutex);
pthread_mutex_lock(_get_mutex());
#endif
}
@@ -99,7 +80,7 @@ public:
px4_leave_critical_section(_irq_state);
#endif
#ifdef __PX4_POSIX
pthread_mutex_unlock(&_mutex_holder._mutex);
pthread_mutex_unlock(_get_mutex());
#endif
}
};
+143 -110
View File
@@ -48,6 +48,8 @@
#include "perf_counter.h"
#include <px4_platform_common/atomic.h>
/**
* Header common to all counters.
*/
@@ -61,34 +63,34 @@ struct perf_ctr_header {
* PC_EVENT counter.
*/
struct perf_ctr_count : public perf_ctr_header {
uint64_t event_count{0};
px4::atomic<uint64_t> event_count;
};
/**
* PC_ELAPSED counter.
*/
struct perf_ctr_elapsed : public perf_ctr_header {
uint64_t event_count{0};
uint64_t time_start{0};
uint64_t time_total{0};
uint32_t time_least{0};
uint32_t time_most{0};
float mean{0.0f};
float M2{0.0f};
px4::atomic<uint64_t> event_count;
px4::atomic<uint64_t> time_start;
px4::atomic<uint64_t> time_total;
px4::atomic<uint32_t> time_least;
px4::atomic<uint32_t> time_most;
px4::atomic<float> mean;
px4::atomic<float> M2;
};
/**
* PC_INTERVAL counter.
*/
struct perf_ctr_interval : public perf_ctr_header {
uint64_t event_count{0};
uint64_t time_event{0};
uint64_t time_first{0};
uint64_t time_last{0};
uint32_t time_least{0};
uint32_t time_most{0};
float mean{0.0f};
float M2{0.0f};
px4::atomic<uint64_t> event_count;
px4::atomic<uint64_t> time_event;
px4::atomic<uint64_t> time_first;
px4::atomic<uint64_t> time_last;
px4::atomic<uint32_t> time_least;
px4::atomic<uint32_t> time_most;
px4::atomic<float> mean;
px4::atomic<float> M2;
};
/**
@@ -100,12 +102,8 @@ static sq_queue_t perf_counters = { nullptr, nullptr };
* mutex protecting access to the perf_counters linked list (which is read from & written to by different threads)
*/
pthread_mutex_t perf_counters_mutex = PTHREAD_MUTEX_INITIALIZER;
// FIXME: the mutex does **not** protect against access to/from the perf
// counter's data. It can still happen that a counter is updated while it is
// printed. This can lead to inconsistent output, or completely bogus values
// (especially the 64bit values which are in general not atomically updated).
// The same holds for shared perf counters (perf_alloc_once), that can be updated
// concurrently (this affects the 'ctrl_latency' counter).
// Note: the mutex only protects the linked list, not individual counter data.
// All counter field accesses use px4::atomic to prevent torn reads/writes.
perf_counter_t
@@ -208,7 +206,7 @@ perf_count(perf_counter_t handle)
switch (handle->type) {
case PC_COUNT:
((struct perf_ctr_count *)handle)->event_count++;
((struct perf_ctr_count *)handle)->event_count.fetch_add(1);
break;
case PC_INTERVAL:
@@ -229,7 +227,7 @@ perf_begin(perf_counter_t handle)
switch (handle->type) {
case PC_ELAPSED:
((struct perf_ctr_elapsed *)handle)->time_start = hrt_absolute_time();
((struct perf_ctr_elapsed *)handle)->time_start.store(hrt_absolute_time());
break;
default:
@@ -247,9 +245,10 @@ perf_end(perf_counter_t handle)
switch (handle->type) {
case PC_ELAPSED: {
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
hrt_abstime ts = pce->time_start.load();
if (pce->time_start != 0) {
perf_set_elapsed(handle, hrt_elapsed_time(&pce->time_start));
if (ts != 0) {
perf_set_elapsed(handle, hrt_absolute_time() - ts);
}
}
break;
@@ -271,25 +270,33 @@ perf_set_elapsed(perf_counter_t handle, int64_t elapsed)
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
if (elapsed >= 0) {
pce->event_count++;
pce->time_total += elapsed;
uint64_t ec = pce->event_count.fetch_add(1) + 1;
pce->time_total.fetch_add((uint64_t)elapsed);
if ((pce->time_least > (uint32_t)elapsed) || (pce->time_least == 0)) {
pce->time_least = elapsed;
uint32_t tl = pce->time_least.load();
if ((tl > (uint32_t)elapsed) || (tl == 0)) {
pce->time_least.store((uint32_t)elapsed);
}
if (pce->time_most < (uint32_t)elapsed) {
pce->time_most = elapsed;
uint32_t tm = pce->time_most.load();
if (tm < (uint32_t)elapsed) {
pce->time_most.store((uint32_t)elapsed);
}
// maintain mean and variance of the elapsed time in seconds
// Knuth/Welford recursive mean and variance of update intervals (via Wikipedia)
float dt = elapsed / 1e6f;
float delta_intvl = dt - pce->mean;
pce->mean += delta_intvl / pce->event_count;
pce->M2 += delta_intvl * (dt - pce->mean);
float mean_val = pce->mean.load();
float delta_intvl = dt - mean_val;
mean_val += delta_intvl / ec;
pce->mean.store(mean_val);
float m2_val = pce->M2.load();
m2_val += delta_intvl * (dt - mean_val);
pce->M2.store(m2_val);
pce->time_start = 0;
pce->time_start.store((uint64_t)0);
}
}
break;
@@ -309,42 +316,55 @@ perf_count_interval(perf_counter_t handle, hrt_abstime now)
switch (handle->type) {
case PC_INTERVAL: {
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
uint64_t ec = pci->event_count.load();
switch (pci->event_count) {
switch (ec) {
case 0:
pci->time_first = now;
pci->time_first.store(now);
break;
case 1:
pci->time_least = (uint32_t)(now - pci->time_last);
pci->time_most = (uint32_t)(now - pci->time_last);
pci->mean = pci->time_least / 1e6f;
pci->M2 = 0;
break;
case 1: {
uint64_t tl = pci->time_last.load();
uint32_t interval = (uint32_t)(now - tl);
pci->time_least.store(interval);
pci->time_most.store(interval);
pci->mean.store(interval / 1e6f);
pci->M2.store(0.0f);
break;
}
default: {
hrt_abstime interval = now - pci->time_last;
uint64_t tl = pci->time_last.load();
hrt_abstime interval = now - tl;
if ((uint32_t)interval < pci->time_least) {
pci->time_least = (uint32_t)interval;
uint32_t least = pci->time_least.load();
if ((uint32_t)interval < least) {
pci->time_least.store((uint32_t)interval);
}
if ((uint32_t)interval > pci->time_most) {
pci->time_most = (uint32_t)interval;
uint32_t most = pci->time_most.load();
if ((uint32_t)interval > most) {
pci->time_most.store((uint32_t)interval);
}
// maintain mean and variance of interval in seconds
// Knuth/Welford recursive mean and variance of update intervals (via Wikipedia)
float dt = interval / 1e6f;
float delta_intvl = dt - pci->mean;
pci->mean += delta_intvl / pci->event_count;
pci->M2 += delta_intvl * (dt - pci->mean);
float mean_val = pci->mean.load();
float delta_intvl = dt - mean_val;
mean_val += delta_intvl / ec;
pci->mean.store(mean_val);
float m2_val = pci->M2.load();
m2_val += delta_intvl * (dt - mean_val);
pci->M2.store(m2_val);
break;
}
}
pci->time_last = now;
pci->event_count++;
pci->time_last.store(now);
pci->event_count.fetch_add(1);
break;
}
@@ -362,7 +382,7 @@ perf_set_count(perf_counter_t handle, uint64_t count)
switch (handle->type) {
case PC_COUNT: {
((struct perf_ctr_count *)handle)->event_count = count;
((struct perf_ctr_count *)handle)->event_count.store(count);
}
break;
@@ -382,8 +402,7 @@ perf_cancel(perf_counter_t handle)
switch (handle->type) {
case PC_ELAPSED: {
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
pce->time_start = 0;
pce->time_start.store((uint64_t)0);
}
break;
@@ -401,31 +420,31 @@ perf_reset(perf_counter_t handle)
switch (handle->type) {
case PC_COUNT:
((struct perf_ctr_count *)handle)->event_count = 0;
((struct perf_ctr_count *)handle)->event_count.store((uint64_t)0);
break;
case PC_ELAPSED: {
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
pce->event_count = 0;
pce->time_start = 0;
pce->time_total = 0;
pce->time_least = 0;
pce->time_most = 0;
pce->mean = 0.0f;
pce->M2 = 0.0f;
pce->event_count.store((uint64_t)0);
pce->time_start.store((uint64_t)0);
pce->time_total.store((uint64_t)0);
pce->time_least.store((uint32_t)0);
pce->time_most.store((uint32_t)0);
pce->mean.store(0.0f);
pce->M2.store(0.0f);
break;
}
case PC_INTERVAL: {
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
pci->event_count = 0;
pci->time_event = 0;
pci->time_first = 0;
pci->time_last = 0;
pci->time_least = 0;
pci->time_most = 0;
pci->mean = 0.0f;
pci->M2 = 0.0f;
pci->event_count.store((uint64_t)0);
pci->time_event.store((uint64_t)0);
pci->time_first.store((uint64_t)0);
pci->time_last.store((uint64_t)0);
pci->time_least.store((uint32_t)0);
pci->time_most.store((uint32_t)0);
pci->mean.store(0.0f);
pci->M2.store(0.0f);
break;
}
}
@@ -439,38 +458,43 @@ perf_print_counter(perf_counter_t handle)
}
switch (handle->type) {
case PC_COUNT:
PX4_INFO_RAW("%s: %" PRIu64 " events\n",
handle->name,
((struct perf_ctr_count *)handle)->event_count);
break;
case PC_COUNT: {
uint64_t ec = ((struct perf_ctr_count *)handle)->event_count.load();
PX4_INFO_RAW("%s: %" PRIu64 " events\n",
handle->name, ec);
break;
}
case PC_ELAPSED: {
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
float rms = sqrtf(pce->M2 / (pce->event_count - 1));
uint64_t ec = pce->event_count.load();
uint64_t tt = pce->time_total.load();
uint32_t tl = pce->time_least.load();
uint32_t tm = pce->time_most.load();
float m2 = pce->M2.load();
float rms = sqrtf(m2 / (ec - 1));
PX4_INFO_RAW("%s: %" PRIu64 " events, %" PRIu64 "us elapsed, %.2fus avg, min %" PRIu32 "us max %" PRIu32
"us %5.3fus rms\n",
handle->name,
pce->event_count,
pce->time_total,
(pce->event_count == 0) ? 0 : (double)pce->time_total / (double)pce->event_count,
pce->time_least,
pce->time_most,
(double)(1e6f * rms));
handle->name, ec, tt,
(ec == 0) ? 0 : (double)tt / (double)ec,
tl, tm, (double)(1e6f * rms));
break;
}
case PC_INTERVAL: {
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
float rms = sqrtf(pci->M2 / (pci->event_count - 1));
uint64_t ec = pci->event_count.load();
uint64_t tf = pci->time_first.load();
uint64_t tla = pci->time_last.load();
uint32_t tl = pci->time_least.load();
uint32_t tm = pci->time_most.load();
float m2 = pci->M2.load();
float rms = sqrtf(m2 / (ec - 1));
PX4_INFO_RAW("%s: %" PRIu64 " events, %.2fus avg, min %" PRIu32 "us max %" PRIu32 "us %5.3fus rms\n",
handle->name,
pci->event_count,
(pci->event_count == 0) ? 0 : (double)(pci->time_last - pci->time_first) / (double)pci->event_count,
pci->time_least,
pci->time_most,
(double)(1e6f * rms));
handle->name, ec,
(ec == 0) ? 0 : (double)(tla - tf) / (double)ec,
tl, tm, (double)(1e6f * rms));
break;
}
@@ -493,35 +517,44 @@ perf_print_counter_buffer(char *buffer, int length, perf_counter_t handle)
case PC_COUNT:
num_written = snprintf(buffer, length, "%s: %" PRIu64 " events",
handle->name,
((struct perf_ctr_count *)handle)->event_count);
((struct perf_ctr_count *)handle)->event_count.load());
break;
case PC_ELAPSED: {
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
float rms = sqrtf(pce->M2 / (pce->event_count - 1));
uint64_t ec = pce->event_count.load();
uint64_t tt = pce->time_total.load();
uint32_t tl = pce->time_least.load();
uint32_t tm = pce->time_most.load();
float m2 = pce->M2.load();
float rms = sqrtf(m2 / (ec - 1));
num_written = snprintf(buffer, length,
"%s: %" PRIu64 " events, %" PRIu64 "us elapsed, %.2fus avg, min %" PRIu32 "us max %" PRIu32 "us %5.3fus rms",
handle->name,
pce->event_count,
pce->time_total,
(pce->event_count == 0) ? 0 : (double)pce->time_total / (double)pce->event_count,
pce->time_least,
pce->time_most,
ec,
tt,
(ec == 0) ? 0 : (double)tt / (double)ec,
tl,
tm,
(double)(1e6f * rms));
break;
}
case PC_INTERVAL: {
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
float rms = sqrtf(pci->M2 / (pci->event_count - 1));
uint64_t ec = pci->event_count.load();
uint32_t tl = pci->time_least.load();
uint32_t tm = pci->time_most.load();
float m2 = pci->M2.load();
float rms = sqrtf(m2 / (ec - 1));
num_written = snprintf(buffer, length,
"%s: %" PRIu64 " events, %.2f avg, min %" PRIu32 "us max %" PRIu32 "us %5.3fus rms",
handle->name,
pci->event_count,
(pci->event_count == 0) ? 0 : (double)(pci->time_last - pci->time_first) / (double)pci->event_count,
pci->time_least,
pci->time_most,
ec,
(ec == 0) ? 0 : (double)(pci->time_last.load() - pci->time_first.load()) / (double)ec,
tl,
tm,
(double)(1e6f * rms));
break;
}
@@ -543,16 +576,16 @@ perf_event_count(perf_counter_t handle)
switch (handle->type) {
case PC_COUNT:
return ((struct perf_ctr_count *)handle)->event_count;
return ((struct perf_ctr_count *)handle)->event_count.load();
case PC_ELAPSED: {
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
return pce->event_count;
return pce->event_count.load();
}
case PC_INTERVAL: {
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
return pci->event_count;
return pci->event_count.load();
}
default:
@@ -572,12 +605,12 @@ perf_mean(perf_counter_t handle)
switch (handle->type) {
case PC_ELAPSED: {
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
return pce->mean;
return pce->mean.load();
}
case PC_INTERVAL: {
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
return pci->mean;
return pci->mean.load();
}
default:
+21 -14
View File
@@ -57,6 +57,8 @@
#include <uORB/topics/dataman_request.h>
#include <uORB/topics/dataman_response.h>
#include <px4_platform_common/atomic.h>
#include "dataman.h"
__BEGIN_DECLS
@@ -126,7 +128,7 @@ static struct {
uint8_t *data_end;
} ram;
};
bool running;
px4::atomic_bool running{false};
bool silence = false;
} dm_operations_data;
@@ -172,13 +174,14 @@ static enum {
static px4_sem_t g_init_sema;
static bool g_task_should_exit; /**< if true, dataman task should exit */
static px4::atomic_bool g_task_should_exit{false}; /**< if true, dataman task should exit */
static px4_task_t g_task_id = -1; /**< dataman task handle for join */
/* Work queue management functions */
static bool is_running()
{
return dm_operations_data.running;
return dm_operations_data.running.load();
}
/* Calculate the offset in file of specific item */
@@ -607,7 +610,7 @@ _file_initialize(unsigned max_offset)
g_dm_ops->write(DM_KEY_SAFE_POINTS_STATE, 0, reinterpret_cast<uint8_t *>(&stats), sizeof(mission_stats_entry_s));
}
dm_operations_data.running = true;
dm_operations_data.running.store(true);
return 0;
}
@@ -627,7 +630,7 @@ _ram_initialize(unsigned max_offset)
memset(dm_operations_data.ram.data, 0, max_offset);
dm_operations_data.ram.data_end = &dm_operations_data.ram.data[max_offset - 1];
dm_operations_data.running = true;
dm_operations_data.running.store(true);
return 0;
}
@@ -637,7 +640,7 @@ static void
_file_shutdown()
{
close(dm_operations_data.file.fd);
dm_operations_data.running = false;
dm_operations_data.running.store(false);
}
#endif
@@ -645,7 +648,7 @@ static void
_ram_shutdown()
{
free(dm_operations_data.ram.data);
dm_operations_data.running = false;
dm_operations_data.running.store(false);
}
static int
@@ -683,7 +686,7 @@ task_main(int argc, char *argv[])
g_func_counts[i] = 0;
}
g_task_should_exit = false;
g_task_should_exit.store(false);
uORB::Publication<dataman_response_s> dataman_response_pub{ORB_ID(dataman_response)};
const int dataman_request_sub = orb_subscribe(ORB_ID(dataman_request));
@@ -698,7 +701,7 @@ task_main(int argc, char *argv[])
int ret = g_dm_ops->initialize(max_offset);
if (ret) {
g_task_should_exit = true;
g_task_should_exit.store(true);
goto end;
}
@@ -825,7 +828,7 @@ task_main(int argc, char *argv[])
}
/* time to go???? */
if (g_task_should_exit) {
if (g_task_should_exit.load()) {
break;
}
}
@@ -856,9 +859,11 @@ start()
px4_sem_setprotocol(&g_init_sema, SEM_PRIO_NONE);
/* start the worker thread with low priority for disk IO */
if (px4_task_spawn_cmd("dataman", SCHED_DEFAULT, SCHED_PRIORITY_DEFAULT - 10,
PX4_STACK_ADJUSTED(TASK_STACK_SIZE), task_main,
nullptr) < 0) {
g_task_id = px4_task_spawn_cmd("dataman", SCHED_DEFAULT, SCHED_PRIORITY_DEFAULT - 10,
PX4_STACK_ADJUSTED(TASK_STACK_SIZE), task_main,
nullptr);
if (g_task_id < 0) {
px4_sem_destroy(&g_init_sema);
PX4_ERR("task start failed");
return -1;
@@ -887,7 +892,7 @@ static void
stop()
{
/* Tell the worker task to shut down */
g_task_should_exit = true;
g_task_should_exit.store(true);
}
static void
@@ -1017,6 +1022,8 @@ dataman_main(int argc, char *argv[])
if (!strcmp(argv[1], "stop")) {
stop();
px4_task_join(g_task_id);
g_task_id = -1;
#ifdef CONFIG_DATAMAN_PERSISTENT_STORAGE
free(k_data_manager_device_path);
k_data_manager_device_path = nullptr;
+1 -1
View File
@@ -1134,7 +1134,7 @@ bool Logger::start_stop_logging()
}
}
desired_state = desired_state || _manually_logging_override;
desired_state = desired_state || _manually_logging_override.load();
// only start/stop if this is a state transition
if (updated && _prev_file_log_start_state != desired_state) {
+2 -2
View File
@@ -144,7 +144,7 @@ public:
void print_statistics(LogType type);
void set_arm_override(bool override) { _manually_logging_override = override; }
void set_arm_override(bool override) { _manually_logging_override.store(override); }
void trigger_watchdog_now()
{
@@ -345,7 +345,7 @@ private:
LogFileName _file_name[(int)LogType::Count];
bool _prev_file_log_start_state{false}; ///< previous state depending on logging mode (arming or aux1 state)
bool _manually_logging_override{false};
px4::atomic_bool _manually_logging_override{false};
Statistics _statistics[(int)LogType::Count];
hrt_abstime _last_sync_time{0}; ///< last time a sync msg was sent
+99 -20
View File
@@ -89,11 +89,73 @@ events::EventBuffer *Mavlink::_event_buffer = nullptr;
Mavlink *mavlink_module_instances[MAVLINK_COMM_NUM_BUFFERS] {};
void mavlink_send_uart_bytes(mavlink_channel_t chan, const uint8_t *ch, int length) { mavlink_module_instances[chan]->send_bytes(ch, length); }
void mavlink_start_uart_send(mavlink_channel_t chan, int length) { mavlink_module_instances[chan]->send_start(length); }
void mavlink_end_uart_send(mavlink_channel_t chan, int length) { mavlink_module_instances[chan]->send_finish(); }
mavlink_status_t *mavlink_get_channel_status(uint8_t channel) { return mavlink_module_instances[channel]->get_status(); }
mavlink_message_t *mavlink_get_channel_buffer(uint8_t channel) { return mavlink_module_instances[channel]->get_buffer(); }
// Per-channel status, buffer, and send mutex live in static arrays so they survive instance teardown.
// This prevents use-after-free when the MAVLink C library accesses channel status/buffer during a send
// that races with instance destruction.
static mavlink_status_t mavlink_channel_statuses[MAVLINK_COMM_NUM_BUFFERS] {};
static mavlink_message_t mavlink_channel_buffers[MAVLINK_COMM_NUM_BUFFERS] {};
// Recursive mutex: allows callers to hold lock_send() while send_start()/send_finish()
// re-lock internally (called from _mav_finalize_message_chan_send via MAVLINK_START_UART_SEND).
pthread_mutex_t mavlink_channel_send_mutexes[MAVLINK_COMM_NUM_BUFFERS];
static pthread_once_t mavlink_channel_mutexes_once = PTHREAD_ONCE_INIT;
static void init_channel_send_mutexes()
{
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
for (int i = 0; i < MAVLINK_COMM_NUM_BUFFERS; i++) {
pthread_mutex_init(&mavlink_channel_send_mutexes[i], &attr);
}
pthread_mutexattr_destroy(&attr);
}
void mavlink_send_uart_bytes(mavlink_channel_t chan, const uint8_t *ch, int length)
{
Mavlink *inst = mavlink_module_instances[chan];
if (inst) { inst->send_bytes(ch, length); }
}
void mavlink_start_uart_send(mavlink_channel_t chan, int length)
{
Mavlink *inst = mavlink_module_instances[chan];
if (inst) { inst->send_start(length); }
}
void mavlink_end_uart_send(mavlink_channel_t chan, int length)
{
Mavlink *inst = mavlink_module_instances[chan];
if (inst) { inst->send_finish(); }
}
mavlink_status_t *mavlink_get_channel_status(uint8_t channel) { return &mavlink_channel_statuses[channel]; }
mavlink_message_t *mavlink_get_channel_buffer(uint8_t channel) { return &mavlink_channel_buffers[channel]; }
mavlink_status_t *Mavlink::get_status()
{
if (_instance_id >= 0) {
return &mavlink_channel_statuses[_instance_id];
}
return &_mavlink_status_early;
}
mavlink_message_t *Mavlink::get_buffer()
{
if (_instance_id >= 0) {
return &mavlink_channel_buffers[_instance_id];
}
return &_mavlink_buffer_early;
}
void Mavlink::lock_send() { pthread_mutex_lock(&mavlink_channel_send_mutexes[_instance_id]); }
void Mavlink::unlock_send() { pthread_mutex_unlock(&mavlink_channel_send_mutexes[_instance_id]); }
static void usage();
@@ -245,6 +307,8 @@ bool Mavlink::set_channel()
bool
Mavlink::set_instance_id()
{
pthread_once(&mavlink_channel_mutexes_once, init_channel_send_mutexes);
LockGuard lg{mavlink_module_mutex};
// instance count
@@ -263,6 +327,10 @@ Mavlink::set_instance_id()
for (int instance_id = 0; instance_id < MAVLINK_COMM_NUM_BUFFERS; instance_id++) {
if (mavlink_module_instances[instance_id] == nullptr) {
// Copy early-init status/buffer to static per-channel arrays
mavlink_channel_statuses[instance_id] = _mavlink_status_early;
mavlink_channel_buffers[instance_id] = _mavlink_buffer_early;
mavlink_module_instances[instance_id] = this;
_instance_id = instance_id;
mavlink_instance_count.fetch_add(1);
@@ -377,7 +445,12 @@ Mavlink::destroy_all_instances()
for (int i = 0; i < MAVLINK_COMM_NUM_BUFFERS; i++) {
if (mavlink_module_instances[i] != nullptr) {
Mavlink *inst = mavlink_module_instances[i];
// Lock the channel send mutex to ensure no in-flight sends
pthread_mutex_lock(&mavlink_channel_send_mutexes[i]);
mavlink_module_instances[i] = nullptr;
pthread_mutex_unlock(&mavlink_channel_send_mutexes[i]);
mavlink_instance_count.fetch_sub(1);
delete inst;
}
@@ -744,7 +817,7 @@ Mavlink::get_free_tx_buf()
void Mavlink::send_start(int length)
{
pthread_mutex_lock(&_send_mutex);
pthread_mutex_lock(&mavlink_channel_send_mutexes[_instance_id]);
_last_write_try_time = hrt_absolute_time();
// check if there is space in the buffer
@@ -752,7 +825,9 @@ void Mavlink::send_start(int length)
// not enough space in buffer to send
count_txerrbytes(length);
pthread_mutex_lock(&_tstatus_mutex);
_tstatus.tx_buffer_overruns++;
pthread_mutex_unlock(&_tstatus_mutex);
// prevent writes
_tx_buffer_low = true;
@@ -765,7 +840,7 @@ void Mavlink::send_start(int length)
void Mavlink::send_finish()
{
if (_tx_buffer_low || (_buf_fill == 0)) {
pthread_mutex_unlock(&_send_mutex);
pthread_mutex_unlock(&mavlink_channel_send_mutexes[_instance_id]);
return;
}
@@ -817,7 +892,9 @@ void Mavlink::send_finish()
#endif // MAVLINK_UDP
if (ret == (int)_buf_fill) {
pthread_mutex_lock(&_tstatus_mutex);
_tstatus.tx_message_count++;
pthread_mutex_unlock(&_tstatus_mutex);
count_txbytes(_buf_fill);
_last_write_success_time = _last_write_try_time;
@@ -827,7 +904,7 @@ void Mavlink::send_finish()
_buf_fill = 0;
pthread_mutex_unlock(&_send_mutex);
pthread_mutex_unlock(&mavlink_channel_send_mutexes[_instance_id]);
}
void Mavlink::send_bytes(const uint8_t *buf, unsigned packet_len)
@@ -2277,8 +2354,8 @@ Mavlink::task_main(int argc, char *argv[])
pthread_mutex_init(&_mavlink_shell_mutex, nullptr);
pthread_mutex_init(&_message_buffer_mutex, nullptr);
pthread_mutex_init(&_send_mutex, nullptr);
pthread_mutex_init(&_radio_status_mutex, nullptr);
pthread_mutex_init(&_tstatus_mutex, nullptr);
/* if we are passing on mavlink messages, we need to prepare a buffer for this instance */
if (get_forwarding_on()) {
@@ -2502,20 +2579,16 @@ Mavlink::task_main(int argc, char *argv[])
if (_bytes_timestamp != 0) {
const float dt = (t - _bytes_timestamp) * 1e-6f;
_tstatus.tx_rate_avg = _bytes_tx / dt;
_tstatus.tx_error_rate_avg = _bytes_txerr / dt;
_tstatus.rx_rate_avg = _bytes_rx / dt;
_bytes_tx = 0;
_bytes_txerr = 0;
_bytes_rx = 0;
_tstatus.tx_rate_avg = _bytes_tx.fetch_and(0) / dt;
_tstatus.tx_error_rate_avg = _bytes_txerr.fetch_and(0) / dt;
_tstatus.rx_rate_avg = _bytes_rx.fetch_and(0) / dt;
}
_bytes_timestamp = t;
}
// publish status at 1 Hz, or sooner if HEARTBEAT has updated
if ((hrt_elapsed_time(&_tstatus.timestamp) >= 1_s) || _tstatus_updated) {
if ((hrt_elapsed_time(&_tstatus.timestamp) >= 1_s) || _tstatus_updated.load()) {
publish_telemetry_status();
}
@@ -2548,9 +2621,9 @@ Mavlink::task_main(int argc, char *argv[])
}
pthread_mutex_destroy(&_mavlink_shell_mutex);
pthread_mutex_destroy(&_send_mutex);
pthread_mutex_destroy(&_radio_status_mutex);
pthread_mutex_destroy(&_message_buffer_mutex);
pthread_mutex_destroy(&_tstatus_mutex);
PX4_INFO("exiting channel %i", (int)_channel);
@@ -2833,6 +2906,7 @@ void Mavlink::check_requested_subscriptions()
void Mavlink::publish_telemetry_status()
{
// many fields are populated in place
pthread_mutex_lock(&_tstatus_mutex);
_tstatus.mode = _mode;
_tstatus.data_rate = _datarate;
@@ -2844,10 +2918,11 @@ void Mavlink::publish_telemetry_status()
_tstatus.streams = _streams.size();
// telemetry_status is also updated from the receiver thread, but never the same fields
_tstatus.timestamp = hrt_absolute_time();
_telemetry_status_pub.publish(_tstatus);
_tstatus_updated = false;
pthread_mutex_unlock(&_tstatus_mutex);
_tstatus_updated.store(false);
}
void Mavlink::configure_sik_radio()
@@ -2923,7 +2998,9 @@ int Mavlink::start_helper(int argc, char *argv[])
int instance_id = instance->get_instance_id();
if (instance_id >= 0) {
pthread_mutex_lock(&mavlink_channel_send_mutexes[instance_id]);
mavlink_module_instances[instance_id] = nullptr;
pthread_mutex_unlock(&mavlink_channel_send_mutexes[instance_id]);
mavlink_instance_count.fetch_sub(1);
}
@@ -3241,7 +3318,9 @@ Mavlink::stop_command(int argc, char *argv[])
for (int mavlink_instance = 0; mavlink_instance < MAVLINK_COMM_NUM_BUFFERS; mavlink_instance++) {
if (mavlink_module_instances[mavlink_instance] == inst) {
pthread_mutex_lock(&mavlink_channel_send_mutexes[mavlink_instance]);
mavlink_module_instances[mavlink_instance] = nullptr;
pthread_mutex_unlock(&mavlink_channel_send_mutexes[mavlink_instance]);
mavlink_instance_count.fetch_sub(1);
delete inst;
return PX4_OK;
+26 -19
View File
@@ -62,6 +62,7 @@
#include <parameters/param.h>
#include <lib/variable_length_ringbuffer/VariableLengthRingbuffer.hpp>
#include <perf/perf_counter.h>
#include <px4_platform_common/atomic.h>
#include <px4_platform_common/cli.h>
#include <px4_platform_common/px4_config.h>
#include <px4_platform_common/defines.h>
@@ -132,8 +133,8 @@ public:
static Mavlink *new_instance();
static Mavlink *get_instance_for_device(const char *device_name);
mavlink_message_t *get_buffer() { return &_mavlink_buffer; }
mavlink_status_t *get_status() { return &_mavlink_status; }
mavlink_message_t *get_buffer();
mavlink_status_t *get_status();
void setProtocolVersion(uint8_t version);
uint8_t getProtocolVersion() const { return _protocol_version; };
@@ -255,7 +256,7 @@ public:
bool get_forwarding_on() { return _forwarding_on; }
bool is_gcs_connected() { return _tstatus.heartbeat_type_gcs; }
bool is_gcs_connected() { lock_telemetry_status(); bool r = _tstatus.heartbeat_type_gcs; unlock_telemetry_status(); return r; }
#if defined(MAVLINK_UDP)
static Mavlink *get_instance_for_network_port(unsigned long port);
@@ -383,34 +384,39 @@ public:
float get_baudrate() { return _baudrate; }
/* Functions for waiting to start transmission until message received. */
void set_has_received_messages(bool received_messages) { _received_messages = received_messages; }
bool get_has_received_messages() { return _received_messages; }
void set_has_received_messages(bool received_messages) { _received_messages.store(received_messages); }
bool get_has_received_messages() { return _received_messages.load(); }
void set_wait_to_transmit(bool wait) { _wait_to_transmit = wait; }
bool get_wait_to_transmit() { return _wait_to_transmit; }
bool should_transmit() { return (_transmitting_enabled && (!_wait_to_transmit || (_wait_to_transmit && _received_messages))); }
bool should_transmit() { return (_transmitting_enabled && (!_wait_to_transmit || (_wait_to_transmit && _received_messages.load()))); }
/**
* Count transmitted bytes
*/
void count_txbytes(unsigned n) { _bytes_tx += n; };
void count_txbytes(unsigned n) { _bytes_tx.fetch_add(n); };
/**
* Count bytes not transmitted because of errors
*/
void count_txerrbytes(unsigned n) { _bytes_txerr += n; };
void count_txerrbytes(unsigned n) { _bytes_txerr.fetch_add(n); };
/**
* Count received bytes
*/
void count_rxbytes(unsigned n) { _bytes_rx += n; };
void count_rxbytes(unsigned n) { _bytes_rx.fetch_add(n); };
/**
* Get the receive status of this MAVLink link
*/
telemetry_status_s &telemetry_status() { return _tstatus; }
void telemetry_status_updated() { _tstatus_updated = true; }
void lock_telemetry_status() { pthread_mutex_lock(&_tstatus_mutex); }
void unlock_telemetry_status() { pthread_mutex_unlock(&_tstatus_mutex); }
void telemetry_status_updated() { _tstatus_updated.store(true); }
void set_telemetry_status_type(uint8_t type) { _tstatus.type = type; }
void set_telemetry_status_type(uint8_t type) { pthread_mutex_lock(&_tstatus_mutex); _tstatus.type = type; pthread_mutex_unlock(&_tstatus_mutex); }
void lock_send();
void unlock_send();
void update_radio_status(const radio_status_s &radio_status);
@@ -521,14 +527,15 @@ private:
static constexpr int MAVLINK_MAX_INTERVAL{10000};
static constexpr float MAVLINK_MIN_MULTIPLIER{0.0005f};
mavlink_message_t _mavlink_buffer {};
mavlink_status_t _mavlink_status {};
// Used before _instance_id is assigned; copied to static per-channel arrays in set_instance_id()
mavlink_message_t _mavlink_buffer_early {};
mavlink_status_t _mavlink_status_early {};
/* states */
bool _hil_enabled{false}; /**< Hardware In the Loop mode */
bool _is_usb_uart{false}; /**< Port is USB */
bool _wait_to_transmit{false}; /**< Wait to transmit until received messages. */
bool _received_messages{false}; /**< Whether we've received valid mavlink messages. */
px4::atomic_bool _received_messages{false}; /**< Whether we've received valid mavlink messages. */
px4::atomic_bool _should_check_events{false}; /**< Events subscription: only one MAVLink instance should check */
px4::atomic_bool _sending_parameters{false}; /**< True if parameters are currently sent out */
@@ -580,9 +587,9 @@ private:
uint64_t _mavlink_start_time{0};
uint8_t _protocol_version = 0; ///< after initialization the only values are 1 and 2
unsigned _bytes_tx{0};
unsigned _bytes_txerr{0};
unsigned _bytes_rx{0};
px4::atomic<unsigned> _bytes_tx{0};
px4::atomic<unsigned> _bytes_txerr{0};
px4::atomic<unsigned> _bytes_rx{0};
hrt_abstime _bytes_timestamp{0};
#if defined(MAVLINK_UDP)
@@ -613,14 +620,14 @@ private:
radio_status_s _rstatus {};
telemetry_status_s _tstatus {};
bool _tstatus_updated{false};
px4::atomic_bool _tstatus_updated{false};
pthread_mutex_t _tstatus_mutex {};
ping_statistics_s _ping_stats {};
pthread_mutex_t _message_buffer_mutex{};
VariableLengthRingbuffer _message_buffer{};
pthread_mutex_t _send_mutex {};
pthread_mutex_t _radio_status_mutex {};
DEFINE_PARAMETERS(
+8
View File
@@ -1732,7 +1732,9 @@ MavlinkReceiver::handle_message_ping(mavlink_message_t *msg)
ping.target_system = msg->sysid;
ping.target_component = msg->compid;
_mavlink.lock_send();
mavlink_msg_ping_send_struct(_mavlink.get_channel(), &ping);
_mavlink.unlock_send();
} else if ((ping.target_system == mavlink_system.sysid) &&
(ping.target_component ==
@@ -2304,7 +2306,9 @@ MavlinkReceiver::get_message_interval(int msgId)
}
// send back this value...
_mavlink.lock_send();
mavlink_msg_message_interval_send(_mavlink.get_channel(), msgId, interval);
_mavlink.unlock_send();
}
void
@@ -2918,6 +2922,7 @@ void MavlinkReceiver::CheckHeartbeats(const hrt_abstime &t, bool force)
}
if ((t >= _last_heartbeat_check + (TIMEOUT / 2)) || force) {
_mavlink.lock_telemetry_status();
telemetry_status_s &tstatus = _mavlink.telemetry_status();
tstatus.heartbeat_type_antenna_tracker = (t <= TIMEOUT + _heartbeat_type_antenna_tracker);
@@ -2937,6 +2942,7 @@ void MavlinkReceiver::CheckHeartbeats(const hrt_abstime &t, bool force)
tstatus.heartbeat_component_udp_bridge = (t <= TIMEOUT + _heartbeat_component_udp_bridge);
tstatus.heartbeat_component_uart_bridge = (t <= TIMEOUT + _heartbeat_component_uart_bridge);
_mavlink.unlock_telemetry_status();
_mavlink.telemetry_status_updated();
_last_heartbeat_check = t;
}
@@ -3295,6 +3301,7 @@ MavlinkReceiver::run()
if (t - last_send_update > timeout * 1000) {
_mission_manager.check_active_mission();
_mavlink.lock_send();
_mission_manager.send();
if (_mavlink.get_mode() != Mavlink::MAVLINK_MODE::MAVLINK_MODE_IRIDIUM) {
@@ -3307,6 +3314,7 @@ MavlinkReceiver::run()
}
_mavlink_log_handler.send();
_mavlink.unlock_send();
last_send_update = t;
}
@@ -969,13 +969,15 @@ bool MavlinkFtpTest::_setup_ftp_msg(const MavlinkFTP::PayloadHeader *payload_hea
payload->padding = 0;
msg->checksum = 0;
mavlink_msg_file_transfer_protocol_pack(clientSystemId, // Sender system id
clientComponentId, // Sender component id
msg, // Message to pack payload into
0, // Target network
serverSystemId, // Target system id
serverComponentId, // Target component id
payload_bytes); // Payload to pack into message
mavlink_status_t local_status{};
mavlink_msg_file_transfer_protocol_pack_status(clientSystemId, // Sender system id
clientComponentId, // Sender component id
&local_status, // Local status to avoid global race
msg, // Message to pack payload into
0, // Target network
serverSystemId, // Target system id
serverComponentId, // Target component id
payload_bytes); // Payload to pack into message
return true;
}
@@ -73,16 +73,17 @@ bool VehicleAngularVelocity::Start()
// force initial updates
ParametersUpdate(true);
// initial sensor selection before registering callbacks to avoid data race with Run()
if (!SensorSelectionUpdate(hrt_absolute_time(), true)) {
ScheduleNow();
}
// sensor_selection needed to change the active sensor if the primary stops updating
if (!_sensor_selection_sub.registerCallback()) {
PX4_ERR("callback registration failed");
return false;
}
if (!SensorSelectionUpdate(hrt_absolute_time(), true)) {
ScheduleNow();
}
return true;
}
@@ -281,7 +282,7 @@ bool VehicleAngularVelocity::SensorSelectionUpdate(const hrt_abstime &time_now_u
}
if (sensor_gyro_fifo_sub.get().device_id == device_id) {
if (_sensor_gyro_fifo_sub.ChangeInstance(i) && _sensor_gyro_fifo_sub.registerCallback()) {
if (_sensor_gyro_fifo_sub.ChangeInstance(i)) {
// make sure non-FIFO sub is unregistered
_sensor_sub.unregisterCallback();
@@ -296,13 +297,16 @@ bool VehicleAngularVelocity::SensorSelectionUpdate(const hrt_abstime &time_now_u
_bias.zero();
_fifo_available = true;
perf_count(_selection_changed_perf);
PX4_DEBUG("selecting sensor_gyro_fifo:%" PRIu8 " %" PRIu32, i, _selected_sensor_device_id);
return true;
// register callback after all member writes to avoid data race with Run()
if (_sensor_gyro_fifo_sub.registerCallback()) {
perf_count(_selection_changed_perf);
PX4_DEBUG("selecting sensor_gyro_fifo:%" PRIu8 " %" PRIu32, i, _selected_sensor_device_id);
return true;
} else {
PX4_ERR("unable to register callback for sensor_gyro_fifo:%" PRIu8 " %" PRIu32,
i, sensor_gyro_fifo_sub.get().device_id);
} else {
PX4_ERR("unable to register callback for sensor_gyro_fifo:%" PRIu8 " %" PRIu32,
i, sensor_gyro_fifo_sub.get().device_id);
}
}
}
}
@@ -323,7 +327,7 @@ bool VehicleAngularVelocity::SensorSelectionUpdate(const hrt_abstime &time_now_u
}
if (sensor_gyro_sub.get().device_id == device_id) {
if (_sensor_sub.ChangeInstance(i) && _sensor_sub.registerCallback()) {
if (_sensor_sub.ChangeInstance(i)) {
// make sure FIFO sub is unregistered
_sensor_gyro_fifo_sub.unregisterCallback();
@@ -338,13 +342,16 @@ bool VehicleAngularVelocity::SensorSelectionUpdate(const hrt_abstime &time_now_u
_bias.zero();
_fifo_available = false;
perf_count(_selection_changed_perf);
PX4_DEBUG("selecting sensor_gyro:%" PRIu8 " %" PRIu32, i, _selected_sensor_device_id);
return true;
// register callback after all member writes to avoid data race with Run()
if (_sensor_sub.registerCallback()) {
perf_count(_selection_changed_perf);
PX4_DEBUG("selecting sensor_gyro:%" PRIu8 " %" PRIu32, i, _selected_sensor_device_id);
return true;
} else {
PX4_ERR("unable to register callback for sensor_gyro:%" PRIu8 " %" PRIu32,
i, sensor_gyro_sub.get().device_id);
} else {
PX4_ERR("unable to register callback for sensor_gyro:%" PRIu8 " %" PRIu32,
i, sensor_gyro_sub.get().device_id);
}
}
}
}
@@ -1687,6 +1687,7 @@ int simulator_mavlink_main(int argc, char *argv[])
} else {
px4_task_delete(g_sim_task);
px4_task_join(g_sim_task);
g_sim_task = -1;
}
+1 -1
View File
@@ -34,7 +34,7 @@
px4_add_module(
MODULE systemcmds__microbench
MAIN microbench
STACK_MAIN 4096
STACK_MAIN 16384
COMPILE_FLAGS
-Wno-double-promotion
-Wno-unused-but-set-variable
+9 -5
View File
@@ -1,8 +1,12 @@
px4_add_git_submodule(TARGET git_fuzztest PATH "fuzztest")
message(STATUS "Adding fuzztest")
# This will also add GTest
add_subdirectory(fuzztest EXCLUDE_FROM_ALL)
if(CMAKE_BUILD_TYPE STREQUAL "ThreadSanitizer")
message(STATUS "TSAN build: skipping fuzztest, fetching GTest only")
px4_setup_gtest_without_fuzztest()
else()
px4_add_git_submodule(TARGET git_fuzztest PATH "fuzztest")
message(STATUS "Adding fuzztest")
# This will also add GTest
add_subdirectory(fuzztest EXCLUDE_FROM_ALL)
endif()
# Ensure there's no -R without any filter expression since that trips newer ctest versions
if(TESTFILTER)