diff --git a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp index f826fd9189..8bc71db869 100644 --- a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp +++ b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp @@ -132,9 +132,11 @@ enum TraceEvent TraceRaftNewLogEntry, // node ID value TraceRaftRequestIgnored, // node ID of the client TraceRaftVoteRequestReceived, // node ID of the client - TraceRaftPersistStateUpdateError, // negative error code + TraceRaftVoteRequestSucceeded, // node ID of the server // 20 + TraceRaftPersistStateUpdateError, // negative error code TraceRaftCommitIndexUpdate, // new commit index value + TraceRaftNewerTermInResponse, // new term value NumTraceEventCodes }; @@ -461,6 +463,17 @@ class RaftCore : private TimerBase ServerStateLeader }; + struct PendingAppendEntriesFields + { + Log::Index prev_log_index; + Log::Index num_entries; + + PendingAppendEntriesFields() + : prev_log_index(0) + , num_entries(0) + { } + }; + IDynamicNodeIDAllocationServerEventTracer& tracer_; /* @@ -477,6 +490,8 @@ class RaftCore : private TimerBase uint8_t next_server_index_; ///< Next server to query for AE or RV RPC uint8_t num_votes_received_in_this_campaign_; + PendingAppendEntriesFields pending_append_entries_fields_; + /* * Transport */ @@ -489,7 +504,7 @@ class RaftCore : private TimerBase * This constant defines the rate at which internal state updates happen. * It also defines timeouts for AppendEntries and RequestVote RPCs. */ - static MonotonicDuration getUpdateInterval() { return MonotonicDuration::fromMSec(50); } + static MonotonicDuration getUpdateInterval() { return MonotonicDuration::fromMSec(100); } void trace(TraceEvent event, int64_t argument) { tracer_.onEvent(event, argument); } @@ -504,6 +519,8 @@ class RaftCore : private TimerBase void switchState(ServerState new_state); void setActiveMode(bool new_active); + void tryIncrementCurrentTermFromResponse(Term new_term); + void handleAppendEntriesRequest(const ReceivedDataStructure& request, ServiceResponseDataStructure& response); diff --git a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp index 83f6a12ee0..cf23c22265 100644 --- a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp +++ b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp @@ -901,6 +901,9 @@ void RaftCore::switchState(const ServerState new_state) { setActiveMode(false); } + + next_server_index_ = 0; + num_votes_received_in_this_campaign_ = 0; } } @@ -916,6 +919,18 @@ void RaftCore::setActiveMode(const bool new_active) } } +void RaftCore::tryIncrementCurrentTermFromResponse(Term new_term) +{ + trace(TraceRaftNewerTermInResponse, new_term); + const int res = persistent_state_.setCurrentTerm(new_term); + if (res < 0) + { + trace(TraceRaftPersistStateUpdateError, res); + } + registerActivity(); // Deferring future elections + switchState(ServerStateFollower); +} + void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure& request, ServiceResponseDataStructure& response) { @@ -1045,7 +1060,32 @@ void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure& result) { - (void)result; + if (!result.isSuccessful()) + { + return; + } + + if (result.response.term > persistent_state_.getCurrentTerm()) + { + tryIncrementCurrentTermFromResponse(result.response.term); + } + else + { + if (result.response.success) + { + cluster_.incrementServerNextIndexBy(result.server_node_id, pending_append_entries_fields_.num_entries); + cluster_.setServerMatchIndex(result.server_node_id, + Log::Index(pending_append_entries_fields_.prev_log_index + + pending_append_entries_fields_.num_entries)); + } + else + { + cluster_.decrementServerNextIndex(result.server_node_id); + } + } + + pending_append_entries_fields_ = PendingAppendEntriesFields(); + // Rest of the logic is implemented in periodic update handlers. } void RaftCore::handleRequestVoteRequest(const ReceivedDataStructure& request, @@ -1126,11 +1166,41 @@ void RaftCore::handleRequestVoteRequest(const ReceivedDataStructure& result) { - (void)result; + if (server_state_ != ServerStateCandidate) + { + UAVCAN_ASSERT(num_votes_received_in_this_campaign_ == 0); // Making sure it was reset + return; // State has been switched, so we don't actually need the response anymore + } + + if (!result.isSuccessful()) + { + return; + } + + trace(TraceRaftVoteRequestSucceeded, result.server_node_id.get()); + + if (result.response.term > persistent_state_.getCurrentTerm()) + { + tryIncrementCurrentTermFromResponse(result.response.term); + } + else + { + if (result.response.vote_granted) + { + num_votes_received_in_this_campaign_++; + } + } + // Rest of the logic is implemented in periodic update handlers. + // I'm no fan of asynchronous programming. At all. } void RaftCore::handleTimerEvent(const TimerEvent& event) { + if (cluster_.hadDiscoveryActivity()) + { + setActiveMode(true); + } + switch (server_state_) { case ServerStateFollower: