diff --git a/src/modules/zenoh/CMakeLists.txt b/src/modules/zenoh/CMakeLists.txt index 6765ce7ae4..f335136cb1 100644 --- a/src/modules/zenoh/CMakeLists.txt +++ b/src/modules/zenoh/CMakeLists.txt @@ -60,6 +60,7 @@ target_compile_options(zenohpico_static PUBLIC -Wno-cast-align -Wno-type-limits -Wno-unused-variable -Wno-maybe-uninitialized + -DZ_FEATURE_UNSTABLE_API -DZ_BATCH_SIZE_RX=512 -DZ_BATCH_SIZE_TX=512) diff --git a/src/modules/zenoh/publishers/zenoh_publisher.cpp b/src/modules/zenoh/publishers/zenoh_publisher.cpp index 11dfe16149..2fc958bca5 100644 --- a/src/modules/zenoh/publishers/zenoh_publisher.cpp +++ b/src/modules/zenoh/publishers/zenoh_publisher.cpp @@ -87,14 +87,9 @@ int8_t Zenoh_Publisher::publish(const uint8_t *buf, int size) ze_owned_serializer_t serializer; ze_serializer_empty(&serializer); - ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number"); ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number++); - - ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp"); ze_serializer_serialize_int64(z_loan_mut(serializer), hrt_absolute_time()); - - ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid"); - ze_serializer_serialize_buf(z_loan_mut(serializer), rmw_gid, 16); + ze_serializer_serialize_buf(z_loan_mut(serializer), rmw_gid, RMW_GID_STORAGE_SIZE); ze_serializer_finish(z_move(serializer), &attachment); options.attachment = z_move(attachment); diff --git a/src/modules/zenoh/publishers/zenoh_publisher.hpp b/src/modules/zenoh/publishers/zenoh_publisher.hpp index 696c0b03be..e6bdf604e4 100644 --- a/src/modules/zenoh/publishers/zenoh_publisher.hpp +++ b/src/modules/zenoh/publishers/zenoh_publisher.hpp @@ -48,6 +48,9 @@ #include #include + +#define RMW_GID_STORAGE_SIZE 16u + class Zenoh_Publisher : public ListNode { public: @@ -68,6 +71,6 @@ protected: z_owned_publisher_t _pub; int64_t sequence_number; - /* 16 bytes DDS-Like GID */ + /* RMW_GID_STORAGE_SIZE bytes DDS-Like GID */ uint8_t *rmw_gid; }; diff --git a/src/modules/zenoh/subscribers/uorb_subscriber.hpp b/src/modules/zenoh/subscribers/uorb_subscriber.hpp index 3df9868324..694193d718 100644 --- a/src/modules/zenoh/subscribers/uorb_subscriber.hpp +++ b/src/modules/zenoh/subscribers/uorb_subscriber.hpp @@ -58,7 +58,11 @@ public: _uorb_pub_handle = orb_advertise_multi(_uorb_meta, nullptr, &instance); }; - ~uORB_Zenoh_Subscriber() override = default; + ~uORB_Zenoh_Subscriber() + { + undeclare_subscriber(); + orb_unadvertise(_uorb_pub_handle); + } // Update the uORB Subscription and broadcast a Zenoh ROS2 message void data_handler(const z_loaned_sample_t *sample) @@ -66,14 +70,33 @@ public: char data[_uorb_meta->o_size]; // TODO process rmw_zenoh attachment - const z_loaned_bytes_t *payload = z_sample_payload(sample); size_t len = z_bytes_len(payload); - dds_istream_t is = {.m_buffer = (unsigned char *)(payload), .m_size = static_cast(len), - .m_index = 4, .m_xcdr_version = DDSI_RTPS_CDR_ENC_VERSION_2 - }; - dds_stream_read(&is, data, &dds_allocator, _cdr_ops); +#if defined(Z_FEATURE_UNSTABLE_API) + // Check if payload is contiguous so we can decode directly on that pointer + z_view_slice_t view; + + if (z_bytes_get_contiguous_view(payload, &view) == Z_OK) { + const uint8_t *ptr = z_slice_data(z_loan(view)); + + dds_istream_t is = {.m_buffer = (unsigned char *)(ptr), .m_size = static_cast(len), + .m_index = 4, .m_xcdr_version = DDSI_RTPS_CDR_ENC_VERSION_2 + }; + dds_stream_read(&is, data, &dds_allocator, _cdr_ops); + + } else +#endif + { + unsigned char reassembled_payload[len]; + z_bytes_reader_t reader = z_bytes_get_reader(payload); + z_bytes_reader_read(&reader, reassembled_payload, len); + + dds_istream_t is = {.m_buffer = reassembled_payload, .m_size = static_cast(len), + .m_index = 4, .m_xcdr_version = DDSI_RTPS_CDR_ENC_VERSION_2 + }; + dds_stream_read(&is, data, &dds_allocator, _cdr_ops); + } // As long as we don't have timesynchronization between Zenoh nodes // we've to manually set the timestamp