Follower and candidate logic implemented

This commit is contained in:
Pavel Kirienko
2015-05-08 13:30:55 +03:00
parent 1a640e6763
commit 6985c72dd3
3 changed files with 113 additions and 26 deletions
@@ -4,8 +4,8 @@
#
uint32 term
uint32 last_log_term
uint8 last_log_index
uint8 last_log_term
---
@@ -134,6 +134,7 @@ enum TraceEvent
TraceRaftVoteRequestReceived, // node ID of the client
TraceRaftVoteRequestSucceeded, // node ID of the server
// 20
TraceRaftVoteRequestInitiation, // node ID of the server
TraceRaftPersistStateUpdateError, // negative error code
TraceRaftCommitIndexUpdate, // new commit index value
TraceRaftNewerTermInResponse, // new term value
@@ -301,6 +302,10 @@ public:
*/
class ClusterManager : private TimerBase
{
public:
enum { MaxClusterSize = Discovery::FieldTypes::known_nodes::MaxSize };
private:
typedef MethodBinder<ClusterManager*,
void (ClusterManager::*)
(const ReceivedDataStructure<Discovery>&)>
@@ -320,8 +325,6 @@ class ClusterManager : private TimerBase
void resetIndices(const Log& log);
};
enum { MaxServers = Discovery::FieldTypes::known_nodes::MaxSize };
IDynamicNodeIDStorageBackend& storage_;
IDynamicNodeIDAllocationServerEventTracer& tracer_;
const Log& log_;
@@ -329,7 +332,7 @@ class ClusterManager : private TimerBase
Subscriber<Discovery, DiscoveryCallback> discovery_sub_;
mutable Publisher<Discovery> discovery_pub_;
Server servers_[MaxServers - 1]; ///< Minus one because the local server is not listed there.
Server servers_[MaxClusterSize - 1]; ///< Minus one because the local server is not listed there.
uint8_t cluster_size_;
uint8_t num_known_servers_;
@@ -504,8 +507,11 @@ class RaftCore : private TimerBase
*/
ServiceServer<AppendEntries, AppendEntriesCallback> append_entries_srv_;
ServiceClient<AppendEntries, AppendEntriesResponseCallback> append_entries_client_;
ServiceServer<RequestVote, RequestVoteCallback> request_vote_srv_;
ServiceClient<RequestVote, RequestVoteResponseCallback> request_vote_client_;
ServiceServer<RequestVote, RequestVoteCallback> request_vote_srv_;
enum { NumRequestVoteClients = ClusterManager::MaxClusterSize - 1 };
LazyConstructor<ServiceClient<RequestVote, RequestVoteResponseCallback> >
request_vote_clients_[NumRequestVoteClients];
void trace(TraceEvent event, int64_t argument) { tracer_.onEvent(event, argument); }
@@ -515,6 +521,8 @@ class RaftCore : private TimerBase
void registerActivity() { last_activity_timestamp_ = getNode().getMonotonicTime(); }
bool isActivityTimedOut() const;
void handlePersistentStateUpdateError(int error);
void updateFollower();
void updateCandidate();
void updateLeader();
@@ -559,8 +567,12 @@ public:
, append_entries_srv_(node)
, append_entries_client_(node)
, request_vote_srv_(node)
, request_vote_client_(node)
{ }
{
for (uint8_t i = 0; i < NumRequestVoteClients; i++)
{
request_vote_clients_[i].construct<INode&>(node);
}
}
/**
* Once started, the logic runs in the background until destructor is called.
@@ -588,7 +588,7 @@ const ClusterManager::Server* ClusterManager::findServer(NodeID node_id) const
void ClusterManager::addServer(NodeID node_id)
{
UAVCAN_ASSERT((num_known_servers_ + 1) < (MaxServers - 2));
UAVCAN_ASSERT((num_known_servers_ + 1) < (MaxClusterSize - 2));
if (!isKnownServer(node_id) && node_id.isUnicast())
{
tracer_.onEvent(TraceNewServerDiscovered, node_id.get());
@@ -715,7 +715,7 @@ int ClusterManager::init(const uint8_t init_cluster_size)
"Cluster size is neither configured nor stored in the storage");
return res;
}
if ((value == 0) || (value > MaxServers))
if ((value == 0) || (value > MaxClusterSize))
{
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Cluster size is invalid");
return -ErrFailure;
@@ -724,7 +724,7 @@ int ClusterManager::init(const uint8_t init_cluster_size)
}
else
{
if ((init_cluster_size == 0) || (init_cluster_size > MaxServers))
if ((init_cluster_size == 0) || (init_cluster_size > MaxClusterSize))
{
return -ErrInvalidParam;
}
@@ -744,7 +744,7 @@ int ClusterManager::init(const uint8_t init_cluster_size)
tracer_.onEvent(TraceClusterSizeInited, cluster_size_);
UAVCAN_ASSERT(cluster_size_ > 0);
UAVCAN_ASSERT(cluster_size_ <= MaxServers);
UAVCAN_ASSERT(cluster_size_ <= MaxClusterSize);
/*
* Initializing pub/sub and timer
@@ -880,12 +880,81 @@ bool RaftCore::isActivityTimedOut() const
return getNode().getMonotonicTime() > (last_activity_timestamp_ + activity_timeout);
}
void RaftCore::handlePersistentStateUpdateError(int error)
{
UAVCAN_ASSERT(error < 0);
trace(TraceRaftPersistStateUpdateError, error);
switchState(ServerStateFollower);
setActiveMode(false); // Goodnight sweet prince
registerActivity(); // Deferring reelections
}
void RaftCore::updateFollower()
{
if (active_mode_ && isActivityTimedOut())
{
switchState(ServerStateCandidate);
setActiveMode(true);
registerActivity();
}
}
void RaftCore::updateCandidate()
{
if (num_votes_received_in_this_campaign_ > 0)
{
const bool won = num_votes_received_in_this_campaign_ >= cluster_.getQuorumSize();
UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Election complete, won: %d", int(won));
switchState(won ? ServerStateLeader : ServerStateFollower); // Start over or become leader
setActiveMode(true);
}
else
{
// Set votedFor, abort on failure
int res = persistent_state_.setVotedFor(getNode().getNodeID());
if (res < 0)
{
handlePersistentStateUpdateError(res);
return;
}
// Increment current term, abort on failure
res = persistent_state_.setCurrentTerm(persistent_state_.getCurrentTerm() + 1U);
if (res < 0)
{
handlePersistentStateUpdateError(res);
return;
}
num_votes_received_in_this_campaign_ = 1; // Voting for self
RequestVote::Request req;
req.last_log_index = persistent_state_.getLog().getLastIndex();
req.last_log_term = persistent_state_.getLog().getEntryAtIndex(req.last_log_index)->term;
req.term = persistent_state_.getCurrentTerm();
for (uint8_t i = 0; i < NumRequestVoteClients; i++)
{
const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(i);
if (!node_id.isUnicast())
{
break;
}
UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Requesting vote from %d", int(node_id.get()));
trace(TraceRaftVoteRequestInitiation, node_id.get());
res = request_vote_clients_[i]->call(node_id, req);
if (res < 0)
{
trace(TraceError, res);
}
}
UAVCAN_ASSERT(res >= 0);
}
}
void RaftCore::updateLeader()
@@ -905,13 +974,14 @@ void RaftCore::switchState(const ServerState new_state)
cluster_.resetAllServerIndices();
if (server_state_ == ServerStateFollower)
{
setActiveMode(false);
}
next_server_index_ = 0;
num_votes_received_in_this_campaign_ = 0;
for (uint8_t i = 0; i < NumRequestVoteClients; i++)
{
request_vote_clients_[i]->cancel();
}
append_entries_client_.cancel();
}
}
@@ -937,6 +1007,7 @@ void RaftCore::tryIncrementCurrentTermFromResponse(Term new_term)
}
registerActivity(); // Deferring future elections
switchState(ServerStateFollower);
setActiveMode(false);
}
void RaftCore::propagateCommitIndex()
@@ -1022,6 +1093,7 @@ void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntr
}
switchState(ServerStateFollower);
setActiveMode(false);
if (!response.isResponseEnabled())
{
@@ -1046,6 +1118,7 @@ void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntr
}
switchState(ServerStateFollower);
setActiveMode(false);
/*
* Step 2
@@ -1117,6 +1190,8 @@ void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntr
void RaftCore::handleAppendEntriesResponse(const ServiceCallResult<AppendEntries>& result)
{
UAVCAN_ASSERT(server_state_ == ServerStateLeader); // When state switches, all requests must be cancelled
if (!result.isSuccessful())
{
return;
@@ -1181,6 +1256,7 @@ void RaftCore::handleRequestVoteRequest(const ReceivedDataStructure<RequestVote:
}
switchState(ServerStateFollower);
setActiveMode(false);
if (!response.isResponseEnabled())
{
@@ -1223,11 +1299,7 @@ void RaftCore::handleRequestVoteRequest(const ReceivedDataStructure<RequestVote:
void RaftCore::handleRequestVoteResponse(const ServiceCallResult<RequestVote>& result)
{
if (server_state_ != ServerStateCandidate)
{
UAVCAN_ASSERT(num_votes_received_in_this_campaign_ == 0); // Making sure it was reset
return; // State has been switched, so we don't actually need the response anymore
}
UAVCAN_ASSERT(server_state_ == ServerStateCandidate); // When state switches, all requests must be cancelled
if (!result.isSuccessful())
{
@@ -1329,12 +1401,15 @@ int RaftCore::init(uint8_t cluster_size)
}
append_entries_client_.setRequestTimeout(update_interval_);
res = request_vote_client_.init();
if (res < 0)
for (uint8_t i = 0; i < NumRequestVoteClients; i++)
{
return res;
res = request_vote_clients_[i]->init();
if (res < 0)
{
return res;
}
request_vote_clients_[i]->setRequestTimeout(update_interval_);
}
request_vote_client_.setRequestTimeout(update_interval_);
startPeriodic(update_interval_);