From 261dd546cdbf3b0cc2ccdefd9f7a44f8c91a659a Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Thu, 6 Feb 2014 00:08:51 +0400 Subject: [PATCH] Implemented TransferBufferManager --- .../internal/transport/transfer_buffer.hpp | 226 ++++++++++++++++++ libuavcan/src/transport/transfer_buffer.cpp | 20 ++ libuavcan/test/transport/transfer_buffer.cpp | 157 +++++++++++- 3 files changed, 398 insertions(+), 5 deletions(-) diff --git a/libuavcan/include/uavcan/internal/transport/transfer_buffer.hpp b/libuavcan/include/uavcan/internal/transport/transfer_buffer.hpp index b53382989f..d86f2e57ac 100644 --- a/libuavcan/include/uavcan/internal/transport/transfer_buffer.hpp +++ b/libuavcan/include/uavcan/internal/transport/transfer_buffer.hpp @@ -6,9 +6,11 @@ #include #include +#include #include #include #include +#include namespace uavcan { @@ -20,6 +22,9 @@ class TransferBufferBase { uint64_t update_timestamp_; +protected: + void reset() { update_timestamp_ = 0; } + public: TransferBufferBase() : update_timestamp_(0) @@ -55,6 +60,7 @@ public: void reset(uint8_t node_id = NODE_ID_INVALID) { node_id_ = node_id; + TransferBufferBase::reset(); resetImpl(); } }; @@ -101,6 +107,9 @@ public: resetImpl(); } + static DynamicTransferBuffer* instantiate(IAllocator* allocator); + static void destroy(DynamicTransferBuffer*& obj, IAllocator* allocator); + int read(unsigned int offset, uint8_t* data, unsigned int len) const; int write(unsigned int offset, const uint8_t* data, unsigned int len); }; @@ -163,6 +172,37 @@ public: max_write_pos_ = std::max(offset + len, max_write_pos_); return len; } + + bool migrateFrom(const TransferBufferManagerEntry* tbme) + { + if (tbme == NULL || tbme->isEmpty()) + { + assert(0); + return false; + } + + // Resetting self and moving all data from the source + reset(tbme->getNodeID()); + setUpdateTimestamp(tbme->getUpdateTimestamp()); + const int res = tbme->read(0, data_, SIZE); + if (res < 0) + { + reset(); + return false; + } + max_write_pos_ = res; + if (res < int(SIZE)) + return true; + + // Now we need to make sure that all data can fit the storage + uint8_t dummy = 0; + if (tbme->read(SIZE, &dummy, 1) > 0) + { + reset(); // Damn, the buffer was too large + return false; + } + return true; + } }; /** @@ -178,4 +218,190 @@ public: virtual void cleanup(uint64_t oldest_timestamp) = 0; }; +/** + * Buffer manager implementation. + */ +template +class TransferBufferManager : public ITransferBufferManager +{ + typedef StaticTransferBuffer StaticBufferType; + + StaticBufferType static_buffers_[NUM_STATIC_BUFS]; + LinkedListRoot dynamic_buffers_; + IAllocator* const allocator_; + + StaticBufferType* findFirstStatic(uint8_t node_id) + { + assert((node_id == NODE_ID_INVALID) || (node_id <= NODE_ID_MAX)); + for (unsigned int i = 0; i < NUM_STATIC_BUFS; i++) + { + if (static_buffers_[i].getNodeID() == node_id) + return static_buffers_ + i; + } + return NULL; + } + + DynamicTransferBuffer* findFirstDynamic(uint8_t node_id) + { + DynamicTransferBuffer* dyn = dynamic_buffers_.get(); + while (dyn) + { + assert(!dyn->isEmpty()); + if (dyn->getNodeID() == node_id) + return dyn; + dyn = dyn->getNextListNode(); + } + return NULL; + } + + void optimizeStorage() + { + while (!dynamic_buffers_.isEmpty()) + { + StaticBufferType* const sb = findFirstStatic(NODE_ID_INVALID); + if (sb == NULL) + break; + DynamicTransferBuffer* dyn = dynamic_buffers_.get(); + assert(dyn); + assert(!dyn->isEmpty()); + if (sb->migrateFrom(dyn)) + { + assert(!dyn->isEmpty()); + assert(dyn->getUpdateTimestamp() == sb->getUpdateTimestamp()); + UAVCAN_TRACE("TransferBufferManager", "Storage optimization: Migrated NID %i", int(dyn->getNodeID())); + dynamic_buffers_.remove(dyn); + DynamicTransferBuffer::destroy(dyn, allocator_); + } + else + { + /* Migration can fail if a dynamic buffer contains more data than a static buffer can accomodate (more + * than STATIC_BUF_SIZE). This means that there is probably something wrong with the network. Logic + * that uses this class should explicitly ensure the proper maximum data size. + */ + UAVCAN_TRACE("TransferBufferManager", "Storage optimization: MIGRATION FAILURE NID %i BUFSIZE %u", + int(dyn->getNodeID()), STATIC_BUF_SIZE); + sb->reset(); + break; // Probably we should try to migrate the rest? + } + } + } + +public: + TransferBufferManager(IAllocator* allocator) + : allocator_(allocator) + { } + + ~TransferBufferManager() + { + cleanup(std::numeric_limits::max()); + } + + unsigned int getNumDynamicBuffers() const { return dynamic_buffers_.length(); } + + unsigned int getNumStaticBuffers() const + { + unsigned int res = 0; + for (unsigned int i = 0; i < NUM_STATIC_BUFS; i++) + { + if (!static_buffers_[i].isEmpty()) + res++; + } + return res; + } + + TransferBufferBase* access(uint8_t node_id) + { + if (node_id > NODE_ID_MAX || node_id == NODE_ID_INVALID) + { + assert(0); + return NULL; + } + TransferBufferManagerEntry* tbme = findFirstStatic(node_id); + if (tbme) + return tbme; + return findFirstDynamic(node_id); + } + + TransferBufferBase* create(uint8_t node_id) + { + if (node_id > NODE_ID_MAX || node_id == NODE_ID_INVALID) + { + assert(0); + return NULL; + } + remove(node_id); + + TransferBufferManagerEntry* tbme = findFirstStatic(NODE_ID_INVALID); + if (tbme == NULL) + { + DynamicTransferBuffer* dyn = DynamicTransferBuffer::instantiate(allocator_); + tbme = dyn; + if (dyn == NULL) + return NULL; // Epic fail. + dynamic_buffers_.insert(dyn); + } + + if (tbme) + { + assert(tbme->isEmpty()); + tbme->reset(node_id); + } + return tbme; + } + + void remove(uint8_t node_id) + { + assert((node_id <= NODE_ID_MAX && node_id != NODE_ID_INVALID)); + + TransferBufferManagerEntry* const tbme = findFirstStatic(node_id); + if (tbme) + { + tbme->reset(); + optimizeStorage(); + return; + } + + DynamicTransferBuffer* dyn = findFirstDynamic(node_id); + if (dyn) + { + dynamic_buffers_.remove(dyn); + DynamicTransferBuffer::destroy(dyn, allocator_); + } + } + + void cleanup(uint64_t oldest_timestamp) + { + int num_released_statics = 0; + for (unsigned int i = 0; i < NUM_STATIC_BUFS; i++) + { + TransferBufferManagerEntry* const buf = static_buffers_ + i; + if (buf->isEmpty()) + continue; + if (buf->getUpdateTimestamp() <= oldest_timestamp) + { + UAVCAN_TRACE("TransferBufferManager", "Cleanup: Dead static buffer, NID %i", int(buf->getNodeID())); + buf->reset(); + num_released_statics++; + } + } + + DynamicTransferBuffer* dyn = dynamic_buffers_.get(); + while (dyn) + { + assert(!dyn->isEmpty()); + DynamicTransferBuffer* const next = dyn->getNextListNode(); + if (dyn->getUpdateTimestamp() <= oldest_timestamp) + { + UAVCAN_TRACE("TransferBufferManager", "Cleanup: Dead dynamic buffer, NID %i", int(dyn->getNodeID())); + dynamic_buffers_.remove(dyn); + DynamicTransferBuffer::destroy(dyn, allocator_); + } + dyn = next; + } + + if (num_released_statics > 0) + optimizeStorage(); + } +}; + } diff --git a/libuavcan/src/transport/transfer_buffer.cpp b/libuavcan/src/transport/transfer_buffer.cpp index d84ce9b23e..a76e9dd135 100644 --- a/libuavcan/src/transport/transfer_buffer.cpp +++ b/libuavcan/src/transport/transfer_buffer.cpp @@ -62,6 +62,26 @@ void DynamicTransferBuffer::Block::write(const uint8_t*& inptr, unsigned int tar /* * DynamicTransferBuffer */ +DynamicTransferBuffer* DynamicTransferBuffer::instantiate(IAllocator* allocator) +{ + assert(allocator); + void* const praw = allocator->allocate(sizeof(DynamicTransferBuffer)); + if (praw == NULL) + return NULL; + return new (praw) DynamicTransferBuffer(allocator); +} + +void DynamicTransferBuffer::destroy(DynamicTransferBuffer*& obj, IAllocator* allocator) +{ + assert(allocator); + if (obj != NULL) + { + obj->~DynamicTransferBuffer(); + allocator->deallocate(obj); + obj = NULL; + } +} + void DynamicTransferBuffer::resetImpl() { max_write_pos_ = 0; diff --git a/libuavcan/test/transport/transfer_buffer.cpp b/libuavcan/test/transport/transfer_buffer.cpp index dbd6a65567..1e37bb16eb 100644 --- a/libuavcan/test/transport/transfer_buffer.cpp +++ b/libuavcan/test/transport/transfer_buffer.cpp @@ -4,6 +4,7 @@ #include #include +#include #include static const std::string TEST_DATA = @@ -28,7 +29,8 @@ static void fill(const T a, int value) a[i] = value; } -static bool matchAgainstTestData(const uavcan::TransferBufferBase& tbb, unsigned int offset, int len = -1) +static bool matchAgainst(const std::string& data, const uavcan::TransferBufferBase& tbb, + unsigned int offset = 0, int len = -1) { uint8_t local_buffer[1024]; fill(local_buffer, 0); @@ -39,7 +41,7 @@ static bool matchAgainstTestData(const uavcan::TransferBufferBase& tbb, unsigned const int res = tbb.read(offset, local_buffer, sizeof(local_buffer)); if (res < 0) { - std::cout << "matchAgainstTestData(): res " << res << std::endl; + std::cout << "matchAgainst(): res " << res << std::endl; return false; } len = res; @@ -49,23 +51,28 @@ static bool matchAgainstTestData(const uavcan::TransferBufferBase& tbb, unsigned const int res = tbb.read(offset, local_buffer, len); if (res != len) { - std::cout << "matchAgainstTestData(): res " << res << " expected " << len << std::endl; + std::cout << "matchAgainst(): res " << res << " expected " << len << std::endl; return false; } } - const bool equals = std::equal(local_buffer, local_buffer + len, TEST_DATA.begin() + offset); + const bool equals = std::equal(local_buffer, local_buffer + len, data.begin() + offset); if (!equals) { std::cout << "local_buffer:\n\t" << local_buffer << std::endl; std::cout - << "test_data:\n\t" << std::string(TEST_DATA.begin() + offset, TEST_DATA.begin() + offset + len) + << "test_data:\n\t" << std::string(data.begin() + offset, data.begin() + offset + len) << std::endl; } return equals; } +static bool matchAgainstTestData(const uavcan::TransferBufferBase& tbb, unsigned int offset, int len = -1) +{ + return matchAgainst(TEST_DATA, tbb, offset, len); +} + TEST(TransferBuffer, TestDataValidation) { ASSERT_LE(4, TEST_DATA.length() / uavcan::MEM_POOL_BLOCK_SIZE); @@ -188,3 +195,143 @@ TEST(DynamicTransferBuffer, Basic) buf.~DynamicTransferBuffer(); ASSERT_EQ(0, pool.getNumUsedBlocks()); } + + +static const std::string MGR_TEST_DATA[4] = +{ + "I thought you would cry out again \'don\'t speak of it, leave off.\'\" Raskolnikov gave a laugh, but rather a " + "forced one. \"What, silence again?\" he asked a minute later. \"We must talk about something, you know. ", + + "It would be interesting for me to know how you would decide a certain \'problem\' as Lebeziatnikov would say.\" " + "(He was beginning to lose the thread.) \"No, really, I am serious. Imagine, Sonia, that you had known all ", + + "Luzhin\'s intentions beforehand. Known, that is, for a fact, that they would be the ruin of Katerina Ivanovna " + "and the children and yourself thrown in--since you don\'t count yourself for anything--Polenka too... for ", + + "she\'ll go the same way. Well, if suddenly it all depended on your decision whether he or they should go on " + "living, that is whether Luzhin should go on living and doing wicked things, or Katerina Ivanovna should die? " + "How would you decide which of them was to die? I ask you?" +}; + +static const int MGR_STATIC_BUFFER_SIZE = 100; + +TEST(TransferBufferManager, TestDataValidation) +{ + for (unsigned int i = 0; i < sizeof(MGR_TEST_DATA) / sizeof(MGR_TEST_DATA[0]); i++) + { + ASSERT_LT(MGR_STATIC_BUFFER_SIZE, MGR_TEST_DATA[i].length()); + } +} + + +static int fillTestData(const std::string& data, uavcan::TransferBufferBase* tbb) +{ + return tbb->write(0, reinterpret_cast(data.c_str()), data.length()); +} + +TEST(TransferBufferManager, Basic) +{ + using uavcan::TransferBufferManager; + using uavcan::TransferBufferBase; + + static const int POOL_BLOCKS = 8; + uavcan::PoolAllocator pool; + uavcan::PoolManager<1> poolmgr; + poolmgr.addPool(&pool); + + typedef TransferBufferManager TransferBufferManagerType; + std::auto_ptr mgr(new TransferBufferManagerType(&poolmgr)); + + // Empty + ASSERT_FALSE(mgr->access(0)); + ASSERT_FALSE(mgr->access(uavcan::NODE_ID_MAX)); + + TransferBufferBase* tbb = NULL; + + // Static 0 + ASSERT_TRUE((tbb = mgr->create(0))); + tbb->setUpdateTimestamp(1234); + ASSERT_EQ(MGR_STATIC_BUFFER_SIZE, fillTestData(MGR_TEST_DATA[0], tbb)); + ASSERT_EQ(1, mgr->getNumStaticBuffers()); + + // Static 1 + ASSERT_TRUE((tbb = mgr->create(1))); + tbb->setUpdateTimestamp(2345); + ASSERT_EQ(MGR_STATIC_BUFFER_SIZE, fillTestData(MGR_TEST_DATA[1], tbb)); + ASSERT_EQ(2, mgr->getNumStaticBuffers()); + ASSERT_EQ(0, mgr->getNumDynamicBuffers()); + ASSERT_EQ(0, pool.getNumUsedBlocks()); + + // Dynamic 0 + ASSERT_TRUE((tbb = mgr->create(2))); + ASSERT_EQ(1, pool.getNumUsedBlocks()); // Empty dynamic buffer occupies one block + tbb->setUpdateTimestamp(3456); + ASSERT_EQ(MGR_TEST_DATA[2].length(), fillTestData(MGR_TEST_DATA[2], tbb)); + ASSERT_EQ(2, mgr->getNumStaticBuffers()); + ASSERT_EQ(1, mgr->getNumDynamicBuffers()); + ASSERT_LT(1, pool.getNumUsedBlocks()); + + std::cout << "TransferBufferManager - Basic: Pool usage: " << pool.getNumUsedBlocks() << std::endl; + + // Dynamic 2 + ASSERT_TRUE((tbb = mgr->create(127))); + ASSERT_EQ(0, pool.getNumFreeBlocks()); // The test assumes that the memory must be exhausted now + tbb->setUpdateTimestamp(4567); + + ASSERT_EQ(0, fillTestData(MGR_TEST_DATA[3], tbb)); + ASSERT_EQ(2, mgr->getNumStaticBuffers()); + ASSERT_EQ(2, mgr->getNumDynamicBuffers()); + + // Dynamic 3 - will fail due to OOM + ASSERT_FALSE((tbb = mgr->create(64))); + ASSERT_EQ(2, mgr->getNumStaticBuffers()); + ASSERT_EQ(2, mgr->getNumDynamicBuffers()); + + // Making sure all buffers contain proper data + ASSERT_TRUE((tbb = mgr->access(0))); + ASSERT_EQ(1234, tbb->getUpdateTimestamp()); + ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[0], *tbb)); + + ASSERT_TRUE((tbb = mgr->access(1))); + ASSERT_EQ(2345, tbb->getUpdateTimestamp()); + ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[1], *tbb)); + + ASSERT_TRUE((tbb = mgr->access(2))); + ASSERT_EQ(3456, tbb->getUpdateTimestamp()); + ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[2], *tbb)); + + ASSERT_TRUE((tbb = mgr->access(127))); + ASSERT_EQ(4567, tbb->getUpdateTimestamp()); + ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[3], *tbb)); + + // Freeing one static buffer; one dynamic must migrate + mgr->remove(1); + ASSERT_FALSE(mgr->access(1)); + ASSERT_EQ(2, mgr->getNumStaticBuffers()); + ASSERT_EQ(1, mgr->getNumDynamicBuffers()); // One migrated to the static + ASSERT_LT(0, pool.getNumFreeBlocks()); + + // Cleanup must remove NodeID 0 due to low timestamp; migration should fail due to oversized data + mgr->cleanup(2000); + ASSERT_FALSE(mgr->access(0)); + ASSERT_EQ(1, mgr->getNumStaticBuffers()); + ASSERT_EQ(1, mgr->getNumDynamicBuffers()); // Migration failed + + // At this time we have the following NodeID: 2, 127 + ASSERT_TRUE((tbb = mgr->access(2))); + ASSERT_EQ(3456, tbb->getUpdateTimestamp()); + ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[2], *tbb)); + + ASSERT_TRUE((tbb = mgr->access(127))); + ASSERT_EQ(4567, tbb->getUpdateTimestamp()); + ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[3], *tbb)); + + // These were deleted: 0, 1 + ASSERT_FALSE(mgr->access(1)); + ASSERT_FALSE(mgr->access(0)); + + // Deleting the object; all memory must be freed + ASSERT_NE(0, pool.getNumUsedBlocks()); + mgr.reset(); + ASSERT_EQ(0, pool.getNumUsedBlocks()); +}