uxrce_dds_client: immediately create data writers on startup

There is some race condition where in rare cases the topic publication
right after creating the writer did not get received on the ROS side.
This happens even with reliable QoS & reliable transport.
This commit is contained in:
Beat Küng 2024-01-18 13:12:28 +01:00
parent ba35ca461c
commit 6fcfd5fac1
3 changed files with 20 additions and 20 deletions

View File

@ -87,17 +87,27 @@ struct SendTopicsSubs {
uint32_t num_payload_sent{};
void init();
bool init(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrStreamId reliable_in_stream_id, uxrStreamId best_effort_in_stream_id, uxrObjectId participant_id, const char *client_namespace);
void update(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrStreamId best_effort_stream_id, uxrObjectId participant_id, const char *client_namespace);
void reset();
};
void SendTopicsSubs::init() {
bool SendTopicsSubs::init(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrStreamId reliable_in_stream_id, uxrStreamId best_effort_in_stream_id, uxrObjectId participant_id, const char *client_namespace) {
bool ret = true;
for (unsigned idx = 0; idx < sizeof(send_subscriptions)/sizeof(send_subscriptions[0]); ++idx) {
fds[idx].fd = orb_subscribe(send_subscriptions[idx].orb_meta);
fds[idx].events = POLLIN;
orb_set_interval(fds[idx].fd, UXRCE_DEFAULT_POLL_RATE);
if (fds[idx].events == 0) {
fds[idx].fd = orb_subscribe(send_subscriptions[idx].orb_meta);
fds[idx].events = POLLIN;
orb_set_interval(fds[idx].fd, UXRCE_DEFAULT_POLL_RATE);
}
if (!create_data_writer(session, reliable_out_stream_id, participant_id, static_cast<ORB_ID>(send_subscriptions[idx].orb_meta->o_id), client_namespace, send_subscriptions[idx].topic,
send_subscriptions[idx].message_version,
send_subscriptions[idx].dds_type_name, send_subscriptions[idx].data_writer)) {
ret = false;
}
}
return ret;
}
void SendTopicsSubs::reset() {
@ -119,12 +129,6 @@ void SendTopicsSubs::update(uxrSession *session, uxrStreamId reliable_out_stream
if (fds[idx].revents & POLLIN) {
// Topic updated, copy data and send
orb_copy(send_subscriptions[idx].orb_meta, fds[idx].fd, &topic_data);
if (send_subscriptions[idx].data_writer.id == UXR_INVALID_ID) {
// data writer not created yet
create_data_writer(session, reliable_out_stream_id, participant_id, static_cast<ORB_ID>(send_subscriptions[idx].orb_meta->o_id), client_namespace, send_subscriptions[idx].topic,
send_subscriptions[idx].message_version,
send_subscriptions[idx].dds_type_name, send_subscriptions[idx].data_writer);
}
if (send_subscriptions[idx].data_writer.id != UXR_INVALID_ID) {

View File

@ -366,6 +366,11 @@ bool UxrceddsClient::setupSession(uxrSession *session)
return false;
}
if (!_subs->init(session, _reliable_out, reliable_in, best_effort_in, _participant_id, _client_namespace)) {
PX4_ERR("subs init failed");
return false;
}
// create VehicleCommand replier
if (_num_of_repliers < MAX_NUM_REPLIERS) {
if (add_replier(new VehicleCommandSrv(session, _reliable_out, reliable_in, _participant_id, _client_namespace,
@ -388,11 +393,6 @@ void UxrceddsClient::deleteSession(uxrSession *session)
_session_created = false;
}
if (_subs_initialized) {
_subs->reset();
_subs_initialized = false;
}
_last_payload_tx_rate = 0;
_timesync.reset_filter();
}
@ -650,9 +650,6 @@ void UxrceddsClient::run()
int poll_error_counter = 0;
resetConnectivityCounters();
_subs->init();
_subs_initialized = true;
while (!should_exit() && _connected) {
perf_begin(_loop_perf);
perf_count(_loop_interval_perf);

View File

@ -197,7 +197,6 @@ private:
bool _connected{false};
bool _session_created{false};
bool _timesync_converged{false};
bool _subs_initialized{false};
Timesync _timesync{timesync_status_s::SOURCE_PROTOCOL_DDS};