mavlink: fix TSAN issues

This commit is contained in:
Julian Oes
2026-02-18 13:30:07 +13:00
parent 17ac592cd2
commit 2ad0b297ca
4 changed files with 142 additions and 46 deletions
+99 -20
View File
@@ -89,11 +89,73 @@ events::EventBuffer *Mavlink::_event_buffer = nullptr;
Mavlink *mavlink_module_instances[MAVLINK_COMM_NUM_BUFFERS] {};
void mavlink_send_uart_bytes(mavlink_channel_t chan, const uint8_t *ch, int length) { mavlink_module_instances[chan]->send_bytes(ch, length); }
void mavlink_start_uart_send(mavlink_channel_t chan, int length) { mavlink_module_instances[chan]->send_start(length); }
void mavlink_end_uart_send(mavlink_channel_t chan, int length) { mavlink_module_instances[chan]->send_finish(); }
mavlink_status_t *mavlink_get_channel_status(uint8_t channel) { return mavlink_module_instances[channel]->get_status(); }
mavlink_message_t *mavlink_get_channel_buffer(uint8_t channel) { return mavlink_module_instances[channel]->get_buffer(); }
// Per-channel status, buffer, and send mutex live in static arrays so they survive instance teardown.
// This prevents use-after-free when the MAVLink C library accesses channel status/buffer during a send
// that races with instance destruction.
static mavlink_status_t mavlink_channel_statuses[MAVLINK_COMM_NUM_BUFFERS] {};
static mavlink_message_t mavlink_channel_buffers[MAVLINK_COMM_NUM_BUFFERS] {};
// Recursive mutex: allows callers to hold lock_send() while send_start()/send_finish()
// re-lock internally (called from _mav_finalize_message_chan_send via MAVLINK_START_UART_SEND).
pthread_mutex_t mavlink_channel_send_mutexes[MAVLINK_COMM_NUM_BUFFERS];
static pthread_once_t mavlink_channel_mutexes_once = PTHREAD_ONCE_INIT;
static void init_channel_send_mutexes()
{
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
for (int i = 0; i < MAVLINK_COMM_NUM_BUFFERS; i++) {
pthread_mutex_init(&mavlink_channel_send_mutexes[i], &attr);
}
pthread_mutexattr_destroy(&attr);
}
void mavlink_send_uart_bytes(mavlink_channel_t chan, const uint8_t *ch, int length)
{
Mavlink *inst = mavlink_module_instances[chan];
if (inst) { inst->send_bytes(ch, length); }
}
void mavlink_start_uart_send(mavlink_channel_t chan, int length)
{
Mavlink *inst = mavlink_module_instances[chan];
if (inst) { inst->send_start(length); }
}
void mavlink_end_uart_send(mavlink_channel_t chan, int length)
{
Mavlink *inst = mavlink_module_instances[chan];
if (inst) { inst->send_finish(); }
}
mavlink_status_t *mavlink_get_channel_status(uint8_t channel) { return &mavlink_channel_statuses[channel]; }
mavlink_message_t *mavlink_get_channel_buffer(uint8_t channel) { return &mavlink_channel_buffers[channel]; }
mavlink_status_t *Mavlink::get_status()
{
if (_instance_id >= 0) {
return &mavlink_channel_statuses[_instance_id];
}
return &_mavlink_status_early;
}
mavlink_message_t *Mavlink::get_buffer()
{
if (_instance_id >= 0) {
return &mavlink_channel_buffers[_instance_id];
}
return &_mavlink_buffer_early;
}
void Mavlink::lock_send() { pthread_mutex_lock(&mavlink_channel_send_mutexes[_instance_id]); }
void Mavlink::unlock_send() { pthread_mutex_unlock(&mavlink_channel_send_mutexes[_instance_id]); }
static void usage();
@@ -245,6 +307,8 @@ bool Mavlink::set_channel()
bool
Mavlink::set_instance_id()
{
pthread_once(&mavlink_channel_mutexes_once, init_channel_send_mutexes);
LockGuard lg{mavlink_module_mutex};
// instance count
@@ -263,6 +327,10 @@ Mavlink::set_instance_id()
for (int instance_id = 0; instance_id < MAVLINK_COMM_NUM_BUFFERS; instance_id++) {
if (mavlink_module_instances[instance_id] == nullptr) {
// Copy early-init status/buffer to static per-channel arrays
mavlink_channel_statuses[instance_id] = _mavlink_status_early;
mavlink_channel_buffers[instance_id] = _mavlink_buffer_early;
mavlink_module_instances[instance_id] = this;
_instance_id = instance_id;
mavlink_instance_count.fetch_add(1);
@@ -377,7 +445,12 @@ Mavlink::destroy_all_instances()
for (int i = 0; i < MAVLINK_COMM_NUM_BUFFERS; i++) {
if (mavlink_module_instances[i] != nullptr) {
Mavlink *inst = mavlink_module_instances[i];
// Lock the channel send mutex to ensure no in-flight sends
pthread_mutex_lock(&mavlink_channel_send_mutexes[i]);
mavlink_module_instances[i] = nullptr;
pthread_mutex_unlock(&mavlink_channel_send_mutexes[i]);
mavlink_instance_count.fetch_sub(1);
delete inst;
}
@@ -744,7 +817,7 @@ Mavlink::get_free_tx_buf()
void Mavlink::send_start(int length)
{
pthread_mutex_lock(&_send_mutex);
pthread_mutex_lock(&mavlink_channel_send_mutexes[_instance_id]);
_last_write_try_time = hrt_absolute_time();
// check if there is space in the buffer
@@ -752,7 +825,9 @@ void Mavlink::send_start(int length)
// not enough space in buffer to send
count_txerrbytes(length);
pthread_mutex_lock(&_tstatus_mutex);
_tstatus.tx_buffer_overruns++;
pthread_mutex_unlock(&_tstatus_mutex);
// prevent writes
_tx_buffer_low = true;
@@ -765,7 +840,7 @@ void Mavlink::send_start(int length)
void Mavlink::send_finish()
{
if (_tx_buffer_low || (_buf_fill == 0)) {
pthread_mutex_unlock(&_send_mutex);
pthread_mutex_unlock(&mavlink_channel_send_mutexes[_instance_id]);
return;
}
@@ -817,7 +892,9 @@ void Mavlink::send_finish()
#endif // MAVLINK_UDP
if (ret == (int)_buf_fill) {
pthread_mutex_lock(&_tstatus_mutex);
_tstatus.tx_message_count++;
pthread_mutex_unlock(&_tstatus_mutex);
count_txbytes(_buf_fill);
_last_write_success_time = _last_write_try_time;
@@ -827,7 +904,7 @@ void Mavlink::send_finish()
_buf_fill = 0;
pthread_mutex_unlock(&_send_mutex);
pthread_mutex_unlock(&mavlink_channel_send_mutexes[_instance_id]);
}
void Mavlink::send_bytes(const uint8_t *buf, unsigned packet_len)
@@ -2277,8 +2354,8 @@ Mavlink::task_main(int argc, char *argv[])
pthread_mutex_init(&_mavlink_shell_mutex, nullptr);
pthread_mutex_init(&_message_buffer_mutex, nullptr);
pthread_mutex_init(&_send_mutex, nullptr);
pthread_mutex_init(&_radio_status_mutex, nullptr);
pthread_mutex_init(&_tstatus_mutex, nullptr);
/* if we are passing on mavlink messages, we need to prepare a buffer for this instance */
if (get_forwarding_on()) {
@@ -2502,20 +2579,16 @@ Mavlink::task_main(int argc, char *argv[])
if (_bytes_timestamp != 0) {
const float dt = (t - _bytes_timestamp) * 1e-6f;
_tstatus.tx_rate_avg = _bytes_tx / dt;
_tstatus.tx_error_rate_avg = _bytes_txerr / dt;
_tstatus.rx_rate_avg = _bytes_rx / dt;
_bytes_tx = 0;
_bytes_txerr = 0;
_bytes_rx = 0;
_tstatus.tx_rate_avg = _bytes_tx.fetch_and(0) / dt;
_tstatus.tx_error_rate_avg = _bytes_txerr.fetch_and(0) / dt;
_tstatus.rx_rate_avg = _bytes_rx.fetch_and(0) / dt;
}
_bytes_timestamp = t;
}
// publish status at 1 Hz, or sooner if HEARTBEAT has updated
if ((hrt_elapsed_time(&_tstatus.timestamp) >= 1_s) || _tstatus_updated) {
if ((hrt_elapsed_time(&_tstatus.timestamp) >= 1_s) || _tstatus_updated.load()) {
publish_telemetry_status();
}
@@ -2548,9 +2621,9 @@ Mavlink::task_main(int argc, char *argv[])
}
pthread_mutex_destroy(&_mavlink_shell_mutex);
pthread_mutex_destroy(&_send_mutex);
pthread_mutex_destroy(&_radio_status_mutex);
pthread_mutex_destroy(&_message_buffer_mutex);
pthread_mutex_destroy(&_tstatus_mutex);
PX4_INFO("exiting channel %i", (int)_channel);
@@ -2833,6 +2906,7 @@ void Mavlink::check_requested_subscriptions()
void Mavlink::publish_telemetry_status()
{
// many fields are populated in place
pthread_mutex_lock(&_tstatus_mutex);
_tstatus.mode = _mode;
_tstatus.data_rate = _datarate;
@@ -2844,10 +2918,11 @@ void Mavlink::publish_telemetry_status()
_tstatus.streams = _streams.size();
// telemetry_status is also updated from the receiver thread, but never the same fields
_tstatus.timestamp = hrt_absolute_time();
_telemetry_status_pub.publish(_tstatus);
_tstatus_updated = false;
pthread_mutex_unlock(&_tstatus_mutex);
_tstatus_updated.store(false);
}
void Mavlink::configure_sik_radio()
@@ -2923,7 +2998,9 @@ int Mavlink::start_helper(int argc, char *argv[])
int instance_id = instance->get_instance_id();
if (instance_id >= 0) {
pthread_mutex_lock(&mavlink_channel_send_mutexes[instance_id]);
mavlink_module_instances[instance_id] = nullptr;
pthread_mutex_unlock(&mavlink_channel_send_mutexes[instance_id]);
mavlink_instance_count.fetch_sub(1);
}
@@ -3241,7 +3318,9 @@ Mavlink::stop_command(int argc, char *argv[])
for (int mavlink_instance = 0; mavlink_instance < MAVLINK_COMM_NUM_BUFFERS; mavlink_instance++) {
if (mavlink_module_instances[mavlink_instance] == inst) {
pthread_mutex_lock(&mavlink_channel_send_mutexes[mavlink_instance]);
mavlink_module_instances[mavlink_instance] = nullptr;
pthread_mutex_unlock(&mavlink_channel_send_mutexes[mavlink_instance]);
mavlink_instance_count.fetch_sub(1);
delete inst;
return PX4_OK;
+26 -19
View File
@@ -62,6 +62,7 @@
#include <parameters/param.h>
#include <lib/variable_length_ringbuffer/VariableLengthRingbuffer.hpp>
#include <perf/perf_counter.h>
#include <px4_platform_common/atomic.h>
#include <px4_platform_common/cli.h>
#include <px4_platform_common/px4_config.h>
#include <px4_platform_common/defines.h>
@@ -132,8 +133,8 @@ public:
static Mavlink *new_instance();
static Mavlink *get_instance_for_device(const char *device_name);
mavlink_message_t *get_buffer() { return &_mavlink_buffer; }
mavlink_status_t *get_status() { return &_mavlink_status; }
mavlink_message_t *get_buffer();
mavlink_status_t *get_status();
void setProtocolVersion(uint8_t version);
uint8_t getProtocolVersion() const { return _protocol_version; };
@@ -255,7 +256,7 @@ public:
bool get_forwarding_on() { return _forwarding_on; }
bool is_gcs_connected() { return _tstatus.heartbeat_type_gcs; }
bool is_gcs_connected() { lock_telemetry_status(); bool r = _tstatus.heartbeat_type_gcs; unlock_telemetry_status(); return r; }
#if defined(MAVLINK_UDP)
static Mavlink *get_instance_for_network_port(unsigned long port);
@@ -383,34 +384,39 @@ public:
float get_baudrate() { return _baudrate; }
/* Functions for waiting to start transmission until message received. */
void set_has_received_messages(bool received_messages) { _received_messages = received_messages; }
bool get_has_received_messages() { return _received_messages; }
void set_has_received_messages(bool received_messages) { _received_messages.store(received_messages); }
bool get_has_received_messages() { return _received_messages.load(); }
void set_wait_to_transmit(bool wait) { _wait_to_transmit = wait; }
bool get_wait_to_transmit() { return _wait_to_transmit; }
bool should_transmit() { return (_transmitting_enabled && (!_wait_to_transmit || (_wait_to_transmit && _received_messages))); }
bool should_transmit() { return (_transmitting_enabled && (!_wait_to_transmit || (_wait_to_transmit && _received_messages.load()))); }
/**
* Count transmitted bytes
*/
void count_txbytes(unsigned n) { _bytes_tx += n; };
void count_txbytes(unsigned n) { _bytes_tx.fetch_add(n); };
/**
* Count bytes not transmitted because of errors
*/
void count_txerrbytes(unsigned n) { _bytes_txerr += n; };
void count_txerrbytes(unsigned n) { _bytes_txerr.fetch_add(n); };
/**
* Count received bytes
*/
void count_rxbytes(unsigned n) { _bytes_rx += n; };
void count_rxbytes(unsigned n) { _bytes_rx.fetch_add(n); };
/**
* Get the receive status of this MAVLink link
*/
telemetry_status_s &telemetry_status() { return _tstatus; }
void telemetry_status_updated() { _tstatus_updated = true; }
void lock_telemetry_status() { pthread_mutex_lock(&_tstatus_mutex); }
void unlock_telemetry_status() { pthread_mutex_unlock(&_tstatus_mutex); }
void telemetry_status_updated() { _tstatus_updated.store(true); }
void set_telemetry_status_type(uint8_t type) { _tstatus.type = type; }
void set_telemetry_status_type(uint8_t type) { pthread_mutex_lock(&_tstatus_mutex); _tstatus.type = type; pthread_mutex_unlock(&_tstatus_mutex); }
void lock_send();
void unlock_send();
void update_radio_status(const radio_status_s &radio_status);
@@ -521,14 +527,15 @@ private:
static constexpr int MAVLINK_MAX_INTERVAL{10000};
static constexpr float MAVLINK_MIN_MULTIPLIER{0.0005f};
mavlink_message_t _mavlink_buffer {};
mavlink_status_t _mavlink_status {};
// Used before _instance_id is assigned; copied to static per-channel arrays in set_instance_id()
mavlink_message_t _mavlink_buffer_early {};
mavlink_status_t _mavlink_status_early {};
/* states */
bool _hil_enabled{false}; /**< Hardware In the Loop mode */
bool _is_usb_uart{false}; /**< Port is USB */
bool _wait_to_transmit{false}; /**< Wait to transmit until received messages. */
bool _received_messages{false}; /**< Whether we've received valid mavlink messages. */
px4::atomic_bool _received_messages{false}; /**< Whether we've received valid mavlink messages. */
px4::atomic_bool _should_check_events{false}; /**< Events subscription: only one MAVLink instance should check */
px4::atomic_bool _sending_parameters{false}; /**< True if parameters are currently sent out */
@@ -580,9 +587,9 @@ private:
uint64_t _mavlink_start_time{0};
uint8_t _protocol_version = 0; ///< after initialization the only values are 1 and 2
unsigned _bytes_tx{0};
unsigned _bytes_txerr{0};
unsigned _bytes_rx{0};
px4::atomic<unsigned> _bytes_tx{0};
px4::atomic<unsigned> _bytes_txerr{0};
px4::atomic<unsigned> _bytes_rx{0};
hrt_abstime _bytes_timestamp{0};
#if defined(MAVLINK_UDP)
@@ -613,14 +620,14 @@ private:
radio_status_s _rstatus {};
telemetry_status_s _tstatus {};
bool _tstatus_updated{false};
px4::atomic_bool _tstatus_updated{false};
pthread_mutex_t _tstatus_mutex {};
ping_statistics_s _ping_stats {};
pthread_mutex_t _message_buffer_mutex{};
VariableLengthRingbuffer _message_buffer{};
pthread_mutex_t _send_mutex {};
pthread_mutex_t _radio_status_mutex {};
DEFINE_PARAMETERS(
+8
View File
@@ -1732,7 +1732,9 @@ MavlinkReceiver::handle_message_ping(mavlink_message_t *msg)
ping.target_system = msg->sysid;
ping.target_component = msg->compid;
_mavlink.lock_send();
mavlink_msg_ping_send_struct(_mavlink.get_channel(), &ping);
_mavlink.unlock_send();
} else if ((ping.target_system == mavlink_system.sysid) &&
(ping.target_component ==
@@ -2304,7 +2306,9 @@ MavlinkReceiver::get_message_interval(int msgId)
}
// send back this value...
_mavlink.lock_send();
mavlink_msg_message_interval_send(_mavlink.get_channel(), msgId, interval);
_mavlink.unlock_send();
}
void
@@ -2918,6 +2922,7 @@ void MavlinkReceiver::CheckHeartbeats(const hrt_abstime &t, bool force)
}
if ((t >= _last_heartbeat_check + (TIMEOUT / 2)) || force) {
_mavlink.lock_telemetry_status();
telemetry_status_s &tstatus = _mavlink.telemetry_status();
tstatus.heartbeat_type_antenna_tracker = (t <= TIMEOUT + _heartbeat_type_antenna_tracker);
@@ -2937,6 +2942,7 @@ void MavlinkReceiver::CheckHeartbeats(const hrt_abstime &t, bool force)
tstatus.heartbeat_component_udp_bridge = (t <= TIMEOUT + _heartbeat_component_udp_bridge);
tstatus.heartbeat_component_uart_bridge = (t <= TIMEOUT + _heartbeat_component_uart_bridge);
_mavlink.unlock_telemetry_status();
_mavlink.telemetry_status_updated();
_last_heartbeat_check = t;
}
@@ -3295,6 +3301,7 @@ MavlinkReceiver::run()
if (t - last_send_update > timeout * 1000) {
_mission_manager.check_active_mission();
_mavlink.lock_send();
_mission_manager.send();
if (_mavlink.get_mode() != Mavlink::MAVLINK_MODE::MAVLINK_MODE_IRIDIUM) {
@@ -3307,6 +3314,7 @@ MavlinkReceiver::run()
}
_mavlink_log_handler.send();
_mavlink.unlock_send();
last_send_update = t;
}
@@ -969,13 +969,15 @@ bool MavlinkFtpTest::_setup_ftp_msg(const MavlinkFTP::PayloadHeader *payload_hea
payload->padding = 0;
msg->checksum = 0;
mavlink_msg_file_transfer_protocol_pack(clientSystemId, // Sender system id
clientComponentId, // Sender component id
msg, // Message to pack payload into
0, // Target network
serverSystemId, // Target system id
serverComponentId, // Target component id
payload_bytes); // Payload to pack into message
mavlink_status_t local_status{};
mavlink_msg_file_transfer_protocol_pack_status(clientSystemId, // Sender system id
clientComponentId, // Sender component id
&local_status, // Local status to avoid global race
msg, // Message to pack payload into
0, // Target network
serverSystemId, // Target system id
serverComponentId, // Target component id
payload_bytes); // Payload to pack into message
return true;
}