diff --git a/src/drivers/protocol_splitter/protocol_splitter.cpp b/src/drivers/protocol_splitter/protocol_splitter.cpp index 017cf0cb16..8d84d46e84 100644 --- a/src/drivers/protocol_splitter/protocol_splitter.cpp +++ b/src/drivers/protocol_splitter/protocol_splitter.cpp @@ -58,8 +58,6 @@ #include -static constexpr uint64_t reader_timeout_us = 1000000; - class Mavlink2Dev; class RtpsDev; class ReadBuffer; @@ -120,6 +118,7 @@ perf_counter_t mavlink_messages_parsed_count; perf_counter_t mavlink_bytes_parsed_count; perf_counter_t rtps_messages_parsed_count; perf_counter_t rtps_bytes_parsed_count; +perf_counter_t buffer_drops; class ReadBuffer { @@ -134,6 +133,8 @@ public: uint8_t buffer[1024] = {}; size_t buf_size = 0; + static const size_t BUFFER_THRESHOLD = sizeof(buffer) * 0.8; + // We keep track of the first Mavlink and Rtps packet in the buffer. // If start and end are equal there is no packet. size_t start_mavlink = 0; @@ -146,18 +147,30 @@ public: size_t bytes_received = 0; size_t bytes_lost = 0; size_t header_bytes_received = 0; - - // To keep track of readers. - hrt_abstime last_mavlink_read = 0; - hrt_abstime last_rtps_read = 0; }; int ReadBuffer::read(int fd) { - if (sizeof(buffer) == buf_size) { - // This happens if one consumer does not read the data, or not fast enough. - // TODO: add a mechanism to thrown away data if a user is no longer reading. - PX4_DEBUG("Buffer full: %zu %zu %zu %zu", start_mavlink, end_mavlink, start_rtps, end_rtps); + if (buf_size > BUFFER_THRESHOLD) { + // Drop the buffer if it's too full. This is not expected to happen, but it might, if one of the readers + // is too slow. In that case it's best to make space for new data, otherwise the faster reader might + // busy-loop w/o getting new data. + perf_count(buffer_drops); + + PX4_DEBUG("Buffer full, dropping: %zu %zu %zu %zu", start_mavlink, end_mavlink, start_rtps, end_rtps); + + // Drop only as much as we have to + size_t num_remove = math::max(start_mavlink, start_rtps); + + if (num_remove == 0) { + num_remove = buf_size; + } + + remove(0, num_remove); + start_mavlink -= math::min(num_remove, start_mavlink); + end_mavlink -= math::min(num_remove, end_mavlink); + start_rtps -= math::min(num_remove, start_rtps); + end_rtps -= math::min(num_remove, end_rtps); } int bytes_available = 0; @@ -264,7 +277,6 @@ protected: int try_to_copy_data(char *buffer, size_t buflen, MessageType message_type); void scan_for_packets(); - void check_for_timeouts(); void cleanup(); void lock(enum Operation op) @@ -542,23 +554,6 @@ void DevCommon::scan_for_packets() } -void DevCommon::check_for_timeouts() -{ - // If a reader has timed out, mark its data as read. - - if (hrt_elapsed_time(&_read_buffer->last_mavlink_read) > reader_timeout_us) { - if (_read_buffer->start_mavlink < _read_buffer->end_mavlink) { - _read_buffer->start_mavlink = _read_buffer->end_mavlink; - } - } - - if (hrt_elapsed_time(&_read_buffer->last_rtps_read) > reader_timeout_us) { - if (_read_buffer->start_rtps < _read_buffer->end_rtps) { - _read_buffer->start_rtps = _read_buffer->end_rtps; - } - } -} - void DevCommon::cleanup() { const bool mavlink_available = (_read_buffer->start_mavlink < _read_buffer->end_mavlink); @@ -608,14 +603,11 @@ Mavlink2Dev::Mavlink2Dev(ReadBuffer *read_buffer) ssize_t Mavlink2Dev::read(struct file *filp, char *buffer, size_t buflen) { - _read_buffer->last_mavlink_read = hrt_absolute_time(); - lock(Read); // The cleanup needs to be right after a scan, so we don't clean up // something that we haven't found yet. scan_for_packets(); - check_for_timeouts(); cleanup(); // If we have already a packet ready in the current buffer, we don't have @@ -630,7 +622,7 @@ ssize_t Mavlink2Dev::read(struct file *filp, char *buffer, size_t buflen) // Otherwise, we have to do a read. ret = _read_buffer->read(_fd); - if (ret < 0) { + if (ret <= 0) { unlock(Read); return ret; } @@ -693,12 +685,9 @@ RtpsDev::RtpsDev(ReadBuffer *read_buffer) ssize_t RtpsDev::read(struct file *filp, char *buffer, size_t buflen) { - _read_buffer->last_rtps_read = hrt_absolute_time(); - lock(Read); scan_for_packets(); - check_for_timeouts(); cleanup(); // If we have already a packet ready in the current buffer, we don't have to read. @@ -712,7 +701,7 @@ ssize_t RtpsDev::read(struct file *filp, char *buffer, size_t buflen) // Otherwise, we have to do a read. ret = _read_buffer->read(_fd); - if (ret < 0) { + if (ret <= 0) { unlock(Read); return ret; } @@ -778,10 +767,11 @@ int protocol_splitter_main(int argc, char *argv[]) bytes_received_count = perf_alloc(PC_COUNT, "protocol_splitter: bytes received"); bytes_lost_count = perf_alloc(PC_COUNT, "protocol_splitter: bytes unused/lost"); header_bytes_received_count = perf_alloc(PC_COUNT, "protocol_splitter: header bytes received"); - mavlink_messages_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: MAVLink messages parsed"); - mavlink_bytes_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: MAVLink messages bytes parsed"); - rtps_messages_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: RTPS messages parsed"); - rtps_bytes_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: RTPS messages bytes parsed"); + mavlink_messages_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: MAVLink msgs parsed"); + mavlink_bytes_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: MAVLink msgs bytes parsed"); + rtps_messages_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: RTPS msgs parsed"); + rtps_bytes_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: RTPS msgs bytes parsed"); + buffer_drops = perf_alloc(PC_COUNT, "protocol_splitter: buffer drops"); strncpy(objects->device_name, argv[2], sizeof(objects->device_name)); sem_init(&objects->r_lock, 1, 1); @@ -824,6 +814,7 @@ int protocol_splitter_main(int argc, char *argv[]) perf_free(mavlink_bytes_parsed_count); perf_free(rtps_messages_parsed_count); perf_free(rtps_bytes_parsed_count); + perf_free(buffer_drops); } }