From 368dd362c57a4603ffc78f7f128cf5e1300633b9 Mon Sep 17 00:00:00 2001 From: Eric Katzfey Date: Wed, 18 Feb 2026 08:47:21 -0700 Subject: [PATCH] muorb: implemented a basic keepalive mechanism --- .../muorb/apps/uORBAppsProtobufChannel.cpp | 42 ++++++++- .../muorb/apps/uORBAppsProtobufChannel.hpp | 6 ++ .../muorb/slpi/uORBProtobufChannel.cpp | 92 ++++++++++++++++++- .../muorb/slpi/uORBProtobufChannel.hpp | 7 ++ 4 files changed, 143 insertions(+), 4 deletions(-) diff --git a/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp b/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp index df2376da27..f5f80fbb35 100644 --- a/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp +++ b/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp @@ -37,6 +37,7 @@ #include "fc_sensor.h" bool uORB::AppsProtobufChannel::test_flag = false; +px4_task_t uORB::AppsProtobufChannel::_task_handle = -1; // Initialize the static members uORB::AppsProtobufChannel *uORB::AppsProtobufChannel::_InstancePtr = nullptr; @@ -46,6 +47,7 @@ std::map uORB::AppsProtobufChannel::_SlpiSubscriberCache; pthread_mutex_t uORB::AppsProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t uORB::AppsProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER; bool uORB::AppsProtobufChannel::_Debug = false; +hrt_abstime uORB::AppsProtobufChannel::_last_keepalive = 0; void uORB::AppsProtobufChannel::ReceiveCallback(const char *topic, @@ -61,6 +63,9 @@ void uORB::AppsProtobufChannel::ReceiveCallback(const char *topic, } else if (strcmp(topic, "slpi_error") == 0) { PX4_ERR("%s", (const char *) data); + } else if (strcmp(topic, "keepalive") == 0) { + _last_keepalive = hrt_absolute_time(); + } else if (IS_MUORB_TEST(topic)) { // Validate the test data received bool test_passed = true; @@ -221,6 +226,28 @@ bool uORB::AppsProtobufChannel::Test() return true; } +void uORB::AppsProtobufChannel::keepalive_task() +{ + + // Messages cannot be sent with no data + uint8_t data[1] {0x5A}; + + while (true) { + uORB::AppsProtobufChannel::GetInstance()->send_message("keepalive", 1, data); + + usleep(100000); // Update every 100ms + + if (_last_keepalive) { + hrt_abstime elapsed_time = hrt_elapsed_time(&_last_keepalive); + + if (elapsed_time > 1000000) { + PX4_ERR("Keep alive timeout from DSP: %lu ms", elapsed_time); + _last_keepalive = hrt_absolute_time(); + } + } + } +} + bool uORB::AppsProtobufChannel::Initialize(bool enable_debug) { if (! _Initialized) { @@ -233,6 +260,19 @@ bool uORB::AppsProtobufChannel::Initialize(bool enable_debug) } else { PX4_INFO("muorb protobuf initalize method succeeded"); + + _task_handle = px4_task_spawn_cmd("muorb_keepalive", + SCHED_DEFAULT, + SCHED_PRIORITY_DEFAULT, + 1024, + (px4_main_t) &keepalive_task, + nullptr); + + if (_task_handle < 0) { + PX4_ERR("task start failed"); + return false; + } + _Initialized = true; } @@ -319,7 +359,7 @@ int16_t uORB::AppsProtobufChannel::send_message(const char *messageName, int len int has_subscribers = _SlpiSubscriberCache[messageName]; pthread_mutex_unlock(&_rx_mutex); - if (has_subscribers) { + if ((has_subscribers) || (strcmp("keepalive", messageName) == 0)) { if (_Debug && enable_debug) { PX4_INFO("Sending data for topic %s", messageName); } diff --git a/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp b/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp index 006a8714ef..533395835c 100644 --- a/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp +++ b/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp @@ -39,6 +39,7 @@ #include #include +#include #include "MUORBTest.hpp" #include "uORB/uORBCommunicator.hpp" @@ -167,6 +168,9 @@ public: bool Test(); private: + + static void keepalive_task(); + /** * Data Members */ @@ -177,6 +181,7 @@ private: static pthread_mutex_t _tx_mutex; static pthread_mutex_t _rx_mutex; static bool _Debug; + static hrt_abstime _last_keepalive; bool _Initialized; bool _ShutdownRequested{false}; @@ -191,6 +196,7 @@ private: bool Test(MUORBTestType test_type); static bool test_flag; + static px4_task_t _task_handle; static void ReceiveCallback(const char *topic, const uint8_t *data, diff --git a/src/modules/muorb/slpi/uORBProtobufChannel.cpp b/src/modules/muorb/slpi/uORBProtobufChannel.cpp index d85fb65e48..6bc0e136a9 100644 --- a/src/modules/muorb/slpi/uORBProtobufChannel.cpp +++ b/src/modules/muorb/slpi/uORBProtobufChannel.cpp @@ -35,6 +35,9 @@ #include "uORB/uORBManager.hpp" #include "MUORBTest.hpp" #include +#include +#include +#include #include #include @@ -63,6 +66,9 @@ pthread_mutex_t uORB::ProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t uORB::ProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER; bool uORB::ProtobufChannel::_debug = false; +hrt_abstime uORB::ProtobufChannel::_last_keepalive = 0; +char uORB::ProtobufChannel::_keepalive_filename[] = "/data/px4/slpi/keepalive_fail"; + bool _px4_muorb_debug = false; static bool px4muorb_orb_initialized = false; @@ -74,6 +80,14 @@ const uint32_t aggregator_thread_priority = 240; const uint32_t aggregator_stack_size = 8096; char aggregator_stack[aggregator_stack_size]; +// Thread for keep alives +qurt_thread_t keepalive_tid; +qurt_thread_attr_t keepalive_attr; +// 1 is highest priority, 255 is lowest. Set it very low. +const uint32_t keepalive_thread_priority = 250; +const uint32_t keepalive_stack_size = 4096; +char keepalive_stack[keepalive_stack_size]; + static void aggregator_thread_func(void *ptr) { PX4_INFO("muorb aggregator thread running"); @@ -90,6 +104,58 @@ static void aggregator_thread_func(void *ptr) qurt_thread_exit(QURT_EOK); } +void uORB::ProtobufChannel::keepalive_thread_func(void *ptr) +{ + PX4_INFO("muorb keepalive thread running"); + + // Delete any keepalive fail file that may exist from a previous error + struct stat buffer; + + if (stat(_keepalive_filename, &buffer) == 0) { + PX4_INFO("Deleting %s", _keepalive_filename); + + if (remove(_keepalive_filename)) { + PX4_ERR("Could not delete %s", _keepalive_filename); + } + } + + uORB::ProtobufChannel *muorb = uORB::ProtobufChannel::GetInstance(); + + const uint64_t SEND_TIMEOUT = 100000; // 100 ms + + while (true) { + // Check for timeout. Send a keepalive if timeout happened. + muorb->send_message("keepalive", 0, nullptr); + + qurt_timer_sleep(SEND_TIMEOUT); + + if (_last_keepalive) { + hrt_abstime elapsed_time = hrt_elapsed_time(&_last_keepalive); + + if (elapsed_time > 1000000) { + PX4_ERR("Keep alive timeout from Apps: %lu ms", elapsed_time); + + // Create a file in the file system to indicate a keepalive failure happened + if (stat(_keepalive_filename, &buffer)) { + FILE *fptr = fopen(_keepalive_filename, "w"); + + if (fptr == NULL) { + PX4_ERR("Error creating file %s", _keepalive_filename); + + } else { + PX4_INFO("Created file %s", _keepalive_filename); + fclose(fptr); + } + } + + _last_keepalive = hrt_absolute_time(); + } + } + } + + qurt_thread_exit(QURT_EOK); +} + int16_t uORB::ProtobufChannel::topic_advertised(const char *messageName) { if (_debug) { PX4_INFO("Advertising %s on remote side", messageName); } @@ -154,6 +220,11 @@ int16_t uORB::ProtobufChannel::send_message(const char *messageName, int32_t len is_not_slpi_log = false; } + // keep alives also get special treatment + if (strcmp(messageName, "keepalive") == 0) { + is_not_slpi_log = false; + } + if (muorb_func_ptrs.topic_data_func_ptr) { if ((_debug) && (is_not_slpi_log)) { PX4_INFO("Got message for topic %s", messageName); @@ -312,12 +383,22 @@ int px4muorb_orb_initialize(fc_func_ptrs *func_ptrs, int32_t clock_offset_us) qurt_thread_attr_init(&aggregator_attr); qurt_thread_attr_set_stack_addr(&aggregator_attr, aggregator_stack); qurt_thread_attr_set_stack_size(&aggregator_attr, aggregator_stack_size); - char thread_name[QURT_THREAD_ATTR_NAME_MAXLEN]; - strncpy(thread_name, "PX4_muorb_agg", QURT_THREAD_ATTR_NAME_MAXLEN); - qurt_thread_attr_set_name(&aggregator_attr, thread_name); + char agg_thread_name[QURT_THREAD_ATTR_NAME_MAXLEN]; + strncpy(agg_thread_name, "PX4_muorb_agg", QURT_THREAD_ATTR_NAME_MAXLEN); + qurt_thread_attr_set_name(&aggregator_attr, agg_thread_name); qurt_thread_attr_set_priority(&aggregator_attr, aggregator_thread_priority); (void) qurt_thread_create(&aggregator_tid, &aggregator_attr, aggregator_thread_func, NULL); + // Setup the thread to send keep alives to the apps proc + qurt_thread_attr_init(&keepalive_attr); + qurt_thread_attr_set_stack_addr(&keepalive_attr, keepalive_stack); + qurt_thread_attr_set_stack_size(&keepalive_attr, keepalive_stack_size); + char ka_thread_name[QURT_THREAD_ATTR_NAME_MAXLEN]; + strncpy(ka_thread_name, "PX4_muorb_keepalive", QURT_THREAD_ATTR_NAME_MAXLEN); + qurt_thread_attr_set_name(&keepalive_attr, ka_thread_name); + qurt_thread_attr_set_priority(&keepalive_attr, keepalive_thread_priority); + (void) qurt_thread_create(&keepalive_tid, &keepalive_attr, uORB::ProtobufChannel::keepalive_thread_func, NULL); + px4muorb_orb_initialized = true; if (_px4_muorb_debug) { PX4_INFO("px4muorb_orb_initialize called"); } @@ -464,6 +545,11 @@ int px4muorb_send_topic_data(const char *topic_name, const uint8_t *data, uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance(); if (channel) { + if (strcmp("keepalive", topic_name) == 0) { + uORB::ProtobufChannel::keepalive(); + return 0; + } + uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler(); if (rxHandler) { diff --git a/src/modules/muorb/slpi/uORBProtobufChannel.hpp b/src/modules/muorb/slpi/uORBProtobufChannel.hpp index 54ee99ed56..a1c3b35462 100644 --- a/src/modules/muorb/slpi/uORBProtobufChannel.hpp +++ b/src/modules/muorb/slpi/uORBProtobufChannel.hpp @@ -40,6 +40,7 @@ #include #include +#include #include "uORB/uORBCommunicator.hpp" #include "mUORBAggregator.hpp" @@ -161,6 +162,10 @@ public: pthread_mutex_unlock(&_tx_mutex); } + static void keepalive_thread_func(void *ptr); + + static void keepalive() { _last_keepalive = hrt_absolute_time(); } + private: /** * Data Members @@ -172,6 +177,8 @@ private: static pthread_mutex_t _tx_mutex; static pthread_mutex_t _rx_mutex; static bool _debug; + static hrt_abstime _last_keepalive; + static char _keepalive_filename[]; /** * Class Members