fix protocol_splitter: remove timeout, drop buffer if too full instead

The existing implementation had a flaw: when the buffer was getting full,
mavlink started to busy-loop, as the uart has data (poll returns immediately)
but no new data was read from the uart due to the buffer being full.
As rtps is running at lower prio, it never got the chance to read again,
making the problem even worse.
After 1s the timeout triggered and the buffer was cleared, so it recovered.

Instead of allowing for CPU spikes, we now immediately clear the buffer
(only as much as we have to), ensuring that new data is read from the uart.
This commit is contained in:
Beat Küng 2021-12-14 12:39:56 +01:00 committed by Daniel Agar
parent 542ee86bc9
commit 72065c3d71

View File

@ -58,8 +58,6 @@
#include <cstring>
static constexpr uint64_t reader_timeout_us = 1000000;
class Mavlink2Dev;
class RtpsDev;
class ReadBuffer;
@ -120,6 +118,7 @@ perf_counter_t mavlink_messages_parsed_count;
perf_counter_t mavlink_bytes_parsed_count;
perf_counter_t rtps_messages_parsed_count;
perf_counter_t rtps_bytes_parsed_count;
perf_counter_t buffer_drops;
class ReadBuffer
{
@ -134,6 +133,8 @@ public:
uint8_t buffer[1024] = {};
size_t buf_size = 0;
static const size_t BUFFER_THRESHOLD = sizeof(buffer) * 0.8;
// We keep track of the first Mavlink and Rtps packet in the buffer.
// If start and end are equal there is no packet.
size_t start_mavlink = 0;
@ -146,18 +147,30 @@ public:
size_t bytes_received = 0;
size_t bytes_lost = 0;
size_t header_bytes_received = 0;
// To keep track of readers.
hrt_abstime last_mavlink_read = 0;
hrt_abstime last_rtps_read = 0;
};
int ReadBuffer::read(int fd)
{
if (sizeof(buffer) == buf_size) {
// This happens if one consumer does not read the data, or not fast enough.
// TODO: add a mechanism to thrown away data if a user is no longer reading.
PX4_DEBUG("Buffer full: %zu %zu %zu %zu", start_mavlink, end_mavlink, start_rtps, end_rtps);
if (buf_size > BUFFER_THRESHOLD) {
// Drop the buffer if it's too full. This is not expected to happen, but it might, if one of the readers
// is too slow. In that case it's best to make space for new data, otherwise the faster reader might
// busy-loop w/o getting new data.
perf_count(buffer_drops);
PX4_DEBUG("Buffer full, dropping: %zu %zu %zu %zu", start_mavlink, end_mavlink, start_rtps, end_rtps);
// Drop only as much as we have to
size_t num_remove = math::max(start_mavlink, start_rtps);
if (num_remove == 0) {
num_remove = buf_size;
}
remove(0, num_remove);
start_mavlink -= math::min(num_remove, start_mavlink);
end_mavlink -= math::min(num_remove, end_mavlink);
start_rtps -= math::min(num_remove, start_rtps);
end_rtps -= math::min(num_remove, end_rtps);
}
int bytes_available = 0;
@ -264,7 +277,6 @@ protected:
int try_to_copy_data(char *buffer, size_t buflen, MessageType message_type);
void scan_for_packets();
void check_for_timeouts();
void cleanup();
void lock(enum Operation op)
@ -542,23 +554,6 @@ void DevCommon::scan_for_packets()
}
void DevCommon::check_for_timeouts()
{
// If a reader has timed out, mark its data as read.
if (hrt_elapsed_time(&_read_buffer->last_mavlink_read) > reader_timeout_us) {
if (_read_buffer->start_mavlink < _read_buffer->end_mavlink) {
_read_buffer->start_mavlink = _read_buffer->end_mavlink;
}
}
if (hrt_elapsed_time(&_read_buffer->last_rtps_read) > reader_timeout_us) {
if (_read_buffer->start_rtps < _read_buffer->end_rtps) {
_read_buffer->start_rtps = _read_buffer->end_rtps;
}
}
}
void DevCommon::cleanup()
{
const bool mavlink_available = (_read_buffer->start_mavlink < _read_buffer->end_mavlink);
@ -608,14 +603,11 @@ Mavlink2Dev::Mavlink2Dev(ReadBuffer *read_buffer)
ssize_t Mavlink2Dev::read(struct file *filp, char *buffer, size_t buflen)
{
_read_buffer->last_mavlink_read = hrt_absolute_time();
lock(Read);
// The cleanup needs to be right after a scan, so we don't clean up
// something that we haven't found yet.
scan_for_packets();
check_for_timeouts();
cleanup();
// If we have already a packet ready in the current buffer, we don't have
@ -630,7 +622,7 @@ ssize_t Mavlink2Dev::read(struct file *filp, char *buffer, size_t buflen)
// Otherwise, we have to do a read.
ret = _read_buffer->read(_fd);
if (ret < 0) {
if (ret <= 0) {
unlock(Read);
return ret;
}
@ -693,12 +685,9 @@ RtpsDev::RtpsDev(ReadBuffer *read_buffer)
ssize_t RtpsDev::read(struct file *filp, char *buffer, size_t buflen)
{
_read_buffer->last_rtps_read = hrt_absolute_time();
lock(Read);
scan_for_packets();
check_for_timeouts();
cleanup();
// If we have already a packet ready in the current buffer, we don't have to read.
@ -712,7 +701,7 @@ ssize_t RtpsDev::read(struct file *filp, char *buffer, size_t buflen)
// Otherwise, we have to do a read.
ret = _read_buffer->read(_fd);
if (ret < 0) {
if (ret <= 0) {
unlock(Read);
return ret;
}
@ -778,10 +767,11 @@ int protocol_splitter_main(int argc, char *argv[])
bytes_received_count = perf_alloc(PC_COUNT, "protocol_splitter: bytes received");
bytes_lost_count = perf_alloc(PC_COUNT, "protocol_splitter: bytes unused/lost");
header_bytes_received_count = perf_alloc(PC_COUNT, "protocol_splitter: header bytes received");
mavlink_messages_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: MAVLink messages parsed");
mavlink_bytes_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: MAVLink messages bytes parsed");
rtps_messages_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: RTPS messages parsed");
rtps_bytes_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: RTPS messages bytes parsed");
mavlink_messages_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: MAVLink msgs parsed");
mavlink_bytes_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: MAVLink msgs bytes parsed");
rtps_messages_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: RTPS msgs parsed");
rtps_bytes_parsed_count = perf_alloc(PC_COUNT, "protocol_splitter: RTPS msgs bytes parsed");
buffer_drops = perf_alloc(PC_COUNT, "protocol_splitter: buffer drops");
strncpy(objects->device_name, argv[2], sizeof(objects->device_name));
sem_init(&objects->r_lock, 1, 1);
@ -824,6 +814,7 @@ int protocol_splitter_main(int argc, char *argv[])
perf_free(mavlink_bytes_parsed_count);
perf_free(rtps_messages_parsed_count);
perf_free(rtps_bytes_parsed_count);
perf_free(buffer_drops);
}
}