From 6985c72dd32d27ba489482c79e6d7df2aada7329 Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Fri, 8 May 2015 13:30:55 +0300 Subject: [PATCH] Follower and candidate logic implemented --- .../server/221.RequestVote.uavcan | 2 +- .../dynamic_node_id_allocation_server.hpp | 26 ++-- .../uc_dynamic_node_id_allocation_server.cpp | 111 +++++++++++++++--- 3 files changed, 113 insertions(+), 26 deletions(-) diff --git a/dsdl/uavcan/protocol/dynamic_node_id/server/221.RequestVote.uavcan b/dsdl/uavcan/protocol/dynamic_node_id/server/221.RequestVote.uavcan index 958be2612e..113c6ee3d2 100644 --- a/dsdl/uavcan/protocol/dynamic_node_id/server/221.RequestVote.uavcan +++ b/dsdl/uavcan/protocol/dynamic_node_id/server/221.RequestVote.uavcan @@ -4,8 +4,8 @@ # uint32 term +uint32 last_log_term uint8 last_log_index -uint8 last_log_term --- diff --git a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp index 4f1e046a0a..1fa392ec46 100644 --- a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp +++ b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp @@ -134,6 +134,7 @@ enum TraceEvent TraceRaftVoteRequestReceived, // node ID of the client TraceRaftVoteRequestSucceeded, // node ID of the server // 20 + TraceRaftVoteRequestInitiation, // node ID of the server TraceRaftPersistStateUpdateError, // negative error code TraceRaftCommitIndexUpdate, // new commit index value TraceRaftNewerTermInResponse, // new term value @@ -301,6 +302,10 @@ public: */ class ClusterManager : private TimerBase { +public: + enum { MaxClusterSize = Discovery::FieldTypes::known_nodes::MaxSize }; + +private: typedef MethodBinder&)> @@ -320,8 +325,6 @@ class ClusterManager : private TimerBase void resetIndices(const Log& log); }; - enum { MaxServers = Discovery::FieldTypes::known_nodes::MaxSize }; - IDynamicNodeIDStorageBackend& storage_; IDynamicNodeIDAllocationServerEventTracer& tracer_; const Log& log_; @@ -329,7 +332,7 @@ class ClusterManager : private TimerBase Subscriber discovery_sub_; mutable Publisher discovery_pub_; - Server servers_[MaxServers - 1]; ///< Minus one because the local server is not listed there. + Server servers_[MaxClusterSize - 1]; ///< Minus one because the local server is not listed there. uint8_t cluster_size_; uint8_t num_known_servers_; @@ -504,8 +507,11 @@ class RaftCore : private TimerBase */ ServiceServer append_entries_srv_; ServiceClient append_entries_client_; - ServiceServer request_vote_srv_; - ServiceClient request_vote_client_; + ServiceServer request_vote_srv_; + + enum { NumRequestVoteClients = ClusterManager::MaxClusterSize - 1 }; + LazyConstructor > + request_vote_clients_[NumRequestVoteClients]; void trace(TraceEvent event, int64_t argument) { tracer_.onEvent(event, argument); } @@ -515,6 +521,8 @@ class RaftCore : private TimerBase void registerActivity() { last_activity_timestamp_ = getNode().getMonotonicTime(); } bool isActivityTimedOut() const; + void handlePersistentStateUpdateError(int error); + void updateFollower(); void updateCandidate(); void updateLeader(); @@ -559,8 +567,12 @@ public: , append_entries_srv_(node) , append_entries_client_(node) , request_vote_srv_(node) - , request_vote_client_(node) - { } + { + for (uint8_t i = 0; i < NumRequestVoteClients; i++) + { + request_vote_clients_[i].construct(node); + } + } /** * Once started, the logic runs in the background until destructor is called. diff --git a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp index 3a913c9058..c8a7ba4410 100644 --- a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp +++ b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp @@ -588,7 +588,7 @@ const ClusterManager::Server* ClusterManager::findServer(NodeID node_id) const void ClusterManager::addServer(NodeID node_id) { - UAVCAN_ASSERT((num_known_servers_ + 1) < (MaxServers - 2)); + UAVCAN_ASSERT((num_known_servers_ + 1) < (MaxClusterSize - 2)); if (!isKnownServer(node_id) && node_id.isUnicast()) { tracer_.onEvent(TraceNewServerDiscovered, node_id.get()); @@ -715,7 +715,7 @@ int ClusterManager::init(const uint8_t init_cluster_size) "Cluster size is neither configured nor stored in the storage"); return res; } - if ((value == 0) || (value > MaxServers)) + if ((value == 0) || (value > MaxClusterSize)) { UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Cluster size is invalid"); return -ErrFailure; @@ -724,7 +724,7 @@ int ClusterManager::init(const uint8_t init_cluster_size) } else { - if ((init_cluster_size == 0) || (init_cluster_size > MaxServers)) + if ((init_cluster_size == 0) || (init_cluster_size > MaxClusterSize)) { return -ErrInvalidParam; } @@ -744,7 +744,7 @@ int ClusterManager::init(const uint8_t init_cluster_size) tracer_.onEvent(TraceClusterSizeInited, cluster_size_); UAVCAN_ASSERT(cluster_size_ > 0); - UAVCAN_ASSERT(cluster_size_ <= MaxServers); + UAVCAN_ASSERT(cluster_size_ <= MaxClusterSize); /* * Initializing pub/sub and timer @@ -880,12 +880,81 @@ bool RaftCore::isActivityTimedOut() const return getNode().getMonotonicTime() > (last_activity_timestamp_ + activity_timeout); } +void RaftCore::handlePersistentStateUpdateError(int error) +{ + UAVCAN_ASSERT(error < 0); + trace(TraceRaftPersistStateUpdateError, error); + switchState(ServerStateFollower); + setActiveMode(false); // Goodnight sweet prince + registerActivity(); // Deferring reelections +} + void RaftCore::updateFollower() { + if (active_mode_ && isActivityTimedOut()) + { + switchState(ServerStateCandidate); + setActiveMode(true); + registerActivity(); + } } void RaftCore::updateCandidate() { + if (num_votes_received_in_this_campaign_ > 0) + { + const bool won = num_votes_received_in_this_campaign_ >= cluster_.getQuorumSize(); + + UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Election complete, won: %d", int(won)); + + switchState(won ? ServerStateLeader : ServerStateFollower); // Start over or become leader + setActiveMode(true); + } + else + { + // Set votedFor, abort on failure + int res = persistent_state_.setVotedFor(getNode().getNodeID()); + if (res < 0) + { + handlePersistentStateUpdateError(res); + return; + } + + // Increment current term, abort on failure + res = persistent_state_.setCurrentTerm(persistent_state_.getCurrentTerm() + 1U); + if (res < 0) + { + handlePersistentStateUpdateError(res); + return; + } + + num_votes_received_in_this_campaign_ = 1; // Voting for self + + RequestVote::Request req; + req.last_log_index = persistent_state_.getLog().getLastIndex(); + req.last_log_term = persistent_state_.getLog().getEntryAtIndex(req.last_log_index)->term; + req.term = persistent_state_.getCurrentTerm(); + + for (uint8_t i = 0; i < NumRequestVoteClients; i++) + { + const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(i); + if (!node_id.isUnicast()) + { + break; + } + + UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Requesting vote from %d", int(node_id.get())); + trace(TraceRaftVoteRequestInitiation, node_id.get()); + + res = request_vote_clients_[i]->call(node_id, req); + if (res < 0) + { + trace(TraceError, res); + } + } + + UAVCAN_ASSERT(res >= 0); + } } void RaftCore::updateLeader() @@ -905,13 +974,14 @@ void RaftCore::switchState(const ServerState new_state) cluster_.resetAllServerIndices(); - if (server_state_ == ServerStateFollower) - { - setActiveMode(false); - } - next_server_index_ = 0; num_votes_received_in_this_campaign_ = 0; + + for (uint8_t i = 0; i < NumRequestVoteClients; i++) + { + request_vote_clients_[i]->cancel(); + } + append_entries_client_.cancel(); } } @@ -937,6 +1007,7 @@ void RaftCore::tryIncrementCurrentTermFromResponse(Term new_term) } registerActivity(); // Deferring future elections switchState(ServerStateFollower); + setActiveMode(false); } void RaftCore::propagateCommitIndex() @@ -1022,6 +1093,7 @@ void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure& result) { + UAVCAN_ASSERT(server_state_ == ServerStateLeader); // When state switches, all requests must be cancelled + if (!result.isSuccessful()) { return; @@ -1181,6 +1256,7 @@ void RaftCore::handleRequestVoteRequest(const ReceivedDataStructure& result) { - if (server_state_ != ServerStateCandidate) - { - UAVCAN_ASSERT(num_votes_received_in_this_campaign_ == 0); // Making sure it was reset - return; // State has been switched, so we don't actually need the response anymore - } + UAVCAN_ASSERT(server_state_ == ServerStateCandidate); // When state switches, all requests must be cancelled if (!result.isSuccessful()) { @@ -1329,12 +1401,15 @@ int RaftCore::init(uint8_t cluster_size) } append_entries_client_.setRequestTimeout(update_interval_); - res = request_vote_client_.init(); - if (res < 0) + for (uint8_t i = 0; i < NumRequestVoteClients; i++) { - return res; + res = request_vote_clients_[i]->init(); + if (res < 0) + { + return res; + } + request_vote_clients_[i]->setRequestTimeout(update_interval_); } - request_vote_client_.setRequestTimeout(update_interval_); startPeriodic(update_interval_);