mirror of
https://gitee.com/mirrors_PX4/PX4-Autopilot.git
synced 2026-06-03 07:40:05 +08:00
Compare commits
21 Commits
crsf-bind
...
pr-fix-tsan
| Author | SHA1 | Date | |
|---|---|---|---|
| 1520aa5220 | |||
| 30a774b77d | |||
| 5dbf62cd11 | |||
| 64ddfebfc6 | |||
| ecc789fc05 | |||
| 3cd9a56f1b | |||
| 98287f2b11 | |||
| bc0bdb8a84 | |||
| e8937a98a5 | |||
| d6c8514fea | |||
| 2ad0b297ca | |||
| 17ac592cd2 | |||
| 2c833f5ae2 | |||
| b56f2dbcbd | |||
| a757a7c29d | |||
| 20669aba6f | |||
| 261cc16077 | |||
| 9074c81838 | |||
| 3680775e5c | |||
| 22a32eaa55 | |||
| 91e25df7c4 |
@@ -0,0 +1,22 @@
|
||||
CONFIG_COMMON_DISTANCE_SENSOR=n
|
||||
CONFIG_COMMON_TELEMETRY=n
|
||||
CONFIG_DRIVERS_ADC_ADS1115=n
|
||||
CONFIG_DRIVERS_CAMERA_CAPTURE=n
|
||||
CONFIG_DRIVERS_CAMERA_TRIGGER=n
|
||||
CONFIG_DRIVERS_IMU_ANALOG_DEVICES_ADIS16448=n
|
||||
CONFIG_DRIVERS_IRLOCK=n
|
||||
CONFIG_MODULES_AIRSPEED_SELECTOR=n
|
||||
CONFIG_MODULES_GIMBAL=n
|
||||
CONFIG_MODULES_VTOL_ATT_CONTROL=n
|
||||
CONFIG_MODULES_FW_ATT_CONTROL=n
|
||||
CONFIG_MODULES_FW_AUTOTUNE_ATTITUDE_CONTROL=n
|
||||
CONFIG_MODULES_FW_MODE_MANAGER=n
|
||||
CONFIG_MODULES_FW_LATERAL_LONGITUDINAL_CONTROL=n
|
||||
CONFIG_MODULES_FW_RATE_CONTROL=n
|
||||
CONFIG_MODULES_FW_AUTOTUNE_ATTITUDE_CONTROL=n
|
||||
CONFIG_MODULES_MC_AUTOTUNE_ATTITUDE_CONTROL=n
|
||||
CONFIG_MODULES_PAYLOAD_DELIVERER=n
|
||||
CONFIG_MODULES_SIMULATION_SIMULATOR_SIH=n
|
||||
CONFIG_BOARD_TESTING=y
|
||||
CONFIG_DRIVERS_TEST_PPM=y
|
||||
CONFIG_SYSTEMCMDS_MICROBENCH=y
|
||||
+46
-17
@@ -31,6 +31,30 @@
|
||||
#
|
||||
############################################################################
|
||||
|
||||
#=============================================================================
|
||||
#
|
||||
# px4_setup_gtest_without_fuzztest
|
||||
#
|
||||
# Fetches GTest standalone and stubs out fuzztest cmake functions.
|
||||
# Used when fuzztest is not available (e.g. TSAN builds where fuzztest's
|
||||
# coverage instrumentation is incompatible).
|
||||
#
|
||||
macro(px4_setup_gtest_without_fuzztest)
|
||||
include(FetchContent)
|
||||
FetchContent_Declare(
|
||||
googletest
|
||||
GIT_REPOSITORY https://github.com/google/googletest.git
|
||||
GIT_TAG v1.16.0
|
||||
)
|
||||
FetchContent_MakeAvailable(googletest)
|
||||
|
||||
function(link_fuzztest name)
|
||||
target_link_libraries(${name} PRIVATE gtest gtest_main)
|
||||
endfunction()
|
||||
macro(fuzztest_setup_fuzzing_flags)
|
||||
endmacro()
|
||||
endmacro()
|
||||
|
||||
#=============================================================================
|
||||
#
|
||||
# px4_add_unit_gtest
|
||||
@@ -98,23 +122,28 @@ function(px4_add_functional_gtest)
|
||||
add_executable(${TESTNAME} EXCLUDE_FROM_ALL ${SRC} ${EXTRA_SRCS})
|
||||
|
||||
# link the libary to test and gtest
|
||||
target_link_libraries(${TESTNAME} PRIVATE ${LINKLIBS} gtest_functional_main
|
||||
px4_layer
|
||||
px4_platform
|
||||
uORB
|
||||
systemlib
|
||||
cdev
|
||||
px4_work_queue
|
||||
px4_daemon
|
||||
work_queue
|
||||
parameters
|
||||
events
|
||||
perf
|
||||
tinybson
|
||||
uorb_msgs
|
||||
fuzztest::fuzztest # Do not use link_fuzztest() here because that
|
||||
# also links to fuzztest_gtest_main
|
||||
test_stubs) # put test_stubs last
|
||||
set(_FUNCTIONAL_GTEST_LIBS ${LINKLIBS} gtest_functional_main
|
||||
px4_layer
|
||||
px4_platform
|
||||
uORB
|
||||
systemlib
|
||||
cdev
|
||||
px4_work_queue
|
||||
px4_daemon
|
||||
work_queue
|
||||
parameters
|
||||
events
|
||||
perf
|
||||
tinybson
|
||||
uorb_msgs)
|
||||
if(TARGET fuzztest::fuzztest)
|
||||
list(APPEND _FUNCTIONAL_GTEST_LIBS fuzztest::fuzztest) # Do not use link_fuzztest() here because that
|
||||
# also links to fuzztest_gtest_main
|
||||
else()
|
||||
list(APPEND _FUNCTIONAL_GTEST_LIBS gtest)
|
||||
endif()
|
||||
list(APPEND _FUNCTIONAL_GTEST_LIBS test_stubs) # put test_stubs last
|
||||
target_link_libraries(${TESTNAME} PRIVATE ${_FUNCTIONAL_GTEST_LIBS})
|
||||
|
||||
target_compile_definitions(${TESTNAME} PRIVATE MODULE_NAME="${TESTNAME}")
|
||||
|
||||
|
||||
@@ -40,6 +40,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <px4_platform_common/atomic.h>
|
||||
|
||||
namespace px4
|
||||
{
|
||||
|
||||
@@ -48,17 +50,17 @@ class AppState
|
||||
public:
|
||||
~AppState() {}
|
||||
|
||||
AppState() : _exitRequested(false), _isRunning(false) {}
|
||||
AppState() {}
|
||||
|
||||
bool exitRequested() { return _exitRequested; }
|
||||
void requestExit() { _exitRequested = true; }
|
||||
bool exitRequested() { return _exitRequested.load(); }
|
||||
void requestExit() { _exitRequested.store(true); }
|
||||
|
||||
bool isRunning() { return _isRunning; }
|
||||
void setRunning(bool running) { _isRunning = running; }
|
||||
bool isRunning() { return _isRunning.load(); }
|
||||
void setRunning(bool running) { _isRunning.store(running); }
|
||||
|
||||
protected:
|
||||
bool _exitRequested;
|
||||
bool _isRunning;
|
||||
px4::atomic_bool _exitRequested{false};
|
||||
px4::atomic_bool _isRunning{false};
|
||||
private:
|
||||
AppState(const AppState &);
|
||||
const AppState &operator=(const AppState &);
|
||||
|
||||
@@ -95,7 +95,9 @@ public:
|
||||
} else
|
||||
#endif // __PX4_NUTTX
|
||||
{
|
||||
return __atomic_load_n(&_value, __ATOMIC_SEQ_CST);
|
||||
T val;
|
||||
__atomic_load(&_value, &val, __ATOMIC_SEQ_CST);
|
||||
return val;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ public:
|
||||
|
||||
void Run();
|
||||
|
||||
void request_stop() { _should_exit.store(true); }
|
||||
void request_stop() { _should_exit.store(true); SignalWorkerThread(); }
|
||||
|
||||
void print_status(bool last = false);
|
||||
|
||||
@@ -80,7 +80,14 @@ private:
|
||||
|
||||
bool should_exit() const { return _should_exit.load(); }
|
||||
|
||||
inline void SignalWorkerThread();
|
||||
inline void SignalWorkerThread()
|
||||
{
|
||||
int sem_val;
|
||||
|
||||
if (px4_sem_getvalue(&_process_lock, &sem_val) == 0 && sem_val <= 0) {
|
||||
px4_sem_post(&_process_lock);
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef __PX4_NUTTX
|
||||
// In NuttX work can be enqueued from an ISR
|
||||
|
||||
@@ -167,6 +167,9 @@ __EXPORT px4_task_t px4_task_spawn_cmd(const char *name,
|
||||
/** Deletes a task - does not do resource cleanup **/
|
||||
__EXPORT int px4_task_delete(px4_task_t pid);
|
||||
|
||||
/** Wait for a task to exit. Returns 0 on success. Only supported on POSIX. **/
|
||||
__EXPORT int px4_task_join(px4_task_t pid);
|
||||
|
||||
/** Send a signal to a task **/
|
||||
__EXPORT int px4_task_kill(px4_task_t pid, int sig);
|
||||
|
||||
|
||||
@@ -50,12 +50,13 @@
|
||||
#include <uORB/uORB.h>
|
||||
#include <uORB/topics/log_message.h>
|
||||
#include <drivers/drv_hrt.h>
|
||||
#include <px4_platform_common/atomic.h>
|
||||
|
||||
#if defined(BOARD_ENABLE_LOG_HISTORY)
|
||||
static LogHistory g_log_history;
|
||||
#endif
|
||||
|
||||
static orb_advert_t orb_log_message_pub = nullptr;
|
||||
static px4::atomic<orb_advert_t> orb_log_message_pub{nullptr};
|
||||
|
||||
__EXPORT const char *__px4_log_level_str[_PX4_LOG_LEVEL_PANIC + 1] = { "DEBUG", "INFO", "WARN", "ERROR", "PANIC" };
|
||||
|
||||
@@ -74,7 +75,7 @@ void px4_log_initialize(void)
|
||||
log_message.severity = 6; // info
|
||||
strcpy((char *)log_message.text, "initialized uORB logging");
|
||||
log_message.timestamp = hrt_absolute_time();
|
||||
orb_log_message_pub = orb_advertise(ORB_ID(log_message), &log_message);
|
||||
orb_log_message_pub.store(orb_advertise(ORB_ID(log_message), &log_message));
|
||||
}
|
||||
|
||||
__EXPORT void px4_log_modulename(int level, const char *module_name, const char *fmt, ...)
|
||||
@@ -178,7 +179,9 @@ __EXPORT void px4_log_modulename(int level, const char *module_name, const char
|
||||
}
|
||||
|
||||
/* publish an orb log message */
|
||||
if (level >= _PX4_LOG_LEVEL_INFO && orb_log_message_pub) { //publish all messages
|
||||
orb_advert_t orb_log_pub = orb_log_message_pub.load();
|
||||
|
||||
if (level >= _PX4_LOG_LEVEL_INFO && orb_log_pub) { //publish all messages
|
||||
|
||||
log_message_s log_message;
|
||||
|
||||
@@ -199,7 +202,7 @@ __EXPORT void px4_log_modulename(int level, const char *module_name, const char
|
||||
va_end(argptr);
|
||||
log_message.text[max_length - 1] = 0; //ensure 0-termination
|
||||
log_message.timestamp = hrt_absolute_time();
|
||||
orb_publish(ORB_ID(log_message), orb_log_message_pub, &log_message);
|
||||
orb_publish(ORB_ID(log_message), orb_log_pub, &log_message);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -144,15 +144,6 @@ void WorkQueue::Add(WorkItem *item)
|
||||
SignalWorkerThread();
|
||||
}
|
||||
|
||||
void WorkQueue::SignalWorkerThread()
|
||||
{
|
||||
int sem_val;
|
||||
|
||||
if (px4_sem_getvalue(&_process_lock, &sem_val) == 0 && sem_val <= 0) {
|
||||
px4_sem_post(&_process_lock);
|
||||
}
|
||||
}
|
||||
|
||||
void WorkQueue::Remove(WorkItem *item)
|
||||
{
|
||||
work_lock();
|
||||
|
||||
@@ -62,6 +62,14 @@ static BlockingQueue<const wq_config_t *, 1> *_wq_manager_create_queue{nullptr};
|
||||
|
||||
static px4::atomic_bool _wq_manager_should_exit{true};
|
||||
static px4::atomic_bool _wq_manager_running{false};
|
||||
static px4_task_t _wq_manager_task_id{-1};
|
||||
|
||||
#ifdef __PX4_POSIX
|
||||
#include <pthread.h>
|
||||
static constexpr int WQ_MAX_THREADS = 16;
|
||||
static pthread_t _wq_threads[WQ_MAX_THREADS] {};
|
||||
static int _wq_thread_count = 0;
|
||||
#endif
|
||||
|
||||
|
||||
static WorkQueue *
|
||||
@@ -326,6 +334,14 @@ WorkQueueManagerRun(int, char **)
|
||||
if (ret_create == 0) {
|
||||
PX4_DEBUG("starting: %s, priority: %d, stack: %zu bytes", wq->name, param.sched_priority, stacksize);
|
||||
|
||||
#ifdef __PX4_POSIX
|
||||
|
||||
if (_wq_thread_count < WQ_MAX_THREADS) {
|
||||
_wq_threads[_wq_thread_count++] = thread;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
} else {
|
||||
PX4_ERR("failed to create thread for %s (%i): %s", wq->name, ret_create, strerror(ret_create));
|
||||
}
|
||||
@@ -375,16 +391,16 @@ WorkQueueManagerStart()
|
||||
|
||||
_wq_manager_should_exit.store(false);
|
||||
|
||||
int task_id = px4_task_spawn_cmd("wq:manager",
|
||||
SCHED_DEFAULT,
|
||||
SCHED_PRIORITY_MAX,
|
||||
PX4_STACK_ADJUSTED(1280),
|
||||
(px4_main_t)&WorkQueueManagerRun,
|
||||
nullptr);
|
||||
_wq_manager_task_id = px4_task_spawn_cmd("wq:manager",
|
||||
SCHED_DEFAULT,
|
||||
SCHED_PRIORITY_MAX,
|
||||
PX4_STACK_ADJUSTED(1280),
|
||||
(px4_main_t)&WorkQueueManagerRun,
|
||||
nullptr);
|
||||
|
||||
if (task_id < 0) {
|
||||
if (_wq_manager_task_id < 0) {
|
||||
_wq_manager_should_exit.store(true);
|
||||
PX4_ERR("task start failed (%i)", task_id);
|
||||
PX4_ERR("task start failed (%i)", _wq_manager_task_id);
|
||||
return -errno;
|
||||
}
|
||||
|
||||
@@ -413,20 +429,12 @@ WorkQueueManagerStop()
|
||||
{
|
||||
if (!_wq_manager_should_exit.load()) {
|
||||
|
||||
// error can't shutdown until all WorkItems are removed/stopped
|
||||
if (_wq_manager_running.load() && (_wq_manager_wqs_list->size() > 0)) {
|
||||
PX4_ERR("can't shutdown with active WQs");
|
||||
WorkQueueManagerStatus();
|
||||
return PX4_ERROR;
|
||||
}
|
||||
|
||||
// first ask all WQs to stop
|
||||
if (_wq_manager_wqs_list != nullptr) {
|
||||
{
|
||||
LockGuard lg{_wq_manager_wqs_list->mutex()};
|
||||
|
||||
// ask all work queues (threads) to stop
|
||||
// NOTE: not currently safe without all WorkItems stopping first
|
||||
for (WorkQueue *wq : *_wq_manager_wqs_list) {
|
||||
wq->request_stop();
|
||||
}
|
||||
@@ -441,18 +449,31 @@ WorkQueueManagerStop()
|
||||
_wq_manager_wqs_list = nullptr;
|
||||
}
|
||||
|
||||
// signal wq:manager thread to exit
|
||||
_wq_manager_should_exit.store(true);
|
||||
|
||||
if (_wq_manager_create_queue != nullptr) {
|
||||
// push nullptr to wake the wq manager task
|
||||
_wq_manager_create_queue->push(nullptr);
|
||||
|
||||
px4_usleep(10000);
|
||||
// wait for wq:manager thread to finish
|
||||
px4_task_join(_wq_manager_task_id);
|
||||
_wq_manager_task_id = -1;
|
||||
|
||||
delete _wq_manager_create_queue;
|
||||
_wq_manager_create_queue = nullptr;
|
||||
}
|
||||
|
||||
#ifdef __PX4_POSIX
|
||||
|
||||
// join all work queue threads (safe now that wq:manager has exited)
|
||||
for (int i = 0; i < _wq_thread_count; i++) {
|
||||
pthread_join(_wq_threads[i], nullptr);
|
||||
}
|
||||
|
||||
_wq_thread_count = 0;
|
||||
#endif
|
||||
|
||||
} else {
|
||||
PX4_WARN("not running");
|
||||
return PX4_ERROR;
|
||||
|
||||
@@ -55,7 +55,7 @@ bool Subscription::subscribe()
|
||||
|
||||
if (node) {
|
||||
_node = node;
|
||||
_last_generation = initial_generation;
|
||||
_last_generation.store(initial_generation);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -70,7 +70,7 @@ void Subscription::unsubscribe()
|
||||
}
|
||||
|
||||
_node = nullptr;
|
||||
_last_generation = 0;
|
||||
_last_generation.store(0);
|
||||
}
|
||||
|
||||
bool Subscription::ChangeInstance(uint8_t instance)
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
#include <uORB/uORB.h>
|
||||
#include <uORB/topics/uORBTopics.hpp>
|
||||
|
||||
#include <px4_platform_common/atomic.h>
|
||||
#include <px4_platform_common/defines.h>
|
||||
#include <lib/mathlib/mathlib.h>
|
||||
|
||||
@@ -131,7 +132,7 @@ public:
|
||||
bool updated()
|
||||
{
|
||||
if (subscribe()) {
|
||||
return Manager::updates_available(_node, _last_generation);
|
||||
return Manager::updates_available(_node, _last_generation.load());
|
||||
}
|
||||
|
||||
return false;
|
||||
@@ -144,7 +145,10 @@ public:
|
||||
bool update(void *dst)
|
||||
{
|
||||
if (subscribe()) {
|
||||
return Manager::orb_data_copy(_node, dst, _last_generation, true);
|
||||
unsigned gen = _last_generation.load();
|
||||
bool ret = Manager::orb_data_copy(_node, dst, gen, true);
|
||||
_last_generation.store(gen);
|
||||
return ret;
|
||||
}
|
||||
|
||||
return false;
|
||||
@@ -157,7 +161,10 @@ public:
|
||||
bool copy(void *dst)
|
||||
{
|
||||
if (subscribe()) {
|
||||
return Manager::orb_data_copy(_node, dst, _last_generation, false);
|
||||
unsigned gen = _last_generation.load();
|
||||
bool ret = Manager::orb_data_copy(_node, dst, gen, false);
|
||||
_last_generation.store(gen);
|
||||
return ret;
|
||||
}
|
||||
|
||||
return false;
|
||||
@@ -170,7 +177,7 @@ public:
|
||||
bool ChangeInstance(uint8_t instance);
|
||||
|
||||
uint8_t get_instance() const { return _instance; }
|
||||
unsigned get_last_generation() const { return _last_generation; }
|
||||
unsigned get_last_generation() const { return _last_generation.load(); }
|
||||
orb_id_t get_topic() const { return get_orb_meta(_orb_id); }
|
||||
|
||||
ORB_ID orb_id() const { return _orb_id; }
|
||||
@@ -184,7 +191,7 @@ protected:
|
||||
|
||||
void *_node{nullptr};
|
||||
|
||||
unsigned _last_generation{0}; /**< last generation the subscriber has seen */
|
||||
px4::atomic<unsigned> _last_generation{0}; /**< last generation the subscriber has seen */
|
||||
|
||||
ORB_ID _orb_id{ORB_ID::INVALID};
|
||||
uint8_t _instance{0};
|
||||
|
||||
@@ -67,7 +67,9 @@ public:
|
||||
void call() override
|
||||
{
|
||||
// signal immediately if no interval, otherwise only if interval has elapsed
|
||||
if ((_interval_us == 0) || (hrt_elapsed_time(&_last_update) >= _interval_us)) {
|
||||
hrt_abstime last_update = _last_update.load();
|
||||
|
||||
if ((_interval_us == 0) || (hrt_elapsed_time(&last_update) >= _interval_us)) {
|
||||
pthread_cond_signal(&_cv);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -163,8 +163,10 @@ public:
|
||||
void call() override
|
||||
{
|
||||
// schedule immediately if updated (queue depth or subscription interval)
|
||||
if ((_required_updates == 0)
|
||||
|| (Manager::updates_available(_subscription.get_node(), _subscription.get_last_generation()) >= _required_updates)) {
|
||||
uint8_t req = _required_updates.load();
|
||||
|
||||
if ((req == 0)
|
||||
|| (Manager::updates_available(_subscription.get_node(), _subscription.get_last_generation()) >= req)) {
|
||||
if (updated()) {
|
||||
_work_item->ScheduleNow();
|
||||
}
|
||||
@@ -179,13 +181,13 @@ public:
|
||||
void set_required_updates(uint8_t required_updates)
|
||||
{
|
||||
// TODO: constrain to queue depth?
|
||||
_required_updates = required_updates;
|
||||
_required_updates.store(required_updates);
|
||||
}
|
||||
|
||||
private:
|
||||
px4::WorkItem *_work_item;
|
||||
|
||||
uint8_t _required_updates{0};
|
||||
px4::atomic<uint8_t> _required_updates{0};
|
||||
};
|
||||
|
||||
} // namespace uORB
|
||||
|
||||
@@ -38,7 +38,9 @@ namespace uORB
|
||||
|
||||
bool SubscriptionInterval::updated()
|
||||
{
|
||||
if (advertised() && (hrt_elapsed_time(&_last_update) >= _interval_us)) {
|
||||
hrt_abstime last_update = _last_update.load();
|
||||
|
||||
if (advertised() && (hrt_elapsed_time(&last_update) >= _interval_us)) {
|
||||
return _subscription.updated();
|
||||
}
|
||||
|
||||
@@ -62,10 +64,10 @@ bool SubscriptionInterval::copy(void *dst)
|
||||
// make sure we don't set a timestamp before the timer started counting (now - _interval_us would wrap because it's unsigned)
|
||||
if (now > _interval_us) {
|
||||
// shift last update time forward, but don't let it get further behind than the interval
|
||||
_last_update = math::constrain(_last_update + _interval_us, now - _interval_us, now);
|
||||
_last_update.store(math::constrain(_last_update.load() + _interval_us, now - _interval_us, now));
|
||||
|
||||
} else {
|
||||
_last_update = now;
|
||||
_last_update.store(now);
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
@@ -39,6 +39,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <uORB/uORB.h>
|
||||
#include <px4_platform_common/atomic.h>
|
||||
#include <px4_platform_common/defines.h>
|
||||
|
||||
#include "uORBDeviceNode.hpp"
|
||||
@@ -132,11 +133,11 @@ public:
|
||||
* Set the last data update
|
||||
* @param t should be in range [now, now - _interval_us]
|
||||
*/
|
||||
void set_last_update(hrt_abstime t) { _last_update = t; }
|
||||
void set_last_update(hrt_abstime t) { _last_update.store(t); }
|
||||
protected:
|
||||
|
||||
Subscription _subscription;
|
||||
uint64_t _last_update{0}; // last subscription update in microseconds
|
||||
px4::atomic<uint64_t> _last_update{0}; // last subscription update in microseconds
|
||||
uint32_t _interval_us{0}; // maximum update interval in microseconds
|
||||
|
||||
};
|
||||
|
||||
@@ -57,7 +57,7 @@ uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const uint8_t inst
|
||||
|
||||
uORB::DeviceNode::~DeviceNode()
|
||||
{
|
||||
free(_data);
|
||||
free(_data.load());
|
||||
|
||||
const char *devname = get_devname();
|
||||
|
||||
@@ -148,7 +148,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
|
||||
*
|
||||
* Note that filp will usually be NULL.
|
||||
*/
|
||||
if (nullptr == _data) {
|
||||
if (nullptr == _data.load()) {
|
||||
|
||||
#ifdef __PX4_NUTTX
|
||||
|
||||
@@ -158,13 +158,15 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
|
||||
lock();
|
||||
|
||||
/* re-check size */
|
||||
if (nullptr == _data) {
|
||||
if (nullptr == _data.load()) {
|
||||
const size_t data_size = _meta->o_size * _meta->o_queue;
|
||||
_data = (uint8_t *) px4_cache_aligned_alloc(data_size);
|
||||
uint8_t *data = (uint8_t *) px4_cache_aligned_alloc(data_size);
|
||||
|
||||
if (_data) {
|
||||
memset(_data, 0, data_size);
|
||||
if (data) {
|
||||
memset(data, 0, data_size);
|
||||
}
|
||||
|
||||
_data.store(data);
|
||||
}
|
||||
|
||||
unlock();
|
||||
@@ -175,7 +177,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
|
||||
#endif /* __PX4_NUTTX */
|
||||
|
||||
/* failed or could not allocate */
|
||||
if (nullptr == _data) {
|
||||
if (nullptr == _data.load()) {
|
||||
return -ENOMEM;
|
||||
}
|
||||
}
|
||||
@@ -190,7 +192,7 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
|
||||
/* wrap-around happens after ~49 days, assuming a publisher rate of 1 kHz */
|
||||
unsigned generation = _generation.fetch_add(1);
|
||||
|
||||
memcpy(_data + (_meta->o_size * (generation % _meta->o_queue)), buffer, _meta->o_size);
|
||||
memcpy(_data.load() + (_meta->o_size * (generation % _meta->o_queue)), buffer, _meta->o_size);
|
||||
|
||||
// callbacks
|
||||
for (auto item : _callbacks) {
|
||||
@@ -232,7 +234,7 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg)
|
||||
return PX4_OK;
|
||||
|
||||
case ORBIOCISADVERTISED:
|
||||
*(unsigned long *)arg = _advertised;
|
||||
*(unsigned long *)arg = _advertised.load();
|
||||
|
||||
return PX4_OK;
|
||||
|
||||
@@ -310,7 +312,7 @@ int uORB::DeviceNode::unadvertise(orb_advert_t handle)
|
||||
* of subscribers and publishers. But we also do not have a leak since future
|
||||
* publishers reuse the same DeviceNode object.
|
||||
*/
|
||||
devnode->_advertised = false;
|
||||
devnode->_advertised.store(false);
|
||||
|
||||
return PX4_OK;
|
||||
}
|
||||
@@ -347,7 +349,7 @@ uORB::DeviceNode::poll_notify_one(px4_pollfd_struct_t *fds, px4_pollevent_t even
|
||||
bool
|
||||
uORB::DeviceNode::print_statistics(int max_topic_length)
|
||||
{
|
||||
if (!_advertised) {
|
||||
if (!_advertised.load()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -411,10 +413,10 @@ int16_t uORB::DeviceNode::process_add_subscription()
|
||||
// send the data to the remote entity.
|
||||
uORBCommunicator::IChannel *ch = uORB::Manager::get_instance()->get_uorb_communicator();
|
||||
|
||||
if (_data != nullptr && ch != nullptr) { // _data will not be null if there is a publisher.
|
||||
if (_data.load() != nullptr && ch != nullptr) { // _data will not be null if there is a publisher.
|
||||
// Only send the most recent data to initialize the remote end.
|
||||
if (_data_valid) {
|
||||
ch->send_message(_meta->o_name, _meta->o_size, _data + (_meta->o_size * ((_generation.load() - 1) % _meta->o_queue)));
|
||||
ch->send_message(_meta->o_name, _meta->o_size, _data.load() + (_meta->o_size * ((_generation.load() - 1) % _meta->o_queue)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -175,9 +175,9 @@ public:
|
||||
*
|
||||
* This is used in the case of multi_pub/sub to check if it's valid to advertise
|
||||
* and publish to this node or if another node should be tried. */
|
||||
bool is_advertised() const { return _advertised; }
|
||||
bool is_advertised() const { return _advertised.load(); }
|
||||
|
||||
void mark_as_advertised() { _advertised = true; }
|
||||
void mark_as_advertised() { _advertised.store(true); }
|
||||
|
||||
/**
|
||||
* Print statistics
|
||||
@@ -224,10 +224,10 @@ public:
|
||||
*/
|
||||
bool copy(void *dst, unsigned &generation)
|
||||
{
|
||||
if ((dst != nullptr) && (_data != nullptr)) {
|
||||
if ((dst != nullptr) && (_data.load() != nullptr)) {
|
||||
if (_meta->o_queue == 1) {
|
||||
ATOMIC_ENTER;
|
||||
memcpy(dst, _data, _meta->o_size);
|
||||
memcpy(dst, _data.load(), _meta->o_size);
|
||||
generation = _generation.load();
|
||||
ATOMIC_LEAVE;
|
||||
return true;
|
||||
@@ -249,7 +249,7 @@ public:
|
||||
generation = current_generation - _meta->o_queue;
|
||||
}
|
||||
|
||||
memcpy(dst, _data + (_meta->o_size * (generation % _meta->o_queue)), _meta->o_size);
|
||||
memcpy(dst, _data.load() + (_meta->o_size * (generation % _meta->o_queue)), _meta->o_size);
|
||||
ATOMIC_LEAVE;
|
||||
|
||||
++generation;
|
||||
@@ -279,13 +279,13 @@ private:
|
||||
|
||||
const orb_metadata *_meta; /**< object metadata information */
|
||||
|
||||
uint8_t *_data{nullptr}; /**< allocated object buffer */
|
||||
px4::atomic<uint8_t *> _data{nullptr}; /**< allocated object buffer */
|
||||
bool _data_valid{false}; /**< At least one valid data */
|
||||
px4::atomic<unsigned> _generation{0}; /**< object generation count */
|
||||
List<uORB::SubscriptionCallback *> _callbacks;
|
||||
|
||||
const uint8_t _instance; /**< orb multi instance identifier */
|
||||
bool _advertised{false}; /**< has ever been advertised (not necessarily published data yet) */
|
||||
px4::atomic_bool _advertised{false}; /**< has ever been advertised (not necessarily published data yet) */
|
||||
|
||||
int8_t _subscriber_count{0};
|
||||
|
||||
|
||||
@@ -52,25 +52,25 @@
|
||||
pthread_mutex_t uORB::Manager::_communicator_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
#endif
|
||||
|
||||
uORB::Manager *uORB::Manager::_Instance = nullptr;
|
||||
px4::atomic<uORB::Manager *> uORB::Manager::_Instance{nullptr};
|
||||
|
||||
bool uORB::Manager::initialize()
|
||||
{
|
||||
if (_Instance == nullptr) {
|
||||
_Instance = new uORB::Manager();
|
||||
if (_Instance.load() == nullptr) {
|
||||
_Instance.store(new uORB::Manager());
|
||||
}
|
||||
|
||||
#if defined(__PX4_NUTTX) && !defined(CONFIG_BUILD_FLAT) && defined(__KERNEL__)
|
||||
px4_register_boardct_ioctl(_ORBIOCDEVBASE, orb_ioctl);
|
||||
#endif
|
||||
return _Instance != nullptr;
|
||||
return _Instance.load() != nullptr;
|
||||
}
|
||||
|
||||
bool uORB::Manager::terminate()
|
||||
{
|
||||
if (_Instance != nullptr) {
|
||||
delete _Instance;
|
||||
_Instance = nullptr;
|
||||
if (_Instance.load() != nullptr) {
|
||||
delete _Instance.load();
|
||||
_Instance.store(nullptr);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -277,14 +277,14 @@ orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta,
|
||||
if (_publisher_rule.ignore_other_topics) {
|
||||
if (!findTopic(_publisher_rule, meta->o_name)) {
|
||||
PX4_DEBUG("not allowing %s to publish topic %s", prog_name, meta->o_name);
|
||||
return (orb_advert_t)_Instance;
|
||||
return (orb_advert_t)_Instance.load();
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
if (findTopic(_publisher_rule, meta->o_name)) {
|
||||
PX4_DEBUG("not allowing %s to publish topic %s", prog_name, meta->o_name);
|
||||
return (orb_advert_t)_Instance;
|
||||
return (orb_advert_t)_Instance.load();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -337,7 +337,7 @@ int uORB::Manager::orb_unadvertise(orb_advert_t handle)
|
||||
{
|
||||
#ifdef ORB_USE_PUBLISHER_RULES
|
||||
|
||||
if (handle == _Instance) {
|
||||
if (handle == _Instance.load()) {
|
||||
return PX4_OK; //pretend success
|
||||
}
|
||||
|
||||
@@ -366,7 +366,7 @@ int uORB::Manager::orb_publish(const struct orb_metadata *meta, orb_advert_t han
|
||||
{
|
||||
#ifdef ORB_USE_PUBLISHER_RULES
|
||||
|
||||
if (handle == _Instance) {
|
||||
if (handle == _Instance.load()) {
|
||||
return PX4_OK; //pretend success
|
||||
}
|
||||
|
||||
|
||||
@@ -184,7 +184,7 @@ public:
|
||||
* Make sure initialize() is called first.
|
||||
* @return uORB::Manager*
|
||||
*/
|
||||
static uORB::Manager *get_instance() { return _Instance; }
|
||||
static uORB::Manager *get_instance() { return _Instance.load(); }
|
||||
|
||||
/**
|
||||
* Get the DeviceMaster. If it does not exist,
|
||||
@@ -489,7 +489,7 @@ private: // class methods
|
||||
int node_open(const struct orb_metadata *meta, bool advertiser, int *instance = nullptr);
|
||||
|
||||
private: // data members
|
||||
static Manager *_Instance;
|
||||
static px4::atomic<Manager *> _Instance;
|
||||
|
||||
#ifdef CONFIG_ORB_COMMUNICATOR
|
||||
// the communicator channel instance.
|
||||
|
||||
@@ -144,7 +144,7 @@ int uORBTest::UnitTest::pubsublatency_main()
|
||||
PX4_INFO("max: %3i us", timing_max);
|
||||
PX4_INFO("missed topic updates: %i", num_missed);
|
||||
|
||||
pubsubtest_passed = true;
|
||||
pubsubtest_passed.store(true);
|
||||
|
||||
#if defined(CONFIG_ARCH_BOARD_PX4_SITL)
|
||||
// relaxed on SITL (non-realtime)
|
||||
@@ -154,13 +154,13 @@ int uORBTest::UnitTest::pubsublatency_main()
|
||||
#endif
|
||||
|
||||
if (mean > kMaxMeanUs) {
|
||||
pubsubtest_res = PX4_ERROR;
|
||||
pubsubtest_res.store(PX4_ERROR);
|
||||
|
||||
} else {
|
||||
pubsubtest_res = PX4_OK;
|
||||
pubsubtest_res.store(PX4_OK);
|
||||
}
|
||||
|
||||
return pubsubtest_res;
|
||||
return pubsubtest_res.load();
|
||||
}
|
||||
|
||||
int uORBTest::UnitTest::test()
|
||||
@@ -412,7 +412,7 @@ int uORBTest::UnitTest::pub_test_multi2_main()
|
||||
pub = orb_advertise_multi(ORB_ID(orb_test_medium_multi), &data_topic, &idx);
|
||||
|
||||
if (idx != i) {
|
||||
_thread_should_exit = true;
|
||||
_thread_should_exit.store(true);
|
||||
PX4_ERR("Got wrong instance! should be: %i, but is %i", i, idx);
|
||||
return -1;
|
||||
}
|
||||
@@ -440,7 +440,7 @@ int uORBTest::UnitTest::pub_test_multi2_main()
|
||||
}
|
||||
|
||||
px4_usleep(100 * 1000);
|
||||
_thread_should_exit = true;
|
||||
_thread_should_exit.store(true);
|
||||
|
||||
for (int i = 0; i < num_instances; ++i) {
|
||||
orb_unadvertise(orb_pub[i]);
|
||||
@@ -454,7 +454,7 @@ int uORBTest::UnitTest::test_multi2()
|
||||
test_note("Testing multi-topic 2 test (queue simulation)");
|
||||
//test: first subscribe, then advertise
|
||||
|
||||
_thread_should_exit = false;
|
||||
_thread_should_exit.store(false);
|
||||
const int num_instances = 3;
|
||||
int orb_data_fd[num_instances] {-1, -1, -1};
|
||||
int orb_data_next = 0;
|
||||
@@ -478,7 +478,7 @@ int uORBTest::UnitTest::test_multi2()
|
||||
|
||||
hrt_abstime last_time[num_instances] {};
|
||||
|
||||
while (!_thread_should_exit) {
|
||||
while (!_thread_should_exit.load()) {
|
||||
|
||||
px4_usleep(1000);
|
||||
|
||||
@@ -501,6 +501,8 @@ int uORBTest::UnitTest::test_multi2()
|
||||
}
|
||||
}
|
||||
|
||||
px4_task_join(pubsub_task);
|
||||
|
||||
for (int i = 0; i < num_instances; ++i) {
|
||||
orb_unsubscribe(orb_data_fd[i]);
|
||||
}
|
||||
@@ -945,7 +947,7 @@ int uORBTest::UnitTest::pub_test_queue_main()
|
||||
const int queue_size = orb_get_queue_size(ORB_ID(orb_test_medium_queue_poll));
|
||||
|
||||
if ((ptopic = orb_advertise(ORB_ID(orb_test_medium_queue_poll), &t)) == nullptr) {
|
||||
_thread_should_exit = true;
|
||||
_thread_should_exit.store(true);
|
||||
return test_fail("advertise failed: %d", errno);
|
||||
}
|
||||
|
||||
@@ -968,9 +970,9 @@ int uORBTest::UnitTest::pub_test_queue_main()
|
||||
px4_usleep(20 * 1000); //give subscriber a chance to catch up
|
||||
}
|
||||
|
||||
_num_messages_sent = t.val;
|
||||
_num_messages_sent.store(t.val);
|
||||
px4_usleep(100 * 1000);
|
||||
_thread_should_exit = true;
|
||||
_thread_should_exit.store(true);
|
||||
orb_unadvertise(ptopic);
|
||||
|
||||
return 0;
|
||||
@@ -987,7 +989,7 @@ int uORBTest::UnitTest::test_queue_poll_notify()
|
||||
return test_fail("subscribe failed: %d", errno);
|
||||
}
|
||||
|
||||
_thread_should_exit = false;
|
||||
_thread_should_exit.store(false);
|
||||
|
||||
char *const args[1] = { nullptr };
|
||||
int pubsub_task = px4_task_spawn_cmd("uorb_test_queue",
|
||||
@@ -1006,12 +1008,12 @@ int uORBTest::UnitTest::test_queue_poll_notify()
|
||||
fds[0].fd = sfd;
|
||||
fds[0].events = POLLIN;
|
||||
|
||||
while (!_thread_should_exit) {
|
||||
while (!_thread_should_exit.load()) {
|
||||
|
||||
int poll_ret = px4_poll(fds, 1, 500);
|
||||
|
||||
if (poll_ret == 0) {
|
||||
if (_thread_should_exit) {
|
||||
if (_thread_should_exit.load()) {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -1032,9 +1034,11 @@ int uORBTest::UnitTest::test_queue_poll_notify()
|
||||
}
|
||||
}
|
||||
|
||||
if (_num_messages_sent != next_expected_val) {
|
||||
px4_task_join(pubsub_task);
|
||||
|
||||
if (_num_messages_sent.load() != next_expected_val) {
|
||||
return test_fail("number of sent and received messages mismatch (sent: %i, received: %i)",
|
||||
_num_messages_sent, next_expected_val);
|
||||
_num_messages_sent.load(), next_expected_val);
|
||||
}
|
||||
|
||||
return test_note("PASS orb queuing (poll & notify), got %i messages", next_expected_val);
|
||||
@@ -1056,7 +1060,7 @@ int uORBTest::UnitTest::latency_test(bool print)
|
||||
char *const args[1] = { nullptr };
|
||||
|
||||
pubsubtest_print = print;
|
||||
pubsubtest_passed = false;
|
||||
pubsubtest_passed.store(false);
|
||||
|
||||
/* test pub / sub latency */
|
||||
|
||||
@@ -1071,7 +1075,7 @@ int uORBTest::UnitTest::latency_test(bool print)
|
||||
args);
|
||||
|
||||
/* give the test task some data */
|
||||
while (!pubsubtest_passed) {
|
||||
while (!pubsubtest_passed.load()) {
|
||||
++t.val;
|
||||
t.timestamp = hrt_absolute_time();
|
||||
|
||||
@@ -1087,9 +1091,10 @@ int uORBTest::UnitTest::latency_test(bool print)
|
||||
return test_fail("failed launching task");
|
||||
}
|
||||
|
||||
px4_task_join(pubsub_task);
|
||||
orb_unadvertise(pfd0);
|
||||
|
||||
return pubsubtest_res;
|
||||
return pubsubtest_res.load();
|
||||
}
|
||||
|
||||
int uORBTest::UnitTest::test_fail(const char *fmt, ...)
|
||||
|
||||
@@ -42,6 +42,7 @@
|
||||
#include <uORB/topics/orb_test_medium.h>
|
||||
#include <uORB/topics/orb_test_large.h>
|
||||
|
||||
#include <px4_platform_common/atomic.h>
|
||||
#include <px4_platform_common/defines.h>
|
||||
#include <px4_platform_common/posix.h>
|
||||
#include <px4_platform_common/time.h>
|
||||
@@ -87,11 +88,11 @@ private:
|
||||
static int pub_test_multi2_entry(int argc, char *argv[]);
|
||||
int pub_test_multi2_main();
|
||||
|
||||
volatile bool _thread_should_exit;
|
||||
px4::atomic_bool _thread_should_exit{false};
|
||||
|
||||
bool pubsubtest_passed{false};
|
||||
px4::atomic_bool pubsubtest_passed{false};
|
||||
bool pubsubtest_print{false};
|
||||
int pubsubtest_res = OK;
|
||||
px4::atomic<int> pubsubtest_res{OK};
|
||||
|
||||
orb_advert_t _pfd[4] {}; ///< used for test_multi and test_multi_reversed
|
||||
|
||||
@@ -113,7 +114,7 @@ private:
|
||||
static int pub_test_queue_entry(int argc, char *argv[]);
|
||||
int pub_test_queue_main();
|
||||
int test_queue_poll_notify();
|
||||
volatile int _num_messages_sent = 0;
|
||||
px4::atomic<int> _num_messages_sent{0};
|
||||
|
||||
int test_fail(const char *fmt, ...);
|
||||
int test_note(const char *fmt, ...);
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
#include <px4_platform_common/px4_config.h>
|
||||
#include <px4_platform_common/log.h>
|
||||
#include <px4_platform_common/tasks.h>
|
||||
#include <px4_platform_common/time.h>
|
||||
|
||||
#include <nuttx/board.h>
|
||||
#include <nuttx/kthread.h>
|
||||
@@ -91,6 +92,16 @@ int px4_task_delete(int pid)
|
||||
return task_delete(pid);
|
||||
}
|
||||
|
||||
int px4_task_join(int pid)
|
||||
{
|
||||
// Wait for the task to exit by polling whether it still exists
|
||||
while (kill(pid, 0) == 0) {
|
||||
px4_usleep(10000);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
const char *px4_get_taskname(void)
|
||||
{
|
||||
#if CONFIG_TASK_NAME_SIZE > 0 && (defined(__KERNEL__) || defined(CONFIG_BUILD_FLAT))
|
||||
|
||||
@@ -28,7 +28,7 @@ add_executable(px4
|
||||
apps.cpp
|
||||
)
|
||||
|
||||
if (BUILD_TESTING)
|
||||
if (BUILD_TESTING AND TARGET fuzztest::fuzztest)
|
||||
# Build mavlink fuzz tests. These run other modules and thus cannot be a functional/unit test
|
||||
add_executable(mavlink_fuzz_tests EXCLUDE_FROM_ALL
|
||||
src/px4/common/mavlink_fuzz_tests.cpp
|
||||
|
||||
@@ -134,7 +134,10 @@ void hrt_store_absolute_time(volatile hrt_abstime *t)
|
||||
*/
|
||||
bool hrt_called(struct hrt_call *entry)
|
||||
{
|
||||
return (entry->deadline == 0);
|
||||
hrt_lock();
|
||||
bool result = (entry->deadline == 0);
|
||||
hrt_unlock();
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -162,13 +165,13 @@ static void hrt_latency_update()
|
||||
/* bounded buckets */
|
||||
for (index = 0; index < LATENCY_BUCKET_COUNT; index++) {
|
||||
if (latency <= latency_buckets[index]) {
|
||||
latency_counters[index]++;
|
||||
__atomic_fetch_add(&latency_counters[index], 1, __ATOMIC_RELAXED);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* catch-all at the end */
|
||||
latency_counters[index]++;
|
||||
__atomic_fetch_add(&latency_counters[index], 1, __ATOMIC_RELAXED);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -416,12 +419,15 @@ hrt_call_invoke()
|
||||
call->deadline = 0;
|
||||
|
||||
/* invoke the callout (if there is one) */
|
||||
if (call->callout) {
|
||||
hrt_callout callout = call->callout;
|
||||
void *arg = call->arg;
|
||||
|
||||
if (callout) {
|
||||
// Unlock so we don't deadlock in callback
|
||||
hrt_unlock();
|
||||
|
||||
//PX4_INFO("call %p: %p(%p)", call, call->callout, call->arg);
|
||||
call->callout(call->arg);
|
||||
//PX4_INFO("call %p: %p(%p)", call, callout, arg);
|
||||
callout(arg);
|
||||
|
||||
hrt_lock();
|
||||
}
|
||||
|
||||
@@ -36,4 +36,9 @@ set(SRCS
|
||||
)
|
||||
|
||||
px4_add_library(gtest_functional_main ${SRCS})
|
||||
target_link_libraries(gtest_functional_main PUBLIC gtest fuzztest::init_fuzztest)
|
||||
if(TARGET fuzztest::init_fuzztest)
|
||||
target_link_libraries(gtest_functional_main PUBLIC gtest fuzztest::init_fuzztest)
|
||||
target_compile_definitions(gtest_functional_main PRIVATE HAVE_FUZZTEST)
|
||||
else()
|
||||
target_link_libraries(gtest_functional_main PUBLIC gtest)
|
||||
endif()
|
||||
|
||||
@@ -32,7 +32,9 @@
|
||||
****************************************************************************/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#ifdef HAVE_FUZZTEST
|
||||
#include <fuzztest/init_fuzztest.h>
|
||||
#endif
|
||||
|
||||
#include <uORB/Subscription.hpp>
|
||||
|
||||
@@ -42,8 +44,10 @@ int main(int argc, char **argv)
|
||||
{
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
|
||||
#ifdef HAVE_FUZZTEST
|
||||
fuzztest::ParseAbslFlags(argc, argv);
|
||||
fuzztest::InitFuzzTest(&argc, &argv);
|
||||
#endif
|
||||
|
||||
uORB::Manager::initialize();
|
||||
param_init();
|
||||
|
||||
+3
-2
@@ -86,7 +86,7 @@ private:
|
||||
pthread_cond_t *passed_cond{nullptr};
|
||||
pthread_mutex_t *passed_lock{nullptr};
|
||||
uint64_t time_us{0};
|
||||
bool timeout{false};
|
||||
std::atomic<bool> timeout{false};
|
||||
std::atomic<bool> done{false};
|
||||
std::atomic<bool> removed{true};
|
||||
|
||||
@@ -98,6 +98,7 @@ private:
|
||||
std::atomic<uint64_t> _time_us{0};
|
||||
|
||||
TimedWait *_timed_waits{nullptr}; ///< head of linked list
|
||||
std::mutex _timed_waits_mutex;
|
||||
std::mutex _timed_waits_mutex; ///< protects _timed_waits linked list
|
||||
std::mutex _broadcast_mutex; ///< protects the broadcast phase in set_absolute_time()
|
||||
std::atomic<bool> _setting_time{false}; ///< true if set_absolute_time() is currently being executed
|
||||
};
|
||||
|
||||
@@ -55,6 +55,18 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)
|
||||
|
||||
_time_us = time_us;
|
||||
|
||||
// Collect entries that need wakeup while holding _timed_waits_mutex,
|
||||
// then broadcast outside of it to avoid lock-order-inversion with
|
||||
// cond_timedwait() (which holds passed_lock while acquiring _timed_waits_mutex).
|
||||
struct WakeUp {
|
||||
pthread_cond_t *cond;
|
||||
pthread_mutex_t *lock;
|
||||
};
|
||||
|
||||
static constexpr int MAX_WAKEUPS = 32;
|
||||
WakeUp wakeups[MAX_WAKEUPS];
|
||||
int num_wakeups = 0;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock_timed_waits(_timed_waits_mutex);
|
||||
_setting_time = true;
|
||||
@@ -81,17 +93,29 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)
|
||||
|
||||
if (timed_wait->time_us <= time_us &&
|
||||
!timed_wait->timeout) {
|
||||
// We are abusing the condition here to signal that the time
|
||||
// has passed.
|
||||
pthread_mutex_lock(timed_wait->passed_lock);
|
||||
// Mark as timed out and collect for broadcast
|
||||
timed_wait->timeout = true;
|
||||
pthread_cond_broadcast(timed_wait->passed_cond);
|
||||
pthread_mutex_unlock(timed_wait->passed_lock);
|
||||
|
||||
if (num_wakeups < MAX_WAKEUPS) {
|
||||
wakeups[num_wakeups++] = {timed_wait->passed_cond, timed_wait->passed_lock};
|
||||
}
|
||||
}
|
||||
|
||||
timed_wait_prev = timed_wait;
|
||||
timed_wait = timed_wait->next;
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast under _broadcast_mutex (not _timed_waits_mutex) so that
|
||||
// locking passed_lock here doesn't create an ordering cycle.
|
||||
{
|
||||
std::lock_guard<std::mutex> lock_broadcast(_broadcast_mutex);
|
||||
|
||||
for (int i = 0; i < num_wakeups; ++i) {
|
||||
pthread_mutex_lock(wakeups[i].lock);
|
||||
pthread_cond_broadcast(wakeups[i].cond);
|
||||
pthread_mutex_unlock(wakeups[i].lock);
|
||||
}
|
||||
|
||||
_setting_time = false;
|
||||
}
|
||||
@@ -101,7 +125,7 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *loc
|
||||
{
|
||||
// A TimedWait object might still be in timed_waits_ after we return, so its lifetime needs to be
|
||||
// longer. And using thread_local is more efficient than malloc.
|
||||
static thread_local TimedWait timed_wait;
|
||||
static thread_local TimedWait timed_wait{};
|
||||
{
|
||||
std::lock_guard<std::mutex> lock_timed_waits(_timed_waits_mutex);
|
||||
|
||||
@@ -144,9 +168,13 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *loc
|
||||
// deadlock due to a different locking order in set_absolute_time().
|
||||
// Note that this case does not happen too frequently, and thus can be
|
||||
// a bit more expensive.
|
||||
// We wait for both the iteration phase (_timed_waits_mutex) and the
|
||||
// broadcast phase (_broadcast_mutex) to complete.
|
||||
pthread_mutex_unlock(lock);
|
||||
_timed_waits_mutex.lock();
|
||||
_timed_waits_mutex.unlock();
|
||||
_broadcast_mutex.lock();
|
||||
_broadcast_mutex.unlock();
|
||||
pthread_mutex_lock(lock);
|
||||
}
|
||||
|
||||
|
||||
+21
-10
@@ -65,11 +65,12 @@ void test_condition_timing_out()
|
||||
ls.set_absolute_time(some_time_us);
|
||||
|
||||
std::atomic<bool> should_have_timed_out{false};
|
||||
pthread_mutex_lock(&lock);
|
||||
|
||||
// Use a thread to wait for condition while we already have the lock.
|
||||
// This ensures the synchronization happens in the right order.
|
||||
// The thread that calls cond_timedwait must also lock the mutex,
|
||||
// otherwise set_absolute_time() would re-lock the main thread's
|
||||
// own mutex in its broadcast phase, causing a deadlock.
|
||||
TestThread thread([&ls, &cond, &lock, &should_have_timed_out]() {
|
||||
pthread_mutex_lock(&lock);
|
||||
EXPECT_EQ(ls.cond_timedwait(&cond, &lock, some_time_us + 1000), ETIMEDOUT);
|
||||
EXPECT_TRUE(should_have_timed_out);
|
||||
// It should be re-locked afterwards, so we should be able to unlock it.
|
||||
@@ -100,17 +101,25 @@ void test_locked_semaphore_getting_unlocked()
|
||||
LockstepScheduler ls;
|
||||
ls.set_absolute_time(some_time_us);
|
||||
|
||||
pthread_mutex_lock(&lock);
|
||||
// Use a thread to wait for condition while we already have the lock.
|
||||
// This ensures the synchronization happens in the right order.
|
||||
TestThread thread([&ls, &cond, &lock]() {
|
||||
std::atomic<bool> thread_locked{false};
|
||||
|
||||
// The thread locks the mutex itself (POSIX requires the calling thread
|
||||
// to own the mutex passed to pthread_cond_wait).
|
||||
TestThread thread([&ls, &cond, &lock, &thread_locked]() {
|
||||
pthread_mutex_lock(&lock);
|
||||
thread_locked = true;
|
||||
ls.set_absolute_time(some_time_us + 500);
|
||||
EXPECT_EQ(ls.cond_timedwait(&cond, &lock, some_time_us + 1000), 0);
|
||||
// It should be re-locked afterwards, so we should be able to unlock it.
|
||||
EXPECT_EQ(pthread_mutex_unlock(&lock), 0);
|
||||
});
|
||||
|
||||
// Wait until thread has locked the mutex. Our pthread_mutex_lock below
|
||||
// will then block until cond_timedwait releases it via pthread_cond_wait.
|
||||
while (!thread_locked) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&lock);
|
||||
pthread_cond_broadcast(&cond);
|
||||
pthread_mutex_unlock(&lock);
|
||||
@@ -142,8 +151,9 @@ public:
|
||||
|
||||
void run()
|
||||
{
|
||||
pthread_mutex_lock(&_lock);
|
||||
_thread = std::make_shared<TestThread>([this]() {
|
||||
pthread_mutex_lock(&_lock);
|
||||
_thread_ready = true;
|
||||
_result = _ls.cond_timedwait(&_cond, &_lock, _timeout);
|
||||
pthread_mutex_unlock(&_lock);
|
||||
});
|
||||
@@ -151,7 +161,7 @@ public:
|
||||
|
||||
void check()
|
||||
{
|
||||
if (_is_done) {
|
||||
if (_is_done || !_thread_ready) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -186,6 +196,7 @@ private:
|
||||
pthread_mutex_t _lock;
|
||||
LockstepScheduler &_ls;
|
||||
std::atomic<bool> _is_done{false};
|
||||
std::atomic<bool> _thread_ready{false};
|
||||
std::atomic<int> _result {INITIAL_RESULT};
|
||||
std::shared_ptr<TestThread> _thread{};
|
||||
};
|
||||
@@ -310,7 +321,7 @@ void test_usleep()
|
||||
TEST(LockstepScheduler, All)
|
||||
{
|
||||
for (unsigned iteration = 1; iteration <= 100; ++iteration) {
|
||||
//std::cout << "Test iteration: " << iteration << "\n";
|
||||
std::cout << "Test iteration: " << iteration << "\n";
|
||||
test_absolute_time();
|
||||
test_condition_timing_out();
|
||||
test_locked_semaphore_getting_unlocked();
|
||||
|
||||
@@ -274,6 +274,21 @@ px4_task_t px4_task_spawn_cmd(const char *name, int scheduler, int priority, int
|
||||
return taskid;
|
||||
}
|
||||
|
||||
int px4_task_join(px4_task_t id)
|
||||
{
|
||||
if (id < PX4_MAX_TASKS) {
|
||||
pthread_mutex_lock(&task_mutex);
|
||||
pthread_t pid = taskmap[id].pid;
|
||||
pthread_mutex_unlock(&task_mutex);
|
||||
|
||||
if (pid != 0) {
|
||||
return pthread_join(pid, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
int px4_task_delete(px4_task_t id)
|
||||
{
|
||||
int rv = 0;
|
||||
@@ -291,7 +306,6 @@ int px4_task_delete(px4_task_t id)
|
||||
|
||||
// If current thread then exit, otherwise cancel
|
||||
if (pthread_self() == pid) {
|
||||
pthread_join(pid, nullptr);
|
||||
taskmap[id].isused = false;
|
||||
pthread_mutex_unlock(&task_mutex);
|
||||
pthread_exit(nullptr);
|
||||
|
||||
@@ -51,7 +51,7 @@ sleep 10
|
||||
|
||||
logger off
|
||||
|
||||
sensors status
|
||||
#sensors status # triggers TSAN
|
||||
listener sensor_gyro
|
||||
listener sensor_gyro_fifo
|
||||
listener sensor_gyro_fft
|
||||
|
||||
@@ -13,4 +13,10 @@ pwm_out_sim start
|
||||
|
||||
tests @test_name@
|
||||
|
||||
pwm_out_sim stop
|
||||
tone_alarm stop
|
||||
work_queue stop
|
||||
simulator_mavlink stop
|
||||
dataman stop
|
||||
|
||||
shutdown
|
||||
|
||||
@@ -272,14 +272,14 @@ static inline uint16_t get_latency_bucket_count(void) { return LATENCY_BUCKET_CO
|
||||
|
||||
static inline latency_info_t get_latency(uint16_t bucket_idx, uint16_t counter_idx)
|
||||
{
|
||||
latency_info_t ret = {latency_buckets[bucket_idx], latency_counters[counter_idx]};
|
||||
latency_info_t ret = {latency_buckets[bucket_idx], __atomic_load_n(&latency_counters[counter_idx], __ATOMIC_RELAXED)};
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline void reset_latency_counters(void)
|
||||
{
|
||||
for (int i = 0; i <= get_latency_bucket_count(); i++) {
|
||||
latency_counters[i] = 0;
|
||||
__atomic_store_n(&latency_counters[i], 0, __ATOMIC_RELAXED);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -49,14 +49,16 @@ px4_add_module(
|
||||
module.yaml
|
||||
)
|
||||
|
||||
px4_add_functional_gtest(SRC septentrio_fuzz_tests.cpp
|
||||
LINKLIBS
|
||||
driver__septentrio
|
||||
COMPILE_FLAGS
|
||||
# There warnings come from within fuzztest
|
||||
-Wno-float-equal
|
||||
-Wno-sign-compare
|
||||
-Wno-shadow
|
||||
-Wno-extra
|
||||
-Wno-non-template-friend
|
||||
)
|
||||
if(TARGET fuzztest::fuzztest)
|
||||
px4_add_functional_gtest(SRC septentrio_fuzz_tests.cpp
|
||||
LINKLIBS
|
||||
driver__septentrio
|
||||
COMPILE_FLAGS
|
||||
# There warnings come from within fuzztest
|
||||
-Wno-float-equal
|
||||
-Wno-sign-compare
|
||||
-Wno-shadow
|
||||
-Wno-extra
|
||||
-Wno-non-template-friend
|
||||
)
|
||||
endif()
|
||||
|
||||
@@ -223,3 +223,4 @@ if(${PX4_PLATFORM} STREQUAL "posix" OR ${PX4_PLATFORM} STREQUAL "ros2")
|
||||
endif()
|
||||
|
||||
px4_add_functional_gtest(SRC ParameterTest.cpp LINKLIBS parameters)
|
||||
px4_add_functional_gtest(SRC DynamicSparseLayerTest.cpp LINKLIBS parameters)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
/****************************************************************************
|
||||
*
|
||||
* Copyright (c) 2023 PX4 Development Team. All rights reserved.
|
||||
* Copyright (c) 2023-2026 PX4 Development Team. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions
|
||||
@@ -144,11 +144,13 @@ public:
|
||||
|
||||
int size() const override
|
||||
{
|
||||
const AtomicTransaction transaction;
|
||||
return _next_slot;
|
||||
}
|
||||
|
||||
int byteSize() const override
|
||||
{
|
||||
const AtomicTransaction transaction;
|
||||
return _n_slots * sizeof(Slot);
|
||||
}
|
||||
|
||||
@@ -197,23 +199,25 @@ private:
|
||||
return false;
|
||||
}
|
||||
|
||||
#ifdef __PX4_NUTTX
|
||||
int max_retries = 5;
|
||||
|
||||
// As malloc uses locking, so we need to re-enable IRQ's during malloc/free and
|
||||
// then atomically exchange the buffer
|
||||
// On NuttX, malloc/free can't be called with IRQs disabled,
|
||||
// so we unlock around them and use CAS to handle races.
|
||||
while (_next_slot >= _n_slots && max_retries-- > 0) {
|
||||
Slot *previous_slots = nullptr;
|
||||
Slot *new_slots = nullptr;
|
||||
|
||||
do {
|
||||
previous_slots = _slots.load();
|
||||
int alloc_n_slots = _n_slots;
|
||||
transaction.unlock();
|
||||
|
||||
if (new_slots) {
|
||||
free(new_slots);
|
||||
}
|
||||
|
||||
new_slots = (Slot *) malloc(sizeof(Slot) * (_n_slots + _n_grow));
|
||||
new_slots = (Slot *) malloc(sizeof(Slot) * (alloc_n_slots + _n_grow));
|
||||
transaction.lock();
|
||||
|
||||
if (new_slots == nullptr) {
|
||||
@@ -235,6 +239,31 @@ private:
|
||||
transaction.lock();
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
// On POSIX, malloc/free work fine under the mutex, so we can
|
||||
// hold the lock throughout and avoid CAS/ABA race conditions.
|
||||
if (_next_slot >= _n_slots) {
|
||||
Slot *old = _slots.load();
|
||||
Slot *new_slots = (Slot *)malloc(sizeof(Slot) * (_n_slots + _n_grow));
|
||||
|
||||
if (new_slots == nullptr) {
|
||||
return false;
|
||||
}
|
||||
|
||||
memcpy(new_slots, old, sizeof(Slot) * _n_slots);
|
||||
|
||||
for (int i = _n_slots; i < _n_slots + _n_grow; i++) {
|
||||
new_slots[i] = {UINT16_MAX, param_value_u{}};
|
||||
}
|
||||
|
||||
_slots.store(new_slots);
|
||||
_n_slots += _n_grow;
|
||||
free(old);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
return _next_slot < _n_slots;
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,245 @@
|
||||
/****************************************************************************
|
||||
*
|
||||
* Copyright (c) 2026 PX4 Development Team. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions
|
||||
* are met:
|
||||
*
|
||||
* 1. Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* 2. Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in
|
||||
* the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
* 3. Neither the name PX4 nor the names of its contributors may be
|
||||
* used to endorse or promote products derived from this software
|
||||
* without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
||||
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
|
||||
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
||||
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
||||
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
|
||||
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
||||
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
||||
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
||||
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*
|
||||
****************************************************************************/
|
||||
|
||||
/**
|
||||
* @file DynamicSparseLayerTest.cpp
|
||||
*
|
||||
* Concurrent stress tests for DynamicSparseLayer.
|
||||
*
|
||||
* Validates thread safety of store/get/contains under concurrent access,
|
||||
* especially during _grow() operations. Designed to be run with ASAN/TSan
|
||||
* to catch use-after-free or data races.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <parameters/px4_parameters.hpp>
|
||||
#include "DynamicSparseLayer.h"
|
||||
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
/**
|
||||
* Minimal ParamLayer parent that returns a default zero value for all params.
|
||||
*/
|
||||
class StubParamLayer : public ParamLayer
|
||||
{
|
||||
public:
|
||||
StubParamLayer() : ParamLayer(nullptr) {}
|
||||
|
||||
bool store(param_t, param_value_u) override { return false; }
|
||||
bool contains(param_t) const override { return false; }
|
||||
|
||||
px4::AtomicBitset<PARAM_COUNT> containedAsBitset() const override
|
||||
{
|
||||
return px4::AtomicBitset<PARAM_COUNT>();
|
||||
}
|
||||
|
||||
param_value_u get(param_t) const override
|
||||
{
|
||||
param_value_u v{};
|
||||
v.i = 0;
|
||||
return v;
|
||||
}
|
||||
|
||||
void reset(param_t) override {}
|
||||
void refresh(param_t) override {}
|
||||
int size() const override { return 0; }
|
||||
int byteSize() const override { return 0; }
|
||||
};
|
||||
|
||||
class DynamicSparseLayerConcurrentTest : public ::testing::Test
|
||||
{
|
||||
protected:
|
||||
StubParamLayer stub;
|
||||
};
|
||||
|
||||
// Single writer forces frequent _grow() while readers concurrently access
|
||||
// stored params. With only one writer there is no concurrent growth, so
|
||||
// this safely stresses the read-side during buffer reallocation.
|
||||
TEST_F(DynamicSparseLayerConcurrentTest, ConcurrentStoreGetRace)
|
||||
{
|
||||
for (int rep = 0; rep < 50; rep++) {
|
||||
DynamicSparseLayer layer(&stub, /*n_prealloc=*/2, /*n_grow=*/1);
|
||||
|
||||
constexpr int NUM_PARAMS = 100;
|
||||
std::atomic<bool> stop{false};
|
||||
|
||||
std::thread writer([&]() {
|
||||
for (int i = 0; i < NUM_PARAMS; i++) {
|
||||
param_value_u v{};
|
||||
v.i = i * 10;
|
||||
layer.store(static_cast<param_t>(i), v);
|
||||
}
|
||||
|
||||
stop.store(true, std::memory_order_release);
|
||||
});
|
||||
|
||||
std::vector<std::thread> readers;
|
||||
|
||||
for (int r = 0; r < 4; r++) {
|
||||
readers.emplace_back([&]() {
|
||||
while (!stop.load(std::memory_order_acquire)) {
|
||||
int current_size = layer.size();
|
||||
|
||||
for (int i = 0; i < current_size && i < NUM_PARAMS; i++) {
|
||||
layer.contains(static_cast<param_t>(i));
|
||||
layer.get(static_cast<param_t>(i));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
writer.join();
|
||||
|
||||
for (auto &t : readers) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
for (int i = 0; i < NUM_PARAMS; i++) {
|
||||
ASSERT_TRUE(layer.contains(static_cast<param_t>(i)))
|
||||
<< "rep=" << rep << " param=" << i;
|
||||
param_value_u v = layer.get(static_cast<param_t>(i));
|
||||
ASSERT_EQ(v.i, i * 10)
|
||||
<< "rep=" << rep << " param=" << i;
|
||||
}
|
||||
|
||||
ASSERT_EQ(layer.size(), NUM_PARAMS);
|
||||
}
|
||||
}
|
||||
|
||||
// Multiple writers store to disjoint param ranges concurrently.
|
||||
// Pre-allocated to avoid concurrent _grow() which has a known ABA
|
||||
// limitation in the CAS retry loop (see _grow() implementation).
|
||||
// This test validates concurrent store correctness.
|
||||
TEST_F(DynamicSparseLayerConcurrentTest, ConcurrentMultipleWriters)
|
||||
{
|
||||
constexpr int PARAMS_PER_WRITER = 50;
|
||||
constexpr int NUM_WRITERS = 4;
|
||||
constexpr int TOTAL = NUM_WRITERS * PARAMS_PER_WRITER;
|
||||
|
||||
for (int rep = 0; rep < 20; rep++) {
|
||||
DynamicSparseLayer layer(&stub, /*n_prealloc=*/2, /*n_grow=*/1);
|
||||
|
||||
std::vector<std::thread> writers;
|
||||
|
||||
for (int w = 0; w < NUM_WRITERS; w++) {
|
||||
writers.emplace_back([&layer, w]() {
|
||||
int base = w * PARAMS_PER_WRITER;
|
||||
|
||||
for (int i = 0; i < PARAMS_PER_WRITER; i++) {
|
||||
param_value_u v{};
|
||||
v.i = (base + i) * 10;
|
||||
layer.store(static_cast<param_t>(base + i), v);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
for (auto &t : writers) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
ASSERT_EQ(layer.size(), TOTAL) << "rep=" << rep;
|
||||
|
||||
for (int i = 0; i < TOTAL; i++) {
|
||||
ASSERT_TRUE(layer.contains(static_cast<param_t>(i)))
|
||||
<< "rep=" << rep << " param=" << i;
|
||||
param_value_u v = layer.get(static_cast<param_t>(i));
|
||||
ASSERT_EQ(v.i, i * 10)
|
||||
<< "rep=" << rep << " param=" << i;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Combined stress: writers store while readers access concurrently.
|
||||
TEST_F(DynamicSparseLayerConcurrentTest, ConcurrentWritersAndReaders)
|
||||
{
|
||||
constexpr int PARAMS_PER_WRITER = 50;
|
||||
constexpr int NUM_WRITERS = 2;
|
||||
constexpr int TOTAL = NUM_WRITERS * PARAMS_PER_WRITER;
|
||||
|
||||
for (int rep = 0; rep < 20; rep++) {
|
||||
DynamicSparseLayer layer(&stub, /*n_prealloc=*/2, /*n_grow=*/1);
|
||||
std::atomic<bool> stop{false};
|
||||
|
||||
std::vector<std::thread> writers;
|
||||
|
||||
for (int w = 0; w < NUM_WRITERS; w++) {
|
||||
writers.emplace_back([&layer, w]() {
|
||||
int base = w * PARAMS_PER_WRITER;
|
||||
|
||||
for (int i = 0; i < PARAMS_PER_WRITER; i++) {
|
||||
param_value_u v{};
|
||||
v.i = (base + i) * 10;
|
||||
layer.store(static_cast<param_t>(base + i), v);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<std::thread> readers;
|
||||
|
||||
for (int r = 0; r < 4; r++) {
|
||||
readers.emplace_back([&]() {
|
||||
while (!stop.load(std::memory_order_acquire)) {
|
||||
int current_size = layer.size();
|
||||
|
||||
for (int i = 0; i < current_size && i < TOTAL; i++) {
|
||||
layer.contains(static_cast<param_t>(i));
|
||||
layer.get(static_cast<param_t>(i));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
for (auto &t : writers) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
stop.store(true, std::memory_order_release);
|
||||
|
||||
for (auto &t : readers) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
ASSERT_EQ(layer.size(), TOTAL) << "rep=" << rep;
|
||||
|
||||
for (int i = 0; i < TOTAL; i++) {
|
||||
ASSERT_TRUE(layer.contains(static_cast<param_t>(i)))
|
||||
<< "rep=" << rep << " param=" << i;
|
||||
param_value_u v = layer.get(static_cast<param_t>(i));
|
||||
ASSERT_EQ(v.i, i * 10)
|
||||
<< "rep=" << rep << " param=" << i;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -34,5 +34,21 @@
|
||||
#include "atomic_transaction.h"
|
||||
|
||||
#ifdef __PX4_POSIX
|
||||
_MutexHolder AtomicTransaction::_mutex_holder = _MutexHolder {};
|
||||
static pthread_mutex_t _param_mutex;
|
||||
static pthread_once_t _param_mutex_once = PTHREAD_ONCE_INIT;
|
||||
|
||||
static void _init_param_mutex()
|
||||
{
|
||||
pthread_mutexattr_t attr;
|
||||
pthread_mutexattr_init(&attr);
|
||||
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
|
||||
pthread_mutex_init(&_param_mutex, &attr);
|
||||
pthread_mutexattr_destroy(&attr);
|
||||
}
|
||||
|
||||
pthread_mutex_t *AtomicTransaction::_get_mutex()
|
||||
{
|
||||
pthread_once(&_param_mutex_once, _init_param_mutex);
|
||||
return &_param_mutex;
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -39,25 +39,6 @@
|
||||
|
||||
#ifdef __PX4_POSIX
|
||||
#include <pthread.h>
|
||||
|
||||
class _MutexHolder
|
||||
{
|
||||
public:
|
||||
pthread_mutex_t _mutex;
|
||||
pthread_mutexattr_t _mutex_attr;
|
||||
|
||||
_MutexHolder()
|
||||
{
|
||||
pthread_mutexattr_init(&_mutex_attr);
|
||||
pthread_mutexattr_settype(&_mutex_attr, PTHREAD_MUTEX_RECURSIVE);
|
||||
pthread_mutex_init(&_mutex, &_mutex_attr);
|
||||
}
|
||||
|
||||
~_MutexHolder()
|
||||
{
|
||||
pthread_mutex_destroy(&_mutex);
|
||||
}
|
||||
};
|
||||
#endif
|
||||
|
||||
|
||||
@@ -69,7 +50,7 @@ private:
|
||||
#endif
|
||||
|
||||
#ifdef __PX4_POSIX
|
||||
static _MutexHolder _mutex_holder;
|
||||
static pthread_mutex_t *_get_mutex();
|
||||
#endif
|
||||
|
||||
public:
|
||||
@@ -89,7 +70,7 @@ public:
|
||||
_irq_state = px4_enter_critical_section();
|
||||
#endif
|
||||
#ifdef __PX4_POSIX
|
||||
pthread_mutex_lock(&_mutex_holder._mutex);
|
||||
pthread_mutex_lock(_get_mutex());
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -99,7 +80,7 @@ public:
|
||||
px4_leave_critical_section(_irq_state);
|
||||
#endif
|
||||
#ifdef __PX4_POSIX
|
||||
pthread_mutex_unlock(&_mutex_holder._mutex);
|
||||
pthread_mutex_unlock(_get_mutex());
|
||||
#endif
|
||||
}
|
||||
};
|
||||
|
||||
+143
-110
@@ -48,6 +48,8 @@
|
||||
|
||||
#include "perf_counter.h"
|
||||
|
||||
#include <px4_platform_common/atomic.h>
|
||||
|
||||
/**
|
||||
* Header common to all counters.
|
||||
*/
|
||||
@@ -61,34 +63,34 @@ struct perf_ctr_header {
|
||||
* PC_EVENT counter.
|
||||
*/
|
||||
struct perf_ctr_count : public perf_ctr_header {
|
||||
uint64_t event_count{0};
|
||||
px4::atomic<uint64_t> event_count;
|
||||
};
|
||||
|
||||
/**
|
||||
* PC_ELAPSED counter.
|
||||
*/
|
||||
struct perf_ctr_elapsed : public perf_ctr_header {
|
||||
uint64_t event_count{0};
|
||||
uint64_t time_start{0};
|
||||
uint64_t time_total{0};
|
||||
uint32_t time_least{0};
|
||||
uint32_t time_most{0};
|
||||
float mean{0.0f};
|
||||
float M2{0.0f};
|
||||
px4::atomic<uint64_t> event_count;
|
||||
px4::atomic<uint64_t> time_start;
|
||||
px4::atomic<uint64_t> time_total;
|
||||
px4::atomic<uint32_t> time_least;
|
||||
px4::atomic<uint32_t> time_most;
|
||||
px4::atomic<float> mean;
|
||||
px4::atomic<float> M2;
|
||||
};
|
||||
|
||||
/**
|
||||
* PC_INTERVAL counter.
|
||||
*/
|
||||
struct perf_ctr_interval : public perf_ctr_header {
|
||||
uint64_t event_count{0};
|
||||
uint64_t time_event{0};
|
||||
uint64_t time_first{0};
|
||||
uint64_t time_last{0};
|
||||
uint32_t time_least{0};
|
||||
uint32_t time_most{0};
|
||||
float mean{0.0f};
|
||||
float M2{0.0f};
|
||||
px4::atomic<uint64_t> event_count;
|
||||
px4::atomic<uint64_t> time_event;
|
||||
px4::atomic<uint64_t> time_first;
|
||||
px4::atomic<uint64_t> time_last;
|
||||
px4::atomic<uint32_t> time_least;
|
||||
px4::atomic<uint32_t> time_most;
|
||||
px4::atomic<float> mean;
|
||||
px4::atomic<float> M2;
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -100,12 +102,8 @@ static sq_queue_t perf_counters = { nullptr, nullptr };
|
||||
* mutex protecting access to the perf_counters linked list (which is read from & written to by different threads)
|
||||
*/
|
||||
pthread_mutex_t perf_counters_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
// FIXME: the mutex does **not** protect against access to/from the perf
|
||||
// counter's data. It can still happen that a counter is updated while it is
|
||||
// printed. This can lead to inconsistent output, or completely bogus values
|
||||
// (especially the 64bit values which are in general not atomically updated).
|
||||
// The same holds for shared perf counters (perf_alloc_once), that can be updated
|
||||
// concurrently (this affects the 'ctrl_latency' counter).
|
||||
// Note: the mutex only protects the linked list, not individual counter data.
|
||||
// All counter field accesses use px4::atomic to prevent torn reads/writes.
|
||||
|
||||
|
||||
perf_counter_t
|
||||
@@ -208,7 +206,7 @@ perf_count(perf_counter_t handle)
|
||||
|
||||
switch (handle->type) {
|
||||
case PC_COUNT:
|
||||
((struct perf_ctr_count *)handle)->event_count++;
|
||||
((struct perf_ctr_count *)handle)->event_count.fetch_add(1);
|
||||
break;
|
||||
|
||||
case PC_INTERVAL:
|
||||
@@ -229,7 +227,7 @@ perf_begin(perf_counter_t handle)
|
||||
|
||||
switch (handle->type) {
|
||||
case PC_ELAPSED:
|
||||
((struct perf_ctr_elapsed *)handle)->time_start = hrt_absolute_time();
|
||||
((struct perf_ctr_elapsed *)handle)->time_start.store(hrt_absolute_time());
|
||||
break;
|
||||
|
||||
default:
|
||||
@@ -247,9 +245,10 @@ perf_end(perf_counter_t handle)
|
||||
switch (handle->type) {
|
||||
case PC_ELAPSED: {
|
||||
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
|
||||
hrt_abstime ts = pce->time_start.load();
|
||||
|
||||
if (pce->time_start != 0) {
|
||||
perf_set_elapsed(handle, hrt_elapsed_time(&pce->time_start));
|
||||
if (ts != 0) {
|
||||
perf_set_elapsed(handle, hrt_absolute_time() - ts);
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -271,25 +270,33 @@ perf_set_elapsed(perf_counter_t handle, int64_t elapsed)
|
||||
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
|
||||
|
||||
if (elapsed >= 0) {
|
||||
pce->event_count++;
|
||||
pce->time_total += elapsed;
|
||||
uint64_t ec = pce->event_count.fetch_add(1) + 1;
|
||||
pce->time_total.fetch_add((uint64_t)elapsed);
|
||||
|
||||
if ((pce->time_least > (uint32_t)elapsed) || (pce->time_least == 0)) {
|
||||
pce->time_least = elapsed;
|
||||
uint32_t tl = pce->time_least.load();
|
||||
|
||||
if ((tl > (uint32_t)elapsed) || (tl == 0)) {
|
||||
pce->time_least.store((uint32_t)elapsed);
|
||||
}
|
||||
|
||||
if (pce->time_most < (uint32_t)elapsed) {
|
||||
pce->time_most = elapsed;
|
||||
uint32_t tm = pce->time_most.load();
|
||||
|
||||
if (tm < (uint32_t)elapsed) {
|
||||
pce->time_most.store((uint32_t)elapsed);
|
||||
}
|
||||
|
||||
// maintain mean and variance of the elapsed time in seconds
|
||||
// Knuth/Welford recursive mean and variance of update intervals (via Wikipedia)
|
||||
float dt = elapsed / 1e6f;
|
||||
float delta_intvl = dt - pce->mean;
|
||||
pce->mean += delta_intvl / pce->event_count;
|
||||
pce->M2 += delta_intvl * (dt - pce->mean);
|
||||
float mean_val = pce->mean.load();
|
||||
float delta_intvl = dt - mean_val;
|
||||
mean_val += delta_intvl / ec;
|
||||
pce->mean.store(mean_val);
|
||||
float m2_val = pce->M2.load();
|
||||
m2_val += delta_intvl * (dt - mean_val);
|
||||
pce->M2.store(m2_val);
|
||||
|
||||
pce->time_start = 0;
|
||||
pce->time_start.store((uint64_t)0);
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -309,42 +316,55 @@ perf_count_interval(perf_counter_t handle, hrt_abstime now)
|
||||
switch (handle->type) {
|
||||
case PC_INTERVAL: {
|
||||
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
|
||||
uint64_t ec = pci->event_count.load();
|
||||
|
||||
switch (pci->event_count) {
|
||||
switch (ec) {
|
||||
case 0:
|
||||
pci->time_first = now;
|
||||
pci->time_first.store(now);
|
||||
break;
|
||||
|
||||
case 1:
|
||||
pci->time_least = (uint32_t)(now - pci->time_last);
|
||||
pci->time_most = (uint32_t)(now - pci->time_last);
|
||||
pci->mean = pci->time_least / 1e6f;
|
||||
pci->M2 = 0;
|
||||
break;
|
||||
case 1: {
|
||||
uint64_t tl = pci->time_last.load();
|
||||
uint32_t interval = (uint32_t)(now - tl);
|
||||
pci->time_least.store(interval);
|
||||
pci->time_most.store(interval);
|
||||
pci->mean.store(interval / 1e6f);
|
||||
pci->M2.store(0.0f);
|
||||
break;
|
||||
}
|
||||
|
||||
default: {
|
||||
hrt_abstime interval = now - pci->time_last;
|
||||
uint64_t tl = pci->time_last.load();
|
||||
hrt_abstime interval = now - tl;
|
||||
|
||||
if ((uint32_t)interval < pci->time_least) {
|
||||
pci->time_least = (uint32_t)interval;
|
||||
uint32_t least = pci->time_least.load();
|
||||
|
||||
if ((uint32_t)interval < least) {
|
||||
pci->time_least.store((uint32_t)interval);
|
||||
}
|
||||
|
||||
if ((uint32_t)interval > pci->time_most) {
|
||||
pci->time_most = (uint32_t)interval;
|
||||
uint32_t most = pci->time_most.load();
|
||||
|
||||
if ((uint32_t)interval > most) {
|
||||
pci->time_most.store((uint32_t)interval);
|
||||
}
|
||||
|
||||
// maintain mean and variance of interval in seconds
|
||||
// Knuth/Welford recursive mean and variance of update intervals (via Wikipedia)
|
||||
float dt = interval / 1e6f;
|
||||
float delta_intvl = dt - pci->mean;
|
||||
pci->mean += delta_intvl / pci->event_count;
|
||||
pci->M2 += delta_intvl * (dt - pci->mean);
|
||||
float mean_val = pci->mean.load();
|
||||
float delta_intvl = dt - mean_val;
|
||||
mean_val += delta_intvl / ec;
|
||||
pci->mean.store(mean_val);
|
||||
float m2_val = pci->M2.load();
|
||||
m2_val += delta_intvl * (dt - mean_val);
|
||||
pci->M2.store(m2_val);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pci->time_last = now;
|
||||
pci->event_count++;
|
||||
pci->time_last.store(now);
|
||||
pci->event_count.fetch_add(1);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -362,7 +382,7 @@ perf_set_count(perf_counter_t handle, uint64_t count)
|
||||
|
||||
switch (handle->type) {
|
||||
case PC_COUNT: {
|
||||
((struct perf_ctr_count *)handle)->event_count = count;
|
||||
((struct perf_ctr_count *)handle)->event_count.store(count);
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -382,8 +402,7 @@ perf_cancel(perf_counter_t handle)
|
||||
switch (handle->type) {
|
||||
case PC_ELAPSED: {
|
||||
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
|
||||
|
||||
pce->time_start = 0;
|
||||
pce->time_start.store((uint64_t)0);
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -401,31 +420,31 @@ perf_reset(perf_counter_t handle)
|
||||
|
||||
switch (handle->type) {
|
||||
case PC_COUNT:
|
||||
((struct perf_ctr_count *)handle)->event_count = 0;
|
||||
((struct perf_ctr_count *)handle)->event_count.store((uint64_t)0);
|
||||
break;
|
||||
|
||||
case PC_ELAPSED: {
|
||||
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
|
||||
pce->event_count = 0;
|
||||
pce->time_start = 0;
|
||||
pce->time_total = 0;
|
||||
pce->time_least = 0;
|
||||
pce->time_most = 0;
|
||||
pce->mean = 0.0f;
|
||||
pce->M2 = 0.0f;
|
||||
pce->event_count.store((uint64_t)0);
|
||||
pce->time_start.store((uint64_t)0);
|
||||
pce->time_total.store((uint64_t)0);
|
||||
pce->time_least.store((uint32_t)0);
|
||||
pce->time_most.store((uint32_t)0);
|
||||
pce->mean.store(0.0f);
|
||||
pce->M2.store(0.0f);
|
||||
break;
|
||||
}
|
||||
|
||||
case PC_INTERVAL: {
|
||||
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
|
||||
pci->event_count = 0;
|
||||
pci->time_event = 0;
|
||||
pci->time_first = 0;
|
||||
pci->time_last = 0;
|
||||
pci->time_least = 0;
|
||||
pci->time_most = 0;
|
||||
pci->mean = 0.0f;
|
||||
pci->M2 = 0.0f;
|
||||
pci->event_count.store((uint64_t)0);
|
||||
pci->time_event.store((uint64_t)0);
|
||||
pci->time_first.store((uint64_t)0);
|
||||
pci->time_last.store((uint64_t)0);
|
||||
pci->time_least.store((uint32_t)0);
|
||||
pci->time_most.store((uint32_t)0);
|
||||
pci->mean.store(0.0f);
|
||||
pci->M2.store(0.0f);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -439,38 +458,43 @@ perf_print_counter(perf_counter_t handle)
|
||||
}
|
||||
|
||||
switch (handle->type) {
|
||||
case PC_COUNT:
|
||||
PX4_INFO_RAW("%s: %" PRIu64 " events\n",
|
||||
handle->name,
|
||||
((struct perf_ctr_count *)handle)->event_count);
|
||||
break;
|
||||
case PC_COUNT: {
|
||||
uint64_t ec = ((struct perf_ctr_count *)handle)->event_count.load();
|
||||
PX4_INFO_RAW("%s: %" PRIu64 " events\n",
|
||||
handle->name, ec);
|
||||
break;
|
||||
}
|
||||
|
||||
case PC_ELAPSED: {
|
||||
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
|
||||
float rms = sqrtf(pce->M2 / (pce->event_count - 1));
|
||||
uint64_t ec = pce->event_count.load();
|
||||
uint64_t tt = pce->time_total.load();
|
||||
uint32_t tl = pce->time_least.load();
|
||||
uint32_t tm = pce->time_most.load();
|
||||
float m2 = pce->M2.load();
|
||||
float rms = sqrtf(m2 / (ec - 1));
|
||||
PX4_INFO_RAW("%s: %" PRIu64 " events, %" PRIu64 "us elapsed, %.2fus avg, min %" PRIu32 "us max %" PRIu32
|
||||
"us %5.3fus rms\n",
|
||||
handle->name,
|
||||
pce->event_count,
|
||||
pce->time_total,
|
||||
(pce->event_count == 0) ? 0 : (double)pce->time_total / (double)pce->event_count,
|
||||
pce->time_least,
|
||||
pce->time_most,
|
||||
(double)(1e6f * rms));
|
||||
handle->name, ec, tt,
|
||||
(ec == 0) ? 0 : (double)tt / (double)ec,
|
||||
tl, tm, (double)(1e6f * rms));
|
||||
break;
|
||||
}
|
||||
|
||||
case PC_INTERVAL: {
|
||||
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
|
||||
float rms = sqrtf(pci->M2 / (pci->event_count - 1));
|
||||
uint64_t ec = pci->event_count.load();
|
||||
uint64_t tf = pci->time_first.load();
|
||||
uint64_t tla = pci->time_last.load();
|
||||
uint32_t tl = pci->time_least.load();
|
||||
uint32_t tm = pci->time_most.load();
|
||||
float m2 = pci->M2.load();
|
||||
float rms = sqrtf(m2 / (ec - 1));
|
||||
|
||||
PX4_INFO_RAW("%s: %" PRIu64 " events, %.2fus avg, min %" PRIu32 "us max %" PRIu32 "us %5.3fus rms\n",
|
||||
handle->name,
|
||||
pci->event_count,
|
||||
(pci->event_count == 0) ? 0 : (double)(pci->time_last - pci->time_first) / (double)pci->event_count,
|
||||
pci->time_least,
|
||||
pci->time_most,
|
||||
(double)(1e6f * rms));
|
||||
handle->name, ec,
|
||||
(ec == 0) ? 0 : (double)(tla - tf) / (double)ec,
|
||||
tl, tm, (double)(1e6f * rms));
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -493,35 +517,44 @@ perf_print_counter_buffer(char *buffer, int length, perf_counter_t handle)
|
||||
case PC_COUNT:
|
||||
num_written = snprintf(buffer, length, "%s: %" PRIu64 " events",
|
||||
handle->name,
|
||||
((struct perf_ctr_count *)handle)->event_count);
|
||||
((struct perf_ctr_count *)handle)->event_count.load());
|
||||
break;
|
||||
|
||||
case PC_ELAPSED: {
|
||||
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
|
||||
float rms = sqrtf(pce->M2 / (pce->event_count - 1));
|
||||
uint64_t ec = pce->event_count.load();
|
||||
uint64_t tt = pce->time_total.load();
|
||||
uint32_t tl = pce->time_least.load();
|
||||
uint32_t tm = pce->time_most.load();
|
||||
float m2 = pce->M2.load();
|
||||
float rms = sqrtf(m2 / (ec - 1));
|
||||
num_written = snprintf(buffer, length,
|
||||
"%s: %" PRIu64 " events, %" PRIu64 "us elapsed, %.2fus avg, min %" PRIu32 "us max %" PRIu32 "us %5.3fus rms",
|
||||
handle->name,
|
||||
pce->event_count,
|
||||
pce->time_total,
|
||||
(pce->event_count == 0) ? 0 : (double)pce->time_total / (double)pce->event_count,
|
||||
pce->time_least,
|
||||
pce->time_most,
|
||||
ec,
|
||||
tt,
|
||||
(ec == 0) ? 0 : (double)tt / (double)ec,
|
||||
tl,
|
||||
tm,
|
||||
(double)(1e6f * rms));
|
||||
break;
|
||||
}
|
||||
|
||||
case PC_INTERVAL: {
|
||||
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
|
||||
float rms = sqrtf(pci->M2 / (pci->event_count - 1));
|
||||
uint64_t ec = pci->event_count.load();
|
||||
uint32_t tl = pci->time_least.load();
|
||||
uint32_t tm = pci->time_most.load();
|
||||
float m2 = pci->M2.load();
|
||||
float rms = sqrtf(m2 / (ec - 1));
|
||||
|
||||
num_written = snprintf(buffer, length,
|
||||
"%s: %" PRIu64 " events, %.2f avg, min %" PRIu32 "us max %" PRIu32 "us %5.3fus rms",
|
||||
handle->name,
|
||||
pci->event_count,
|
||||
(pci->event_count == 0) ? 0 : (double)(pci->time_last - pci->time_first) / (double)pci->event_count,
|
||||
pci->time_least,
|
||||
pci->time_most,
|
||||
ec,
|
||||
(ec == 0) ? 0 : (double)(pci->time_last.load() - pci->time_first.load()) / (double)ec,
|
||||
tl,
|
||||
tm,
|
||||
(double)(1e6f * rms));
|
||||
break;
|
||||
}
|
||||
@@ -543,16 +576,16 @@ perf_event_count(perf_counter_t handle)
|
||||
|
||||
switch (handle->type) {
|
||||
case PC_COUNT:
|
||||
return ((struct perf_ctr_count *)handle)->event_count;
|
||||
return ((struct perf_ctr_count *)handle)->event_count.load();
|
||||
|
||||
case PC_ELAPSED: {
|
||||
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
|
||||
return pce->event_count;
|
||||
return pce->event_count.load();
|
||||
}
|
||||
|
||||
case PC_INTERVAL: {
|
||||
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
|
||||
return pci->event_count;
|
||||
return pci->event_count.load();
|
||||
}
|
||||
|
||||
default:
|
||||
@@ -572,12 +605,12 @@ perf_mean(perf_counter_t handle)
|
||||
switch (handle->type) {
|
||||
case PC_ELAPSED: {
|
||||
struct perf_ctr_elapsed *pce = (struct perf_ctr_elapsed *)handle;
|
||||
return pce->mean;
|
||||
return pce->mean.load();
|
||||
}
|
||||
|
||||
case PC_INTERVAL: {
|
||||
struct perf_ctr_interval *pci = (struct perf_ctr_interval *)handle;
|
||||
return pci->mean;
|
||||
return pci->mean.load();
|
||||
}
|
||||
|
||||
default:
|
||||
|
||||
@@ -57,6 +57,8 @@
|
||||
#include <uORB/topics/dataman_request.h>
|
||||
#include <uORB/topics/dataman_response.h>
|
||||
|
||||
#include <px4_platform_common/atomic.h>
|
||||
|
||||
#include "dataman.h"
|
||||
|
||||
__BEGIN_DECLS
|
||||
@@ -126,7 +128,7 @@ static struct {
|
||||
uint8_t *data_end;
|
||||
} ram;
|
||||
};
|
||||
bool running;
|
||||
px4::atomic_bool running{false};
|
||||
bool silence = false;
|
||||
} dm_operations_data;
|
||||
|
||||
@@ -172,13 +174,14 @@ static enum {
|
||||
|
||||
static px4_sem_t g_init_sema;
|
||||
|
||||
static bool g_task_should_exit; /**< if true, dataman task should exit */
|
||||
static px4::atomic_bool g_task_should_exit{false}; /**< if true, dataman task should exit */
|
||||
static px4_task_t g_task_id = -1; /**< dataman task handle for join */
|
||||
|
||||
/* Work queue management functions */
|
||||
|
||||
static bool is_running()
|
||||
{
|
||||
return dm_operations_data.running;
|
||||
return dm_operations_data.running.load();
|
||||
}
|
||||
|
||||
/* Calculate the offset in file of specific item */
|
||||
@@ -607,7 +610,7 @@ _file_initialize(unsigned max_offset)
|
||||
g_dm_ops->write(DM_KEY_SAFE_POINTS_STATE, 0, reinterpret_cast<uint8_t *>(&stats), sizeof(mission_stats_entry_s));
|
||||
}
|
||||
|
||||
dm_operations_data.running = true;
|
||||
dm_operations_data.running.store(true);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -627,7 +630,7 @@ _ram_initialize(unsigned max_offset)
|
||||
|
||||
memset(dm_operations_data.ram.data, 0, max_offset);
|
||||
dm_operations_data.ram.data_end = &dm_operations_data.ram.data[max_offset - 1];
|
||||
dm_operations_data.running = true;
|
||||
dm_operations_data.running.store(true);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -637,7 +640,7 @@ static void
|
||||
_file_shutdown()
|
||||
{
|
||||
close(dm_operations_data.file.fd);
|
||||
dm_operations_data.running = false;
|
||||
dm_operations_data.running.store(false);
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -645,7 +648,7 @@ static void
|
||||
_ram_shutdown()
|
||||
{
|
||||
free(dm_operations_data.ram.data);
|
||||
dm_operations_data.running = false;
|
||||
dm_operations_data.running.store(false);
|
||||
}
|
||||
|
||||
static int
|
||||
@@ -683,7 +686,7 @@ task_main(int argc, char *argv[])
|
||||
g_func_counts[i] = 0;
|
||||
}
|
||||
|
||||
g_task_should_exit = false;
|
||||
g_task_should_exit.store(false);
|
||||
|
||||
uORB::Publication<dataman_response_s> dataman_response_pub{ORB_ID(dataman_response)};
|
||||
const int dataman_request_sub = orb_subscribe(ORB_ID(dataman_request));
|
||||
@@ -698,7 +701,7 @@ task_main(int argc, char *argv[])
|
||||
int ret = g_dm_ops->initialize(max_offset);
|
||||
|
||||
if (ret) {
|
||||
g_task_should_exit = true;
|
||||
g_task_should_exit.store(true);
|
||||
goto end;
|
||||
}
|
||||
|
||||
@@ -825,7 +828,7 @@ task_main(int argc, char *argv[])
|
||||
}
|
||||
|
||||
/* time to go???? */
|
||||
if (g_task_should_exit) {
|
||||
if (g_task_should_exit.load()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -856,9 +859,11 @@ start()
|
||||
px4_sem_setprotocol(&g_init_sema, SEM_PRIO_NONE);
|
||||
|
||||
/* start the worker thread with low priority for disk IO */
|
||||
if (px4_task_spawn_cmd("dataman", SCHED_DEFAULT, SCHED_PRIORITY_DEFAULT - 10,
|
||||
PX4_STACK_ADJUSTED(TASK_STACK_SIZE), task_main,
|
||||
nullptr) < 0) {
|
||||
g_task_id = px4_task_spawn_cmd("dataman", SCHED_DEFAULT, SCHED_PRIORITY_DEFAULT - 10,
|
||||
PX4_STACK_ADJUSTED(TASK_STACK_SIZE), task_main,
|
||||
nullptr);
|
||||
|
||||
if (g_task_id < 0) {
|
||||
px4_sem_destroy(&g_init_sema);
|
||||
PX4_ERR("task start failed");
|
||||
return -1;
|
||||
@@ -887,7 +892,7 @@ static void
|
||||
stop()
|
||||
{
|
||||
/* Tell the worker task to shut down */
|
||||
g_task_should_exit = true;
|
||||
g_task_should_exit.store(true);
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -1017,6 +1022,8 @@ dataman_main(int argc, char *argv[])
|
||||
|
||||
if (!strcmp(argv[1], "stop")) {
|
||||
stop();
|
||||
px4_task_join(g_task_id);
|
||||
g_task_id = -1;
|
||||
#ifdef CONFIG_DATAMAN_PERSISTENT_STORAGE
|
||||
free(k_data_manager_device_path);
|
||||
k_data_manager_device_path = nullptr;
|
||||
|
||||
@@ -1134,7 +1134,7 @@ bool Logger::start_stop_logging()
|
||||
}
|
||||
}
|
||||
|
||||
desired_state = desired_state || _manually_logging_override;
|
||||
desired_state = desired_state || _manually_logging_override.load();
|
||||
|
||||
// only start/stop if this is a state transition
|
||||
if (updated && _prev_file_log_start_state != desired_state) {
|
||||
|
||||
@@ -144,7 +144,7 @@ public:
|
||||
|
||||
void print_statistics(LogType type);
|
||||
|
||||
void set_arm_override(bool override) { _manually_logging_override = override; }
|
||||
void set_arm_override(bool override) { _manually_logging_override.store(override); }
|
||||
|
||||
void trigger_watchdog_now()
|
||||
{
|
||||
@@ -345,7 +345,7 @@ private:
|
||||
LogFileName _file_name[(int)LogType::Count];
|
||||
|
||||
bool _prev_file_log_start_state{false}; ///< previous state depending on logging mode (arming or aux1 state)
|
||||
bool _manually_logging_override{false};
|
||||
px4::atomic_bool _manually_logging_override{false};
|
||||
|
||||
Statistics _statistics[(int)LogType::Count];
|
||||
hrt_abstime _last_sync_time{0}; ///< last time a sync msg was sent
|
||||
|
||||
@@ -89,11 +89,73 @@ events::EventBuffer *Mavlink::_event_buffer = nullptr;
|
||||
|
||||
Mavlink *mavlink_module_instances[MAVLINK_COMM_NUM_BUFFERS] {};
|
||||
|
||||
void mavlink_send_uart_bytes(mavlink_channel_t chan, const uint8_t *ch, int length) { mavlink_module_instances[chan]->send_bytes(ch, length); }
|
||||
void mavlink_start_uart_send(mavlink_channel_t chan, int length) { mavlink_module_instances[chan]->send_start(length); }
|
||||
void mavlink_end_uart_send(mavlink_channel_t chan, int length) { mavlink_module_instances[chan]->send_finish(); }
|
||||
mavlink_status_t *mavlink_get_channel_status(uint8_t channel) { return mavlink_module_instances[channel]->get_status(); }
|
||||
mavlink_message_t *mavlink_get_channel_buffer(uint8_t channel) { return mavlink_module_instances[channel]->get_buffer(); }
|
||||
// Per-channel status, buffer, and send mutex live in static arrays so they survive instance teardown.
|
||||
// This prevents use-after-free when the MAVLink C library accesses channel status/buffer during a send
|
||||
// that races with instance destruction.
|
||||
static mavlink_status_t mavlink_channel_statuses[MAVLINK_COMM_NUM_BUFFERS] {};
|
||||
static mavlink_message_t mavlink_channel_buffers[MAVLINK_COMM_NUM_BUFFERS] {};
|
||||
// Recursive mutex: allows callers to hold lock_send() while send_start()/send_finish()
|
||||
// re-lock internally (called from _mav_finalize_message_chan_send via MAVLINK_START_UART_SEND).
|
||||
pthread_mutex_t mavlink_channel_send_mutexes[MAVLINK_COMM_NUM_BUFFERS];
|
||||
static pthread_once_t mavlink_channel_mutexes_once = PTHREAD_ONCE_INIT;
|
||||
|
||||
static void init_channel_send_mutexes()
|
||||
{
|
||||
pthread_mutexattr_t attr;
|
||||
pthread_mutexattr_init(&attr);
|
||||
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
|
||||
|
||||
for (int i = 0; i < MAVLINK_COMM_NUM_BUFFERS; i++) {
|
||||
pthread_mutex_init(&mavlink_channel_send_mutexes[i], &attr);
|
||||
}
|
||||
|
||||
pthread_mutexattr_destroy(&attr);
|
||||
}
|
||||
|
||||
void mavlink_send_uart_bytes(mavlink_channel_t chan, const uint8_t *ch, int length)
|
||||
{
|
||||
Mavlink *inst = mavlink_module_instances[chan];
|
||||
|
||||
if (inst) { inst->send_bytes(ch, length); }
|
||||
}
|
||||
|
||||
void mavlink_start_uart_send(mavlink_channel_t chan, int length)
|
||||
{
|
||||
Mavlink *inst = mavlink_module_instances[chan];
|
||||
|
||||
if (inst) { inst->send_start(length); }
|
||||
}
|
||||
|
||||
void mavlink_end_uart_send(mavlink_channel_t chan, int length)
|
||||
{
|
||||
Mavlink *inst = mavlink_module_instances[chan];
|
||||
|
||||
if (inst) { inst->send_finish(); }
|
||||
}
|
||||
|
||||
mavlink_status_t *mavlink_get_channel_status(uint8_t channel) { return &mavlink_channel_statuses[channel]; }
|
||||
mavlink_message_t *mavlink_get_channel_buffer(uint8_t channel) { return &mavlink_channel_buffers[channel]; }
|
||||
|
||||
mavlink_status_t *Mavlink::get_status()
|
||||
{
|
||||
if (_instance_id >= 0) {
|
||||
return &mavlink_channel_statuses[_instance_id];
|
||||
}
|
||||
|
||||
return &_mavlink_status_early;
|
||||
}
|
||||
|
||||
mavlink_message_t *Mavlink::get_buffer()
|
||||
{
|
||||
if (_instance_id >= 0) {
|
||||
return &mavlink_channel_buffers[_instance_id];
|
||||
}
|
||||
|
||||
return &_mavlink_buffer_early;
|
||||
}
|
||||
|
||||
void Mavlink::lock_send() { pthread_mutex_lock(&mavlink_channel_send_mutexes[_instance_id]); }
|
||||
void Mavlink::unlock_send() { pthread_mutex_unlock(&mavlink_channel_send_mutexes[_instance_id]); }
|
||||
|
||||
static void usage();
|
||||
|
||||
@@ -245,6 +307,8 @@ bool Mavlink::set_channel()
|
||||
bool
|
||||
Mavlink::set_instance_id()
|
||||
{
|
||||
pthread_once(&mavlink_channel_mutexes_once, init_channel_send_mutexes);
|
||||
|
||||
LockGuard lg{mavlink_module_mutex};
|
||||
|
||||
// instance count
|
||||
@@ -263,6 +327,10 @@ Mavlink::set_instance_id()
|
||||
|
||||
for (int instance_id = 0; instance_id < MAVLINK_COMM_NUM_BUFFERS; instance_id++) {
|
||||
if (mavlink_module_instances[instance_id] == nullptr) {
|
||||
// Copy early-init status/buffer to static per-channel arrays
|
||||
mavlink_channel_statuses[instance_id] = _mavlink_status_early;
|
||||
mavlink_channel_buffers[instance_id] = _mavlink_buffer_early;
|
||||
|
||||
mavlink_module_instances[instance_id] = this;
|
||||
_instance_id = instance_id;
|
||||
mavlink_instance_count.fetch_add(1);
|
||||
@@ -377,7 +445,12 @@ Mavlink::destroy_all_instances()
|
||||
for (int i = 0; i < MAVLINK_COMM_NUM_BUFFERS; i++) {
|
||||
if (mavlink_module_instances[i] != nullptr) {
|
||||
Mavlink *inst = mavlink_module_instances[i];
|
||||
|
||||
// Lock the channel send mutex to ensure no in-flight sends
|
||||
pthread_mutex_lock(&mavlink_channel_send_mutexes[i]);
|
||||
mavlink_module_instances[i] = nullptr;
|
||||
pthread_mutex_unlock(&mavlink_channel_send_mutexes[i]);
|
||||
|
||||
mavlink_instance_count.fetch_sub(1);
|
||||
delete inst;
|
||||
}
|
||||
@@ -744,7 +817,7 @@ Mavlink::get_free_tx_buf()
|
||||
|
||||
void Mavlink::send_start(int length)
|
||||
{
|
||||
pthread_mutex_lock(&_send_mutex);
|
||||
pthread_mutex_lock(&mavlink_channel_send_mutexes[_instance_id]);
|
||||
_last_write_try_time = hrt_absolute_time();
|
||||
|
||||
// check if there is space in the buffer
|
||||
@@ -752,7 +825,9 @@ void Mavlink::send_start(int length)
|
||||
// not enough space in buffer to send
|
||||
count_txerrbytes(length);
|
||||
|
||||
pthread_mutex_lock(&_tstatus_mutex);
|
||||
_tstatus.tx_buffer_overruns++;
|
||||
pthread_mutex_unlock(&_tstatus_mutex);
|
||||
|
||||
// prevent writes
|
||||
_tx_buffer_low = true;
|
||||
@@ -765,7 +840,7 @@ void Mavlink::send_start(int length)
|
||||
void Mavlink::send_finish()
|
||||
{
|
||||
if (_tx_buffer_low || (_buf_fill == 0)) {
|
||||
pthread_mutex_unlock(&_send_mutex);
|
||||
pthread_mutex_unlock(&mavlink_channel_send_mutexes[_instance_id]);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -817,7 +892,9 @@ void Mavlink::send_finish()
|
||||
#endif // MAVLINK_UDP
|
||||
|
||||
if (ret == (int)_buf_fill) {
|
||||
pthread_mutex_lock(&_tstatus_mutex);
|
||||
_tstatus.tx_message_count++;
|
||||
pthread_mutex_unlock(&_tstatus_mutex);
|
||||
count_txbytes(_buf_fill);
|
||||
_last_write_success_time = _last_write_try_time;
|
||||
|
||||
@@ -827,7 +904,7 @@ void Mavlink::send_finish()
|
||||
|
||||
_buf_fill = 0;
|
||||
|
||||
pthread_mutex_unlock(&_send_mutex);
|
||||
pthread_mutex_unlock(&mavlink_channel_send_mutexes[_instance_id]);
|
||||
}
|
||||
|
||||
void Mavlink::send_bytes(const uint8_t *buf, unsigned packet_len)
|
||||
@@ -2277,8 +2354,8 @@ Mavlink::task_main(int argc, char *argv[])
|
||||
|
||||
pthread_mutex_init(&_mavlink_shell_mutex, nullptr);
|
||||
pthread_mutex_init(&_message_buffer_mutex, nullptr);
|
||||
pthread_mutex_init(&_send_mutex, nullptr);
|
||||
pthread_mutex_init(&_radio_status_mutex, nullptr);
|
||||
pthread_mutex_init(&_tstatus_mutex, nullptr);
|
||||
|
||||
/* if we are passing on mavlink messages, we need to prepare a buffer for this instance */
|
||||
if (get_forwarding_on()) {
|
||||
@@ -2502,20 +2579,16 @@ Mavlink::task_main(int argc, char *argv[])
|
||||
if (_bytes_timestamp != 0) {
|
||||
const float dt = (t - _bytes_timestamp) * 1e-6f;
|
||||
|
||||
_tstatus.tx_rate_avg = _bytes_tx / dt;
|
||||
_tstatus.tx_error_rate_avg = _bytes_txerr / dt;
|
||||
_tstatus.rx_rate_avg = _bytes_rx / dt;
|
||||
|
||||
_bytes_tx = 0;
|
||||
_bytes_txerr = 0;
|
||||
_bytes_rx = 0;
|
||||
_tstatus.tx_rate_avg = _bytes_tx.fetch_and(0) / dt;
|
||||
_tstatus.tx_error_rate_avg = _bytes_txerr.fetch_and(0) / dt;
|
||||
_tstatus.rx_rate_avg = _bytes_rx.fetch_and(0) / dt;
|
||||
}
|
||||
|
||||
_bytes_timestamp = t;
|
||||
}
|
||||
|
||||
// publish status at 1 Hz, or sooner if HEARTBEAT has updated
|
||||
if ((hrt_elapsed_time(&_tstatus.timestamp) >= 1_s) || _tstatus_updated) {
|
||||
if ((hrt_elapsed_time(&_tstatus.timestamp) >= 1_s) || _tstatus_updated.load()) {
|
||||
publish_telemetry_status();
|
||||
}
|
||||
|
||||
@@ -2548,9 +2621,9 @@ Mavlink::task_main(int argc, char *argv[])
|
||||
}
|
||||
|
||||
pthread_mutex_destroy(&_mavlink_shell_mutex);
|
||||
pthread_mutex_destroy(&_send_mutex);
|
||||
pthread_mutex_destroy(&_radio_status_mutex);
|
||||
pthread_mutex_destroy(&_message_buffer_mutex);
|
||||
pthread_mutex_destroy(&_tstatus_mutex);
|
||||
|
||||
PX4_INFO("exiting channel %i", (int)_channel);
|
||||
|
||||
@@ -2833,6 +2906,7 @@ void Mavlink::check_requested_subscriptions()
|
||||
void Mavlink::publish_telemetry_status()
|
||||
{
|
||||
// many fields are populated in place
|
||||
pthread_mutex_lock(&_tstatus_mutex);
|
||||
|
||||
_tstatus.mode = _mode;
|
||||
_tstatus.data_rate = _datarate;
|
||||
@@ -2844,10 +2918,11 @@ void Mavlink::publish_telemetry_status()
|
||||
|
||||
_tstatus.streams = _streams.size();
|
||||
|
||||
// telemetry_status is also updated from the receiver thread, but never the same fields
|
||||
_tstatus.timestamp = hrt_absolute_time();
|
||||
_telemetry_status_pub.publish(_tstatus);
|
||||
_tstatus_updated = false;
|
||||
|
||||
pthread_mutex_unlock(&_tstatus_mutex);
|
||||
_tstatus_updated.store(false);
|
||||
}
|
||||
|
||||
void Mavlink::configure_sik_radio()
|
||||
@@ -2923,7 +2998,9 @@ int Mavlink::start_helper(int argc, char *argv[])
|
||||
int instance_id = instance->get_instance_id();
|
||||
|
||||
if (instance_id >= 0) {
|
||||
pthread_mutex_lock(&mavlink_channel_send_mutexes[instance_id]);
|
||||
mavlink_module_instances[instance_id] = nullptr;
|
||||
pthread_mutex_unlock(&mavlink_channel_send_mutexes[instance_id]);
|
||||
mavlink_instance_count.fetch_sub(1);
|
||||
}
|
||||
|
||||
@@ -3241,7 +3318,9 @@ Mavlink::stop_command(int argc, char *argv[])
|
||||
|
||||
for (int mavlink_instance = 0; mavlink_instance < MAVLINK_COMM_NUM_BUFFERS; mavlink_instance++) {
|
||||
if (mavlink_module_instances[mavlink_instance] == inst) {
|
||||
pthread_mutex_lock(&mavlink_channel_send_mutexes[mavlink_instance]);
|
||||
mavlink_module_instances[mavlink_instance] = nullptr;
|
||||
pthread_mutex_unlock(&mavlink_channel_send_mutexes[mavlink_instance]);
|
||||
mavlink_instance_count.fetch_sub(1);
|
||||
delete inst;
|
||||
return PX4_OK;
|
||||
|
||||
@@ -62,6 +62,7 @@
|
||||
#include <parameters/param.h>
|
||||
#include <lib/variable_length_ringbuffer/VariableLengthRingbuffer.hpp>
|
||||
#include <perf/perf_counter.h>
|
||||
#include <px4_platform_common/atomic.h>
|
||||
#include <px4_platform_common/cli.h>
|
||||
#include <px4_platform_common/px4_config.h>
|
||||
#include <px4_platform_common/defines.h>
|
||||
@@ -132,8 +133,8 @@ public:
|
||||
static Mavlink *new_instance();
|
||||
static Mavlink *get_instance_for_device(const char *device_name);
|
||||
|
||||
mavlink_message_t *get_buffer() { return &_mavlink_buffer; }
|
||||
mavlink_status_t *get_status() { return &_mavlink_status; }
|
||||
mavlink_message_t *get_buffer();
|
||||
mavlink_status_t *get_status();
|
||||
|
||||
void setProtocolVersion(uint8_t version);
|
||||
uint8_t getProtocolVersion() const { return _protocol_version; };
|
||||
@@ -255,7 +256,7 @@ public:
|
||||
|
||||
bool get_forwarding_on() { return _forwarding_on; }
|
||||
|
||||
bool is_gcs_connected() { return _tstatus.heartbeat_type_gcs; }
|
||||
bool is_gcs_connected() { lock_telemetry_status(); bool r = _tstatus.heartbeat_type_gcs; unlock_telemetry_status(); return r; }
|
||||
|
||||
#if defined(MAVLINK_UDP)
|
||||
static Mavlink *get_instance_for_network_port(unsigned long port);
|
||||
@@ -383,34 +384,39 @@ public:
|
||||
float get_baudrate() { return _baudrate; }
|
||||
|
||||
/* Functions for waiting to start transmission until message received. */
|
||||
void set_has_received_messages(bool received_messages) { _received_messages = received_messages; }
|
||||
bool get_has_received_messages() { return _received_messages; }
|
||||
void set_has_received_messages(bool received_messages) { _received_messages.store(received_messages); }
|
||||
bool get_has_received_messages() { return _received_messages.load(); }
|
||||
void set_wait_to_transmit(bool wait) { _wait_to_transmit = wait; }
|
||||
bool get_wait_to_transmit() { return _wait_to_transmit; }
|
||||
bool should_transmit() { return (_transmitting_enabled && (!_wait_to_transmit || (_wait_to_transmit && _received_messages))); }
|
||||
bool should_transmit() { return (_transmitting_enabled && (!_wait_to_transmit || (_wait_to_transmit && _received_messages.load()))); }
|
||||
|
||||
/**
|
||||
* Count transmitted bytes
|
||||
*/
|
||||
void count_txbytes(unsigned n) { _bytes_tx += n; };
|
||||
void count_txbytes(unsigned n) { _bytes_tx.fetch_add(n); };
|
||||
|
||||
/**
|
||||
* Count bytes not transmitted because of errors
|
||||
*/
|
||||
void count_txerrbytes(unsigned n) { _bytes_txerr += n; };
|
||||
void count_txerrbytes(unsigned n) { _bytes_txerr.fetch_add(n); };
|
||||
|
||||
/**
|
||||
* Count received bytes
|
||||
*/
|
||||
void count_rxbytes(unsigned n) { _bytes_rx += n; };
|
||||
void count_rxbytes(unsigned n) { _bytes_rx.fetch_add(n); };
|
||||
|
||||
/**
|
||||
* Get the receive status of this MAVLink link
|
||||
*/
|
||||
telemetry_status_s &telemetry_status() { return _tstatus; }
|
||||
void telemetry_status_updated() { _tstatus_updated = true; }
|
||||
void lock_telemetry_status() { pthread_mutex_lock(&_tstatus_mutex); }
|
||||
void unlock_telemetry_status() { pthread_mutex_unlock(&_tstatus_mutex); }
|
||||
void telemetry_status_updated() { _tstatus_updated.store(true); }
|
||||
|
||||
void set_telemetry_status_type(uint8_t type) { _tstatus.type = type; }
|
||||
void set_telemetry_status_type(uint8_t type) { pthread_mutex_lock(&_tstatus_mutex); _tstatus.type = type; pthread_mutex_unlock(&_tstatus_mutex); }
|
||||
|
||||
void lock_send();
|
||||
void unlock_send();
|
||||
|
||||
void update_radio_status(const radio_status_s &radio_status);
|
||||
|
||||
@@ -521,14 +527,15 @@ private:
|
||||
static constexpr int MAVLINK_MAX_INTERVAL{10000};
|
||||
static constexpr float MAVLINK_MIN_MULTIPLIER{0.0005f};
|
||||
|
||||
mavlink_message_t _mavlink_buffer {};
|
||||
mavlink_status_t _mavlink_status {};
|
||||
// Used before _instance_id is assigned; copied to static per-channel arrays in set_instance_id()
|
||||
mavlink_message_t _mavlink_buffer_early {};
|
||||
mavlink_status_t _mavlink_status_early {};
|
||||
|
||||
/* states */
|
||||
bool _hil_enabled{false}; /**< Hardware In the Loop mode */
|
||||
bool _is_usb_uart{false}; /**< Port is USB */
|
||||
bool _wait_to_transmit{false}; /**< Wait to transmit until received messages. */
|
||||
bool _received_messages{false}; /**< Whether we've received valid mavlink messages. */
|
||||
px4::atomic_bool _received_messages{false}; /**< Whether we've received valid mavlink messages. */
|
||||
|
||||
px4::atomic_bool _should_check_events{false}; /**< Events subscription: only one MAVLink instance should check */
|
||||
px4::atomic_bool _sending_parameters{false}; /**< True if parameters are currently sent out */
|
||||
@@ -580,9 +587,9 @@ private:
|
||||
uint64_t _mavlink_start_time{0};
|
||||
uint8_t _protocol_version = 0; ///< after initialization the only values are 1 and 2
|
||||
|
||||
unsigned _bytes_tx{0};
|
||||
unsigned _bytes_txerr{0};
|
||||
unsigned _bytes_rx{0};
|
||||
px4::atomic<unsigned> _bytes_tx{0};
|
||||
px4::atomic<unsigned> _bytes_txerr{0};
|
||||
px4::atomic<unsigned> _bytes_rx{0};
|
||||
hrt_abstime _bytes_timestamp{0};
|
||||
|
||||
#if defined(MAVLINK_UDP)
|
||||
@@ -613,14 +620,14 @@ private:
|
||||
|
||||
radio_status_s _rstatus {};
|
||||
telemetry_status_s _tstatus {};
|
||||
bool _tstatus_updated{false};
|
||||
px4::atomic_bool _tstatus_updated{false};
|
||||
pthread_mutex_t _tstatus_mutex {};
|
||||
|
||||
ping_statistics_s _ping_stats {};
|
||||
|
||||
pthread_mutex_t _message_buffer_mutex{};
|
||||
VariableLengthRingbuffer _message_buffer{};
|
||||
|
||||
pthread_mutex_t _send_mutex {};
|
||||
pthread_mutex_t _radio_status_mutex {};
|
||||
|
||||
DEFINE_PARAMETERS(
|
||||
|
||||
@@ -1732,7 +1732,9 @@ MavlinkReceiver::handle_message_ping(mavlink_message_t *msg)
|
||||
|
||||
ping.target_system = msg->sysid;
|
||||
ping.target_component = msg->compid;
|
||||
_mavlink.lock_send();
|
||||
mavlink_msg_ping_send_struct(_mavlink.get_channel(), &ping);
|
||||
_mavlink.unlock_send();
|
||||
|
||||
} else if ((ping.target_system == mavlink_system.sysid) &&
|
||||
(ping.target_component ==
|
||||
@@ -2304,7 +2306,9 @@ MavlinkReceiver::get_message_interval(int msgId)
|
||||
}
|
||||
|
||||
// send back this value...
|
||||
_mavlink.lock_send();
|
||||
mavlink_msg_message_interval_send(_mavlink.get_channel(), msgId, interval);
|
||||
_mavlink.unlock_send();
|
||||
}
|
||||
|
||||
void
|
||||
@@ -2918,6 +2922,7 @@ void MavlinkReceiver::CheckHeartbeats(const hrt_abstime &t, bool force)
|
||||
}
|
||||
|
||||
if ((t >= _last_heartbeat_check + (TIMEOUT / 2)) || force) {
|
||||
_mavlink.lock_telemetry_status();
|
||||
telemetry_status_s &tstatus = _mavlink.telemetry_status();
|
||||
|
||||
tstatus.heartbeat_type_antenna_tracker = (t <= TIMEOUT + _heartbeat_type_antenna_tracker);
|
||||
@@ -2937,6 +2942,7 @@ void MavlinkReceiver::CheckHeartbeats(const hrt_abstime &t, bool force)
|
||||
tstatus.heartbeat_component_udp_bridge = (t <= TIMEOUT + _heartbeat_component_udp_bridge);
|
||||
tstatus.heartbeat_component_uart_bridge = (t <= TIMEOUT + _heartbeat_component_uart_bridge);
|
||||
|
||||
_mavlink.unlock_telemetry_status();
|
||||
_mavlink.telemetry_status_updated();
|
||||
_last_heartbeat_check = t;
|
||||
}
|
||||
@@ -3295,6 +3301,7 @@ MavlinkReceiver::run()
|
||||
|
||||
if (t - last_send_update > timeout * 1000) {
|
||||
_mission_manager.check_active_mission();
|
||||
_mavlink.lock_send();
|
||||
_mission_manager.send();
|
||||
|
||||
if (_mavlink.get_mode() != Mavlink::MAVLINK_MODE::MAVLINK_MODE_IRIDIUM) {
|
||||
@@ -3307,6 +3314,7 @@ MavlinkReceiver::run()
|
||||
}
|
||||
|
||||
_mavlink_log_handler.send();
|
||||
_mavlink.unlock_send();
|
||||
last_send_update = t;
|
||||
}
|
||||
|
||||
|
||||
@@ -969,13 +969,15 @@ bool MavlinkFtpTest::_setup_ftp_msg(const MavlinkFTP::PayloadHeader *payload_hea
|
||||
payload->padding = 0;
|
||||
|
||||
msg->checksum = 0;
|
||||
mavlink_msg_file_transfer_protocol_pack(clientSystemId, // Sender system id
|
||||
clientComponentId, // Sender component id
|
||||
msg, // Message to pack payload into
|
||||
0, // Target network
|
||||
serverSystemId, // Target system id
|
||||
serverComponentId, // Target component id
|
||||
payload_bytes); // Payload to pack into message
|
||||
mavlink_status_t local_status{};
|
||||
mavlink_msg_file_transfer_protocol_pack_status(clientSystemId, // Sender system id
|
||||
clientComponentId, // Sender component id
|
||||
&local_status, // Local status to avoid global race
|
||||
msg, // Message to pack payload into
|
||||
0, // Target network
|
||||
serverSystemId, // Target system id
|
||||
serverComponentId, // Target component id
|
||||
payload_bytes); // Payload to pack into message
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -73,16 +73,17 @@ bool VehicleAngularVelocity::Start()
|
||||
// force initial updates
|
||||
ParametersUpdate(true);
|
||||
|
||||
// initial sensor selection before registering callbacks to avoid data race with Run()
|
||||
if (!SensorSelectionUpdate(hrt_absolute_time(), true)) {
|
||||
ScheduleNow();
|
||||
}
|
||||
|
||||
// sensor_selection needed to change the active sensor if the primary stops updating
|
||||
if (!_sensor_selection_sub.registerCallback()) {
|
||||
PX4_ERR("callback registration failed");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!SensorSelectionUpdate(hrt_absolute_time(), true)) {
|
||||
ScheduleNow();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -281,7 +282,7 @@ bool VehicleAngularVelocity::SensorSelectionUpdate(const hrt_abstime &time_now_u
|
||||
}
|
||||
|
||||
if (sensor_gyro_fifo_sub.get().device_id == device_id) {
|
||||
if (_sensor_gyro_fifo_sub.ChangeInstance(i) && _sensor_gyro_fifo_sub.registerCallback()) {
|
||||
if (_sensor_gyro_fifo_sub.ChangeInstance(i)) {
|
||||
// make sure non-FIFO sub is unregistered
|
||||
_sensor_sub.unregisterCallback();
|
||||
|
||||
@@ -296,13 +297,16 @@ bool VehicleAngularVelocity::SensorSelectionUpdate(const hrt_abstime &time_now_u
|
||||
_bias.zero();
|
||||
_fifo_available = true;
|
||||
|
||||
perf_count(_selection_changed_perf);
|
||||
PX4_DEBUG("selecting sensor_gyro_fifo:%" PRIu8 " %" PRIu32, i, _selected_sensor_device_id);
|
||||
return true;
|
||||
// register callback after all member writes to avoid data race with Run()
|
||||
if (_sensor_gyro_fifo_sub.registerCallback()) {
|
||||
perf_count(_selection_changed_perf);
|
||||
PX4_DEBUG("selecting sensor_gyro_fifo:%" PRIu8 " %" PRIu32, i, _selected_sensor_device_id);
|
||||
return true;
|
||||
|
||||
} else {
|
||||
PX4_ERR("unable to register callback for sensor_gyro_fifo:%" PRIu8 " %" PRIu32,
|
||||
i, sensor_gyro_fifo_sub.get().device_id);
|
||||
} else {
|
||||
PX4_ERR("unable to register callback for sensor_gyro_fifo:%" PRIu8 " %" PRIu32,
|
||||
i, sensor_gyro_fifo_sub.get().device_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -323,7 +327,7 @@ bool VehicleAngularVelocity::SensorSelectionUpdate(const hrt_abstime &time_now_u
|
||||
}
|
||||
|
||||
if (sensor_gyro_sub.get().device_id == device_id) {
|
||||
if (_sensor_sub.ChangeInstance(i) && _sensor_sub.registerCallback()) {
|
||||
if (_sensor_sub.ChangeInstance(i)) {
|
||||
// make sure FIFO sub is unregistered
|
||||
_sensor_gyro_fifo_sub.unregisterCallback();
|
||||
|
||||
@@ -338,13 +342,16 @@ bool VehicleAngularVelocity::SensorSelectionUpdate(const hrt_abstime &time_now_u
|
||||
_bias.zero();
|
||||
_fifo_available = false;
|
||||
|
||||
perf_count(_selection_changed_perf);
|
||||
PX4_DEBUG("selecting sensor_gyro:%" PRIu8 " %" PRIu32, i, _selected_sensor_device_id);
|
||||
return true;
|
||||
// register callback after all member writes to avoid data race with Run()
|
||||
if (_sensor_sub.registerCallback()) {
|
||||
perf_count(_selection_changed_perf);
|
||||
PX4_DEBUG("selecting sensor_gyro:%" PRIu8 " %" PRIu32, i, _selected_sensor_device_id);
|
||||
return true;
|
||||
|
||||
} else {
|
||||
PX4_ERR("unable to register callback for sensor_gyro:%" PRIu8 " %" PRIu32,
|
||||
i, sensor_gyro_sub.get().device_id);
|
||||
} else {
|
||||
PX4_ERR("unable to register callback for sensor_gyro:%" PRIu8 " %" PRIu32,
|
||||
i, sensor_gyro_sub.get().device_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1687,6 +1687,7 @@ int simulator_mavlink_main(int argc, char *argv[])
|
||||
|
||||
} else {
|
||||
px4_task_delete(g_sim_task);
|
||||
px4_task_join(g_sim_task);
|
||||
g_sim_task = -1;
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@
|
||||
px4_add_module(
|
||||
MODULE systemcmds__microbench
|
||||
MAIN microbench
|
||||
STACK_MAIN 4096
|
||||
STACK_MAIN 16384
|
||||
COMPILE_FLAGS
|
||||
-Wno-double-promotion
|
||||
-Wno-unused-but-set-variable
|
||||
|
||||
+9
-5
@@ -1,8 +1,12 @@
|
||||
px4_add_git_submodule(TARGET git_fuzztest PATH "fuzztest")
|
||||
|
||||
message(STATUS "Adding fuzztest")
|
||||
# This will also add GTest
|
||||
add_subdirectory(fuzztest EXCLUDE_FROM_ALL)
|
||||
if(CMAKE_BUILD_TYPE STREQUAL "ThreadSanitizer")
|
||||
message(STATUS "TSAN build: skipping fuzztest, fetching GTest only")
|
||||
px4_setup_gtest_without_fuzztest()
|
||||
else()
|
||||
px4_add_git_submodule(TARGET git_fuzztest PATH "fuzztest")
|
||||
message(STATUS "Adding fuzztest")
|
||||
# This will also add GTest
|
||||
add_subdirectory(fuzztest EXCLUDE_FROM_ALL)
|
||||
endif()
|
||||
|
||||
# Ensure there's no -R without any filter expression since that trips newer ctest versions
|
||||
if(TESTFILTER)
|
||||
|
||||
Reference in New Issue
Block a user