muorb: implemented a basic keepalive mechanism

This commit is contained in:
Eric Katzfey 2026-02-18 08:47:21 -07:00 committed by Eric Katzfey
parent 7332f264f0
commit 368dd362c5
4 changed files with 143 additions and 4 deletions

View File

@ -37,6 +37,7 @@
#include "fc_sensor.h"
bool uORB::AppsProtobufChannel::test_flag = false;
px4_task_t uORB::AppsProtobufChannel::_task_handle = -1;
// Initialize the static members
uORB::AppsProtobufChannel *uORB::AppsProtobufChannel::_InstancePtr = nullptr;
@ -46,6 +47,7 @@ std::map<std::string, int> uORB::AppsProtobufChannel::_SlpiSubscriberCache;
pthread_mutex_t uORB::AppsProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t uORB::AppsProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER;
bool uORB::AppsProtobufChannel::_Debug = false;
hrt_abstime uORB::AppsProtobufChannel::_last_keepalive = 0;
void uORB::AppsProtobufChannel::ReceiveCallback(const char *topic,
@ -61,6 +63,9 @@ void uORB::AppsProtobufChannel::ReceiveCallback(const char *topic,
} else if (strcmp(topic, "slpi_error") == 0) {
PX4_ERR("%s", (const char *) data);
} else if (strcmp(topic, "keepalive") == 0) {
_last_keepalive = hrt_absolute_time();
} else if (IS_MUORB_TEST(topic)) {
// Validate the test data received
bool test_passed = true;
@ -221,6 +226,28 @@ bool uORB::AppsProtobufChannel::Test()
return true;
}
void uORB::AppsProtobufChannel::keepalive_task()
{
// Messages cannot be sent with no data
uint8_t data[1] {0x5A};
while (true) {
uORB::AppsProtobufChannel::GetInstance()->send_message("keepalive", 1, data);
usleep(100000); // Update every 100ms
if (_last_keepalive) {
hrt_abstime elapsed_time = hrt_elapsed_time(&_last_keepalive);
if (elapsed_time > 1000000) {
PX4_ERR("Keep alive timeout from DSP: %lu ms", elapsed_time);
_last_keepalive = hrt_absolute_time();
}
}
}
}
bool uORB::AppsProtobufChannel::Initialize(bool enable_debug)
{
if (! _Initialized) {
@ -233,6 +260,19 @@ bool uORB::AppsProtobufChannel::Initialize(bool enable_debug)
} else {
PX4_INFO("muorb protobuf initalize method succeeded");
_task_handle = px4_task_spawn_cmd("muorb_keepalive",
SCHED_DEFAULT,
SCHED_PRIORITY_DEFAULT,
1024,
(px4_main_t) &keepalive_task,
nullptr);
if (_task_handle < 0) {
PX4_ERR("task start failed");
return false;
}
_Initialized = true;
}
@ -319,7 +359,7 @@ int16_t uORB::AppsProtobufChannel::send_message(const char *messageName, int len
int has_subscribers = _SlpiSubscriberCache[messageName];
pthread_mutex_unlock(&_rx_mutex);
if (has_subscribers) {
if ((has_subscribers) || (strcmp("keepalive", messageName) == 0)) {
if (_Debug && enable_debug) {
PX4_INFO("Sending data for topic %s", messageName);
}

View File

@ -39,6 +39,7 @@
#include <map>
#include <px4_platform_common/log.h>
#include <px4_platform_common/tasks.h>
#include "MUORBTest.hpp"
#include "uORB/uORBCommunicator.hpp"
@ -167,6 +168,9 @@ public:
bool Test();
private:
static void keepalive_task();
/**
* Data Members
*/
@ -177,6 +181,7 @@ private:
static pthread_mutex_t _tx_mutex;
static pthread_mutex_t _rx_mutex;
static bool _Debug;
static hrt_abstime _last_keepalive;
bool _Initialized;
bool _ShutdownRequested{false};
@ -191,6 +196,7 @@ private:
bool Test(MUORBTestType test_type);
static bool test_flag;
static px4_task_t _task_handle;
static void ReceiveCallback(const char *topic,
const uint8_t *data,

View File

@ -35,6 +35,9 @@
#include "uORB/uORBManager.hpp"
#include "MUORBTest.hpp"
#include <string>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <drivers/drv_hrt.h>
#include <drivers/device/spi.h>
@ -63,6 +66,9 @@ pthread_mutex_t uORB::ProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t uORB::ProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER;
bool uORB::ProtobufChannel::_debug = false;
hrt_abstime uORB::ProtobufChannel::_last_keepalive = 0;
char uORB::ProtobufChannel::_keepalive_filename[] = "/data/px4/slpi/keepalive_fail";
bool _px4_muorb_debug = false;
static bool px4muorb_orb_initialized = false;
@ -74,6 +80,14 @@ const uint32_t aggregator_thread_priority = 240;
const uint32_t aggregator_stack_size = 8096;
char aggregator_stack[aggregator_stack_size];
// Thread for keep alives
qurt_thread_t keepalive_tid;
qurt_thread_attr_t keepalive_attr;
// 1 is highest priority, 255 is lowest. Set it very low.
const uint32_t keepalive_thread_priority = 250;
const uint32_t keepalive_stack_size = 4096;
char keepalive_stack[keepalive_stack_size];
static void aggregator_thread_func(void *ptr)
{
PX4_INFO("muorb aggregator thread running");
@ -90,6 +104,58 @@ static void aggregator_thread_func(void *ptr)
qurt_thread_exit(QURT_EOK);
}
void uORB::ProtobufChannel::keepalive_thread_func(void *ptr)
{
PX4_INFO("muorb keepalive thread running");
// Delete any keepalive fail file that may exist from a previous error
struct stat buffer;
if (stat(_keepalive_filename, &buffer) == 0) {
PX4_INFO("Deleting %s", _keepalive_filename);
if (remove(_keepalive_filename)) {
PX4_ERR("Could not delete %s", _keepalive_filename);
}
}
uORB::ProtobufChannel *muorb = uORB::ProtobufChannel::GetInstance();
const uint64_t SEND_TIMEOUT = 100000; // 100 ms
while (true) {
// Check for timeout. Send a keepalive if timeout happened.
muorb->send_message("keepalive", 0, nullptr);
qurt_timer_sleep(SEND_TIMEOUT);
if (_last_keepalive) {
hrt_abstime elapsed_time = hrt_elapsed_time(&_last_keepalive);
if (elapsed_time > 1000000) {
PX4_ERR("Keep alive timeout from Apps: %lu ms", elapsed_time);
// Create a file in the file system to indicate a keepalive failure happened
if (stat(_keepalive_filename, &buffer)) {
FILE *fptr = fopen(_keepalive_filename, "w");
if (fptr == NULL) {
PX4_ERR("Error creating file %s", _keepalive_filename);
} else {
PX4_INFO("Created file %s", _keepalive_filename);
fclose(fptr);
}
}
_last_keepalive = hrt_absolute_time();
}
}
}
qurt_thread_exit(QURT_EOK);
}
int16_t uORB::ProtobufChannel::topic_advertised(const char *messageName)
{
if (_debug) { PX4_INFO("Advertising %s on remote side", messageName); }
@ -154,6 +220,11 @@ int16_t uORB::ProtobufChannel::send_message(const char *messageName, int32_t len
is_not_slpi_log = false;
}
// keep alives also get special treatment
if (strcmp(messageName, "keepalive") == 0) {
is_not_slpi_log = false;
}
if (muorb_func_ptrs.topic_data_func_ptr) {
if ((_debug) && (is_not_slpi_log)) {
PX4_INFO("Got message for topic %s", messageName);
@ -312,12 +383,22 @@ int px4muorb_orb_initialize(fc_func_ptrs *func_ptrs, int32_t clock_offset_us)
qurt_thread_attr_init(&aggregator_attr);
qurt_thread_attr_set_stack_addr(&aggregator_attr, aggregator_stack);
qurt_thread_attr_set_stack_size(&aggregator_attr, aggregator_stack_size);
char thread_name[QURT_THREAD_ATTR_NAME_MAXLEN];
strncpy(thread_name, "PX4_muorb_agg", QURT_THREAD_ATTR_NAME_MAXLEN);
qurt_thread_attr_set_name(&aggregator_attr, thread_name);
char agg_thread_name[QURT_THREAD_ATTR_NAME_MAXLEN];
strncpy(agg_thread_name, "PX4_muorb_agg", QURT_THREAD_ATTR_NAME_MAXLEN);
qurt_thread_attr_set_name(&aggregator_attr, agg_thread_name);
qurt_thread_attr_set_priority(&aggregator_attr, aggregator_thread_priority);
(void) qurt_thread_create(&aggregator_tid, &aggregator_attr, aggregator_thread_func, NULL);
// Setup the thread to send keep alives to the apps proc
qurt_thread_attr_init(&keepalive_attr);
qurt_thread_attr_set_stack_addr(&keepalive_attr, keepalive_stack);
qurt_thread_attr_set_stack_size(&keepalive_attr, keepalive_stack_size);
char ka_thread_name[QURT_THREAD_ATTR_NAME_MAXLEN];
strncpy(ka_thread_name, "PX4_muorb_keepalive", QURT_THREAD_ATTR_NAME_MAXLEN);
qurt_thread_attr_set_name(&keepalive_attr, ka_thread_name);
qurt_thread_attr_set_priority(&keepalive_attr, keepalive_thread_priority);
(void) qurt_thread_create(&keepalive_tid, &keepalive_attr, uORB::ProtobufChannel::keepalive_thread_func, NULL);
px4muorb_orb_initialized = true;
if (_px4_muorb_debug) { PX4_INFO("px4muorb_orb_initialize called"); }
@ -464,6 +545,11 @@ int px4muorb_send_topic_data(const char *topic_name, const uint8_t *data,
uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance();
if (channel) {
if (strcmp("keepalive", topic_name) == 0) {
uORB::ProtobufChannel::keepalive();
return 0;
}
uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler();
if (rxHandler) {

View File

@ -40,6 +40,7 @@
#include <pthread.h>
#include <termios.h>
#include <drivers/drv_hrt.h>
#include "uORB/uORBCommunicator.hpp"
#include "mUORBAggregator.hpp"
@ -161,6 +162,10 @@ public:
pthread_mutex_unlock(&_tx_mutex);
}
static void keepalive_thread_func(void *ptr);
static void keepalive() { _last_keepalive = hrt_absolute_time(); }
private:
/**
* Data Members
@ -172,6 +177,8 @@ private:
static pthread_mutex_t _tx_mutex;
static pthread_mutex_t _rx_mutex;
static bool _debug;
static hrt_abstime _last_keepalive;
static char _keepalive_filename[];
/**
* Class Members