muorb: Added statistics and status reporting. Improved aggregator send performance.

This commit is contained in:
Eric Katzfey 2026-02-18 16:58:52 -07:00 committed by Eric Katzfey
parent a120773793
commit 996060f581
9 changed files with 202 additions and 15 deletions

View File

@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2022 ModalAI, Inc. All rights reserved.
* Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -76,6 +76,8 @@ int16_t mUORB::Aggregator::SendData()
if (sendFunc) {
if (aggregationEnabled) {
if (bufferWriteIndex) {
// Record the time when we send out any aggregated data
_last_send_time = hrt_absolute_time();
rc = sendFunc(topicName.c_str(), buffer[bufferId], bufferWriteIndex);
MoveToNextBuffer();
}

View File

@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2022 ModalAI, Inc. All rights reserved.
* Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -35,6 +35,7 @@
#include <string>
#include <string.h>
#include <drivers/drv_hrt.h>
#include "uORB/uORBCommunicator.hpp"
namespace mUORB
@ -55,6 +56,8 @@ public:
int16_t SendData();
hrt_abstime GetLastSendTime() { return _last_send_time; }
private:
static const bool debugFlag;
@ -79,6 +82,8 @@ private:
sendFuncPtr sendFunc;
hrt_abstime _last_send_time;
bool isAggregate(const char *name) { return (strcmp(name, topicName.c_str()) == 0); }
bool NewRecordOverflows(const char *messageName, int32_t length);

View File

@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (c) 2022 ModalAI, Inc. All rights reserved.
* Copyright (c) 2022-2026 ModalAI, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -45,6 +45,11 @@ static bool enable_debug = false;
int
muorb_main(int argc, char *argv[])
{
if (uORB::AppsProtobufChannel::isInstance()) {
uORB::AppsProtobufChannel::GetInstance()->PrintStatus();
return 0;
}
return muorb_init();
}

View File

@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2022 ModalAI, Inc. All rights reserved.
* Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -47,6 +47,11 @@ std::map<std::string, int> uORB::AppsProtobufChannel::_SlpiSubscriberCache;
pthread_mutex_t uORB::AppsProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t uORB::AppsProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER;
bool uORB::AppsProtobufChannel::_Debug = false;
uint32_t uORB::AppsProtobufChannel::_total_bytes_sent = 0;
uint32_t uORB::AppsProtobufChannel::_bytes_sent_since_last_status_check = 0;
uint32_t uORB::AppsProtobufChannel::_total_bytes_received = 0;
uint32_t uORB::AppsProtobufChannel::_bytes_received_since_last_status_check = 0;
hrt_abstime uORB::AppsProtobufChannel::_last_status_check_time = 0;
hrt_abstime uORB::AppsProtobufChannel::_last_keepalive = 0;
@ -54,6 +59,8 @@ void uORB::AppsProtobufChannel::ReceiveCallback(const char *topic,
const uint8_t *data,
uint32_t length_in_bytes)
{
_total_bytes_received += length_in_bytes;
_bytes_received_since_last_status_check += length_in_bytes;
if (_Debug) { PX4_INFO("Got Receive callback for topic %s", topic); }
@ -364,6 +371,9 @@ int16_t uORB::AppsProtobufChannel::send_message(const char *messageName, int len
PX4_INFO("Sending data for topic %s", messageName);
}
_total_bytes_sent += length;
_bytes_sent_since_last_status_check += length;
pthread_mutex_lock(&_tx_mutex);
int16_t rc = fc_sensor_send_data(messageName, data, length);
pthread_mutex_unlock(&_tx_mutex);
@ -380,3 +390,21 @@ int16_t uORB::AppsProtobufChannel::send_message(const char *messageName, int len
return -1;
}
void uORB::AppsProtobufChannel::PrintStatus()
{
PX4_INFO("total bytes sent: %u, total bytes received: %u", _total_bytes_sent, _total_bytes_received);
PX4_INFO("sent since last status: %u, received since last status: %u", _bytes_sent_since_last_status_check,
_bytes_received_since_last_status_check);
hrt_abstime elapsed = hrt_elapsed_time(&_last_status_check_time);
double seconds = (double) elapsed / 1000000.0;
double sent_kbps = ((double) _bytes_sent_since_last_status_check / seconds) / 1000.0;
double rxed_kbps = ((double) _bytes_received_since_last_status_check / seconds) / 1000.0;
PX4_INFO("Current tx rate: %.2f KBps, rx rate %.2f KBps", sent_kbps, rxed_kbps);
_bytes_sent_since_last_status_check = 0;
_bytes_received_since_last_status_check = 0;
_last_status_check_time = hrt_absolute_time();
}

View File

@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2022 ModalAI, Inc. All rights reserved.
* Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -158,6 +158,8 @@ public:
*/
int16_t shutdown();
void PrintStatus();
/**
* @brief Interface to test the functions of the protobuf channel.
*
@ -181,6 +183,15 @@ private:
static pthread_mutex_t _tx_mutex;
static pthread_mutex_t _rx_mutex;
static bool _Debug;
/*
* Status
*/
static uint32_t _total_bytes_sent;
static uint32_t _bytes_sent_since_last_status_check;
static uint32_t _total_bytes_received;
static uint32_t _bytes_received_since_last_status_check;
static hrt_abstime _last_status_check_time;
static hrt_abstime _last_keepalive;
bool _Initialized;

View File

@ -1,6 +1,6 @@
############################################################################
#
# Copyright (c) 2022 ModalAI, Inc. All rights reserved.
# Copyright (c) 2022-2026 ModalAI, Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
@ -31,6 +31,17 @@
#
############################################################################
px4_add_module(
MODULE modules__muorb__slpi__main
MAIN muorb
COMPILE_FLAGS
-Wno-cast-align # TODO: fix and enable
INCLUDES
../aggregator
SRCS
muorb_main.cpp
)
px4_add_library(modules__muorb__slpi
uORBProtobufChannel.cpp
../test/MUORBTest.cpp

View File

@ -0,0 +1,51 @@
/****************************************************************************
*
* Copyright (c) 2022-2026 ModalAI, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name PX4 nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
#include <string.h>
#include "uORBProtobufChannel.hpp"
extern "C" {
__EXPORT int muorb_main(int argc, char *argv[]);
}
int
muorb_main(int argc, char *argv[])
{
uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance();
if (channel) {
channel->PrintStatus();
}
return 0;
}

View File

@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2022 ModalAI, Inc. All rights reserved.
* Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -66,6 +66,11 @@ pthread_mutex_t uORB::ProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t uORB::ProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER;
bool uORB::ProtobufChannel::_debug = false;
uint32_t uORB::ProtobufChannel::_total_bytes_sent = 0;
uint32_t uORB::ProtobufChannel::_bytes_sent_since_last_status_check = 0;
uint32_t uORB::ProtobufChannel::_total_bytes_received = 0;
uint32_t uORB::ProtobufChannel::_bytes_received_since_last_status_check = 0;
hrt_abstime uORB::ProtobufChannel::_last_status_check_time = 0;
hrt_abstime uORB::ProtobufChannel::_last_keepalive = 0;
char uORB::ProtobufChannel::_keepalive_filename[] = "/data/px4/slpi/keepalive_fail";
@ -94,11 +99,13 @@ static void aggregator_thread_func(void *ptr)
uORB::ProtobufChannel *muorb = uORB::ProtobufChannel::GetInstance();
const uint64_t SEND_TIMEOUT = 3000; // 3 ms
while (true) {
// Check for timeout. Send buffer if timeout happened.
muorb->SendAggregateData();
muorb->SendAggregateData(SEND_TIMEOUT);
qurt_timer_sleep(2000);
qurt_timer_sleep(SEND_TIMEOUT);
}
qurt_thread_exit(QURT_EOK);
@ -248,6 +255,9 @@ int16_t uORB::ProtobufChannel::send_message(const char *messageName, int32_t len
rc = _Aggregator.ProcessTransmitTopic(messageName, data, length);
} else {
_total_bytes_sent += length;
_bytes_sent_since_last_status_check += length;
// SLPI logs don't go through the aggregator
rc = muorb_func_ptrs.topic_data_func_ptr(messageName, data, length);
}
@ -270,6 +280,38 @@ int16_t uORB::ProtobufChannel::send_message(const char *messageName, int32_t len
return -1;
}
void uORB::ProtobufChannel::PrintStatus()
{
PX4_INFO("total bytes sent: %u, total bytes received: %u", _total_bytes_sent, _total_bytes_received);
PX4_INFO("sent since last status: %u, received since last status: %u", _bytes_sent_since_last_status_check,
_bytes_received_since_last_status_check);
hrt_abstime elapsed = hrt_elapsed_time(&_last_status_check_time);
double seconds = (double) elapsed / 1000000.0;
double sent_kbps = ((double) _bytes_sent_since_last_status_check / seconds) / 1000.0;
double rxed_kbps = ((double) _bytes_received_since_last_status_check / seconds) / 1000.0;
PX4_INFO("Current tx rate: %.2f KBps, rx rate %.2f KBps", sent_kbps, rxed_kbps);
_bytes_sent_since_last_status_check = 0;
_bytes_received_since_last_status_check = 0;
_last_status_check_time = hrt_absolute_time();
}
void uORB::ProtobufChannel::SendAggregateData(hrt_abstime timeout)
{
const hrt_abstime last = _Aggregator.GetLastSendTime();
// The aggregator buffer will get sent out whenever it fills up. If that
// hasn't happened for awhile then just send what we have now to avoid
// large periods of time with no topic data
if (hrt_elapsed_time(&last) > timeout) {
pthread_mutex_lock(&_tx_mutex);
_Aggregator.SendData();
pthread_mutex_unlock(&_tx_mutex);
}
}
static void *test_runner(void *)
{
if (_px4_muorb_debug) { PX4_INFO("test_runner called"); }
@ -310,6 +352,18 @@ __BEGIN_DECLS
extern int slpi_main(int argc, char *argv[]);
__END_DECLS
static int slpi_send_aggregated_topics(const char *name, const uint8_t *data, int len)
{
uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance();
if (channel) { channel->RecordAggregateSend(len); }
muorb_func_ptrs.topic_data_func_ptr(name, data, len);
return 0;
}
int px4muorb_orb_initialize(fc_func_ptrs *func_ptrs, int32_t clock_offset_us)
{
hrt_set_absolute_time_offset(clock_offset_us);
@ -350,7 +404,7 @@ int px4muorb_orb_initialize(fc_func_ptrs *func_ptrs, int32_t clock_offset_us)
param_init();
uORB::ProtobufChannel::GetInstance()->RegisterSendHandler(muorb_func_ptrs.topic_data_func_ptr);
uORB::ProtobufChannel::GetInstance()->RegisterSendHandler(slpi_send_aggregated_topics);
// Configure the I2C driver function pointers
device::I2C::configure_callbacks(muorb_func_ptrs._config_i2c_bus_func_t, muorb_func_ptrs._set_i2c_address_func_t,
@ -550,6 +604,8 @@ int px4muorb_send_topic_data(const char *topic_name, const uint8_t *data,
return 0;
}
channel->UpdateRxStatistics(data_len_in_bytes);
uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler();
if (rxHandler) {

View File

@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2022 ModalAI, Inc. All rights reserved.
* Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -155,11 +155,20 @@ public:
bool DebugEnabled() { return _debug; }
void SendAggregateData()
void SendAggregateData(hrt_abstime timeout);
void PrintStatus();
void UpdateRxStatistics(uint32_t cnt)
{
pthread_mutex_lock(&_tx_mutex);
_Aggregator.SendData();
pthread_mutex_unlock(&_tx_mutex);
_total_bytes_received += cnt;
_bytes_received_since_last_status_check += cnt;
}
void RecordAggregateSend(int length)
{
_total_bytes_sent += length;
_bytes_sent_since_last_status_check += length;
}
static void keepalive_thread_func(void *ptr);
@ -177,6 +186,15 @@ private:
static pthread_mutex_t _tx_mutex;
static pthread_mutex_t _rx_mutex;
static bool _debug;
/*
* Status
*/
static uint32_t _total_bytes_sent;
static uint32_t _bytes_sent_since_last_status_check;
static uint32_t _total_bytes_received;
static uint32_t _bytes_received_since_last_status_check;
static hrt_abstime _last_status_check_time;
static hrt_abstime _last_keepalive;
static char _keepalive_filename[];