platforms: Serial new dedicated writeBlocking method (#25537)

* platforms: Serial new dedicated writeBlocking method

* finish writeBlocking()

* add back fsync

* updated posix, added string constant for port not open error

* format

* fix build

* remove fsync

* actually remove fsync

* remove fsync from write

* review feedback

---------

Co-authored-by: Jacob Dahl <dahl.jakejacob@gmail.com>
Co-authored-by: Jacob Dahl <37091262+dakejahl@users.noreply.github.com>
This commit is contained in:
Daniel Agar 2025-09-15 19:22:49 -04:00 committed by GitHub
parent 41d3403ec7
commit d3f912ad25
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 144 additions and 5 deletions

View File

@ -89,6 +89,11 @@ ssize_t Serial::write(const void *buffer, size_t buffer_size)
return _impl.write(buffer, buffer_size);
}
ssize_t Serial::writeBlocking(const void *buffer, size_t buffer_size, uint32_t timeout_ms)
{
return _impl.writeBlocking(buffer, buffer_size, timeout_ms);
}
void Serial::flush()
{
return _impl.flush();

View File

@ -66,6 +66,7 @@ public:
ssize_t readAtLeast(uint8_t *buffer, size_t buffer_size, size_t character_count = 1, uint32_t timeout_ms = 0);
ssize_t write(const void *buffer, size_t buffer_size);
ssize_t writeBlocking(const void *buffer, size_t buffer_size, uint32_t timeout_ms = 0);
void flush();

View File

@ -362,11 +362,68 @@ ssize_t SerialImpl::write(const void *buffer, size_t buffer_size)
}
}
::fsync(_serial_fd);
return written;
}
ssize_t SerialImpl::writeBlocking(const void *buffer, size_t buffer_size, uint32_t timeout_ms)
{
if (!_open) {
PX4_ERR("Cannot writeBlocking to serial device until it has been opened");
return -1;
}
const uint8_t *data = static_cast<const uint8_t *>(buffer);
size_t total_written = 0;
const hrt_abstime start_time_us = hrt_absolute_time();
const hrt_abstime timeout_us = timeout_ms * 1000;
while (total_written < buffer_size) {
if (hrt_elapsed_time(&start_time_us) > timeout_us) {
PX4_WARN("Write timeout, sent %zu", total_written);
break;
}
pollfd fds[1];
fds[0].fd = _serial_fd;
fds[0].events = POLLOUT;
hrt_abstime elapsed_us = hrt_elapsed_time(&start_time_us);
int remaining_timeout_ms = (timeout_us - elapsed_us) / 1000;
if (remaining_timeout_ms <= 0) {
break;
}
int result = ::poll(fds, 1, remaining_timeout_ms);
if (result < 0) {
PX4_ERR("poll error %d", errno);
return -1;
}
if (fds[0].revents & POLLOUT) {
// Write as much as we can
ssize_t written = ::write(_serial_fd, data + total_written, buffer_size - total_written);
if (written < 0) {
if (errno == EAGAIN) {
// Buffer full, wait a bit and try again
px4_usleep(1000);
continue;
}
PX4_ERR("Write error: %d", errno);
return -1;
} else if (written > 0) {
total_written += written;
}
}
}
return total_written;
}
void SerialImpl::flush()
{
if (_open) {

View File

@ -64,6 +64,7 @@ public:
ssize_t readAtLeast(uint8_t *buffer, size_t buffer_size, size_t character_count = 1, uint32_t timeout_us = 0);
ssize_t write(const void *buffer, size_t buffer_size);
ssize_t writeBlocking(const void *buffer, size_t buffer_size, uint32_t timeout_us = 0);
void flush();

View File

@ -64,6 +64,7 @@ public:
ssize_t readAtLeast(uint8_t *buffer, size_t buffer_size, size_t character_count = 1, uint32_t timeout_us = 0);
ssize_t write(const void *buffer, size_t buffer_size);
ssize_t writeBlocking(const void *buffer, size_t buffer_size, uint32_t timeout_us = 0);
void flush();

View File

@ -274,7 +274,6 @@ ssize_t SerialImpl::read(uint8_t *buffer, size_t buffer_size)
if (ret < 0) {
PX4_DEBUG("%s read error %d", _port, ret);
}
return ret;
@ -344,11 +343,68 @@ ssize_t SerialImpl::write(const void *buffer, size_t buffer_size)
}
}
::fsync(_serial_fd);
return written;
}
ssize_t SerialImpl::writeBlocking(const void *buffer, size_t buffer_size, uint32_t timeout_ms)
{
if (!_open) {
PX4_ERR("Cannot writeBlocking to serial device until it has been opened");
return -1;
}
const uint8_t *data = static_cast<const uint8_t *>(buffer);
size_t total_written = 0;
const hrt_abstime start_time_us = hrt_absolute_time();
const hrt_abstime timeout_us = timeout_ms * 1000;
while (total_written < buffer_size) {
if (hrt_elapsed_time(&start_time_us) > timeout_us) {
PX4_WARN("Write timeout, sent %zu", total_written);
break;
}
pollfd fds[1];
fds[0].fd = _serial_fd;
fds[0].events = POLLOUT;
hrt_abstime elapsed_us = hrt_elapsed_time(&start_time_us);
int remaining_timeout_ms = (timeout_us - elapsed_us) / 1000;
if (remaining_timeout_ms <= 0) {
break;
}
int result = ::poll(fds, 1, remaining_timeout_ms);
if (result < 0) {
PX4_ERR("poll error %d", errno);
return -1;
}
if (fds[0].revents & POLLOUT) {
// Write as much as we can
ssize_t written = ::write(_serial_fd, data + total_written, buffer_size - total_written);
if (written < 0) {
if (errno == EAGAIN) {
// Buffer full, wait a bit and try again
px4_usleep(1000);
continue;
}
PX4_ERR("Write error: %d", errno);
return -1;
} else if (written > 0) {
total_written += written;
}
}
}
return total_written;
}
void SerialImpl::flush()
{
if (_open) {

View File

@ -63,6 +63,7 @@ public:
ssize_t readAtLeast(uint8_t *buffer, size_t buffer_size, size_t character_count = 1, uint32_t timeout_us = 0);
ssize_t write(const void *buffer, size_t buffer_size);
ssize_t writeBlocking(const void *buffer, size_t buffer_size, uint32_t timeout_us = 0);
void flush();

View File

@ -276,6 +276,23 @@ ssize_t SerialImpl::write(const void *buffer, size_t buffer_size)
return ret_write;
}
ssize_t SerialImpl::writeBlocking(const void *buffer, size_t buffer_size, uint32_t timeout_ms)
{
if (!_open) {
PX4_ERR("Cannot write to serial device until it has been opened");
return -1;
}
int ret_write = qurt_uart_write(_serial_fd, (const char *) buffer, buffer_size);
if (ret_write < 0) {
PX4_ERR("%s write error %d", _port, ret_write);
}
return ret_write;
}
void SerialImpl::flush()
{
if (_open) {