ClusterManager tests

This commit is contained in:
Pavel Kirienko
2015-05-05 12:53:33 +03:00
parent 3af95e3dd4
commit 2273df059a
3 changed files with 127 additions and 4 deletions
@@ -247,6 +247,8 @@ class ClusterManager : private TimerBase
: next_index(0)
, match_index(0)
{ }
void resetIndices(const Log& log);
};
enum { MaxServers = protocol::dynamic_node_id::server::Discovery::FieldTypes::known_nodes::MaxSize };
@@ -539,6 +539,12 @@ int PersistentState::setVotedFor(const NodeID node_id)
/*
* ClusterManager
*/
void ClusterManager::Server::resetIndices(const Log& log)
{
next_index = Log::Index(log.getLastIndex() + 1U);
match_index = 0;
}
ClusterManager::Server* ClusterManager::findServer(NodeID node_id)
{
for (uint8_t i = 0; i < num_known_servers_; i++)
@@ -581,6 +587,7 @@ void ClusterManager::addServer(NodeID node_id)
if (!isKnownServer(node_id) && node_id.isUnicast())
{
servers_[num_known_servers_].node_id = node_id;
servers_[num_known_servers_].resetIndices(log_);
num_known_servers_ = static_cast<uint8_t>(num_known_servers_ + 1U);
}
else
@@ -599,14 +606,14 @@ void ClusterManager::handleTimerEvent(const TimerEvent&)
protocol::dynamic_node_id::server::Discovery msg;
msg.configured_cluster_size = cluster_size_;
msg.known_nodes.push_back(getNode().getNodeID().get()); // Putting ourselves at index 0
for (uint8_t i = 0; i < num_known_servers_; i++)
{
UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
msg.known_nodes.push_back(servers_[i].node_id.get());
}
msg.known_nodes.push_back(getNode().getNodeID().get());
UAVCAN_ASSERT(msg.known_nodes.size() == (num_known_servers_ + 1));
/*
@@ -826,8 +833,7 @@ void ClusterManager::resetAllServerIndices()
for (uint8_t i = 0; i < num_known_servers_; i++)
{
UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
servers_[i].next_index = Log::Index(log_.getLastIndex() + 1U);
servers_[i].match_index = 0;
servers_[i].resetIndices(log_);
}
}
@@ -668,3 +668,118 @@ TEST(DynamicNodeIDAllocationServer, ClusterManagerOneServer)
ASSERT_EQ(1, sub.collector.msg->known_nodes[0]);
sub.collector.msg.reset();
}
TEST(DynamicNodeIDAllocationServer, ClusterManagerThreeServers)
{
uavcan::GlobalDataTypeRegistry::instance().reset();
uavcan::DefaultDataTypeRegistrator<uavcan::protocol::dynamic_node_id::server::Discovery> _reg1;
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage);
InterlinkedTestNodesWithSysClock nodes;
uavcan::dynamic_node_id_server_impl::ClusterManager mgr(nodes.a, storage, log);
/*
* Pub and sub
*/
SubscriberWithCollector<uavcan::protocol::dynamic_node_id::server::Discovery> sub(nodes.b);
uavcan::Publisher<uavcan::protocol::dynamic_node_id::server::Discovery> pub(nodes.b);
ASSERT_LE(0, sub.start());
ASSERT_LE(0, pub.init());
/*
* Starting
*/
ASSERT_LE(0, mgr.init(3));
ASSERT_EQ(0, mgr.getNumKnownServers());
ASSERT_FALSE(mgr.isClusterDiscovered());
/*
* Discovery publishing rate check
*/
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(100));
ASSERT_FALSE(sub.collector.msg.get());
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(1000));
ASSERT_TRUE(sub.collector.msg.get());
ASSERT_EQ(3, sub.collector.msg->configured_cluster_size);
ASSERT_EQ(1, sub.collector.msg->known_nodes.size());
ASSERT_EQ(1, sub.collector.msg->known_nodes[0]);
sub.collector.msg.reset();
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(100));
ASSERT_FALSE(sub.collector.msg.get());
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(1000));
ASSERT_TRUE(sub.collector.msg.get());
ASSERT_EQ(3, sub.collector.msg->configured_cluster_size);
ASSERT_EQ(1, sub.collector.msg->known_nodes.size());
ASSERT_EQ(1, sub.collector.msg->known_nodes[0]);
sub.collector.msg.reset();
/*
* Discovering other nodes
*/
uavcan::protocol::dynamic_node_id::server::Discovery msg;
msg.configured_cluster_size = 3;
msg.known_nodes.push_back(2U);
ASSERT_LE(0, pub.broadcast(msg));
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(1050));
ASSERT_TRUE(sub.collector.msg.get());
ASSERT_EQ(3, sub.collector.msg->configured_cluster_size);
ASSERT_EQ(2, sub.collector.msg->known_nodes.size());
ASSERT_EQ(1, sub.collector.msg->known_nodes[0]);
ASSERT_EQ(2, sub.collector.msg->known_nodes[1]);
sub.collector.msg.reset();
ASSERT_FALSE(mgr.isClusterDiscovered());
// This will complete the discovery
msg.known_nodes.push_back(127U);
ASSERT_LE(0, pub.broadcast(msg));
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(1050));
ASSERT_TRUE(sub.collector.msg.get());
ASSERT_EQ(3, sub.collector.msg->configured_cluster_size);
ASSERT_EQ(3, sub.collector.msg->known_nodes.size());
ASSERT_EQ(1, sub.collector.msg->known_nodes[0]);
ASSERT_EQ(2, sub.collector.msg->known_nodes[1]);
ASSERT_EQ(127, sub.collector.msg->known_nodes[2]);
sub.collector.msg.reset();
// Making sure discovery is now terminated
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(1500));
ASSERT_FALSE(sub.collector.msg.get());
/*
* Checking Raft states
*/
ASSERT_EQ(uavcan::NodeID(2), mgr.getRemoteServerNodeIDAtIndex(0));
ASSERT_EQ(uavcan::NodeID(127), mgr.getRemoteServerNodeIDAtIndex(1));
ASSERT_EQ(uavcan::NodeID(), mgr.getRemoteServerNodeIDAtIndex(2));
ASSERT_EQ(0, mgr.getServerMatchIndex(2));
ASSERT_EQ(0, mgr.getServerMatchIndex(127));
ASSERT_EQ(log.getLastIndex() + 1, mgr.getServerNextIndex(2));
ASSERT_EQ(log.getLastIndex() + 1, mgr.getServerNextIndex(127));
mgr.setServerMatchIndex(2, 10);
ASSERT_EQ(10, mgr.getServerMatchIndex(2));
mgr.incrementServerNextIndexBy(2, 5);
ASSERT_EQ(log.getLastIndex() + 1 + 5, mgr.getServerNextIndex(2));
mgr.decrementServerNextIndex(2);
ASSERT_EQ(log.getLastIndex() + 1 + 5 - 1, mgr.getServerNextIndex(2));
mgr.resetAllServerIndices();
ASSERT_EQ(0, mgr.getServerMatchIndex(2));
ASSERT_EQ(0, mgr.getServerMatchIndex(127));
ASSERT_EQ(log.getLastIndex() + 1, mgr.getServerNextIndex(2));
ASSERT_EQ(log.getLastIndex() + 1, mgr.getServerNextIndex(127));
}