Raft allocator adds its own allocation entry to the log

This commit is contained in:
Pavel Kirienko
2015-05-10 20:03:17 +03:00
parent 6e287dc1b2
commit a309c6d8da
4 changed files with 98 additions and 15 deletions
+1
View File
@@ -34,6 +34,7 @@ const int16_t ErrRecursiveCall = 9;
const int16_t ErrLogic = 10;
const int16_t ErrPassiveMode = 11; ///< Operation not permitted in passive mode
const int16_t ErrTransferTooLong = 12; ///< Transfer of this length cannot be sent with given transfer type
const int16_t ErrInvalidConfiguration = 13;
/**
* @}
*/
@@ -36,6 +36,13 @@ public:
*/
virtual void handleLogCommitOnLeader(const Entry& committed_entry) = 0;
/**
* Invoked by the Raft core when the local node becomes a leader or ceases to be one.
* By default the local node is not leader.
* It is possible to commit to the log right from this method.
*/
virtual void handleLocalLeadershipChange(bool local_node_is_leader) = 0;
virtual ~IRaftLeaderMonitor() { }
};
@@ -270,24 +277,46 @@ class RaftCore : private TimerBase
void switchState(ServerState new_state)
{
if (server_state_ != new_state)
if (server_state_ == new_state)
{
UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", "State switch: %d --> %d",
int(server_state_), int(new_state));
trace(TraceRaftStateSwitch, new_state);
return;
}
server_state_ = new_state;
/*
* Logging
*/
UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", "State switch: %d --> %d",
int(server_state_), int(new_state));
trace(TraceRaftStateSwitch, new_state);
cluster_.resetAllServerIndices();
/*
* Updating the current state
*/
const ServerState old_state = server_state_;
server_state_ = new_state;
next_server_index_ = 0;
num_votes_received_in_this_campaign_ = 0;
/*
* Resetting specific states
*/
cluster_.resetAllServerIndices();
for (uint8_t i = 0; i < NumRequestVoteClients; i++)
{
request_vote_clients_[i]->cancel();
}
append_entries_client_.cancel();
next_server_index_ = 0;
num_votes_received_in_this_campaign_ = 0;
for (uint8_t i = 0; i < NumRequestVoteClients; i++)
{
request_vote_clients_[i]->cancel();
}
append_entries_client_.cancel();
/*
* Calling the switch handler
* Note that the handler may commit to the log directly
*/
if ((old_state == ServerStateLeader) ||
(new_state == ServerStateLeader))
{
leader_monitor_.handleLocalLeadershipChange(new_state == ServerStateLeader);
}
}
@@ -56,6 +56,11 @@ class Server : IAllocationRequestHandler
}
};
/*
* Constants
*/
UniqueID own_unique_id_;
/*
* States
*/
@@ -183,6 +188,26 @@ class Server : IAllocationRequestHandler
tryPublishAllocationResult(entry);
}
virtual void handleLocalLeadershipChange(bool local_node_is_leader)
{
if (!local_node_is_leader)
{
return;
}
const LazyConstructor<RaftCore::LogEntryInfo> result =
raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_.getNodeID()));
if (!result.isConstructed())
{
const int res = raft_core_.appendLog(own_unique_id_, node_.getNodeID());
if (res < 0)
{
node_.registerInternalFailure("Raft log append with self ID");
}
}
}
/*
* Private methods
*/
@@ -231,14 +256,36 @@ public:
, node_discoverer_(node, tracer, *this)
{ }
int init(uint8_t cluster_size = ClusterManager::ClusterSizeUnknown)
int init(const UniqueID& own_unique_id, const uint8_t cluster_size = ClusterManager::ClusterSizeUnknown)
{
/*
* Initializing Raft core first, because the next step requires Log to be loaded
*/
int res = raft_core_.init(cluster_size);
if (res < 0)
{
return res;
}
/*
* Making sure that the server is started with the same node ID
*/
own_unique_id_ = own_unique_id;
const LazyConstructor<RaftCore::LogEntryInfo> own_log_entry =
raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_.getNodeID()));
if (own_log_entry.isConstructed())
{
if (own_log_entry->entry.unique_id != own_unique_id_)
{
return -ErrInvalidConfiguration;
}
}
/*
* Misc
*/
res = allocation_request_manager_.init();
if (res < 0)
{
@@ -136,12 +136,18 @@ TEST(DynamicNodeIDServer, Main)
// Node A is Allocator, Node B is Allocatee
InterlinkedTestNodesWithSysClock nodes(uavcan::NodeID(10), uavcan::NodeID::Broadcast);
UniqueID own_unique_id;
own_unique_id[0] = 0xAA;
own_unique_id[3] = 0xCC;
own_unique_id[7] = 0xEE;
own_unique_id[9] = 0xBB;
/*
* Server
*/
DistributedServer server(nodes.a, storage, tracer);
ASSERT_LE(0, server.init(1));
ASSERT_LE(0, server.init(own_unique_id, 1));
/*
* Client