rmw attachment serialization changes

Use new atachment serialization format
Subscriber fix parsing payload and remove uorb publisher on destructor
This commit is contained in:
Peter van der Perk 2025-05-04 15:29:08 +02:00 committed by Beat Küng
parent 61e2f566ca
commit ac2627cca9
4 changed files with 35 additions and 13 deletions

View File

@ -60,6 +60,7 @@ target_compile_options(zenohpico_static PUBLIC -Wno-cast-align
-Wno-type-limits
-Wno-unused-variable
-Wno-maybe-uninitialized
-DZ_FEATURE_UNSTABLE_API
-DZ_BATCH_SIZE_RX=512
-DZ_BATCH_SIZE_TX=512)

View File

@ -87,14 +87,9 @@ int8_t Zenoh_Publisher::publish(const uint8_t *buf, int size)
ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number++);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp");
ze_serializer_serialize_int64(z_loan_mut(serializer), hrt_absolute_time());
ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid");
ze_serializer_serialize_buf(z_loan_mut(serializer), rmw_gid, 16);
ze_serializer_serialize_buf(z_loan_mut(serializer), rmw_gid, RMW_GID_STORAGE_SIZE);
ze_serializer_finish(z_move(serializer), &attachment);
options.attachment = z_move(attachment);

View File

@ -48,6 +48,9 @@
#include <containers/List.hpp>
#include <zenoh-pico.h>
#define RMW_GID_STORAGE_SIZE 16u
class Zenoh_Publisher : public ListNode<Zenoh_Publisher *>
{
public:
@ -68,6 +71,6 @@ protected:
z_owned_publisher_t _pub;
int64_t sequence_number;
/* 16 bytes DDS-Like GID */
/* RMW_GID_STORAGE_SIZE bytes DDS-Like GID */
uint8_t *rmw_gid;
};

View File

@ -58,7 +58,11 @@ public:
_uorb_pub_handle = orb_advertise_multi(_uorb_meta, nullptr, &instance);
};
~uORB_Zenoh_Subscriber() override = default;
~uORB_Zenoh_Subscriber()
{
undeclare_subscriber();
orb_unadvertise(_uorb_pub_handle);
}
// Update the uORB Subscription and broadcast a Zenoh ROS2 message
void data_handler(const z_loaned_sample_t *sample)
@ -66,14 +70,33 @@ public:
char data[_uorb_meta->o_size];
// TODO process rmw_zenoh attachment
const z_loaned_bytes_t *payload = z_sample_payload(sample);
size_t len = z_bytes_len(payload);
dds_istream_t is = {.m_buffer = (unsigned char *)(payload), .m_size = static_cast<int>(len),
.m_index = 4, .m_xcdr_version = DDSI_RTPS_CDR_ENC_VERSION_2
};
dds_stream_read(&is, data, &dds_allocator, _cdr_ops);
#if defined(Z_FEATURE_UNSTABLE_API)
// Check if payload is contiguous so we can decode directly on that pointer
z_view_slice_t view;
if (z_bytes_get_contiguous_view(payload, &view) == Z_OK) {
const uint8_t *ptr = z_slice_data(z_loan(view));
dds_istream_t is = {.m_buffer = (unsigned char *)(ptr), .m_size = static_cast<int>(len),
.m_index = 4, .m_xcdr_version = DDSI_RTPS_CDR_ENC_VERSION_2
};
dds_stream_read(&is, data, &dds_allocator, _cdr_ops);
} else
#endif
{
unsigned char reassembled_payload[len];
z_bytes_reader_t reader = z_bytes_get_reader(payload);
z_bytes_reader_read(&reader, reassembled_payload, len);
dds_istream_t is = {.m_buffer = reassembled_payload, .m_size = static_cast<int>(len),
.m_index = 4, .m_xcdr_version = DDSI_RTPS_CDR_ENC_VERSION_2
};
dds_stream_read(&is, data, &dds_allocator, _cdr_ops);
}
// As long as we don't have timesynchronization between Zenoh nodes
// we've to manually set the timestamp