Implemented TransferBufferManager

This commit is contained in:
Pavel Kirienko
2014-02-06 00:08:51 +04:00
parent 690e0257dc
commit 261dd546cd
3 changed files with 398 additions and 5 deletions
@@ -6,9 +6,11 @@
#include <stdint.h>
#include <algorithm>
#include <limits>
#include <uavcan/internal/transport/transfer.hpp>
#include <uavcan/internal/linked_list.hpp>
#include <uavcan/internal/impl_constants.hpp>
#include <uavcan/internal/debug.hpp>
namespace uavcan
{
@@ -20,6 +22,9 @@ class TransferBufferBase
{
uint64_t update_timestamp_;
protected:
void reset() { update_timestamp_ = 0; }
public:
TransferBufferBase()
: update_timestamp_(0)
@@ -55,6 +60,7 @@ public:
void reset(uint8_t node_id = NODE_ID_INVALID)
{
node_id_ = node_id;
TransferBufferBase::reset();
resetImpl();
}
};
@@ -101,6 +107,9 @@ public:
resetImpl();
}
static DynamicTransferBuffer* instantiate(IAllocator* allocator);
static void destroy(DynamicTransferBuffer*& obj, IAllocator* allocator);
int read(unsigned int offset, uint8_t* data, unsigned int len) const;
int write(unsigned int offset, const uint8_t* data, unsigned int len);
};
@@ -163,6 +172,37 @@ public:
max_write_pos_ = std::max(offset + len, max_write_pos_);
return len;
}
bool migrateFrom(const TransferBufferManagerEntry* tbme)
{
if (tbme == NULL || tbme->isEmpty())
{
assert(0);
return false;
}
// Resetting self and moving all data from the source
reset(tbme->getNodeID());
setUpdateTimestamp(tbme->getUpdateTimestamp());
const int res = tbme->read(0, data_, SIZE);
if (res < 0)
{
reset();
return false;
}
max_write_pos_ = res;
if (res < int(SIZE))
return true;
// Now we need to make sure that all data can fit the storage
uint8_t dummy = 0;
if (tbme->read(SIZE, &dummy, 1) > 0)
{
reset(); // Damn, the buffer was too large
return false;
}
return true;
}
};
/**
@@ -178,4 +218,190 @@ public:
virtual void cleanup(uint64_t oldest_timestamp) = 0;
};
/**
* Buffer manager implementation.
*/
template <unsigned int STATIC_BUF_SIZE, unsigned int NUM_STATIC_BUFS>
class TransferBufferManager : public ITransferBufferManager
{
typedef StaticTransferBuffer<STATIC_BUF_SIZE> StaticBufferType;
StaticBufferType static_buffers_[NUM_STATIC_BUFS];
LinkedListRoot<DynamicTransferBuffer> dynamic_buffers_;
IAllocator* const allocator_;
StaticBufferType* findFirstStatic(uint8_t node_id)
{
assert((node_id == NODE_ID_INVALID) || (node_id <= NODE_ID_MAX));
for (unsigned int i = 0; i < NUM_STATIC_BUFS; i++)
{
if (static_buffers_[i].getNodeID() == node_id)
return static_buffers_ + i;
}
return NULL;
}
DynamicTransferBuffer* findFirstDynamic(uint8_t node_id)
{
DynamicTransferBuffer* dyn = dynamic_buffers_.get();
while (dyn)
{
assert(!dyn->isEmpty());
if (dyn->getNodeID() == node_id)
return dyn;
dyn = dyn->getNextListNode();
}
return NULL;
}
void optimizeStorage()
{
while (!dynamic_buffers_.isEmpty())
{
StaticBufferType* const sb = findFirstStatic(NODE_ID_INVALID);
if (sb == NULL)
break;
DynamicTransferBuffer* dyn = dynamic_buffers_.get();
assert(dyn);
assert(!dyn->isEmpty());
if (sb->migrateFrom(dyn))
{
assert(!dyn->isEmpty());
assert(dyn->getUpdateTimestamp() == sb->getUpdateTimestamp());
UAVCAN_TRACE("TransferBufferManager", "Storage optimization: Migrated NID %i", int(dyn->getNodeID()));
dynamic_buffers_.remove(dyn);
DynamicTransferBuffer::destroy(dyn, allocator_);
}
else
{
/* Migration can fail if a dynamic buffer contains more data than a static buffer can accomodate (more
* than STATIC_BUF_SIZE). This means that there is probably something wrong with the network. Logic
* that uses this class should explicitly ensure the proper maximum data size.
*/
UAVCAN_TRACE("TransferBufferManager", "Storage optimization: MIGRATION FAILURE NID %i BUFSIZE %u",
int(dyn->getNodeID()), STATIC_BUF_SIZE);
sb->reset();
break; // Probably we should try to migrate the rest?
}
}
}
public:
TransferBufferManager(IAllocator* allocator)
: allocator_(allocator)
{ }
~TransferBufferManager()
{
cleanup(std::numeric_limits<uint64_t>::max());
}
unsigned int getNumDynamicBuffers() const { return dynamic_buffers_.length(); }
unsigned int getNumStaticBuffers() const
{
unsigned int res = 0;
for (unsigned int i = 0; i < NUM_STATIC_BUFS; i++)
{
if (!static_buffers_[i].isEmpty())
res++;
}
return res;
}
TransferBufferBase* access(uint8_t node_id)
{
if (node_id > NODE_ID_MAX || node_id == NODE_ID_INVALID)
{
assert(0);
return NULL;
}
TransferBufferManagerEntry* tbme = findFirstStatic(node_id);
if (tbme)
return tbme;
return findFirstDynamic(node_id);
}
TransferBufferBase* create(uint8_t node_id)
{
if (node_id > NODE_ID_MAX || node_id == NODE_ID_INVALID)
{
assert(0);
return NULL;
}
remove(node_id);
TransferBufferManagerEntry* tbme = findFirstStatic(NODE_ID_INVALID);
if (tbme == NULL)
{
DynamicTransferBuffer* dyn = DynamicTransferBuffer::instantiate(allocator_);
tbme = dyn;
if (dyn == NULL)
return NULL; // Epic fail.
dynamic_buffers_.insert(dyn);
}
if (tbme)
{
assert(tbme->isEmpty());
tbme->reset(node_id);
}
return tbme;
}
void remove(uint8_t node_id)
{
assert((node_id <= NODE_ID_MAX && node_id != NODE_ID_INVALID));
TransferBufferManagerEntry* const tbme = findFirstStatic(node_id);
if (tbme)
{
tbme->reset();
optimizeStorage();
return;
}
DynamicTransferBuffer* dyn = findFirstDynamic(node_id);
if (dyn)
{
dynamic_buffers_.remove(dyn);
DynamicTransferBuffer::destroy(dyn, allocator_);
}
}
void cleanup(uint64_t oldest_timestamp)
{
int num_released_statics = 0;
for (unsigned int i = 0; i < NUM_STATIC_BUFS; i++)
{
TransferBufferManagerEntry* const buf = static_buffers_ + i;
if (buf->isEmpty())
continue;
if (buf->getUpdateTimestamp() <= oldest_timestamp)
{
UAVCAN_TRACE("TransferBufferManager", "Cleanup: Dead static buffer, NID %i", int(buf->getNodeID()));
buf->reset();
num_released_statics++;
}
}
DynamicTransferBuffer* dyn = dynamic_buffers_.get();
while (dyn)
{
assert(!dyn->isEmpty());
DynamicTransferBuffer* const next = dyn->getNextListNode();
if (dyn->getUpdateTimestamp() <= oldest_timestamp)
{
UAVCAN_TRACE("TransferBufferManager", "Cleanup: Dead dynamic buffer, NID %i", int(dyn->getNodeID()));
dynamic_buffers_.remove(dyn);
DynamicTransferBuffer::destroy(dyn, allocator_);
}
dyn = next;
}
if (num_released_statics > 0)
optimizeStorage();
}
};
}
@@ -62,6 +62,26 @@ void DynamicTransferBuffer::Block::write(const uint8_t*& inptr, unsigned int tar
/*
* DynamicTransferBuffer
*/
DynamicTransferBuffer* DynamicTransferBuffer::instantiate(IAllocator* allocator)
{
assert(allocator);
void* const praw = allocator->allocate(sizeof(DynamicTransferBuffer));
if (praw == NULL)
return NULL;
return new (praw) DynamicTransferBuffer(allocator);
}
void DynamicTransferBuffer::destroy(DynamicTransferBuffer*& obj, IAllocator* allocator)
{
assert(allocator);
if (obj != NULL)
{
obj->~DynamicTransferBuffer();
allocator->deallocate(obj);
obj = NULL;
}
}
void DynamicTransferBuffer::resetImpl()
{
max_write_pos_ = 0;
+152 -5
View File
@@ -4,6 +4,7 @@
#include <algorithm>
#include <gtest/gtest.h>
#include <memory>
#include <uavcan/internal/transport/transfer_buffer.hpp>
static const std::string TEST_DATA =
@@ -28,7 +29,8 @@ static void fill(const T a, int value)
a[i] = value;
}
static bool matchAgainstTestData(const uavcan::TransferBufferBase& tbb, unsigned int offset, int len = -1)
static bool matchAgainst(const std::string& data, const uavcan::TransferBufferBase& tbb,
unsigned int offset = 0, int len = -1)
{
uint8_t local_buffer[1024];
fill(local_buffer, 0);
@@ -39,7 +41,7 @@ static bool matchAgainstTestData(const uavcan::TransferBufferBase& tbb, unsigned
const int res = tbb.read(offset, local_buffer, sizeof(local_buffer));
if (res < 0)
{
std::cout << "matchAgainstTestData(): res " << res << std::endl;
std::cout << "matchAgainst(): res " << res << std::endl;
return false;
}
len = res;
@@ -49,23 +51,28 @@ static bool matchAgainstTestData(const uavcan::TransferBufferBase& tbb, unsigned
const int res = tbb.read(offset, local_buffer, len);
if (res != len)
{
std::cout << "matchAgainstTestData(): res " << res << " expected " << len << std::endl;
std::cout << "matchAgainst(): res " << res << " expected " << len << std::endl;
return false;
}
}
const bool equals = std::equal(local_buffer, local_buffer + len, TEST_DATA.begin() + offset);
const bool equals = std::equal(local_buffer, local_buffer + len, data.begin() + offset);
if (!equals)
{
std::cout
<< "local_buffer:\n\t" << local_buffer
<< std::endl;
std::cout
<< "test_data:\n\t" << std::string(TEST_DATA.begin() + offset, TEST_DATA.begin() + offset + len)
<< "test_data:\n\t" << std::string(data.begin() + offset, data.begin() + offset + len)
<< std::endl;
}
return equals;
}
static bool matchAgainstTestData(const uavcan::TransferBufferBase& tbb, unsigned int offset, int len = -1)
{
return matchAgainst(TEST_DATA, tbb, offset, len);
}
TEST(TransferBuffer, TestDataValidation)
{
ASSERT_LE(4, TEST_DATA.length() / uavcan::MEM_POOL_BLOCK_SIZE);
@@ -188,3 +195,143 @@ TEST(DynamicTransferBuffer, Basic)
buf.~DynamicTransferBuffer();
ASSERT_EQ(0, pool.getNumUsedBlocks());
}
static const std::string MGR_TEST_DATA[4] =
{
"I thought you would cry out again \'don\'t speak of it, leave off.\'\" Raskolnikov gave a laugh, but rather a "
"forced one. \"What, silence again?\" he asked a minute later. \"We must talk about something, you know. ",
"It would be interesting for me to know how you would decide a certain \'problem\' as Lebeziatnikov would say.\" "
"(He was beginning to lose the thread.) \"No, really, I am serious. Imagine, Sonia, that you had known all ",
"Luzhin\'s intentions beforehand. Known, that is, for a fact, that they would be the ruin of Katerina Ivanovna "
"and the children and yourself thrown in--since you don\'t count yourself for anything--Polenka too... for ",
"she\'ll go the same way. Well, if suddenly it all depended on your decision whether he or they should go on "
"living, that is whether Luzhin should go on living and doing wicked things, or Katerina Ivanovna should die? "
"How would you decide which of them was to die? I ask you?"
};
static const int MGR_STATIC_BUFFER_SIZE = 100;
TEST(TransferBufferManager, TestDataValidation)
{
for (unsigned int i = 0; i < sizeof(MGR_TEST_DATA) / sizeof(MGR_TEST_DATA[0]); i++)
{
ASSERT_LT(MGR_STATIC_BUFFER_SIZE, MGR_TEST_DATA[i].length());
}
}
static int fillTestData(const std::string& data, uavcan::TransferBufferBase* tbb)
{
return tbb->write(0, reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
}
TEST(TransferBufferManager, Basic)
{
using uavcan::TransferBufferManager;
using uavcan::TransferBufferBase;
static const int POOL_BLOCKS = 8;
uavcan::PoolAllocator<uavcan::MEM_POOL_BLOCK_SIZE * POOL_BLOCKS, uavcan::MEM_POOL_BLOCK_SIZE> pool;
uavcan::PoolManager<1> poolmgr;
poolmgr.addPool(&pool);
typedef TransferBufferManager<MGR_STATIC_BUFFER_SIZE, 2> TransferBufferManagerType;
std::auto_ptr<TransferBufferManagerType> mgr(new TransferBufferManagerType(&poolmgr));
// Empty
ASSERT_FALSE(mgr->access(0));
ASSERT_FALSE(mgr->access(uavcan::NODE_ID_MAX));
TransferBufferBase* tbb = NULL;
// Static 0
ASSERT_TRUE((tbb = mgr->create(0)));
tbb->setUpdateTimestamp(1234);
ASSERT_EQ(MGR_STATIC_BUFFER_SIZE, fillTestData(MGR_TEST_DATA[0], tbb));
ASSERT_EQ(1, mgr->getNumStaticBuffers());
// Static 1
ASSERT_TRUE((tbb = mgr->create(1)));
tbb->setUpdateTimestamp(2345);
ASSERT_EQ(MGR_STATIC_BUFFER_SIZE, fillTestData(MGR_TEST_DATA[1], tbb));
ASSERT_EQ(2, mgr->getNumStaticBuffers());
ASSERT_EQ(0, mgr->getNumDynamicBuffers());
ASSERT_EQ(0, pool.getNumUsedBlocks());
// Dynamic 0
ASSERT_TRUE((tbb = mgr->create(2)));
ASSERT_EQ(1, pool.getNumUsedBlocks()); // Empty dynamic buffer occupies one block
tbb->setUpdateTimestamp(3456);
ASSERT_EQ(MGR_TEST_DATA[2].length(), fillTestData(MGR_TEST_DATA[2], tbb));
ASSERT_EQ(2, mgr->getNumStaticBuffers());
ASSERT_EQ(1, mgr->getNumDynamicBuffers());
ASSERT_LT(1, pool.getNumUsedBlocks());
std::cout << "TransferBufferManager - Basic: Pool usage: " << pool.getNumUsedBlocks() << std::endl;
// Dynamic 2
ASSERT_TRUE((tbb = mgr->create(127)));
ASSERT_EQ(0, pool.getNumFreeBlocks()); // The test assumes that the memory must be exhausted now
tbb->setUpdateTimestamp(4567);
ASSERT_EQ(0, fillTestData(MGR_TEST_DATA[3], tbb));
ASSERT_EQ(2, mgr->getNumStaticBuffers());
ASSERT_EQ(2, mgr->getNumDynamicBuffers());
// Dynamic 3 - will fail due to OOM
ASSERT_FALSE((tbb = mgr->create(64)));
ASSERT_EQ(2, mgr->getNumStaticBuffers());
ASSERT_EQ(2, mgr->getNumDynamicBuffers());
// Making sure all buffers contain proper data
ASSERT_TRUE((tbb = mgr->access(0)));
ASSERT_EQ(1234, tbb->getUpdateTimestamp());
ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[0], *tbb));
ASSERT_TRUE((tbb = mgr->access(1)));
ASSERT_EQ(2345, tbb->getUpdateTimestamp());
ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[1], *tbb));
ASSERT_TRUE((tbb = mgr->access(2)));
ASSERT_EQ(3456, tbb->getUpdateTimestamp());
ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[2], *tbb));
ASSERT_TRUE((tbb = mgr->access(127)));
ASSERT_EQ(4567, tbb->getUpdateTimestamp());
ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[3], *tbb));
// Freeing one static buffer; one dynamic must migrate
mgr->remove(1);
ASSERT_FALSE(mgr->access(1));
ASSERT_EQ(2, mgr->getNumStaticBuffers());
ASSERT_EQ(1, mgr->getNumDynamicBuffers()); // One migrated to the static
ASSERT_LT(0, pool.getNumFreeBlocks());
// Cleanup must remove NodeID 0 due to low timestamp; migration should fail due to oversized data
mgr->cleanup(2000);
ASSERT_FALSE(mgr->access(0));
ASSERT_EQ(1, mgr->getNumStaticBuffers());
ASSERT_EQ(1, mgr->getNumDynamicBuffers()); // Migration failed
// At this time we have the following NodeID: 2, 127
ASSERT_TRUE((tbb = mgr->access(2)));
ASSERT_EQ(3456, tbb->getUpdateTimestamp());
ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[2], *tbb));
ASSERT_TRUE((tbb = mgr->access(127)));
ASSERT_EQ(4567, tbb->getUpdateTimestamp());
ASSERT_TRUE(matchAgainst(MGR_TEST_DATA[3], *tbb));
// These were deleted: 0, 1
ASSERT_FALSE(mgr->access(1));
ASSERT_FALSE(mgr->access(0));
// Deleting the object; all memory must be freed
ASSERT_NE(0, pool.getNumUsedBlocks());
mgr.reset();
ASSERT_EQ(0, pool.getNumUsedBlocks());
}