mirror of
https://gitee.com/mirrors_PX4/PX4-Autopilot.git
synced 2026-04-14 10:07:39 +08:00
Qurt MUORB Communication (#20584)
This commit is contained in:
parent
6b9d86680b
commit
b6ab7f159f
@ -33,6 +33,7 @@
|
||||
|
||||
#include <string.h>
|
||||
#include "uORBAppsProtobufChannel.hpp"
|
||||
#include "uORB/uORBManager.hpp"
|
||||
|
||||
extern "C" { __EXPORT int muorb_main(int argc, char *argv[]); }
|
||||
|
||||
@ -56,7 +57,10 @@ muorb_main(int argc, char *argv[])
|
||||
// Register the protobuf channel with UORB.
|
||||
uORB::AppsProtobufChannel *channel = uORB::AppsProtobufChannel::GetInstance();
|
||||
|
||||
if (channel && channel->Initialize(enable_debug)) { return OK; }
|
||||
if (channel && channel->Initialize(enable_debug)) {
|
||||
uORB::Manager::get_instance()->set_uorb_communicator(channel);
|
||||
return OK;
|
||||
}
|
||||
|
||||
} else if (!strcmp(argv[1], "test")) {
|
||||
uORB::AppsProtobufChannel *channel = uORB::AppsProtobufChannel::GetInstance();
|
||||
|
||||
@ -40,11 +40,20 @@ bool uORB::AppsProtobufChannel::test_flag = false;
|
||||
|
||||
// Initialize the static members
|
||||
uORB::AppsProtobufChannel *uORB::AppsProtobufChannel::_InstancePtr = nullptr;
|
||||
uORBCommunicator::IChannelRxHandler *uORB::AppsProtobufChannel::_RxHandler = nullptr;
|
||||
std::map<std::string, int> uORB::AppsProtobufChannel::_SlpiSubscriberCache;
|
||||
pthread_mutex_t uORB::AppsProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_mutex_t uORB::AppsProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
bool uORB::AppsProtobufChannel::_Debug = false;
|
||||
|
||||
|
||||
void uORB::AppsProtobufChannel::ReceiveCallback(const char *topic,
|
||||
const uint8_t *data,
|
||||
uint32_t length_in_bytes)
|
||||
{
|
||||
|
||||
if (_Debug) { PX4_INFO("Got Receive callback for topic %s", topic); }
|
||||
|
||||
if (strcmp(topic, "slpi_debug") == 0) {
|
||||
PX4_INFO("SLPI: %s", (const char *) data);
|
||||
|
||||
@ -69,30 +78,74 @@ void uORB::AppsProtobufChannel::ReceiveCallback(const char *topic,
|
||||
|
||||
if (test_passed) { test_flag = true; }
|
||||
|
||||
return;
|
||||
|
||||
} else if (_RxHandler) {
|
||||
_RxHandler->process_received_message(topic,
|
||||
length_in_bytes,
|
||||
const_cast<uint8_t *>(data));
|
||||
|
||||
} else {
|
||||
PX4_INFO("Got received data callback for topic %s", topic);
|
||||
PX4_ERR("Couldn't handle topic %s in receive callback", topic);
|
||||
}
|
||||
}
|
||||
|
||||
void uORB::AppsProtobufChannel::AdvertiseCallback(const char *topic)
|
||||
{
|
||||
PX4_INFO("Got advertisement callback for topic %s", topic);
|
||||
if (_Debug) { PX4_INFO("Got advertisement callback for topic %s", topic); }
|
||||
|
||||
if (IS_MUORB_TEST(topic)) { test_flag = true; }
|
||||
if (IS_MUORB_TEST(topic)) {
|
||||
test_flag = true;
|
||||
return;
|
||||
|
||||
} else if (_RxHandler) {
|
||||
_RxHandler->process_remote_topic(topic, true);
|
||||
|
||||
} else {
|
||||
PX4_ERR("Couldn't handle topic %s in advertise callback", topic);
|
||||
}
|
||||
}
|
||||
|
||||
void uORB::AppsProtobufChannel::SubscribeCallback(const char *topic)
|
||||
{
|
||||
PX4_INFO("Got subscription callback for topic %s", topic);
|
||||
if (_Debug) { PX4_INFO("Got subscription callback for topic %s", topic); }
|
||||
|
||||
if (IS_MUORB_TEST(topic)) { test_flag = true; }
|
||||
if (IS_MUORB_TEST(topic)) {
|
||||
test_flag = true;
|
||||
return;
|
||||
|
||||
} else if (_RxHandler) {
|
||||
pthread_mutex_lock(&_rx_mutex);
|
||||
_SlpiSubscriberCache[topic]++;
|
||||
pthread_mutex_unlock(&_rx_mutex);
|
||||
|
||||
_RxHandler->process_add_subscription(topic, 1000);
|
||||
|
||||
} else {
|
||||
PX4_ERR("Couldn't handle topic %s in subscribe callback", topic);
|
||||
}
|
||||
}
|
||||
|
||||
void uORB::AppsProtobufChannel::UnsubscribeCallback(const char *topic)
|
||||
{
|
||||
PX4_INFO("Got remove subscription callback for topic %s", topic);
|
||||
if (_Debug) { PX4_INFO("Got remove subscription callback for topic %s", topic); }
|
||||
|
||||
if (IS_MUORB_TEST(topic)) { test_flag = true; }
|
||||
if (IS_MUORB_TEST(topic)) {
|
||||
test_flag = true;
|
||||
return;
|
||||
|
||||
} else if (_RxHandler) {
|
||||
pthread_mutex_lock(&_rx_mutex);
|
||||
|
||||
if (_SlpiSubscriberCache[topic]) { _SlpiSubscriberCache[topic]--; }
|
||||
|
||||
pthread_mutex_unlock(&_rx_mutex);
|
||||
|
||||
_RxHandler->process_remove_subscription(topic);
|
||||
|
||||
} else {
|
||||
PX4_ERR("Couldn't handle topic %s in unsubscribe callback", topic);
|
||||
}
|
||||
}
|
||||
|
||||
bool uORB::AppsProtobufChannel::Test(MUORBTestType test_type)
|
||||
@ -110,22 +163,27 @@ bool uORB::AppsProtobufChannel::Test(MUORBTestType test_type)
|
||||
|
||||
switch (test_type) {
|
||||
case ADVERTISE_TEST_TYPE:
|
||||
rc = fc_sensor_advertise(muorb_test_topic_name);
|
||||
rc = topic_advertised(muorb_test_topic_name);
|
||||
PX4_INFO("succesfully did ADVERTISE_TEST_TYPE");
|
||||
break;
|
||||
|
||||
case SUBSCRIBE_TEST_TYPE:
|
||||
rc = fc_sensor_subscribe(muorb_test_topic_name);
|
||||
rc = add_subscription(muorb_test_topic_name, 1000);
|
||||
PX4_INFO("succesfully did SUBSCRIBE_TEST_TYPE");
|
||||
break;
|
||||
|
||||
case TOPIC_TEST_TYPE:
|
||||
rc = fc_sensor_send_data(muorb_test_topic_name, test_data, MUORB_TEST_DATA_LEN);
|
||||
PX4_INFO("succesfully did TOPIC_TEST_TYPE");
|
||||
break;
|
||||
|
||||
case UNSUBSCRIBE_TEST_TYPE:
|
||||
rc = fc_sensor_unsubscribe(muorb_test_topic_name);
|
||||
rc = remove_subscription(muorb_test_topic_name);
|
||||
PX4_INFO("succesfully did UNSUBSCRIBE_TEST_TYPE");
|
||||
break;
|
||||
|
||||
default:
|
||||
PX4_ERR("Unknown test type %d", test_type);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -170,7 +228,91 @@ bool uORB::AppsProtobufChannel::Initialize(bool enable_debug)
|
||||
|
||||
} else {
|
||||
PX4_INFO("muorb protobuf initalize method succeeded");
|
||||
_Initialized = true;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int16_t uORB::AppsProtobufChannel::topic_advertised(const char *messageName)
|
||||
{
|
||||
if (_Initialized) {
|
||||
if (_Debug) { PX4_INFO("Advertising topic %s to remote side", messageName); }
|
||||
|
||||
pthread_mutex_lock(&_tx_mutex);
|
||||
int16_t rc = fc_sensor_advertise(messageName);
|
||||
pthread_mutex_unlock(&_tx_mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int16_t uORB::AppsProtobufChannel::add_subscription(const char *messageName, int msgRateInHz)
|
||||
{
|
||||
// This parameter is unused.
|
||||
(void)(msgRateInHz);
|
||||
|
||||
if (_Initialized) {
|
||||
pthread_mutex_lock(&_tx_mutex);
|
||||
int16_t rc = fc_sensor_subscribe(messageName);
|
||||
pthread_mutex_unlock(&_tx_mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int16_t uORB::AppsProtobufChannel::remove_subscription(const char *messageName)
|
||||
{
|
||||
if (_Initialized) {
|
||||
pthread_mutex_lock(&_tx_mutex);
|
||||
int16_t rc = fc_sensor_unsubscribe(messageName);
|
||||
pthread_mutex_unlock(&_tx_mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int16_t uORB::AppsProtobufChannel::register_handler(uORBCommunicator::IChannelRxHandler *handler)
|
||||
{
|
||||
_RxHandler = handler;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int16_t uORB::AppsProtobufChannel::send_message(const char *messageName, int length, uint8_t *data)
|
||||
{
|
||||
// This is done to slow down high rate debug messages.
|
||||
bool enable_debug = false;
|
||||
|
||||
if ((_MessageCounter++ % 100) == 0) {
|
||||
enable_debug = true;
|
||||
}
|
||||
|
||||
if (_Initialized) {
|
||||
pthread_mutex_lock(&_rx_mutex);
|
||||
int has_subscribers = _SlpiSubscriberCache[messageName];
|
||||
pthread_mutex_unlock(&_rx_mutex);
|
||||
|
||||
if (has_subscribers) {
|
||||
if (_Debug && enable_debug) {
|
||||
PX4_INFO("Sending data for topic %s", messageName);
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&_tx_mutex);
|
||||
int16_t rc = fc_sensor_send_data(messageName, data, length);
|
||||
pthread_mutex_unlock(&_tx_mutex);
|
||||
return rc;
|
||||
|
||||
} else {
|
||||
if (_Debug && enable_debug) {
|
||||
PX4_INFO("No subscribers (yet) in %s for topic %s", __FUNCTION__, messageName);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -36,15 +36,19 @@
|
||||
|
||||
#include <stdint.h>
|
||||
#include <string>
|
||||
#include "MUORBTest.hpp"
|
||||
#include <map>
|
||||
|
||||
#include <px4_platform_common/log.h>
|
||||
|
||||
#include "MUORBTest.hpp"
|
||||
#include "uORB/uORBCommunicator.hpp"
|
||||
|
||||
namespace uORB
|
||||
{
|
||||
class AppsProtobufChannel;
|
||||
}
|
||||
|
||||
class uORB::AppsProtobufChannel
|
||||
class uORB::AppsProtobufChannel : public uORBCommunicator::IChannel
|
||||
{
|
||||
public:
|
||||
/**
|
||||
@ -67,15 +71,108 @@ public:
|
||||
return (_InstancePtr != nullptr);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Method used to initialize the fc sensor callback on the apps side.
|
||||
*
|
||||
* @param enable_debug
|
||||
* This represents the debug boolean. If this is set to true, then specific
|
||||
* log information will be printed out.
|
||||
* @return
|
||||
* 1 = success; This means the initialization of the fc_sensor data has happened.
|
||||
* otherwise = failure.
|
||||
*/
|
||||
bool Initialize(bool enable_debug);
|
||||
|
||||
/**
|
||||
* @brief Interface to notify the remote entity of a topic being advertised.
|
||||
*
|
||||
* @param messageName
|
||||
* This represents the uORB message name(aka topic); This message name should be
|
||||
* globally unique.
|
||||
* @return
|
||||
* 0 = success; This means the messages is successfully sent to the receiver
|
||||
* Note: This does not mean that the receiver as received it.
|
||||
* otherwise = failure.
|
||||
*/
|
||||
int16_t topic_advertised(const char *messageName);
|
||||
|
||||
/**
|
||||
* @brief Interface to notify the remote entity of interest of a
|
||||
* subscription for a message.
|
||||
*
|
||||
* @param messageName
|
||||
* This represents the uORB message name; This message name should be
|
||||
* globally unique.
|
||||
* @param msgRate
|
||||
* The max rate at which the subscriber can accept the messages. This value is unused.
|
||||
* @return
|
||||
* 0 = success; This means the messages is successfully sent to the receiver
|
||||
* Note: This does not mean that the receiver as received it.
|
||||
* otherwise = failure.
|
||||
*/
|
||||
int16_t add_subscription(const char *messageName, int msgRateInHz);
|
||||
|
||||
/**
|
||||
* @brief Interface to notify the remote entity of removal of a subscription
|
||||
*
|
||||
* @param messageName
|
||||
* This represents the uORB message name; This message name should be
|
||||
* globally unique.
|
||||
* @return
|
||||
* 0 = success; This means the messages is successfully sent to the receiver
|
||||
* Note: This does not necessarily mean that the receiver as received it.
|
||||
* otherwise = failure.
|
||||
*/
|
||||
int16_t remove_subscription(const char *messageName);
|
||||
|
||||
/**
|
||||
* Register Message Handler. This is internal for the IChannel implementer*
|
||||
*/
|
||||
int16_t register_handler(uORBCommunicator::IChannelRxHandler *handler);
|
||||
|
||||
/**
|
||||
* @brief Sends the data message over the communication link.
|
||||
* @param messageName
|
||||
* This represents the uORB message name; This message name should be
|
||||
* globally unique.
|
||||
* @param length
|
||||
* The length of the data buffer to be sent.
|
||||
* @param data
|
||||
* The actual data to be sent.
|
||||
* @return
|
||||
* 0 = success; This means the messages is successfully sent to the receiver
|
||||
* Note: This does not mean that the receiver as received it.
|
||||
* otherwise = failure.
|
||||
*/
|
||||
int16_t send_message(const char *messageName, int length, uint8_t *data);
|
||||
|
||||
/**
|
||||
* @brief Interface to test the functions of the protobuf channel.
|
||||
*
|
||||
* @return
|
||||
* 1 = success; This means all callbacks have been tested and return the correct value.
|
||||
* otherwise = failure.
|
||||
*/
|
||||
bool Test();
|
||||
|
||||
private: // data members
|
||||
private:
|
||||
/**
|
||||
* Data Members
|
||||
*/
|
||||
static uORB::AppsProtobufChannel *_InstancePtr;
|
||||
static uORBCommunicator::IChannelRxHandler *_RxHandler;
|
||||
static std::map<std::string, int> _SlpiSubscriberCache;
|
||||
static pthread_mutex_t _tx_mutex;
|
||||
static pthread_mutex_t _rx_mutex;
|
||||
static bool _Debug;
|
||||
|
||||
private://class members.
|
||||
/// constructor.
|
||||
bool _Initialized;
|
||||
uint32_t _MessageCounter;
|
||||
|
||||
private:
|
||||
/**
|
||||
* Class Members
|
||||
*/
|
||||
AppsProtobufChannel() {};
|
||||
|
||||
bool Test(MUORBTestType test_type);
|
||||
@ -91,4 +188,4 @@ private://class members.
|
||||
|
||||
};
|
||||
|
||||
#endif /* _uORBAppsProtobufChannel_hpp_ */
|
||||
#endif
|
||||
|
||||
@ -36,3 +36,4 @@ px4_add_library(modules__muorb__slpi
|
||||
../test/MUORBTest.cpp
|
||||
)
|
||||
target_include_directories(modules__muorb__slpi PRIVATE ../test)
|
||||
target_include_directories(modules__muorb__slpi PRIVATE ${PX4_BINARY_DIR}/platforms/common/uORB)
|
||||
@ -32,9 +32,11 @@
|
||||
****************************************************************************/
|
||||
|
||||
#include "uORBProtobufChannel.hpp"
|
||||
#include "uORB/uORBManager.hpp"
|
||||
#include "MUORBTest.hpp"
|
||||
#include <string>
|
||||
|
||||
#include <drivers/drv_hrt.h>
|
||||
#include <pthread.h>
|
||||
#include <px4_platform_common/tasks.h>
|
||||
#include <px4_platform_common/log.h>
|
||||
@ -44,27 +46,141 @@ static MUORBTestType test_to_run;
|
||||
|
||||
fc_func_ptrs muorb_func_ptrs;
|
||||
|
||||
// static initialization.
|
||||
uORB::ProtobufChannel uORB::ProtobufChannel::_Instance;
|
||||
uORBCommunicator::IChannelRxHandler *uORB::ProtobufChannel::_RxHandler;
|
||||
std::map<std::string, int> uORB::ProtobufChannel::_AppsSubscriberCache;
|
||||
pthread_mutex_t uORB::ProtobufChannel::_rx_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_mutex_t uORB::ProtobufChannel::_tx_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
bool uORB::ProtobufChannel::_debug = false;
|
||||
bool _px4_muorb_debug = false;
|
||||
static bool px4muorb_orb_initialized = false;
|
||||
|
||||
int16_t uORB::ProtobufChannel::topic_advertised(const char *messageName)
|
||||
{
|
||||
if (_debug) { PX4_INFO("Advertising %s on remote side", messageName); }
|
||||
|
||||
if (muorb_func_ptrs.advertise_func_ptr) {
|
||||
pthread_mutex_lock(&_tx_mutex);
|
||||
int16_t rc = muorb_func_ptrs.advertise_func_ptr(messageName);
|
||||
pthread_mutex_unlock(&_tx_mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
PX4_ERR("advertise_func_ptr is null in %s", __FUNCTION__);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int16_t uORB::ProtobufChannel::add_subscription(const char *messageName, int32_t msgRateInHz)
|
||||
{
|
||||
// MsgRateInHz is unused in this function.
|
||||
if (_debug) { PX4_INFO("Subscribing to %s on remote side", messageName); }
|
||||
|
||||
if (muorb_func_ptrs.subscribe_func_ptr) {
|
||||
pthread_mutex_lock(&_tx_mutex);
|
||||
int16_t rc = muorb_func_ptrs.subscribe_func_ptr(messageName);
|
||||
pthread_mutex_unlock(&_tx_mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
PX4_ERR("subscribe_func_ptr is null in %s", __FUNCTION__);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int16_t uORB::ProtobufChannel::remove_subscription(const char *messageName)
|
||||
{
|
||||
if (_debug) { PX4_INFO("Unsubscribing from %s on remote side", messageName); }
|
||||
|
||||
if (muorb_func_ptrs.unsubscribe_func_ptr) {
|
||||
pthread_mutex_lock(&_tx_mutex);
|
||||
int16_t rc = muorb_func_ptrs.unsubscribe_func_ptr(messageName);
|
||||
pthread_mutex_unlock(&_tx_mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
PX4_ERR("unsubscribe_func_ptr is null in %s", __FUNCTION__);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int16_t uORB::ProtobufChannel::register_handler(uORBCommunicator::IChannelRxHandler *handler)
|
||||
{
|
||||
_RxHandler = handler;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int16_t uORB::ProtobufChannel::send_message(const char *messageName, int32_t length, uint8_t *data)
|
||||
{
|
||||
// This function can be called from the PX4 log function so we have to make
|
||||
// sure that we do not call PX4_INFO, PX4_ERR, etc. That would cause an
|
||||
// infinite loop!
|
||||
bool is_not_slpi_log = true;
|
||||
|
||||
if ((strcmp(messageName, "slpi_debug") == 0) || (strcmp(messageName, "slpi_error") == 0)) {
|
||||
is_not_slpi_log = false;
|
||||
}
|
||||
|
||||
if (muorb_func_ptrs.topic_data_func_ptr) {
|
||||
if ((_debug) && (is_not_slpi_log)) {
|
||||
PX4_INFO("Got message for topic %s", messageName);
|
||||
}
|
||||
|
||||
std::string temp(messageName);
|
||||
int has_subscribers = 0;
|
||||
pthread_mutex_lock(&_rx_mutex);
|
||||
has_subscribers = _AppsSubscriberCache[temp];
|
||||
pthread_mutex_unlock(&_rx_mutex);
|
||||
|
||||
if ((has_subscribers) || (is_not_slpi_log == false)) {
|
||||
if ((_debug) && (is_not_slpi_log)) {
|
||||
PX4_INFO("Sending message for topic %s", messageName);
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&_tx_mutex);
|
||||
int16_t rc = muorb_func_ptrs.topic_data_func_ptr(messageName, data, length);
|
||||
pthread_mutex_unlock(&_tx_mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
if ((_debug) && (is_not_slpi_log)) {
|
||||
PX4_INFO("Skipping message for topic %s", messageName);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (is_not_slpi_log) {
|
||||
PX4_ERR("topic_data_func_ptr is null in %s", __FUNCTION__);
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void *test_runner(void *)
|
||||
{
|
||||
PX4_INFO("test_runner called");
|
||||
if (_px4_muorb_debug) { PX4_INFO("test_runner called"); }
|
||||
|
||||
uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance();
|
||||
|
||||
switch (test_to_run) {
|
||||
case ADVERTISE_TEST_TYPE:
|
||||
(void) muorb_func_ptrs.advertise_func_ptr(muorb_test_topic_name);
|
||||
(void) channel->topic_advertised(muorb_test_topic_name);
|
||||
break;
|
||||
|
||||
case SUBSCRIBE_TEST_TYPE:
|
||||
(void) muorb_func_ptrs.subscribe_func_ptr(muorb_test_topic_name);
|
||||
(void) channel->add_subscription(muorb_test_topic_name, 1);
|
||||
break;
|
||||
|
||||
case UNSUBSCRIBE_TEST_TYPE:
|
||||
(void) muorb_func_ptrs.unsubscribe_func_ptr(muorb_test_topic_name);
|
||||
(void) channel->remove_subscription(muorb_test_topic_name);
|
||||
break;
|
||||
|
||||
case TOPIC_TEST_TYPE: {
|
||||
uint8_t data[MUORB_TEST_DATA_LEN];
|
||||
|
||||
for (uint8_t i = 0; i < MUORB_TEST_DATA_LEN; i++) { data[i] = i; }
|
||||
for (uint8_t i = 0; i < MUORB_TEST_DATA_LEN; i++) {
|
||||
data[i] = i;
|
||||
}
|
||||
|
||||
(void) muorb_func_ptrs.topic_data_func_ptr(muorb_test_topic_name, data, MUORB_TEST_DATA_LEN);
|
||||
}
|
||||
@ -78,11 +194,42 @@ static void *test_runner(void *)
|
||||
|
||||
int px4muorb_orb_initialize(fc_func_ptrs *func_ptrs, int32_t clock_offset_us)
|
||||
{
|
||||
// These function pointers will only be non-null on the first call
|
||||
// so they must be saved off here
|
||||
if (func_ptrs != nullptr) { muorb_func_ptrs = *func_ptrs; }
|
||||
hrt_set_absolute_time_offset(clock_offset_us);
|
||||
|
||||
PX4_INFO("px4muorb_orb_initialize called");
|
||||
if (! px4muorb_orb_initialized) {
|
||||
if (func_ptrs != nullptr) {
|
||||
muorb_func_ptrs = *func_ptrs;
|
||||
|
||||
} else {
|
||||
PX4_ERR("NULL top level function pointer in %s", __FUNCTION__);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((muorb_func_ptrs.advertise_func_ptr == NULL) ||
|
||||
(muorb_func_ptrs.subscribe_func_ptr == NULL) ||
|
||||
(muorb_func_ptrs.unsubscribe_func_ptr == NULL) ||
|
||||
(muorb_func_ptrs.topic_data_func_ptr == NULL) ||
|
||||
(muorb_func_ptrs._config_spi_bus_func_t == NULL) ||
|
||||
(muorb_func_ptrs._spi_transfer_func_t == NULL) ||
|
||||
(muorb_func_ptrs._config_i2c_bus_func_t == NULL) ||
|
||||
(muorb_func_ptrs._set_i2c_address_func_t == NULL) ||
|
||||
(muorb_func_ptrs._i2c_transfer_func_t == NULL) ||
|
||||
(muorb_func_ptrs.open_uart_func_t == NULL) ||
|
||||
(muorb_func_ptrs.write_uart_func_t == NULL) ||
|
||||
(muorb_func_ptrs.read_uart_func_t == NULL) ||
|
||||
(muorb_func_ptrs.register_interrupt_callback == NULL)) {
|
||||
PX4_ERR("NULL function pointers in %s", __FUNCTION__);
|
||||
return -1;
|
||||
}
|
||||
|
||||
uORB::Manager::initialize();
|
||||
uORB::Manager::get_instance()->set_uorb_communicator(
|
||||
uORB::ProtobufChannel::GetInstance());
|
||||
|
||||
px4muorb_orb_initialized = true;
|
||||
|
||||
if (_px4_muorb_debug) { PX4_INFO("px4muorb_orb_initialize called"); }
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -93,39 +240,102 @@ char stack[TEST_STACK_SIZE];
|
||||
void run_test(MUORBTestType test)
|
||||
{
|
||||
test_to_run = test;
|
||||
(void)px4_task_spawn_cmd("test_MUORB",
|
||||
SCHED_DEFAULT,
|
||||
SCHED_PRIORITY_MAX - 2,
|
||||
2000,
|
||||
(px4_main_t)&test_runner,
|
||||
nullptr);
|
||||
(void) px4_task_spawn_cmd("test_MUORB",
|
||||
SCHED_DEFAULT,
|
||||
SCHED_PRIORITY_MAX - 2,
|
||||
2000,
|
||||
(px4_main_t)&test_runner,
|
||||
nullptr);
|
||||
}
|
||||
|
||||
int px4muorb_topic_advertised(const char *topic_name)
|
||||
{
|
||||
if (IS_MUORB_TEST(topic_name)) { run_test(ADVERTISE_TEST_TYPE); }
|
||||
if (IS_MUORB_TEST(topic_name)) {
|
||||
run_test(ADVERTISE_TEST_TYPE);
|
||||
|
||||
PX4_INFO("px4muorb_topic_advertised called");
|
||||
if (_px4_muorb_debug) { PX4_INFO("px4muorb_topic_advertised for muorb test called"); }
|
||||
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance();
|
||||
|
||||
if (channel) {
|
||||
uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler();
|
||||
|
||||
if (rxHandler) {
|
||||
return rxHandler->process_remote_topic(topic_name, true);
|
||||
|
||||
} else {
|
||||
PX4_ERR("Null rx handler in %s", __FUNCTION__);
|
||||
}
|
||||
|
||||
} else {
|
||||
PX4_ERR("Null channel pointer in %s", __FUNCTION__);
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int px4muorb_add_subscriber(const char *topic_name)
|
||||
{
|
||||
if (IS_MUORB_TEST(topic_name)) { run_test(SUBSCRIBE_TEST_TYPE); }
|
||||
if (IS_MUORB_TEST(topic_name)) {
|
||||
run_test(SUBSCRIBE_TEST_TYPE);
|
||||
|
||||
PX4_INFO("px4muorb_add_subscriber called");
|
||||
if (_px4_muorb_debug) { PX4_INFO("px4muorb_add_subscriber for muorb test called"); }
|
||||
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance();
|
||||
|
||||
if (channel) {
|
||||
uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler();
|
||||
|
||||
if (rxHandler) {
|
||||
channel->AddRemoteSubscriber(topic_name);
|
||||
// Pick a high message rate of 1000 Hz
|
||||
return rxHandler->process_add_subscription(topic_name, 1000);
|
||||
|
||||
} else {
|
||||
PX4_ERR("Null rx handler in %s", __FUNCTION__);
|
||||
}
|
||||
|
||||
} else {
|
||||
PX4_ERR("Null channel pointer in %s", __FUNCTION__);
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int px4muorb_remove_subscriber(const char *topic_name)
|
||||
{
|
||||
if (IS_MUORB_TEST(topic_name)) { run_test(UNSUBSCRIBE_TEST_TYPE); }
|
||||
if (IS_MUORB_TEST(topic_name)) {
|
||||
run_test(UNSUBSCRIBE_TEST_TYPE);
|
||||
|
||||
PX4_INFO("px4muorb_remove_subscriber called");
|
||||
if (_px4_muorb_debug) { PX4_INFO("px4muorb_remove_subscriber for muorb test called"); }
|
||||
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance();
|
||||
|
||||
if (channel) {
|
||||
uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler();
|
||||
|
||||
if (rxHandler) {
|
||||
channel->RemoveRemoteSubscriber(topic_name);
|
||||
return rxHandler->process_remove_subscription(topic_name);
|
||||
|
||||
} else {
|
||||
PX4_ERR("Null rx handler in %s", __FUNCTION__);
|
||||
}
|
||||
|
||||
} else {
|
||||
PX4_ERR("Null channel pointer in %s", __FUNCTION__);
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int px4muorb_send_topic_data(const char *topic_name, const uint8_t *data,
|
||||
@ -148,9 +358,29 @@ int px4muorb_send_topic_data(const char *topic_name, const uint8_t *data,
|
||||
}
|
||||
|
||||
if (test_passed) { run_test(TOPIC_TEST_TYPE); }
|
||||
|
||||
if (_px4_muorb_debug) { PX4_INFO("px4muorb_send_topic_data called"); }
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
PX4_INFO("px4muorb_send_topic_data called");
|
||||
uORB::ProtobufChannel *channel = uORB::ProtobufChannel::GetInstance();
|
||||
|
||||
return 0;
|
||||
if (channel) {
|
||||
uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler();
|
||||
|
||||
if (rxHandler) {
|
||||
return rxHandler->process_received_message(topic_name,
|
||||
data_len_in_bytes,
|
||||
(uint8_t *) data);
|
||||
|
||||
} else {
|
||||
PX4_ERR("Null rx handler in %s", __FUNCTION__);
|
||||
}
|
||||
|
||||
} else {
|
||||
PX4_ERR("Null channel pointer in %s", __FUNCTION__);
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -35,6 +35,132 @@
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include <pthread.h>
|
||||
|
||||
#include "uORB/uORBCommunicator.hpp"
|
||||
|
||||
namespace uORB
|
||||
{
|
||||
class ProtobufChannel;
|
||||
}
|
||||
|
||||
class uORB::ProtobufChannel : public uORBCommunicator::IChannel
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* static method to get the IChannel Implementor.
|
||||
*/
|
||||
static uORB::ProtobufChannel *GetInstance()
|
||||
{
|
||||
return &(_Instance);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Interface to notify the remote entity of a topic being advertised.
|
||||
*
|
||||
* @param messageName
|
||||
* This represents the uORB message name(aka topic); This message name should be
|
||||
* globally unique.
|
||||
* @return
|
||||
* 0 = success; This means the messages is successfully sent to the receiver
|
||||
* Note: This does not mean that the receiver as received it.
|
||||
* otherwise = failure.
|
||||
*/
|
||||
int16_t topic_advertised(const char *messageName);
|
||||
|
||||
/**
|
||||
* @brief Interface to notify the remote entity of interest of a
|
||||
* subscription for a message.
|
||||
*
|
||||
* @param messageName
|
||||
* This represents the uORB message name; This message name should be
|
||||
* globally unique.
|
||||
* @param msgRate
|
||||
* The max rate at which the subscriber can accept the messages. This parameter is unused.
|
||||
* @return
|
||||
* 0 = success; This means the messages is successfully sent to the receiver
|
||||
* Note: This does not mean that the receiver as received it.
|
||||
* otherwise = failure.
|
||||
*/
|
||||
int16_t add_subscription(const char *messageName, int32_t msgRateInHz);
|
||||
|
||||
/**
|
||||
* @brief Interface to notify the remote entity of removal of a subscription
|
||||
*
|
||||
* @param messageName
|
||||
* This represents the uORB message name; This message name should be
|
||||
* globally unique.
|
||||
* @return
|
||||
* 0 = success; This means the messages is successfully sent to the receiver
|
||||
* Note: This does not necessarily mean that the receiver as received it.
|
||||
* otherwise = failure.
|
||||
*/
|
||||
int16_t remove_subscription(const char *messageName);
|
||||
|
||||
/**
|
||||
* Register Message Handler. This is internal for the IChannel implementer*
|
||||
*/
|
||||
int16_t register_handler(uORBCommunicator::IChannelRxHandler *handler);
|
||||
|
||||
/**
|
||||
* @brief Sends the data message over the communication link.
|
||||
* @param messageName
|
||||
* This represents the uORB message name; This message name should be
|
||||
* globally unique.
|
||||
* @param length
|
||||
* The length of the data buffer to be sent.
|
||||
* @param data
|
||||
* The actual data to be sent.
|
||||
* @return
|
||||
* 0 = success; This means the messages is successfully sent to the receiver
|
||||
* Note: This does not mean that the receiver as received it.
|
||||
* otherwise = failure.
|
||||
*/
|
||||
int16_t send_message(const char *messageName, int32_t length, uint8_t *data);
|
||||
|
||||
uORBCommunicator::IChannelRxHandler *GetRxHandler()
|
||||
{
|
||||
return _RxHandler;
|
||||
}
|
||||
|
||||
void AddRemoteSubscriber(const std::string &messageName)
|
||||
{
|
||||
pthread_mutex_lock(&_rx_mutex);
|
||||
_AppsSubscriberCache[messageName]++;
|
||||
pthread_mutex_unlock(&_rx_mutex);
|
||||
}
|
||||
|
||||
void RemoveRemoteSubscriber(const std::string &messageName)
|
||||
{
|
||||
pthread_mutex_lock(&_rx_mutex);
|
||||
|
||||
if (_AppsSubscriberCache[messageName]) {
|
||||
_AppsSubscriberCache[messageName]--;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&_rx_mutex);
|
||||
}
|
||||
|
||||
bool DebugEnabled() { return _debug; }
|
||||
|
||||
private:
|
||||
/**
|
||||
* Data Members
|
||||
*/
|
||||
static uORB::ProtobufChannel _Instance;
|
||||
static uORBCommunicator::IChannelRxHandler *_RxHandler;
|
||||
static std::map<std::string, int> _AppsSubscriberCache;
|
||||
static pthread_mutex_t _tx_mutex;
|
||||
static pthread_mutex_t _rx_mutex;
|
||||
static bool _debug;
|
||||
|
||||
/**
|
||||
* Class Members
|
||||
*/
|
||||
ProtobufChannel() {};
|
||||
};
|
||||
|
||||
// TODO: This has to be defined in the slpi_proc build and in the PX4 build.
|
||||
// Make it accessible from one file to both builds.
|
||||
@ -75,4 +201,4 @@ extern "C" {
|
||||
int px4muorb_send_topic_data(const char *name, const uint8_t *data, int data_len_in_bytes) __EXPORT;
|
||||
}
|
||||
|
||||
#endif /* _uORBProtobufChannel_hpp_ */
|
||||
#endif // _uORBProtobufChannel_hpp_
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user