From e48fa77d85bce9c7dfce2b88369c1beecd9f5c29 Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Fri, 8 May 2015 18:43:27 +0300 Subject: [PATCH] Raft logic fixes & more tests --- .../dynamic_node_id_allocation_server.hpp | 2 +- .../uc_dynamic_node_id_allocation_server.cpp | 100 +++++++++--------- .../dynamic_node_id_allocation_server.cpp | 67 +++++++++--- 3 files changed, 101 insertions(+), 68 deletions(-) diff --git a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp index 8521859e84..e71c6752f5 100644 --- a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp +++ b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp @@ -136,7 +136,7 @@ enum TraceEvent TraceRaftCoreInited, // update interval in usec TraceRaftStateSwitch, // 0 - Follower, 1 - Candidate, 2 - Leader // 15 - TraceRaftModeSwitch, // 0 - Passive, 1 - Active + TraceRaftActiveSwitch, // 0 - Passive, 1 - Active TraceRaftNewLogEntry, // node ID value TraceRaftRequestIgnored, // node ID of the client TraceRaftVoteRequestReceived, // node ID of the client diff --git a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp index 6d216a484e..db6a793f29 100644 --- a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp +++ b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp @@ -42,7 +42,7 @@ const char* IDynamicNodeIDAllocationServerEventTracer::getEventName(uint16_t cod "InvalidClusterSizeReceived", "RaftCoreInited", "RaftStateSwitch", - "RaftModeSwitch", + "RaftActiveSwitch", "RaftNewLogEntry", "RaftRequestIgnored", "RaftVoteRequestReceived", @@ -939,13 +939,14 @@ void RaftCore::updateFollower() if (active_mode_ && isActivityTimedOut()) { switchState(ServerStateCandidate); - setActiveMode(true); registerActivity(); } } void RaftCore::updateCandidate() { + UAVCAN_ASSERT(active_mode_); + if (num_votes_received_in_this_campaign_ > 0) { const bool won = num_votes_received_in_this_campaign_ >= cluster_.getQuorumSize(); @@ -953,7 +954,6 @@ void RaftCore::updateCandidate() UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Election complete, won: %d", int(won)); switchState(won ? ServerStateLeader : ServerStateFollower); // Start over or become leader - setActiveMode(true); } else { @@ -1002,52 +1002,55 @@ void RaftCore::updateCandidate() void RaftCore::updateLeader() { - propagateCommitIndex(); - - // Leader simply emits one AppendEntry at every update, iterating over all available servers - if (next_server_index_ >= cluster_.getClusterSize()) + if (active_mode_ || (next_server_index_ > 0)) { - next_server_index_ = 0; - } + const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(next_server_index_); + UAVCAN_ASSERT(node_id.isUnicast()); - const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(next_server_index_); - UAVCAN_ASSERT(node_id.isUnicast()); - - AppendEntries::Request req; - req.term = persistent_state_.getCurrentTerm(); - req.leader_commit = commit_index_; - - req.prev_log_index = Log::Index(cluster_.getServerNextIndex(node_id) - 1U); - - const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(req.prev_log_index); - if (entry == NULL) - { - UAVCAN_ASSERT(0); - handlePersistentStateUpdateError(-ErrLogic); - return; - } - - req.prev_log_term = entry->term; - - for (Log::Index index = cluster_.getServerNextIndex(node_id); - index <= persistent_state_.getLog().getLastIndex(); - index++) - { - req.entries.push_back(*persistent_state_.getLog().getEntryAtIndex(index)); - if (req.entries.size() == req.entries.capacity()) + next_server_index_++; + if (next_server_index_ >= cluster_.getNumKnownServers()) { - break; + next_server_index_ = 0; + } + + AppendEntries::Request req; + req.term = persistent_state_.getCurrentTerm(); + req.leader_commit = commit_index_; + + req.prev_log_index = Log::Index(cluster_.getServerNextIndex(node_id) - 1U); + + const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(req.prev_log_index); + if (entry == NULL) + { + UAVCAN_ASSERT(0); + handlePersistentStateUpdateError(-ErrLogic); + return; + } + + req.prev_log_term = entry->term; + + for (Log::Index index = cluster_.getServerNextIndex(node_id); + index <= persistent_state_.getLog().getLastIndex(); + index++) + { + req.entries.push_back(*persistent_state_.getLog().getEntryAtIndex(index)); + if (req.entries.size() == req.entries.capacity()) + { + break; + } + } + + pending_append_entries_fields_.num_entries = req.entries.size(); + pending_append_entries_fields_.prev_log_index = req.prev_log_index; + + const int res = append_entries_client_.call(node_id, req); + if (res < 0) + { + trace(TraceRaftAppendEntriesCallFailure, res); } } - pending_append_entries_fields_.num_entries = req.entries.size(); - pending_append_entries_fields_.prev_log_index = req.prev_log_index; - - const int res = append_entries_client_.call(node_id, req); - if (res < 0) - { - trace(TraceRaftAppendEntriesCallFailure, res); - } + propagateCommitIndex(); } void RaftCore::switchState(const ServerState new_state) @@ -1077,9 +1080,9 @@ void RaftCore::setActiveMode(const bool new_active) { if (active_mode_ != new_active) { - UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Mode switch: %d --> %d", + UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Active switch: %d --> %d", int(active_mode_), int(new_active)); - trace(TraceRaftModeSwitch, new_active); + trace(TraceRaftActiveSwitch, new_active); active_mode_ = new_active; } @@ -1119,10 +1122,8 @@ void RaftCore::propagateCommitIndex() } } - if (commit_index_fully_replicated && cluster_.isClusterDiscovered()) - { - setActiveMode(false); // Commit index is the same on all nodes, enabling passive mode - } + const bool all_done = commit_index_fully_replicated && cluster_.isClusterDiscovered(); + setActiveMode(!all_done); // Enable passive mode if commit index is the same on all nodes } else { @@ -1348,7 +1349,6 @@ void RaftCore::handleRequestVoteRequest(const ReceivedDataStructure */ +#if __GNUC__ +// We need auto_ptr for compatibility reasons +# pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#endif + #include #include +#include #include #include "helpers.hpp" @@ -840,10 +846,13 @@ TEST(DynamicNodeIDAllocationServer, ClusterManagerThreeServers) TEST(DynamicNodeIDAllocationServer, RaftCoreBasic) { + using namespace uavcan::dynamic_node_id_server_impl; + using namespace uavcan::protocol::dynamic_node_id::server; + uavcan::GlobalDataTypeRegistry::instance().reset(); - uavcan::DefaultDataTypeRegistrator _reg1; - uavcan::DefaultDataTypeRegistrator _reg2; - uavcan::DefaultDataTypeRegistrator _reg3; + uavcan::DefaultDataTypeRegistrator _reg1; + uavcan::DefaultDataTypeRegistrator _reg2; + uavcan::DefaultDataTypeRegistrator _reg3; EventTracer tracer_a("a"); EventTracer tracer_b("b"); @@ -854,14 +863,14 @@ TEST(DynamicNodeIDAllocationServer, RaftCoreBasic) InterlinkedTestNodesWithSysClock nodes; - uavcan::dynamic_node_id_server_impl::RaftCore raft_a(nodes.a, storage_a, tracer_a, commit_handler_a); - uavcan::dynamic_node_id_server_impl::RaftCore raft_b(nodes.b, storage_b, tracer_b, commit_handler_b); + std::auto_ptr raft_a(new RaftCore(nodes.a, storage_a, tracer_a, commit_handler_a)); + std::auto_ptr raft_b(new RaftCore(nodes.b, storage_b, tracer_b, commit_handler_b)); /* * Initialization */ - ASSERT_LE(0, raft_a.init(2)); - ASSERT_LE(0, raft_b.init(2)); + ASSERT_LE(0, raft_a->init(2)); + ASSERT_LE(0, raft_b->init(2)); /* * Running and trying not to fall @@ -869,24 +878,48 @@ TEST(DynamicNodeIDAllocationServer, RaftCoreBasic) nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(5000)); // The one with lower node ID must become a leader - ASSERT_TRUE(raft_a.isLeader()); - ASSERT_FALSE(raft_b.isLeader()); + ASSERT_TRUE(raft_a->isLeader()); + ASSERT_FALSE(raft_b->isLeader()); - ASSERT_EQ(0, raft_a.getCommitIndex()); - ASSERT_EQ(0, raft_b.getCommitIndex()); + ASSERT_EQ(0, raft_a->getCommitIndex()); + ASSERT_EQ(0, raft_b->getCommitIndex()); /* * Adding some stuff */ - uavcan::protocol::dynamic_node_id::server::Entry::FieldTypes::unique_id unique_id; + Entry::FieldTypes::unique_id unique_id; uavcan::fill_n(unique_id.begin(), 16, uint8_t(0xAA)); - ASSERT_LE(0, raft_a.appendLog(unique_id, uavcan::NodeID(1))); + ASSERT_LE(0, raft_a->appendLog(unique_id, uavcan::NodeID(1))); nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(2000)); - ASSERT_EQ(1, raft_a.getCommitIndex()); - ASSERT_EQ(1, raft_b.getCommitIndex()); + ASSERT_EQ(1, raft_a->getCommitIndex()); + ASSERT_EQ(1, raft_b->getCommitIndex()); + + /* + * Terminating the leader - the Follower will continue to sleep + */ + raft_a.reset(); + + nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(2000)); + + /* + * Reinitializing the leader - current Follower will become the new Leader + */ + storage_a.reset(); + + raft_a.reset(new RaftCore(nodes.a, storage_a, tracer_a, commit_handler_a)); + ASSERT_LE(0, raft_a->init(2)); + ASSERT_EQ(0, raft_a->getCommitIndex()); + + nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(5000)); + + ASSERT_FALSE(raft_a->isLeader()); + ASSERT_TRUE(raft_b->isLeader()); + + ASSERT_EQ(1, raft_a->getCommitIndex()); + ASSERT_EQ(1, raft_b->getCommitIndex()); } @@ -898,8 +931,8 @@ TEST(DynamicNodeIDAllocationServer, EventCodeToString) // Simply checking some error codes ASSERT_STREQ("Error", IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceError)); - ASSERT_STREQ("RaftModeSwitch", - IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceRaftModeSwitch)); + ASSERT_STREQ("RaftActiveSwitch", + IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceRaftActiveSwitch)); ASSERT_STREQ("RaftAppendEntriesCallFailure", IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceRaftAppendEntriesCallFailure)); ASSERT_STREQ("DiscoveryReceived",