Allocation request manager (untested)

This commit is contained in:
Pavel Kirienko
2015-05-08 20:00:30 +03:00
parent 6a35e65ecc
commit 952009c284
3 changed files with 273 additions and 11 deletions
@@ -671,6 +671,67 @@ public:
}
};
/**
* The main allocator must implement this interface.
*/
class IAllocationRequestHandler
{
public:
typedef protocol::dynamic_node_id::server::Entry::FieldTypes::unique_id UniqueID;
virtual void handleAllocationRequest(const UniqueID& unique_id, NodeID preferred_node_id) = 0;
virtual ~IAllocationRequestHandler() { }
};
/**
* This class manages communication with allocation clients.
* Three-stage unique ID exchange is implemented here, as well as response publication.
*/
class AllocationRequestManager
{
typedef MethodBinder<AllocationRequestManager*,
void (AllocationRequestManager::*)
(const ReceivedDataStructure<protocol::dynamic_node_id::Allocation>&)>
AllocationCallback;
const MonotonicDuration stage_timeout_;
bool active_;
MonotonicTime last_message_timestamp_;
protocol::dynamic_node_id::Allocation::FieldTypes::unique_id current_unique_id_;
IAllocationRequestHandler& handler_;
Subscriber<protocol::dynamic_node_id::Allocation, AllocationCallback> allocation_sub_;
Publisher<protocol::dynamic_node_id::Allocation> allocation_pub_;
enum { InvalidStage = 0 };
static uint8_t detectRequestStage(const protocol::dynamic_node_id::Allocation& msg);
uint8_t getExpectedStage() const;
void broadcastIntermediateAllocationResponse();
void handleAllocation(const ReceivedDataStructure<protocol::dynamic_node_id::Allocation>& msg);
public:
AllocationRequestManager(INode& node, IAllocationRequestHandler& handler)
: stage_timeout_(MonotonicDuration::fromMSec(protocol::dynamic_node_id::Allocation::DEFAULT_REQUEST_PERIOD_MS))
, active_(false)
, handler_(handler)
, allocation_sub_(node)
, allocation_pub_(node)
{ }
int init();
void setActive(bool x);
bool isActive() const { return active_; }
int broadcastAllocationResponse(const IAllocationRequestHandler::UniqueID& unique_id, NodeID allocated_node_id);
};
} // namespace dynamic_node_id_server_impl
/**
@@ -678,21 +739,20 @@ public:
*/
class DynamicNodeIDAllocationServer
{
typedef MethodBinder<DynamicNodeIDAllocationServer*,
void (DynamicNodeIDAllocationServer::*)
(const ReceivedDataStructure<protocol::dynamic_node_id::Allocation>&)>
AllocationCallback;
typedef MethodBinder<DynamicNodeIDAllocationServer*,
void (DynamicNodeIDAllocationServer::*)
(const ReceivedDataStructure<protocol::NodeStatus>&)> NodeStatusCallback;
typedef Map<NodeID, uint8_t, 10> PendingGetNodeInfoAttemptsMap;
/*
* States
*/
PendingGetNodeInfoAttemptsMap pending_get_node_info_attempts_;
Subscriber<protocol::dynamic_node_id::Allocation, AllocationCallback> allocation_sub_;
Publisher<protocol::dynamic_node_id::Allocation> allocation_pub_;
/*
* Transport
*/
Subscriber<protocol::NodeStatus, NodeStatusCallback> node_status_sub_;
@@ -799,6 +799,7 @@ int ClusterManager::init(const uint8_t init_cluster_size)
{
return res;
}
(void)discovery_pub_.setPriority(TransferPriorityLow);
res = discovery_sub_.start(DiscoveryCallback(this, &ClusterManager::handleDiscovery));
if (res < 0)
@@ -1532,6 +1533,191 @@ int RaftCore::appendLog(const Entry::FieldTypes::unique_id& unique_id, NodeID no
}
}
/*
* AllocationRequestManager
*/
uint8_t AllocationRequestManager::detectRequestStage(const protocol::dynamic_node_id::Allocation& msg)
{
const uint8_t max_bytes_per_request = protocol::dynamic_node_id::Allocation::MAX_LENGTH_OF_UNIQUE_ID_IN_REQUEST;
if ((msg.unique_id.size() != max_bytes_per_request) &&
(msg.unique_id.size() != (msg.unique_id.capacity() - max_bytes_per_request * 2U)) &&
(msg.unique_id.size() != msg.unique_id.capacity())) // Future proofness for CAN FD
{
return InvalidStage;
}
if (msg.first_part_of_unique_id)
{
return 1; // Note that CAN FD frames can deliver the unique ID in one stage!
}
if (msg.unique_id.size() == protocol::dynamic_node_id::Allocation::MAX_LENGTH_OF_UNIQUE_ID_IN_REQUEST)
{
return 2;
}
if (msg.unique_id.size() < protocol::dynamic_node_id::Allocation::MAX_LENGTH_OF_UNIQUE_ID_IN_REQUEST)
{
return 3;
}
return InvalidStage;
}
uint8_t AllocationRequestManager::getExpectedStage() const
{
if (current_unique_id_.empty())
{
return 1;
}
if (current_unique_id_.size() >= (protocol::dynamic_node_id::Allocation::MAX_LENGTH_OF_UNIQUE_ID_IN_REQUEST * 2))
{
return 3;
}
if (current_unique_id_.size() >= protocol::dynamic_node_id::Allocation::MAX_LENGTH_OF_UNIQUE_ID_IN_REQUEST)
{
return 2;
}
return InvalidStage;
}
void AllocationRequestManager::broadcastIntermediateAllocationResponse()
{
UAVCAN_ASSERT(active_);
protocol::dynamic_node_id::Allocation msg;
msg.unique_id = current_unique_id_;
UAVCAN_ASSERT(msg.unique_id.size() < msg.unique_id.capacity());
const int res = allocation_pub_.broadcast(msg);
if (res < 0)
{
allocation_pub_.getNode().registerInternalFailure("Dynamic allocation broadcast");
}
}
void AllocationRequestManager::handleAllocation(const ReceivedDataStructure<protocol::dynamic_node_id::Allocation>& msg)
{
if (!msg.isAnonymousTransfer())
{
return; // This is a response from another allocator, ignore
}
if (!active_)
{
return; // The local node is not the leader, ignore
}
/*
* Reset the expected stage on timeout
*/
if (msg.getMonotonicTimestamp() > (last_message_timestamp_ + stage_timeout_))
{
UAVCAN_TRACE("AllocationRequestReceiver", "Stage timeout, reset");
current_unique_id_.clear();
}
last_message_timestamp_ = msg.getMonotonicTimestamp();
/*
* Checking if request stage matches the expected stage
*/
const uint8_t request_stage = detectRequestStage(msg);
if (request_stage == InvalidStage)
{
return; // No way
}
const uint8_t expected_stage = getExpectedStage();
if (request_stage == InvalidStage)
{
current_unique_id_.clear();
return;
}
if (request_stage != expected_stage)
{
return; // Ignore - stage mismatch
}
const uint8_t max_expected_bytes = static_cast<uint8_t>(current_unique_id_.capacity() - current_unique_id_.size());
UAVCAN_ASSERT(max_expected_bytes > 0);
if (msg.unique_id.size() > max_expected_bytes)
{
return; // Malformed request
}
/*
* Updating the local state
*/
for (uint8_t i = 0; i < msg.unique_id.size(); i++)
{
current_unique_id_.push_back(msg.unique_id[i]);
}
/*
* Proceeding with allocation if possible
*/
if (current_unique_id_.size() == current_unique_id_.capacity())
{
UAVCAN_TRACE("AllocationRequestReceiver", "Allocation request received; preferred node ID: %d",
int(msg.node_id));
IAllocationRequestHandler::UniqueID unique_id;
copy(current_unique_id_.begin(), current_unique_id_.end(), unique_id.begin());
current_unique_id_.clear();
handler_.handleAllocationRequest(unique_id, msg.node_id);
}
else
{
broadcastIntermediateAllocationResponse();
}
}
int AllocationRequestManager::init()
{
int res = allocation_pub_.init();
if (res < 0)
{
return res;
}
(void)allocation_pub_.setPriority(TransferPriorityLow);
res = allocation_sub_.start(AllocationCallback(this, &AllocationRequestManager::handleAllocation));
if (res < 0)
{
return res;
}
allocation_sub_.allowAnonymousTransfers();
return 0;
}
void AllocationRequestManager::setActive(bool x)
{
active_ = x;
if (!active_)
{
current_unique_id_.clear();
}
}
int AllocationRequestManager::broadcastAllocationResponse(const IAllocationRequestHandler::UniqueID& unique_id,
NodeID allocated_node_id)
{
if (!active_)
{
UAVCAN_ASSERT(0);
return -ErrLogic;
}
protocol::dynamic_node_id::Allocation msg;
msg.unique_id.resize(msg.unique_id.capacity());
copy(unique_id.begin(), unique_id.end(), msg.unique_id.begin());
msg.node_id = allocated_node_id.get();
return allocation_pub_.broadcast(msg);
}
} // dynamic_node_id_server_impl
}
@@ -940,10 +940,26 @@ TEST(DynamicNodeIDAllocationServer, EventCodeToString)
}
TEST(DynamicNodeIDAllocationServer, AllocationRequestManager)
{
}
TEST(DynamicNodeIDAllocationServer, ObjectSizes)
{
std::cout << "Log: " << sizeof(uavcan::dynamic_node_id_server_impl::Log) << std::endl;
std::cout << "PersistentState: " << sizeof(uavcan::dynamic_node_id_server_impl::PersistentState) << std::endl;
std::cout << "ClusterManager: " << sizeof(uavcan::dynamic_node_id_server_impl::ClusterManager) << std::endl;
std::cout << "RaftCore: " << sizeof(uavcan::dynamic_node_id_server_impl::RaftCore) << std::endl;
std::cout << "Log: "
<< sizeof(uavcan::dynamic_node_id_server_impl::Log) << std::endl;
std::cout << "PersistentState: "
<< sizeof(uavcan::dynamic_node_id_server_impl::PersistentState) << std::endl;
std::cout << "ClusterManager: "
<< sizeof(uavcan::dynamic_node_id_server_impl::ClusterManager) << std::endl;
std::cout << "RaftCore: "
<< sizeof(uavcan::dynamic_node_id_server_impl::RaftCore) << std::endl;
std::cout << "AllocationRequestManager: "
<< sizeof(uavcan::dynamic_node_id_server_impl::AllocationRequestManager) << std::endl;
}