From 6f22745e55e45b8bb2c0b8501a5c170bf96e05e3 Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Tue, 9 Jun 2015 19:49:16 +0300 Subject: [PATCH] Multithreading test for Linux --- .../linux/apps/test_multithreading.cpp | 355 ++++++++++++++---- 1 file changed, 289 insertions(+), 66 deletions(-) diff --git a/libuavcan_drivers/linux/apps/test_multithreading.cpp b/libuavcan_drivers/linux/apps/test_multithreading.cpp index 210e187cfe..f1bcf79c63 100644 --- a/libuavcan_drivers/linux/apps/test_multithreading.cpp +++ b/libuavcan_drivers/linux/apps/test_multithreading.cpp @@ -2,18 +2,143 @@ * Copyright (C) 2015 Pavel Kirienko */ +#ifndef NDEBUG +# define UAVCAN_DEBUG 1 +#endif + #include #include #include #include #include #include +#include #include "debug.hpp" /** - * Objects of this class are owned by the sub-node thread. + * Generic queue based on the linked list class defined in libuavcan. + * This class does not use heap memory. */ -class VirtualCanIface : public uavcan::ICanIface, uavcan::Noncopyable +template +class Queue +{ + struct Item : public uavcan::LinkedListNode + { + T payload; + + template + Item(Args... args) : payload(args...) { } + }; + + uavcan::LimitedPoolAllocator allocator_; + uavcan::LinkedListRoot list_; + +public: + Queue(uavcan::IPoolAllocator& arg_allocator, std::size_t block_allocation_quota) : + allocator_(arg_allocator, block_allocation_quota) + { + uavcan::IsDynamicallyAllocatable::check(); + } + + bool isEmpty() const { return list_.isEmpty(); } + + /** + * Creates one item in-place at the end of the list. + * Returns true if the item was appended successfully, false if there's not enough memory. + * Complexity is O(N) where N is queue length. + */ + template + bool tryEmplace(Args... args) + { + // Allocating memory + void* const ptr = allocator_.allocate(sizeof(Item)); + if (ptr == nullptr) + { + return false; + } + + // Constructing the new item + Item* const item = new (ptr) Item(args...); + assert(item != nullptr); + + // Inserting the new item at the end of the list + Item* p = list_.get(); + if (p == nullptr) + { + list_.insert(item); + } + else + { + while (p->getNextListNode() != nullptr) + { + p = p->getNextListNode(); + } + assert(p->getNextListNode() == nullptr); + p->setNextListNode(item); + assert(p->getNextListNode()->getNextListNode() == nullptr); + } + + return true; + } + + /** + * Accesses the first element. + * Nullptr will be returned if the queue is empty. + * Complexity is O(1). + */ + T* peek() { return isEmpty() ? nullptr : &list_.get()->payload; } + const T* peek() const { return isEmpty() ? nullptr : &list_.get()->payload; } + + /** + * Removes the first element. + * If the queue is empty, nothing will be done and assertion failure will be triggered. + * Complexity is O(1). + */ + void pop() + { + Item* const item = list_.get(); + assert(item != nullptr); + if (item != nullptr) + { + list_.remove(item); + item->~Item(); + allocator_.deallocate(item); + } + } +}; + +/** + * Feel free to remove. + */ +static void testQueue() +{ + uavcan::PoolAllocator<1024, uavcan::MemPoolBlockSize> allocator; + Queue::Type> q(allocator, 4); + ENFORCE(q.isEmpty()); + ENFORCE(q.peek() == nullptr); + ENFORCE(q.tryEmplace("One")); + ENFORCE(q.tryEmplace("Two")); + ENFORCE(q.tryEmplace("Three")); + ENFORCE(q.tryEmplace("Four")); + ENFORCE(!q.tryEmplace("Five")); + ENFORCE(*q.peek() == "One"); + q.pop(); + ENFORCE(*q.peek() == "Two"); + q.pop(); + ENFORCE(*q.peek() == "Three"); + q.pop(); + ENFORCE(*q.peek() == "Four"); + q.pop(); + ENFORCE(q.isEmpty()); + ENFORCE(q.peek() == nullptr); +} + +/** + * Objects of this class are owned by the sub-node thread. + * This class does not use heap memory. + */ +class VirtualCanIface : public uavcan::ICanIface, + uavcan::Noncopyable { struct RxItem { @@ -27,13 +152,13 @@ class VirtualCanIface : public uavcan::ICanIface, uavcan::Noncopyable }; std::mutex& mutex_; - uavcan::CanTxQueue tx_queue_; - std::queue rx_queue_; + uavcan::CanTxQueue prioritized_tx_queue_; + Queue rx_queue_; int16_t send(const uavcan::CanFrame& frame, uavcan::MonotonicTime tx_deadline, uavcan::CanIOFlags flags) override { std::lock_guard lock(mutex_); - tx_queue_.push(frame, tx_deadline, uavcan::CanTxQueue::Volatile, flags); + prioritized_tx_queue_.push(frame, tx_deadline, uavcan::CanTxQueue::Volatile, flags); return 1; } @@ -42,12 +167,12 @@ class VirtualCanIface : public uavcan::ICanIface, uavcan::Noncopyable { std::lock_guard lock(mutex_); - if (rx_queue_.empty()) + if (rx_queue_.isEmpty()) { return 0; } - const auto item = rx_queue_.front(); + const auto item = *rx_queue_.peek(); rx_queue_.pop(); out_frame = item.frame; @@ -62,25 +187,27 @@ class VirtualCanIface : public uavcan::ICanIface, uavcan::Noncopyable uint16_t getNumFilters() const override { return 0; } uint64_t getErrorCount() const override { return 0; } - static unsigned computeTxQueuePoolQuota(uavcan::INode& node) - { - return node.getAllocator().getNumBlocks() / 4; - } - public: - VirtualCanIface(uavcan::INode& node, std::mutex& arg_mutex) : + VirtualCanIface(uavcan::IPoolAllocator& allocator, uavcan::ISystemClock& clock, + std::mutex& arg_mutex, unsigned quota_per_queue) : mutex_(arg_mutex), - tx_queue_(node.getAllocator(), node.getSystemClock(), computeTxQueuePoolQuota(node)) + prioritized_tx_queue_(allocator, clock, quota_per_queue), + rx_queue_(allocator, quota_per_queue) { } /** + * Note that RX queue overwrites oldest items when overflowed. * Call this from the main thread only. * No additional locking is required. */ void addRxFrame(const uavcan::CanRxFrame& frame, uavcan::CanIOFlags flags) { std::lock_guard lock(mutex_); - rx_queue_.emplace(frame, flags); + if (!rx_queue_.tryEmplace(frame, flags) && !rx_queue_.isEmpty()) + { + rx_queue_.pop(); + (void)rx_queue_.tryEmplace(frame, flags); + } } /** @@ -93,15 +220,18 @@ public: const std::uint8_t iface_mask = static_cast(1U << iface_index); - while (auto e = tx_queue_.peek()) + while (auto e = prioritized_tx_queue_.peek()) { + UAVCAN_TRACE("VirtualCanIface", "TX injection [iface=0x%02x]: %s", + unsigned(iface_mask), e->toString().c_str()); + const int res = main_node.injectTxFrame(e->frame, e->deadline, iface_mask, uavcan::CanTxQueue::Qos(e->qos), e->flags); if (res <= 0) { break; } - tx_queue_.remove(e); + prioritized_tx_queue_.remove(e); } } @@ -112,14 +242,38 @@ public: bool hasDataInRxQueue() const { std::lock_guard lock(mutex_); - return !rx_queue_.empty(); + return !rx_queue_.isEmpty(); } }; /** - * Objects of this class are owned by the sub-node thread. + * This interface defines one method that will be called by the main node thread periodically in order to + * transfer contents of TX queue of the sub-node into the TX queue of the main node. */ -class VirtualCanDriver : public uavcan::ICanDriver, public uavcan::IRxFrameListener +class ITxQueueInjector +{ +public: + virtual ~ITxQueueInjector() { } + + /** + * Flush contents of TX queues into the main node. + * @param main_node Reference to the main node. + */ + virtual void injectTxFramesInto(uavcan::INode& main_node) = 0; +}; + +/** + * Objects of this class are owned by the sub-node thread. + * This class does not use heap memory. + * @tparam SharedMemoryPoolSize Amount of memory, in bytes, that will be statically allocated for the + * memory pool that will be shared across all interfaces for RX/TX queues. + * Typically this value should be no less than 4K per interface. + */ +template +class VirtualCanDriver : public uavcan::ICanDriver, + public uavcan::IRxFrameListener, + public ITxQueueInjector, + uavcan::Noncopyable { class Event { @@ -139,17 +293,19 @@ class VirtualCanDriver : public uavcan::ICanDriver, public uavcan::IRxFrameListe void signal() { cv_.notify_all(); } }; - Event event_; - std::mutex mutex_; - uavcan::INode& sub_node_; - std::vector> ifaces_; + Event event_; ///< Used to unblock the select() call when IO happens. + std::mutex mutex_; ///< Shared across all ifaces + uavcan::PoolAllocator allocator_; ///< Shared across all ifaces + uavcan::LazyConstructor ifaces_[uavcan::MaxCanIfaces]; + const unsigned num_ifaces_; + uavcan_linux::SystemClock clock_; uavcan::ICanIface* getIface(uint8_t iface_index) override { - return ifaces_.at(iface_index).get(); + return (iface_index < num_ifaces_) ? ifaces_[iface_index].operator VirtualCanIface*() : nullptr; } - uint8_t getNumIfaces() const override { return ifaces_.size(); } + uint8_t getNumIfaces() const override { return num_ifaces_; } /** * This and other methods of ICanDriver will be invoked by the sub-node thread. @@ -157,7 +313,7 @@ class VirtualCanDriver : public uavcan::ICanDriver, public uavcan::IRxFrameListe int16_t select(uavcan::CanSelectMasks& inout_masks, uavcan::MonotonicTime blocking_deadline) override { bool need_block = (inout_masks.write == 0); // Write queue is infinite - for (unsigned i = 0; need_block && (i < ifaces_.size()); i++) + for (unsigned i = 0; need_block && (i < num_ifaces_); i++) { const bool need_read = inout_masks.read & (1U << i); if (need_read && ifaces_[i]->hasDataInRxQueue()) @@ -168,11 +324,11 @@ class VirtualCanDriver : public uavcan::ICanDriver, public uavcan::IRxFrameListe if (need_block) { - event_.waitFor(blocking_deadline - sub_node_.getMonotonicTime()); + event_.waitFor(blocking_deadline - clock_.getMonotonic()); } inout_masks = uavcan::CanSelectMasks(); - for (unsigned i = 0; i < ifaces_.size(); i++) + for (unsigned i = 0; i < num_ifaces_; i++) { const std::uint8_t iface_mask = 1U << i; inout_masks.write |= iface_mask; // Always ready to write @@ -182,7 +338,7 @@ class VirtualCanDriver : public uavcan::ICanDriver, public uavcan::IRxFrameListe } } - return ifaces_.size(); // We're always ready for write, hence > 0. + return num_ifaces_; // We're always ready to write, hence > 0. } /** @@ -190,34 +346,47 @@ class VirtualCanDriver : public uavcan::ICanDriver, public uavcan::IRxFrameListe */ void handleRxFrame(const uavcan::CanRxFrame& frame, uavcan::CanIOFlags flags) override { - ifaces_.at(frame.iface_index)->addRxFrame(frame, flags); - event_.signal(); - } - -public: - VirtualCanDriver(uavcan::INode& arg_sub_node, unsigned num_ifaces) : - sub_node_(arg_sub_node) - { - for (unsigned i = 0; i < num_ifaces; i++) + UAVCAN_TRACE("VirtualCanDriver", "RX [flags=%u]: %s", unsigned(flags), frame.toString().c_str()); + if (frame.iface_index < num_ifaces_) { - ifaces_.emplace_back(new VirtualCanIface(arg_sub_node, mutex_)); + ifaces_[frame.iface_index]->addRxFrame(frame, flags); + event_.signal(); + } + else + { + assert(false); } } /** - * This method should be invoked from the main node thread periodically in order to move contents of the - * sub-node's TX queues into the TX queues of the main node. A good practice is to call this method - * immediately after executing spinOnce() on the main node. - * No additional locking is required. + * This method will be invoked by the main node thread. */ - void flushTxQueuesTo(uavcan::INode& main_node) + void injectTxFramesInto(uavcan::INode& main_node) override { - for (unsigned i = 0; i < ifaces_.size(); i++) + for (unsigned i = 0; i < num_ifaces_; i++) { - ifaces_.at(i)->flushTxQueueTo(main_node, i); + ifaces_[i]->flushTxQueueTo(main_node, i); } event_.signal(); } + +public: + VirtualCanDriver(unsigned arg_num_ifaces) : num_ifaces_(arg_num_ifaces) + { + assert(num_ifaces_ > 0 && num_ifaces_ <= uavcan::MaxCanIfaces); + + const unsigned quota_per_iface = allocator_.getNumBlocks() / num_ifaces_; + const unsigned quota_per_queue = quota_per_iface; // 2x overcommit + + UAVCAN_TRACE("VirtualCanDriver", "Total blocks: %u, quota per queue: %u", + unsigned(allocator_.getNumBlocks()), unsigned(quota_per_queue)); + + for (unsigned i = 0; i < num_ifaces_; i++) + { + ifaces_[i].template construct(allocator_, clock_, mutex_, quota_per_queue); + } + } }; static uavcan_linux::NodePtr initMainNode(const std::vector& ifaces, uavcan::NodeID nid, @@ -246,11 +415,19 @@ static uavcan_linux::NodePtr initMainNode(const std::vector& ifaces return node; } -static uavcan_linux::SubNodePtr initSubNode(const std::vector& ifaces, uavcan::NodeID nid) +template +static uavcan_linux::SubNodePtr initSubNode(unsigned num_ifaces, uavcan::INode& main_node) { std::cout << "Initializing sub node" << std::endl; - auto node = uavcan_linux::makeSubNode(ifaces); - node->setNodeID(nid); + + typedef VirtualCanDriver Driver; + + std::shared_ptr driver(new Driver(num_ifaces)); + auto node = uavcan_linux::makeSubNode(driver); + node->setNodeID(main_node.getNodeID()); + + main_node.getDispatcher().installRxFrameListener(driver.get()); + return node; } @@ -258,20 +435,32 @@ static void runMainNode(const uavcan_linux::NodePtr& node) { std::cout << "Running main node" << std::endl; - auto do_nothing_once_a_minute = [&node](const uavcan::TimerEvent&) + auto timer = node->makeTimer(uavcan::MonotonicDuration::fromMSec(10000), [&node](const uavcan::TimerEvent&) + { + node->logInfo("timer", "Your time is running out."); + node->setVendorSpecificStatusCode(static_cast(std::rand())); + }); + + /* + * We know that in this implementation, VirtualCanDriver inherits uavcan::IRxFrameListener, so we can simply + * restore the reference to ITxQueueInjector using dynamic_cast. In other implementations this may be + * unacceptable, so a reference to ITxQueueInjector will have to be passed using some other means. + */ + if (node->getDispatcher().getRxFrameListener() == nullptr) { - node->logInfo("timer", "Another minute passed..."); - node->setVendorSpecificStatusCode(static_cast(std::rand())); // Setting to an arbitrary value - }; - auto timer = node->makeTimer(uavcan::MonotonicDuration::fromMSec(60000), do_nothing_once_a_minute); + throw std::logic_error("RX frame listener is not configured"); + } + ITxQueueInjector& tx_injector = dynamic_cast(*node->getDispatcher().getRxFrameListener()); while (true) { - const int res = node->spin(uavcan::MonotonicDuration::getInfinite()); + const int res = node->spin(uavcan::MonotonicDuration::fromMSec(1)); if (res < 0) { node->logError("spin", "Error %*", res); } + // TX queue transfer occurs here. + tx_injector.injectTxFramesInto(*node); } } @@ -279,12 +468,18 @@ static void runSubNode(const uavcan_linux::SubNodePtr& node) { std::cout << "Running sub node" << std::endl; - auto log_handler = [](const uavcan::ReceivedDataStructure& msg) - { - std::cout << msg << std::endl; - }; - auto log_sub = node->makeSubscriber(log_handler); + /* + * Log subscriber + */ + auto log_sub = node->makeSubscriber( + [](const uavcan::ReceivedDataStructure& msg) + { + std::cout << msg << std::endl; + }); + /* + * Node status monitor + */ struct NodeStatusMonitor : public uavcan::NodeStatusMonitor { explicit NodeStatusMonitor(uavcan::INode& node) : uavcan::NodeStatusMonitor(node) { } @@ -292,17 +487,41 @@ static void runSubNode(const uavcan_linux::SubNodePtr& node) virtual void handleNodeStatusChange(const NodeStatusChangeEvent& event) override { std::cout << "Remote node NID " << int(event.node_id.get()) << " changed status: " - << int(event.old_status.status_code) << " --> " - << int(event.status.status_code) << std::endl; + << int(event.old_status.status_code) << " --> " << int(event.status.status_code) << std::endl; } }; - NodeStatusMonitor nsm(*node); ENFORCE(0 == nsm.start()); + /* + * KV subscriber + */ + auto kv_sub = node->makeSubscriber( + [](const uavcan::ReceivedDataStructure& msg) + { + std::cout << msg << std::endl; + }); + + /* + * KV publisher + */ + unsigned kv_value = 0; + auto kv_pub = node->makePublisher(); + auto timer = node->makeTimer(uavcan::MonotonicDuration::fromMSec(5000), [&](const uavcan::TimerEvent&) + { + uavcan::protocol::debug::KeyValue kv; + kv.key = "five_seconds"; + kv.value = kv_value++; + const int res = kv_pub->broadcast(kv); + if (res < 0) + { + std::cerr << "Sub KV pub err " << res << std::endl; + } + }); + while (true) { - const int res = node->spin(uavcan::MonotonicDuration::getInfinite()); + const int res = node->spin(uavcan::MonotonicDuration::fromMSec(1000)); if (res < 0) { std::cerr << "SubNode spin error: " << res << std::endl; @@ -314,6 +533,10 @@ int main(int argc, const char** argv) { try { + testQueue(); + + constexpr unsigned VirtualIfacePoolSize = 32768; + if (argc < 3) { std::cerr << "Usage:\n\t" << argv[0] << " [can-iface-name-N...]" << std::endl; @@ -324,7 +547,7 @@ int main(int argc, const char** argv) std::vector iface_names(argv + 2, argv + argc); auto node = initMainNode(iface_names, self_node_id, "org.uavcan.linux_test_node"); - auto sub_node = initSubNode(iface_names, self_node_id); + auto sub_node = initSubNode(iface_names.size(), *node); std::thread sub_thread([&sub_node](){ runSubNode(sub_node); });