From 996060f581cdf63c31789e9066c0ce3e5495b130 Mon Sep 17 00:00:00 2001 From: Eric Katzfey Date: Wed, 18 Feb 2026 16:58:52 -0700 Subject: [PATCH] muorb: Added statistics and status reporting. Improved aggregator send performance. --- .../muorb/aggregator/mUORBAggregator.cpp | 4 +- .../muorb/aggregator/mUORBAggregator.hpp | 7 +- src/modules/muorb/apps/muorb_main.cpp | 7 +- .../muorb/apps/uORBAppsProtobufChannel.cpp | 30 ++++++++- .../muorb/apps/uORBAppsProtobufChannel.hpp | 13 +++- src/modules/muorb/slpi/CMakeLists.txt | 13 +++- src/modules/muorb/slpi/muorb_main.cpp | 51 +++++++++++++++ .../muorb/slpi/uORBProtobufChannel.cpp | 64 +++++++++++++++++-- .../muorb/slpi/uORBProtobufChannel.hpp | 28 ++++++-- 9 files changed, 202 insertions(+), 15 deletions(-) create mode 100644 src/modules/muorb/slpi/muorb_main.cpp diff --git a/src/modules/muorb/aggregator/mUORBAggregator.cpp b/src/modules/muorb/aggregator/mUORBAggregator.cpp index 5a401bb4e9..19c4948807 100644 --- a/src/modules/muorb/aggregator/mUORBAggregator.cpp +++ b/src/modules/muorb/aggregator/mUORBAggregator.cpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2022 ModalAI, Inc. All rights reserved. + * Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -76,6 +76,8 @@ int16_t mUORB::Aggregator::SendData() if (sendFunc) { if (aggregationEnabled) { if (bufferWriteIndex) { + // Record the time when we send out any aggregated data + _last_send_time = hrt_absolute_time(); rc = sendFunc(topicName.c_str(), buffer[bufferId], bufferWriteIndex); MoveToNextBuffer(); } diff --git a/src/modules/muorb/aggregator/mUORBAggregator.hpp b/src/modules/muorb/aggregator/mUORBAggregator.hpp index 63bcf5845c..4d40a7de35 100644 --- a/src/modules/muorb/aggregator/mUORBAggregator.hpp +++ b/src/modules/muorb/aggregator/mUORBAggregator.hpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2022 ModalAI, Inc. All rights reserved. + * Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -35,6 +35,7 @@ #include #include +#include #include "uORB/uORBCommunicator.hpp" namespace mUORB @@ -55,6 +56,8 @@ public: int16_t SendData(); + hrt_abstime GetLastSendTime() { return _last_send_time; } + private: static const bool debugFlag; @@ -79,6 +82,8 @@ private: sendFuncPtr sendFunc; + hrt_abstime _last_send_time; + bool isAggregate(const char *name) { return (strcmp(name, topicName.c_str()) == 0); } bool NewRecordOverflows(const char *messageName, int32_t length); diff --git a/src/modules/muorb/apps/muorb_main.cpp b/src/modules/muorb/apps/muorb_main.cpp index 8db18141e0..5957b7c84c 100644 --- a/src/modules/muorb/apps/muorb_main.cpp +++ b/src/modules/muorb/apps/muorb_main.cpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (c) 2022 ModalAI, Inc. All rights reserved. + * Copyright (c) 2022-2026 ModalAI, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -45,6 +45,11 @@ static bool enable_debug = false; int muorb_main(int argc, char *argv[]) { + if (uORB::AppsProtobufChannel::isInstance()) { + uORB::AppsProtobufChannel::GetInstance()->PrintStatus(); + return 0; + } + return muorb_init(); } diff --git a/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp b/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp index f5f80fbb35..53df2863a4 100644 --- a/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp +++ b/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2022 ModalAI, Inc. All rights reserved. + * Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -47,6 +47,11 @@ std::map uORB::AppsProtobufChannel::_SlpiSubscriberCache; pthread_mutex_t uORB::AppsProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t uORB::AppsProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER; bool uORB::AppsProtobufChannel::_Debug = false; +uint32_t uORB::AppsProtobufChannel::_total_bytes_sent = 0; +uint32_t uORB::AppsProtobufChannel::_bytes_sent_since_last_status_check = 0; +uint32_t uORB::AppsProtobufChannel::_total_bytes_received = 0; +uint32_t uORB::AppsProtobufChannel::_bytes_received_since_last_status_check = 0; +hrt_abstime uORB::AppsProtobufChannel::_last_status_check_time = 0; hrt_abstime uORB::AppsProtobufChannel::_last_keepalive = 0; @@ -54,6 +59,8 @@ void uORB::AppsProtobufChannel::ReceiveCallback(const char *topic, const uint8_t *data, uint32_t length_in_bytes) { + _total_bytes_received += length_in_bytes; + _bytes_received_since_last_status_check += length_in_bytes; if (_Debug) { PX4_INFO("Got Receive callback for topic %s", topic); } @@ -364,6 +371,9 @@ int16_t uORB::AppsProtobufChannel::send_message(const char *messageName, int len PX4_INFO("Sending data for topic %s", messageName); } + _total_bytes_sent += length; + _bytes_sent_since_last_status_check += length; + pthread_mutex_lock(&_tx_mutex); int16_t rc = fc_sensor_send_data(messageName, data, length); pthread_mutex_unlock(&_tx_mutex); @@ -380,3 +390,21 @@ int16_t uORB::AppsProtobufChannel::send_message(const char *messageName, int len return -1; } + +void uORB::AppsProtobufChannel::PrintStatus() +{ + PX4_INFO("total bytes sent: %u, total bytes received: %u", _total_bytes_sent, _total_bytes_received); + PX4_INFO("sent since last status: %u, received since last status: %u", _bytes_sent_since_last_status_check, + _bytes_received_since_last_status_check); + + hrt_abstime elapsed = hrt_elapsed_time(&_last_status_check_time); + double seconds = (double) elapsed / 1000000.0; + double sent_kbps = ((double) _bytes_sent_since_last_status_check / seconds) / 1000.0; + double rxed_kbps = ((double) _bytes_received_since_last_status_check / seconds) / 1000.0; + + PX4_INFO("Current tx rate: %.2f KBps, rx rate %.2f KBps", sent_kbps, rxed_kbps); + + _bytes_sent_since_last_status_check = 0; + _bytes_received_since_last_status_check = 0; + _last_status_check_time = hrt_absolute_time(); +} diff --git a/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp b/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp index 533395835c..c3732dc966 100644 --- a/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp +++ b/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2022 ModalAI, Inc. All rights reserved. + * Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -158,6 +158,8 @@ public: */ int16_t shutdown(); + void PrintStatus(); + /** * @brief Interface to test the functions of the protobuf channel. * @@ -181,6 +183,15 @@ private: static pthread_mutex_t _tx_mutex; static pthread_mutex_t _rx_mutex; static bool _Debug; + + /* + * Status + */ + static uint32_t _total_bytes_sent; + static uint32_t _bytes_sent_since_last_status_check; + static uint32_t _total_bytes_received; + static uint32_t _bytes_received_since_last_status_check; + static hrt_abstime _last_status_check_time; static hrt_abstime _last_keepalive; bool _Initialized; diff --git a/src/modules/muorb/slpi/CMakeLists.txt b/src/modules/muorb/slpi/CMakeLists.txt index fdbf9147cf..f820e4ca2d 100644 --- a/src/modules/muorb/slpi/CMakeLists.txt +++ b/src/modules/muorb/slpi/CMakeLists.txt @@ -1,6 +1,6 @@ ############################################################################ # -# Copyright (c) 2022 ModalAI, Inc. All rights reserved. +# Copyright (c) 2022-2026 ModalAI, Inc. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -31,6 +31,17 @@ # ############################################################################ +px4_add_module( + MODULE modules__muorb__slpi__main + MAIN muorb + COMPILE_FLAGS + -Wno-cast-align # TODO: fix and enable + INCLUDES + ../aggregator + SRCS + muorb_main.cpp +) + px4_add_library(modules__muorb__slpi uORBProtobufChannel.cpp ../test/MUORBTest.cpp diff --git a/src/modules/muorb/slpi/muorb_main.cpp b/src/modules/muorb/slpi/muorb_main.cpp new file mode 100644 index 0000000000..e31c7d7432 --- /dev/null +++ b/src/modules/muorb/slpi/muorb_main.cpp @@ -0,0 +1,51 @@ +/**************************************************************************** + * + * Copyright (c) 2022-2026 ModalAI, Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name PX4 nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +#include +#include "uORBProtobufChannel.hpp" + +extern "C" { + __EXPORT int muorb_main(int argc, char *argv[]); +} + +int +muorb_main(int argc, char *argv[]) +{ + uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance(); + + if (channel) { + channel->PrintStatus(); + } + + return 0; +} diff --git a/src/modules/muorb/slpi/uORBProtobufChannel.cpp b/src/modules/muorb/slpi/uORBProtobufChannel.cpp index 6bc0e136a9..891c2a756e 100644 --- a/src/modules/muorb/slpi/uORBProtobufChannel.cpp +++ b/src/modules/muorb/slpi/uORBProtobufChannel.cpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2022 ModalAI, Inc. All rights reserved. + * Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -66,6 +66,11 @@ pthread_mutex_t uORB::ProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t uORB::ProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER; bool uORB::ProtobufChannel::_debug = false; +uint32_t uORB::ProtobufChannel::_total_bytes_sent = 0; +uint32_t uORB::ProtobufChannel::_bytes_sent_since_last_status_check = 0; +uint32_t uORB::ProtobufChannel::_total_bytes_received = 0; +uint32_t uORB::ProtobufChannel::_bytes_received_since_last_status_check = 0; +hrt_abstime uORB::ProtobufChannel::_last_status_check_time = 0; hrt_abstime uORB::ProtobufChannel::_last_keepalive = 0; char uORB::ProtobufChannel::_keepalive_filename[] = "/data/px4/slpi/keepalive_fail"; @@ -94,11 +99,13 @@ static void aggregator_thread_func(void *ptr) uORB::ProtobufChannel *muorb = uORB::ProtobufChannel::GetInstance(); + const uint64_t SEND_TIMEOUT = 3000; // 3 ms + while (true) { // Check for timeout. Send buffer if timeout happened. - muorb->SendAggregateData(); + muorb->SendAggregateData(SEND_TIMEOUT); - qurt_timer_sleep(2000); + qurt_timer_sleep(SEND_TIMEOUT); } qurt_thread_exit(QURT_EOK); @@ -248,6 +255,9 @@ int16_t uORB::ProtobufChannel::send_message(const char *messageName, int32_t len rc = _Aggregator.ProcessTransmitTopic(messageName, data, length); } else { + _total_bytes_sent += length; + _bytes_sent_since_last_status_check += length; + // SLPI logs don't go through the aggregator rc = muorb_func_ptrs.topic_data_func_ptr(messageName, data, length); } @@ -270,6 +280,38 @@ int16_t uORB::ProtobufChannel::send_message(const char *messageName, int32_t len return -1; } +void uORB::ProtobufChannel::PrintStatus() +{ + PX4_INFO("total bytes sent: %u, total bytes received: %u", _total_bytes_sent, _total_bytes_received); + PX4_INFO("sent since last status: %u, received since last status: %u", _bytes_sent_since_last_status_check, + _bytes_received_since_last_status_check); + + hrt_abstime elapsed = hrt_elapsed_time(&_last_status_check_time); + double seconds = (double) elapsed / 1000000.0; + double sent_kbps = ((double) _bytes_sent_since_last_status_check / seconds) / 1000.0; + double rxed_kbps = ((double) _bytes_received_since_last_status_check / seconds) / 1000.0; + + PX4_INFO("Current tx rate: %.2f KBps, rx rate %.2f KBps", sent_kbps, rxed_kbps); + + _bytes_sent_since_last_status_check = 0; + _bytes_received_since_last_status_check = 0; + _last_status_check_time = hrt_absolute_time(); +} + +void uORB::ProtobufChannel::SendAggregateData(hrt_abstime timeout) +{ + const hrt_abstime last = _Aggregator.GetLastSendTime(); + + // The aggregator buffer will get sent out whenever it fills up. If that + // hasn't happened for awhile then just send what we have now to avoid + // large periods of time with no topic data + if (hrt_elapsed_time(&last) > timeout) { + pthread_mutex_lock(&_tx_mutex); + _Aggregator.SendData(); + pthread_mutex_unlock(&_tx_mutex); + } +} + static void *test_runner(void *) { if (_px4_muorb_debug) { PX4_INFO("test_runner called"); } @@ -310,6 +352,18 @@ __BEGIN_DECLS extern int slpi_main(int argc, char *argv[]); __END_DECLS +static int slpi_send_aggregated_topics(const char *name, const uint8_t *data, int len) +{ + + uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance(); + + if (channel) { channel->RecordAggregateSend(len); } + + muorb_func_ptrs.topic_data_func_ptr(name, data, len); + + return 0; +} + int px4muorb_orb_initialize(fc_func_ptrs *func_ptrs, int32_t clock_offset_us) { hrt_set_absolute_time_offset(clock_offset_us); @@ -350,7 +404,7 @@ int px4muorb_orb_initialize(fc_func_ptrs *func_ptrs, int32_t clock_offset_us) param_init(); - uORB::ProtobufChannel::GetInstance()->RegisterSendHandler(muorb_func_ptrs.topic_data_func_ptr); + uORB::ProtobufChannel::GetInstance()->RegisterSendHandler(slpi_send_aggregated_topics); // Configure the I2C driver function pointers device::I2C::configure_callbacks(muorb_func_ptrs._config_i2c_bus_func_t, muorb_func_ptrs._set_i2c_address_func_t, @@ -550,6 +604,8 @@ int px4muorb_send_topic_data(const char *topic_name, const uint8_t *data, return 0; } + channel->UpdateRxStatistics(data_len_in_bytes); + uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler(); if (rxHandler) { diff --git a/src/modules/muorb/slpi/uORBProtobufChannel.hpp b/src/modules/muorb/slpi/uORBProtobufChannel.hpp index a1c3b35462..186c913ea3 100644 --- a/src/modules/muorb/slpi/uORBProtobufChannel.hpp +++ b/src/modules/muorb/slpi/uORBProtobufChannel.hpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2022 ModalAI, Inc. All rights reserved. + * Copyright (C) 2022-2026 ModalAI, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -155,11 +155,20 @@ public: bool DebugEnabled() { return _debug; } - void SendAggregateData() + void SendAggregateData(hrt_abstime timeout); + + void PrintStatus(); + + void UpdateRxStatistics(uint32_t cnt) { - pthread_mutex_lock(&_tx_mutex); - _Aggregator.SendData(); - pthread_mutex_unlock(&_tx_mutex); + _total_bytes_received += cnt; + _bytes_received_since_last_status_check += cnt; + } + + void RecordAggregateSend(int length) + { + _total_bytes_sent += length; + _bytes_sent_since_last_status_check += length; } static void keepalive_thread_func(void *ptr); @@ -177,6 +186,15 @@ private: static pthread_mutex_t _tx_mutex; static pthread_mutex_t _rx_mutex; static bool _debug; + + /* + * Status + */ + static uint32_t _total_bytes_sent; + static uint32_t _bytes_sent_since_last_status_check; + static uint32_t _total_bytes_received; + static uint32_t _bytes_received_since_last_status_check; + static hrt_abstime _last_status_check_time; static hrt_abstime _last_keepalive; static char _keepalive_filename[];