Raft AE and RV servers

This commit is contained in:
Pavel Kirienko
2015-05-07 18:16:26 +03:00
parent 5e56c8a612
commit 944ac75d93
2 changed files with 277 additions and 39 deletions
@@ -124,11 +124,17 @@ enum TraceEvent
// 10
TraceDiscoveryReceived, // node ID of the sender
TraceClusterSizeInited, // cluster size
TraceInvalidClusterSizeReceived, // received cluster size
TraceRaftCoreInited, // update interval in usec
TraceRaftStateSwitch, // 0 - Follower, 1 - Candidate, 2 - Leader
TraceRaftModeSwitch, // 0 - Passive, 1 - Active
// 15
TraceRaftModeSwitch, // 0 - Passive, 1 - Active
TraceRaftNewLogEntry, // node ID value
TraceRaftRequestIgnored, // node ID of the client
TraceRaftVoteRequestReceived, // node ID of the client
TraceRaftPersistStateUpdateError, // negative error code
// 20
TraceRaftCommitIndexUpdate, // new commit index value
NumTraceEventCodes
};
@@ -227,6 +233,7 @@ public:
* Returned value indicates whether the requested operation has been carried out successfully.
*/
int removeEntriesWhereIndexGreaterOrEqual(Index index);
int removeEntriesWhereIndexGreater(Index index);
/**
* Returns nullptr if there's no such index.
@@ -269,6 +276,7 @@ public:
Term getCurrentTerm() const { return current_term_; }
NodeID getVotedFor() const { return voted_for_; }
bool isVotedForSet() const { return voted_for_.isUnicast(); }
Log& getLog() { return log_; }
const Log& getLog() const { return log_; }
@@ -282,6 +290,7 @@ public:
* Invokes storage IO.
*/
int setVotedFor(NodeID node_id);
int resetVotedFor() { return setVotedFor(NodeID(0)); }
};
/**
@@ -331,7 +340,6 @@ class ClusterManager : private TimerBase
Server* findServer(NodeID node_id);
const Server* findServer(NodeID node_id) const;
bool isKnownServer(NodeID node_id) const;
void addServer(NodeID node_id);
virtual void handleTimerEvent(const TimerEvent&);
@@ -368,6 +376,11 @@ public:
*/
int init(uint8_t init_cluster_size = ClusterSizeUnknown);
/**
* Whether such server has been discovered earlier.
*/
bool isKnownServer(NodeID node_id) const;
/**
* An invalid node ID will be returned if there's no such server.
* The local server is not listed there.
@@ -482,11 +495,14 @@ class RaftCore : private TimerBase
INode& getNode() { return append_entries_srv_.getNode(); }
void registerActivity() { last_activity_timestamp_ = getNode().getMonotonicTime(); }
void updateFollower(const MonotonicTime& current_time);
void updateCandidate(const MonotonicTime& current_time);
void updateLeader(const MonotonicTime& current_time);
void switchState(ServerState new_state);
void setActiveMode(bool new_active);
void handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntries::Request>& request,
ServiceResponseDataStructure<AppendEntries::Response>& response);
@@ -344,32 +344,41 @@ int Log::removeEntriesWhereIndexGreaterOrEqual(Index index)
{
UAVCAN_ASSERT(last_index_ < Capacity);
if (((index) >= Capacity) || (index == 0))
if (((index) >= Capacity) || (index <= 0))
{
return -ErrLogic;
}
tracer_.onEvent(TraceLogRemove, index - 1U);
MarshallingStorageDecorator io(storage_);
uint32_t new_last_index = index - 1U;
int res = io.setAndGetBack(getLastIndexKey(), new_last_index);
if (res < 0)
tracer_.onEvent(TraceLogRemove, new_last_index);
if (new_last_index != last_index_)
{
return res;
MarshallingStorageDecorator io(storage_);
int res = io.setAndGetBack(getLastIndexKey(), new_last_index);
if (res < 0)
{
return res;
}
if (new_last_index != index - 1U)
{
return -ErrFailure;
}
UAVCAN_TRACE("dynamic_node_id_server_impl::Log", "Entries removed, last index %u --> %u",
unsigned(last_index_), unsigned(new_last_index));
last_index_ = Index(new_last_index);
}
if (new_last_index != index - 1U)
{
return -ErrFailure;
}
UAVCAN_TRACE("dynamic_node_id_server_impl::Log", "Entries removed, last index %u --> %u",
unsigned(last_index_), unsigned(new_last_index));
last_index_ = Index(new_last_index);
// Removal operation leaves dangling entries in storage, it's OK
return 0;
}
int Log::removeEntriesWhereIndexGreater(Index index)
{
return removeEntriesWhereIndexGreaterOrEqual(Index(index + 1U));
}
const Entry* Log::getEntryAtIndex(Index index) const
{
UAVCAN_ASSERT(last_index_ < Capacity);
@@ -577,24 +586,6 @@ const ClusterManager::Server* ClusterManager::findServer(NodeID node_id) const
return const_cast<ClusterManager*>(this)->findServer(node_id);
}
bool ClusterManager::isKnownServer(NodeID node_id) const
{
if (node_id == getNode().getNodeID())
{
return true;
}
for (uint8_t i = 0; i < num_known_servers_; i++)
{
UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
UAVCAN_ASSERT(servers_[i].node_id != getNode().getNodeID());
if (servers_[i].node_id == node_id)
{
return true;
}
}
return false;
}
void ClusterManager::addServer(NodeID node_id)
{
UAVCAN_ASSERT((num_known_servers_ + 1) < (MaxServers - 2));
@@ -666,6 +657,7 @@ void ClusterManager::handleDiscovery(const ReceivedDataStructure<Discovery>& msg
*/
if (msg.configured_cluster_size != cluster_size_)
{
tracer_.onEvent(TraceInvalidClusterSizeReceived, msg.configured_cluster_size);
getNode().registerInternalFailure("Bad Raft cluster size");
return;
}
@@ -778,6 +770,24 @@ int ClusterManager::init(const uint8_t init_cluster_size)
return 0;
}
bool ClusterManager::isKnownServer(NodeID node_id) const
{
if (node_id == getNode().getNodeID())
{
return true;
}
for (uint8_t i = 0; i < num_known_servers_; i++)
{
UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
UAVCAN_ASSERT(servers_[i].node_id != getNode().getNodeID());
if (servers_[i].node_id == node_id)
{
return true;
}
}
return false;
}
NodeID ClusterManager::getRemoteServerNodeIDAtIndex(uint8_t index) const
{
if (index < num_known_servers_)
@@ -875,19 +885,162 @@ void RaftCore::updateLeader(const MonotonicTime& current_time)
(void)current_time;
}
void RaftCore::switchState(ServerState new_state)
void RaftCore::switchState(const ServerState new_state)
{
if (server_state_ != new_state)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "State switch: %d --> %d",
int(server_state_), int(new_state));
trace(TraceRaftStateSwitch, new_state);
server_state_ = new_state;
cluster_.resetAllServerIndices();
if (server_state_ == ServerStateFollower)
{
setActiveMode(false);
}
}
}
void RaftCore::setActiveMode(const bool new_active)
{
if (active_mode_ != new_active)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Mode switch: %d --> %d",
int(active_mode_), int(new_active));
trace(TraceRaftModeSwitch, new_active);
active_mode_ = new_active;
}
}
void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntries::Request>& request,
ServiceResponseDataStructure<AppendEntries::Response>& response)
{
(void)request;
(void)response;
if (!cluster_.isKnownServer(request.getSrcNodeID()))
{
trace(TraceRaftRequestIgnored, request.getSrcNodeID().get());
return;
}
registerActivity();
UAVCAN_ASSERT(response.isResponseEnabled()); // This is default
/*
* Checking if our current state is up to date.
* The request will be ignored if persistent state cannot be updated.
*/
if (request.term > persistent_state_.getCurrentTerm())
{
int res = persistent_state_.setCurrentTerm(request.term);
if (res < 0)
{
response.setResponseEnabled(false);
trace(TraceRaftPersistStateUpdateError, res);
}
res = persistent_state_.resetVotedFor();
if (res < 0)
{
response.setResponseEnabled(false);
trace(TraceRaftPersistStateUpdateError, res);
}
switchState(ServerStateFollower);
if (!response.isResponseEnabled())
{
return;
}
}
/*
* Preparing the response
*/
response.term = persistent_state_.getCurrentTerm();
response.success = false;
/*
* Step 1 (see Raft paper)
* Reject the request if the leader has stale term number.
*/
if (request.term < persistent_state_.getCurrentTerm())
{
response.setResponseEnabled(true);
return;
}
switchState(ServerStateFollower);
/*
* Step 2
* Reject the request if the assumed log index does not exist on the local node.
*/
const Entry* const prev_entry = persistent_state_.getLog().getEntryAtIndex(request.prev_log_index);
if (prev_entry == NULL)
{
response.setResponseEnabled(true);
return;
}
/*
* Step 3
* Drop log entries if term number does not match.
* Ignore the request if the persistent state cannot be updated.
*/
if (prev_entry->term != request.prev_log_term)
{
const int res = persistent_state_.getLog().removeEntriesWhereIndexGreaterOrEqual(request.prev_log_index);
response.setResponseEnabled(res >= 0);
if (res < 0)
{
trace(TraceRaftPersistStateUpdateError, res);
}
return;
}
/*
* Step 4
* Update the log with new entries - this will possibly require to rewrite existing entries.
* Ignore the request if the persistent state cannot be updated.
*/
if (request.prev_log_index != persistent_state_.getLog().getLastIndex())
{
const int res = persistent_state_.getLog().removeEntriesWhereIndexGreater(request.prev_log_index);
if (res < 0)
{
trace(TraceRaftPersistStateUpdateError, res);
response.setResponseEnabled(false);
return;
}
}
for (uint8_t i = 0; i < request.entries.size(); i++)
{
const int res = persistent_state_.getLog().append(request.entries[i]);
if (res < 0)
{
trace(TraceRaftPersistStateUpdateError, res);
response.setResponseEnabled(false);
return; // Response will not be sent, the server will assume that we're dead
}
}
/*
* Step 5
* Update the commit index.
*/
if (request.leader_commit > commit_index_)
{
commit_index_ = min(request.leader_commit, persistent_state_.getLog().getLastIndex());
trace(TraceRaftCommitIndexUpdate, commit_index_);
}
response.setResponseEnabled(true);
response.success = true;
}
void RaftCore::handleAppendEntriesResponse(const ServiceCallResult<AppendEntries>& result)
@@ -898,8 +1051,77 @@ void RaftCore::handleAppendEntriesResponse(const ServiceCallResult<AppendEntries
void RaftCore::handleRequestVoteRequest(const ReceivedDataStructure<RequestVote::Request>& request,
ServiceResponseDataStructure<RequestVote::Response>& response)
{
(void)request;
(void)response;
trace(TraceRaftVoteRequestReceived, request.getSrcNodeID().get());
if (!cluster_.isKnownServer(request.getSrcNodeID()))
{
trace(TraceRaftRequestIgnored, request.getSrcNodeID().get());
return;
}
UAVCAN_ASSERT(response.isResponseEnabled()); // This is default
setActiveMode(true);
/*
* Checking if our current state is up to date.
* The request will be ignored if persistent state cannot be updated.
*/
if (request.term > persistent_state_.getCurrentTerm())
{
int res = persistent_state_.setCurrentTerm(request.term);
if (res < 0)
{
response.setResponseEnabled(false);
trace(TraceRaftPersistStateUpdateError, res);
}
res = persistent_state_.resetVotedFor();
if (res < 0)
{
response.setResponseEnabled(false);
trace(TraceRaftPersistStateUpdateError, res);
}
switchState(ServerStateFollower);
if (!response.isResponseEnabled())
{
return;
}
}
/*
* Preparing the response
*/
response.term = persistent_state_.getCurrentTerm();
if (request.term < response.term)
{
response.vote_granted = false;
}
else
{
const bool can_vote = !persistent_state_.isVotedForSet() ||
(persistent_state_.getVotedFor() == request.getSrcNodeID());
const bool log_is_up_to_date =
persistent_state_.getLog().isOtherLogUpToDate(request.last_log_index, request.last_log_term);
response.vote_granted = can_vote && log_is_up_to_date;
if (response.vote_granted)
{
registerActivity(); // This is necessary to avoid excessive elections
const int res = persistent_state_.setVotedFor(request.getSrcNodeID());
if (res < 0)
{
trace(TraceRaftPersistStateUpdateError, res);
response.setResponseEnabled(false);
return;
}
}
}
}
void RaftCore::handleRequestVoteResponse(const ServiceCallResult<RequestVote>& result)