diff --git a/msg/CMakeLists.txt b/msg/CMakeLists.txt index 77c070bdca..2a6ee45ccf 100644 --- a/msg/CMakeLists.txt +++ b/msg/CMakeLists.txt @@ -128,6 +128,7 @@ set(msg_files tecs_status.msg telemetry_status.msg test_motor.msg + timesync.msg timesync_status.msg trajectory_bezier.msg trajectory_waypoint.msg diff --git a/msg/templates/urtps/RtpsTopics.cpp.em b/msg/templates/urtps/RtpsTopics.cpp.em index b1cb6a145b..d686a4c687 100644 --- a/msg/templates/urtps/RtpsTopics.cpp.em +++ b/msg/templates/urtps/RtpsTopics.cpp.em @@ -73,7 +73,6 @@ bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_se std::cerr << "Failed starting @(topic) subscriber" << std::endl; return false; } - @[end for]@ std::cout << "--------------------" << std::endl << std::endl; @[end if]@ @@ -83,11 +82,13 @@ bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_se @[for topic in send_topics]@ if (_@(topic)_pub.init()) { std::cout << "- @(topic) publisher started" << std::endl; +@[ if topic == 'Timesync']@ + _timesync->start(&_@(topic)_pub); +@[ end if]@ } else { std::cerr << "ERROR starting @(topic) publisher" << std::endl; return false; } - @[end for]@ std::cout << "--------------------" << std::endl; @[end if]@ @@ -118,6 +119,9 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len) eprosima::fastcdr::FastBuffer cdrbuffer(data_buffer, len); eprosima::fastcdr::Cdr cdr_des(cdrbuffer); st.deserialize(cdr_des); +@[ if topic == 'Timesync']@ + _timesync->processTimesyncMsg(&st); +@[ end if]@ _@(topic)_pub.publish(&st); } break; diff --git a/msg/templates/urtps/RtpsTopics.h.em b/msg/templates/urtps/RtpsTopics.h.em index daa3cf709c..a1fc2db19b 100644 --- a/msg/templates/urtps/RtpsTopics.h.em +++ b/msg/templates/urtps/RtpsTopics.h.em @@ -57,6 +57,8 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer #include #include +#include "microRTPS_timesync.h" + @[for topic in send_topics]@ #include "@(topic)_Publisher.h" @[end for]@ @@ -67,6 +69,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer class RtpsTopics { public: bool init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue); + void set_timesync(const std::shared_ptr& timesync) { _timesync = timesync; }; @[if send_topics]@ void publish(uint8_t topic_ID, char data_buffer[], size_t len); @[end if]@ @@ -88,5 +91,7 @@ private: @(topic)_Subscriber _@(topic)_sub; @[end for]@ + std::shared_ptr _timesync; + @[end if]@ }; diff --git a/msg/templates/urtps/microRTPS_agent.cpp.em b/msg/templates/urtps/microRTPS_agent.cpp.em index 8ced730e96..321e1863e0 100644 --- a/msg/templates/urtps/microRTPS_agent.cpp.em +++ b/msg/templates/urtps/microRTPS_agent.cpp.em @@ -70,6 +70,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer #include #include "microRTPS_transport.h" +#include "microRTPS_timesync.h" #include "RtpsTopics.h" #define BUFFER_SIZE 1024 @@ -92,6 +93,9 @@ Transport_node *transport_node = nullptr; RtpsTopics topics; uint32_t total_sent = 0, sent = 0; +// Init timesync +std::shared_ptr timeSync = std::make_shared(); + struct options { enum class eTransports { @@ -160,6 +164,7 @@ void signal_handler(int signum) printf("Interrupt signal (%d) received.\n", signum); running = 0; transport_node->close(); + timeSync->stop(); } @[if recv_topics]@ @@ -188,6 +193,7 @@ void t_send(void*) /* make room for the header to fill in later */ eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer)-header_length); eprosima::fastcdr::Cdr scdr(cdrbuffer); + if (topics.getMsg(topic_ID, scdr)) { length = scdr.getSerializedDataLength(); @@ -253,6 +259,8 @@ int main(int argc, char** argv) std::chrono::time_point start, end; @[end if]@ + topics.set_timesync(timeSync); + @[if recv_topics]@ topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue); @[end if]@ @@ -300,5 +308,7 @@ int main(int argc, char** argv) delete transport_node; transport_node = nullptr; + timeSync->reset(); + return 0; } diff --git a/msg/templates/urtps/microRTPS_timesync.cpp b/msg/templates/urtps/microRTPS_timesync.cpp new file mode 100644 index 0000000000..e001ab3c91 --- /dev/null +++ b/msg/templates/urtps/microRTPS_timesync.cpp @@ -0,0 +1,180 @@ +/**************************************************************************** + * + * Copyright (c) 2020 PX4 Development Team. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +/*! + * @file microRTPS_timesync.cpp + * @brief source code for time sync implementation + */ + +#include +#include +#include + +#include "microRTPS_timesync.h" + +#include "Timesync_Publisher.h" +#include "Timesync_Subscriber.h" + +TimeSync::TimeSync() + : _offset_ns(-1), + _skew_ns_per_sync(0.0), + _num_samples(0), + _request_reset_counter(0), + _last_msg_seq(0) +{} + +TimeSync::~TimeSync() { stop(); } + +void TimeSync::setNewOffsetCB(std::function callback) { _notifyNewOffset = callback; } + +void TimeSync::start(const Timesync_Publisher* pub) { + stop(); + + _Timesync_pub = (*pub); + + auto run = [this]() { + while (!_request_stop) { + px4_msgs::msg::Timesync msg = newTimesyncMsg(); + + _Timesync_pub.publish(&msg); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + }; + _request_stop = false; + _send_timesync_thread.reset(new std::thread(run)); +} + +void TimeSync::stop() { + _request_stop = true; + if (_send_timesync_thread && _send_timesync_thread->joinable()) _send_timesync_thread->join(); + _send_timesync_thread.reset(); +} + +void TimeSync::reset() { + _num_samples = 0; + _request_reset_counter = 0; +} + +bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns) { + int64_t rtti = local_t3_ns - local_t1_ns; + + // assume rtti is evenly split both directions + int64_t remote_t3_ns = remote_t2_ns + rtti / 2ll; + + int64_t measurement_offset = remote_t3_ns - local_t3_ns; + + if (_request_reset_counter > REQUEST_RESET_COUNTER_THRESHOLD) { + reset(); + std::cout << std::endl << "Timesync clock changed, resetting" << std::endl; + } + + if (_num_samples >= WINDOW_SIZE) { + if (std::abs(measurement_offset - _offset_ns) > TRIGGER_RESET_THRESHOLD_NS) { + _request_reset_counter++; + std::cout << std::endl << "Timesync offset outlier, discarding" << std::endl; + return false; + } else { + _request_reset_counter = 0; + } + } + + if (_num_samples == 0) { + _offset_ns = measurement_offset; + _skew_ns_per_sync = 0; + } + + { + int64_t local_t2 = remote_t2_ns - _offset_ns; + int64_t time_there = local_t2 - local_t1_ns; + + int64_t remote_t3 = local_t3_ns + _offset_ns; + int64_t time_back = remote_t3 - remote_t2_ns; + + if (std::abs(time_back - time_there) > 3ll * 1000ll * 1000ll) { + std::cout << "trip there: " << time_there / 1e6f << "ms trip back: " << time_back / 1e6f + << "ms , discarding" << std::endl; + return false; + } + } + + // ignore if rtti > 10ms + if (rtti > 15ll * 1000ll * 1000ll) { + std::cout << std::endl + << "RTTI too high for timesync: " << rtti / (1000ll * 1000ll) << "ms" << std::endl; + return false; + } + + double schedule = std::min((double)_num_samples / WINDOW_SIZE, 1.); + double alpha = ALPHA_INITIAL * (1. - schedule) + ALPHA_FINAL * schedule; + double beta = BETA_INTIIAL * (1. - schedule) + BETA_FINAL * schedule; + + int64_t offset_prev = _offset_ns; + _offset_ns = static_cast((_skew_ns_per_sync + _offset_ns) * (1. - alpha) + measurement_offset * alpha); + _skew_ns_per_sync = static_cast(beta * (_offset_ns - offset_prev) + (1. - beta) * _skew_ns_per_sync); + + _num_samples++; + + return true; +} + +void TimeSync::processTimesyncMsg(const px4_msgs::msg::Timesync* msg) { + if (msg->seq() == _last_msg_seq) return; + _last_msg_seq = msg->seq(); + + int64_t now_time = getSystemMonoNanos(); + + if (msg->tc1() == 0) { + px4_msgs::msg::Timesync reply = (*msg); + reply.timestamp() = getSystemTime(); + reply.seq() = _last_msg_seq; + reply.tc1() = now_time; + + _Timesync_pub.publish(&reply); + + } else if (msg->tc1() > 0) { + bool added = addMeasurement(msg->ts1(), msg->tc1(), now_time); + if (added && _notifyNewOffset) _notifyNewOffset(_offset_ns); + } +} + +px4_msgs::msg::Timesync TimeSync::newTimesyncMsg() { + px4_msgs::msg::Timesync msg{}; + msg.timestamp() = getSystemTime(); + msg.seq() = _last_msg_seq; + msg.tc1() = 0; + msg.ts1() = getSystemMonoNanos(); + + _last_msg_seq++; + + return msg; +} diff --git a/msg/templates/urtps/microRTPS_timesync.h b/msg/templates/urtps/microRTPS_timesync.h new file mode 100644 index 0000000000..25f7341a87 --- /dev/null +++ b/msg/templates/urtps/microRTPS_timesync.h @@ -0,0 +1,108 @@ +/**************************************************************************** + * + * Copyright (c) 2020 PX4 Development Team. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +/*! + * @file microRTPS_timesync.h + * @brief Adds time sync for the microRTPS bridge + * @author Nuno Marques + * @author Julian Kent + */ + +#pragma once + +#include +#include +#include + +#include "Timesync_Publisher.h" +#include "Timesync_Subscriber.h" + +static constexpr double ALPHA_INITIAL = 0.05; +static constexpr double ALPHA_FINAL = 0.003; +static constexpr double BETA_INTIIAL = 0.05; +static constexpr double BETA_FINAL = 0.003; +static constexpr int WINDOW_SIZE = 500; +static constexpr int64_t UNKNOWN = 0; +static constexpr int64_t TRIGGER_RESET_THRESHOLD_NS = 100ll * 1000ll * 1000ll; +static constexpr int REQUEST_RESET_COUNTER_THRESHOLD = 5; + +class TimeSync { +public: + TimeSync(); + virtual ~TimeSync(); + + void start(const Timesync_Publisher* pub); + void reset(); + void stop(); + + void setNewOffsetCB(std::function callback); + + /** + * Returns clock monotonic time in nanoseconds + */ + inline int64_t getSystemMonoNanos() { + timespec t; + clock_gettime(CLOCK_MONOTONIC_RAW, &t); + return (int64_t)t.tv_sec * 1000000000ll + t.tv_nsec; + } + + /** + * Returns system realtime in seconds + */ + inline uint64_t getSystemTime() { + timespec t; + clock_gettime(CLOCK_REALTIME, &t); + return (uint64_t)t.tv_sec + t.tv_nsec; + } + + bool addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns); + + void processTimesyncMsg(const px4_msgs::msg::Timesync* msg); + + px4_msgs::msg::Timesync newTimesyncMsg(); + +private: + int64_t _offset_ns; + int64_t _skew_ns_per_sync; + int64_t _num_samples; + + int32_t _request_reset_counter; + uint8_t _last_msg_seq; + + Timesync_Publisher _Timesync_pub; + Timesync_Subscriber _Timesync_sub; + + std::unique_ptr _send_timesync_thread; + std::atomic _request_stop{false}; + + std::function _notifyNewOffset; +}; diff --git a/msg/timesync_status.msg b/msg/timesync.msg similarity index 70% rename from msg/timesync_status.msg rename to msg/timesync.msg index 38ff31db75..754e6eb675 100644 --- a/msg/timesync_status.msg +++ b/msg/timesync.msg @@ -1,4 +1,8 @@ uint64 timestamp # time since system start (microseconds) +uint8 seq # timesync msg sequence +int64 tc1 # time sync timestamp 1 +int64 ts1 # time sync timestamp 2 + uint64 remote_timestamp # remote system timestamp (microseconds) int64 observed_offset # raw time offset directly observed from this timesync packet (microseconds) int64 estimated_offset # smoothed time offset between companion system and PX4 (microseconds) diff --git a/msg/tools/generate_microRTPS_bridge.py b/msg/tools/generate_microRTPS_bridge.py index b10f837a6c..c72d928424 100644 --- a/msg/tools/generate_microRTPS_bridge.py +++ b/msg/tools/generate_microRTPS_bridge.py @@ -416,6 +416,8 @@ def generate_agent(out_dir): shutil.rmtree(os.path.join(out_dir, "fastrtpsgen")) cp_wildcard(os.path.join(urtps_templates_dir, "microRTPS_transport.*"), agent_out_dir) + cp_wildcard(os.path.join(urtps_templates_dir, + "microRTPS_timesync.*"), agent_out_dir) if cmakelists: os.rename(os.path.join(out_dir, "microRTPS_agent_CMakeLists.txt"), os.path.join(out_dir, "CMakeLists.txt")) @@ -457,6 +459,12 @@ def generate_client(out_dir): if os.path.isfile(def_file): os.rename(def_file, def_file.replace(".cpp", ".cpp_")) def_file = os.path.join(default_client_out, "microRTPS_transport.h") + if os.path.isfile(def_file): + os.rename(def_file, def_file.replace(".cpp", ".cpp_")) + def_file = os.path.join(default_client_out, "microRTPS_timesync.cpp") + if os.path.isfile(def_file): + os.rename(def_file, def_file.replace(".cpp", ".cpp_")) + def_file = os.path.join(default_client_out, "microRTPS_timesync.h") if os.path.isfile(def_file): os.rename(def_file, def_file.replace(".h", ".h_")) @@ -466,6 +474,8 @@ def generate_client(out_dir): # Final steps to install client cp_wildcard(os.path.join(urtps_templates_dir, "microRTPS_transport.*"), out_dir) + cp_wildcard(os.path.join(urtps_templates_dir, + "microRTPS_timesync.*"), out_dir) return 0 diff --git a/msg/tools/uorb_rtps_message_ids.yaml b/msg/tools/uorb_rtps_message_ids.yaml index daaacff6b9..f8f4197c92 100644 --- a/msg/tools/uorb_rtps_message_ids.yaml +++ b/msg/tools/uorb_rtps_message_ids.yaml @@ -177,8 +177,10 @@ rtps: id: 76 - msg: test_motor id: 77 - - msg: timesync_status + - msg: timesync id: 78 + receive: true + send: true - msg: trajectory_waypoint id: 79 receive: true diff --git a/src/modules/mavlink/mavlink_timesync.cpp b/src/modules/mavlink/mavlink_timesync.cpp index 48714ec16f..501726c6c6 100644 --- a/src/modules/mavlink/mavlink_timesync.cpp +++ b/src/modules/mavlink/mavlink_timesync.cpp @@ -138,7 +138,7 @@ MavlinkTimesync::handle_message(const mavlink_message_t *msg) } // Publish status message - timesync_status_s tsync_status{}; + timesync_s tsync_status{}; tsync_status.timestamp = hrt_absolute_time(); tsync_status.remote_timestamp = tsync.tc1 / 1000ULL; @@ -146,7 +146,7 @@ MavlinkTimesync::handle_message(const mavlink_message_t *msg) tsync_status.estimated_offset = (int64_t)_time_offset; tsync_status.round_trip_time = rtt_us; - _timesync_status_pub.publish(tsync_status); + _timesync_pub.publish(tsync_status); } break; diff --git a/src/modules/mavlink/mavlink_timesync.h b/src/modules/mavlink/mavlink_timesync.h index 3204696f42..bdb0c11ee4 100644 --- a/src/modules/mavlink/mavlink_timesync.h +++ b/src/modules/mavlink/mavlink_timesync.h @@ -43,7 +43,7 @@ #include "mavlink_bridge_header.h" #include -#include +#include #include @@ -132,7 +132,7 @@ protected: */ void reset_filter(); - uORB::PublicationMulti _timesync_status_pub{ORB_ID(timesync_status)}; + uORB::PublicationMulti _timesync_pub{ORB_ID(timesync)}; uint32_t _sequence{0};