Centralized server: Allocation table cache removed

This commit is contained in:
Pavel Kirienko 2015-06-18 17:50:17 +03:00
parent b8ce1699a5
commit 356f46d08a
4 changed files with 145 additions and 293 deletions

View File

@ -42,12 +42,12 @@ class Server : IAllocationRequestHandler
*/
bool isNodeIDTaken(const NodeID node_id) const
{
return storage_.findByNodeID(node_id) != NULL;
return storage_.isNodeIDOccupied(node_id);
}
void tryPublishAllocationResult(const Storage::Entry& entry)
void tryPublishAllocationResult(const NodeID node_id, const UniqueID& unique_id)
{
const int res = allocation_request_manager_.broadcastAllocationResponse(entry.unique_id, entry.node_id);
const int res = allocation_request_manager_.broadcastAllocationResponse(unique_id, node_id);
if (res < 0)
{
tracer_.onEvent(TraceError, res);
@ -65,10 +65,10 @@ class Server : IAllocationRequestHandler
virtual void handleAllocationRequest(const UniqueID& unique_id, const NodeID preferred_node_id)
{
const Storage::Entry* const e = storage_.findByUniqueID(unique_id);
if (e != NULL)
const NodeID existing_node_id = storage_.getNodeIDForUniqueID(unique_id);
if (existing_node_id.isValid())
{
tryPublishAllocationResult(*e);
tryPublishAllocationResult(existing_node_id, unique_id);
}
else
{
@ -80,7 +80,7 @@ class Server : IAllocationRequestHandler
const int res = storage_.add(allocated_node_id, unique_id);
if (res >= 0)
{
tryPublishAllocationResult(Storage::Entry(allocated_node_id, unique_id));
tryPublishAllocationResult(allocated_node_id, unique_id);
}
else
{
@ -105,12 +105,12 @@ class Server : IAllocationRequestHandler
virtual NodeAwareness checkNodeAwareness(NodeID node_id) const
{
return (storage_.findByNodeID(node_id) == NULL) ? NodeAwarenessUnknown : NodeAwarenessKnownAndCommitted;
return storage_.isNodeIDOccupied(node_id) ? NodeAwarenessKnownAndCommitted : NodeAwarenessUnknown;
}
virtual void handleNewNodeDiscovery(const UniqueID* unique_id_or_null, NodeID node_id)
{
if (storage_.findByNodeID(node_id) != NULL)
if (storage_.isNodeIDOccupied(node_id))
{
UAVCAN_ASSERT(0); // Such node is already known, the class that called this method should have known that
return;
@ -152,25 +152,15 @@ public:
own_unique_id_ = own_unique_id;
{
const Storage::Entry* e = storage_.findByNodeID(node_.getNodeID());
if (e != NULL)
const NodeID stored_own_node_id = storage_.getNodeIDForUniqueID(own_unique_id_);
if (stored_own_node_id.isValid())
{
if (e->unique_id != own_unique_id_)
if (stored_own_node_id != node_.getNodeID())
{
return -ErrInvalidConfiguration;
}
}
e = storage_.findByUniqueID(own_unique_id_);
if (e != NULL)
{
if (e->node_id != node_.getNodeID())
{
return -ErrInvalidConfiguration;
}
}
if (e == NULL)
else
{
res = storage_.add(node_.getNodeID(), own_unique_id_);
if (res < 0)
@ -198,7 +188,7 @@ public:
return 0;
}
Storage::Size getNumAllocations() const { return storage_.getSize(); }
uint8_t getNumAllocations() const { return storage_.getSize(); }
};
}

View File

@ -9,6 +9,7 @@
#include <uavcan/debug.hpp>
#include <uavcan/protocol/dynamic_node_id_server/storage_marshaller.hpp>
#include <uavcan/protocol/dynamic_node_id_server/event.hpp>
#include <uavcan/util/bitset.hpp>
namespace uavcan
{
@ -22,148 +23,59 @@ namespace centralized
*/
class Storage
{
public:
typedef uint8_t Size;
typedef BitSet<NodeID::Max + 1> OccupationMask;
typedef Array<IntegerSpec<8, SignednessUnsigned, CastModeSaturate>, ArrayModeStatic,
BitLenToByteLen<NodeID::Max + 1>::Result>
OccupationMaskArray;
enum { Capacity = NodeID::Max };
struct Entry
{
UniqueID unique_id;
NodeID node_id;
Entry() { }
Entry(const NodeID nid, const UniqueID& uid)
: unique_id(uid)
, node_id(nid)
{ }
bool operator==(const Entry& rhs) const
{
return unique_id == rhs.unique_id &&
node_id == rhs.node_id;
}
};
private:
IStorageBackend& storage_;
Entry entries_[Capacity];
Size size_;
OccupationMask occupation_mask_;
static IStorageBackend::String getSizeKey() { return "size"; }
static IStorageBackend::String getOccupationMaskKey() { return "occupation_mask"; }
static IStorageBackend::String makeEntryKey(Size index, const char* postfix)
static OccupationMask maskFromArray(const OccupationMaskArray& array)
{
IStorageBackend::String str;
// "0_foobar"
str.appendFormatted("%d", int(index));
str += "_";
str += postfix;
return str;
OccupationMask mask;
for (uint8_t byte = 0; byte < array.size(); byte++)
{
for (uint8_t bit = 0; bit < 8; bit++)
{
mask[byte * 8U + bit] = (array[byte] & (1U << bit)) != 0;
}
}
return mask;
}
int readEntryFromStorage(Size index, Entry& out_entry)
static OccupationMaskArray maskToArray(const OccupationMask& mask)
{
const StorageMarshaller io(storage_);
// Unique ID
if (io.get(makeEntryKey(index, "unique_id"), out_entry.unique_id) < 0)
OccupationMaskArray array;
for (uint8_t byte = 0; byte < array.size(); byte++)
{
return -ErrFailure;
for (uint8_t bit = 0; bit < 8; bit++)
{
if (mask[byte * 8U + bit])
{
array[byte] |= static_cast<uint8_t>(1U << bit);
}
}
}
// Node ID
uint32_t node_id = 0;
if (io.get(makeEntryKey(index, "node_id"), node_id) < 0)
{
return -ErrFailure;
}
if (node_id > NodeID::Max || node_id == 0)
{
return -ErrFailure;
}
out_entry.node_id = NodeID(static_cast<uint8_t>(node_id));
return 0;
}
int writeEntryToStorage(Size index, const Entry& entry)
{
Entry temp = entry;
StorageMarshaller io(storage_);
// Unique ID
if (io.setAndGetBack(makeEntryKey(index, "unique_id"), temp.unique_id) < 0)
{
return -ErrFailure;
}
// Node ID
uint32_t node_id = entry.node_id.get();
if (io.setAndGetBack(makeEntryKey(index, "node_id"), node_id) < 0)
{
return -ErrFailure;
}
temp.node_id = NodeID(static_cast<uint8_t>(node_id));
return (temp == entry) ? 0 : -ErrFailure;
return array;
}
public:
Storage(IStorageBackend& storage)
: storage_(storage)
, size_(0)
Storage(IStorageBackend& storage) :
storage_(storage)
{ }
/**
* This method reads all entries from the storage.
* This method reads only the occupation mask from the storage.
*/
int init()
{
StorageMarshaller io(storage_);
// Reading size
size_ = 0;
{
uint32_t value = 0;
if (io.get(getSizeKey(), value) < 0)
{
if (storage_.get(getSizeKey()).empty())
{
int res = io.setAndGetBack(getSizeKey(), value);
if (res < 0)
{
return res;
}
return (value == 0) ? 0 : -ErrFailure;
}
else
{
// There's some data in the storage, but it cannot be parsed - reporting an error
return -ErrFailure;
}
}
if (value > Capacity)
{
return -ErrFailure;
}
size_ = Size(value);
}
// Restoring entries
for (Size index = 0; index < size_; index++)
{
const int result = readEntryFromStorage(index, entries_[index]);
if (result < 0)
{
return result;
}
}
OccupationMaskArray array;
io.get(getOccupationMaskKey(), array);
occupation_mask_ = maskFromArray(array);
return 0;
}
@ -173,77 +85,62 @@ public:
*/
int add(const NodeID node_id, const UniqueID& unique_id)
{
if (size_ == Capacity)
{
return -ErrLogic;
}
if (!node_id.isUnicast())
{
return -ErrInvalidParam;
}
Entry entry;
entry.node_id = node_id;
entry.unique_id = unique_id;
StorageMarshaller io(storage_);
// If next operations fail, we'll get a dangling entry, but it's absolutely OK.
int res = writeEntryToStorage(size_, entry);
if (res < 0)
{
return res;
uint32_t node_id_int = node_id.get();
int res = io.setAndGetBack(StorageMarshaller::convertUniqueIDToHex(unique_id), node_id_int);
if (res < 0)
{
return res;
}
if (node_id_int != node_id.get())
{
return -ErrFailure;
}
}
// Updating the size
StorageMarshaller io(storage_);
uint32_t new_size_index = size_ + 1U;
res = io.setAndGetBack(getSizeKey(), new_size_index);
// Updating the mask in the storage
OccupationMask new_occupation_mask = occupation_mask_;
new_occupation_mask[node_id.get()] = true;
OccupationMaskArray occupation_array = maskToArray(new_occupation_mask);
int res = io.setAndGetBack(getOccupationMaskKey(), occupation_array);
if (res < 0)
{
return res;
}
if (new_size_index != size_ + 1U)
if (occupation_array != maskToArray(new_occupation_mask))
{
return -ErrFailure;
}
entries_[size_] = entry;
size_++;
// Updating the cached mask only if the storage was updated successfully
occupation_mask_ = new_occupation_mask;
return 0;
}
/**
* Returns nullptr if there's no such entry.
* Returns an invalid node ID if there's no such allocation.
*/
const Entry* findByNodeID(const NodeID node_id) const
NodeID getNodeIDForUniqueID(const UniqueID& unique_id) const
{
for (Size i = 0; i < size_; i++)
{
if (entries_[i].node_id == node_id)
{
return &entries_[i];
}
}
return NULL;
StorageMarshaller io(storage_);
uint32_t node_id = 0;
io.get(StorageMarshaller::convertUniqueIDToHex(unique_id), node_id);
return (node_id > 0 && node_id <= NodeID::Max) ? NodeID(static_cast<uint8_t>(node_id)) : NodeID();
}
/**
* Returns nullptr if there's no such entry.
*/
const Entry* findByUniqueID(const UniqueID& unique_id) const
{
for (Size i = 0; i < size_; i++)
{
if (entries_[i].unique_id == unique_id)
{
return &entries_[i];
}
}
return NULL;
}
bool isNodeIDOccupied(NodeID node_id) const { return occupation_mask_[node_id.get()]; }
Size getSize() const { return size_; }
uint8_t getSize() const { return static_cast<uint8_t>(occupation_mask_.count()); }
};
}

View File

@ -38,6 +38,20 @@ public:
: storage_(storage)
{ }
/**
* Turns a unique ID into a hex string that can be used as a key or as a value.
*/
static IStorageBackend::String convertUniqueIDToHex(const UniqueID& key)
{
IStorageBackend::String serialized;
for (uint8_t i = 0; i < UniqueID::MaxSize; i++)
{
serialized.appendFormatted("%02x", key.at(i));
}
UAVCAN_ASSERT(serialized.size() == UniqueID::MaxSize * 2);
return serialized;
}
/**
* These methods set the value and then immediately read it back.
* 1. Serialize the value.
@ -58,12 +72,7 @@ public:
int setAndGetBack(const IStorageBackend::String& key, UniqueID& inout_value)
{
IStorageBackend::String serialized;
for (uint8_t i = 0; i < UniqueID::MaxSize; i++)
{
serialized.appendFormatted("%02x", inout_value.at(i));
}
UAVCAN_ASSERT(serialized.size() == UniqueID::MaxSize * 2);
const IStorageBackend::String serialized = convertUniqueIDToHex(inout_value);
UAVCAN_TRACE("StorageMarshaller", "Set %s = %s", key.c_str(), serialized.c_str());
storage_.set(key, serialized);

View File

@ -11,6 +11,7 @@
TEST(dynamic_node_id_server_centralized_Storage, Initialization)
{
using namespace uavcan::dynamic_node_id_server::centralized;
using namespace uavcan::dynamic_node_id_server;
// No data in the storage - initializing empty
{
@ -20,95 +21,51 @@ TEST(dynamic_node_id_server_centralized_Storage, Initialization)
ASSERT_EQ(0, storage.getNumKeys());
ASSERT_LE(0, stor.init());
ASSERT_EQ(1, storage.getNumKeys());
ASSERT_EQ(0, storage.getNumKeys());
ASSERT_EQ(0, stor.getSize());
ASSERT_FALSE(stor.findByNodeID(1));
ASSERT_FALSE(stor.findByNodeID(0));
}
// Nonempty storage, one item
{
MemoryStorageBackend storage;
Storage stor(storage);
storage.set("size", "1");
ASSERT_EQ(-uavcan::ErrFailure, stor.init()); // Expected one entry, none found
ASSERT_EQ(1, storage.getNumKeys());
storage.set("0_unique_id", "00000000000000000000000000000000");
storage.set("0_node_id", "0");
ASSERT_EQ(-uavcan::ErrFailure, stor.init()); // Invalid entry - zero Node ID
storage.set("0_node_id", "1");
ASSERT_LE(0, stor.init()); // OK now
ASSERT_EQ(3, storage.getNumKeys());
ASSERT_EQ(1, stor.getSize());
ASSERT_TRUE(stor.findByNodeID(1));
ASSERT_FALSE(stor.findByNodeID(0));
}
// Nonempty storage, broken data
{
MemoryStorageBackend storage;
Storage stor(storage);
storage.set("size", "foobar");
ASSERT_EQ(-uavcan::ErrFailure, stor.init()); // Bad value
storage.set("size", "128");
ASSERT_EQ(-uavcan::ErrFailure, stor.init()); // Bad value
storage.set("size", "1");
ASSERT_EQ(-uavcan::ErrFailure, stor.init()); // No items
ASSERT_EQ(1, storage.getNumKeys());
storage.set("0_unique_id", "00000000000000000000000000000000");
storage.set("0_node_id", "128"); // Bad value (127 max)
ASSERT_EQ(-uavcan::ErrFailure, stor.init()); // Failed
storage.set("0_node_id", "127");
ASSERT_LE(0, stor.init()); // OK now
ASSERT_EQ(1, stor.getSize());
ASSERT_TRUE(stor.findByNodeID(127));
ASSERT_FALSE(stor.findByNodeID(0));
ASSERT_EQ(3, storage.getNumKeys());
ASSERT_FALSE(stor.isNodeIDOccupied(1));
ASSERT_FALSE(stor.isNodeIDOccupied(0));
}
// Nonempty storage, many items
{
MemoryStorageBackend storage;
Storage stor(storage);
storage.set("size", "2");
storage.set("0_unique_id", "00000000000000000000000000000000");
storage.set("0_node_id", "1");
storage.set("1_unique_id", "0123456789abcdef0123456789abcdef");
storage.set("1_node_id", "127");
storage.set("occupation_mask", "0e000000000000000000000000000000"); // node ID 1, 2, 3
ASSERT_LE(0, stor.init()); // OK
ASSERT_LE(0, stor.init()); // OK now
ASSERT_EQ(5, storage.getNumKeys());
ASSERT_EQ(2, stor.getSize());
ASSERT_EQ(1, storage.getNumKeys());
ASSERT_EQ(3, stor.getSize());
ASSERT_TRUE(stor.findByNodeID(1));
ASSERT_TRUE(stor.findByNodeID(127));
ASSERT_FALSE(stor.findByNodeID(0));
ASSERT_TRUE(stor.isNodeIDOccupied(1));
ASSERT_TRUE(stor.isNodeIDOccupied(2));
ASSERT_TRUE(stor.isNodeIDOccupied(3));
ASSERT_FALSE(stor.isNodeIDOccupied(0));
ASSERT_FALSE(stor.isNodeIDOccupied(4));
uavcan::protocol::dynamic_node_id::server::Entry::FieldTypes::unique_id uid;
uid[0] = 0x01;
uid[1] = 0x23;
uid[2] = 0x45;
uid[3] = 0x67;
uid[4] = 0x89;
uid[5] = 0xab;
uid[6] = 0xcd;
uid[7] = 0xef;
uavcan::copy(uid.begin(), uid.begin() + 8, uid.begin() + 8);
UniqueID uid_1;
uid_1[0] = 1;
ASSERT_TRUE(stor.findByUniqueID(uid));
ASSERT_EQ(127, stor.findByUniqueID(uid)->node_id.get());
ASSERT_EQ(uid, stor.findByNodeID(127)->unique_id);
UniqueID uid_2;
uid_2[0] = 2;
UniqueID uid_3;
uid_3[0] = 3;
ASSERT_FALSE(stor.getNodeIDForUniqueID(uid_1).isValid());
ASSERT_FALSE(stor.getNodeIDForUniqueID(uid_2).isValid());
ASSERT_FALSE(stor.getNodeIDForUniqueID(uid_3).isValid());
ASSERT_FALSE(stor.isNodeIDOccupied(127));
storage.set("01000000000000000000000000000000", "1");
storage.set("02000000000000000000000000000000", "2");
storage.set("03000000000000000000000000000000", "3");
ASSERT_EQ(1, stor.getNodeIDForUniqueID(uid_1).get());
ASSERT_EQ(2, stor.getNodeIDForUniqueID(uid_2).get());
ASSERT_EQ(3, stor.getNodeIDForUniqueID(uid_3).get());
ASSERT_FALSE(stor.isNodeIDOccupied(127));
}
}
@ -123,21 +80,19 @@ TEST(dynamic_node_id_server_centralized_Storage, Basic)
ASSERT_EQ(0, storage.getNumKeys());
ASSERT_LE(0, stor.init());
storage.print();
ASSERT_EQ(1, storage.getNumKeys());
ASSERT_EQ(0, storage.getNumKeys());
/*
* Adding one entry to the log, making sure it appears in the storage
*/
Storage::Entry entry;
entry.node_id = 1;
entry.unique_id[0] = 1;
ASSERT_LE(0, stor.add(entry.node_id, entry.unique_id));
uavcan::dynamic_node_id_server::UniqueID unique_id;
unique_id[0] = 1;
ASSERT_LE(0, stor.add(1, unique_id));
ASSERT_EQ("1", storage.get("size"));
ASSERT_EQ("01000000000000000000000000000000", storage.get("0_unique_id"));
ASSERT_EQ("1", storage.get("0_node_id"));
ASSERT_EQ("02000000000000000000000000000000", storage.get("occupation_mask"));
ASSERT_EQ("1", storage.get("01000000000000000000000000000000"));
ASSERT_EQ(3, storage.getNumKeys());
ASSERT_EQ(2, storage.getNumKeys());
ASSERT_EQ(1, stor.getSize());
/*
@ -145,30 +100,31 @@ TEST(dynamic_node_id_server_centralized_Storage, Basic)
*/
storage.failOnSetCalls(true);
ASSERT_EQ(3, storage.getNumKeys());
ASSERT_EQ(2, storage.getNumKeys());
entry.node_id = 2;
entry.unique_id[0] = 2;
ASSERT_GT(0, stor.add(entry.node_id, entry.unique_id));
unique_id[0] = 2;
ASSERT_GT(0, stor.add(2, unique_id));
ASSERT_EQ(3, storage.getNumKeys()); // No new entries, we failed
ASSERT_EQ(2, storage.getNumKeys()); // No new entries, we failed
ASSERT_EQ(1, stor.getSize());
/*
* Making sure add() fails when the log is full
* Adding a lot of entries
*/
storage.failOnSetCalls(false);
while (stor.getSize() < stor.Capacity)
uavcan::NodeID node_id(2);
while (stor.getSize() < 127)
{
ASSERT_LE(0, stor.add(entry.node_id, entry.unique_id));
ASSERT_LE(0, stor.add(node_id, unique_id));
entry.node_id = uint8_t(uavcan::min(entry.node_id.get() + 1U, 127U));
entry.unique_id[0] = uint8_t(entry.unique_id[0] + 1U);
ASSERT_TRUE(stor.getNodeIDForUniqueID(unique_id).isValid());
ASSERT_TRUE(stor.isNodeIDOccupied(node_id));
node_id = uint8_t(uavcan::min(node_id.get() + 1U, 127U));
unique_id[0] = uint8_t(unique_id[0] + 1U);
}
ASSERT_GT(0, stor.add(123, entry.unique_id)); // Failing because full
storage.print();
}