ClusterManager test for the case of one server

This commit is contained in:
Pavel Kirienko
2015-05-05 11:29:51 +03:00
parent 92d74d35ea
commit f11f49a7d2
4 changed files with 122 additions and 33 deletions
@@ -278,7 +278,7 @@ class ClusterManager : private TimerBase
void handleDiscovery(const ReceivedDataStructure<protocol::dynamic_node_id::server::Discovery>& msg);
void publishDiscovery();
void startDiscoveryPublishingTimerIfNotRunning();
public:
enum { ClusterSizeUnknown = 0 };
@@ -346,6 +346,8 @@ public:
uint8_t getNumKnownServers() const { return num_known_servers_; }
uint8_t getClusterSize() const { return cluster_size_; }
uint8_t getQuorumSize() const { return static_cast<uint8_t>(cluster_size_ / 2U + 1U); }
bool isClusterDiscovered() const { return num_known_servers_ == (cluster_size_ - 1); }
};
/**
@@ -586,13 +586,42 @@ void ClusterManager::addServer(NodeID node_id)
void ClusterManager::handleTimerEvent(const TimerEvent&)
{
UAVCAN_ASSERT(num_known_servers_ < cluster_size_);
if (num_known_servers_ < (cluster_size_ - 1))
/*
* Filling the message
*/
protocol::dynamic_node_id::server::Discovery msg;
msg.configured_cluster_size = cluster_size_;
for (uint8_t i = 0; i < num_known_servers_; i++)
{
publishDiscovery();
UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
msg.known_nodes.push_back(servers_[i].node_id.get());
}
else
msg.known_nodes.push_back(getNode().getNodeID().get());
UAVCAN_ASSERT(msg.known_nodes.size() == (num_known_servers_ + 1));
/*
* Broadcasting
*/
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Broadcasting Discovery message; known nodes: %d of %d",
int(msg.known_nodes.size()), int(cluster_size_));
const int res = discovery_pub_.broadcast(msg);
if (res < 0)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Cluster is fully discovered, no more broadcasts");
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Discovery broadcst failed: %d", res);
getNode().registerInternalFailure("Raft discovery broadcast");
}
/*
* Termination condition
*/
if (isClusterDiscovered())
{
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Discovery broadcasting timer stopped");
stop();
}
}
@@ -616,7 +645,7 @@ void ClusterManager::handleDiscovery(const ReceivedDataStructure<protocol::dynam
*/
for (uint8_t i = 0; i < msg.known_nodes.size(); i++)
{
if (num_known_servers_ >= (cluster_size_ - 1))
if (isClusterDiscovered())
{
break;
}
@@ -629,39 +658,20 @@ void ClusterManager::handleDiscovery(const ReceivedDataStructure<protocol::dynam
}
/*
* Publishing a new Discovery request if the timer is stopped already and the publishing server needs to
* learn about more servers.
* Publishing a new Discovery request if the publishing server needs to learn about more servers.
*/
if ((msg.configured_cluster_size > msg.known_nodes.size()) && !isRunning())
if (msg.configured_cluster_size > msg.known_nodes.size())
{
publishDiscovery();
startDiscoveryPublishingTimerIfNotRunning();
}
}
void ClusterManager::publishDiscovery()
void ClusterManager::startDiscoveryPublishingTimerIfNotRunning()
{
protocol::dynamic_node_id::server::Discovery msg;
msg.configured_cluster_size = cluster_size_;
for (uint8_t i = 0; i < num_known_servers_; i++)
if (!isRunning())
{
UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
msg.known_nodes.push_back(servers_[i].node_id.get());
}
UAVCAN_ASSERT(msg.known_nodes.size() == num_known_servers_);
msg.known_nodes.push_back(getNode().getNodeID().get());
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Broadcasting Discovery message; known nodes: %d of %d",
int(msg.known_nodes.size()), int(cluster_size_));
const int res = discovery_pub_.broadcast(msg);
if (res < 0)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Discovery broadcst failed: %d", res);
getNode().registerInternalFailure("Raft discovery broadcast");
startPeriodic(
MonotonicDuration::fromMSec(protocol::dynamic_node_id::server::Discovery::BROADCASTING_INTERVAL_MS));
}
}
@@ -726,7 +736,7 @@ int ClusterManager::init(const uint8_t init_cluster_size)
return res;
}
startPeriodic(MonotonicDuration::fromMSec(protocol::dynamic_node_id::server::Discovery::BROADCASTING_INTERVAL_MS));
startDiscoveryPublishingTimerIfNotRunning();
/*
* Misc
+3
View File
@@ -16,10 +16,12 @@ struct TestNode : public uavcan::INode
uavcan::MarshalBufferProvider<> buffer_provider;
uavcan::OutgoingTransferRegistry<8> otr;
uavcan::Scheduler scheduler;
uint64_t internal_failure_count;
TestNode(uavcan::ICanDriver& can_driver, uavcan::ISystemClock& clock_driver, uavcan::NodeID self_node_id)
: otr(poolmgr)
, scheduler(can_driver, poolmgr, clock_driver, otr)
, internal_failure_count(0)
{
poolmgr.addPool(&pool);
setNodeID(self_node_id);
@@ -28,6 +30,7 @@ struct TestNode : public uavcan::INode
virtual void registerInternalFailure(const char* msg)
{
std::cout << "TestNode internal failure: " << msg << std::endl;
internal_failure_count++;
}
virtual uavcan::PoolManager<1>& getAllocator() { return poolmgr; }
@@ -594,3 +594,77 @@ TEST(DynamicNodeIDAllocationServer, ClusterManagerInitialization)
ASSERT_EQ(1, storage.getNumKeys());
}
}
TEST(DynamicNodeIDAllocationServer, ClusterManagerOneServer)
{
uavcan::GlobalDataTypeRegistry::instance().reset();
uavcan::DefaultDataTypeRegistrator<uavcan::protocol::dynamic_node_id::server::Discovery> _reg1;
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage);
InterlinkedTestNodesWithSysClock nodes;
uavcan::dynamic_node_id_server_impl::ClusterManager mgr(nodes.a, storage, log);
/*
* Pub and sub
*/
SubscriberWithCollector<uavcan::protocol::dynamic_node_id::server::Discovery> sub(nodes.b);
uavcan::Publisher<uavcan::protocol::dynamic_node_id::server::Discovery> pub(nodes.b);
ASSERT_LE(0, sub.start());
ASSERT_LE(0, pub.init());
/*
* Starting
*/
ASSERT_LE(0, mgr.init(1));
ASSERT_EQ(0, mgr.getNumKnownServers());
ASSERT_TRUE(mgr.isClusterDiscovered());
ASSERT_EQ(0, nodes.a.internal_failure_count);
/*
* Broadcasting discovery with wrong cluster size, it will be reported as internal failure
*/
uavcan::protocol::dynamic_node_id::server::Discovery msg;
msg.configured_cluster_size = 2;
msg.known_nodes.push_back(2U);
ASSERT_LE(0, pub.broadcast(msg));
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(10));
ASSERT_EQ(1, nodes.a.internal_failure_count);
/*
* Discovery rate limiting test
*/
ASSERT_FALSE(sub.collector.msg.get());
msg = uavcan::protocol::dynamic_node_id::server::Discovery();
msg.configured_cluster_size = 1; // Correct value
ASSERT_LE(0, pub.broadcast(msg)); // List of known nodes is empty, intentionally
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(100));
ASSERT_FALSE(sub.collector.msg.get());
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(1000));
ASSERT_TRUE(sub.collector.msg.get());
ASSERT_EQ(1, sub.collector.msg->configured_cluster_size);
ASSERT_EQ(1, sub.collector.msg->known_nodes.size());
ASSERT_EQ(1, sub.collector.msg->known_nodes[0]);
sub.collector.msg.reset();
// Rinse repeat
ASSERT_LE(0, pub.broadcast(msg));
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(100));
ASSERT_FALSE(sub.collector.msg.get());
nodes.spinBoth(uavcan::MonotonicDuration::fromMSec(1000));
ASSERT_TRUE(sub.collector.msg.get());
ASSERT_EQ(1, sub.collector.msg->configured_cluster_size);
ASSERT_EQ(1, sub.collector.msg->known_nodes.size());
ASSERT_EQ(1, sub.collector.msg->known_nodes[0]);
sub.collector.msg.reset();
}