diff --git a/platforms/common/uORB/uORBCommunicator.hpp b/platforms/common/uORB/uORBCommunicator.hpp index cbb996b82b..8122b34780 100644 --- a/platforms/common/uORB/uORBCommunicator.hpp +++ b/platforms/common/uORB/uORBCommunicator.hpp @@ -67,21 +67,8 @@ public: * Note: This does not mean that the receiver as received it. * otherwise = failure. */ - virtual int16_t topic_advertised(const char *messageName) = 0; - /** - * @brief Interface to notify the remote entity of a topic being unadvertised - * and is no longer publishing messages. - * - * @param messageName - * This represents the uORB message name(aka topic); This message name should be - * globally unique. - * @return - * 0 = success; This means the messages is successfully sent to the receiver - * Note: This does not mean that the receiver as received it. - * otherwise = failure. - */ - //virtual int16_t topic_unadvertised(const char *messageName) = 0; + virtual int16_t topic_advertised(const char *messageName) = 0; /** * @brief Interface to notify the remote entity of interest of a @@ -100,8 +87,6 @@ public: virtual int16_t add_subscription(const char *messageName, int32_t msgRateInHz) = 0; - - /** * @brief Interface to notify the remote entity of removal of a subscription * @@ -116,10 +101,10 @@ public: virtual int16_t remove_subscription(const char *messageName) = 0; - /** * Register Message Handler. This is internal for the IChannel implementer* */ + virtual int16_t register_handler(uORBCommunicator::IChannelRxHandler *handler) = 0; @@ -155,35 +140,30 @@ class uORBCommunicator::IChannelRxHandler public: /** - * Interface to process a received topic from remote. + * Interface to process a received topic advertisement from remote. * @param topic_name * This represents the uORB message Name (topic); This message Name should be * globally unique. - * @param isAdvertisement - * Represents if the topic has been advertised or is no longer avialable. * @return * 0 = success; This means the messages is successfully handled in the * handler. * otherwise = failure. */ - virtual int16_t process_remote_topic(const char *topic_name, bool isAdvertisement) = 0; + virtual int16_t process_remote_topic(const char *topic_name) = 0; /** * Interface to process a received AddSubscription from remote. * @param messageName * This represents the uORB message Name; This message Name should be * globally unique. - * @param msgRate - * The max rate at which the subscriber can accept the messages. * @return * 0 = success; This means the messages is successfully handled in the * handler. * otherwise = failure. */ - virtual int16_t process_add_subscription(const char *messageName, int32_t msgRateInHz) = 0; - + virtual int16_t process_add_subscription(const char *messageName) = 0; /** * Interface to process a received control msg to remove subscription diff --git a/platforms/common/uORB/uORBDeviceNode.cpp b/platforms/common/uORB/uORBDeviceNode.cpp index 15bbcd5d56..7cc5c9bd3e 100644 --- a/platforms/common/uORB/uORBDeviceNode.cpp +++ b/platforms/common/uORB/uORBDeviceNode.cpp @@ -436,7 +436,7 @@ void uORB::DeviceNode::remove_internal_subscriber() } #ifdef CONFIG_ORB_COMMUNICATOR -int16_t uORB::DeviceNode::process_add_subscription(int32_t rateInHz) +int16_t uORB::DeviceNode::process_add_subscription() { // if there is already data in the node, send this out to // the remote entity. diff --git a/platforms/common/uORB/uORBDeviceNode.hpp b/platforms/common/uORB/uORBDeviceNode.hpp index 5ffb18b58d..c5c5bb22c0 100644 --- a/platforms/common/uORB/uORBDeviceNode.hpp +++ b/platforms/common/uORB/uORBDeviceNode.hpp @@ -124,17 +124,23 @@ public: static int unadvertise(orb_advert_t handle); #ifdef CONFIG_ORB_COMMUNICATOR - static int16_t topic_advertised(const orb_metadata *meta); - /** - * processes a request for add subscription from remote - * @param rateInHz - * Specifies the desired rate for the message. + * processes a request for topic advertisement from remote + * @param meta + * The uORB metadata (usually from the ORB_ID() macro) for the topic. * @return * 0 = success * otherwise failure. */ - int16_t process_add_subscription(int32_t rateInHz); + static int16_t topic_advertised(const orb_metadata *meta); + + /** + * processes a request for add subscription from remote + * @return + * 0 = success + * otherwise failure. + */ + int16_t process_add_subscription(); /** * processes a request to remove a subscription from remote. diff --git a/platforms/common/uORB/uORBManager.cpp b/platforms/common/uORB/uORBManager.cpp index 030a38df1d..5b5530694b 100644 --- a/platforms/common/uORB/uORBManager.cpp +++ b/platforms/common/uORB/uORBManager.cpp @@ -262,50 +262,6 @@ int uORB::Manager::orb_exists(const struct orb_metadata *meta, int instance) } } -#ifdef CONFIG_ORB_COMMUNICATOR - - /* - * Generate the path to the node and try to open it. - */ - char path[orb_maxpath]; - int inst = instance; - - ret = uORB::Utils::node_mkpath(path, meta, &inst); - - if (ret != OK) { - errno = -ret; - return PX4_ERROR; - } - - ret = px4_access(path, F_OK); - - if (ret == -1) { - if (_remote_topics.find(meta->o_name)) { - ret = 0; - } - } - - if (ret == 0) { - // we know the topic exists, but it's not necessarily advertised/published yet (for example - // if there is only a subscriber) - // The open() will not lead to memory allocations. - int fd = px4_open(path, 0); - - if (fd >= 0) { - unsigned long is_advertised; - - if (px4_ioctl(fd, ORBIOCISADVERTISED, (unsigned long)&is_advertised) == 0) { - if (!is_advertised) { - ret = PX4_ERROR; - } - } - - px4_close(fd); - } - } - -#endif /* CONFIG_ORB_COMMUNICATOR */ - return ret; } @@ -365,8 +321,13 @@ orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta, } #ifdef CONFIG_ORB_COMMUNICATOR - // For remote systems call over and inform them - uORB::DeviceNode::topic_advertised(meta); + + // Advertise to the remote side, but only if it is a local topic. Otherwise + // we will generate an advertisement loop. + if (_remote_topics.find(meta->o_name) == false) { + uORB::DeviceNode::topic_advertised(meta); + } + #endif /* CONFIG_ORB_COMMUNICATOR */ /* the advertiser may perform an initial publish to initialise the object */ @@ -618,11 +579,14 @@ int uORB::Manager::node_open(const struct orb_metadata *meta, bool advertiser, i #ifdef CONFIG_ORB_COMMUNICATOR void uORB::Manager::set_uorb_communicator(uORBCommunicator::IChannel *channel) { - _comm_channel = channel; + pthread_mutex_lock(&_communicator_mutex); - if (_comm_channel != nullptr) { - _comm_channel->register_handler(this); + if (channel != nullptr) { + channel->register_handler(this); + _comm_channel = channel; } + + pthread_mutex_unlock(&_communicator_mutex); } uORBCommunicator::IChannel *uORB::Manager::get_uorb_communicator() @@ -634,27 +598,30 @@ uORBCommunicator::IChannel *uORB::Manager::get_uorb_communicator() return temp; } -int16_t uORB::Manager::process_remote_topic(const char *topic_name, bool isAdvertisement) +int16_t uORB::Manager::process_remote_topic(const char *topic_name) { PX4_DEBUG("entering process_remote_topic: name: %s", topic_name); - int16_t rc = 0; - + // Look to see if we already have a node for this topic char nodepath[orb_maxpath]; int ret = uORB::Utils::node_mkpath(nodepath, topic_name); - DeviceMaster *device_master = get_device_master(); - if (ret == OK && device_master && isAdvertisement) { - uORB::DeviceNode *node = device_master->getDeviceNode(nodepath); + if (ret == OK) { + DeviceMaster *device_master = get_device_master(); - if (node) { - node->mark_as_advertised(); - _remote_topics.insert(topic_name); - return rc; + if (device_master) { + uORB::DeviceNode *node = device_master->getDeviceNode(nodepath); + + if (node) { + PX4_INFO("Marking DeviceNode(%s) as advertised in process_remote_topic", topic_name); + node->mark_as_advertised(); + _remote_topics.insert(topic_name); + return 0; + } } } - // Didn't find a node so we need to create it via an advertisement + // We didn't find a node so we need to create it via an advertisement const struct orb_metadata *const *topic_list = orb_get_topics(); orb_id_t topic_ptr = nullptr; @@ -668,23 +635,23 @@ int16_t uORB::Manager::process_remote_topic(const char *topic_name, bool isAdver if (topic_ptr) { PX4_INFO("Advertising remote topic %s", topic_name); _remote_topics.insert(topic_name); - orb_advertise(topic_ptr, nullptr); + // Add some queue depth when advertising remote topics. These + // topics may get aggregated and thus delivered in a batch that + // requires some buffering in a queue. + orb_advertise(topic_ptr, nullptr, 5); } else { PX4_INFO("process_remote_topic meta not found for %s\n", topic_name); - _remote_topics.erase(topic_name); - rc = -1; } - return rc; + return 0; } -int16_t uORB::Manager::process_add_subscription(const char *messageName, int32_t msgRateInHz) +int16_t uORB::Manager::process_add_subscription(const char *messageName) { PX4_DEBUG("entering Manager_process_add_subscription: name: %s", messageName); int16_t rc = 0; - _remote_subscriber_topics.insert(messageName); char nodepath[orb_maxpath]; int ret = uORB::Utils::node_mkpath(nodepath, messageName); DeviceMaster *device_master = get_device_master(); @@ -697,7 +664,7 @@ int16_t uORB::Manager::process_add_subscription(const char *messageName, int32_t } else { // node is present. - node->process_add_subscription(msgRateInHz); + node->process_add_subscription(); } } else { @@ -710,7 +677,6 @@ int16_t uORB::Manager::process_add_subscription(const char *messageName, int32_t int16_t uORB::Manager::process_remove_subscription(const char *messageName) { int16_t rc = -1; - _remote_subscriber_topics.erase(messageName); char nodepath[orb_maxpath]; int ret = uORB::Utils::node_mkpath(nodepath, messageName); DeviceMaster *device_master = get_device_master(); @@ -757,11 +723,6 @@ int16_t uORB::Manager::process_received_message(const char *messageName, int32_t return rc; } -bool uORB::Manager::is_remote_subscriber_present(const char *messageName) -{ - return _remote_subscriber_topics.find(messageName); -} - #endif /* CONFIG_ORB_COMMUNICATOR */ #ifdef ORB_USE_PUBLISHER_RULES diff --git a/platforms/common/uORB/uORBManager.hpp b/platforms/common/uORB/uORBManager.hpp index b466f0cd22..1c2870b1ee 100644 --- a/platforms/common/uORB/uORBManager.hpp +++ b/platforms/common/uORB/uORBManager.hpp @@ -481,11 +481,6 @@ public: */ uORBCommunicator::IChannel *get_uorb_communicator(); - /** - * Utility method to check if there is a remote subscriber present - * for a given topic - */ - bool is_remote_subscriber_present(const char *messageName); #endif /* CONFIG_ORB_COMMUNICATOR */ private: // class methods @@ -506,7 +501,7 @@ private: // data members uORBCommunicator::IChannel *_comm_channel{nullptr}; static pthread_mutex_t _communicator_mutex; - ORBSet _remote_subscriber_topics; + // Track the advertisements we get from the remote side ORBSet _remote_topics; #endif /* CONFIG_ORB_COMMUNICATOR */ @@ -518,32 +513,28 @@ private: //class methods #ifdef CONFIG_ORB_COMMUNICATOR /** - * Interface to process a received topic from remote. + * Interface to process a received topic advertisement from remote. * @param topic_name * This represents the uORB message Name (topic); This message Name should be * globally unique. - * @param isAdvertisement - * Represents if the topic has been advertised or is no longer avialable. * @return * 0 = success; This means the messages is successfully handled in the * handler. * otherwise = failure. */ - virtual int16_t process_remote_topic(const char *topic_name, bool isAdvertisement); + virtual int16_t process_remote_topic(const char *topic_name); /** * Interface to process a received AddSubscription from remote. * @param messageName * This represents the uORB message Name; This message Name should be * globally unique. - * @param msgRate - * The max rate at which the subscriber can accept the messages. * @return * 0 = success; This means the messages is successfully handled in the * handler. * otherwise = failure. */ - virtual int16_t process_add_subscription(const char *messageName, int32_t msgRateInHz); + virtual int16_t process_add_subscription(const char *messageName); /** * Interface to process a received control msg to remove subscription diff --git a/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp b/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp index 3c64ce0618..4687f4b37b 100644 --- a/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp +++ b/src/modules/muorb/apps/uORBAppsProtobufChannel.cpp @@ -99,7 +99,7 @@ void uORB::AppsProtobufChannel::AdvertiseCallback(const char *topic) return; } else if (_RxHandler) { - _RxHandler->process_remote_topic(topic, true); + _RxHandler->process_remote_topic(topic); } else { PX4_ERR("Couldn't handle topic %s in advertise callback", topic); @@ -119,7 +119,7 @@ void uORB::AppsProtobufChannel::SubscribeCallback(const char *topic) _SlpiSubscriberCache[topic]++; pthread_mutex_unlock(&_rx_mutex); - _RxHandler->process_add_subscription(topic, 1000); + _RxHandler->process_add_subscription(topic); } else { PX4_ERR("Couldn't handle topic %s in subscribe callback", topic); diff --git a/src/modules/muorb/slpi/uORBProtobufChannel.cpp b/src/modules/muorb/slpi/uORBProtobufChannel.cpp index c0eed28b52..fae8900dcb 100644 --- a/src/modules/muorb/slpi/uORBProtobufChannel.cpp +++ b/src/modules/muorb/slpi/uORBProtobufChannel.cpp @@ -264,7 +264,7 @@ int px4muorb_topic_advertised(const char *topic_name) uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler(); if (rxHandler) { - return rxHandler->process_remote_topic(topic_name, true); + return rxHandler->process_remote_topic(topic_name); } else { PX4_ERR("Null rx handler in %s", __FUNCTION__); @@ -295,7 +295,7 @@ int px4muorb_add_subscriber(const char *topic_name) if (rxHandler) { channel->AddRemoteSubscriber(topic_name); // Pick a high message rate of 1000 Hz - return rxHandler->process_add_subscription(topic_name, 1000); + return rxHandler->process_add_subscription(topic_name); } else { PX4_ERR("Null rx handler in %s", __FUNCTION__);