mirror of
https://gitee.com/mirrors_PX4/PX4-Autopilot.git
synced 2026-05-23 16:17:35 +08:00
Raft logic fixes & more tests
This commit is contained in:
@@ -136,7 +136,7 @@ enum TraceEvent
|
||||
TraceRaftCoreInited, // update interval in usec
|
||||
TraceRaftStateSwitch, // 0 - Follower, 1 - Candidate, 2 - Leader
|
||||
// 15
|
||||
TraceRaftModeSwitch, // 0 - Passive, 1 - Active
|
||||
TraceRaftActiveSwitch, // 0 - Passive, 1 - Active
|
||||
TraceRaftNewLogEntry, // node ID value
|
||||
TraceRaftRequestIgnored, // node ID of the client
|
||||
TraceRaftVoteRequestReceived, // node ID of the client
|
||||
|
||||
@@ -42,7 +42,7 @@ const char* IDynamicNodeIDAllocationServerEventTracer::getEventName(uint16_t cod
|
||||
"InvalidClusterSizeReceived",
|
||||
"RaftCoreInited",
|
||||
"RaftStateSwitch",
|
||||
"RaftModeSwitch",
|
||||
"RaftActiveSwitch",
|
||||
"RaftNewLogEntry",
|
||||
"RaftRequestIgnored",
|
||||
"RaftVoteRequestReceived",
|
||||
@@ -939,13 +939,14 @@ void RaftCore::updateFollower()
|
||||
if (active_mode_ && isActivityTimedOut())
|
||||
{
|
||||
switchState(ServerStateCandidate);
|
||||
setActiveMode(true);
|
||||
registerActivity();
|
||||
}
|
||||
}
|
||||
|
||||
void RaftCore::updateCandidate()
|
||||
{
|
||||
UAVCAN_ASSERT(active_mode_);
|
||||
|
||||
if (num_votes_received_in_this_campaign_ > 0)
|
||||
{
|
||||
const bool won = num_votes_received_in_this_campaign_ >= cluster_.getQuorumSize();
|
||||
@@ -953,7 +954,6 @@ void RaftCore::updateCandidate()
|
||||
UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Election complete, won: %d", int(won));
|
||||
|
||||
switchState(won ? ServerStateLeader : ServerStateFollower); // Start over or become leader
|
||||
setActiveMode(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1002,52 +1002,55 @@ void RaftCore::updateCandidate()
|
||||
|
||||
void RaftCore::updateLeader()
|
||||
{
|
||||
propagateCommitIndex();
|
||||
|
||||
// Leader simply emits one AppendEntry at every update, iterating over all available servers
|
||||
if (next_server_index_ >= cluster_.getClusterSize())
|
||||
if (active_mode_ || (next_server_index_ > 0))
|
||||
{
|
||||
next_server_index_ = 0;
|
||||
}
|
||||
const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(next_server_index_);
|
||||
UAVCAN_ASSERT(node_id.isUnicast());
|
||||
|
||||
const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(next_server_index_);
|
||||
UAVCAN_ASSERT(node_id.isUnicast());
|
||||
|
||||
AppendEntries::Request req;
|
||||
req.term = persistent_state_.getCurrentTerm();
|
||||
req.leader_commit = commit_index_;
|
||||
|
||||
req.prev_log_index = Log::Index(cluster_.getServerNextIndex(node_id) - 1U);
|
||||
|
||||
const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(req.prev_log_index);
|
||||
if (entry == NULL)
|
||||
{
|
||||
UAVCAN_ASSERT(0);
|
||||
handlePersistentStateUpdateError(-ErrLogic);
|
||||
return;
|
||||
}
|
||||
|
||||
req.prev_log_term = entry->term;
|
||||
|
||||
for (Log::Index index = cluster_.getServerNextIndex(node_id);
|
||||
index <= persistent_state_.getLog().getLastIndex();
|
||||
index++)
|
||||
{
|
||||
req.entries.push_back(*persistent_state_.getLog().getEntryAtIndex(index));
|
||||
if (req.entries.size() == req.entries.capacity())
|
||||
next_server_index_++;
|
||||
if (next_server_index_ >= cluster_.getNumKnownServers())
|
||||
{
|
||||
break;
|
||||
next_server_index_ = 0;
|
||||
}
|
||||
|
||||
AppendEntries::Request req;
|
||||
req.term = persistent_state_.getCurrentTerm();
|
||||
req.leader_commit = commit_index_;
|
||||
|
||||
req.prev_log_index = Log::Index(cluster_.getServerNextIndex(node_id) - 1U);
|
||||
|
||||
const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(req.prev_log_index);
|
||||
if (entry == NULL)
|
||||
{
|
||||
UAVCAN_ASSERT(0);
|
||||
handlePersistentStateUpdateError(-ErrLogic);
|
||||
return;
|
||||
}
|
||||
|
||||
req.prev_log_term = entry->term;
|
||||
|
||||
for (Log::Index index = cluster_.getServerNextIndex(node_id);
|
||||
index <= persistent_state_.getLog().getLastIndex();
|
||||
index++)
|
||||
{
|
||||
req.entries.push_back(*persistent_state_.getLog().getEntryAtIndex(index));
|
||||
if (req.entries.size() == req.entries.capacity())
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pending_append_entries_fields_.num_entries = req.entries.size();
|
||||
pending_append_entries_fields_.prev_log_index = req.prev_log_index;
|
||||
|
||||
const int res = append_entries_client_.call(node_id, req);
|
||||
if (res < 0)
|
||||
{
|
||||
trace(TraceRaftAppendEntriesCallFailure, res);
|
||||
}
|
||||
}
|
||||
|
||||
pending_append_entries_fields_.num_entries = req.entries.size();
|
||||
pending_append_entries_fields_.prev_log_index = req.prev_log_index;
|
||||
|
||||
const int res = append_entries_client_.call(node_id, req);
|
||||
if (res < 0)
|
||||
{
|
||||
trace(TraceRaftAppendEntriesCallFailure, res);
|
||||
}
|
||||
propagateCommitIndex();
|
||||
}
|
||||
|
||||
void RaftCore::switchState(const ServerState new_state)
|
||||
@@ -1077,9 +1080,9 @@ void RaftCore::setActiveMode(const bool new_active)
|
||||
{
|
||||
if (active_mode_ != new_active)
|
||||
{
|
||||
UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Mode switch: %d --> %d",
|
||||
UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Active switch: %d --> %d",
|
||||
int(active_mode_), int(new_active));
|
||||
trace(TraceRaftModeSwitch, new_active);
|
||||
trace(TraceRaftActiveSwitch, new_active);
|
||||
|
||||
active_mode_ = new_active;
|
||||
}
|
||||
@@ -1119,10 +1122,8 @@ void RaftCore::propagateCommitIndex()
|
||||
}
|
||||
}
|
||||
|
||||
if (commit_index_fully_replicated && cluster_.isClusterDiscovered())
|
||||
{
|
||||
setActiveMode(false); // Commit index is the same on all nodes, enabling passive mode
|
||||
}
|
||||
const bool all_done = commit_index_fully_replicated && cluster_.isClusterDiscovered();
|
||||
setActiveMode(!all_done); // Enable passive mode if commit index is the same on all nodes
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1348,7 +1349,6 @@ void RaftCore::handleRequestVoteRequest(const ReceivedDataStructure<RequestVote:
|
||||
}
|
||||
|
||||
switchState(ServerStateFollower);
|
||||
setActiveMode(false);
|
||||
|
||||
if (!response.isResponseEnabled())
|
||||
{
|
||||
|
||||
@@ -2,8 +2,14 @@
|
||||
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
|
||||
*/
|
||||
|
||||
#if __GNUC__
|
||||
// We need auto_ptr for compatibility reasons
|
||||
# pragma GCC diagnostic ignored "-Wdeprecated-declarations"
|
||||
#endif
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <uavcan/protocol/dynamic_node_id_allocation_server.hpp>
|
||||
#include "helpers.hpp"
|
||||
|
||||
@@ -840,10 +846,13 @@ TEST(DynamicNodeIDAllocationServer, ClusterManagerThreeServers)
|
||||
|
||||
TEST(DynamicNodeIDAllocationServer, RaftCoreBasic)
|
||||
{
|
||||
using namespace uavcan::dynamic_node_id_server_impl;
|
||||
using namespace uavcan::protocol::dynamic_node_id::server;
|
||||
|
||||
uavcan::GlobalDataTypeRegistry::instance().reset();
|
||||
uavcan::DefaultDataTypeRegistrator<uavcan::protocol::dynamic_node_id::server::Discovery> _reg1;
|
||||
uavcan::DefaultDataTypeRegistrator<uavcan::protocol::dynamic_node_id::server::AppendEntries> _reg2;
|
||||
uavcan::DefaultDataTypeRegistrator<uavcan::protocol::dynamic_node_id::server::RequestVote> _reg3;
|
||||
uavcan::DefaultDataTypeRegistrator<Discovery> _reg1;
|
||||
uavcan::DefaultDataTypeRegistrator<AppendEntries> _reg2;
|
||||
uavcan::DefaultDataTypeRegistrator<RequestVote> _reg3;
|
||||
|
||||
EventTracer tracer_a("a");
|
||||
EventTracer tracer_b("b");
|
||||
@@ -854,14 +863,14 @@ TEST(DynamicNodeIDAllocationServer, RaftCoreBasic)
|
||||
|
||||
InterlinkedTestNodesWithSysClock nodes;
|
||||
|
||||
uavcan::dynamic_node_id_server_impl::RaftCore raft_a(nodes.a, storage_a, tracer_a, commit_handler_a);
|
||||
uavcan::dynamic_node_id_server_impl::RaftCore raft_b(nodes.b, storage_b, tracer_b, commit_handler_b);
|
||||
std::auto_ptr<RaftCore> raft_a(new RaftCore(nodes.a, storage_a, tracer_a, commit_handler_a));
|
||||
std::auto_ptr<RaftCore> raft_b(new RaftCore(nodes.b, storage_b, tracer_b, commit_handler_b));
|
||||
|
||||
/*
|
||||
* Initialization
|
||||
*/
|
||||
ASSERT_LE(0, raft_a.init(2));
|
||||
ASSERT_LE(0, raft_b.init(2));
|
||||
ASSERT_LE(0, raft_a->init(2));
|
||||
ASSERT_LE(0, raft_b->init(2));
|
||||
|
||||
/*
|
||||
* Running and trying not to fall
|
||||
@@ -869,24 +878,48 @@ TEST(DynamicNodeIDAllocationServer, RaftCoreBasic)
|
||||
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(5000));
|
||||
|
||||
// The one with lower node ID must become a leader
|
||||
ASSERT_TRUE(raft_a.isLeader());
|
||||
ASSERT_FALSE(raft_b.isLeader());
|
||||
ASSERT_TRUE(raft_a->isLeader());
|
||||
ASSERT_FALSE(raft_b->isLeader());
|
||||
|
||||
ASSERT_EQ(0, raft_a.getCommitIndex());
|
||||
ASSERT_EQ(0, raft_b.getCommitIndex());
|
||||
ASSERT_EQ(0, raft_a->getCommitIndex());
|
||||
ASSERT_EQ(0, raft_b->getCommitIndex());
|
||||
|
||||
/*
|
||||
* Adding some stuff
|
||||
*/
|
||||
uavcan::protocol::dynamic_node_id::server::Entry::FieldTypes::unique_id unique_id;
|
||||
Entry::FieldTypes::unique_id unique_id;
|
||||
uavcan::fill_n(unique_id.begin(), 16, uint8_t(0xAA));
|
||||
|
||||
ASSERT_LE(0, raft_a.appendLog(unique_id, uavcan::NodeID(1)));
|
||||
ASSERT_LE(0, raft_a->appendLog(unique_id, uavcan::NodeID(1)));
|
||||
|
||||
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(2000));
|
||||
|
||||
ASSERT_EQ(1, raft_a.getCommitIndex());
|
||||
ASSERT_EQ(1, raft_b.getCommitIndex());
|
||||
ASSERT_EQ(1, raft_a->getCommitIndex());
|
||||
ASSERT_EQ(1, raft_b->getCommitIndex());
|
||||
|
||||
/*
|
||||
* Terminating the leader - the Follower will continue to sleep
|
||||
*/
|
||||
raft_a.reset();
|
||||
|
||||
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(2000));
|
||||
|
||||
/*
|
||||
* Reinitializing the leader - current Follower will become the new Leader
|
||||
*/
|
||||
storage_a.reset();
|
||||
|
||||
raft_a.reset(new RaftCore(nodes.a, storage_a, tracer_a, commit_handler_a));
|
||||
ASSERT_LE(0, raft_a->init(2));
|
||||
ASSERT_EQ(0, raft_a->getCommitIndex());
|
||||
|
||||
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(5000));
|
||||
|
||||
ASSERT_FALSE(raft_a->isLeader());
|
||||
ASSERT_TRUE(raft_b->isLeader());
|
||||
|
||||
ASSERT_EQ(1, raft_a->getCommitIndex());
|
||||
ASSERT_EQ(1, raft_b->getCommitIndex());
|
||||
}
|
||||
|
||||
|
||||
@@ -898,8 +931,8 @@ TEST(DynamicNodeIDAllocationServer, EventCodeToString)
|
||||
// Simply checking some error codes
|
||||
ASSERT_STREQ("Error",
|
||||
IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceError));
|
||||
ASSERT_STREQ("RaftModeSwitch",
|
||||
IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceRaftModeSwitch));
|
||||
ASSERT_STREQ("RaftActiveSwitch",
|
||||
IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceRaftActiveSwitch));
|
||||
ASSERT_STREQ("RaftAppendEntriesCallFailure",
|
||||
IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceRaftAppendEntriesCallFailure));
|
||||
ASSERT_STREQ("DiscoveryReceived",
|
||||
|
||||
Reference in New Issue
Block a user