Raft AE and RV RPC response handlers

This commit is contained in:
Pavel Kirienko
2015-05-07 19:11:13 +03:00
parent 944ac75d93
commit ce752d93bd
2 changed files with 91 additions and 4 deletions
@@ -132,9 +132,11 @@ enum TraceEvent
TraceRaftNewLogEntry, // node ID value
TraceRaftRequestIgnored, // node ID of the client
TraceRaftVoteRequestReceived, // node ID of the client
TraceRaftPersistStateUpdateError, // negative error code
TraceRaftVoteRequestSucceeded, // node ID of the server
// 20
TraceRaftPersistStateUpdateError, // negative error code
TraceRaftCommitIndexUpdate, // new commit index value
TraceRaftNewerTermInResponse, // new term value
NumTraceEventCodes
};
@@ -461,6 +463,17 @@ class RaftCore : private TimerBase
ServerStateLeader
};
struct PendingAppendEntriesFields
{
Log::Index prev_log_index;
Log::Index num_entries;
PendingAppendEntriesFields()
: prev_log_index(0)
, num_entries(0)
{ }
};
IDynamicNodeIDAllocationServerEventTracer& tracer_;
/*
@@ -477,6 +490,8 @@ class RaftCore : private TimerBase
uint8_t next_server_index_; ///< Next server to query for AE or RV RPC
uint8_t num_votes_received_in_this_campaign_;
PendingAppendEntriesFields pending_append_entries_fields_;
/*
* Transport
*/
@@ -489,7 +504,7 @@ class RaftCore : private TimerBase
* This constant defines the rate at which internal state updates happen.
* It also defines timeouts for AppendEntries and RequestVote RPCs.
*/
static MonotonicDuration getUpdateInterval() { return MonotonicDuration::fromMSec(50); }
static MonotonicDuration getUpdateInterval() { return MonotonicDuration::fromMSec(100); }
void trace(TraceEvent event, int64_t argument) { tracer_.onEvent(event, argument); }
@@ -504,6 +519,8 @@ class RaftCore : private TimerBase
void switchState(ServerState new_state);
void setActiveMode(bool new_active);
void tryIncrementCurrentTermFromResponse(Term new_term);
void handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntries::Request>& request,
ServiceResponseDataStructure<AppendEntries::Response>& response);
@@ -901,6 +901,9 @@ void RaftCore::switchState(const ServerState new_state)
{
setActiveMode(false);
}
next_server_index_ = 0;
num_votes_received_in_this_campaign_ = 0;
}
}
@@ -916,6 +919,18 @@ void RaftCore::setActiveMode(const bool new_active)
}
}
void RaftCore::tryIncrementCurrentTermFromResponse(Term new_term)
{
trace(TraceRaftNewerTermInResponse, new_term);
const int res = persistent_state_.setCurrentTerm(new_term);
if (res < 0)
{
trace(TraceRaftPersistStateUpdateError, res);
}
registerActivity(); // Deferring future elections
switchState(ServerStateFollower);
}
void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntries::Request>& request,
ServiceResponseDataStructure<AppendEntries::Response>& response)
{
@@ -1045,7 +1060,32 @@ void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntr
void RaftCore::handleAppendEntriesResponse(const ServiceCallResult<AppendEntries>& result)
{
(void)result;
if (!result.isSuccessful())
{
return;
}
if (result.response.term > persistent_state_.getCurrentTerm())
{
tryIncrementCurrentTermFromResponse(result.response.term);
}
else
{
if (result.response.success)
{
cluster_.incrementServerNextIndexBy(result.server_node_id, pending_append_entries_fields_.num_entries);
cluster_.setServerMatchIndex(result.server_node_id,
Log::Index(pending_append_entries_fields_.prev_log_index +
pending_append_entries_fields_.num_entries));
}
else
{
cluster_.decrementServerNextIndex(result.server_node_id);
}
}
pending_append_entries_fields_ = PendingAppendEntriesFields();
// Rest of the logic is implemented in periodic update handlers.
}
void RaftCore::handleRequestVoteRequest(const ReceivedDataStructure<RequestVote::Request>& request,
@@ -1126,11 +1166,41 @@ void RaftCore::handleRequestVoteRequest(const ReceivedDataStructure<RequestVote:
void RaftCore::handleRequestVoteResponse(const ServiceCallResult<RequestVote>& result)
{
(void)result;
if (server_state_ != ServerStateCandidate)
{
UAVCAN_ASSERT(num_votes_received_in_this_campaign_ == 0); // Making sure it was reset
return; // State has been switched, so we don't actually need the response anymore
}
if (!result.isSuccessful())
{
return;
}
trace(TraceRaftVoteRequestSucceeded, result.server_node_id.get());
if (result.response.term > persistent_state_.getCurrentTerm())
{
tryIncrementCurrentTermFromResponse(result.response.term);
}
else
{
if (result.response.vote_granted)
{
num_votes_received_in_this_campaign_++;
}
}
// Rest of the logic is implemented in periodic update handlers.
// I'm no fan of asynchronous programming. At all.
}
void RaftCore::handleTimerEvent(const TimerEvent& event)
{
if (cluster_.hadDiscoveryActivity())
{
setActiveMode(true);
}
switch (server_state_)
{
case ServerStateFollower: