From fee7da696dbdcc19670cd1b2970dc45224680bd2 Mon Sep 17 00:00:00 2001 From: Marco Hauswirth Date: Thu, 18 Dec 2025 14:08:21 +0100 Subject: [PATCH] uxrce_dds: support multi-instance uORB topics Enable DDS bridge to handle multi-instance uORB topics by mapping multiple topic instances to a single DDS topic with instance field. --- src/modules/uxrce_dds_client/dds_topics.h.em | 41 +++++++++++++++++-- src/modules/uxrce_dds_client/dds_topics.yaml | 11 ++++- .../uxrce_dds_client/generate_dds_topics.py | 23 ++++++++++- 3 files changed, 67 insertions(+), 8 deletions(-) diff --git a/src/modules/uxrce_dds_client/dds_topics.h.em b/src/modules/uxrce_dds_client/dds_topics.h.em index 30ba6053ea..6dab6f1426 100644 --- a/src/modules/uxrce_dds_client/dds_topics.h.em +++ b/src/modules/uxrce_dds_client/dds_topics.h.em @@ -162,8 +162,22 @@ struct RcvTopicsPubs { uORB::Publication<@(sub['simple_base_type'])_s> @(sub['topic_simple'])_pub{ORB_ID(@(sub['topic_simple']))}; @[ end for]@ -@[ for sub in subscriptions_multi]@ - uORB::PublicationMulti<@(sub['simple_base_type'])_s> @(sub['topic_simple'])_pub{ORB_ID(@(sub['topic_simple']))}; +@# Group subscriptions_multi by topic to create arrays +@{ +multi_topic_groups = {} +for sub in subscriptions_multi: + topic_simple = sub['topic_simple'] + if topic_simple not in multi_topic_groups: + multi_topic_groups[topic_simple] = [] + multi_topic_groups[topic_simple].append(sub) +}@ +@[ for topic_simple, subs in multi_topic_groups.items()]@ + uORB::PublicationMulti<@(subs[0]['simple_base_type'])_s> @(topic_simple)_pubs[@(len(subs))] { +@[ for idx, sub in enumerate(subs)]@ + {ORB_ID(@(topic_simple))}@('' if idx == len(subs)-1 else ',') + +@[ end for]@ + }; @[ end for]@ uint32_t num_payload_received{}; @@ -179,7 +193,7 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t pubs->num_payload_received += length; switch (object_id.id) { -@[ for idx, sub in enumerate(subscriptions + subscriptions_multi)]@ +@[ for idx, sub in enumerate(subscriptions)]@ case @(idx)+ (65535U / 32U) + 1: { @(sub['simple_base_type'])_s data; @@ -190,6 +204,18 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t } break; +@[ end for]@ +@[ for idx, sub in enumerate(subscriptions_multi)]@ + case @(idx + len(subscriptions))+ (65535U / 32U) + 1: { + @(sub['simple_base_type'])_s data; + + if (ucdr_deserialize_@(sub['simple_base_type'])(*ub, data, time_offset_us)) { + //print_message(ORB_ID(@(sub['simple_base_type'])), data); + pubs->@(sub['topic_simple'])_pubs[@(sub['instance'])].publish(data); + } + } + break; + @[ end for]@ default: @@ -200,13 +226,20 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t bool RcvTopicsPubs::init(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrStreamId reliable_in_stream_id, uxrStreamId best_effort_in_stream_id, uxrObjectId participant_id, const char *client_namespace) { -@[ for idx, sub in enumerate(subscriptions + subscriptions_multi)]@ +@[ for idx, sub in enumerate(subscriptions)]@ { uint16_t queue_depth = orb_get_queue_size(ORB_ID(@(sub['simple_base_type']))) * 2; // use a bit larger queue size than internal uint32_t message_version = get_message_version<@(sub['simple_base_type'])_s>(); create_data_reader(session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @(idx), client_namespace, "@(sub['topic'])", message_version, "@(sub['dds_type'])", queue_depth); } @[ end for]@ +@[ for idx, sub in enumerate(subscriptions_multi)]@ + { + uint16_t queue_depth = orb_get_queue_size(ORB_ID(@(sub['topic_simple']))) * 2; // use a bit larger queue size than internal + uint32_t message_version = get_message_version<@(sub['simple_base_type'])_s>(); + create_data_reader(session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @(idx + len(subscriptions)), client_namespace, "@(sub['topic'])", message_version, "@(sub['dds_type'])", queue_depth); + } +@[ end for]@ uxr_set_topic_callback(session, on_topic_update, this); diff --git a/src/modules/uxrce_dds_client/dds_topics.yaml b/src/modules/uxrce_dds_client/dds_topics.yaml index dc6cc1f2cf..a23cdbc979 100644 --- a/src/modules/uxrce_dds_client/dds_topics.yaml +++ b/src/modules/uxrce_dds_client/dds_topics.yaml @@ -52,7 +52,7 @@ publications: - topic: /fmu/out/transponder_report type: px4_msgs::msg::TransponderReport - + # - topic: /fmu/out/vehicle_angular_velocity # type: px4_msgs::msg::VehicleAngularVelocity @@ -191,7 +191,7 @@ subscriptions: type: px4_msgs::msg::ActuatorServos - topic: /fmu/in/aux_global_position - type: px4_msgs::msg::VehicleGlobalPosition + type: px4_msgs::msg::AuxGlobalPosition - topic: /fmu/in/fixed_wing_longitudinal_setpoint type: px4_msgs::msg::FixedWingLongitudinalSetpoint @@ -228,3 +228,10 @@ subscriptions: # Create uORB::PublicationMulti subscriptions_multi: + - type: px4_msgs::msg::AuxGlobalPosition + uorb_topic: aux_global_position + ros_topics: + - /fmu/in/aux_global_position_A + - /fmu/in/aux_global_position_B + - /fmu/in/aux_global_position_C + - /fmu/in/aux_global_position_D diff --git a/src/modules/uxrce_dds_client/generate_dds_topics.py b/src/modules/uxrce_dds_client/generate_dds_topics.py index 957ff4b578..7f201eb881 100644 --- a/src/modules/uxrce_dds_client/generate_dds_topics.py +++ b/src/modules/uxrce_dds_client/generate_dds_topics.py @@ -131,11 +131,30 @@ if subs_not_empty: merged_em_globals['subscriptions'] = msg_map['subscriptions'] if subs_not_empty else [] subs_multi_not_empty = msg_map['subscriptions_multi'] is not None +expanded_subs_multi = [] if subs_multi_not_empty: for sm in msg_map['subscriptions_multi']: - process_message_type(sm) + if 'ros_topics' in sm and 'uorb_topic' in sm: + # Expand each DDS topic into a separate entry + # All entries for the same uorb_topic will use the same array + uorb_topic_name = sm['uorb_topic'] + for idx, dds_topic in enumerate(sm['ros_topics']): + expanded_entry = { + 'type': sm['type'], + 'topic': dds_topic, + 'uorb_topic': uorb_topic_name, + 'instance': idx + } + process_message_type(expanded_entry) + expanded_entry['topic_simple'] = uorb_topic_name + expanded_subs_multi.append(expanded_entry) + else: + # Fallback for old-style single topic entries + process_message_type(sm) + sm['instance'] = 0 + expanded_subs_multi.append(sm) -merged_em_globals['subscriptions_multi'] = msg_map['subscriptions_multi'] if subs_multi_not_empty else [] +merged_em_globals['subscriptions_multi'] = expanded_subs_multi merged_em_globals['type_includes'] = sorted(set(all_type_includes))