From 2c2d7605a7e0c28ef26c8e6a29dbb03a2302e77a Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Sun, 9 Mar 2014 17:58:32 +0400 Subject: [PATCH] Subscriber with simple test --- .../uavcan/received_data_structure.hpp | 48 +++++ libuavcan/include/uavcan/subscriber.hpp | 158 +++++++++++++++++ libuavcan/test/subscriber.cpp | 164 ++++++++++++++++++ 3 files changed, 370 insertions(+) create mode 100644 libuavcan/include/uavcan/received_data_structure.hpp create mode 100644 libuavcan/include/uavcan/subscriber.hpp create mode 100644 libuavcan/test/subscriber.cpp diff --git a/libuavcan/include/uavcan/received_data_structure.hpp b/libuavcan/include/uavcan/received_data_structure.hpp new file mode 100644 index 0000000000..2c021951d0 --- /dev/null +++ b/libuavcan/include/uavcan/received_data_structure.hpp @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2014 Pavel Kirienko + */ + +#pragma once + +#include +#include + +namespace uavcan +{ + +template +class ReceivedDataStructure : public DataType_ +{ + const IncomingTransfer* transfer_; + + template + Ret safeget() const + { + if (!transfer_) + { + assert(0); + return Ret(); + } + return (transfer_->*Fun)(); + } + +protected: + ReceivedDataStructure() : transfer_(NULL) { } + + void setTransfer(const IncomingTransfer* transfer) + { + assert(transfer); + transfer_ = transfer; + } + +public: + typedef DataType_ DataType; + + uint64_t getMonotonicTimestamp() const { return safeget(); } + uint64_t getUtcTimestamp() const { return safeget(); } + TransferType getTransferType() const { return safeget(); } + TransferID getTransferID() const { return safeget(); } + NodeID getSrcNodeID() const { return safeget(); } +}; + +} diff --git a/libuavcan/include/uavcan/subscriber.hpp b/libuavcan/include/uavcan/subscriber.hpp new file mode 100644 index 0000000000..5aa896570d --- /dev/null +++ b/libuavcan/include/uavcan/subscriber.hpp @@ -0,0 +1,158 @@ +/* + * Copyright (C) 2014 Pavel Kirienko + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace uavcan +{ + +template &), + unsigned int NumStaticBufs = 1, + unsigned int NumStaticReceivers = NumStaticBufs + 1> +class Subscriber : Noncopyable +{ + typedef Subscriber SelfType; + +public: + typedef DataType_ DataType; + +private: + typedef TransferListener::Result, + NumStaticBufs ? NumStaticBufs : 1, // TODO: add support for zero buffers + NumStaticReceivers ? NumStaticReceivers : 1> TransferListenerType; + + // We need to break the inheritance chain here to implement lazy initialization + class TransferForwarder : public TransferListenerType + { + SelfType& obj_; + + void handleIncomingTransfer(IncomingTransfer& transfer) + { + obj_.handleIncomingTransfer(transfer); + } + + public: + TransferForwarder(SelfType& obj, const DataTypeDescriptor& data_type, IAllocator& allocator) + : TransferListenerType(data_type, allocator) + , obj_(obj) + { } + }; + + struct ReceivedDataStructureSpec : public ReceivedDataStructure + { + using ReceivedDataStructure::setTransfer; + }; + + Scheduler& scheduler_; + IAllocator& allocator_; + Callback callback_; + LazyConstructor forwarder_; + ReceivedDataStructureSpec message_; + uint32_t failure_count_; + + bool checkInit() + { + if (forwarder_) + return true; + + GlobalDataTypeRegistry::instance().freeze(); + + const DataTypeDescriptor* const descr = + GlobalDataTypeRegistry::instance().find(DataTypeKindMessage, DataType::getDataTypeFullName()); + if (!descr) + { + UAVCAN_TRACE("Subscriber", "Type [%s] is not registered", DataType::getDataTypeFullName()); + return false; + } + forwarder_.template construct(*this, *descr, allocator_); + return true; + } + + void handleIncomingTransfer(IncomingTransfer& transfer) + { + assert(transfer.getTransferType() == TransferTypeMessageBroadcast || + transfer.getTransferType() == TransferTypeMessageUnicast); + + { + BitStream bitstream(transfer); + ScalarCodec codec(bitstream); + + const int decode_res = DataType::decode(message_, codec); + // We don't need the data anymore, the memory can be reused from the callback: + transfer.release(); + if (decode_res <= 0) + { + UAVCAN_TRACE("Subscriber", "Unable to decode the message [%i] [%s]", + decode_res, DataType::getDataTypeFullName()); + failure_count_++; + return; + } + } + + message_.setTransfer(&transfer); + if (try_implicit_cast(callback_, true)) + callback_(message_); // Callback can accept non-const message reference and mutate it, that's OK + else + assert(0); + } + +public: + Subscriber(Scheduler& scheduler, IAllocator& allocator) + : scheduler_(scheduler) + , allocator_(allocator) + , callback_() + , failure_count_(0) + { + StaticAssert::check(); + } + + int start(Callback callback) + { + stop(); + + if (!try_implicit_cast(callback, true)) + { + UAVCAN_TRACE("Subscriber", "Invalid callback"); + return -1; + } + callback_ = callback; + + if (!checkInit()) + { + UAVCAN_TRACE("Subscriber", "Initialization failure [%s]", DataType::getDataTypeFullName()); + return -1; + } + + if (!scheduler_.getDispatcher().registerMessageListener(forwarder_)) + { + UAVCAN_TRACE("Subscriber", "Failed to register message listener [%s]", DataType::getDataTypeFullName()); + return -1; + } + return 1; + } + + void stop() + { + if (forwarder_) + scheduler_.getDispatcher().unregisterMessageListener(forwarder_); + } + + Scheduler& getScheduler() const { return scheduler_; } + + uint32_t getFailureCount() const { return failure_count_; } +}; + +} diff --git a/libuavcan/test/subscriber.cpp b/libuavcan/test/subscriber.cpp new file mode 100644 index 0000000000..c63b566ea4 --- /dev/null +++ b/libuavcan/test/subscriber.cpp @@ -0,0 +1,164 @@ +/* + * Copyright (C) 2014 Pavel Kirienko + */ + +#include +#include +#include +#include +#include "common.hpp" +#include "transport/can/iface_mock.hpp" + + +template +struct SubscriptionListener +{ + typedef uavcan::ReceivedDataStructure ReceivedDataStructure; + + struct ReceivedDataStructureCopy + { + uint64_t ts_monotonic; + uint64_t ts_utc; + uavcan::TransferType transfer_type; + uavcan::TransferID transfer_id; + uavcan::NodeID src_node_id; + DataType msg; + + ReceivedDataStructureCopy(const ReceivedDataStructure& s) + : ts_monotonic(s.getMonotonicTimestamp()) + , ts_utc(s.getUtcTimestamp()) + , transfer_type(s.getTransferType()) + , transfer_id(s.getTransferID()) + , src_node_id(s.getSrcNodeID()) + , msg(s) + { } + }; + + std::vector simple; + std::vector extended; + + void receiveExtended(ReceivedDataStructure& msg) + { + extended.push_back(msg); + } + + void receiveSimple(DataType& msg) + { + simple.push_back(msg); + } + + typedef SubscriptionListener SelfType; + typedef uavcan::MethodBinder ExtendedBinder; + typedef uavcan::MethodBinder SimpleBinder; + + ExtendedBinder bindExtended() { return ExtendedBinder(this, &SelfType::receiveExtended); } + SimpleBinder bindSimple() { return SimpleBinder(this, &SelfType::receiveSimple); } +}; + +// TODO: add autogenerated comparison operators, then remove this +static bool operator==(const uavcan::mavlink::Message& a, const uavcan::mavlink::Message& b) +{ + return + a.seq == b.seq && + a.sysid == b.sysid && + a.compid == b.compid && + a.msgid == b.msgid && + a.payload == b.payload; +} + +TEST(Subscriber, Basic) +{ + uavcan::PoolAllocator pool; + uavcan::PoolManager<1> poolmgr; + poolmgr.addPool(&pool); + + // Manual type registration - we can't rely on the GDTR state + uavcan::GlobalDataTypeRegistry::instance().reset(); + uavcan::DefaultDataTypeRegistrator _registrator; + + SystemClockDriver clock_driver; + CanDriverMock can_driver(2, clock_driver); + + uavcan::OutgoingTransferRegistry<8> out_trans_reg(poolmgr); + + uavcan::Scheduler sch(can_driver, poolmgr, clock_driver, out_trans_reg, uavcan::NodeID(1)); + + typedef SubscriptionListener Listener; + + uavcan::Subscriber sub_extended(sch, poolmgr); + uavcan::Subscriber sub_simple(sch, poolmgr); + + // Null binder - will fail + ASSERT_EQ(-1, sub_extended.start(Listener::ExtendedBinder(NULL, NULL))); + + Listener listener; + + /* + * Message layout: + * uint8 seq + * uint8 sysid + * uint8 compid + * uint8 msgid + * uint8[<256] payload + */ + uavcan::mavlink::Message expected_msg; + expected_msg.seq = 0x42; + expected_msg.sysid = 0x72; + expected_msg.compid = 0x08; + expected_msg.msgid = 0xa5; + expected_msg.payload = "Msg"; + + const uint8_t transfer_payload[] = {0x42, 0x72, 0x08, 0xa5, 'M', 's', 'g'}; + + /* + * RxFrame generation + */ + std::vector rx_frames; + for (int i = 0; i < 4; i++) + { + uavcan::TransferType tt = (i & 1) ? uavcan::TransferTypeMessageUnicast : uavcan::TransferTypeMessageBroadcast; + uavcan::NodeID dni = (tt == uavcan::TransferTypeMessageBroadcast) ? + uavcan::NodeID::Broadcast : sch.getDispatcher().getSelfNodeID(); + // uint_fast16_t data_type_id, TransferType transfer_type, NodeID src_node_id, NodeID dst_node_id, + // uint_fast8_t frame_index, TransferID transfer_id, bool last_frame + uavcan::Frame frame(uavcan::mavlink::Message::DefaultDataTypeID, tt, uavcan::NodeID(i + 100), dni, 0, i, true); + frame.setPayload(transfer_payload, 7); + uavcan::RxFrame rx_frame(frame, clock_driver.getMonotonicMicroseconds(), clock_driver.getUtcMicroseconds(), 0); + rx_frames.push_back(rx_frame); + } + + /* + * Reception + */ + ASSERT_EQ(1, sub_extended.start(listener.bindExtended())); + ASSERT_EQ(1, sub_simple.start(listener.bindSimple())); + + for (unsigned int i = 0; i < rx_frames.size(); i++) + { + can_driver.ifaces[0].pushRx(rx_frames[i]); + can_driver.ifaces[1].pushRx(rx_frames[i]); + } + + ASSERT_LE(0, sch.spin(clock_driver.getMonotonicMicroseconds() + 10000)); + + /* + * Validation + */ + for (unsigned int i = 0; i < rx_frames.size(); i++) + { + const Listener::ReceivedDataStructureCopy s = listener.extended.at(i); + ASSERT_TRUE(s.msg == expected_msg); + ASSERT_EQ(rx_frames[i].getSrcNodeID(), s.src_node_id); + ASSERT_EQ(rx_frames[i].getTransferID(), s.transfer_id); + ASSERT_EQ(rx_frames[i].getTransferType(), s.transfer_type); + ASSERT_EQ(rx_frames[i].getMonotonicTimestamp(), s.ts_monotonic); + } + + for (unsigned int i = 0; i < rx_frames.size(); i++) + { + ASSERT_TRUE(listener.simple.at(i) == expected_msg); + } + + ASSERT_EQ(0, sub_extended.getFailureCount()); + ASSERT_EQ(0, sub_simple.getFailureCount()); +}