mirror of
https://gitee.com/mirrors_PX4/PX4-Autopilot.git
synced 2026-05-23 07:17:35 +08:00
[uxrce_dds_client] Allow for arbitrary topic instances to be bridged (#22350)
Signed-off-by: Beniamino Pozzan <beniamino.pozzan@gmail.com> Co-authored-by: Jacob Dahl <37091262+dakejahl@users.noreply.github.com>
This commit is contained in:
@@ -67,6 +67,7 @@ struct SendSubscription {
|
||||
uint32_t topic_size;
|
||||
UcdrSerializeMethod ucdr_serialize_method;
|
||||
uint64_t publish_interval_ms;
|
||||
uint8_t orb_instance;
|
||||
};
|
||||
|
||||
// Subscribers for messages to send
|
||||
@@ -81,6 +82,7 @@ struct SendTopicsSubs {
|
||||
ucdr_topic_size_@(pub['simple_base_type'])(),
|
||||
&ucdr_serialize_@(pub['simple_base_type']),
|
||||
static_cast<uint64_t>((@(pub.get('rate_limit', 0)) > 0) ? (1e3 / @(pub.get('rate_limit', 1e3))) : UXRCE_DEFAULT_POLL_INTERVAL_MS),
|
||||
@(pub['instance'])
|
||||
},
|
||||
@[ end for]@
|
||||
};
|
||||
@@ -98,13 +100,13 @@ bool SendTopicsSubs::init(uxrSession *session, uxrStreamId reliable_out_stream_i
|
||||
bool ret = true;
|
||||
for (unsigned idx = 0; idx < sizeof(send_subscriptions)/sizeof(send_subscriptions[0]); ++idx) {
|
||||
if (fds[idx].events == 0) {
|
||||
fds[idx].fd = orb_subscribe(send_subscriptions[idx].orb_meta);
|
||||
fds[idx].fd = orb_subscribe_multi(send_subscriptions[idx].orb_meta, send_subscriptions[idx].orb_instance);
|
||||
fds[idx].events = POLLIN;
|
||||
orb_set_interval(fds[idx].fd, send_subscriptions[idx].publish_interval_ms);
|
||||
}
|
||||
|
||||
if (!create_data_writer(session, reliable_out_stream_id, participant_id, static_cast<ORB_ID>(send_subscriptions[idx].orb_meta->o_id), client_namespace, send_subscriptions[idx].topic,
|
||||
send_subscriptions[idx].message_version,
|
||||
send_subscriptions[idx].message_version, send_subscriptions[idx].orb_instance,
|
||||
send_subscriptions[idx].dds_type_name, send_subscriptions[idx].data_writer)) {
|
||||
ret = false;
|
||||
}
|
||||
|
||||
@@ -102,12 +102,24 @@ def process_message_type(msg_type):
|
||||
# topic_simple: eg vehicle_status
|
||||
msg_type['topic_simple'] = msg_type['topic'].split('/')[-1]
|
||||
|
||||
def process_message_instance(msg_type):
|
||||
if 'instance' in msg_type:
|
||||
# if instance is given, check if it is a non negative integer
|
||||
if not (type(msg_type['instance']) is int and msg_type['instance'] >= 0) :
|
||||
raise TypeError("`instance` must be a non negative integer")
|
||||
# add trailing instance to topic name
|
||||
msg_type['topic'] = f"{msg_type['topic']}{msg_type['instance']}"
|
||||
else:
|
||||
# if instance is not given,
|
||||
msg_type['instance'] = 0
|
||||
|
||||
merged_em_globals['namespace'] = namespace
|
||||
|
||||
pubs_not_empty = msg_map['publications'] is not None
|
||||
if pubs_not_empty:
|
||||
for p in msg_map['publications']:
|
||||
process_message_type(p)
|
||||
process_message_instance(p)
|
||||
|
||||
merged_em_globals['publications'] = msg_map['publications'] if pubs_not_empty else []
|
||||
|
||||
|
||||
@@ -50,7 +50,7 @@ static bool generate_topic_name(char *topic_name, const char *client_namespace,
|
||||
}
|
||||
|
||||
static bool create_data_writer(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrObjectId participant_id,
|
||||
ORB_ID orb_id, const char *client_namespace, const char *topic, uint32_t message_version, const char *type_name,
|
||||
ORB_ID orb_id, const char *client_namespace, const char *topic, uint32_t message_version, uint8_t instance, const char *type_name,
|
||||
uxrObjectId &datawriter_id)
|
||||
{
|
||||
// topic
|
||||
@@ -61,7 +61,7 @@ static bool create_data_writer(uxrSession *session, uxrStreamId reliable_out_str
|
||||
return false;
|
||||
}
|
||||
|
||||
uxrObjectId topic_id = topic_id_from_orb(orb_id);
|
||||
uxrObjectId topic_id = topic_id_from_orb(orb_id, instance);
|
||||
uint16_t topic_req = uxr_buffer_create_topic_bin(session, reliable_out_stream_id, topic_id, participant_id, topic_name,
|
||||
type_name, UXR_REPLACE);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user