Zenoh optimize memory usage and add optional publish on matching

This commit is contained in:
Peter van der Perk 2025-05-06 22:49:29 +02:00 committed by Beat Küng
parent 7887f16daa
commit 5622565eea
6 changed files with 60 additions and 29 deletions

View File

@ -48,6 +48,16 @@ if(NOT DEFINED CONFIG_NET_TCPPROTO_OPTIONS AND CONFIG_PLATFORM_NUTTX)
set(Z_FEATURE_TCP_NODELAY 0 CACHE STRING "Toggle TCP_NODELAY")
endif()
if(NOT DEFINED CONFIG_PTHREAD_MUTEX_TYPES AND CONFIG_PLATFORM_NUTTX)
message( SEND_ERROR "Pthread mutex is diabled, Zenoh will not function." )
endif()
set(FRAG_MAX_SIZE 512 CACHE STRING "Use this to override the maximum size for fragmented messages")
set(BATCH_UNICAST_SIZE 256 CACHE STRING "Use this to override the maximum unicast batch size")
set(BATCH_MULTICAST_SIZE 256 CACHE STRING "Use this to override the maximum multicast batch size")
set(Z_FEATURE_UNSTABLE_API 1 CACHE STRING "Toggle unstable Zenoh-C API")
px4_add_git_submodule(TARGET git_zenoh-pico PATH "zenoh-pico")
add_subdirectory(zenoh-pico)
unset(MESSAGE_QUIET)
@ -59,11 +69,7 @@ target_compile_options(zenohpico_static PUBLIC -Wno-cast-align
-Wno-unused-result
-Wno-type-limits
-Wno-unused-variable
-Wno-maybe-uninitialized
-DZ_FEATURE_UNSTABLE_API
-DZ_BATCH_SIZE_RX=512
-DZ_BATCH_SIZE_TX=512)
-Wno-maybe-uninitialized)
target_compile_options(zenohpico_static PRIVATE -Wno-missing-prototypes)
if(CONFIG_PLATFORM_NUTTX)

View File

@ -28,6 +28,13 @@ if MODULES_ZENOH
Declares liveliness tokens with key expressions in the way rmw_zenoh expects them
Allowing to construct ROS2 graphs
config ZENOH_PUB_ON_MATCHING
bool "[EXPERIMENTAL] Only publish data when having matching subscribers"
default n
---help---
Uses the Zenoh matching feature to check whether a publisher has subscribers.
If so, only then publish the data. This is still experimental
# Choose exactly one item
choice ZENOH_PUBSUB_SELECTION

View File

@ -63,6 +63,14 @@ public:
// Update the uORB Subscription and broadcast a Zenoh ROS2 message
virtual int8_t update() override
{
#ifdef CONFIG_ZENOH_PUB_ON_MATCHING
z_matching_status_t status;
if (z_publisher_get_matching_status(z_loan(_pub), &status) == _Z_RES_OK && !status.matching) {
return _Z_RES_OK;
}
#endif
uint8_t data[_uorb_meta->o_size];
orb_copy(_uorb_meta, _uorb_sub, data);
@ -98,6 +106,11 @@ public:
Zenoh_Publisher::print();
}
const char *getName()
{
return _uorb_meta->o_name;
}
private:
const orb_metadata *_uorb_meta;
int _uorb_sub;

View File

@ -44,6 +44,8 @@
Zenoh_Publisher::Zenoh_Publisher()
{
attachment.sequence_number = 0;
attachment.rmw_gid_size = RMW_GID_STORAGE_SIZE;
}
Zenoh_Publisher::~Zenoh_Publisher()
@ -61,8 +63,6 @@ int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr,
{
z_view_keyexpr_t ke;
this->rmw_gid = gid;
if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) {
printf("%s is not a valid key expression\n", keyexpr);
return -1;
@ -73,6 +73,8 @@ int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr,
return -1;
}
memcpy(attachment.rmw_gid, gid, RMW_GID_STORAGE_SIZE);
return 0;
}
@ -81,18 +83,13 @@ int8_t Zenoh_Publisher::publish(const uint8_t *buf, int size)
z_publisher_put_options_t options;
z_publisher_put_options_default(&options);
z_owned_bytes_t attachment;
z_bytes_empty(&attachment);
attachment.sequence_number++;
attachment.time = hrt_absolute_time();
ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
z_owned_bytes_t z_attachment;
z_bytes_from_static_buf(&z_attachment, (const uint8_t *)&attachment, RMW_ATTACHEMENT_SIZE);
ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number++);
ze_serializer_serialize_int64(z_loan_mut(serializer), hrt_absolute_time());
ze_serializer_serialize_buf(z_loan_mut(serializer), rmw_gid, RMW_GID_STORAGE_SIZE);
ze_serializer_finish(z_move(serializer), &attachment);
options.attachment = z_move(attachment);
options.attachment = z_move(z_attachment);
z_owned_bytes_t payload;
z_bytes_copy_from_buf(&payload, buf, size);

View File

@ -48,8 +48,15 @@
#include <containers/List.hpp>
#include <zenoh-pico.h>
#define RMW_GID_STORAGE_SIZE 16u
#define RMW_ATTACHEMENT_SIZE (8u + 8u + 1u + RMW_GID_STORAGE_SIZE)
typedef struct __attribute__((__packed__)) RmwAttachment {
int64_t sequence_number;
int64_t time;
uint8_t rmw_gid_size;
uint8_t rmw_gid[RMW_GID_STORAGE_SIZE];
} RmwAttachment;
class Zenoh_Publisher : public ListNode<Zenoh_Publisher *>
{
@ -69,8 +76,5 @@ protected:
int8_t publish(const uint8_t *, int size);
z_owned_publisher_t _pub;
int64_t sequence_number;
/* RMW_GID_STORAGE_SIZE bytes DDS-Like GID */
uint8_t *rmw_gid;
RmwAttachment attachment;
};

View File

@ -419,7 +419,7 @@ void ZENOH::run()
ret = _zenoh_publishers[i]->update();
if (ret < 0) {
PX4_WARN("Publisher error %i", ret);
PX4_WARN("%s Publisher error %i", _zenoh_publishers[i]->getName(), ret);
}
}
@ -502,17 +502,21 @@ int ZENOH::print_status()
PX4_INFO("Publishers");
for (int i = 0; i < _pub_count; i++) {
if (_zenoh_publishers[i]) {
_zenoh_publishers[i]->print();
if (_zenoh_publishers) {
for (int i = 0; i < _pub_count; i++) {
if (_zenoh_publishers[i]) {
_zenoh_publishers[i]->print();
}
}
}
PX4_INFO("Subscribers");
for (int i = 0; i < _sub_count; i++) {
if (_zenoh_subscribers[i]) {
_zenoh_subscribers[i]->print();
if (_zenoh_subscribers) {
for (int i = 0; i < _sub_count; i++) {
if (_zenoh_subscribers[i]) {
_zenoh_subscribers[i]->print();
}
}
}