mirror of
https://gitee.com/mirrors_PX4/PX4-Autopilot.git
synced 2026-04-14 10:07:39 +08:00
uxrce_dds_client: add DDS message versioning
This adds "_v" + string(T::MESSAGE_VERSION) to the ROS topic if the message contains a MESSAGE_VERSION field (and the version is non-zero).
This commit is contained in:
parent
136f9f48fc
commit
975ec30c9c
@ -39,11 +39,31 @@ static constexpr int max_topic_size = 512;
|
||||
static_assert(sizeof(@(pub['simple_base_type'])_s) <= max_topic_size, "topic too large, increase max_topic_size");
|
||||
@[ end for]@
|
||||
|
||||
|
||||
|
||||
// SFINAE to use R::MESSAGE_VERSION if it exists, and 0 otherwise
|
||||
template <typename R>
|
||||
class MessageVersionHelper
|
||||
{
|
||||
template <typename C>
|
||||
static constexpr uint32_t get(decltype(&C::MESSAGE_VERSION)) { return C::MESSAGE_VERSION; }
|
||||
template <typename C>
|
||||
static constexpr uint32_t get(...) { return 0; }
|
||||
public:
|
||||
static constexpr uint32_t m = get<R>(0);
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
static constexpr uint32_t get_message_version() {
|
||||
return MessageVersionHelper<T>::m;
|
||||
}
|
||||
|
||||
struct SendSubscription {
|
||||
const struct orb_metadata *orb_meta;
|
||||
uxrObjectId data_writer;
|
||||
const char* dds_type_name;
|
||||
const char* topic;
|
||||
uint32_t message_version;
|
||||
uint32_t topic_size;
|
||||
UcdrSerializeMethod ucdr_serialize_method;
|
||||
};
|
||||
@ -56,6 +76,7 @@ struct SendTopicsSubs {
|
||||
uxr_object_id(0, UXR_INVALID_ID),
|
||||
"@(pub['dds_type'])",
|
||||
"@(pub['topic'])",
|
||||
get_message_version<@(pub['simple_base_type'])_s>(),
|
||||
ucdr_topic_size_@(pub['simple_base_type'])(),
|
||||
&ucdr_serialize_@(pub['simple_base_type']),
|
||||
},
|
||||
@ -101,6 +122,7 @@ void SendTopicsSubs::update(uxrSession *session, uxrStreamId reliable_out_stream
|
||||
if (send_subscriptions[idx].data_writer.id == UXR_INVALID_ID) {
|
||||
// data writer not created yet
|
||||
create_data_writer(session, reliable_out_stream_id, participant_id, static_cast<ORB_ID>(send_subscriptions[idx].orb_meta->o_id), client_namespace, send_subscriptions[idx].topic,
|
||||
send_subscriptions[idx].message_version,
|
||||
send_subscriptions[idx].dds_type_name, send_subscriptions[idx].data_writer);
|
||||
}
|
||||
|
||||
@ -173,7 +195,8 @@ bool RcvTopicsPubs::init(uxrSession *session, uxrStreamId reliable_out_stream_id
|
||||
@[ for idx, sub in enumerate(subscriptions + subscriptions_multi)]@
|
||||
{
|
||||
uint16_t queue_depth = orb_get_queue_size(ORB_ID(@(sub['simple_base_type']))) * 2; // use a bit larger queue size than internal
|
||||
create_data_reader(session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @(idx), client_namespace, "@(sub['topic'])", "@(sub['dds_type'])", queue_depth);
|
||||
uint32_t message_version = get_message_version<@(sub['simple_base_type'])_s>();
|
||||
create_data_reader(session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @(idx), client_namespace, "@(sub['topic'])", message_version, "@(sub['dds_type'])", queue_depth);
|
||||
}
|
||||
@[ end for]@
|
||||
|
||||
|
||||
@ -23,29 +23,40 @@ uxrObjectId topic_id_from_orb(ORB_ID orb_id, uint8_t instance = 0)
|
||||
return uxrObjectId{};
|
||||
}
|
||||
|
||||
static bool generate_topic_name(char *topic_name, const char *client_namespace, const char *topic)
|
||||
static bool generate_topic_name(char *topic_name, const char *client_namespace, const char *topic,
|
||||
uint32_t message_version = 0)
|
||||
{
|
||||
if (topic[0] == '/') {
|
||||
topic++;
|
||||
}
|
||||
|
||||
char version[16];
|
||||
|
||||
if (message_version != 0) {
|
||||
snprintf(version, sizeof(version), "_v%u", (unsigned)message_version);
|
||||
version[sizeof(version) - 1] = '\0';
|
||||
|
||||
} else {
|
||||
version[0] = '\0';
|
||||
}
|
||||
|
||||
if (client_namespace != nullptr) {
|
||||
int ret = snprintf(topic_name, TOPIC_NAME_SIZE, "rt/%s/%s", client_namespace, topic);
|
||||
int ret = snprintf(topic_name, TOPIC_NAME_SIZE, "rt/%s/%s%s", client_namespace, topic, version);
|
||||
return (ret > 0 && ret < TOPIC_NAME_SIZE);
|
||||
}
|
||||
|
||||
int ret = snprintf(topic_name, TOPIC_NAME_SIZE, "rt/%s", topic);
|
||||
int ret = snprintf(topic_name, TOPIC_NAME_SIZE, "rt/%s%s", topic, version);
|
||||
return (ret > 0 && ret < TOPIC_NAME_SIZE);
|
||||
}
|
||||
|
||||
static bool create_data_writer(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrObjectId participant_id,
|
||||
ORB_ID orb_id, const char *client_namespace, const char *topic, const char *type_name,
|
||||
ORB_ID orb_id, const char *client_namespace, const char *topic, uint32_t message_version, const char *type_name,
|
||||
uxrObjectId &datawriter_id)
|
||||
{
|
||||
// topic
|
||||
char topic_name[TOPIC_NAME_SIZE];
|
||||
|
||||
if (!generate_topic_name(topic_name, client_namespace, topic)) {
|
||||
if (!generate_topic_name(topic_name, client_namespace, topic, message_version)) {
|
||||
PX4_ERR("topic path too long");
|
||||
return false;
|
||||
}
|
||||
@ -92,12 +103,13 @@ static bool create_data_writer(uxrSession *session, uxrStreamId reliable_out_str
|
||||
|
||||
static bool create_data_reader(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrStreamId input_stream_id,
|
||||
uxrObjectId participant_id, uint16_t index, const char *client_namespace, const char *topic,
|
||||
uint32_t message_version,
|
||||
const char *type_name, uint16_t queue_depth)
|
||||
{
|
||||
// topic
|
||||
char topic_name[TOPIC_NAME_SIZE];
|
||||
|
||||
if (!generate_topic_name(topic_name, client_namespace, topic)) {
|
||||
if (!generate_topic_name(topic_name, client_namespace, topic, message_version)) {
|
||||
PX4_ERR("topic path too long");
|
||||
return false;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user