diff --git a/msg/templates/uorb_microcdr/microRTPS_client.cpp.em b/msg/templates/uorb_microcdr/microRTPS_client.cpp.em index d611e720b8..bda05e09b7 100644 --- a/msg/templates/uorb_microcdr/microRTPS_client.cpp.em +++ b/msg/templates/uorb_microcdr/microRTPS_client.cpp.em @@ -140,11 +140,10 @@ void *send(void *args) if (subs->@(topic)_sub.update(&@(topic)_data)) { @[ if topic == 'Timesync' or topic == 'timesync']@ - if (@(topic)_data.sys_id == 0 && @(topic)_data.seq != last_remote_msg_seq && @(topic)_data.tc1 == 0) { + if (@(topic)_data.seq != last_remote_msg_seq && @(topic)_data.tc1 == 0) { last_remote_msg_seq = @(topic)_data.seq; @(topic)_data.timestamp = hrt_absolute_time(); - @(topic)_data.sys_id = 1; @(topic)_data.seq = last_msg_seq; @(topic)_data.tc1 = hrt_absolute_time() * 1000ULL; @(topic)_data.ts1 = @(topic)_data.ts1; diff --git a/msg/templates/urtps/RtpsTopics.cpp.em b/msg/templates/urtps/RtpsTopics.cpp.em index 1e0d5b0329..ff70b5f99b 100644 --- a/msg/templates/urtps/RtpsTopics.cpp.em +++ b/msg/templates/urtps/RtpsTopics.cpp.em @@ -117,11 +117,11 @@ void RtpsTopics::publish(const uint8_t topic_ID, char data_buffer[], size_t len) eprosima::fastcdr::FastBuffer cdrbuffer(data_buffer, len); eprosima::fastcdr::Cdr cdr_des(cdrbuffer); st.deserialize(cdr_des); + @[ if topic == 'Timesync' or topic == 'timesync']@ _timesync->processTimesyncMsg(&st, &_@(topic)_pub); - - if (getMsgSysID(&st) == 1) { @[ end if]@ + // apply timestamp offset uint64_t timestamp = getMsgTimestamp(&st); uint64_t timestamp_sample = getMsgTimestampSample(&st); @@ -129,11 +129,8 @@ void RtpsTopics::publish(const uint8_t topic_ID, char data_buffer[], size_t len) setMsgTimestamp(&st, timestamp); _timesync->subtractOffset(timestamp_sample); setMsgTimestampSample(&st, timestamp_sample); - _@(topic)_pub.publish(&st); -@[ if topic == 'Timesync' or topic == 'timesync']@ - } -@[ end if]@ + _@(topic)_pub.publish(&st); } break; @[end for]@ @@ -157,10 +154,7 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr) case @(rtps_message_id(ids, topic)): // @(topic) if (_@(topic)_sub.hasMsg()) { @(topic)_msg_t msg = _@(topic)_sub.getMsg(); -@[ if topic == 'Timesync' or topic == 'timesync']@ - if (getMsgSysID(&msg) == 0) { -@[ end if]@ // apply timestamps offset uint64_t timestamp = getMsgTimestamp(&msg); uint64_t timestamp_sample = getMsgTimestampSample(&msg); @@ -168,12 +162,10 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr) setMsgTimestamp(&msg, timestamp); _timesync->addOffset(timestamp_sample); setMsgTimestampSample(&msg, timestamp_sample); + msg.serialize(scdr); ret = true; -@[ if topic == 'Timesync' or topic == 'timesync']@ - } -@[ end if]@ _@(topic)_sub.unlockMsg(); } diff --git a/msg/templates/urtps/microRTPS_agent.cpp.em b/msg/templates/urtps/microRTPS_agent.cpp.em index ad2103764e..ccf464c579 100644 --- a/msg/templates/urtps/microRTPS_agent.cpp.em +++ b/msg/templates/urtps/microRTPS_agent.cpp.em @@ -286,10 +286,19 @@ int main(int argc, char **argv) printf("[ micrortps_agent ]\tUsing only the localhost network...\n"); } + /** + * Set the system ID to Mission Computer, in order to identify the agent side + * + * Note: theoretically a multi-agent system is possible, but this would require + * adjustments in the way the timesync is done (would have to create a timesync + * instance per agent). Keeping it contained for a 1:1 link for now is reasonable. + */ + const uint8_t sys_id = static_cast(MicroRtps::System::MISSION_COMPUTER); + switch (_options.transport) { case options::eTransports::UART: { transport_node = std::make_unique(_options.device, _options.baudrate, _options.poll_ms, - _options.sw_flow_control, _options.hw_flow_control, _options.verbose_debug); + _options.sw_flow_control, _options.hw_flow_control, sys_id, _options.verbose_debug); printf("[ micrortps_agent ]\tUART transport: device: %s; baudrate: %d; sleep: %dus; poll: %dms; flow_control: %s\n", _options.device, _options.baudrate, _options.sleep_us, _options.poll_ms, _options.sw_flow_control ? "SW enabled" : (_options.hw_flow_control ? "HW enabled" : "No")); @@ -297,7 +306,8 @@ int main(int argc, char **argv) break; case options::eTransports::UDP: { - transport_node = std::make_unique(_options.ip, _options.recv_port, _options.send_port, _options.verbose_debug); + transport_node = std::make_unique(_options.ip, _options.recv_port, _options.send_port, + sys_id, _options.verbose_debug); printf("[ micrortps_agent ]\tUDP transport: ip address: %s; recv port: %u; send port: %u; sleep: %dus\n", _options.ip, _options.recv_port, _options.send_port, _options.sleep_us); } diff --git a/msg/templates/urtps/microRTPS_timesync.cpp.em b/msg/templates/urtps/microRTPS_timesync.cpp.em index 7b15cd8bb2..6cb71a90bd 100644 --- a/msg/templates/urtps/microRTPS_timesync.cpp.em +++ b/msg/templates/urtps/microRTPS_timesync.cpp.em @@ -227,7 +227,7 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t void TimeSync::processTimesyncMsg(timesync_msg_t *msg, TimesyncPublisher *pub) { - if (getMsgSysID(msg) == 1 && getMsgSeq(msg) != _last_remote_msg_seq) { + if (getMsgSeq(msg) != _last_remote_msg_seq) { _last_remote_msg_seq = getMsgSeq(msg); if (getMsgTC1(msg) > 0) { @@ -245,7 +245,6 @@ void TimeSync::processTimesyncMsg(timesync_msg_t *msg, TimesyncPublisher *pub) @[else]@ setMsgTimestamp(msg, getSteadyTimeUSec()); @[end if]@ - setMsgSysID(msg, 0); setMsgSeq(msg, getMsgSeq(msg) + 1); @[if ros2_distro]@ setMsgTC1(msg, getROSTimeNSec()); @@ -267,7 +266,6 @@ timesync_msg_t TimeSync::newTimesyncMsg() @[else]@ setMsgTimestamp(&msg, getSteadyTimeUSec()); @[end if]@ - setMsgSysID(&msg, 0); setMsgSeq(&msg, _last_msg_seq); setMsgTC1(&msg, 0); @[if ros2_distro]@ diff --git a/msg/templates/urtps/microRTPS_timesync.h.em b/msg/templates/urtps/microRTPS_timesync.h.em index e272cf67db..0614906aa3 100644 --- a/msg/templates/urtps/microRTPS_timesync.h.em +++ b/msg/templates/urtps/microRTPS_timesync.h.em @@ -259,13 +259,11 @@ private: /** Timesync msg Getters **/ @[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ inline uint64_t getMsgTimestamp(const timesync_msg_t *msg) { return msg->timestamp_(); } - inline uint8_t getMsgSysID(const timesync_msg_t *msg) { return msg->sys_id_(); } inline uint8_t getMsgSeq(const timesync_msg_t *msg) { return msg->seq_(); } inline int64_t getMsgTC1(const timesync_msg_t *msg) { return msg->tc1_(); } inline int64_t getMsgTS1(const timesync_msg_t *msg) { return msg->ts1_(); } @[elif ros2_distro]@ inline uint64_t getMsgTimestamp(const timesync_msg_t *msg) { return msg->timestamp(); } - inline uint8_t getMsgSysID(const timesync_msg_t *msg) { return msg->sys_id(); } inline uint8_t getMsgSeq(const timesync_msg_t *msg) { return msg->seq(); } inline int64_t getMsgTC1(const timesync_msg_t *msg) { return msg->tc1(); } inline int64_t getMsgTS1(const timesync_msg_t *msg) { return msg->ts1(); } @@ -282,12 +280,10 @@ private: /** Timesync msg Setters **/ @[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ - inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id_() = sys_id; } inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq_() = seq; } inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1_() = tc1; } inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1_() = ts1; } @[elif ros2_distro]@ - inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id() = sys_id; } inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq() = seq; } inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1() = tc1; } inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1() = ts1; } diff --git a/msg/templates/urtps/microRTPS_transport.cpp b/msg/templates/urtps/microRTPS_transport.cpp index c4f3719f26..60b04a9ba3 100644 --- a/msg/templates/urtps/microRTPS_transport.cpp +++ b/msg/templates/urtps/microRTPS_transport.cpp @@ -80,9 +80,10 @@ uint16_t const crc16_table[256] = { 0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040 }; -Transport_node::Transport_node(const bool _debug): - rx_buff_pos(0), - debug(_debug) +Transport_node::Transport_node(const uint8_t sys_id, const bool debug): + _rx_buff_pos(0), + _debug(debug), + _sys_id(sys_id) { } @@ -106,15 +107,15 @@ uint16_t Transport_node::crc16(uint8_t const *buffer, size_t len) return crc; } -ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer_len) +ssize_t Transport_node::read(uint8_t *topic_id, char out_buffer[], size_t buffer_len) { - if (nullptr == out_buffer || nullptr == topic_ID || !fds_OK()) { + if (nullptr == out_buffer || nullptr == topic_id || !fds_OK()) { return -1; } - *topic_ID = 255; + *topic_id = 255; - ssize_t len = node_read((void *)(rx_buffer + rx_buff_pos), sizeof(rx_buffer) - rx_buff_pos); + ssize_t len = node_read((void *)(_rx_buffer + _rx_buff_pos), sizeof(_rx_buffer) - _rx_buff_pos); if (len < 0) { int errsv = errno; @@ -122,11 +123,11 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer if (errsv && EAGAIN != errsv && ETIMEDOUT != errsv) { #ifndef PX4_DEBUG - if (debug) { printf("\033[0;31m[ micrortps_transport ]\tRead fail %d\033[0m\n", errsv); } + if (_debug) { printf("\033[0;31m[ micrortps_transport ]\tRead fail %d\033[0m\n", errsv); } #else - if (debug) { PX4_DEBUG("Read fail %d", errsv); } + if (_debug) { PX4_DEBUG("Read fail %d", errsv); } #endif /* PX4_DEBUG */ } @@ -134,26 +135,26 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer return len; } - rx_buff_pos += len; + _rx_buff_pos += len; // We read some size_t header_size = sizeof(struct Header); // but not enough - if (rx_buff_pos < header_size) { + if (_rx_buff_pos < header_size) { return 0; } uint32_t msg_start_pos = 0; - for (msg_start_pos = 0; msg_start_pos <= rx_buff_pos - header_size; ++msg_start_pos) { - if ('>' == rx_buffer[msg_start_pos] && memcmp(rx_buffer + msg_start_pos, ">>>", 3) == 0) { + for (msg_start_pos = 0; msg_start_pos <= _rx_buff_pos - header_size; ++msg_start_pos) { + if ('>' == _rx_buffer[msg_start_pos] && memcmp(_rx_buffer + msg_start_pos, ">>>", 3) == 0) { break; } } // Start not found - if (msg_start_pos > (rx_buff_pos - header_size)) { + if (msg_start_pos > (_rx_buff_pos - header_size)) { #ifndef PX4_DEBUG if (debug) { printf("\033[1;33m[ micrortps_transport ]\t (↓↓ %" PRIu32 ")\033[0m\n", msg_start_pos); } @@ -165,25 +166,36 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer #endif /* PX4_DEBUG */ // All we've checked so far is garbage, drop it - but save unchecked bytes - memmove(rx_buffer, rx_buffer + msg_start_pos, rx_buff_pos - msg_start_pos); - rx_buff_pos -= msg_start_pos; + memmove(_rx_buffer, _rx_buffer + msg_start_pos, _rx_buff_pos - msg_start_pos); + _rx_buff_pos -= msg_start_pos; return -1; } - // [>,>,>,topic_ID,seq,payload_length_H,payload_length_L,CRCHigh,CRCLow,payloadStart, ... ,payloadEnd] - struct Header *header = (struct Header *)&rx_buffer[msg_start_pos]; + // [>,>,>,topic_id,sys_id,seq,payload_length_H,payload_length_L,CRCHigh,CRCLow,payloadStart, ... ,payloadEnd] + struct Header *header = (struct Header *)&_rx_buffer[msg_start_pos]; uint32_t payload_len = ((uint32_t)header->payload_len_h << 8) | header->payload_len_l; + // The received message comes from this system. Discard it. + // This might happen when: + // 1. The same UDP port is being used to send a rcv packets or + // 2. The same topic on the agent is being used for outgoing and incoming data + if (header->sys_id == _sys_id) { + // Drop the message and continue with the read buffer + memmove(_rx_buffer, _rx_buffer + msg_start_pos + 1, _rx_buff_pos - (msg_start_pos + 1)); + _rx_buff_pos -= (msg_start_pos + 1); + return -1; + } + // The message won't fit the buffer. if (buffer_len < header_size + payload_len) { // Drop the message and continue with the read buffer - memmove(rx_buffer, rx_buffer + msg_start_pos + 1, rx_buff_pos - (msg_start_pos + 1)); - rx_buff_pos -= (msg_start_pos + 1); + memmove(_rx_buffer, _rx_buffer + msg_start_pos + 1, _rx_buff_pos - (msg_start_pos + 1)); + _rx_buff_pos -= (msg_start_pos + 1); return -EMSGSIZE; } // We do not have a complete message yet - if (msg_start_pos + header_size + payload_len > rx_buff_pos) { + if (msg_start_pos + header_size + payload_len > _rx_buff_pos) { // If there's garbage at the beginning, drop it if (msg_start_pos > 0) { #ifndef PX4_DEBUG @@ -195,15 +207,15 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer if (debug) { PX4_DEBUG(" (↓ %" PRIu32 ")", msg_start_pos); } #endif /* PX4_DEBUG */ - memmove(rx_buffer, rx_buffer + msg_start_pos, rx_buff_pos - msg_start_pos); - rx_buff_pos -= msg_start_pos; + memmove(_rx_buffer, _rx_buffer + msg_start_pos, _rx_buff_pos - msg_start_pos); + _rx_buff_pos -= msg_start_pos; } return 0; } uint16_t read_crc = ((uint16_t)header->crc_h << 8) | header->crc_l; - uint16_t calc_crc = crc16((uint8_t *)rx_buffer + msg_start_pos + header_size, payload_len); + uint16_t calc_crc = crc16((uint8_t *)_rx_buffer + msg_start_pos + header_size, payload_len); if (read_crc != calc_crc) { #ifndef PX4_DEBUG @@ -212,27 +224,27 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer #else - if (debug) { PX4_DEBUG("Bad CRC %u != %u\t\t(↓ %lu)", read_crc, calc_crc, (unsigned long)(header_size + payload_len)); } + if (_debug) { PX4_DEBUG("Bad CRC %u != %u\t\t(↓ %lu)", read_crc, calc_crc, (unsigned long)(header_size + payload_len)); } #endif /* PX4_DEBUG */ // Drop garbage up just beyond the start of the message - memmove(rx_buffer, rx_buffer + (msg_start_pos + 1), rx_buff_pos); + memmove(_rx_buffer, _rx_buffer + (msg_start_pos + 1), _rx_buff_pos); // If there is a CRC error, the payload len cannot be trusted - rx_buff_pos -= (msg_start_pos + 1); + _rx_buff_pos -= (msg_start_pos + 1); len = -1; } else { // copy message to outbuffer and set other return values - memmove(out_buffer, rx_buffer + msg_start_pos + header_size, payload_len); - *topic_ID = header->topic_ID; + memmove(out_buffer, _rx_buffer + msg_start_pos + header_size, payload_len); + *topic_id = header->topic_id; len = payload_len + header_size; - // discard message from rx_buffer - rx_buff_pos -= msg_start_pos + header_size + payload_len; - memmove(rx_buffer, rx_buffer + msg_start_pos + header_size + payload_len, rx_buff_pos); + // discard message from _rx_buffer + _rx_buff_pos -= msg_start_pos + header_size + payload_len; + memmove(_rx_buffer, _rx_buffer + msg_start_pos + header_size + payload_len, _rx_buff_pos); } return len; @@ -243,18 +255,19 @@ size_t Transport_node::get_header_length() return sizeof(struct Header); } -ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t length) +ssize_t Transport_node::write(const uint8_t topic_id, char buffer[], size_t length) { if (!fds_OK()) { return -1; } - static struct Header header = {{'>', '>', '>'}, 0u, 0u, 0u, 0u, 0u, 0u}; + static struct Header header = {{'>', '>', '>'}, 0u, 0u, 0u, 0u, 0u, 0u, 0u}; - // [>,>,>,topic_ID,seq,payload_length,CRCHigh,CRCLow,payload_start, ... ,payload_end] + // [>,>,>,topic_id,seq,payload_length,CRCHigh,CRCLow,payload_start, ... ,payload_end] uint16_t crc = crc16((uint8_t *)&buffer[sizeof(header)], length); - header.topic_ID = topic_ID; + header.topic_id = topic_id; + header.sys_id = _sys_id; header.seq = _seq_number++; header.payload_len_h = (length >> 8) & 0xff; header.payload_len_l = length & 0xff; @@ -273,19 +286,20 @@ ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t leng return len + sizeof(header); } -UART_node::UART_node(const char *_uart_name, const uint32_t _baudrate, - const uint32_t _poll_ms, const bool _hw_flow_control, - const bool _sw_flow_control, const bool _debug): - Transport_node(_debug), - uart_fd(-1), - baudrate(_baudrate), - poll_ms(_poll_ms), - hw_flow_control(_hw_flow_control), - sw_flow_control(_sw_flow_control) +UART_node::UART_node(const char *uart_name, const uint32_t baudrate, + const uint32_t poll_ms, const bool hw_flow_control, + const bool sw_flow_control, const uint8_t sys_id, + const bool debug): + Transport_node(sys_id, debug), + _uart_fd(-1), + _baudrate(baudrate), + _poll_ms(poll_ms), + _hw_flow_control(hw_flow_control), + _sw_flow_control(sw_flow_control) { - if (nullptr != _uart_name) { - strcpy(uart_name, _uart_name); + if (nullptr != uart_name) { + strcpy(_uart_name, uart_name); } } @@ -297,20 +311,20 @@ UART_node::~UART_node() int UART_node::init() { // Open a serial port - uart_fd = open(uart_name, O_RDWR | O_NOCTTY | O_NONBLOCK); + _uart_fd = open(_uart_name, O_RDWR | O_NOCTTY | O_NONBLOCK); - if (uart_fd < 0) { + if (_uart_fd < 0) { #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: Failed to open device: %s (%d)\033[0m\n", uart_name, errno); + printf("\033[0;31m[ micrortps_transport ]\tUART transport: Failed to open device: %s (%d)\033[0m\n", _uart_name, errno); #else - PX4_ERR("UART transport: Failed to open device: %s (%d)", uart_name, errno); + PX4_ERR("UART transport: Failed to open device: %s (%d)", _uart_name, errno); #endif /* PX4_ERR */ return -errno; } // If using shared UART, no need to set it up - if (baudrate == 0) { - return uart_fd; + if (_baudrate == 0) { + return _uart_fd; } // Try to set baud rate @@ -318,13 +332,13 @@ int UART_node::init() int termios_state; // Back up the original uart configuration to restore it after exit - if ((termios_state = tcgetattr(uart_fd, &uart_config)) < 0) { + if ((termios_state = tcgetattr(_uart_fd, &uart_config)) < 0) { int errno_bkp = errno; #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR GET CONF %s: %d (%d)\n\033[0m", uart_name, termios_state, + printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR GET CONF %s: %d (%d)\n\033[0m", _uart_name, termios_state, errno); #else - PX4_ERR("UART transport: ERR GET CONF %s: %d (%d)", uart_name, termios_state, errno); + PX4_ERR("UART transport: ERR GET CONF %s: %d (%d)", _uart_name, termios_state, errno); #endif /* PX4_ERR */ close(); return -errno_bkp; @@ -343,11 +357,11 @@ int UART_node::init() uart_config.c_lflag &= ~(ISIG | ICANON | ECHO | TOSTOP | IEXTEN); // Flow control - if (hw_flow_control) { + if (_hw_flow_control) { // HW flow control uart_config.c_lflag |= CRTSCTS; - } else if (sw_flow_control) { + } else if (_sw_flow_control) { // SW flow control uart_config.c_lflag |= (IXON | IXOFF | IXANY); } @@ -355,10 +369,10 @@ int UART_node::init() // Set baud rate speed_t speed; - if (!baudrate_to_speed(baudrate, &speed)) { + if (!baudrate_to_speed(_baudrate, &speed)) { #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: Unsupported baudrate: %d\n\tsupported examples:\n\t9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 921600, 1000000\033[0m\n", - uart_name, baudrate); + printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: Unsupported _baudrate: %d\n\tsupported examples:\n\t9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 921600, 1000000\033[0m\n", + _uart_name, _baudrate); #else PX4_ERR("UART transport: ERR SET BAUD %s: Unsupported baudrate: %" PRIu32 "\n\tsupported examples:\n\t9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 921600, 1000000\n", uart_name, baudrate); @@ -370,21 +384,21 @@ int UART_node::init() if (cfsetispeed(&uart_config, speed) < 0 || cfsetospeed(&uart_config, speed) < 0) { int errno_bkp = errno; #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: %d (%d)\033[0m\n", uart_name, termios_state, + printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: %d (%d)\033[0m\n", _uart_name, termios_state, errno); #else - PX4_ERR("ERR SET BAUD %s: %d (%d)", uart_name, termios_state, errno); + PX4_ERR("ERR SET BAUD %s: %d (%d)", _uart_name, termios_state, errno); #endif /* PX4_ERR */ close(); return -errno_bkp; } - if ((termios_state = tcsetattr(uart_fd, TCSANOW, &uart_config)) < 0) { + if ((termios_state = tcsetattr(_uart_fd, TCSANOW, &uart_config)) < 0) { int errno_bkp = errno; #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET CONF %s (%d)\033[0m\n", uart_name, errno); + printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET CONF %s (%d)\033[0m\n", _uart_name, errno); #else - PX4_ERR("UART transport: ERR SET CONF %s (%d)", uart_name, errno); + PX4_ERR("UART transport: ERR SET CONF %s (%d)", _uart_name, errno); #endif /* PX4_ERR */ close(); return -errno_bkp; @@ -393,7 +407,7 @@ int UART_node::init() char aux[64]; bool flush = false; - while (0 < ::read(uart_fd, (void *)&aux, 64)) { + while (0 < ::read(_uart_fd, (void *)&aux, 64)) { flush = true; #ifndef px4_usleep usleep(1000); @@ -406,48 +420,48 @@ int UART_node::init() if (flush) { #ifndef PX4_DEBUG - if (debug) { printf("[ micrortps_transport ]\tUART transport: Flush\n"); } + if (_debug) { printf("[ micrortps_transport ]\tUART transport: Flush\n"); } #else - if (debug) { PX4_DEBUG("UART transport: Flush"); } + if (_debug) { PX4_DEBUG("UART transport: Flush"); } #endif /* PX4_DEBUG */ } else { #ifndef PX4_DEBUG - if (debug) { printf("[ micrortps_transport ]\tUART transport: No flush\n"); } + if (_debug) { printf("[ micrortps_transport ]\tUART transport: No flush\n"); } #else - if (debug) { PX4_DEBUG("UART transport: No flush"); } + if (_debug) { PX4_DEBUG("UART transport: No flush"); } #endif /* PX4_INFO */ } - poll_fd[0].fd = uart_fd; - poll_fd[0].events = POLLIN; + _poll_fd[0].fd = _uart_fd; + _poll_fd[0].events = POLLIN; - return uart_fd; + return _uart_fd; } bool UART_node::fds_OK() { - return (-1 != uart_fd); + return (-1 != _uart_fd); } uint8_t UART_node::close() { - if (-1 != uart_fd) { + if (-1 != _uart_fd) { #ifndef PX4_WARN printf("\033[1;33m[ micrortps_transport ]\tClosed UART.\n\033[0m"); #else PX4_WARN("Closed UART."); #endif /* PX4_WARN */ - ::close(uart_fd); - uart_fd = -1; - memset(&poll_fd, 0, sizeof(poll_fd)); + ::close(_uart_fd); + _uart_fd = -1; + memset(&_poll_fd, 0, sizeof(_poll_fd)); } return 0; @@ -460,10 +474,10 @@ ssize_t UART_node::node_read(void *buffer, size_t len) } ssize_t ret = 0; - int r = poll(poll_fd, 1, poll_ms); + int r = poll(_poll_fd, 1, _poll_ms); - if (r == 1 && (poll_fd[0].revents & POLLIN)) { - ret = ::read(uart_fd, buffer, len); + if (r == 1 && (_poll_fd[0].revents & POLLIN)) { + ret = ::read(_uart_fd, buffer, len); } return ret; @@ -475,7 +489,7 @@ ssize_t UART_node::node_write(void *buffer, size_t len) return -1; } - return ::write(uart_fd, buffer, len); + return ::write(_uart_fd, buffer, len); } bool UART_node::baudrate_to_speed(uint32_t bauds, speed_t *speed) @@ -577,16 +591,16 @@ bool UART_node::baudrate_to_speed(uint32_t bauds, speed_t *speed) return true; } -UDP_node::UDP_node(const char *_udp_ip, uint16_t _udp_port_recv, - uint16_t _udp_port_send, const bool _debug): - Transport_node(_debug), - sender_fd(-1), - receiver_fd(-1), - udp_port_recv(_udp_port_recv), - udp_port_send(_udp_port_send) +UDP_node::UDP_node(const char *udp_ip, uint16_t udp_port_recv, + uint16_t udp_port_send, const uint8_t sys_id, const bool debug): + Transport_node(sys_id, debug), + _sender_fd(-1), + _receiver_fd(-1), + _udp_port_recv(udp_port_recv), + _udp_port_send(udp_port_send) { - if (nullptr != _udp_ip) { - strcpy(udp_ip, _udp_ip); + if (nullptr != udp_ip) { + strcpy(_udp_ip, udp_ip); } } @@ -597,7 +611,7 @@ UDP_node::~UDP_node() int UDP_node::init() { - if (0 > init_receiver(udp_port_recv) || 0 > init_sender(udp_port_send)) { + if (0 > init_receiver(_udp_port_recv) || 0 > init_sender(_udp_port_send)) { return -1; } @@ -606,19 +620,19 @@ int UDP_node::init() bool UDP_node::fds_OK() { - return (-1 != sender_fd && -1 != receiver_fd); + return (-1 != _sender_fd && -1 != _receiver_fd); } int UDP_node::init_receiver(uint16_t udp_port) { #if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) // udp socket data - memset((char *)&receiver_inaddr, 0, sizeof(receiver_inaddr)); - receiver_inaddr.sin_family = AF_INET; - receiver_inaddr.sin_port = htons(udp_port); - receiver_inaddr.sin_addr.s_addr = htonl(INADDR_ANY); + memset((char *)&_receiver_inaddr, 0, sizeof(_receiver_inaddr)); + _receiver_inaddr.sin_family = AF_INET; + _receiver_inaddr.sin_port = htons(udp_port); + _receiver_inaddr.sin_addr.s_addr = htonl(INADDR_ANY); - if ((receiver_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + if ((_receiver_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { #ifndef PX4_ERR printf("\033[0;31m[ micrortps_transport ]\tUDP transport: Create socket failed\033[0m\n"); #else @@ -633,7 +647,7 @@ int UDP_node::init_receiver(uint16_t udp_port) PX4_INFO("UDP transport: Trying to connect..."); #endif /* PX4_INFO */ - if (bind(receiver_fd, (struct sockaddr *)&receiver_inaddr, sizeof(receiver_inaddr)) < 0) { + if (bind(_receiver_fd, (struct sockaddr *)&_receiver_inaddr, sizeof(_receiver_inaddr)) < 0) { #ifndef PX4_ERR printf("\033[0;31m[ micrortps_transport ]\tUDP transport: Bind failed\033[0m\n"); #else @@ -655,7 +669,7 @@ int UDP_node::init_sender(uint16_t udp_port) { #if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) - if ((sender_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + if ((_sender_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { #ifndef PX4_ERR printf("\033[0;31m[ micrortps_transport ]\tUDP transport: Create socket failed\033[0m\n"); #else @@ -664,11 +678,11 @@ int UDP_node::init_sender(uint16_t udp_port) return -1; } - memset((char *) &sender_outaddr, 0, sizeof(sender_outaddr)); - sender_outaddr.sin_family = AF_INET; - sender_outaddr.sin_port = htons(udp_port); + memset((char *) &_sender_outaddr, 0, sizeof(_sender_outaddr)); + _sender_outaddr.sin_family = AF_INET; + _sender_outaddr.sin_port = htons(udp_port); - if (inet_aton(udp_ip, &sender_outaddr.sin_addr) == 0) { + if (inet_aton(_udp_ip, &_sender_outaddr.sin_addr) == 0) { #ifndef PX4_ERR printf("\033[0;31m[ micrortps_transport ]\tUDP transport: inet_aton() failed\033[0m\n"); #else @@ -686,26 +700,26 @@ uint8_t UDP_node::close() { #if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) - if (sender_fd != -1) { + if (_sender_fd != -1) { #ifndef PX4_WARN printf("\033[1;33m[ micrortps_transport ]\tUDP transport: Closed sender socket!\033[0m\n"); #else PX4_WARN("UDP transport: Closed sender socket!"); #endif /* PX4_WARN */ - shutdown(sender_fd, SHUT_RDWR); - ::close(sender_fd); - sender_fd = -1; + shutdown(_sender_fd, SHUT_RDWR); + ::close(_sender_fd); + _sender_fd = -1; } - if (receiver_fd != -1) { + if (_receiver_fd != -1) { #ifndef PX4_WARN printf("\033[1;33m[ micrortps_transport ]\tUDP transport: Closed receiver socket!\033[0m\n"); #else PX4_WARN("UDP transport: Closed receiver socket!"); #endif /* PX4_WARN */ - shutdown(receiver_fd, SHUT_RDWR); - ::close(receiver_fd); - receiver_fd = -1; + shutdown(_receiver_fd, SHUT_RDWR); + ::close(_receiver_fd); + _receiver_fd = -1; } #endif /* __PX4_NUTTX */ @@ -721,8 +735,8 @@ ssize_t UDP_node::node_read(void *buffer, size_t len) ssize_t ret = 0; #if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) // Blocking call - static socklen_t addrlen = sizeof(receiver_outaddr); - ret = recvfrom(receiver_fd, buffer, len, 0, (struct sockaddr *)&receiver_outaddr, &addrlen); + static socklen_t addrlen = sizeof(_receiver_outaddr); + ret = recvfrom(_receiver_fd, buffer, len, 0, (struct sockaddr *)&_receiver_outaddr, &addrlen); #endif /* !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) */ return ret; } @@ -735,7 +749,7 @@ ssize_t UDP_node::node_write(void *buffer, size_t len) ssize_t ret = 0; #if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) - ret = sendto(sender_fd, buffer, len, 0, (struct sockaddr *)&sender_outaddr, sizeof(sender_outaddr)); + ret = sendto(_sender_fd, buffer, len, 0, (struct sockaddr *)&_sender_outaddr, sizeof(_sender_outaddr)); #endif /* !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) */ return ret; } diff --git a/msg/templates/urtps/microRTPS_transport.h b/msg/templates/urtps/microRTPS_transport.h index ac6351a7c3..3605b63f8c 100644 --- a/msg/templates/urtps/microRTPS_transport.h +++ b/msg/templates/urtps/microRTPS_transport.h @@ -41,19 +41,26 @@ #define BUFFER_SIZE 1024 #define DEFAULT_UART "/dev/ttyACM0" +namespace MicroRtps { + enum class System { + FMU, + MISSION_COMPUTER + }; +} + class Transport_node { public: - Transport_node(const bool _debug); + Transport_node(const uint8_t sys_id, const bool debug); virtual ~Transport_node(); virtual int init() {return 0;} virtual uint8_t close() {return 0;} - ssize_t read(uint8_t *topic_ID, char out_buffer[], size_t buffer_len); + ssize_t read(uint8_t *topic_id, char out_buffer[], size_t buffer_len); /** * write a buffer - * @param topic_ID + * @param topic_id * @param buffer buffer to write: it must leave get_header_length() bytes free at the beginning. This will be * filled with the header. length does not include get_header_length(). So buffer looks like this: * ------------------------------------------------- @@ -63,11 +70,23 @@ public: * @param length buffer length excluding header length * @return length on success, <0 on error */ - ssize_t write(const uint8_t topic_ID, char buffer[], size_t length); + ssize_t write(const uint8_t topic_id, char buffer[], size_t length); /** Get the Length of struct Header to make headroom for the size of struct Header along with payload */ size_t get_header_length(); +private: + struct __attribute__((packed)) Header { + char marker[3]; + uint8_t topic_id; + uint8_t sys_id; + uint8_t seq; + uint8_t payload_len_h; + uint8_t payload_len_l; + uint8_t crc_h; + uint8_t crc_l; + }; + protected: virtual ssize_t node_read(void *buffer, size_t len) = 0; virtual ssize_t node_write(void *buffer, size_t len) = 0; @@ -75,30 +94,22 @@ protected: uint16_t crc16_byte(uint16_t crc, const uint8_t data); uint16_t crc16(uint8_t const *buffer, size_t len); -protected: - uint32_t rx_buff_pos; - char rx_buffer[BUFFER_SIZE] = {}; - bool debug = false; - uint8_t _seq_number{0}; + uint32_t _rx_buff_pos; + char _rx_buffer[BUFFER_SIZE]{}; -private: - struct __attribute__((packed)) Header { - char marker[3]; - uint8_t topic_ID; - uint8_t seq; - uint8_t payload_len_h; - uint8_t payload_len_l; - uint8_t crc_h; - uint8_t crc_l; - }; + bool _debug; + + uint8_t _sys_id; + uint8_t _seq_number{0}; }; class UART_node: public Transport_node { public: - UART_node(const char *_uart_name, const uint32_t _baudrate, - const uint32_t _poll_ms, const bool _hw_flow_control, - const bool _sw_flow_control, const bool _debug); + UART_node(const char *uart_name, const uint32_t baudrate, + const uint32_t poll_ms, const bool hw_flow_control, + const bool sw_flow_control, const uint8_t sys_id, + const bool debug); virtual ~UART_node(); int init(); @@ -110,20 +121,20 @@ protected: bool fds_OK(); bool baudrate_to_speed(uint32_t bauds, speed_t *speed); - int uart_fd; - char uart_name[64] = {}; - uint32_t baudrate; - uint32_t poll_ms; - bool hw_flow_control = false; - bool sw_flow_control = false; - struct pollfd poll_fd[1] = {}; + int _uart_fd; + char _uart_name[64]{}; + uint32_t _baudrate; + uint32_t _poll_ms; + bool _hw_flow_control{false}; + bool _sw_flow_control{false}; + struct pollfd _poll_fd[1]{}; }; class UDP_node: public Transport_node { public: - UDP_node(const char *_udp_ip, uint16_t udp_port_recv, uint16_t udp_port_send, - const bool _debug); + UDP_node(const char *udp_ip, uint16_t udp_port_recv, uint16_t udp_port_send, + const uint8_t sys_id, const bool debug); virtual ~UDP_node(); int init(); @@ -136,12 +147,12 @@ protected: ssize_t node_write(void *buffer, size_t len); bool fds_OK(); - int sender_fd; - int receiver_fd; - char udp_ip[16] = {}; - uint16_t udp_port_recv; - uint16_t udp_port_send; - struct sockaddr_in sender_outaddr; - struct sockaddr_in receiver_inaddr; - struct sockaddr_in receiver_outaddr; + int _sender_fd; + int _receiver_fd; + char _udp_ip[16]{}; + uint16_t _udp_port_recv; + uint16_t _udp_port_send; + struct sockaddr_in _sender_outaddr; + struct sockaddr_in _receiver_inaddr; + struct sockaddr_in _receiver_outaddr; }; diff --git a/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp b/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp index 6909d50ea9..947dd1f7d5 100644 --- a/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp +++ b/src/modules/micrortps_bridge/micrortps_client/microRTPS_client_main.cpp @@ -168,12 +168,15 @@ static int micrortps_start(int argc, char *argv[]) return -1; } + // Set the system ID to FMU, in order to identify the client side + const uint8_t sys_id = static_cast(MicroRtps::System::FMU); + switch (_options.transport) { case options::eTransports::UART: { transport_node = new UART_node(_options.device, _options.baudrate, _options.poll_ms, - _options.sw_flow_control, _options.hw_flow_control, _options.verbose_debug); - PX4_INFO("UART transport: device: %s; baudrate: %" PRIu32 "; sleep: %" PRIu32 "ms; poll: %" PRIu32 - "ms; flow_control: %s", + _options.sw_flow_control, _options.hw_flow_control, sys_id, + _options.verbose_debug); + PX4_INFO("UART transport: device: %s; baudrate: %" PRIu32 "; sleep: %" PRIu32 "ms; flow_control: %s", _options.device, _options.baudrate, _options.sleep_us, _options.poll_ms, _options.sw_flow_control ? "SW enabled" : (_options.hw_flow_control ? "HW enabled" : "No")); } @@ -181,7 +184,7 @@ static int micrortps_start(int argc, char *argv[]) case options::eTransports::UDP: { transport_node = new UDP_node(_options.ip, _options.recv_port, _options.send_port, - _options.verbose_debug); + sys_id, _options.verbose_debug); PX4_INFO("UDP transport: ip address: %s; recv port: %" PRIu16 "; send port: %" PRIu16 "; sleep: %" PRIu32 "us", _options.ip, _options.recv_port, _options.send_port, _options.sleep_us);