From 61e2f566ca5662c93420c6f9c460e6b556be84b8 Mon Sep 17 00:00:00 2001 From: Peter van der Perk Date: Sun, 4 May 2025 15:27:47 +0200 Subject: [PATCH] Zenoh config, lv and connection fixes Fixes a bug in the csv parsing Use % for / seperators in ros2_lv On startup retry connecting --- src/modules/zenoh/zenoh.cpp | 173 +++++++++++++++++------------ src/modules/zenoh/zenoh_config.cpp | 19 +++- src/modules/zenoh/zenoh_config.hpp | 3 +- 3 files changed, 123 insertions(+), 72 deletions(-) diff --git a/src/modules/zenoh/zenoh.cpp b/src/modules/zenoh/zenoh.cpp index 0e78b6a1a3..3820495118 100644 --- a/src/modules/zenoh/zenoh.cpp +++ b/src/modules/zenoh/zenoh.cpp @@ -140,6 +140,19 @@ int ZENOH::generate_rmw_zenoh_topic_liveliness_keyexpr(const z_id_t *id, const c // NOT REALLY COMPLIANT WITH RMW_ZENOH_CPP but get's the job done // TODO build a correct keyexpr + char topic_lv[TOPIC_INFO_SIZE]; + char *str = &topic_lv[0]; + + strcpy(topic_lv, topic); + + while (*str) { + if (*str == '/') { + *str = '%'; + } + + str++; + } + return snprintf(keyexpr, KEYEXPR_SIZE, "@ros2_lv/%" PRId32 "/" "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x/" @@ -159,7 +172,7 @@ int ZENOH::generate_rmw_zenoh_topic_liveliness_keyexpr(const z_id_t *id, const c px4_guid[4], px4_guid[5], px4_guid[6], px4_guid[7], px4_guid[8], px4_guid[9], px4_guid[10], px4_guid[11], px4_guid[12], px4_guid[13], px4_guid[14], px4_guid[15], - topic, type_camel_case, + topic_lv, type_camel_case, rihs_hash[0], rihs_hash[1], rihs_hash[2], rihs_hash[3], rihs_hash[4], rihs_hash[5], rihs_hash[6], rihs_hash[7], rihs_hash[8], rihs_hash[9], rihs_hash[10], rihs_hash[11], @@ -175,37 +188,39 @@ int ZENOH::setupSession() { char mode[NET_MODE_SIZE]; char locator[NET_LOCATOR_SIZE]; + z_owned_config_t config; int ret = 0; _config.getNetworkConfig(mode, locator); - z_owned_config_t config; - z_config_default(&config); - zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode); - - if (locator[0] != 0) { - zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, locator); - - } else if (strcmp(Z_CONFIG_MODE_PEER, mode) == 0) { - zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, Z_CONFIG_MULTICAST_LOCATOR_DEFAULT); - } - PX4_INFO("Opening session..."); - ret = z_open(&s, z_move(config), NULL); - if (ret < 0) { - if (ret == _Z_ERR_TRANSPORT_OPEN_FAILED) { - PX4_ERR("Unable to open session, make sure zenohd is running on %s", locator); + do { + z_config_default(&config); + zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode); - } else if (ret == _Z_ERR_SCOUT_NO_RESULTS) { - PX4_ERR("Unable to open session, scout no results"); + if (locator[0] != 0) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, locator); - } else { - PX4_ERR("Unable to open session, ret: %d", ret); + } else if (strcmp(Z_CONFIG_MODE_PEER, mode) == 0) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, Z_CONFIG_MULTICAST_LOCATOR_DEFAULT); } - return ret; - } + if (ret == _Z_ERR_TRANSPORT_OPEN_FAILED) { + PX4_WARN("Unable to open session, make sure zenohd is running on %s", locator); + + } else if (ret == _Z_ERR_SCOUT_NO_RESULTS) { + PX4_WARN("Unable to open session, scout no results"); + + } else if (ret < 0) { + PX4_WARN("Unable to open session, ret: %d", ret); + } + + if (ret != 0) { + sleep(5); // Wait 5 seconds when doing a retry + } + + } while ((ret = z_open(&s, z_move(config), NULL)) < 0); // Start read and lease tasks for zenoh-pico if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) { @@ -260,42 +275,48 @@ int ZENOH::setupTopics(px4_pollfd_struct_t *pfds) char type[TOPIC_INFO_SIZE]; for (i = 0; i < _sub_count; i++) { - _config.getSubscriberMapping(topic, type); - _zenoh_subscribers[i] = genSubscriber(type); - const uint8_t *rihs_hash = getRIHS01_Hash(type); + if (_config.getSubscriberMapping(topic, type)) { + _zenoh_subscribers[i] = genSubscriber(type); + const uint8_t *rihs_hash = getRIHS01_Hash(type); - if (rihs_hash != NULL && _zenoh_subscribers[i] != 0 && - generate_rmw_zenoh_topic_keyexpr(topic, rihs_hash, type, keyexpr) > 0) { - _zenoh_subscribers[i]->declare_subscriber(s, keyexpr); + if (rihs_hash != NULL && _zenoh_subscribers[i] != 0 && + generate_rmw_zenoh_topic_keyexpr(topic, rihs_hash, type, keyexpr) > 0) { + _zenoh_subscribers[i]->declare_subscriber(s, keyexpr); #ifdef CONFIG_ZENOH_RMW_LIVELINESS - if (generate_rmw_zenoh_topic_liveliness_keyexpr(&self_id, topic, rihs_hash, type, keyexpr, "MS") > 0) { - z_view_keyexpr_t ke; + if (generate_rmw_zenoh_topic_liveliness_keyexpr(&self_id, topic, rihs_hash, type, keyexpr, "MS") > 0) { + z_view_keyexpr_t ke; - if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) { - printf("%s is not a valid key expression\n", keyexpr); - return -1; + if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) { + printf("%s is not a valid key expression\n", keyexpr); + return -1; + } + + z_owned_liveliness_token_t token; + + if (z_liveliness_declare_token(z_loan(s), &token, z_loan(ke), NULL) < 0) { + printf("Unable to create liveliness token!\n"); + return -1; + } } - z_owned_liveliness_token_t token; - - if (z_liveliness_declare_token(z_loan(s), &token, z_loan(ke), NULL) < 0) { - printf("Unable to create liveliness token!\n"); - return -1; - } - } - #endif + } else { + PX4_ERR("Could not create a subscriber for type %s", type); + } + } else { - PX4_ERR("Could not create a subscriber for type %s", type); - } - - - if (_config.getSubscriberMapping(topic, type) < 0) { - PX4_WARN("Subscriber mapping parsing error"); + _zenoh_publishers[i] = NULL; + PX4_ERR("Error parsing publisher config at index %i", i); } } + + if (_config.getSubscriberMapping(topic, type) < 0) { + PX4_WARN("Subscriber mapping parsing error"); + } + + _config.closePubSubMapping(); } #endif @@ -308,42 +329,50 @@ int ZENOH::setupTopics(px4_pollfd_struct_t *pfds) char type[TOPIC_INFO_SIZE]; for (i = 0; i < _pub_count; i++) { - _config.getPublisherMapping(topic, type); - _zenoh_publishers[i] = genPublisher(type); - const uint8_t *rihs_hash = getRIHS01_Hash(type); + if (_config.getPublisherMapping(topic, type)) { + printf("Index %i ret %i Topic %s Type %s", i, ret, topic, type); + _zenoh_publishers[i] = genPublisher(type); + const uint8_t *rihs_hash = getRIHS01_Hash(type); - if (rihs_hash && _zenoh_publishers[i] != 0 && - generate_rmw_zenoh_topic_keyexpr(topic, rihs_hash, type, keyexpr) > 0) { - _zenoh_publishers[i]->declare_publisher(s, keyexpr, (uint8_t *)&px4_guid); - _zenoh_publishers[i]->setPollFD(&pfds[i]); + if (rihs_hash && _zenoh_publishers[i] != 0 && + generate_rmw_zenoh_topic_keyexpr(topic, rihs_hash, type, keyexpr) > 0) { + _zenoh_publishers[i]->declare_publisher(s, keyexpr, (uint8_t *)&px4_guid); + _zenoh_publishers[i]->setPollFD(&pfds[i]); #ifdef CONFIG_ZENOH_RMW_LIVELINESS - if (generate_rmw_zenoh_topic_liveliness_keyexpr(&self_id, topic, rihs_hash, type, keyexpr, "MP") > 0) { - z_view_keyexpr_t ke; + if (generate_rmw_zenoh_topic_liveliness_keyexpr(&self_id, topic, rihs_hash, type, keyexpr, "MP") > 0) { + z_view_keyexpr_t ke; - if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) { - printf("%s is not a valid key expression\n", keyexpr); - return -1; + if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) { + printf("%s is not a valid key expression\n", keyexpr); + return -1; + } + + z_owned_liveliness_token_t token; + + if (z_liveliness_declare_token(z_loan(s), &token, z_loan(ke), NULL) < 0) { + printf("Unable to create liveliness token!\n"); + return -1; + } } - z_owned_liveliness_token_t token; - - if (z_liveliness_declare_token(z_loan(s), &token, z_loan(ke), NULL) < 0) { - printf("Unable to create liveliness token!\n"); - return -1; - } - } - #endif + } else { + PX4_ERR("Could not create a publisher for type %s", type); + } + } else { - PX4_ERR("Could not create a publisher for type %s", type); + _zenoh_publishers[i] = NULL; + PX4_ERR("Error parsing publisher config at index %i", i); } } if (_config.getSubscriberMapping(topic, type) < 0) { PX4_WARN("Publisher mapping parsing error"); } + + _config.closePubSubMapping(); } #endif @@ -377,6 +406,8 @@ void ZENOH::run() } } + hrt_abstime start = hrt_absolute_time(); + while (!should_exit()) { int pret = px4_poll(pfds, _pub_count, 100); @@ -395,6 +426,12 @@ void ZENOH::run() } } } + + if (hrt_elapsed_time(&start) > 1 * 1000000) { + //PX4_INFO("Keep alive??\n"); + //zp_send_keep_alive(z_loan(s), NULL); + start = hrt_absolute_time(); + } } // Exiting cleaning up publisher and subscribers diff --git a/src/modules/zenoh/zenoh_config.cpp b/src/modules/zenoh/zenoh_config.cpp index 840770de93..8246482c19 100644 --- a/src/modules/zenoh/zenoh_config.cpp +++ b/src/modules/zenoh/zenoh_config.cpp @@ -261,6 +261,19 @@ int Zenoh_Config::getLineCount(const char *filename) return lines; } +int Zenoh_Config::closePubSubMapping() +{ + if (fp_mapping != NULL) { + //Close the file + fclose(fp_mapping); + fp_mapping = NULL; + return 0; + } + + return 0; +} + + // Very rudamentary here but we've to wait for a more advanced param system int Zenoh_Config::getPubSubMapping(char *topic, char *type, const char *filename) { @@ -272,6 +285,8 @@ int Zenoh_Config::getPubSubMapping(char *topic, char *type, const char *filename if (fp_mapping) { while (fgets(buffer, MAX_LINE_SIZE, fp_mapping) != NULL) { + printf("getPubSubMapping %s", buffer); + if (buffer[0] != '\n') { const char *config_type = get_csv_field(buffer, 2); const char *config_topic = get_csv_field(buffer, 1); @@ -294,9 +309,7 @@ int Zenoh_Config::getPubSubMapping(char *topic, char *type, const char *filename } //Close the file - fclose(fp_mapping); - fp_mapping = NULL; - return 0; + return closePubSubMapping(); } diff --git a/src/modules/zenoh/zenoh_config.hpp b/src/modules/zenoh/zenoh_config.hpp index 3c47779ecd..444275834b 100644 --- a/src/modules/zenoh/zenoh_config.hpp +++ b/src/modules/zenoh/zenoh_config.hpp @@ -60,7 +60,7 @@ #define KEYEXPR_RIHS01_SIZE sizeof("RIHS01_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX") #define KEYEXPR_MSG_NAME "px4_msgs::msg::dds_::" #define KEYEXPR_MSG_NAME_SIZE sizeof(KEYEXPR_MSG_NAME) -#define TOPIC_INFO_SIZE (64) +#define TOPIC_INFO_SIZE (96) #define MAX_LINE_SIZE (2 * TOPIC_INFO_SIZE) #define KEYEXPR_SIZE (MAX_LINE_SIZE + KEYEXPR_MSG_NAME_SIZE + KEYEXPR_RIHS01_SIZE + 128) @@ -89,6 +89,7 @@ public: { return getPubSubMapping(topic, type, ZENOH_SUB_CONFIG_PATH); } + int closePubSubMapping(); private: