From ba3dbbd38d1397f369a0a58f3eaafd3b56ac113f Mon Sep 17 00:00:00 2001 From: TSC21 Date: Wed, 14 Jul 2021 14:25:31 +0200 Subject: [PATCH] microRTPS: send the system ID with the RTPS packet header and remove the need for extra id fields in uORB This allows that all messages (not only timesync messages) that get received on the same system that sent them do not get parsed. As the microRTPS agent is built currently, this will only happen right now if someone sets the same UDP port to send and receive data, or by manually changing the agent topics (which were always autogenerated). --- .../uorb_microcdr/microRTPS_client.cpp.em | 3 +- msg/templates/urtps/RtpsTopics.cpp.em | 16 +- msg/templates/urtps/microRTPS_agent.cpp.em | 14 +- msg/templates/urtps/microRTPS_timesync.cpp.em | 4 +- msg/templates/urtps/microRTPS_timesync.h.em | 4 - msg/templates/urtps/microRTPS_transport.cpp | 246 +++++++++--------- msg/templates/urtps/microRTPS_transport.h | 89 ++++--- .../microRTPS_client_main.cpp | 11 +- 8 files changed, 205 insertions(+), 182 deletions(-) diff --git a/msg/templates/uorb_microcdr/microRTPS_client.cpp.em b/msg/templates/uorb_microcdr/microRTPS_client.cpp.em index d611e720b8..bda05e09b7 100644 --- a/msg/templates/uorb_microcdr/microRTPS_client.cpp.em +++ b/msg/templates/uorb_microcdr/microRTPS_client.cpp.em @@ -140,11 +140,10 @@ void *send(void *args) if (subs->@(topic)_sub.update(&@(topic)_data)) { @[ if topic == 'Timesync' or topic == 'timesync']@ - if (@(topic)_data.sys_id == 0 && @(topic)_data.seq != last_remote_msg_seq && @(topic)_data.tc1 == 0) { + if (@(topic)_data.seq != last_remote_msg_seq && @(topic)_data.tc1 == 0) { last_remote_msg_seq = @(topic)_data.seq; @(topic)_data.timestamp = hrt_absolute_time(); - @(topic)_data.sys_id = 1; @(topic)_data.seq = last_msg_seq; @(topic)_data.tc1 = hrt_absolute_time() * 1000ULL; @(topic)_data.ts1 = @(topic)_data.ts1; diff --git a/msg/templates/urtps/RtpsTopics.cpp.em b/msg/templates/urtps/RtpsTopics.cpp.em index 1e0d5b0329..ff70b5f99b 100644 --- a/msg/templates/urtps/RtpsTopics.cpp.em +++ b/msg/templates/urtps/RtpsTopics.cpp.em @@ -117,11 +117,11 @@ void RtpsTopics::publish(const uint8_t topic_ID, char data_buffer[], size_t len) eprosima::fastcdr::FastBuffer cdrbuffer(data_buffer, len); eprosima::fastcdr::Cdr cdr_des(cdrbuffer); st.deserialize(cdr_des); + @[ if topic == 'Timesync' or topic == 'timesync']@ _timesync->processTimesyncMsg(&st, &_@(topic)_pub); - - if (getMsgSysID(&st) == 1) { @[ end if]@ + // apply timestamp offset uint64_t timestamp = getMsgTimestamp(&st); uint64_t timestamp_sample = getMsgTimestampSample(&st); @@ -129,11 +129,8 @@ void RtpsTopics::publish(const uint8_t topic_ID, char data_buffer[], size_t len) setMsgTimestamp(&st, timestamp); _timesync->subtractOffset(timestamp_sample); setMsgTimestampSample(&st, timestamp_sample); - _@(topic)_pub.publish(&st); -@[ if topic == 'Timesync' or topic == 'timesync']@ - } -@[ end if]@ + _@(topic)_pub.publish(&st); } break; @[end for]@ @@ -157,10 +154,7 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr) case @(rtps_message_id(ids, topic)): // @(topic) if (_@(topic)_sub.hasMsg()) { @(topic)_msg_t msg = _@(topic)_sub.getMsg(); -@[ if topic == 'Timesync' or topic == 'timesync']@ - if (getMsgSysID(&msg) == 0) { -@[ end if]@ // apply timestamps offset uint64_t timestamp = getMsgTimestamp(&msg); uint64_t timestamp_sample = getMsgTimestampSample(&msg); @@ -168,12 +162,10 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr) setMsgTimestamp(&msg, timestamp); _timesync->addOffset(timestamp_sample); setMsgTimestampSample(&msg, timestamp_sample); + msg.serialize(scdr); ret = true; -@[ if topic == 'Timesync' or topic == 'timesync']@ - } -@[ end if]@ _@(topic)_sub.unlockMsg(); } diff --git a/msg/templates/urtps/microRTPS_agent.cpp.em b/msg/templates/urtps/microRTPS_agent.cpp.em index ad2103764e..ccf464c579 100644 --- a/msg/templates/urtps/microRTPS_agent.cpp.em +++ b/msg/templates/urtps/microRTPS_agent.cpp.em @@ -286,10 +286,19 @@ int main(int argc, char **argv) printf("[ micrortps_agent ]\tUsing only the localhost network...\n"); } + /** + * Set the system ID to Mission Computer, in order to identify the agent side + * + * Note: theoretically a multi-agent system is possible, but this would require + * adjustments in the way the timesync is done (would have to create a timesync + * instance per agent). Keeping it contained for a 1:1 link for now is reasonable. + */ + const uint8_t sys_id = static_cast(MicroRtps::System::MISSION_COMPUTER); + switch (_options.transport) { case options::eTransports::UART: { transport_node = std::make_unique(_options.device, _options.baudrate, _options.poll_ms, - _options.sw_flow_control, _options.hw_flow_control, _options.verbose_debug); + _options.sw_flow_control, _options.hw_flow_control, sys_id, _options.verbose_debug); printf("[ micrortps_agent ]\tUART transport: device: %s; baudrate: %d; sleep: %dus; poll: %dms; flow_control: %s\n", _options.device, _options.baudrate, _options.sleep_us, _options.poll_ms, _options.sw_flow_control ? "SW enabled" : (_options.hw_flow_control ? "HW enabled" : "No")); @@ -297,7 +306,8 @@ int main(int argc, char **argv) break; case options::eTransports::UDP: { - transport_node = std::make_unique(_options.ip, _options.recv_port, _options.send_port, _options.verbose_debug); + transport_node = std::make_unique(_options.ip, _options.recv_port, _options.send_port, + sys_id, _options.verbose_debug); printf("[ micrortps_agent ]\tUDP transport: ip address: %s; recv port: %u; send port: %u; sleep: %dus\n", _options.ip, _options.recv_port, _options.send_port, _options.sleep_us); } diff --git a/msg/templates/urtps/microRTPS_timesync.cpp.em b/msg/templates/urtps/microRTPS_timesync.cpp.em index 7b15cd8bb2..6cb71a90bd 100644 --- a/msg/templates/urtps/microRTPS_timesync.cpp.em +++ b/msg/templates/urtps/microRTPS_timesync.cpp.em @@ -227,7 +227,7 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t void TimeSync::processTimesyncMsg(timesync_msg_t *msg, TimesyncPublisher *pub) { - if (getMsgSysID(msg) == 1 && getMsgSeq(msg) != _last_remote_msg_seq) { + if (getMsgSeq(msg) != _last_remote_msg_seq) { _last_remote_msg_seq = getMsgSeq(msg); if (getMsgTC1(msg) > 0) { @@ -245,7 +245,6 @@ void TimeSync::processTimesyncMsg(timesync_msg_t *msg, TimesyncPublisher *pub) @[else]@ setMsgTimestamp(msg, getSteadyTimeUSec()); @[end if]@ - setMsgSysID(msg, 0); setMsgSeq(msg, getMsgSeq(msg) + 1); @[if ros2_distro]@ setMsgTC1(msg, getROSTimeNSec()); @@ -267,7 +266,6 @@ timesync_msg_t TimeSync::newTimesyncMsg() @[else]@ setMsgTimestamp(&msg, getSteadyTimeUSec()); @[end if]@ - setMsgSysID(&msg, 0); setMsgSeq(&msg, _last_msg_seq); setMsgTC1(&msg, 0); @[if ros2_distro]@ diff --git a/msg/templates/urtps/microRTPS_timesync.h.em b/msg/templates/urtps/microRTPS_timesync.h.em index e272cf67db..0614906aa3 100644 --- a/msg/templates/urtps/microRTPS_timesync.h.em +++ b/msg/templates/urtps/microRTPS_timesync.h.em @@ -259,13 +259,11 @@ private: /** Timesync msg Getters **/ @[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ inline uint64_t getMsgTimestamp(const timesync_msg_t *msg) { return msg->timestamp_(); } - inline uint8_t getMsgSysID(const timesync_msg_t *msg) { return msg->sys_id_(); } inline uint8_t getMsgSeq(const timesync_msg_t *msg) { return msg->seq_(); } inline int64_t getMsgTC1(const timesync_msg_t *msg) { return msg->tc1_(); } inline int64_t getMsgTS1(const timesync_msg_t *msg) { return msg->ts1_(); } @[elif ros2_distro]@ inline uint64_t getMsgTimestamp(const timesync_msg_t *msg) { return msg->timestamp(); } - inline uint8_t getMsgSysID(const timesync_msg_t *msg) { return msg->sys_id(); } inline uint8_t getMsgSeq(const timesync_msg_t *msg) { return msg->seq(); } inline int64_t getMsgTC1(const timesync_msg_t *msg) { return msg->tc1(); } inline int64_t getMsgTS1(const timesync_msg_t *msg) { return msg->ts1(); } @@ -282,12 +280,10 @@ private: /** Timesync msg Setters **/ @[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ - inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id_() = sys_id; } inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq_() = seq; } inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1_() = tc1; } inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1_() = ts1; } @[elif ros2_distro]@ - inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id() = sys_id; } inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq() = seq; } inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1() = tc1; } inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1() = ts1; } diff --git a/msg/templates/urtps/microRTPS_transport.cpp b/msg/templates/urtps/microRTPS_transport.cpp index c4f3719f26..60b04a9ba3 100644 --- a/msg/templates/urtps/microRTPS_transport.cpp +++ b/msg/templates/urtps/microRTPS_transport.cpp @@ -80,9 +80,10 @@ uint16_t const crc16_table[256] = { 0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040 }; -Transport_node::Transport_node(const bool _debug): - rx_buff_pos(0), - debug(_debug) +Transport_node::Transport_node(const uint8_t sys_id, const bool debug): + _rx_buff_pos(0), + _debug(debug), + _sys_id(sys_id) { } @@ -106,15 +107,15 @@ uint16_t Transport_node::crc16(uint8_t const *buffer, size_t len) return crc; } -ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer_len) +ssize_t Transport_node::read(uint8_t *topic_id, char out_buffer[], size_t buffer_len) { - if (nullptr == out_buffer || nullptr == topic_ID || !fds_OK()) { + if (nullptr == out_buffer || nullptr == topic_id || !fds_OK()) { return -1; } - *topic_ID = 255; + *topic_id = 255; - ssize_t len = node_read((void *)(rx_buffer + rx_buff_pos), sizeof(rx_buffer) - rx_buff_pos); + ssize_t len = node_read((void *)(_rx_buffer + _rx_buff_pos), sizeof(_rx_buffer) - _rx_buff_pos); if (len < 0) { int errsv = errno; @@ -122,11 +123,11 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer if (errsv && EAGAIN != errsv && ETIMEDOUT != errsv) { #ifndef PX4_DEBUG - if (debug) { printf("\033[0;31m[ micrortps_transport ]\tRead fail %d\033[0m\n", errsv); } + if (_debug) { printf("\033[0;31m[ micrortps_transport ]\tRead fail %d\033[0m\n", errsv); } #else - if (debug) { PX4_DEBUG("Read fail %d", errsv); } + if (_debug) { PX4_DEBUG("Read fail %d", errsv); } #endif /* PX4_DEBUG */ } @@ -134,26 +135,26 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer return len; } - rx_buff_pos += len; + _rx_buff_pos += len; // We read some size_t header_size = sizeof(struct Header); // but not enough - if (rx_buff_pos < header_size) { + if (_rx_buff_pos < header_size) { return 0; } uint32_t msg_start_pos = 0; - for (msg_start_pos = 0; msg_start_pos <= rx_buff_pos - header_size; ++msg_start_pos) { - if ('>' == rx_buffer[msg_start_pos] && memcmp(rx_buffer + msg_start_pos, ">>>", 3) == 0) { + for (msg_start_pos = 0; msg_start_pos <= _rx_buff_pos - header_size; ++msg_start_pos) { + if ('>' == _rx_buffer[msg_start_pos] && memcmp(_rx_buffer + msg_start_pos, ">>>", 3) == 0) { break; } } // Start not found - if (msg_start_pos > (rx_buff_pos - header_size)) { + if (msg_start_pos > (_rx_buff_pos - header_size)) { #ifndef PX4_DEBUG if (debug) { printf("\033[1;33m[ micrortps_transport ]\t (↓↓ %" PRIu32 ")\033[0m\n", msg_start_pos); } @@ -165,25 +166,36 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer #endif /* PX4_DEBUG */ // All we've checked so far is garbage, drop it - but save unchecked bytes - memmove(rx_buffer, rx_buffer + msg_start_pos, rx_buff_pos - msg_start_pos); - rx_buff_pos -= msg_start_pos; + memmove(_rx_buffer, _rx_buffer + msg_start_pos, _rx_buff_pos - msg_start_pos); + _rx_buff_pos -= msg_start_pos; return -1; } - // [>,>,>,topic_ID,seq,payload_length_H,payload_length_L,CRCHigh,CRCLow,payloadStart, ... ,payloadEnd] - struct Header *header = (struct Header *)&rx_buffer[msg_start_pos]; + // [>,>,>,topic_id,sys_id,seq,payload_length_H,payload_length_L,CRCHigh,CRCLow,payloadStart, ... ,payloadEnd] + struct Header *header = (struct Header *)&_rx_buffer[msg_start_pos]; uint32_t payload_len = ((uint32_t)header->payload_len_h << 8) | header->payload_len_l; + // The received message comes from this system. Discard it. + // This might happen when: + // 1. The same UDP port is being used to send a rcv packets or + // 2. The same topic on the agent is being used for outgoing and incoming data + if (header->sys_id == _sys_id) { + // Drop the message and continue with the read buffer + memmove(_rx_buffer, _rx_buffer + msg_start_pos + 1, _rx_buff_pos - (msg_start_pos + 1)); + _rx_buff_pos -= (msg_start_pos + 1); + return -1; + } + // The message won't fit the buffer. if (buffer_len < header_size + payload_len) { // Drop the message and continue with the read buffer - memmove(rx_buffer, rx_buffer + msg_start_pos + 1, rx_buff_pos - (msg_start_pos + 1)); - rx_buff_pos -= (msg_start_pos + 1); + memmove(_rx_buffer, _rx_buffer + msg_start_pos + 1, _rx_buff_pos - (msg_start_pos + 1)); + _rx_buff_pos -= (msg_start_pos + 1); return -EMSGSIZE; } // We do not have a complete message yet - if (msg_start_pos + header_size + payload_len > rx_buff_pos) { + if (msg_start_pos + header_size + payload_len > _rx_buff_pos) { // If there's garbage at the beginning, drop it if (msg_start_pos > 0) { #ifndef PX4_DEBUG @@ -195,15 +207,15 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer if (debug) { PX4_DEBUG(" (↓ %" PRIu32 ")", msg_start_pos); } #endif /* PX4_DEBUG */ - memmove(rx_buffer, rx_buffer + msg_start_pos, rx_buff_pos - msg_start_pos); - rx_buff_pos -= msg_start_pos; + memmove(_rx_buffer, _rx_buffer + msg_start_pos, _rx_buff_pos - msg_start_pos); + _rx_buff_pos -= msg_start_pos; } return 0; } uint16_t read_crc = ((uint16_t)header->crc_h << 8) | header->crc_l; - uint16_t calc_crc = crc16((uint8_t *)rx_buffer + msg_start_pos + header_size, payload_len); + uint16_t calc_crc = crc16((uint8_t *)_rx_buffer + msg_start_pos + header_size, payload_len); if (read_crc != calc_crc) { #ifndef PX4_DEBUG @@ -212,27 +224,27 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer #else - if (debug) { PX4_DEBUG("Bad CRC %u != %u\t\t(↓ %lu)", read_crc, calc_crc, (unsigned long)(header_size + payload_len)); } + if (_debug) { PX4_DEBUG("Bad CRC %u != %u\t\t(↓ %lu)", read_crc, calc_crc, (unsigned long)(header_size + payload_len)); } #endif /* PX4_DEBUG */ // Drop garbage up just beyond the start of the message - memmove(rx_buffer, rx_buffer + (msg_start_pos + 1), rx_buff_pos); + memmove(_rx_buffer, _rx_buffer + (msg_start_pos + 1), _rx_buff_pos); // If there is a CRC error, the payload len cannot be trusted - rx_buff_pos -= (msg_start_pos + 1); + _rx_buff_pos -= (msg_start_pos + 1); len = -1; } else { // copy message to outbuffer and set other return values - memmove(out_buffer, rx_buffer + msg_start_pos + header_size, payload_len); - *topic_ID = header->topic_ID; + memmove(out_buffer, _rx_buffer + msg_start_pos + header_size, payload_len); + *topic_id = header->topic_id; len = payload_len + header_size; - // discard message from rx_buffer - rx_buff_pos -= msg_start_pos + header_size + payload_len; - memmove(rx_buffer, rx_buffer + msg_start_pos + header_size + payload_len, rx_buff_pos); + // discard message from _rx_buffer + _rx_buff_pos -= msg_start_pos + header_size + payload_len; + memmove(_rx_buffer, _rx_buffer + msg_start_pos + header_size + payload_len, _rx_buff_pos); } return len; @@ -243,18 +255,19 @@ size_t Transport_node::get_header_length() return sizeof(struct Header); } -ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t length) +ssize_t Transport_node::write(const uint8_t topic_id, char buffer[], size_t length) { if (!fds_OK()) { return -1; } - static struct Header header = {{'>', '>', '>'}, 0u, 0u, 0u, 0u, 0u, 0u}; + static struct Header header = {{'>', '>', '>'}, 0u, 0u, 0u, 0u, 0u, 0u, 0u}; - // [>,>,>,topic_ID,seq,payload_length,CRCHigh,CRCLow,payload_start, ... ,payload_end] + // [>,>,>,topic_id,seq,payload_length,CRCHigh,CRCLow,payload_start, ... ,payload_end] uint16_t crc = crc16((uint8_t *)&buffer[sizeof(header)], length); - header.topic_ID = topic_ID; + header.topic_id = topic_id; + header.sys_id = _sys_id; header.seq = _seq_number++; header.payload_len_h = (length >> 8) & 0xff; header.payload_len_l = length & 0xff; @@ -273,19 +286,20 @@ ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t leng return len + sizeof(header); } -UART_node::UART_node(const char *_uart_name, const uint32_t _baudrate, - const uint32_t _poll_ms, const bool _hw_flow_control, - const bool _sw_flow_control, const bool _debug): - Transport_node(_debug), - uart_fd(-1), - baudrate(_baudrate), - poll_ms(_poll_ms), - hw_flow_control(_hw_flow_control), - sw_flow_control(_sw_flow_control) +UART_node::UART_node(const char *uart_name, const uint32_t baudrate, + const uint32_t poll_ms, const bool hw_flow_control, + const bool sw_flow_control, const uint8_t sys_id, + const bool debug): + Transport_node(sys_id, debug), + _uart_fd(-1), + _baudrate(baudrate), + _poll_ms(poll_ms), + _hw_flow_control(hw_flow_control), + _sw_flow_control(sw_flow_control) { - if (nullptr != _uart_name) { - strcpy(uart_name, _uart_name); + if (nullptr != uart_name) { + strcpy(_uart_name, uart_name); } } @@ -297,20 +311,20 @@ UART_node::~UART_node() int UART_node::init() { // Open a serial port - uart_fd = open(uart_name, O_RDWR | O_NOCTTY | O_NONBLOCK); + _uart_fd = open(_uart_name, O_RDWR | O_NOCTTY | O_NONBLOCK); - if (uart_fd < 0) { + if (_uart_fd < 0) { #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: Failed to open device: %s (%d)\033[0m\n", uart_name, errno); + printf("\033[0;31m[ micrortps_transport ]\tUART transport: Failed to open device: %s (%d)\033[0m\n", _uart_name, errno); #else - PX4_ERR("UART transport: Failed to open device: %s (%d)", uart_name, errno); + PX4_ERR("UART transport: Failed to open device: %s (%d)", _uart_name, errno); #endif /* PX4_ERR */ return -errno; } // If using shared UART, no need to set it up - if (baudrate == 0) { - return uart_fd; + if (_baudrate == 0) { + return _uart_fd; } // Try to set baud rate @@ -318,13 +332,13 @@ int UART_node::init() int termios_state; // Back up the original uart configuration to restore it after exit - if ((termios_state = tcgetattr(uart_fd, &uart_config)) < 0) { + if ((termios_state = tcgetattr(_uart_fd, &uart_config)) < 0) { int errno_bkp = errno; #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR GET CONF %s: %d (%d)\n\033[0m", uart_name, termios_state, + printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR GET CONF %s: %d (%d)\n\033[0m", _uart_name, termios_state, errno); #else - PX4_ERR("UART transport: ERR GET CONF %s: %d (%d)", uart_name, termios_state, errno); + PX4_ERR("UART transport: ERR GET CONF %s: %d (%d)", _uart_name, termios_state, errno); #endif /* PX4_ERR */ close(); return -errno_bkp; @@ -343,11 +357,11 @@ int UART_node::init() uart_config.c_lflag &= ~(ISIG | ICANON | ECHO | TOSTOP | IEXTEN); // Flow control - if (hw_flow_control) { + if (_hw_flow_control) { // HW flow control uart_config.c_lflag |= CRTSCTS; - } else if (sw_flow_control) { + } else if (_sw_flow_control) { // SW flow control uart_config.c_lflag |= (IXON | IXOFF | IXANY); } @@ -355,10 +369,10 @@ int UART_node::init() // Set baud rate speed_t speed; - if (!baudrate_to_speed(baudrate, &speed)) { + if (!baudrate_to_speed(_baudrate, &speed)) { #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: Unsupported baudrate: %d\n\tsupported examples:\n\t9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 921600, 1000000\033[0m\n", - uart_name, baudrate); + printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: Unsupported _baudrate: %d\n\tsupported examples:\n\t9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 921600, 1000000\033[0m\n", + _uart_name, _baudrate); #else PX4_ERR("UART transport: ERR SET BAUD %s: Unsupported baudrate: %" PRIu32 "\n\tsupported examples:\n\t9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 921600, 1000000\n", uart_name, baudrate); @@ -370,21 +384,21 @@ int UART_node::init() if (cfsetispeed(&uart_config, speed) < 0 || cfsetospeed(&uart_config, speed) < 0) { int errno_bkp = errno; #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: %d (%d)\033[0m\n", uart_name, termios_state, + printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: %d (%d)\033[0m\n", _uart_name, termios_state, errno); #else - PX4_ERR("ERR SET BAUD %s: %d (%d)", uart_name, termios_state, errno); + PX4_ERR("ERR SET BAUD %s: %d (%d)", _uart_name, termios_state, errno); #endif /* PX4_ERR */ close(); return -errno_bkp; } - if ((termios_state = tcsetattr(uart_fd, TCSANOW, &uart_config)) < 0) { + if ((termios_state = tcsetattr(_uart_fd, TCSANOW, &uart_config)) < 0) { int errno_bkp = errno; #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET CONF %s (%d)\033[0m\n", uart_name, errno); + printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET CONF %s (%d)\033[0m\n", _uart_name, errno); #else - PX4_ERR("UART transport: ERR SET CONF %s (%d)", uart_name, errno); + PX4_ERR("UART transport: ERR SET CONF %s (%d)", _uart_name, errno); #endif /* PX4_ERR */ close(); return -errno_bkp; @@ -393,7 +407,7 @@ int UART_node::init() char aux[64]; bool flush = false; - while (0 < ::read(uart_fd, (void *)&aux, 64)) { + while (0 < ::read(_uart_fd, (void *)&aux, 64)) { flush = true; #ifndef px4_usleep usleep(1000); @@ -406,48 +420,48 @@ int UART_node::init() if (flush) { #ifndef PX4_DEBUG - if (debug) { printf("[ micrortps_transport ]\tUART transport: Flush\n"); } + if (_debug) { printf("[ micrortps_transport ]\tUART transport: Flush\n"); } #else - if (debug) { PX4_DEBUG("UART transport: Flush"); } + if (_debug) { PX4_DEBUG("UART transport: Flush"); } #endif /* PX4_DEBUG */ } else { #ifndef PX4_DEBUG - if (debug) { printf("[ micrortps_transport ]\tUART transport: No flush\n"); } + if (_debug) { printf("[ micrortps_transport ]\tUART transport: No flush\n"); } #else - if (debug) { PX4_DEBUG("UART transport: No flush"); } + if (_debug) { PX4_DEBUG("UART transport: No flush"); } #endif /* PX4_INFO */ } - poll_fd[0].fd = uart_fd; - poll_fd[0].events = POLLIN; + _poll_fd[0].fd = _uart_fd; + _poll_fd[0].events = POLLIN; - return uart_fd; + return _uart_fd; } bool UART_node::fds_OK() { - return (-1 != uart_fd); + return (-1 != _uart_fd); } uint8_t UART_node::close() { - if (-1 != uart_fd) { + if (-1 != _uart_fd) { #ifndef PX4_WARN printf("\033[1;33m[ micrortps_transport ]\tClosed UART.\n\033[0m"); #else PX4_WARN("Closed UART."); #endif /* PX4_WARN */ - ::close(uart_fd); - uart_fd = -1; - memset(&poll_fd, 0, sizeof(poll_fd)); + ::close(_uart_fd); + _uart_fd = -1; + memset(&_poll_fd, 0, sizeof(_poll_fd)); } return 0; @@ -460,10 +474,10 @@ ssize_t UART_node::node_read(void *buffer, size_t len) } ssize_t ret = 0; - int r = poll(poll_fd, 1, poll_ms); + int r = poll(_poll_fd, 1, _poll_ms); - if (r == 1 && (poll_fd[0].revents & POLLIN)) { - ret = ::read(uart_fd, buffer, len); + if (r == 1 && (_poll_fd[0].revents & POLLIN)) { + ret = ::read(_uart_fd, buffer, len); } return ret; @@ -475,7 +489,7 @@ ssize_t UART_node::node_write(void *buffer, size_t len) return -1; } - return ::write(uart_fd, buffer, len); + return ::write(_uart_fd, buffer, len); } bool UART_node::baudrate_to_speed(uint32_t bauds, speed_t *speed) @@ -577,16 +591,16 @@ bool UART_node::baudrate_to_speed(uint32_t bauds, speed_t *speed) return true; } -UDP_node::UDP_node(const char *_udp_ip, uint16_t _udp_port_recv, - uint16_t _udp_port_send, const bool _debug): - Transport_node(_debug), - sender_fd(-1), - receiver_fd(-1), - udp_port_recv(_udp_port_recv), - udp_port_send(_udp_port_send) +UDP_node::UDP_node(const char *udp_ip, uint16_t udp_port_recv, + uint16_t udp_port_send, const uint8_t sys_id, const bool debug): + Transport_node(sys_id, debug), + _sender_fd(-1), + _receiver_fd(-1), + _udp_port_recv(udp_port_recv), + _udp_port_send(udp_port_send) { - if (nullptr != _udp_ip) { - strcpy(udp_ip, _udp_ip); + if (nullptr != udp_ip) { + strcpy(_udp_ip, udp_ip); } } @@ -597,7 +611,7 @@ UDP_node::~UDP_node() int UDP_node::init() { - if (0 > init_receiver(udp_port_recv) || 0 > init_sender(udp_port_send)) { + if (0 > init_receiver(_udp_port_recv) || 0 > init_sender(_udp_port_send)) { return -1; } @@ -606,19 +620,19 @@ int UDP_node::init() bool UDP_node::fds_OK() { - return (-1 != sender_fd && -1 != receiver_fd); + return (-1 != _sender_fd && -1 != _receiver_fd); } int UDP_node::init_receiver(uint16_t udp_port) { #if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) // udp socket data - memset((char *)&receiver_inaddr, 0, sizeof(receiver_inaddr)); - receiver_inaddr.sin_family = AF_INET; - receiver_inaddr.sin_port = htons(udp_port); - receiver_inaddr.sin_addr.s_addr = htonl(INADDR_ANY); + memset((char *)&_receiver_inaddr, 0, sizeof(_receiver_inaddr)); + _receiver_inaddr.sin_family = AF_INET; + _receiver_inaddr.sin_port = htons(udp_port); + _receiver_inaddr.sin_addr.s_addr = htonl(INADDR_ANY); - if ((receiver_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + if ((_receiver_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { #ifndef PX4_ERR printf("\033[0;31m[ micrortps_transport ]\tUDP transport: Create socket failed\033[0m\n"); #else @@ -633,7 +647,7 @@ int UDP_node::init_receiver(uint16_t udp_port) PX4_INFO("UDP transport: Trying to connect..."); #endif /* PX4_INFO */ - if (bind(receiver_fd, (struct sockaddr *)&receiver_inaddr, sizeof(receiver_inaddr)) < 0) { + if (bind(_receiver_fd, (struct sockaddr *)&_receiver_inaddr, sizeof(_receiver_inaddr)) < 0) { #ifndef PX4_ERR printf("\033[0;31m[ micrortps_transport ]\tUDP transport: Bind failed\033[0m\n"); #else @@ -655,7 +669,7 @@ int UDP_node::init_sender(uint16_t udp_port) { #if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) - if ((sender_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + if ((_sender_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { #ifndef PX4_ERR printf("\033[0;31m[ micrortps_transport ]\tUDP transport: Create socket failed\033[0m\n"); #else @@ -664,11 +678,11 @@ int UDP_node::init_sender(uint16_t udp_port) return -1; } - memset((char *) &sender_outaddr, 0, sizeof(sender_outaddr)); - sender_outaddr.sin_family = AF_INET; - sender_outaddr.sin_port = htons(udp_port); + memset((char *) &_sender_outaddr, 0, sizeof(_sender_outaddr)); + _sender_outaddr.sin_family = AF_INET; + _sender_outaddr.sin_port = htons(udp_port); - if (inet_aton(udp_ip, &sender_outaddr.sin_addr) == 0) { + if (inet_aton(_udp_ip, &_sender_outaddr.sin_addr) == 0) { #ifndef PX4_ERR printf("\033[0;31m[ micrortps_transport ]\tUDP transport: inet_aton() failed\033[0m\n"); #else @@ -686,26 +700,26 @@ uint8_t UDP_node::close() { #if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) - if (sender_fd != -1) { + if (_sender_fd != -1) { #ifndef PX4_WARN printf("\033[1;33m[ micrortps_transport ]\tUDP transport: Closed sender socket!\033[0m\n"); #else PX4_WARN("UDP transport: Closed sender socket!"); #endif /* PX4_WARN */ - shutdown(sender_fd, SHUT_RDWR); - ::close(sender_fd); - sender_fd = -1; + shutdown(_sender_fd, SHUT_RDWR); + ::close(_sender_fd); + _sender_fd = -1; } - if (receiver_fd != -1) { + if (_receiver_fd != -1) { #ifndef PX4_WARN printf("\033[1;33m[ micrortps_transport ]\tUDP transport: Closed receiver socket!\033[0m\n"); #else PX4_WARN("UDP transport: Closed receiver socket!"); #endif /* PX4_WARN */ - shutdown(receiver_fd, SHUT_RDWR); - ::close(receiver_fd); - receiver_fd = -1; + shutdown(_receiver_fd, SHUT_RDWR); + ::close(_receiver_fd); + _receiver_fd = -1; } #endif /* __PX4_NUTTX */ @@ -721,8 +735,8 @@ ssize_t UDP_node::node_read(void *buffer, size_t len) ssize_t ret = 0; #if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) // Blocking call - static socklen_t addrlen = sizeof(receiver_outaddr); - ret = recvfrom(receiver_fd, buffer, len, 0, (struct sockaddr *)&receiver_outaddr, &addrlen); + static socklen_t addrlen = sizeof(_receiver_outaddr); + ret = recvfrom(_receiver_fd, buffer, len, 0, (struct sockaddr *)&_receiver_outaddr, &addrlen); #endif /* !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) */ return ret; } @@ -735,7 +749,7 @@ ssize_t UDP_node::node_write(void *buffer, size_t len) ssize_t ret = 0; #if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) - ret = sendto(sender_fd, buffer, len, 0, (struct sockaddr *)&sender_outaddr, sizeof(sender_outaddr)); + ret = sendto(_sender_fd, buffer, len, 0, (struct sockaddr *)&_sender_outaddr, sizeof(_sender_outaddr)); #endif /* !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) */ return ret; } diff --git a/msg/templates/urtps/microRTPS_transport.h b/msg/templates/urtps/microRTPS_transport.h index ac6351a7c3..3605b63f8c 100644 --- a/msg/templates/urtps/microRTPS_transport.h +++ b/msg/templates/urtps/microRTPS_transport.h @@ -41,19 +41,26 @@ #define BUFFER_SIZE 1024 #define DEFAULT_UART "/dev/ttyACM0" +namespace MicroRtps { + enum class System { + FMU, + MISSION_COMPUTER + }; +} + class Transport_node { public: - Transport_node(const bool _debug); + Transport_node(const uint8_t sys_id, const bool debug); virtual ~Transport_node(); virtual int init() {return 0;} virtual uint8_t close() {return 0;} - ssize_t read(uint8_t *topic_ID, char out_buffer[], size_t buffer_len); + ssize_t read(uint8_t *topic_id, char out_buffer[], size_t buffer_len); /** * write a buffer - * @param topic_ID + * @param topic_id * @param buffer buffer to write: it must leave get_header_length() bytes free at the beginning. This will be * filled with the header. length does not include get_header_length(). So buffer looks like this: * ------------------------------------------------- @@ -63,11 +70,23 @@ public: * @param length buffer length excluding header length * @return length on success, <0 on error */ - ssize_t write(const uint8_t topic_ID, char buffer[], size_t length); + ssize_t write(const uint8_t topic_id, char buffer[], size_t length); /** Get the Length of struct Header to make headroom for the size of struct Header along with payload */ size_t get_header_length(); +private: + struct __attribute__((packed)) Header { + char marker[3]; + uint8_t topic_id; + uint8_t sys_id; + uint8_t seq; + uint8_t payload_len_h; + uint8_t payload_len_l; + uint8_t crc_h; + uint8_t crc_l; + }; + protected: virtual ssize_t node_read(void *buffer, size_t len) = 0; virtual ssize_t node_write(void *buffer, size_t len) = 0; @@ -75,30 +94,22 @@ protected: uint16_t crc16_byte(uint16_t crc, const uint8_t data); uint16_t crc16(uint8_t const *buffer, size_t len); -protected: - uint32_t rx_buff_pos; - char rx_buffer[BUFFER_SIZE] = {}; - bool debug = false; - uint8_t _seq_number{0}; + uint32_t _rx_buff_pos; + char _rx_buffer[BUFFER_SIZE]{}; -private: - struct __attribute__((packed)) Header { - char marker[3]; - uint8_t topic_ID; - uint8_t seq; - uint8_t payload_len_h; - uint8_t payload_len_l; - uint8_t crc_h; - uint8_t crc_l; - }; + bool _debug; + + uint8_t _sys_id; + uint8_t _seq_number{0}; }; class UART_node: public Transport_node { public: - UART_node(const char *_uart_name, const uint32_t _baudrate, - const uint32_t _poll_ms, const bool _hw_flow_control, - const bool _sw_flow_control, const bool _debug); + UART_node(const char *uart_name, const uint32_t baudrate, + const uint32_t poll_ms, const bool hw_flow_control, + const bool sw_flow_control, const uint8_t sys_id, + const bool debug); virtual ~UART_node(); int init(); @@ -110,20 +121,20 @@ protected: bool fds_OK(); bool baudrate_to_speed(uint32_t bauds, speed_t *speed); - int uart_fd; - char uart_name[64] = {}; - uint32_t baudrate; - uint32_t poll_ms; - bool hw_flow_control = false; - bool sw_flow_control = false; - struct pollfd poll_fd[1] = {}; + int _uart_fd; + char _uart_name[64]{}; + uint32_t _baudrate; + uint32_t _poll_ms; + bool _hw_flow_control{false}; + bool _sw_flow_control{false}; + struct pollfd _poll_fd[1]{}; }; class UDP_node: public Transport_node { public: - UDP_node(const char *_udp_ip, uint16_t udp_port_recv, uint16_t udp_port_send, - const bool _debug); + UDP_node(const char *udp_ip, uint16_t udp_port_recv, uint16_t udp_port_send, + const uint8_t sys_id, const bool debug); virtual ~UDP_node(); int init(); @@ -136,12 +147,12 @@ protected: ssize_t node_write(void *buffer, size_t len); bool fds_OK(); - int sender_fd; - int receiver_fd; - char udp_ip[16] = {}; - uint16_t udp_port_recv; - uint16_t udp_port_send; - struct sockaddr_in sender_outaddr; - struct sockaddr_in receiver_inaddr; - struct sockaddr_in receiver_outaddr; + int _sender_fd; + int _receiver_fd; + char _udp_ip[16]{}; + uint16_t _udp_port_recv; + uint16_t _udp_port_send; + struct sockaddr_in _sender_outaddr; + struct sockaddr_in _receiver_inaddr; + struct sockaddr_in _receiver_outaddr; }; diff --git a/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp b/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp index 6909d50ea9..947dd1f7d5 100644 --- a/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp +++ b/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp @@ -168,12 +168,15 @@ static int micrortps_start(int argc, char *argv[]) return -1; } + // Set the system ID to FMU, in order to identify the client side + const uint8_t sys_id = static_cast(MicroRtps::System::FMU); + switch (_options.transport) { case options::eTransports::UART: { transport_node = new UART_node(_options.device, _options.baudrate, _options.poll_ms, - _options.sw_flow_control, _options.hw_flow_control, _options.verbose_debug); - PX4_INFO("UART transport: device: %s; baudrate: %" PRIu32 "; sleep: %" PRIu32 "ms; poll: %" PRIu32 - "ms; flow_control: %s", + _options.sw_flow_control, _options.hw_flow_control, sys_id, + _options.verbose_debug); + PX4_INFO("UART transport: device: %s; baudrate: %" PRIu32 "; sleep: %" PRIu32 "ms; flow_control: %s", _options.device, _options.baudrate, _options.sleep_us, _options.poll_ms, _options.sw_flow_control ? "SW enabled" : (_options.hw_flow_control ? "HW enabled" : "No")); } @@ -181,7 +184,7 @@ static int micrortps_start(int argc, char *argv[]) case options::eTransports::UDP: { transport_node = new UDP_node(_options.ip, _options.recv_port, _options.send_port, - _options.verbose_debug); + sys_id, _options.verbose_debug); PX4_INFO("UDP transport: ip address: %s; recv port: %" PRIu16 "; send port: %" PRIu16 "; sleep: %" PRIu32 "us", _options.ip, _options.recv_port, _options.send_port, _options.sleep_us);