All Raft logic finished except time updates

This commit is contained in:
Pavel Kirienko
2015-05-07 19:48:05 +03:00
parent ce752d93bd
commit a1ee2efea0
3 changed files with 62 additions and 0 deletions
@@ -137,6 +137,7 @@ enum TraceEvent
TraceRaftPersistStateUpdateError, // negative error code
TraceRaftCommitIndexUpdate, // new commit index value
TraceRaftNewerTermInResponse, // new term value
TraceRaftNewEntryCommitted, // new commit index value
NumTraceEventCodes
};
@@ -521,6 +522,8 @@ class RaftCore : private TimerBase
void tryIncrementCurrentTermFromResponse(Term new_term);
void propagateCommitIndex();
void handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntries::Request>& request,
ServiceResponseDataStructure<AppendEntries::Response>& response);
@@ -883,6 +883,7 @@ void RaftCore::updateCandidate(const MonotonicTime& current_time)
void RaftCore::updateLeader(const MonotonicTime& current_time)
{
(void)current_time;
propagateCommitIndex();
}
void RaftCore::switchState(const ServerState new_state)
@@ -931,6 +932,55 @@ void RaftCore::tryIncrementCurrentTermFromResponse(Term new_term)
switchState(ServerStateFollower);
}
void RaftCore::propagateCommitIndex()
{
// Objective is to estimate whether we can safely increment commit index value
UAVCAN_ASSERT(server_state_ == ServerStateLeader);
if (commit_index_ == persistent_state_.getLog().getLastIndex())
{
// All local entries are committed
bool commit_index_fully_replicated = true;
for (uint8_t i = 0; i < cluster_.getNumKnownServers(); i++)
{
const Log::Index match_index = cluster_.getServerMatchIndex(cluster_.getRemoteServerNodeIDAtIndex(i));
if (match_index != commit_index_)
{
commit_index_fully_replicated = false;
break;
}
}
if (commit_index_fully_replicated && cluster_.isClusterDiscovered())
{
setActiveMode(false); // Commit index is the same on all nodes, enabling passive mode
}
}
else
{
// Not all local entries are committed
setActiveMode(true);
uint8_t num_nodes_with_next_log_entry_available = 1; // Local node
for (uint8_t i = 0; i < cluster_.getNumKnownServers(); i++)
{
const Log::Index match_index = cluster_.getServerMatchIndex(cluster_.getRemoteServerNodeIDAtIndex(i));
if (match_index > commit_index_)
{
num_nodes_with_next_log_entry_available++;
}
}
if (num_nodes_with_next_log_entry_available >= cluster_.getQuorumSize())
{
commit_index_++;
trace(TraceRaftNewEntryCommitted, commit_index_);
// AT THIS POINT ALLOCATION IS COMPLETE
}
}
}
void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntries::Request>& request,
ServiceResponseDataStructure<AppendEntries::Response>& response)
{
@@ -801,3 +801,12 @@ TEST(DynamicNodeIDAllocationServer, ClusterManagerThreeServers)
ASSERT_EQ(log.getLastIndex() + 1, mgr.getServerNextIndex(2));
ASSERT_EQ(log.getLastIndex() + 1, mgr.getServerNextIndex(127));
}
TEST(DynamicNodeIDAllocationServer, ObjectSizes)
{
std::cout << "Log: " << sizeof(uavcan::dynamic_node_id_server_impl::Log) << std::endl;
std::cout << "PersistentState: " << sizeof(uavcan::dynamic_node_id_server_impl::PersistentState) << std::endl;
std::cout << "ClusterManager: " << sizeof(uavcan::dynamic_node_id_server_impl::ClusterManager) << std::endl;
std::cout << "RaftCore: " << sizeof(uavcan::dynamic_node_id_server_impl::RaftCore) << std::endl;
}