Add payload tx/rx timeouts to DDS

* Add tx/rx timeouts

* Code style & tx default timeout

* Clarify TX/RX disable value
This commit is contained in:
Alexander Lerach 2025-02-14 14:54:42 +01:00 committed by GitHub
parent f2471861a3
commit 430be08131
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 147 additions and 55 deletions

View File

@ -105,3 +105,27 @@ parameters:
category: System
reboot_required: true
default: 0
UXRCE_DDS_TX_TO:
description:
short: TX rate timeout configuration
long: |
Specifies after how many seconds without sending data the DDS connection is reestablished.
A value less than one disables the TX rate timeout.
type: int32
category: System
reboot_required: true
default: 3
unit: s
UXRCE_DDS_RX_TO:
description:
short: RX rate timeout configuration
long: |
Specifies after how many seconds without receiving data the DDS connection is reestablished.
A value less than one disables the RX rate timeout.
type: int32
category: System
reboot_required: true
default: -1
unit: s

View File

@ -209,7 +209,7 @@ void UxrceddsClient::deinit()
_comm = nullptr;
}
bool UxrceddsClient::setup_session(uxrSession *session)
bool UxrceddsClient::setupSession(uxrSession *session)
{
_participant_config = static_cast<ParticipantConfig>(_param_uxrce_dds_ptcfg.get());
_synchronize_timestamps = (_param_uxrce_dds_synct.get() > 0);
@ -379,7 +379,7 @@ bool UxrceddsClient::setup_session(uxrSession *session)
return true;
}
void UxrceddsClient::delete_session(uxrSession *session)
void UxrceddsClient::deleteSession(uxrSession *session)
{
delete_repliers();
@ -472,6 +472,20 @@ static void fillMessageFormatResponse(const message_format_request_s &message_fo
message_format_response.timestamp = hrt_absolute_time();
}
void UxrceddsClient::calculateTxRxRate()
{
const hrt_abstime now = hrt_absolute_time();
if (now - _last_status_update > 1_s) {
float dt = (now - _last_status_update) / 1e6f;
_last_payload_tx_rate = (_subs->num_payload_sent - _last_num_payload_sent) / dt;
_last_payload_rx_rate = (_pubs->num_payload_received - _last_num_payload_received) / dt;
_last_num_payload_sent = _subs->num_payload_sent;
_last_num_payload_received = _pubs->num_payload_received;
_last_status_update = now;
}
}
void UxrceddsClient::handleMessageFormatRequest()
{
message_format_request_s message_format_request;
@ -483,6 +497,87 @@ void UxrceddsClient::handleMessageFormatRequest()
}
}
void UxrceddsClient::checkConnectivity(uxrSession *session)
{
// Reset TX zero counter, when data is sent
if (_last_payload_tx_rate > 0) {
_num_tx_rate_zero = 0;
}
// Reset RX zero counter, when data is received
if (_last_payload_rx_rate > 0) {
_num_rx_rate_zero = 0;
}
const hrt_abstime now = hrt_absolute_time();
// Start ping and tx/rx rate monitoring, unless we're actively sending & receiving payloads successfully
if ((_last_payload_tx_rate > 0) && (_last_payload_rx_rate > 0)) {
_connected = true;
_num_pings_missed = 0;
_last_ping = now;
} else {
if (hrt_elapsed_time(&_last_ping) > 1_s) {
// Check payload tx rate
if (_last_payload_tx_rate == 0) {
_num_tx_rate_zero++;
}
// Check payload rx rate
if (_last_payload_rx_rate == 0) {
_num_rx_rate_zero++;
}
// Check ping
_last_ping = now;
if (_had_ping_reply) {
_num_pings_missed = 0;
} else {
++_num_pings_missed;
}
int timeout_ms = 1'000; // 1 second
uint8_t attempts = 1;
uxr_ping_agent_session(session, timeout_ms, attempts);
_had_ping_reply = false;
}
if (_num_pings_missed >= 3) {
PX4_ERR("No ping response, disconnecting");
_connected = false;
}
int32_t tx_timeout = _param_uxrce_dds_tx_to.get();
int32_t rx_timeout = _param_uxrce_dds_rx_to.get();
if (tx_timeout > 0 && _num_tx_rate_zero >= tx_timeout) {
PX4_ERR("Payload TX rate zero for too long, disconnecting");
_connected = false;
}
if (rx_timeout > 0 && _num_rx_rate_zero >= rx_timeout) {
PX4_ERR("Payload RX rate zero for too long, disconnecting");
_connected = false;
}
}
}
void UxrceddsClient::resetConnectivityCounters()
{
_last_status_update = hrt_absolute_time();
_last_ping = hrt_absolute_time();
_had_ping_reply = false;
_num_pings_missed = 0;
_last_num_payload_sent = 0;
_last_num_payload_received = 0;
_num_tx_rate_zero = 0;
_num_rx_rate_zero = 0;
}
void UxrceddsClient::syncSystemClock(uxrSession *session)
{
struct timespec ts = {};
@ -535,8 +630,8 @@ void UxrceddsClient::run()
continue;
}
if (!setup_session(&session)) {
delete_session(&session);
if (!setupSession(&session)) {
deleteSession(&session);
px4_usleep(1'000'000);
PX4_ERR("session setup failed, will retry now");
continue;
@ -552,13 +647,8 @@ void UxrceddsClient::run()
}
hrt_abstime last_sync_session = 0;
hrt_abstime last_status_update = hrt_absolute_time();
hrt_abstime last_ping = hrt_absolute_time();
int num_pings_missed = 0;
bool had_ping_reply = false;
uint32_t last_num_payload_sent{};
uint32_t last_num_payload_received{};
int poll_error_counter = 0;
resetConnectivityCounters();
_subs->init();
_subs_initialized = true;
@ -629,55 +719,19 @@ void UxrceddsClient::run()
// Check for a ping response
/* PONG_IN_SESSION_STATUS */
if (session.on_pong_flag == 1) {
had_ping_reply = true;
_had_ping_reply = true;
}
const hrt_abstime now = hrt_absolute_time();
// Calculate the payload tx/rx rate for connectivity monitoring
calculateTxRxRate();
if (now - last_status_update > 1_s) {
float dt = (now - last_status_update) / 1e6f;
_last_payload_tx_rate = (_subs->num_payload_sent - last_num_payload_sent) / dt;
_last_payload_rx_rate = (_pubs->num_payload_received - last_num_payload_received) / dt;
last_num_payload_sent = _subs->num_payload_sent;
last_num_payload_received = _pubs->num_payload_received;
last_status_update = now;
}
// Handle ping, unless we're actively sending & receiving payloads successfully
if ((_last_payload_tx_rate > 0) && (_last_payload_rx_rate > 0)) {
_connected = true;
num_pings_missed = 0;
last_ping = now;
} else {
if (hrt_elapsed_time(&last_ping) > 1_s) {
last_ping = now;
if (had_ping_reply) {
num_pings_missed = 0;
} else {
++num_pings_missed;
}
int timeout_ms = 1'000; // 1 second
uint8_t attempts = 1;
uxr_ping_agent_session(&session, timeout_ms, attempts);
had_ping_reply = false;
}
if (num_pings_missed >= 3) {
PX4_INFO("No ping response, disconnecting");
_connected = false;
}
}
// Check if there is still connectivity with the agent
checkConnectivity(&session);
perf_end(_loop_perf);
}
delete_session(&session);
deleteSession(&session);
}
}

View File

@ -120,13 +120,17 @@ private:
bool init();
void deinit();
bool setup_session(uxrSession *session);
void delete_session(uxrSession *session);
bool setupSession(uxrSession *session);
void deleteSession(uxrSession *session);
bool setBaudrate(int fd, unsigned baud);
void handleMessageFormatRequest();
void calculateTxRxRate();
void checkConnectivity(uxrSession *session);
void resetConnectivityCounters();
uORB::Publication<message_format_response_s> _message_format_response_pub{ORB_ID(message_format_response)};
uORB::Subscription _message_format_request_sub{ORB_ID(message_format_request)};
@ -179,6 +183,14 @@ private:
uxrCommunication *_comm{nullptr};
int _fd{-1};
hrt_abstime _last_status_update;
hrt_abstime _last_ping;
bool _had_ping_reply{false};
int _num_pings_missed{0};
int32_t _num_tx_rate_zero{0};
int32_t _num_rx_rate_zero{0};
uint32_t _last_num_payload_sent{0};
uint32_t _last_num_payload_received{0};
int _last_payload_tx_rate{}; ///< in B/s
int _last_payload_rx_rate{}; ///< in B/s
@ -197,6 +209,8 @@ private:
(ParamInt<px4::params::UXRCE_DDS_KEY>) _param_uxrce_key,
(ParamInt<px4::params::UXRCE_DDS_PTCFG>) _param_uxrce_dds_ptcfg,
(ParamInt<px4::params::UXRCE_DDS_SYNCC>) _param_uxrce_dds_syncc,
(ParamInt<px4::params::UXRCE_DDS_SYNCT>) _param_uxrce_dds_synct
(ParamInt<px4::params::UXRCE_DDS_SYNCT>) _param_uxrce_dds_synct,
(ParamInt<px4::params::UXRCE_DDS_TX_TO>) _param_uxrce_dds_tx_to,
(ParamInt<px4::params::UXRCE_DDS_RX_TO>) _param_uxrce_dds_rx_to
)
};