diff --git a/src/modules/uxrce_dds_client/module.yaml b/src/modules/uxrce_dds_client/module.yaml index 4e5db21160..773475f1f5 100644 --- a/src/modules/uxrce_dds_client/module.yaml +++ b/src/modules/uxrce_dds_client/module.yaml @@ -105,3 +105,27 @@ parameters: category: System reboot_required: true default: 0 + + UXRCE_DDS_TX_TO: + description: + short: TX rate timeout configuration + long: | + Specifies after how many seconds without sending data the DDS connection is reestablished. + A value less than one disables the TX rate timeout. + type: int32 + category: System + reboot_required: true + default: 3 + unit: s + + UXRCE_DDS_RX_TO: + description: + short: RX rate timeout configuration + long: | + Specifies after how many seconds without receiving data the DDS connection is reestablished. + A value less than one disables the RX rate timeout. + type: int32 + category: System + reboot_required: true + default: -1 + unit: s diff --git a/src/modules/uxrce_dds_client/uxrce_dds_client.cpp b/src/modules/uxrce_dds_client/uxrce_dds_client.cpp index 339bedd7a9..b550912791 100644 --- a/src/modules/uxrce_dds_client/uxrce_dds_client.cpp +++ b/src/modules/uxrce_dds_client/uxrce_dds_client.cpp @@ -209,7 +209,7 @@ void UxrceddsClient::deinit() _comm = nullptr; } -bool UxrceddsClient::setup_session(uxrSession *session) +bool UxrceddsClient::setupSession(uxrSession *session) { _participant_config = static_cast(_param_uxrce_dds_ptcfg.get()); _synchronize_timestamps = (_param_uxrce_dds_synct.get() > 0); @@ -379,7 +379,7 @@ bool UxrceddsClient::setup_session(uxrSession *session) return true; } -void UxrceddsClient::delete_session(uxrSession *session) +void UxrceddsClient::deleteSession(uxrSession *session) { delete_repliers(); @@ -472,6 +472,20 @@ static void fillMessageFormatResponse(const message_format_request_s &message_fo message_format_response.timestamp = hrt_absolute_time(); } +void UxrceddsClient::calculateTxRxRate() +{ + const hrt_abstime now = hrt_absolute_time(); + + if (now - _last_status_update > 1_s) { + float dt = (now - _last_status_update) / 1e6f; + _last_payload_tx_rate = (_subs->num_payload_sent - _last_num_payload_sent) / dt; + _last_payload_rx_rate = (_pubs->num_payload_received - _last_num_payload_received) / dt; + _last_num_payload_sent = _subs->num_payload_sent; + _last_num_payload_received = _pubs->num_payload_received; + _last_status_update = now; + } +} + void UxrceddsClient::handleMessageFormatRequest() { message_format_request_s message_format_request; @@ -483,6 +497,87 @@ void UxrceddsClient::handleMessageFormatRequest() } } +void UxrceddsClient::checkConnectivity(uxrSession *session) +{ + // Reset TX zero counter, when data is sent + if (_last_payload_tx_rate > 0) { + _num_tx_rate_zero = 0; + } + + // Reset RX zero counter, when data is received + if (_last_payload_rx_rate > 0) { + _num_rx_rate_zero = 0; + } + + const hrt_abstime now = hrt_absolute_time(); + + // Start ping and tx/rx rate monitoring, unless we're actively sending & receiving payloads successfully + if ((_last_payload_tx_rate > 0) && (_last_payload_rx_rate > 0)) { + _connected = true; + _num_pings_missed = 0; + _last_ping = now; + + } else { + if (hrt_elapsed_time(&_last_ping) > 1_s) { + // Check payload tx rate + if (_last_payload_tx_rate == 0) { + _num_tx_rate_zero++; + } + + // Check payload rx rate + if (_last_payload_rx_rate == 0) { + _num_rx_rate_zero++; + } + + // Check ping + _last_ping = now; + + if (_had_ping_reply) { + _num_pings_missed = 0; + + } else { + ++_num_pings_missed; + } + + int timeout_ms = 1'000; // 1 second + uint8_t attempts = 1; + uxr_ping_agent_session(session, timeout_ms, attempts); + + _had_ping_reply = false; + } + + if (_num_pings_missed >= 3) { + PX4_ERR("No ping response, disconnecting"); + _connected = false; + } + + int32_t tx_timeout = _param_uxrce_dds_tx_to.get(); + int32_t rx_timeout = _param_uxrce_dds_rx_to.get(); + + if (tx_timeout > 0 && _num_tx_rate_zero >= tx_timeout) { + PX4_ERR("Payload TX rate zero for too long, disconnecting"); + _connected = false; + } + + if (rx_timeout > 0 && _num_rx_rate_zero >= rx_timeout) { + PX4_ERR("Payload RX rate zero for too long, disconnecting"); + _connected = false; + } + } +} + +void UxrceddsClient::resetConnectivityCounters() +{ + _last_status_update = hrt_absolute_time(); + _last_ping = hrt_absolute_time(); + _had_ping_reply = false; + _num_pings_missed = 0; + _last_num_payload_sent = 0; + _last_num_payload_received = 0; + _num_tx_rate_zero = 0; + _num_rx_rate_zero = 0; +} + void UxrceddsClient::syncSystemClock(uxrSession *session) { struct timespec ts = {}; @@ -535,8 +630,8 @@ void UxrceddsClient::run() continue; } - if (!setup_session(&session)) { - delete_session(&session); + if (!setupSession(&session)) { + deleteSession(&session); px4_usleep(1'000'000); PX4_ERR("session setup failed, will retry now"); continue; @@ -552,13 +647,8 @@ void UxrceddsClient::run() } hrt_abstime last_sync_session = 0; - hrt_abstime last_status_update = hrt_absolute_time(); - hrt_abstime last_ping = hrt_absolute_time(); - int num_pings_missed = 0; - bool had_ping_reply = false; - uint32_t last_num_payload_sent{}; - uint32_t last_num_payload_received{}; int poll_error_counter = 0; + resetConnectivityCounters(); _subs->init(); _subs_initialized = true; @@ -629,55 +719,19 @@ void UxrceddsClient::run() // Check for a ping response /* PONG_IN_SESSION_STATUS */ if (session.on_pong_flag == 1) { - had_ping_reply = true; + _had_ping_reply = true; } - const hrt_abstime now = hrt_absolute_time(); + // Calculate the payload tx/rx rate for connectivity monitoring + calculateTxRxRate(); - if (now - last_status_update > 1_s) { - float dt = (now - last_status_update) / 1e6f; - _last_payload_tx_rate = (_subs->num_payload_sent - last_num_payload_sent) / dt; - _last_payload_rx_rate = (_pubs->num_payload_received - last_num_payload_received) / dt; - last_num_payload_sent = _subs->num_payload_sent; - last_num_payload_received = _pubs->num_payload_received; - last_status_update = now; - } - - // Handle ping, unless we're actively sending & receiving payloads successfully - if ((_last_payload_tx_rate > 0) && (_last_payload_rx_rate > 0)) { - _connected = true; - num_pings_missed = 0; - last_ping = now; - - } else { - if (hrt_elapsed_time(&last_ping) > 1_s) { - last_ping = now; - - if (had_ping_reply) { - num_pings_missed = 0; - - } else { - ++num_pings_missed; - } - - int timeout_ms = 1'000; // 1 second - uint8_t attempts = 1; - uxr_ping_agent_session(&session, timeout_ms, attempts); - - had_ping_reply = false; - } - - if (num_pings_missed >= 3) { - PX4_INFO("No ping response, disconnecting"); - _connected = false; - } - } + // Check if there is still connectivity with the agent + checkConnectivity(&session); perf_end(_loop_perf); - } - delete_session(&session); + deleteSession(&session); } } diff --git a/src/modules/uxrce_dds_client/uxrce_dds_client.h b/src/modules/uxrce_dds_client/uxrce_dds_client.h index 2a75d73e4a..823e242f9d 100644 --- a/src/modules/uxrce_dds_client/uxrce_dds_client.h +++ b/src/modules/uxrce_dds_client/uxrce_dds_client.h @@ -120,13 +120,17 @@ private: bool init(); void deinit(); - bool setup_session(uxrSession *session); - void delete_session(uxrSession *session); + bool setupSession(uxrSession *session); + void deleteSession(uxrSession *session); bool setBaudrate(int fd, unsigned baud); void handleMessageFormatRequest(); + void calculateTxRxRate(); + void checkConnectivity(uxrSession *session); + void resetConnectivityCounters(); + uORB::Publication _message_format_response_pub{ORB_ID(message_format_response)}; uORB::Subscription _message_format_request_sub{ORB_ID(message_format_request)}; @@ -179,6 +183,14 @@ private: uxrCommunication *_comm{nullptr}; int _fd{-1}; + hrt_abstime _last_status_update; + hrt_abstime _last_ping; + bool _had_ping_reply{false}; + int _num_pings_missed{0}; + int32_t _num_tx_rate_zero{0}; + int32_t _num_rx_rate_zero{0}; + uint32_t _last_num_payload_sent{0}; + uint32_t _last_num_payload_received{0}; int _last_payload_tx_rate{}; ///< in B/s int _last_payload_rx_rate{}; ///< in B/s @@ -197,6 +209,8 @@ private: (ParamInt) _param_uxrce_key, (ParamInt) _param_uxrce_dds_ptcfg, (ParamInt) _param_uxrce_dds_syncc, - (ParamInt) _param_uxrce_dds_synct + (ParamInt) _param_uxrce_dds_synct, + (ParamInt) _param_uxrce_dds_tx_to, + (ParamInt) _param_uxrce_dds_rx_to ) };