diff --git a/src/modules/zenoh/CMakeLists.txt b/src/modules/zenoh/CMakeLists.txt index cac47cbccc..61dc3b6c43 100644 --- a/src/modules/zenoh/CMakeLists.txt +++ b/src/modules/zenoh/CMakeLists.txt @@ -48,6 +48,16 @@ if(NOT DEFINED CONFIG_NET_TCPPROTO_OPTIONS AND CONFIG_PLATFORM_NUTTX) set(Z_FEATURE_TCP_NODELAY 0 CACHE STRING "Toggle TCP_NODELAY") endif() +if(NOT DEFINED CONFIG_PTHREAD_MUTEX_TYPES AND CONFIG_PLATFORM_NUTTX) + message( SEND_ERROR "Pthread mutex is diabled, Zenoh will not function." ) +endif() + +set(FRAG_MAX_SIZE 512 CACHE STRING "Use this to override the maximum size for fragmented messages") +set(BATCH_UNICAST_SIZE 256 CACHE STRING "Use this to override the maximum unicast batch size") +set(BATCH_MULTICAST_SIZE 256 CACHE STRING "Use this to override the maximum multicast batch size") + +set(Z_FEATURE_UNSTABLE_API 1 CACHE STRING "Toggle unstable Zenoh-C API") + px4_add_git_submodule(TARGET git_zenoh-pico PATH "zenoh-pico") add_subdirectory(zenoh-pico) unset(MESSAGE_QUIET) @@ -59,11 +69,7 @@ target_compile_options(zenohpico_static PUBLIC -Wno-cast-align -Wno-unused-result -Wno-type-limits -Wno-unused-variable - -Wno-maybe-uninitialized - -DZ_FEATURE_UNSTABLE_API - -DZ_BATCH_SIZE_RX=512 - -DZ_BATCH_SIZE_TX=512) - + -Wno-maybe-uninitialized) target_compile_options(zenohpico_static PRIVATE -Wno-missing-prototypes) if(CONFIG_PLATFORM_NUTTX) diff --git a/src/modules/zenoh/Kconfig b/src/modules/zenoh/Kconfig index ad2bde5dc8..6e98efc2e5 100644 --- a/src/modules/zenoh/Kconfig +++ b/src/modules/zenoh/Kconfig @@ -28,6 +28,13 @@ if MODULES_ZENOH Declares liveliness tokens with key expressions in the way rmw_zenoh expects them Allowing to construct ROS2 graphs + config ZENOH_PUB_ON_MATCHING + bool "[EXPERIMENTAL] Only publish data when having matching subscribers" + default n + ---help--- + Uses the Zenoh matching feature to check whether a publisher has subscribers. + If so, only then publish the data. This is still experimental + # Choose exactly one item choice ZENOH_PUBSUB_SELECTION diff --git a/src/modules/zenoh/publishers/uorb_publisher.hpp b/src/modules/zenoh/publishers/uorb_publisher.hpp index ef291b05f4..d96f8a6db6 100644 --- a/src/modules/zenoh/publishers/uorb_publisher.hpp +++ b/src/modules/zenoh/publishers/uorb_publisher.hpp @@ -63,6 +63,14 @@ public: // Update the uORB Subscription and broadcast a Zenoh ROS2 message virtual int8_t update() override { +#ifdef CONFIG_ZENOH_PUB_ON_MATCHING + z_matching_status_t status; + + if (z_publisher_get_matching_status(z_loan(_pub), &status) == _Z_RES_OK && !status.matching) { + return _Z_RES_OK; + } + +#endif uint8_t data[_uorb_meta->o_size]; orb_copy(_uorb_meta, _uorb_sub, data); @@ -98,6 +106,11 @@ public: Zenoh_Publisher::print(); } + const char *getName() + { + return _uorb_meta->o_name; + } + private: const orb_metadata *_uorb_meta; int _uorb_sub; diff --git a/src/modules/zenoh/publishers/zenoh_publisher.cpp b/src/modules/zenoh/publishers/zenoh_publisher.cpp index 2fc958bca5..25f7a95c10 100644 --- a/src/modules/zenoh/publishers/zenoh_publisher.cpp +++ b/src/modules/zenoh/publishers/zenoh_publisher.cpp @@ -44,6 +44,8 @@ Zenoh_Publisher::Zenoh_Publisher() { + attachment.sequence_number = 0; + attachment.rmw_gid_size = RMW_GID_STORAGE_SIZE; } Zenoh_Publisher::~Zenoh_Publisher() @@ -61,8 +63,6 @@ int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr, { z_view_keyexpr_t ke; - this->rmw_gid = gid; - if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) { printf("%s is not a valid key expression\n", keyexpr); return -1; @@ -73,6 +73,8 @@ int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr, return -1; } + memcpy(attachment.rmw_gid, gid, RMW_GID_STORAGE_SIZE); + return 0; } @@ -81,18 +83,13 @@ int8_t Zenoh_Publisher::publish(const uint8_t *buf, int size) z_publisher_put_options_t options; z_publisher_put_options_default(&options); - z_owned_bytes_t attachment; - z_bytes_empty(&attachment); + attachment.sequence_number++; + attachment.time = hrt_absolute_time(); - ze_owned_serializer_t serializer; - ze_serializer_empty(&serializer); + z_owned_bytes_t z_attachment; + z_bytes_from_static_buf(&z_attachment, (const uint8_t *)&attachment, RMW_ATTACHEMENT_SIZE); - ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number++); - ze_serializer_serialize_int64(z_loan_mut(serializer), hrt_absolute_time()); - ze_serializer_serialize_buf(z_loan_mut(serializer), rmw_gid, RMW_GID_STORAGE_SIZE); - - ze_serializer_finish(z_move(serializer), &attachment); - options.attachment = z_move(attachment); + options.attachment = z_move(z_attachment); z_owned_bytes_t payload; z_bytes_copy_from_buf(&payload, buf, size); diff --git a/src/modules/zenoh/publishers/zenoh_publisher.hpp b/src/modules/zenoh/publishers/zenoh_publisher.hpp index e6bdf604e4..230048db32 100644 --- a/src/modules/zenoh/publishers/zenoh_publisher.hpp +++ b/src/modules/zenoh/publishers/zenoh_publisher.hpp @@ -48,8 +48,15 @@ #include #include - #define RMW_GID_STORAGE_SIZE 16u +#define RMW_ATTACHEMENT_SIZE (8u + 8u + 1u + RMW_GID_STORAGE_SIZE) + +typedef struct __attribute__((__packed__)) RmwAttachment { + int64_t sequence_number; + int64_t time; + uint8_t rmw_gid_size; + uint8_t rmw_gid[RMW_GID_STORAGE_SIZE]; +} RmwAttachment; class Zenoh_Publisher : public ListNode { @@ -69,8 +76,5 @@ protected: int8_t publish(const uint8_t *, int size); z_owned_publisher_t _pub; - int64_t sequence_number; - - /* RMW_GID_STORAGE_SIZE bytes DDS-Like GID */ - uint8_t *rmw_gid; + RmwAttachment attachment; }; diff --git a/src/modules/zenoh/zenoh.cpp b/src/modules/zenoh/zenoh.cpp index 359bc2c8d4..318b770bd4 100644 --- a/src/modules/zenoh/zenoh.cpp +++ b/src/modules/zenoh/zenoh.cpp @@ -419,7 +419,7 @@ void ZENOH::run() ret = _zenoh_publishers[i]->update(); if (ret < 0) { - PX4_WARN("Publisher error %i", ret); + PX4_WARN("%s Publisher error %i", _zenoh_publishers[i]->getName(), ret); } } @@ -502,17 +502,21 @@ int ZENOH::print_status() PX4_INFO("Publishers"); - for (int i = 0; i < _pub_count; i++) { - if (_zenoh_publishers[i]) { - _zenoh_publishers[i]->print(); + if (_zenoh_publishers) { + for (int i = 0; i < _pub_count; i++) { + if (_zenoh_publishers[i]) { + _zenoh_publishers[i]->print(); + } } } PX4_INFO("Subscribers"); - for (int i = 0; i < _sub_count; i++) { - if (_zenoh_subscribers[i]) { - _zenoh_subscribers[i]->print(); + if (_zenoh_subscribers) { + for (int i = 0; i < _sub_count; i++) { + if (_zenoh_subscribers[i]) { + _zenoh_subscribers[i]->print(); + } } }