diff --git a/src/modules/uxrce_dds_client/dds_topics.h.em b/src/modules/uxrce_dds_client/dds_topics.h.em index 69c8fc58cc..81eadef1bb 100644 --- a/src/modules/uxrce_dds_client/dds_topics.h.em +++ b/src/modules/uxrce_dds_client/dds_topics.h.em @@ -39,11 +39,31 @@ static constexpr int max_topic_size = 512; static_assert(sizeof(@(pub['simple_base_type'])_s) <= max_topic_size, "topic too large, increase max_topic_size"); @[ end for]@ + + +// SFINAE to use R::MESSAGE_VERSION if it exists, and 0 otherwise +template +class MessageVersionHelper +{ + template + static constexpr uint32_t get(decltype(&C::MESSAGE_VERSION)) { return C::MESSAGE_VERSION; } + template + static constexpr uint32_t get(...) { return 0; } +public: + static constexpr uint32_t m = get(0); +}; + +template +static constexpr uint32_t get_message_version() { + return MessageVersionHelper::m; +} + struct SendSubscription { const struct orb_metadata *orb_meta; uxrObjectId data_writer; const char* dds_type_name; const char* topic; + uint32_t message_version; uint32_t topic_size; UcdrSerializeMethod ucdr_serialize_method; }; @@ -56,6 +76,7 @@ struct SendTopicsSubs { uxr_object_id(0, UXR_INVALID_ID), "@(pub['dds_type'])", "@(pub['topic'])", + get_message_version<@(pub['simple_base_type'])_s>(), ucdr_topic_size_@(pub['simple_base_type'])(), &ucdr_serialize_@(pub['simple_base_type']), }, @@ -101,6 +122,7 @@ void SendTopicsSubs::update(uxrSession *session, uxrStreamId reliable_out_stream if (send_subscriptions[idx].data_writer.id == UXR_INVALID_ID) { // data writer not created yet create_data_writer(session, reliable_out_stream_id, participant_id, static_cast(send_subscriptions[idx].orb_meta->o_id), client_namespace, send_subscriptions[idx].topic, + send_subscriptions[idx].message_version, send_subscriptions[idx].dds_type_name, send_subscriptions[idx].data_writer); } @@ -173,7 +195,8 @@ bool RcvTopicsPubs::init(uxrSession *session, uxrStreamId reliable_out_stream_id @[ for idx, sub in enumerate(subscriptions + subscriptions_multi)]@ { uint16_t queue_depth = orb_get_queue_size(ORB_ID(@(sub['simple_base_type']))) * 2; // use a bit larger queue size than internal - create_data_reader(session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @(idx), client_namespace, "@(sub['topic'])", "@(sub['dds_type'])", queue_depth); + uint32_t message_version = get_message_version<@(sub['simple_base_type'])_s>(); + create_data_reader(session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @(idx), client_namespace, "@(sub['topic'])", message_version, "@(sub['dds_type'])", queue_depth); } @[ end for]@ diff --git a/src/modules/uxrce_dds_client/utilities.hpp b/src/modules/uxrce_dds_client/utilities.hpp index 70886ec795..c2d038b3ed 100644 --- a/src/modules/uxrce_dds_client/utilities.hpp +++ b/src/modules/uxrce_dds_client/utilities.hpp @@ -23,29 +23,40 @@ uxrObjectId topic_id_from_orb(ORB_ID orb_id, uint8_t instance = 0) return uxrObjectId{}; } -static bool generate_topic_name(char *topic_name, const char *client_namespace, const char *topic) +static bool generate_topic_name(char *topic_name, const char *client_namespace, const char *topic, + uint32_t message_version = 0) { if (topic[0] == '/') { topic++; } + char version[16]; + + if (message_version != 0) { + snprintf(version, sizeof(version), "_v%u", (unsigned)message_version); + version[sizeof(version) - 1] = '\0'; + + } else { + version[0] = '\0'; + } + if (client_namespace != nullptr) { - int ret = snprintf(topic_name, TOPIC_NAME_SIZE, "rt/%s/%s", client_namespace, topic); + int ret = snprintf(topic_name, TOPIC_NAME_SIZE, "rt/%s/%s%s", client_namespace, topic, version); return (ret > 0 && ret < TOPIC_NAME_SIZE); } - int ret = snprintf(topic_name, TOPIC_NAME_SIZE, "rt/%s", topic); + int ret = snprintf(topic_name, TOPIC_NAME_SIZE, "rt/%s%s", topic, version); return (ret > 0 && ret < TOPIC_NAME_SIZE); } static bool create_data_writer(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrObjectId participant_id, - ORB_ID orb_id, const char *client_namespace, const char *topic, const char *type_name, + ORB_ID orb_id, const char *client_namespace, const char *topic, uint32_t message_version, const char *type_name, uxrObjectId &datawriter_id) { // topic char topic_name[TOPIC_NAME_SIZE]; - if (!generate_topic_name(topic_name, client_namespace, topic)) { + if (!generate_topic_name(topic_name, client_namespace, topic, message_version)) { PX4_ERR("topic path too long"); return false; } @@ -92,12 +103,13 @@ static bool create_data_writer(uxrSession *session, uxrStreamId reliable_out_str static bool create_data_reader(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrStreamId input_stream_id, uxrObjectId participant_id, uint16_t index, const char *client_namespace, const char *topic, + uint32_t message_version, const char *type_name, uint16_t queue_depth) { // topic char topic_name[TOPIC_NAME_SIZE]; - if (!generate_topic_name(topic_name, client_namespace, topic)) { + if (!generate_topic_name(topic_name, client_namespace, topic, message_version)) { PX4_ERR("topic path too long"); return false; }