From 2231b0063743aab06b09aa4888db42d444284baf Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Tue, 26 May 2015 20:04:59 +0300 Subject: [PATCH] Raft active state extension removed --- .../server/220.AppendEntries.uavcan | 17 +- .../distributed/cluster_manager.hpp | 18 -- .../distributed/raft_core.hpp | 200 +++++------------- .../distributed/server.hpp | 11 +- .../distributed/server.cpp | 19 +- .../apps/uavcan_dynamic_node_id_server.cpp | 10 +- 6 files changed, 87 insertions(+), 188 deletions(-) diff --git a/dsdl/uavcan/protocol/dynamic_node_id/server/220.AppendEntries.uavcan b/dsdl/uavcan/protocol/dynamic_node_id/server/220.AppendEntries.uavcan index 77c32edf59..e3009286d2 100644 --- a/dsdl/uavcan/protocol/dynamic_node_id/server/220.AppendEntries.uavcan +++ b/dsdl/uavcan/protocol/dynamic_node_id/server/220.AppendEntries.uavcan @@ -3,16 +3,25 @@ # Please refer to the specification for details. # -uint16 DEFAULT_REQUEST_TIMEOUT_MS = 200 - -uint16 DEFAULT_BASE_ELECTION_TIMEOUT_MS = 1600 # request timeout * (max cluster size - 1) * 2 requests +# +# Given min election timeout and cluster size, the maximum request interval can be derived as follows: +# max request interval = (min election timeout) / 2 requests / (cluster size - 1) +# Obviously, request interval can be lower than that if needed, but not higher. +# +# Real timeout is randomized in the range (MIN, MAX]. +# +uint16 MIN_ELECTION_TIMEOUT_MS = 4000 +uint16 MAX_ELECTION_TIMEOUT_MS = 6000 uint32 term uint32 prev_log_term uint8 prev_log_index uint8 leader_commit -# Full replication: 127 requests * 0.2 sec interval * 4 followers * 2 trips of next_index value ~ 3.5 min +# +# Worst-case replication time can be computed as: +# worst replication time = (127 log entries) * (2 trips of next_index) * (cluster size - 1) * (request interval) +# Entry[<=1] entries --- diff --git a/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/cluster_manager.hpp b/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/cluster_manager.hpp index f237b19436..0fa18fc17d 100644 --- a/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/cluster_manager.hpp +++ b/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/cluster_manager.hpp @@ -68,8 +68,6 @@ private: uint8_t cluster_size_; uint8_t num_known_servers_; - bool had_discovery_activity_; - static IStorageBackend::String getStorageKeyForClusterSize() { return "cluster_size"; } INode& getNode() { return discovery_sub_.getNode(); } @@ -151,8 +149,6 @@ private: return; } - had_discovery_activity_ = true; - /* * Updating the set of known servers */ @@ -204,7 +200,6 @@ public: , discovery_pub_(node) , cluster_size_(0) , num_known_servers_(0) - , had_discovery_activity_(false) { } /** @@ -417,19 +412,6 @@ public: } } - /** - * This method returns true if there was at least one Discovery message received since last call. - */ - bool hadDiscoveryActivity() - { - if (had_discovery_activity_) - { - had_discovery_activity_ = false; - return true; - } - return false; - } - /** * Number of known servers can only grow, and it never exceeds the cluster size value. * This number does not include the local server. diff --git a/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/raft_core.hpp b/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/raft_core.hpp index 18ba5f8c52..5b776553c8 100644 --- a/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/raft_core.hpp +++ b/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/raft_core.hpp @@ -5,6 +5,7 @@ #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED +#include #include #include #include @@ -51,26 +52,14 @@ public: * It does not implement client-server interaction at all; instead it just exposes a public method for adding * allocation entries. * + * Note that this class uses std::rand(), so the RNG must be properly seeded by the application. + * * Activity registration: * - persistent state update error * - switch to candidate (this defines timeout between reelections) * - newer term in response (also switch to follower) * - append entries request with term >= currentTerm * - vote granted - * - * Active state switch logic: - * Activation (this is default state): - * - vote request - * - allocation request at any stage - * - only if leader: - * - discovery activity detected - * - log is not fully replicated or there are uncommitted entries - * - * Deactivation: - * - switch to follower state - * - persistent state update error - * - only if leader: - * - all log entries are fully replicated and committed */ class RaftCore : private TimerBase { @@ -111,8 +100,7 @@ private: /* * Constants */ - const MonotonicDuration update_interval_; ///< AE requests will be issued at this rate - const MonotonicDuration base_activity_timeout_; + enum { MaxNumFollowers = ClusterManager::MaxClusterSize - 1 }; IEventTracer& tracer_; IRaftLeaderMonitor& leader_monitor_; @@ -125,7 +113,7 @@ private: Log::Index commit_index_; MonotonicTime last_activity_timestamp_; - bool active_mode_; + MonotonicDuration randomized_activity_timeout_; ServerState server_state_; uint8_t next_server_index_; ///< Next server to query AE from @@ -139,9 +127,7 @@ private: ServiceServer append_entries_srv_; ServiceClient append_entries_client_; ServiceServer request_vote_srv_; - - enum { NumRequestVoteCalls = ClusterManager::MaxClusterSize - 1 }; - ServiceClient request_vote_client_; + ServiceClient request_vote_client_; /* * Methods @@ -167,19 +153,33 @@ private: UAVCAN_ASSERT(num_votes_received_in_this_campaign_ <= cluster_.getClusterSize()); // Transport + UAVCAN_ASSERT(append_entries_client_.getNumPendingCalls() <= 1); + UAVCAN_ASSERT(request_vote_client_.getNumPendingCalls() <= cluster_.getNumKnownServers()); UAVCAN_ASSERT(server_state_ != ServerStateCandidate || !append_entries_client_.hasPendingCalls()); UAVCAN_ASSERT(server_state_ != ServerStateLeader || !request_vote_client_.hasPendingCalls()); UAVCAN_ASSERT(server_state_ != ServerStateFollower || (!append_entries_client_.hasPendingCalls() && !request_vote_client_.hasPendingCalls())); } - void registerActivity() { last_activity_timestamp_ = getNode().getMonotonicTime(); } + void registerActivity() + { + last_activity_timestamp_ = getNode().getMonotonicTime(); + + static const int32_t randomization_range_msec = + AppendEntries::Request::MAX_ELECTION_TIMEOUT_MS - AppendEntries::Request::MIN_ELECTION_TIMEOUT_MS; + + const int32_t random_msec = (std::rand() % randomization_range_msec) + 1; + + randomized_activity_timeout_ = + MonotonicDuration::fromMSec(AppendEntries::Request::MIN_ELECTION_TIMEOUT_MS + random_msec); + + UAVCAN_ASSERT(randomized_activity_timeout_.toMSec() > AppendEntries::Request::MIN_ELECTION_TIMEOUT_MS); + UAVCAN_ASSERT(randomized_activity_timeout_.toMSec() <= AppendEntries::Request::MAX_ELECTION_TIMEOUT_MS); + } bool isActivityTimedOut() const { - const int multiplier = static_cast(getNode().getNodeID().get()) - 1; - const MonotonicDuration activity_timeout = base_activity_timeout_ + update_interval_ * multiplier; - return getNode().getMonotonicTime() > (last_activity_timestamp_ + activity_timeout); + return getNode().getMonotonicTime() > (last_activity_timestamp_ + randomized_activity_timeout_); } void handlePersistentStateUpdateError(int error) @@ -187,13 +187,12 @@ private: UAVCAN_ASSERT(error < 0); trace(TraceRaftPersistStateUpdateError, error); switchState(ServerStateFollower); - setActiveMode(false); // Goodnight sweet prince registerActivity(); // Deferring reelections } void updateFollower() { - if (active_mode_ && isActivityTimedOut()) + if (isActivityTimedOut()) { switchState(ServerStateCandidate); registerActivity(); @@ -202,8 +201,6 @@ private: void updateCandidate() { - UAVCAN_ASSERT(active_mode_); - if (num_votes_received_in_this_campaign_ > 0) { trace(TraceRaftElectionComplete, num_votes_received_in_this_campaign_); @@ -238,7 +235,7 @@ private: req.last_log_term = persistent_state_.getLog().getEntryAtIndex(req.last_log_index)->term; req.term = persistent_state_.getCurrentTerm(); - for (uint8_t i = 0; i < NumRequestVoteCalls; i++) + for (uint8_t i = 0; i < MaxNumFollowers; i++) { const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(i); if (!node_id.isUnicast()) @@ -261,17 +258,12 @@ private: void updateLeader() { - if (cluster_.getClusterSize() == 1) - { - setActiveMode(false); // Haha - } - if (append_entries_client_.hasPendingCalls()) { append_entries_client_.cancelAllCalls(); // Refer to the response callback to learn why } - if (active_mode_ || (next_server_index_ > 0)) + if (cluster_.getClusterSize() > 1) { const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(next_server_index_); UAVCAN_ASSERT(node_id.isUnicast()); @@ -364,18 +356,6 @@ private: } } - void setActiveMode(bool new_active) - { - if (active_mode_ != new_active) - { - UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", "Active switch: %d --> %d", - int(active_mode_), int(new_active)); - trace(TraceRaftActiveSwitch, new_active); - - active_mode_ = new_active; - } - } - void tryIncrementCurrentTermFromResponse(Term new_term) { trace(TraceRaftNewerTermInResponse, new_term); @@ -386,7 +366,6 @@ private: } registerActivity(); // Deferring future elections switchState(ServerStateFollower); - setActiveMode(false); } void propagateCommitIndex() @@ -395,63 +374,12 @@ private: UAVCAN_ASSERT(server_state_ == ServerStateLeader); UAVCAN_ASSERT(commit_index_ <= persistent_state_.getLog().getLastIndex()); - if (commit_index_ == persistent_state_.getLog().getLastIndex()) - { - /* - * All local entries are committed. - * Deciding if it is safe to go into passive mode now. - * - * We can go into passive mode if the log is known to be fully replicated and all entries are committed. - * The high-level conditions above are guaranteed to be met if all of the following lower-level conditions - * are met: - * - All local entries are committed (already checked here). - * - Match index on all nodes equals local commit index. - * - Next index on all nodes is strictly greater than the local commit index. - * - * The following code checks if the last two conditions are met. - */ - bool match_index_equals_commit_index = true; - bool next_index_greater_than_commit_index = true; - - for (uint8_t i = 0; i < cluster_.getNumKnownServers(); i++) - { - const NodeID server_node_id = cluster_.getRemoteServerNodeIDAtIndex(i); - - const Log::Index match_index = cluster_.getServerMatchIndex(server_node_id); - if (match_index != commit_index_) - { - match_index_equals_commit_index = false; - break; - } - - const Log::Index next_index = cluster_.getServerNextIndex(server_node_id); - if (next_index <= commit_index_) - { - next_index_greater_than_commit_index = false; - break; - } - } - - /* - * Now we know whether the log is replicated and whether all entries are committed, so we can make - * the decision. Remember that since we ended up in this branch, it is already known that all local - * log entries are committed. - */ - const bool all_done = - match_index_equals_commit_index && - next_index_greater_than_commit_index && - cluster_.isClusterDiscovered(); - - setActiveMode(!all_done); - } - else + if (commit_index_ < persistent_state_.getLog().getLastIndex()) { /* * Not all local entries are committed. * Deciding if it is safe to increment commit index. */ - setActiveMode(true); - uint8_t num_nodes_with_next_log_entry_available = 1; // Local node for (uint8_t i = 0; i < cluster_.getNumKnownServers(); i++) { @@ -536,7 +464,6 @@ private: registerActivity(); switchState(ServerStateFollower); - setActiveMode(false); /* * Step 2 @@ -613,14 +540,6 @@ private: if (!result.isSuccessful()) { - /* - * This callback WILL NEVER be invoked by timeout, because: - * - Any pending request will be cancelled on the next update of the Leader. - * - When the state switches (i.e. the node is not Leader anymore), all pending calls will be cancelled. - * Also note that 'pending_append_entries_fields_' invalidates after every update of the Leader, so - * if there were timeout callbacks, they would be using outdated state. - */ - UAVCAN_ASSERT(0); return; } @@ -664,8 +583,6 @@ private: UAVCAN_ASSERT(response.isResponseEnabled()); // This is default - setActiveMode(true); - /* * Checking if our current state is up to date. * The request will be ignored if persistent state cannot be updated. @@ -732,12 +649,6 @@ private: if (!result.isSuccessful()) { - /* - * This callback WILL NEVER be invoked by timeout, because all pending calls will be cancelled on - * state switch, which ALWAYS happens 100ms after elections (either to Follower or to Leader, depending - * on the number of votes collected). - */ - UAVCAN_ASSERT(0); return; } @@ -762,11 +673,6 @@ private: { checkInvariants(); - if (cluster_.hadDiscoveryActivity() && isLeader()) - { - setActiveMode(true); - } - switch (server_state_) { case ServerStateFollower: @@ -796,21 +702,15 @@ public: RaftCore(INode& node, IStorageBackend& storage, IEventTracer& tracer, - IRaftLeaderMonitor& leader_monitor, - MonotonicDuration update_interval = - MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_REQUEST_TIMEOUT_MS), - MonotonicDuration base_activity_timeout = - MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_BASE_ELECTION_TIMEOUT_MS)) + IRaftLeaderMonitor& leader_monitor) : TimerBase(node) - , update_interval_(update_interval) - , base_activity_timeout_(base_activity_timeout) , tracer_(tracer) , leader_monitor_(leader_monitor) , persistent_state_(storage, tracer) , cluster_(node, storage, persistent_state_.getLog(), tracer) , commit_index_(0) // Per Raft paper, commitIndex must be initialized to zero , last_activity_timestamp_(node.getMonotonicTime()) - , active_mode_(true) + , randomized_activity_timeout_(MonotonicDuration::fromMSec(AppendEntries::Request::MAX_ELECTION_TIMEOUT_MS)) , server_state_(ServerStateFollower) , next_server_index_(0) , num_votes_received_in_this_campaign_(0) @@ -831,13 +731,13 @@ public: /* * Initializing state variables */ - last_activity_timestamp_ = getNode().getMonotonicTime() + update_interval_; - active_mode_ = true; server_state_ = ServerStateFollower; next_server_index_ = 0; num_votes_received_in_this_campaign_ = 0; commit_index_ = 0; + registerActivity(); + /* * Initializing internals */ @@ -872,7 +772,6 @@ public: } append_entries_client_.setCallback(AppendEntriesResponseCallback(this, &RaftCore::handleAppendEntriesResponse)); - append_entries_client_.setRequestTimeout(update_interval_); res = request_vote_client_.init(); if (res < 0) @@ -880,24 +779,34 @@ public: return res; } request_vote_client_.setCallback(RequestVoteResponseCallback(this, &RaftCore::handleRequestVoteResponse)); - request_vote_client_.setRequestTimeout(update_interval_); - startPeriodic(update_interval_); + /* + * Initializing timing constants + * Refer to the specification for the formula + */ + const uint8_t num_followers = static_cast(cluster_.getClusterSize() - 1); - trace(TraceRaftCoreInited, update_interval_.toUSec()); + const MonotonicDuration update_interval = + MonotonicDuration::fromMSec(AppendEntries::Request::MIN_ELECTION_TIMEOUT_MS / + 2 / max(static_cast(3), num_followers)); + + UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", + "Update interval: %ld msec", static_cast(update_interval.toMSec())); + + append_entries_client_.setRequestTimeout(min(append_entries_client_.getDefaultRequestTimeout(), + update_interval)); + + request_vote_client_.setRequestTimeout(min(request_vote_client_.getDefaultRequestTimeout(), + update_interval)); + + startPeriodic(update_interval); + + trace(TraceRaftCoreInited, update_interval.toUSec()); UAVCAN_ASSERT(res >= 0); return 0; } - /** - * Normally should be called when there's allocation activity on the bus. - */ - void forceActiveMode() - { - setActiveMode(true); // If the current state was Follower, active mode may be toggling quickly for some time - } - /** * This function is mostly needed for testing. */ @@ -994,8 +903,9 @@ public: const PersistentState& getPersistentState() const { return persistent_state_; } const ClusterManager& getClusterManager() const { return cluster_; } MonotonicTime getLastActivityTimestamp() const { return last_activity_timestamp_; } - bool isInActiveMode() const { return active_mode_; } ServerState getServerState() const { return server_state_; } + MonotonicDuration getUpdateInterval() const { return getPeriod(); } + MonotonicDuration getRandomizedTimeout() const { return randomized_activity_timeout_; } }; } diff --git a/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/server.hpp b/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/server.hpp index 03c4ede47c..31b709d441 100644 --- a/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/server.hpp +++ b/libuavcan/include/uavcan/protocol/dynamic_node_id_server/distributed/server.hpp @@ -135,12 +135,9 @@ class UAVCAN_EXPORT Server : IAllocationRequestHandler } } - virtual void handleAllocationActivityDetection(const ReceivedDataStructure& msg) + virtual void handleAllocationActivityDetection(const ReceivedDataStructure&) { - if (msg.isAnonymousTransfer()) - { - raft_core_.forceActiveMode(); - } + // TODO: remove this method } /* @@ -315,7 +312,6 @@ struct StateReport uint8_t cluster_size; RaftCore::ServerState state; - bool is_active; Log::Index last_log_index; Log::Index commit_index; @@ -326,6 +322,7 @@ struct StateReport NodeID voted_for; MonotonicTime last_activity_timestamp; + MonotonicDuration randomized_timeout; uint8_t num_unknown_nodes; @@ -344,13 +341,13 @@ struct StateReport StateReport(const Server& s) : cluster_size (s.getRaftCore().getClusterManager().getClusterSize()) , state (s.getRaftCore().getServerState()) - , is_active (s.getRaftCore().isInActiveMode()) , last_log_index (s.getRaftCore().getPersistentState().getLog().getLastIndex()) , commit_index (s.getRaftCore().getCommitIndex()) , last_log_term (0) // See below , current_term (s.getRaftCore().getPersistentState().getCurrentTerm()) , voted_for (s.getRaftCore().getPersistentState().getVotedFor()) , last_activity_timestamp(s.getRaftCore().getLastActivityTimestamp()) + , randomized_timeout (s.getRaftCore().getRandomizedTimeout()) , num_unknown_nodes (s.getNodeDiscoverer().getNumUnknownNodes()) { const Entry* const e = s.getRaftCore().getPersistentState().getLog().getEntryAtIndex(last_log_index); diff --git a/libuavcan/test/protocol/dynamic_node_id_server/distributed/server.cpp b/libuavcan/test/protocol/dynamic_node_id_server/distributed/server.cpp index eac00afa31..cd4a67d393 100644 --- a/libuavcan/test/protocol/dynamic_node_id_server/distributed/server.cpp +++ b/libuavcan/test/protocol/dynamic_node_id_server/distributed/server.cpp @@ -68,11 +68,10 @@ TEST(dynamic_node_id_server_RaftCore, Basic) /* * Running and trying not to fall */ - nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(5000)); + nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(9000)); - // The one with lower node ID must become a leader - ASSERT_TRUE(raft_a->isLeader()); - ASSERT_FALSE(raft_b->isLeader()); + // Either must become a leader + ASSERT_TRUE(raft_a->isLeader() || raft_b->isLeader()); ASSERT_EQ(0, raft_a->getCommitIndex()); ASSERT_EQ(0, raft_b->getCommitIndex()); @@ -83,19 +82,19 @@ TEST(dynamic_node_id_server_RaftCore, Basic) Entry::FieldTypes::unique_id unique_id; uavcan::fill_n(unique_id.begin(), 16, uint8_t(0xAA)); - raft_a->appendLog(unique_id, uavcan::NodeID(1)); + (raft_a->isLeader() ? raft_a : raft_b)->appendLog(unique_id, uavcan::NodeID(1)); - nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(2000)); + nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(6000)); ASSERT_EQ(1, raft_a->getCommitIndex()); ASSERT_EQ(1, raft_b->getCommitIndex()); /* - * Terminating the leader - the Follower will continue to sleep + * Terminating the leader */ raft_a.reset(); - nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(2000)); + nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(6000)); /* * Reinitializing the leader - current Follower will become the new Leader @@ -106,7 +105,7 @@ TEST(dynamic_node_id_server_RaftCore, Basic) ASSERT_LE(0, raft_a->init(2)); ASSERT_EQ(0, raft_a->getCommitIndex()); - nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(5000)); + nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(9000)); ASSERT_FALSE(raft_a->isLeader()); ASSERT_TRUE(raft_b->isLeader()); @@ -166,7 +165,7 @@ TEST(dynamic_node_id_server_Server, Basic) /* * Fire */ - nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(6000)); + nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(9000)); ASSERT_TRUE(client.isAllocationComplete()); ASSERT_EQ(PreferredNodeID, client.getAllocatedNodeID()); diff --git a/libuavcan_drivers/linux/apps/uavcan_dynamic_node_id_server.cpp b/libuavcan_drivers/linux/apps/uavcan_dynamic_node_id_server.cpp index 4b2d13f1de..03372ec396 100644 --- a/libuavcan_drivers/linux/apps/uavcan_dynamic_node_id_server.cpp +++ b/libuavcan_drivers/linux/apps/uavcan_dynamic_node_id_server.cpp @@ -360,10 +360,6 @@ void redraw(const uavcan_linux::NodePtr& node, (report.state == RaftCore::ServerStateLeader) ? CLIColor::Green : CLIColor::Default); - render_top_str("Mode", - report.is_active ? "Active" : "Passive", - colorize_if(report.is_active, CLIColor::Magenta)); - render_top_int("Last log index", report.last_log_index, CLIColor::Default); @@ -388,6 +384,10 @@ void redraw(const uavcan_linux::NodePtr& node, duration_to_string(time_since_last_activity).c_str(), CLIColor::Default); + render_top_str("Random timeout", + duration_to_string(report.randomized_timeout).c_str(), + CLIColor::Default); + render_top_int("Unknown nodes", report.num_unknown_nodes, colorize_if(report.num_unknown_nodes != 0, CLIColor::Magenta)); @@ -630,6 +630,8 @@ int main(int argc, const char** argv) { try { + std::srand(std::time(nullptr)); + if (isatty(STDOUT_FILENO) != 1) { std::cerr << "This application cannot run if stdout is not associated with a terminal" << std::endl;