From b6ab7f159fbada823677d7be0d219a7fa648fe8f Mon Sep 17 00:00:00 2001 From: Zachary Lowell Date: Thu, 10 Nov 2022 13:10:18 -0600 Subject: [PATCH] Qurt MUORB Communication (#20584) --- src/modules/muorb/apps/muorb_main.cpp | 6 +- .../muorb/apps/uORBAppsProtobufChannel.cpp | 162 +++++++++- .../muorb/apps/uORBAppsProtobufChannel.hpp | 109 ++++++- src/modules/muorb/slpi/CMakeLists.txt | 1 + .../muorb/slpi/uORBProtobufChannel.cpp | 282 ++++++++++++++++-- .../muorb/slpi/uORBProtobufChannel.hpp | 128 +++++++- 6 files changed, 644 insertions(+), 44 deletions(-) diff --git a/src/modules/muorb/apps/muorb_main.cpp b/src/modules/muorb/apps/muorb_main.cpp index 0085bbd3ac..256a46bb55 100644 --- a/src/modules/muorb/apps/muorb_main.cpp +++ b/src/modules/muorb/apps/muorb_main.cpp @@ -33,6 +33,7 @@ #include #include "uORBAppsProtobufChannel.hpp" +#include "uORB/uORBManager.hpp" extern "C" { __EXPORT int muorb_main(int argc, char *argv[]); } @@ -56,7 +57,10 @@ muorb_main(int argc, char *argv[]) // Register the protobuf channel with UORB. uORB::AppsProtobufChannel *channel = uORB::AppsProtobufChannel::GetInstance(); - if (channel && channel->Initialize(enable_debug)) { return OK; } + if (channel && channel->Initialize(enable_debug)) { + uORB::Manager::get_instance()->set_uorb_communicator(channel); + return OK; + } } else if (!strcmp(argv[1], "test")) { uORB::AppsProtobufChannel *channel = uORB::AppsProtobufChannel::GetInstance(); diff --git a/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp b/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp index 110d3832d6..3c64ce0618 100644 --- a/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp +++ b/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp @@ -40,11 +40,20 @@ bool uORB::AppsProtobufChannel::test_flag = false; // Initialize the static members uORB::AppsProtobufChannel *uORB::AppsProtobufChannel::_InstancePtr = nullptr; +uORBCommunicator::IChannelRxHandler *uORB::AppsProtobufChannel::_RxHandler = nullptr; +std::map uORB::AppsProtobufChannel::_SlpiSubscriberCache; +pthread_mutex_t uORB::AppsProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t uORB::AppsProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER; +bool uORB::AppsProtobufChannel::_Debug = false; + void uORB::AppsProtobufChannel::ReceiveCallback(const char *topic, const uint8_t *data, uint32_t length_in_bytes) { + + if (_Debug) { PX4_INFO("Got Receive callback for topic %s", topic); } + if (strcmp(topic, "slpi_debug") == 0) { PX4_INFO("SLPI: %s", (const char *) data); @@ -69,30 +78,74 @@ void uORB::AppsProtobufChannel::ReceiveCallback(const char *topic, if (test_passed) { test_flag = true; } + return; + + } else if (_RxHandler) { + _RxHandler->process_received_message(topic, + length_in_bytes, + const_cast(data)); + } else { - PX4_INFO("Got received data callback for topic %s", topic); + PX4_ERR("Couldn't handle topic %s in receive callback", topic); } } void uORB::AppsProtobufChannel::AdvertiseCallback(const char *topic) { - PX4_INFO("Got advertisement callback for topic %s", topic); + if (_Debug) { PX4_INFO("Got advertisement callback for topic %s", topic); } - if (IS_MUORB_TEST(topic)) { test_flag = true; } + if (IS_MUORB_TEST(topic)) { + test_flag = true; + return; + + } else if (_RxHandler) { + _RxHandler->process_remote_topic(topic, true); + + } else { + PX4_ERR("Couldn't handle topic %s in advertise callback", topic); + } } void uORB::AppsProtobufChannel::SubscribeCallback(const char *topic) { - PX4_INFO("Got subscription callback for topic %s", topic); + if (_Debug) { PX4_INFO("Got subscription callback for topic %s", topic); } - if (IS_MUORB_TEST(topic)) { test_flag = true; } + if (IS_MUORB_TEST(topic)) { + test_flag = true; + return; + + } else if (_RxHandler) { + pthread_mutex_lock(&_rx_mutex); + _SlpiSubscriberCache[topic]++; + pthread_mutex_unlock(&_rx_mutex); + + _RxHandler->process_add_subscription(topic, 1000); + + } else { + PX4_ERR("Couldn't handle topic %s in subscribe callback", topic); + } } void uORB::AppsProtobufChannel::UnsubscribeCallback(const char *topic) { - PX4_INFO("Got remove subscription callback for topic %s", topic); + if (_Debug) { PX4_INFO("Got remove subscription callback for topic %s", topic); } - if (IS_MUORB_TEST(topic)) { test_flag = true; } + if (IS_MUORB_TEST(topic)) { + test_flag = true; + return; + + } else if (_RxHandler) { + pthread_mutex_lock(&_rx_mutex); + + if (_SlpiSubscriberCache[topic]) { _SlpiSubscriberCache[topic]--; } + + pthread_mutex_unlock(&_rx_mutex); + + _RxHandler->process_remove_subscription(topic); + + } else { + PX4_ERR("Couldn't handle topic %s in unsubscribe callback", topic); + } } bool uORB::AppsProtobufChannel::Test(MUORBTestType test_type) @@ -110,22 +163,27 @@ bool uORB::AppsProtobufChannel::Test(MUORBTestType test_type) switch (test_type) { case ADVERTISE_TEST_TYPE: - rc = fc_sensor_advertise(muorb_test_topic_name); + rc = topic_advertised(muorb_test_topic_name); + PX4_INFO("succesfully did ADVERTISE_TEST_TYPE"); break; case SUBSCRIBE_TEST_TYPE: - rc = fc_sensor_subscribe(muorb_test_topic_name); + rc = add_subscription(muorb_test_topic_name, 1000); + PX4_INFO("succesfully did SUBSCRIBE_TEST_TYPE"); break; case TOPIC_TEST_TYPE: rc = fc_sensor_send_data(muorb_test_topic_name, test_data, MUORB_TEST_DATA_LEN); + PX4_INFO("succesfully did TOPIC_TEST_TYPE"); break; case UNSUBSCRIBE_TEST_TYPE: - rc = fc_sensor_unsubscribe(muorb_test_topic_name); + rc = remove_subscription(muorb_test_topic_name); + PX4_INFO("succesfully did UNSUBSCRIBE_TEST_TYPE"); break; default: + PX4_ERR("Unknown test type %d", test_type); break; } @@ -170,7 +228,91 @@ bool uORB::AppsProtobufChannel::Initialize(bool enable_debug) } else { PX4_INFO("muorb protobuf initalize method succeeded"); + _Initialized = true; } return true; } + +int16_t uORB::AppsProtobufChannel::topic_advertised(const char *messageName) +{ + if (_Initialized) { + if (_Debug) { PX4_INFO("Advertising topic %s to remote side", messageName); } + + pthread_mutex_lock(&_tx_mutex); + int16_t rc = fc_sensor_advertise(messageName); + pthread_mutex_unlock(&_tx_mutex); + return rc; + } + + return -1; +} + +int16_t uORB::AppsProtobufChannel::add_subscription(const char *messageName, int msgRateInHz) +{ + // This parameter is unused. + (void)(msgRateInHz); + + if (_Initialized) { + pthread_mutex_lock(&_tx_mutex); + int16_t rc = fc_sensor_subscribe(messageName); + pthread_mutex_unlock(&_tx_mutex); + return rc; + } + + return -1; +} + +int16_t uORB::AppsProtobufChannel::remove_subscription(const char *messageName) +{ + if (_Initialized) { + pthread_mutex_lock(&_tx_mutex); + int16_t rc = fc_sensor_unsubscribe(messageName); + pthread_mutex_unlock(&_tx_mutex); + return rc; + } + + return -1; +} + +int16_t uORB::AppsProtobufChannel::register_handler(uORBCommunicator::IChannelRxHandler *handler) +{ + _RxHandler = handler; + return 0; +} + +int16_t uORB::AppsProtobufChannel::send_message(const char *messageName, int length, uint8_t *data) +{ + // This is done to slow down high rate debug messages. + bool enable_debug = false; + + if ((_MessageCounter++ % 100) == 0) { + enable_debug = true; + } + + if (_Initialized) { + pthread_mutex_lock(&_rx_mutex); + int has_subscribers = _SlpiSubscriberCache[messageName]; + pthread_mutex_unlock(&_rx_mutex); + + if (has_subscribers) { + if (_Debug && enable_debug) { + PX4_INFO("Sending data for topic %s", messageName); + } + + pthread_mutex_lock(&_tx_mutex); + int16_t rc = fc_sensor_send_data(messageName, data, length); + pthread_mutex_unlock(&_tx_mutex); + return rc; + + } else { + if (_Debug && enable_debug) { + PX4_INFO("No subscribers (yet) in %s for topic %s", __FUNCTION__, messageName); + } + + return 0; + } + } + + return -1; +} diff --git a/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp b/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp index f320b074e5..5ab26891e1 100644 --- a/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp +++ b/src/modules/muorb/apps/uORBAppsProtobufChannel.hpp @@ -36,15 +36,19 @@ #include #include -#include "MUORBTest.hpp" +#include + #include +#include "MUORBTest.hpp" +#include "uORB/uORBCommunicator.hpp" + namespace uORB { class AppsProtobufChannel; } -class uORB::AppsProtobufChannel +class uORB::AppsProtobufChannel : public uORBCommunicator::IChannel { public: /** @@ -67,15 +71,108 @@ public: return (_InstancePtr != nullptr); } + /** + * @brief Method used to initialize the fc sensor callback on the apps side. + * + * @param enable_debug + * This represents the debug boolean. If this is set to true, then specific + * log information will be printed out. + * @return + * 1 = success; This means the initialization of the fc_sensor data has happened. + * otherwise = failure. + */ bool Initialize(bool enable_debug); + /** + * @brief Interface to notify the remote entity of a topic being advertised. + * + * @param messageName + * This represents the uORB message name(aka topic); This message name should be + * globally unique. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + int16_t topic_advertised(const char *messageName); + + /** + * @brief Interface to notify the remote entity of interest of a + * subscription for a message. + * + * @param messageName + * This represents the uORB message name; This message name should be + * globally unique. + * @param msgRate + * The max rate at which the subscriber can accept the messages. This value is unused. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + int16_t add_subscription(const char *messageName, int msgRateInHz); + + /** + * @brief Interface to notify the remote entity of removal of a subscription + * + * @param messageName + * This represents the uORB message name; This message name should be + * globally unique. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not necessarily mean that the receiver as received it. + * otherwise = failure. + */ + int16_t remove_subscription(const char *messageName); + + /** + * Register Message Handler. This is internal for the IChannel implementer* + */ + int16_t register_handler(uORBCommunicator::IChannelRxHandler *handler); + + /** + * @brief Sends the data message over the communication link. + * @param messageName + * This represents the uORB message name; This message name should be + * globally unique. + * @param length + * The length of the data buffer to be sent. + * @param data + * The actual data to be sent. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + int16_t send_message(const char *messageName, int length, uint8_t *data); + + /** + * @brief Interface to test the functions of the protobuf channel. + * + * @return + * 1 = success; This means all callbacks have been tested and return the correct value. + * otherwise = failure. + */ bool Test(); -private: // data members +private: + /** + * Data Members + */ static uORB::AppsProtobufChannel *_InstancePtr; + static uORBCommunicator::IChannelRxHandler *_RxHandler; + static std::map _SlpiSubscriberCache; + static pthread_mutex_t _tx_mutex; + static pthread_mutex_t _rx_mutex; + static bool _Debug; -private://class members. - /// constructor. + bool _Initialized; + uint32_t _MessageCounter; + +private: + /** + * Class Members + */ AppsProtobufChannel() {}; bool Test(MUORBTestType test_type); @@ -91,4 +188,4 @@ private://class members. }; -#endif /* _uORBAppsProtobufChannel_hpp_ */ +#endif diff --git a/src/modules/muorb/slpi/CMakeLists.txt b/src/modules/muorb/slpi/CMakeLists.txt index 41d9746ab4..ab76a64e8c 100644 --- a/src/modules/muorb/slpi/CMakeLists.txt +++ b/src/modules/muorb/slpi/CMakeLists.txt @@ -36,3 +36,4 @@ px4_add_library(modules__muorb__slpi ../test/MUORBTest.cpp ) target_include_directories(modules__muorb__slpi PRIVATE ../test) +target_include_directories(modules__muorb__slpi PRIVATE ${PX4_BINARY_DIR}/platforms/common/uORB) \ No newline at end of file diff --git a/src/modules/muorb/slpi/uORBProtobufChannel.cpp b/src/modules/muorb/slpi/uORBProtobufChannel.cpp index 3916683d58..c0eed28b52 100644 --- a/src/modules/muorb/slpi/uORBProtobufChannel.cpp +++ b/src/modules/muorb/slpi/uORBProtobufChannel.cpp @@ -32,9 +32,11 @@ ****************************************************************************/ #include "uORBProtobufChannel.hpp" +#include "uORB/uORBManager.hpp" #include "MUORBTest.hpp" #include +#include #include #include #include @@ -44,27 +46,141 @@ static MUORBTestType test_to_run; fc_func_ptrs muorb_func_ptrs; +// static initialization. +uORB::ProtobufChannel uORB::ProtobufChannel::_Instance; +uORBCommunicator::IChannelRxHandler *uORB::ProtobufChannel::_RxHandler; +std::map uORB::ProtobufChannel::_AppsSubscriberCache; +pthread_mutex_t uORB::ProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t uORB::ProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER; + +bool uORB::ProtobufChannel::_debug = false; +bool _px4_muorb_debug = false; +static bool px4muorb_orb_initialized = false; + +int16_t uORB::ProtobufChannel::topic_advertised(const char *messageName) +{ + if (_debug) { PX4_INFO("Advertising %s on remote side", messageName); } + + if (muorb_func_ptrs.advertise_func_ptr) { + pthread_mutex_lock(&_tx_mutex); + int16_t rc = muorb_func_ptrs.advertise_func_ptr(messageName); + pthread_mutex_unlock(&_tx_mutex); + return rc; + } + + PX4_ERR("advertise_func_ptr is null in %s", __FUNCTION__); + return -1; +} + +int16_t uORB::ProtobufChannel::add_subscription(const char *messageName, int32_t msgRateInHz) +{ + // MsgRateInHz is unused in this function. + if (_debug) { PX4_INFO("Subscribing to %s on remote side", messageName); } + + if (muorb_func_ptrs.subscribe_func_ptr) { + pthread_mutex_lock(&_tx_mutex); + int16_t rc = muorb_func_ptrs.subscribe_func_ptr(messageName); + pthread_mutex_unlock(&_tx_mutex); + return rc; + } + + PX4_ERR("subscribe_func_ptr is null in %s", __FUNCTION__); + return -1; +} + +int16_t uORB::ProtobufChannel::remove_subscription(const char *messageName) +{ + if (_debug) { PX4_INFO("Unsubscribing from %s on remote side", messageName); } + + if (muorb_func_ptrs.unsubscribe_func_ptr) { + pthread_mutex_lock(&_tx_mutex); + int16_t rc = muorb_func_ptrs.unsubscribe_func_ptr(messageName); + pthread_mutex_unlock(&_tx_mutex); + return rc; + } + + PX4_ERR("unsubscribe_func_ptr is null in %s", __FUNCTION__); + return -1; +} + +int16_t uORB::ProtobufChannel::register_handler(uORBCommunicator::IChannelRxHandler *handler) +{ + _RxHandler = handler; + return 0; +} + +int16_t uORB::ProtobufChannel::send_message(const char *messageName, int32_t length, uint8_t *data) +{ + // This function can be called from the PX4 log function so we have to make + // sure that we do not call PX4_INFO, PX4_ERR, etc. That would cause an + // infinite loop! + bool is_not_slpi_log = true; + + if ((strcmp(messageName, "slpi_debug") == 0) || (strcmp(messageName, "slpi_error") == 0)) { + is_not_slpi_log = false; + } + + if (muorb_func_ptrs.topic_data_func_ptr) { + if ((_debug) && (is_not_slpi_log)) { + PX4_INFO("Got message for topic %s", messageName); + } + + std::string temp(messageName); + int has_subscribers = 0; + pthread_mutex_lock(&_rx_mutex); + has_subscribers = _AppsSubscriberCache[temp]; + pthread_mutex_unlock(&_rx_mutex); + + if ((has_subscribers) || (is_not_slpi_log == false)) { + if ((_debug) && (is_not_slpi_log)) { + PX4_INFO("Sending message for topic %s", messageName); + } + + pthread_mutex_lock(&_tx_mutex); + int16_t rc = muorb_func_ptrs.topic_data_func_ptr(messageName, data, length); + pthread_mutex_unlock(&_tx_mutex); + return rc; + } + + if ((_debug) && (is_not_slpi_log)) { + PX4_INFO("Skipping message for topic %s", messageName); + } + + return 0; + } + + if (is_not_slpi_log) { + PX4_ERR("topic_data_func_ptr is null in %s", __FUNCTION__); + } + + return -1; +} + static void *test_runner(void *) { - PX4_INFO("test_runner called"); + if (_px4_muorb_debug) { PX4_INFO("test_runner called"); } + + uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance(); switch (test_to_run) { case ADVERTISE_TEST_TYPE: - (void) muorb_func_ptrs.advertise_func_ptr(muorb_test_topic_name); + (void) channel->topic_advertised(muorb_test_topic_name); break; case SUBSCRIBE_TEST_TYPE: - (void) muorb_func_ptrs.subscribe_func_ptr(muorb_test_topic_name); + (void) channel->add_subscription(muorb_test_topic_name, 1); break; case UNSUBSCRIBE_TEST_TYPE: - (void) muorb_func_ptrs.unsubscribe_func_ptr(muorb_test_topic_name); + (void) channel->remove_subscription(muorb_test_topic_name); break; case TOPIC_TEST_TYPE: { uint8_t data[MUORB_TEST_DATA_LEN]; - for (uint8_t i = 0; i < MUORB_TEST_DATA_LEN; i++) { data[i] = i; } + for (uint8_t i = 0; i < MUORB_TEST_DATA_LEN; i++) { + data[i] = i; + } (void) muorb_func_ptrs.topic_data_func_ptr(muorb_test_topic_name, data, MUORB_TEST_DATA_LEN); } @@ -78,11 +194,42 @@ static void *test_runner(void *) int px4muorb_orb_initialize(fc_func_ptrs *func_ptrs, int32_t clock_offset_us) { - // These function pointers will only be non-null on the first call - // so they must be saved off here - if (func_ptrs != nullptr) { muorb_func_ptrs = *func_ptrs; } + hrt_set_absolute_time_offset(clock_offset_us); - PX4_INFO("px4muorb_orb_initialize called"); + if (! px4muorb_orb_initialized) { + if (func_ptrs != nullptr) { + muorb_func_ptrs = *func_ptrs; + + } else { + PX4_ERR("NULL top level function pointer in %s", __FUNCTION__); + return -1; + } + + if ((muorb_func_ptrs.advertise_func_ptr == NULL) || + (muorb_func_ptrs.subscribe_func_ptr == NULL) || + (muorb_func_ptrs.unsubscribe_func_ptr == NULL) || + (muorb_func_ptrs.topic_data_func_ptr == NULL) || + (muorb_func_ptrs._config_spi_bus_func_t == NULL) || + (muorb_func_ptrs._spi_transfer_func_t == NULL) || + (muorb_func_ptrs._config_i2c_bus_func_t == NULL) || + (muorb_func_ptrs._set_i2c_address_func_t == NULL) || + (muorb_func_ptrs._i2c_transfer_func_t == NULL) || + (muorb_func_ptrs.open_uart_func_t == NULL) || + (muorb_func_ptrs.write_uart_func_t == NULL) || + (muorb_func_ptrs.read_uart_func_t == NULL) || + (muorb_func_ptrs.register_interrupt_callback == NULL)) { + PX4_ERR("NULL function pointers in %s", __FUNCTION__); + return -1; + } + + uORB::Manager::initialize(); + uORB::Manager::get_instance()->set_uorb_communicator( + uORB::ProtobufChannel::GetInstance()); + + px4muorb_orb_initialized = true; + + if (_px4_muorb_debug) { PX4_INFO("px4muorb_orb_initialize called"); } + } return 0; } @@ -93,39 +240,102 @@ char stack[TEST_STACK_SIZE]; void run_test(MUORBTestType test) { test_to_run = test; - (void)px4_task_spawn_cmd("test_MUORB", - SCHED_DEFAULT, - SCHED_PRIORITY_MAX - 2, - 2000, - (px4_main_t)&test_runner, - nullptr); + (void) px4_task_spawn_cmd("test_MUORB", + SCHED_DEFAULT, + SCHED_PRIORITY_MAX - 2, + 2000, + (px4_main_t)&test_runner, + nullptr); } int px4muorb_topic_advertised(const char *topic_name) { - if (IS_MUORB_TEST(topic_name)) { run_test(ADVERTISE_TEST_TYPE); } + if (IS_MUORB_TEST(topic_name)) { + run_test(ADVERTISE_TEST_TYPE); - PX4_INFO("px4muorb_topic_advertised called"); + if (_px4_muorb_debug) { PX4_INFO("px4muorb_topic_advertised for muorb test called"); } - return 0; + return 0; + } + + uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance(); + + if (channel) { + uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler(); + + if (rxHandler) { + return rxHandler->process_remote_topic(topic_name, true); + + } else { + PX4_ERR("Null rx handler in %s", __FUNCTION__); + } + + } else { + PX4_ERR("Null channel pointer in %s", __FUNCTION__); + } + + return -1; } int px4muorb_add_subscriber(const char *topic_name) { - if (IS_MUORB_TEST(topic_name)) { run_test(SUBSCRIBE_TEST_TYPE); } + if (IS_MUORB_TEST(topic_name)) { + run_test(SUBSCRIBE_TEST_TYPE); - PX4_INFO("px4muorb_add_subscriber called"); + if (_px4_muorb_debug) { PX4_INFO("px4muorb_add_subscriber for muorb test called"); } - return 0; + return 0; + } + + uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance(); + + if (channel) { + uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler(); + + if (rxHandler) { + channel->AddRemoteSubscriber(topic_name); + // Pick a high message rate of 1000 Hz + return rxHandler->process_add_subscription(topic_name, 1000); + + } else { + PX4_ERR("Null rx handler in %s", __FUNCTION__); + } + + } else { + PX4_ERR("Null channel pointer in %s", __FUNCTION__); + } + + return -1; } int px4muorb_remove_subscriber(const char *topic_name) { - if (IS_MUORB_TEST(topic_name)) { run_test(UNSUBSCRIBE_TEST_TYPE); } + if (IS_MUORB_TEST(topic_name)) { + run_test(UNSUBSCRIBE_TEST_TYPE); - PX4_INFO("px4muorb_remove_subscriber called"); + if (_px4_muorb_debug) { PX4_INFO("px4muorb_remove_subscriber for muorb test called"); } - return 0; + return 0; + } + + uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance(); + + if (channel) { + uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler(); + + if (rxHandler) { + channel->RemoveRemoteSubscriber(topic_name); + return rxHandler->process_remove_subscription(topic_name); + + } else { + PX4_ERR("Null rx handler in %s", __FUNCTION__); + } + + } else { + PX4_ERR("Null channel pointer in %s", __FUNCTION__); + } + + return -1; } int px4muorb_send_topic_data(const char *topic_name, const uint8_t *data, @@ -148,9 +358,29 @@ int px4muorb_send_topic_data(const char *topic_name, const uint8_t *data, } if (test_passed) { run_test(TOPIC_TEST_TYPE); } + + if (_px4_muorb_debug) { PX4_INFO("px4muorb_send_topic_data called"); } + + return 0; } - PX4_INFO("px4muorb_send_topic_data called"); + uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance(); - return 0; + if (channel) { + uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler(); + + if (rxHandler) { + return rxHandler->process_received_message(topic_name, + data_len_in_bytes, + (uint8_t *) data); + + } else { + PX4_ERR("Null rx handler in %s", __FUNCTION__); + } + + } else { + PX4_ERR("Null channel pointer in %s", __FUNCTION__); + } + + return -1; } diff --git a/src/modules/muorb/slpi/uORBProtobufChannel.hpp b/src/modules/muorb/slpi/uORBProtobufChannel.hpp index d39f51d5ef..1e5e3ea847 100644 --- a/src/modules/muorb/slpi/uORBProtobufChannel.hpp +++ b/src/modules/muorb/slpi/uORBProtobufChannel.hpp @@ -35,6 +35,132 @@ #include #include +#include +#include +#include + +#include "uORB/uORBCommunicator.hpp" + +namespace uORB +{ +class ProtobufChannel; +} + +class uORB::ProtobufChannel : public uORBCommunicator::IChannel +{ +public: + /** + * static method to get the IChannel Implementor. + */ + static uORB::ProtobufChannel *GetInstance() + { + return &(_Instance); + } + + /** + * @brief Interface to notify the remote entity of a topic being advertised. + * + * @param messageName + * This represents the uORB message name(aka topic); This message name should be + * globally unique. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + int16_t topic_advertised(const char *messageName); + + /** + * @brief Interface to notify the remote entity of interest of a + * subscription for a message. + * + * @param messageName + * This represents the uORB message name; This message name should be + * globally unique. + * @param msgRate + * The max rate at which the subscriber can accept the messages. This parameter is unused. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + int16_t add_subscription(const char *messageName, int32_t msgRateInHz); + + /** + * @brief Interface to notify the remote entity of removal of a subscription + * + * @param messageName + * This represents the uORB message name; This message name should be + * globally unique. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not necessarily mean that the receiver as received it. + * otherwise = failure. + */ + int16_t remove_subscription(const char *messageName); + + /** + * Register Message Handler. This is internal for the IChannel implementer* + */ + int16_t register_handler(uORBCommunicator::IChannelRxHandler *handler); + + /** + * @brief Sends the data message over the communication link. + * @param messageName + * This represents the uORB message name; This message name should be + * globally unique. + * @param length + * The length of the data buffer to be sent. + * @param data + * The actual data to be sent. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + int16_t send_message(const char *messageName, int32_t length, uint8_t *data); + + uORBCommunicator::IChannelRxHandler *GetRxHandler() + { + return _RxHandler; + } + + void AddRemoteSubscriber(const std::string &messageName) + { + pthread_mutex_lock(&_rx_mutex); + _AppsSubscriberCache[messageName]++; + pthread_mutex_unlock(&_rx_mutex); + } + + void RemoveRemoteSubscriber(const std::string &messageName) + { + pthread_mutex_lock(&_rx_mutex); + + if (_AppsSubscriberCache[messageName]) { + _AppsSubscriberCache[messageName]--; + } + + pthread_mutex_unlock(&_rx_mutex); + } + + bool DebugEnabled() { return _debug; } + +private: + /** + * Data Members + */ + static uORB::ProtobufChannel _Instance; + static uORBCommunicator::IChannelRxHandler *_RxHandler; + static std::map _AppsSubscriberCache; + static pthread_mutex_t _tx_mutex; + static pthread_mutex_t _rx_mutex; + static bool _debug; + + /** + * Class Members + */ + ProtobufChannel() {}; +}; // TODO: This has to be defined in the slpi_proc build and in the PX4 build. // Make it accessible from one file to both builds. @@ -75,4 +201,4 @@ extern "C" { int px4muorb_send_topic_data(const char *name, const uint8_t *data, int data_len_in_bytes) __EXPORT; } -#endif /* _uORBProtobufChannel_hpp_ */ +#endif // _uORBProtobufChannel_hpp_