diff --git a/libuavcan/include/uavcan/internal/node/generic_publisher.hpp b/libuavcan/include/uavcan/internal/node/generic_publisher.hpp new file mode 100644 index 0000000000..11c872babd --- /dev/null +++ b/libuavcan/include/uavcan/internal/node/generic_publisher.hpp @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2014 Pavel Kirienko + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace uavcan +{ + +/** + * Generic publisher, suitable for messages and services. + * DataSpec - data type specification class + * DataStruct - instantiable class + */ +template +class GenericPublisher +{ +public: + enum { DefaultTxTimeoutUsec = 2500 }; // 2500 ms --> 400Hz max + enum { MinTxTimeoutUsec = 200 }; + +private: + enum { Qos = (DataTypeKind(DataSpec::DataTypeKind) == DataTypeKindMessage) ? + CanTxQueue::Volatile : CanTxQueue::Persistent }; + + const uint64_t max_transfer_interval_; // TODO: memory usage can be reduced + uint64_t tx_timeout_; + Scheduler& scheduler_; + IMarshalBufferProvider& buffer_provider_; + LazyConstructor sender_; + + bool checkInit() + { + if (sender_) + return true; + + GlobalDataTypeRegistry::instance().freeze(); + + const DataTypeDescriptor* const descr = + GlobalDataTypeRegistry::instance().find(DataTypeKind(DataSpec::DataTypeKind), + DataSpec::getDataTypeFullName()); + if (!descr) + { + UAVCAN_TRACE("GenericPublisher", "Type [%s] is not registered", DataSpec::getDataTypeFullName()); + return false; + } + sender_.template construct + (scheduler_.getDispatcher(), *descr, CanTxQueue::Qos(Qos), max_transfer_interval_); + return true; + } + + uint64_t getTxDeadline() const { return scheduler_.getMonotonicTimestamp() + tx_timeout_; } + + IMarshalBuffer* getBuffer() + { + return buffer_provider_.getBuffer(BitLenToByteLen::Result); + } + + int genericPublish(const DataStruct& message, TransferType transfer_type, NodeID dst_node_id, + TransferID* tid, uint64_t monotonic_blocking_deadline) + { + if (!checkInit()) + return -1; + + IMarshalBuffer* const buf = getBuffer(); + if (!buf) + return -1; + + { + BitStream bitstream(*buf); + ScalarCodec codec(bitstream); + const int encode_res = DataStruct::encode(message, codec); + if (encode_res <= 0) + { + assert(0); // Impossible, internal error + return -1; + } + } + if (tid) + { + return sender_->send(buf->getDataPtr(), buf->getDataLength(), getTxDeadline(), + monotonic_blocking_deadline, transfer_type, dst_node_id, *tid); + } + else + { + return sender_->send(buf->getDataPtr(), buf->getDataLength(), getTxDeadline(), + monotonic_blocking_deadline, transfer_type, dst_node_id); + } + } + +protected: + GenericPublisher(Scheduler& scheduler, IMarshalBufferProvider& buffer_provider, + uint64_t max_transfer_interval = TransferSender::DefaultMaxTransferInterval) + : max_transfer_interval_(max_transfer_interval) + , tx_timeout_(DefaultTxTimeoutUsec) + , scheduler_(scheduler) + , buffer_provider_(buffer_provider) + { } + + ~GenericPublisher() { } + + int publish(const DataStruct& message, TransferType transfer_type, NodeID dst_node_id, + uint64_t monotonic_blocking_deadline = 0) + { + return genericPublish(message, transfer_type, dst_node_id, NULL, monotonic_blocking_deadline); + } + + int publish(const DataStruct& message, TransferType transfer_type, NodeID dst_node_id, TransferID tid, + uint64_t monotonic_blocking_deadline = 0) + { + return genericPublish(message, transfer_type, dst_node_id, &tid, monotonic_blocking_deadline); + } + +public: + uint64_t getTxTimeout() const { return tx_timeout_; } + void setTxTimeout(uint64_t usec) + { + tx_timeout_ = std::max(usec, uint64_t(MinTxTimeoutUsec)); + } + + Scheduler& getScheduler() const { return scheduler_; } +}; + +} diff --git a/libuavcan/include/uavcan/internal/node/generic_subscriber.hpp b/libuavcan/include/uavcan/internal/node/generic_subscriber.hpp new file mode 100644 index 0000000000..52e44eb796 --- /dev/null +++ b/libuavcan/include/uavcan/internal/node/generic_subscriber.hpp @@ -0,0 +1,205 @@ +/* + * Copyright (C) 2014 Pavel Kirienko + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace uavcan +{ + +template +class ReceivedDataStructure : public DataType_ +{ + const IncomingTransfer* transfer_; + + template + Ret safeget() const + { + if (!transfer_) + { + assert(0); + return Ret(); + } + return (transfer_->*Fun)(); + } + +protected: + ReceivedDataStructure() : transfer_(NULL) { } + + void setTransfer(const IncomingTransfer* transfer) + { + assert(transfer); + transfer_ = transfer; + } + +public: + typedef DataType_ DataType; + + uint64_t getMonotonicTimestamp() const { return safeget(); } + uint64_t getUtcTimestamp() const { return safeget(); } + TransferType getTransferType() const { return safeget(); } + TransferID getTransferID() const { return safeget(); } + NodeID getSrcNodeID() const { return safeget(); } +}; + + +template +class GenericSubscriber : Noncopyable +{ + typedef GenericSubscriber SelfType; + + enum { DataTypeMaxByteLen = BitLenToByteLen::Result }; + enum { NeedsBuffer = int(DataTypeMaxByteLen) > int(MaxSingleFrameTransferPayloadLen) }; + enum { BufferSize = NeedsBuffer ? DataTypeMaxByteLen : 0 }; + enum { NumStaticBufs = NeedsBuffer ? (NumStaticBufs_ ? NumStaticBufs_ : 1) : 0 }; + + typedef TransferListener TransferListenerType; + + // We need to break the inheritance chain here to implement lazy initialization + class TransferForwarder : public TransferListenerType + { + SelfType& obj_; + + void handleIncomingTransfer(IncomingTransfer& transfer) + { + obj_.handleIncomingTransfer(transfer); + } + + public: + TransferForwarder(SelfType& obj, const DataTypeDescriptor& data_type, IAllocator& allocator) + : TransferListenerType(data_type, allocator) + , obj_(obj) + { } + }; + + struct ReceivedDataStructureSpec : public ReceivedDataStructure + { + using ReceivedDataStructure::setTransfer; + }; + + Scheduler& scheduler_; + IAllocator& allocator_; + LazyConstructor forwarder_; + ReceivedDataStructureSpec message_; + uint32_t failure_count_; + + bool checkInit() + { + if (forwarder_) + return true; + + GlobalDataTypeRegistry::instance().freeze(); + + const DataTypeDescriptor* const descr = + GlobalDataTypeRegistry::instance().find(DataTypeKind(DataSpec::DataTypeKind), + DataSpec::getDataTypeFullName()); + if (!descr) + { + UAVCAN_TRACE("GenericSubscriber", "Type [%s] is not registered", DataSpec::getDataTypeFullName()); + return false; + } + forwarder_.template construct(*this, *descr, allocator_); + return true; + } + + bool decodeTransfer(IncomingTransfer& transfer) + { + BitStream bitstream(transfer); + ScalarCodec codec(bitstream); + + message_.setTransfer(&transfer); + + const int decode_res = DataSpec::decode(message_, codec); + // We don't need the data anymore, the memory can be reused from the callback: + transfer.release(); + if (decode_res <= 0) + { + UAVCAN_TRACE("GenericSubscriber", "Unable to decode the message [%i] [%s]", + decode_res, DataSpec::getDataTypeFullName()); + failure_count_++; + return false; + } + return true; + } + + void handleIncomingTransfer(IncomingTransfer& transfer) + { + if (decodeTransfer(transfer)) + { + handleReceivedDataStruct(message_); + } + } + + int genericStart(bool(Dispatcher::*registration_method)(TransferListenerBase* listener)) + { + stop(); + + if (!checkInit()) + { + UAVCAN_TRACE("GenericSubscriber", "Initialization failure [%s]", DataSpec::getDataTypeFullName()); + return -1; + } + + if (!(scheduler_.getDispatcher().*registration_method)(forwarder_)) + { + UAVCAN_TRACE("GenericSubscriber", "Failed to register transfer listener [%s]", + DataSpec::getDataTypeFullName()); + return -1; + } + return 1; + } + +protected: + GenericSubscriber(Scheduler& scheduler, IAllocator& allocator) + : scheduler_(scheduler) + , allocator_(allocator) + , failure_count_(0) + { } + + virtual ~GenericSubscriber() { stop(); } + + virtual void handleReceivedDataStruct(ReceivedDataStructure&) = 0; + + int startAsMessageListener() + { + return genericStart(&Dispatcher::registerMessageListener); + } + + int startAsServiceRequestListener() + { + return genericStart(&Dispatcher::registerServiceRequestListener); + } + + int startAsServiceResponseListener() + { + return genericStart(&Dispatcher::registerServiceResponseListener); + } + + void stop() + { + if (forwarder_) + { + scheduler_.getDispatcher().unregisterMessageListener(forwarder_); + scheduler_.getDispatcher().unregisterServiceRequestListener(forwarder_); + scheduler_.getDispatcher().unregisterServiceResponseListener(forwarder_); + } + } + +public: + Scheduler& getScheduler() const { return scheduler_; } + + uint32_t getFailureCount() const { return failure_count_; } +}; + +} diff --git a/libuavcan/include/uavcan/marshal_buffer.hpp b/libuavcan/include/uavcan/internal/node/marshal_buffer.hpp similarity index 100% rename from libuavcan/include/uavcan/marshal_buffer.hpp rename to libuavcan/include/uavcan/internal/node/marshal_buffer.hpp diff --git a/libuavcan/include/uavcan/scheduler.hpp b/libuavcan/include/uavcan/internal/node/scheduler.hpp similarity index 100% rename from libuavcan/include/uavcan/scheduler.hpp rename to libuavcan/include/uavcan/internal/node/scheduler.hpp diff --git a/libuavcan/include/uavcan/publisher.hpp b/libuavcan/include/uavcan/publisher.hpp index 302890e754..3e1c6c8f46 100644 --- a/libuavcan/include/uavcan/publisher.hpp +++ b/libuavcan/include/uavcan/publisher.hpp @@ -4,101 +4,31 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include namespace uavcan { template -class Publisher +class Publisher : public GenericPublisher { + typedef GenericPublisher BaseType; + public: typedef DataType_ DataType; - enum { DefaultTxTimeoutUsec = 2500 }; // 2500 ms --> 400Hz max - enum { MinTxTimeoutUsec = 200 }; - -private: - const uint64_t max_transfer_interval_; // TODO: memory usage can be reduced - uint64_t tx_timeout_; - Scheduler& scheduler_; - IMarshalBufferProvider& buffer_provider_; - LazyConstructor sender_; - - bool checkInit() - { - if (sender_) - return true; - - GlobalDataTypeRegistry::instance().freeze(); - - const DataTypeDescriptor* const descr = - GlobalDataTypeRegistry::instance().find(DataTypeKindMessage, DataType::getDataTypeFullName()); - if (!descr) - { - UAVCAN_TRACE("Publisher", "Type [%s] is not registered", DataType::getDataTypeFullName()); - return false; - } - sender_.template construct - (scheduler_.getDispatcher(), *descr, CanTxQueue::Volatile, max_transfer_interval_); - return true; - } - - uint64_t getTxDeadline() const { return scheduler_.getMonotonicTimestamp() + tx_timeout_; } - - IMarshalBuffer* getBuffer() - { - return buffer_provider_.getBuffer(BitLenToByteLen::Result); - } - - int genericSend(const DataType& message, TransferType transfer_type, NodeID dst_node_id, - uint64_t monotonic_blocking_deadline = 0) - { - if (!checkInit()) - return -1; - - IMarshalBuffer* const buf = getBuffer(); - if (!buf) - return -1; - - { - BitStream bitstream(*buf); - ScalarCodec codec(bitstream); - const int encode_res = DataType::encode(message, codec); - if (encode_res <= 0) - { - assert(0); // Impossible, internal error - return -1; - } - } - return sender_->send(buf->getDataPtr(), buf->getDataLength(), getTxDeadline(), - monotonic_blocking_deadline, transfer_type, dst_node_id); - } - -public: Publisher(Scheduler& scheduler, IMarshalBufferProvider& buffer_provider, - uint64_t tx_timeout_usec = DefaultTxTimeoutUsec, + uint64_t tx_timeout_usec = BaseType::DefaultTxTimeoutUsec, uint64_t max_transfer_interval = TransferSender::DefaultMaxTransferInterval) - : max_transfer_interval_(max_transfer_interval) - , tx_timeout_(tx_timeout_usec) - , scheduler_(scheduler) - , buffer_provider_(buffer_provider) + : BaseType(scheduler, buffer_provider, max_transfer_interval) { - setTxTimeout(tx_timeout_usec); + BaseType::setTxTimeout(tx_timeout_usec); StaticAssert::check(); } int broadcast(const DataType& message) { - return genericSend(message, TransferTypeMessageBroadcast, NodeID::Broadcast); + return publish(message, TransferTypeMessageBroadcast, NodeID::Broadcast); } int unicast(const DataType& message, NodeID dst_node_id) @@ -108,16 +38,8 @@ public: assert(0); return -1; } - return genericSend(message, TransferTypeMessageUnicast, dst_node_id); + return publish(message, TransferTypeMessageUnicast, dst_node_id); } - - uint64_t getTxTimeout() const { return tx_timeout_; } - void setTxTimeout(uint64_t usec) - { - tx_timeout_ = std::max(usec, uint64_t(MinTxTimeoutUsec)); - } - - Scheduler& getScheduler() const { return scheduler_; } }; } diff --git a/libuavcan/include/uavcan/received_data_structure.hpp b/libuavcan/include/uavcan/received_data_structure.hpp deleted file mode 100644 index 2c021951d0..0000000000 --- a/libuavcan/include/uavcan/received_data_structure.hpp +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (C) 2014 Pavel Kirienko - */ - -#pragma once - -#include -#include - -namespace uavcan -{ - -template -class ReceivedDataStructure : public DataType_ -{ - const IncomingTransfer* transfer_; - - template - Ret safeget() const - { - if (!transfer_) - { - assert(0); - return Ret(); - } - return (transfer_->*Fun)(); - } - -protected: - ReceivedDataStructure() : transfer_(NULL) { } - - void setTransfer(const IncomingTransfer* transfer) - { - assert(transfer); - transfer_ = transfer; - } - -public: - typedef DataType_ DataType; - - uint64_t getMonotonicTimestamp() const { return safeget(); } - uint64_t getUtcTimestamp() const { return safeget(); } - TransferType getTransferType() const { return safeget(); } - TransferID getTransferID() const { return safeget(); } - NodeID getSrcNodeID() const { return safeget(); } -}; - -} diff --git a/libuavcan/include/uavcan/subscriber.hpp b/libuavcan/include/uavcan/subscriber.hpp index 22a56b36af..fd2581ea2e 100644 --- a/libuavcan/include/uavcan/subscriber.hpp +++ b/libuavcan/include/uavcan/subscriber.hpp @@ -4,16 +4,8 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include namespace uavcan { @@ -21,110 +13,31 @@ namespace uavcan template &), unsigned int NumStaticReceivers = 2, - unsigned int NumStaticBufs_ = 1> -class Subscriber : Noncopyable + unsigned int NumStaticBufs = 1> +class Subscriber : public GenericSubscriber { - typedef Subscriber SelfType; + typedef GenericSubscriber BaseType; + + Callback callback_; + + void handleReceivedDataStruct(ReceivedDataStructure& msg) + { + if (try_implicit_cast(callback_, true)) + callback_(msg); + else + handleFatalError("Invalid subscriber callback"); + } public: typedef DataType_ DataType; -private: - enum { DataTypeMaxByteLen = BitLenToByteLen::Result }; - enum { NeedsBuffer = int(DataTypeMaxByteLen) > int(MaxSingleFrameTransferPayloadLen) }; - enum { BufferSize = NeedsBuffer ? DataTypeMaxByteLen : 0 }; - enum { NumStaticBufs = NeedsBuffer ? (NumStaticBufs_ ? NumStaticBufs_ : 1) : 0 }; - - typedef TransferListener TransferListenerType; - - // We need to break the inheritance chain here to implement lazy initialization - class TransferForwarder : public TransferListenerType - { - SelfType& obj_; - - void handleIncomingTransfer(IncomingTransfer& transfer) - { - obj_.handleIncomingTransfer(transfer); - } - - public: - TransferForwarder(SelfType& obj, const DataTypeDescriptor& data_type, IAllocator& allocator) - : TransferListenerType(data_type, allocator) - , obj_(obj) - { } - }; - - struct ReceivedDataStructureSpec : public ReceivedDataStructure - { - using ReceivedDataStructure::setTransfer; - }; - - Scheduler& scheduler_; - IAllocator& allocator_; - Callback callback_; - LazyConstructor forwarder_; - ReceivedDataStructureSpec message_; - uint32_t failure_count_; - - bool checkInit() - { - if (forwarder_) - return true; - - GlobalDataTypeRegistry::instance().freeze(); - - const DataTypeDescriptor* const descr = - GlobalDataTypeRegistry::instance().find(DataTypeKindMessage, DataType::getDataTypeFullName()); - if (!descr) - { - UAVCAN_TRACE("Subscriber", "Type [%s] is not registered", DataType::getDataTypeFullName()); - return false; - } - forwarder_.template construct(*this, *descr, allocator_); - return true; - } - - void handleIncomingTransfer(IncomingTransfer& transfer) - { - assert(transfer.getTransferType() == TransferTypeMessageBroadcast || - transfer.getTransferType() == TransferTypeMessageUnicast); - - { - BitStream bitstream(transfer); - ScalarCodec codec(bitstream); - - const int decode_res = DataType::decode(message_, codec); - // We don't need the data anymore, the memory can be reused from the callback: - transfer.release(); - if (decode_res <= 0) - { - UAVCAN_TRACE("Subscriber", "Unable to decode the message [%i] [%s]", - decode_res, DataType::getDataTypeFullName()); - failure_count_++; - return; - } - } - - message_.setTransfer(&transfer); - if (try_implicit_cast(callback_, true)) - callback_(message_); // Callback can accept non-const message reference and mutate it, that's OK - else - assert(0); - } - -public: Subscriber(Scheduler& scheduler, IAllocator& allocator) - : scheduler_(scheduler) - , allocator_(allocator) + : BaseType(scheduler, allocator) , callback_() - , failure_count_(0) { StaticAssert::check(); } - virtual ~Subscriber() { stop(); } - int start(Callback callback) { stop(); @@ -136,29 +49,10 @@ public: } callback_ = callback; - if (!checkInit()) - { - UAVCAN_TRACE("Subscriber", "Initialization failure [%s]", DataType::getDataTypeFullName()); - return -1; - } - - if (!scheduler_.getDispatcher().registerMessageListener(forwarder_)) - { - UAVCAN_TRACE("Subscriber", "Failed to register message listener [%s]", DataType::getDataTypeFullName()); - return -1; - } - return 1; + return BaseType::startAsMessageListener(); } - void stop() - { - if (forwarder_) - scheduler_.getDispatcher().unregisterMessageListener(forwarder_); - } - - Scheduler& getScheduler() const { return scheduler_; } - - uint32_t getFailureCount() const { return failure_count_; } + using BaseType::stop; }; } diff --git a/libuavcan/include/uavcan/timer.hpp b/libuavcan/include/uavcan/timer.hpp index fdd2b13379..df9a78deeb 100644 --- a/libuavcan/include/uavcan/timer.hpp +++ b/libuavcan/include/uavcan/timer.hpp @@ -5,7 +5,7 @@ #pragma once #include -#include +#include #include #include #include diff --git a/libuavcan/src/scheduler.cpp b/libuavcan/src/scheduler.cpp index d59f7ae77d..b98b13b90e 100644 --- a/libuavcan/src/scheduler.cpp +++ b/libuavcan/src/scheduler.cpp @@ -4,7 +4,7 @@ #include #include -#include +#include #include namespace uavcan diff --git a/libuavcan/test/publisher.cpp b/libuavcan/test/publisher.cpp index a76d70bf8d..17f193f52d 100644 --- a/libuavcan/test/publisher.cpp +++ b/libuavcan/test/publisher.cpp @@ -26,6 +26,10 @@ TEST(Publisher, Basic) uavcan::Publisher publisher(sch, buffer_provider); + std::cout << + "sizeof(uavcan::Publisher): " << + sizeof(uavcan::Publisher) << std::endl; + // Manual type registration - we can't rely on the GDTR state uavcan::GlobalDataTypeRegistry::instance().reset(); uavcan::DefaultDataTypeRegistrator _registrator;