Refactoring - Scheduler, GenericPublisher, GenericSubscruber moved into /uavcan/internal/node/*

This commit is contained in:
Pavel Kirienko 2014-03-10 13:53:10 +04:00
parent 6573d79fd1
commit 4b92497aee
10 changed files with 372 additions and 261 deletions

View File

@ -0,0 +1,134 @@
/*
* Copyright (C) 2014 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#pragma once
#include <uavcan/internal/node/scheduler.hpp>
#include <uavcan/data_type.hpp>
#include <uavcan/internal/node/marshal_buffer.hpp>
#include <uavcan/global_data_type_registry.hpp>
#include <uavcan/util/lazy_constructor.hpp>
#include <uavcan/internal/debug.hpp>
#include <uavcan/internal/transport/transfer_sender.hpp>
#include <uavcan/internal/marshal/scalar_codec.hpp>
#include <uavcan/internal/marshal/types.hpp>
namespace uavcan
{
/**
* Generic publisher, suitable for messages and services.
* DataSpec - data type specification class
* DataStruct - instantiable class
*/
template <typename DataSpec, typename DataStruct>
class GenericPublisher
{
public:
enum { DefaultTxTimeoutUsec = 2500 }; // 2500 ms --> 400Hz max
enum { MinTxTimeoutUsec = 200 };
private:
enum { Qos = (DataTypeKind(DataSpec::DataTypeKind) == DataTypeKindMessage) ?
CanTxQueue::Volatile : CanTxQueue::Persistent };
const uint64_t max_transfer_interval_; // TODO: memory usage can be reduced
uint64_t tx_timeout_;
Scheduler& scheduler_;
IMarshalBufferProvider& buffer_provider_;
LazyConstructor<TransferSender> sender_;
bool checkInit()
{
if (sender_)
return true;
GlobalDataTypeRegistry::instance().freeze();
const DataTypeDescriptor* const descr =
GlobalDataTypeRegistry::instance().find(DataTypeKind(DataSpec::DataTypeKind),
DataSpec::getDataTypeFullName());
if (!descr)
{
UAVCAN_TRACE("GenericPublisher", "Type [%s] is not registered", DataSpec::getDataTypeFullName());
return false;
}
sender_.template construct<Dispatcher&, const DataTypeDescriptor&, CanTxQueue::Qos, uint64_t>
(scheduler_.getDispatcher(), *descr, CanTxQueue::Qos(Qos), max_transfer_interval_);
return true;
}
uint64_t getTxDeadline() const { return scheduler_.getMonotonicTimestamp() + tx_timeout_; }
IMarshalBuffer* getBuffer()
{
return buffer_provider_.getBuffer(BitLenToByteLen<DataStruct::MaxBitLen>::Result);
}
int genericPublish(const DataStruct& message, TransferType transfer_type, NodeID dst_node_id,
TransferID* tid, uint64_t monotonic_blocking_deadline)
{
if (!checkInit())
return -1;
IMarshalBuffer* const buf = getBuffer();
if (!buf)
return -1;
{
BitStream bitstream(*buf);
ScalarCodec codec(bitstream);
const int encode_res = DataStruct::encode(message, codec);
if (encode_res <= 0)
{
assert(0); // Impossible, internal error
return -1;
}
}
if (tid)
{
return sender_->send(buf->getDataPtr(), buf->getDataLength(), getTxDeadline(),
monotonic_blocking_deadline, transfer_type, dst_node_id, *tid);
}
else
{
return sender_->send(buf->getDataPtr(), buf->getDataLength(), getTxDeadline(),
monotonic_blocking_deadline, transfer_type, dst_node_id);
}
}
protected:
GenericPublisher(Scheduler& scheduler, IMarshalBufferProvider& buffer_provider,
uint64_t max_transfer_interval = TransferSender::DefaultMaxTransferInterval)
: max_transfer_interval_(max_transfer_interval)
, tx_timeout_(DefaultTxTimeoutUsec)
, scheduler_(scheduler)
, buffer_provider_(buffer_provider)
{ }
~GenericPublisher() { }
int publish(const DataStruct& message, TransferType transfer_type, NodeID dst_node_id,
uint64_t monotonic_blocking_deadline = 0)
{
return genericPublish(message, transfer_type, dst_node_id, NULL, monotonic_blocking_deadline);
}
int publish(const DataStruct& message, TransferType transfer_type, NodeID dst_node_id, TransferID tid,
uint64_t monotonic_blocking_deadline = 0)
{
return genericPublish(message, transfer_type, dst_node_id, &tid, monotonic_blocking_deadline);
}
public:
uint64_t getTxTimeout() const { return tx_timeout_; }
void setTxTimeout(uint64_t usec)
{
tx_timeout_ = std::max(usec, uint64_t(MinTxTimeoutUsec));
}
Scheduler& getScheduler() const { return scheduler_; }
};
}

View File

@ -0,0 +1,205 @@
/*
* Copyright (C) 2014 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#pragma once
#include <uavcan/internal/node/scheduler.hpp>
#include <uavcan/data_type.hpp>
#include <uavcan/global_data_type_registry.hpp>
#include <uavcan/util/compile_time.hpp>
#include <uavcan/util/lazy_constructor.hpp>
#include <uavcan/internal/debug.hpp>
#include <uavcan/internal/transport/transfer_listener.hpp>
#include <uavcan/internal/marshal/scalar_codec.hpp>
#include <uavcan/internal/marshal/types.hpp>
namespace uavcan
{
template <typename DataType_>
class ReceivedDataStructure : public DataType_
{
const IncomingTransfer* transfer_;
template <typename Ret, Ret (IncomingTransfer::*Fun)() const>
Ret safeget() const
{
if (!transfer_)
{
assert(0);
return Ret();
}
return (transfer_->*Fun)();
}
protected:
ReceivedDataStructure() : transfer_(NULL) { }
void setTransfer(const IncomingTransfer* transfer)
{
assert(transfer);
transfer_ = transfer;
}
public:
typedef DataType_ DataType;
uint64_t getMonotonicTimestamp() const { return safeget<uint64_t, &IncomingTransfer::getMonotonicTimestamp>(); }
uint64_t getUtcTimestamp() const { return safeget<uint64_t, &IncomingTransfer::getUtcTimestamp>(); }
TransferType getTransferType() const { return safeget<TransferType, &IncomingTransfer::getTransferType>(); }
TransferID getTransferID() const { return safeget<TransferID, &IncomingTransfer::getTransferID>(); }
NodeID getSrcNodeID() const { return safeget<NodeID, &IncomingTransfer::getSrcNodeID>(); }
};
template <typename DataSpec, typename DataStruct, unsigned int NumStaticReceivers, unsigned int NumStaticBufs_>
class GenericSubscriber : Noncopyable
{
typedef GenericSubscriber<DataSpec, DataStruct, NumStaticReceivers, NumStaticBufs_> SelfType;
enum { DataTypeMaxByteLen = BitLenToByteLen<DataStruct::MaxBitLen>::Result };
enum { NeedsBuffer = int(DataTypeMaxByteLen) > int(MaxSingleFrameTransferPayloadLen) };
enum { BufferSize = NeedsBuffer ? DataTypeMaxByteLen : 0 };
enum { NumStaticBufs = NeedsBuffer ? (NumStaticBufs_ ? NumStaticBufs_ : 1) : 0 };
typedef TransferListener<BufferSize, NumStaticBufs, // TODO: support for zero static bufs
NumStaticReceivers ? NumStaticReceivers : 1> TransferListenerType;
// We need to break the inheritance chain here to implement lazy initialization
class TransferForwarder : public TransferListenerType
{
SelfType& obj_;
void handleIncomingTransfer(IncomingTransfer& transfer)
{
obj_.handleIncomingTransfer(transfer);
}
public:
TransferForwarder(SelfType& obj, const DataTypeDescriptor& data_type, IAllocator& allocator)
: TransferListenerType(data_type, allocator)
, obj_(obj)
{ }
};
struct ReceivedDataStructureSpec : public ReceivedDataStructure<DataStruct>
{
using ReceivedDataStructure<DataStruct>::setTransfer;
};
Scheduler& scheduler_;
IAllocator& allocator_;
LazyConstructor<TransferForwarder> forwarder_;
ReceivedDataStructureSpec message_;
uint32_t failure_count_;
bool checkInit()
{
if (forwarder_)
return true;
GlobalDataTypeRegistry::instance().freeze();
const DataTypeDescriptor* const descr =
GlobalDataTypeRegistry::instance().find(DataTypeKind(DataSpec::DataTypeKind),
DataSpec::getDataTypeFullName());
if (!descr)
{
UAVCAN_TRACE("GenericSubscriber", "Type [%s] is not registered", DataSpec::getDataTypeFullName());
return false;
}
forwarder_.template construct<SelfType&, const DataTypeDescriptor&, IAllocator&>(*this, *descr, allocator_);
return true;
}
bool decodeTransfer(IncomingTransfer& transfer)
{
BitStream bitstream(transfer);
ScalarCodec codec(bitstream);
message_.setTransfer(&transfer);
const int decode_res = DataSpec::decode(message_, codec);
// We don't need the data anymore, the memory can be reused from the callback:
transfer.release();
if (decode_res <= 0)
{
UAVCAN_TRACE("GenericSubscriber", "Unable to decode the message [%i] [%s]",
decode_res, DataSpec::getDataTypeFullName());
failure_count_++;
return false;
}
return true;
}
void handleIncomingTransfer(IncomingTransfer& transfer)
{
if (decodeTransfer(transfer))
{
handleReceivedDataStruct(message_);
}
}
int genericStart(bool(Dispatcher::*registration_method)(TransferListenerBase* listener))
{
stop();
if (!checkInit())
{
UAVCAN_TRACE("GenericSubscriber", "Initialization failure [%s]", DataSpec::getDataTypeFullName());
return -1;
}
if (!(scheduler_.getDispatcher().*registration_method)(forwarder_))
{
UAVCAN_TRACE("GenericSubscriber", "Failed to register transfer listener [%s]",
DataSpec::getDataTypeFullName());
return -1;
}
return 1;
}
protected:
GenericSubscriber(Scheduler& scheduler, IAllocator& allocator)
: scheduler_(scheduler)
, allocator_(allocator)
, failure_count_(0)
{ }
virtual ~GenericSubscriber() { stop(); }
virtual void handleReceivedDataStruct(ReceivedDataStructure<DataStruct>&) = 0;
int startAsMessageListener()
{
return genericStart(&Dispatcher::registerMessageListener);
}
int startAsServiceRequestListener()
{
return genericStart(&Dispatcher::registerServiceRequestListener);
}
int startAsServiceResponseListener()
{
return genericStart(&Dispatcher::registerServiceResponseListener);
}
void stop()
{
if (forwarder_)
{
scheduler_.getDispatcher().unregisterMessageListener(forwarder_);
scheduler_.getDispatcher().unregisterServiceRequestListener(forwarder_);
scheduler_.getDispatcher().unregisterServiceResponseListener(forwarder_);
}
}
public:
Scheduler& getScheduler() const { return scheduler_; }
uint32_t getFailureCount() const { return failure_count_; }
};
}

View File

@ -4,101 +4,31 @@
#pragma once
#include <uavcan/scheduler.hpp>
#include <uavcan/data_type.hpp>
#include <uavcan/marshal_buffer.hpp>
#include <uavcan/global_data_type_registry.hpp>
#include <uavcan/util/lazy_constructor.hpp>
#include <uavcan/internal/debug.hpp>
#include <uavcan/internal/transport/transfer_sender.hpp>
#include <uavcan/internal/marshal/scalar_codec.hpp>
#include <uavcan/internal/marshal/types.hpp>
#include <uavcan/internal/node/generic_publisher.hpp>
namespace uavcan
{
template <typename DataType_>
class Publisher
class Publisher : public GenericPublisher<DataType_, DataType_>
{
typedef GenericPublisher<DataType_, DataType_> BaseType;
public:
typedef DataType_ DataType;
enum { DefaultTxTimeoutUsec = 2500 }; // 2500 ms --> 400Hz max
enum { MinTxTimeoutUsec = 200 };
private:
const uint64_t max_transfer_interval_; // TODO: memory usage can be reduced
uint64_t tx_timeout_;
Scheduler& scheduler_;
IMarshalBufferProvider& buffer_provider_;
LazyConstructor<TransferSender> sender_;
bool checkInit()
{
if (sender_)
return true;
GlobalDataTypeRegistry::instance().freeze();
const DataTypeDescriptor* const descr =
GlobalDataTypeRegistry::instance().find(DataTypeKindMessage, DataType::getDataTypeFullName());
if (!descr)
{
UAVCAN_TRACE("Publisher", "Type [%s] is not registered", DataType::getDataTypeFullName());
return false;
}
sender_.template construct<Dispatcher&, const DataTypeDescriptor&, CanTxQueue::Qos, uint64_t>
(scheduler_.getDispatcher(), *descr, CanTxQueue::Volatile, max_transfer_interval_);
return true;
}
uint64_t getTxDeadline() const { return scheduler_.getMonotonicTimestamp() + tx_timeout_; }
IMarshalBuffer* getBuffer()
{
return buffer_provider_.getBuffer(BitLenToByteLen<DataType::MaxBitLen>::Result);
}
int genericSend(const DataType& message, TransferType transfer_type, NodeID dst_node_id,
uint64_t monotonic_blocking_deadline = 0)
{
if (!checkInit())
return -1;
IMarshalBuffer* const buf = getBuffer();
if (!buf)
return -1;
{
BitStream bitstream(*buf);
ScalarCodec codec(bitstream);
const int encode_res = DataType::encode(message, codec);
if (encode_res <= 0)
{
assert(0); // Impossible, internal error
return -1;
}
}
return sender_->send(buf->getDataPtr(), buf->getDataLength(), getTxDeadline(),
monotonic_blocking_deadline, transfer_type, dst_node_id);
}
public:
Publisher(Scheduler& scheduler, IMarshalBufferProvider& buffer_provider,
uint64_t tx_timeout_usec = DefaultTxTimeoutUsec,
uint64_t tx_timeout_usec = BaseType::DefaultTxTimeoutUsec,
uint64_t max_transfer_interval = TransferSender::DefaultMaxTransferInterval)
: max_transfer_interval_(max_transfer_interval)
, tx_timeout_(tx_timeout_usec)
, scheduler_(scheduler)
, buffer_provider_(buffer_provider)
: BaseType(scheduler, buffer_provider, max_transfer_interval)
{
setTxTimeout(tx_timeout_usec);
BaseType::setTxTimeout(tx_timeout_usec);
StaticAssert<DataTypeKind(DataType::DataTypeKind) == DataTypeKindMessage>::check();
}
int broadcast(const DataType& message)
{
return genericSend(message, TransferTypeMessageBroadcast, NodeID::Broadcast);
return publish(message, TransferTypeMessageBroadcast, NodeID::Broadcast);
}
int unicast(const DataType& message, NodeID dst_node_id)
@ -108,16 +38,8 @@ public:
assert(0);
return -1;
}
return genericSend(message, TransferTypeMessageUnicast, dst_node_id);
return publish(message, TransferTypeMessageUnicast, dst_node_id);
}
uint64_t getTxTimeout() const { return tx_timeout_; }
void setTxTimeout(uint64_t usec)
{
tx_timeout_ = std::max(usec, uint64_t(MinTxTimeoutUsec));
}
Scheduler& getScheduler() const { return scheduler_; }
};
}

View File

@ -1,48 +0,0 @@
/*
* Copyright (C) 2014 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#pragma once
#include <cassert>
#include <uavcan/internal/transport/transfer_listener.hpp>
namespace uavcan
{
template <typename DataType_>
class ReceivedDataStructure : public DataType_
{
const IncomingTransfer* transfer_;
template <typename Ret, Ret (IncomingTransfer::*Fun)() const>
Ret safeget() const
{
if (!transfer_)
{
assert(0);
return Ret();
}
return (transfer_->*Fun)();
}
protected:
ReceivedDataStructure() : transfer_(NULL) { }
void setTransfer(const IncomingTransfer* transfer)
{
assert(transfer);
transfer_ = transfer;
}
public:
typedef DataType_ DataType;
uint64_t getMonotonicTimestamp() const { return safeget<uint64_t, &IncomingTransfer::getMonotonicTimestamp>(); }
uint64_t getUtcTimestamp() const { return safeget<uint64_t, &IncomingTransfer::getUtcTimestamp>(); }
TransferType getTransferType() const { return safeget<TransferType, &IncomingTransfer::getTransferType>(); }
TransferID getTransferID() const { return safeget<TransferID, &IncomingTransfer::getTransferID>(); }
NodeID getSrcNodeID() const { return safeget<NodeID, &IncomingTransfer::getSrcNodeID>(); }
};
}

View File

@ -4,16 +4,8 @@
#pragma once
#include <uavcan/scheduler.hpp>
#include <uavcan/data_type.hpp>
#include <uavcan/global_data_type_registry.hpp>
#include <uavcan/received_data_structure.hpp>
#include <uavcan/util/compile_time.hpp>
#include <uavcan/util/lazy_constructor.hpp>
#include <uavcan/internal/debug.hpp>
#include <uavcan/internal/transport/transfer_listener.hpp>
#include <uavcan/internal/marshal/scalar_codec.hpp>
#include <uavcan/internal/marshal/types.hpp>
#include <cassert>
#include <uavcan/internal/node/generic_subscriber.hpp>
namespace uavcan
{
@ -21,110 +13,31 @@ namespace uavcan
template <typename DataType_,
typename Callback = void(*)(const ReceivedDataStructure<DataType_>&),
unsigned int NumStaticReceivers = 2,
unsigned int NumStaticBufs_ = 1>
class Subscriber : Noncopyable
unsigned int NumStaticBufs = 1>
class Subscriber : public GenericSubscriber<DataType_, DataType_, NumStaticReceivers, NumStaticBufs>
{
typedef Subscriber<DataType_, Callback, NumStaticReceivers, NumStaticBufs_> SelfType;
typedef GenericSubscriber<DataType_, DataType_, NumStaticReceivers, NumStaticBufs> BaseType;
Callback callback_;
void handleReceivedDataStruct(ReceivedDataStructure<DataType_>& msg)
{
if (try_implicit_cast<bool>(callback_, true))
callback_(msg);
else
handleFatalError("Invalid subscriber callback");
}
public:
typedef DataType_ DataType;
private:
enum { DataTypeMaxByteLen = BitLenToByteLen<DataType::MaxBitLen>::Result };
enum { NeedsBuffer = int(DataTypeMaxByteLen) > int(MaxSingleFrameTransferPayloadLen) };
enum { BufferSize = NeedsBuffer ? DataTypeMaxByteLen : 0 };
enum { NumStaticBufs = NeedsBuffer ? (NumStaticBufs_ ? NumStaticBufs_ : 1) : 0 };
typedef TransferListener<BufferSize, NumStaticBufs, // TODO: support for zero static bufs
NumStaticReceivers ? NumStaticReceivers : 1> TransferListenerType;
// We need to break the inheritance chain here to implement lazy initialization
class TransferForwarder : public TransferListenerType
{
SelfType& obj_;
void handleIncomingTransfer(IncomingTransfer& transfer)
{
obj_.handleIncomingTransfer(transfer);
}
public:
TransferForwarder(SelfType& obj, const DataTypeDescriptor& data_type, IAllocator& allocator)
: TransferListenerType(data_type, allocator)
, obj_(obj)
{ }
};
struct ReceivedDataStructureSpec : public ReceivedDataStructure<DataType>
{
using ReceivedDataStructure<DataType>::setTransfer;
};
Scheduler& scheduler_;
IAllocator& allocator_;
Callback callback_;
LazyConstructor<TransferForwarder> forwarder_;
ReceivedDataStructureSpec message_;
uint32_t failure_count_;
bool checkInit()
{
if (forwarder_)
return true;
GlobalDataTypeRegistry::instance().freeze();
const DataTypeDescriptor* const descr =
GlobalDataTypeRegistry::instance().find(DataTypeKindMessage, DataType::getDataTypeFullName());
if (!descr)
{
UAVCAN_TRACE("Subscriber", "Type [%s] is not registered", DataType::getDataTypeFullName());
return false;
}
forwarder_.template construct<SelfType&, const DataTypeDescriptor&, IAllocator&>(*this, *descr, allocator_);
return true;
}
void handleIncomingTransfer(IncomingTransfer& transfer)
{
assert(transfer.getTransferType() == TransferTypeMessageBroadcast ||
transfer.getTransferType() == TransferTypeMessageUnicast);
{
BitStream bitstream(transfer);
ScalarCodec codec(bitstream);
const int decode_res = DataType::decode(message_, codec);
// We don't need the data anymore, the memory can be reused from the callback:
transfer.release();
if (decode_res <= 0)
{
UAVCAN_TRACE("Subscriber", "Unable to decode the message [%i] [%s]",
decode_res, DataType::getDataTypeFullName());
failure_count_++;
return;
}
}
message_.setTransfer(&transfer);
if (try_implicit_cast<bool>(callback_, true))
callback_(message_); // Callback can accept non-const message reference and mutate it, that's OK
else
assert(0);
}
public:
Subscriber(Scheduler& scheduler, IAllocator& allocator)
: scheduler_(scheduler)
, allocator_(allocator)
: BaseType(scheduler, allocator)
, callback_()
, failure_count_(0)
{
StaticAssert<DataTypeKind(DataType::DataTypeKind) == DataTypeKindMessage>::check();
}
virtual ~Subscriber() { stop(); }
int start(Callback callback)
{
stop();
@ -136,29 +49,10 @@ public:
}
callback_ = callback;
if (!checkInit())
{
UAVCAN_TRACE("Subscriber", "Initialization failure [%s]", DataType::getDataTypeFullName());
return -1;
}
if (!scheduler_.getDispatcher().registerMessageListener(forwarder_))
{
UAVCAN_TRACE("Subscriber", "Failed to register message listener [%s]", DataType::getDataTypeFullName());
return -1;
}
return 1;
return BaseType::startAsMessageListener();
}
void stop()
{
if (forwarder_)
scheduler_.getDispatcher().unregisterMessageListener(forwarder_);
}
Scheduler& getScheduler() const { return scheduler_; }
uint32_t getFailureCount() const { return failure_count_; }
using BaseType::stop;
};
}

View File

@ -5,7 +5,7 @@
#pragma once
#include <stdint.h>
#include <uavcan/scheduler.hpp>
#include <uavcan/internal/node/scheduler.hpp>
#include <uavcan/util/compile_time.hpp>
#include <uavcan/internal/linked_list.hpp>
#include <uavcan/internal/fatal_error.hpp>

View File

@ -4,7 +4,7 @@
#include <cassert>
#include <limits>
#include <uavcan/scheduler.hpp>
#include <uavcan/internal/node/scheduler.hpp>
#include <uavcan/internal/debug.hpp>
namespace uavcan

View File

@ -26,6 +26,10 @@ TEST(Publisher, Basic)
uavcan::Publisher<uavcan::mavlink::Message> publisher(sch, buffer_provider);
std::cout <<
"sizeof(uavcan::Publisher<uavcan::mavlink::Message>): " <<
sizeof(uavcan::Publisher<uavcan::mavlink::Message>) << std::endl;
// Manual type registration - we can't rely on the GDTR state
uavcan::GlobalDataTypeRegistry::instance().reset();
uavcan::DefaultDataTypeRegistrator<uavcan::mavlink::Message> _registrator;