Zenoh config, lv and connection fixes

Fixes a bug in the csv parsing
Use % for / seperators in ros2_lv
On startup retry connecting
This commit is contained in:
Peter van der Perk 2025-05-04 15:27:47 +02:00 committed by Beat Küng
parent 3d30eaae5f
commit 61e2f566ca
3 changed files with 123 additions and 72 deletions

View File

@ -140,6 +140,19 @@ int ZENOH::generate_rmw_zenoh_topic_liveliness_keyexpr(const z_id_t *id, const c
// NOT REALLY COMPLIANT WITH RMW_ZENOH_CPP but get's the job done
// TODO build a correct keyexpr
char topic_lv[TOPIC_INFO_SIZE];
char *str = &topic_lv[0];
strcpy(topic_lv, topic);
while (*str) {
if (*str == '/') {
*str = '%';
}
str++;
}
return snprintf(keyexpr, KEYEXPR_SIZE,
"@ros2_lv/%" PRId32 "/"
"%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x/"
@ -159,7 +172,7 @@ int ZENOH::generate_rmw_zenoh_topic_liveliness_keyexpr(const z_id_t *id, const c
px4_guid[4], px4_guid[5], px4_guid[6], px4_guid[7],
px4_guid[8], px4_guid[9], px4_guid[10], px4_guid[11],
px4_guid[12], px4_guid[13], px4_guid[14], px4_guid[15],
topic, type_camel_case,
topic_lv, type_camel_case,
rihs_hash[0], rihs_hash[1], rihs_hash[2], rihs_hash[3],
rihs_hash[4], rihs_hash[5], rihs_hash[6], rihs_hash[7],
rihs_hash[8], rihs_hash[9], rihs_hash[10], rihs_hash[11],
@ -175,37 +188,39 @@ int ZENOH::setupSession()
{
char mode[NET_MODE_SIZE];
char locator[NET_LOCATOR_SIZE];
z_owned_config_t config;
int ret = 0;
_config.getNetworkConfig(mode, locator);
z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode);
if (locator[0] != 0) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, locator);
} else if (strcmp(Z_CONFIG_MODE_PEER, mode) == 0) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, Z_CONFIG_MULTICAST_LOCATOR_DEFAULT);
}
PX4_INFO("Opening session...");
ret = z_open(&s, z_move(config), NULL);
if (ret < 0) {
if (ret == _Z_ERR_TRANSPORT_OPEN_FAILED) {
PX4_ERR("Unable to open session, make sure zenohd is running on %s", locator);
do {
z_config_default(&config);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode);
} else if (ret == _Z_ERR_SCOUT_NO_RESULTS) {
PX4_ERR("Unable to open session, scout no results");
if (locator[0] != 0) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, locator);
} else {
PX4_ERR("Unable to open session, ret: %d", ret);
} else if (strcmp(Z_CONFIG_MODE_PEER, mode) == 0) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, Z_CONFIG_MULTICAST_LOCATOR_DEFAULT);
}
return ret;
}
if (ret == _Z_ERR_TRANSPORT_OPEN_FAILED) {
PX4_WARN("Unable to open session, make sure zenohd is running on %s", locator);
} else if (ret == _Z_ERR_SCOUT_NO_RESULTS) {
PX4_WARN("Unable to open session, scout no results");
} else if (ret < 0) {
PX4_WARN("Unable to open session, ret: %d", ret);
}
if (ret != 0) {
sleep(5); // Wait 5 seconds when doing a retry
}
} while ((ret = z_open(&s, z_move(config), NULL)) < 0);
// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
@ -260,42 +275,48 @@ int ZENOH::setupTopics(px4_pollfd_struct_t *pfds)
char type[TOPIC_INFO_SIZE];
for (i = 0; i < _sub_count; i++) {
_config.getSubscriberMapping(topic, type);
_zenoh_subscribers[i] = genSubscriber(type);
const uint8_t *rihs_hash = getRIHS01_Hash(type);
if (_config.getSubscriberMapping(topic, type)) {
_zenoh_subscribers[i] = genSubscriber(type);
const uint8_t *rihs_hash = getRIHS01_Hash(type);
if (rihs_hash != NULL && _zenoh_subscribers[i] != 0 &&
generate_rmw_zenoh_topic_keyexpr(topic, rihs_hash, type, keyexpr) > 0) {
_zenoh_subscribers[i]->declare_subscriber(s, keyexpr);
if (rihs_hash != NULL && _zenoh_subscribers[i] != 0 &&
generate_rmw_zenoh_topic_keyexpr(topic, rihs_hash, type, keyexpr) > 0) {
_zenoh_subscribers[i]->declare_subscriber(s, keyexpr);
#ifdef CONFIG_ZENOH_RMW_LIVELINESS
if (generate_rmw_zenoh_topic_liveliness_keyexpr(&self_id, topic, rihs_hash, type, keyexpr, "MS") > 0) {
z_view_keyexpr_t ke;
if (generate_rmw_zenoh_topic_liveliness_keyexpr(&self_id, topic, rihs_hash, type, keyexpr, "MS") > 0) {
z_view_keyexpr_t ke;
if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) {
printf("%s is not a valid key expression\n", keyexpr);
return -1;
if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) {
printf("%s is not a valid key expression\n", keyexpr);
return -1;
}
z_owned_liveliness_token_t token;
if (z_liveliness_declare_token(z_loan(s), &token, z_loan(ke), NULL) < 0) {
printf("Unable to create liveliness token!\n");
return -1;
}
}
z_owned_liveliness_token_t token;
if (z_liveliness_declare_token(z_loan(s), &token, z_loan(ke), NULL) < 0) {
printf("Unable to create liveliness token!\n");
return -1;
}
}
#endif
} else {
PX4_ERR("Could not create a subscriber for type %s", type);
}
} else {
PX4_ERR("Could not create a subscriber for type %s", type);
}
if (_config.getSubscriberMapping(topic, type) < 0) {
PX4_WARN("Subscriber mapping parsing error");
_zenoh_publishers[i] = NULL;
PX4_ERR("Error parsing publisher config at index %i", i);
}
}
if (_config.getSubscriberMapping(topic, type) < 0) {
PX4_WARN("Subscriber mapping parsing error");
}
_config.closePubSubMapping();
}
#endif
@ -308,42 +329,50 @@ int ZENOH::setupTopics(px4_pollfd_struct_t *pfds)
char type[TOPIC_INFO_SIZE];
for (i = 0; i < _pub_count; i++) {
_config.getPublisherMapping(topic, type);
_zenoh_publishers[i] = genPublisher(type);
const uint8_t *rihs_hash = getRIHS01_Hash(type);
if (_config.getPublisherMapping(topic, type)) {
printf("Index %i ret %i Topic %s Type %s", i, ret, topic, type);
_zenoh_publishers[i] = genPublisher(type);
const uint8_t *rihs_hash = getRIHS01_Hash(type);
if (rihs_hash && _zenoh_publishers[i] != 0 &&
generate_rmw_zenoh_topic_keyexpr(topic, rihs_hash, type, keyexpr) > 0) {
_zenoh_publishers[i]->declare_publisher(s, keyexpr, (uint8_t *)&px4_guid);
_zenoh_publishers[i]->setPollFD(&pfds[i]);
if (rihs_hash && _zenoh_publishers[i] != 0 &&
generate_rmw_zenoh_topic_keyexpr(topic, rihs_hash, type, keyexpr) > 0) {
_zenoh_publishers[i]->declare_publisher(s, keyexpr, (uint8_t *)&px4_guid);
_zenoh_publishers[i]->setPollFD(&pfds[i]);
#ifdef CONFIG_ZENOH_RMW_LIVELINESS
if (generate_rmw_zenoh_topic_liveliness_keyexpr(&self_id, topic, rihs_hash, type, keyexpr, "MP") > 0) {
z_view_keyexpr_t ke;
if (generate_rmw_zenoh_topic_liveliness_keyexpr(&self_id, topic, rihs_hash, type, keyexpr, "MP") > 0) {
z_view_keyexpr_t ke;
if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) {
printf("%s is not a valid key expression\n", keyexpr);
return -1;
if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) {
printf("%s is not a valid key expression\n", keyexpr);
return -1;
}
z_owned_liveliness_token_t token;
if (z_liveliness_declare_token(z_loan(s), &token, z_loan(ke), NULL) < 0) {
printf("Unable to create liveliness token!\n");
return -1;
}
}
z_owned_liveliness_token_t token;
if (z_liveliness_declare_token(z_loan(s), &token, z_loan(ke), NULL) < 0) {
printf("Unable to create liveliness token!\n");
return -1;
}
}
#endif
} else {
PX4_ERR("Could not create a publisher for type %s", type);
}
} else {
PX4_ERR("Could not create a publisher for type %s", type);
_zenoh_publishers[i] = NULL;
PX4_ERR("Error parsing publisher config at index %i", i);
}
}
if (_config.getSubscriberMapping(topic, type) < 0) {
PX4_WARN("Publisher mapping parsing error");
}
_config.closePubSubMapping();
}
#endif
@ -377,6 +406,8 @@ void ZENOH::run()
}
}
hrt_abstime start = hrt_absolute_time();
while (!should_exit()) {
int pret = px4_poll(pfds, _pub_count, 100);
@ -395,6 +426,12 @@ void ZENOH::run()
}
}
}
if (hrt_elapsed_time(&start) > 1 * 1000000) {
//PX4_INFO("Keep alive??\n");
//zp_send_keep_alive(z_loan(s), NULL);
start = hrt_absolute_time();
}
}
// Exiting cleaning up publisher and subscribers

View File

@ -261,6 +261,19 @@ int Zenoh_Config::getLineCount(const char *filename)
return lines;
}
int Zenoh_Config::closePubSubMapping()
{
if (fp_mapping != NULL) {
//Close the file
fclose(fp_mapping);
fp_mapping = NULL;
return 0;
}
return 0;
}
// Very rudamentary here but we've to wait for a more advanced param system
int Zenoh_Config::getPubSubMapping(char *topic, char *type, const char *filename)
{
@ -272,6 +285,8 @@ int Zenoh_Config::getPubSubMapping(char *topic, char *type, const char *filename
if (fp_mapping) {
while (fgets(buffer, MAX_LINE_SIZE, fp_mapping) != NULL) {
printf("getPubSubMapping %s", buffer);
if (buffer[0] != '\n') {
const char *config_type = get_csv_field(buffer, 2);
const char *config_topic = get_csv_field(buffer, 1);
@ -294,9 +309,7 @@ int Zenoh_Config::getPubSubMapping(char *topic, char *type, const char *filename
}
//Close the file
fclose(fp_mapping);
fp_mapping = NULL;
return 0;
return closePubSubMapping();
}

View File

@ -60,7 +60,7 @@
#define KEYEXPR_RIHS01_SIZE sizeof("RIHS01_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
#define KEYEXPR_MSG_NAME "px4_msgs::msg::dds_::"
#define KEYEXPR_MSG_NAME_SIZE sizeof(KEYEXPR_MSG_NAME)
#define TOPIC_INFO_SIZE (64)
#define TOPIC_INFO_SIZE (96)
#define MAX_LINE_SIZE (2 * TOPIC_INFO_SIZE)
#define KEYEXPR_SIZE (MAX_LINE_SIZE + KEYEXPR_MSG_NAME_SIZE + KEYEXPR_RIHS01_SIZE + 128)
@ -89,6 +89,7 @@ public:
{
return getPubSubMapping(topic, type, ZENOH_SUB_CONFIG_PATH);
}
int closePubSubMapping();
private: