From e251c64c5f50e23f3198c951cb282374255fe97e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beat=20K=C3=BCng?= Date: Wed, 1 Mar 2017 14:38:22 +0100 Subject: [PATCH] refactor replay: add some overrideable methods --- src/modules/replay/replay.hpp | 53 +++++++++++++++++++++++++----- src/modules/replay/replay_main.cpp | 39 ++++++++++++++++------ 2 files changed, 73 insertions(+), 19 deletions(-) diff --git a/src/modules/replay/replay.hpp b/src/modules/replay/replay.hpp index 0f96b8828b..8caaa72520 100644 --- a/src/modules/replay/replay.hpp +++ b/src/modules/replay/replay.hpp @@ -59,7 +59,7 @@ public: Replay(); /// Destructor, also waits for task exit - ~Replay(); + virtual ~Replay(); /** * Start task. @@ -82,15 +82,8 @@ public: static void setupReplayFile(const char *file_name); static bool isSetup() { return _replay_file; } -private: - bool _task_should_exit = false; - std::set _overridden_params; - std::map _file_formats; ///< all formats we read from the file - uint64_t _file_start_time; - uint64_t _replay_start_time; - std::streampos _data_section_start; ///< first ADD_LOGGED_MSG message - std::vector _read_buffer; +protected: struct Subscription { @@ -102,6 +95,48 @@ private: std::streampos next_read_pos; uint64_t next_timestamp; ///< timestamp of the file }; + + /** + * publish an orb topic + * @param sub + * @param data + * @return true if published, false otherwise + */ + bool publishTopic(Subscription &sub, void *data); + + /** + * called when entering the main replay loop + */ + virtual void onEnterMainLoop() {}; + /** + * called when exiting the main replay loop + */ + virtual void onExitMainLoop() {}; + + /** + * handle delay until topic can be published. + * @param next_file_timestamp timestamp of next message to publish + * @param timestamp_offset offset between file start time and replay start time + * @return timestamp that the message to publish should have + */ + virtual uint64_t handleTopicDelay(uint64_t next_file_time, uint64_t timestamp_offset); + + /** + * handle the publication of a topic update + * @return true if published, false otherwise + */ + virtual bool handleTopicUpdate(Subscription &sub, void *data); + +private: + bool _task_should_exit = false; + std::set _overridden_params; + std::map _file_formats; ///< all formats we read from the file + + uint64_t _file_start_time; + uint64_t _replay_start_time; + std::streampos _data_section_start; ///< first ADD_LOGGED_MSG message + std::vector _read_buffer; + std::vector _subscriptions; /** keep track of file position to avoid adding a subscription multiple times. */ diff --git a/src/modules/replay/replay_main.cpp b/src/modules/replay/replay_main.cpp index 6a3d8fb941..8291927358 100644 --- a/src/modules/replay/replay_main.cpp +++ b/src/modules/replay/replay_main.cpp @@ -627,6 +627,8 @@ void Replay::task_main() return; } + onEnterMainLoop(); + _replay_start_time = hrt_absolute_time(); PX4_INFO("Replay in progress..."); @@ -687,13 +689,8 @@ void Replay::task_main() last_additional_message_pos = next_additional_message_pos; - //wait if necessary - const uint64_t publish_timestamp = next_file_time + timestamp_offset; - uint64_t cur_time = hrt_absolute_time(); - // if some topics have a timestamp smaller than the log file start, publish them immediately - if (cur_time < publish_timestamp && next_file_time > _file_start_time) { - usleep(publish_timestamp - cur_time); - } + const uint64_t publish_timestamp = handleTopicDelay(next_file_time, timestamp_offset); + //It's time to publish const size_t msg_read_size = sub.orb_meta->o_size_no_padding; @@ -703,11 +700,10 @@ void Replay::task_main() replay_file.read((char *)_read_buffer.data(), msg_read_size); *(uint64_t *)(_read_buffer.data() + sub.timestamp_offset) = publish_timestamp; - if (publishTopic(sub, _read_buffer.data())) { + if (handleTopicUpdate(sub, _read_buffer.data())) { ++nr_published_messages; } - nextDataMessage(replay_file, _subscriptions[next_msg_id], next_msg_id); //TODO: output status (eg. every sec), including total duration... @@ -726,6 +722,29 @@ void Replay::task_main() //TODO: should we close the log file & exit (optionally, by adding a parameter -q) ? } + + onExitMainLoop(); +} + +bool Replay::handleTopicUpdate(Subscription &sub, void *data) +{ + return publishTopic(sub, data); +} + +uint64_t Replay::handleTopicDelay(uint64_t next_file_time, uint64_t timestamp_offset) +{ + + const uint64_t publish_timestamp = next_file_time + timestamp_offset; + + //wait if necessary + uint64_t cur_time = hrt_absolute_time(); + + // if some topics have a timestamp smaller than the log file start, publish them immediately + if (cur_time < publish_timestamp && next_file_time > _file_start_time) { + usleep(publish_timestamp - cur_time); + } + + return publish_timestamp; } bool Replay::publishTopic(Subscription &sub, void *data) @@ -875,7 +894,7 @@ int replay_main(int argc, char *argv[]) return 1; } - if (PX4_OK != replay::instance->start(quiet, apply_params_only)) { + if (PX4_OK != Replay::start(quiet, apply_params_only)) { PX4_ERR("start failed"); return 1; }