Refactored node ID allocation server; no changes to the logic

This commit is contained in:
Pavel Kirienko
2015-05-09 16:03:22 +03:00
parent 098c29ce93
commit 18d5cb78aa
14 changed files with 2810 additions and 2773 deletions
@@ -1,806 +0,0 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_ALLOCATION_SERVER_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_ALLOCATION_SERVER_HPP_INCLUDED
#include <uavcan/build_config.hpp>
#include <uavcan/debug.hpp>
#include <uavcan/node/subscriber.hpp>
#include <uavcan/node/publisher.hpp>
#include <uavcan/node/service_server.hpp>
#include <uavcan/node/service_client.hpp>
#include <uavcan/node/timer.hpp>
#include <uavcan/util/method_binder.hpp>
#include <uavcan/util/lazy_constructor.hpp>
#include <uavcan/util/map.hpp>
// Types used by the server
#include <uavcan/protocol/dynamic_node_id/server/AppendEntries.hpp>
#include <uavcan/protocol/dynamic_node_id/server/RequestVote.hpp>
#include <uavcan/protocol/dynamic_node_id/server/Entry.hpp>
#include <uavcan/protocol/dynamic_node_id/server/Discovery.hpp>
#include <uavcan/protocol/dynamic_node_id/Allocation.hpp>
#include <uavcan/protocol/GetNodeInfo.hpp>
#include <uavcan/protocol/NodeStatus.hpp>
namespace uavcan
{
/**
* This interface is used by the server to read and write stable storage.
* The storage is represented as a key-value container, where keys and values are ASCII strings up to 32
* characters long, not including the termination byte. Fixed block size allows for absolutely straightforward
* and efficient implementation of storage backends, e.g. based on text files.
* Keys and values may contain only non-whitespace, non-formatting printable characters.
*/
class IDynamicNodeIDStorageBackend
{
public:
/**
* Maximum length of keys and values. One pair takes twice as much space.
*/
enum { MaxStringLength = 32 };
/**
* It is guaranteed that the server will never require more than this number of key/value pairs.
* Total storage space needed is (MaxKeyValuePairs * MaxStringLength * 2), not including storage overhead.
*/
enum { MaxKeyValuePairs = 400 };
/**
* This type is used to exchange data chunks with the backend.
* It doesn't use any dynamic memory; please refer to the Array<> class for details.
*/
typedef Array<IntegerSpec<8, SignednessUnsigned, CastModeTruncate>, ArrayModeDynamic, MaxStringLength> String;
/**
* Read one value from the storage.
* If such key does not exist, or if read failed, an empty string will be returned.
* This method should not block for more than 50 ms.
*/
virtual String get(const String& key) const = 0;
/**
* Create or update value for the given key. Empty value should be regarded as a request to delete the key.
* This method should not block for more than 50 ms.
* Failures will be ignored.
*/
virtual void set(const String& key, const String& value) = 0;
virtual ~IDynamicNodeIDStorageBackend() { }
};
/**
* This interface allows the application to trace events that happen in the server.
*/
class IDynamicNodeIDAllocationServerEventTracer
{
public:
#if UAVCAN_TOSTRING
/**
* It is safe to call this function with any argument.
* If the event code is out of range, an assertion failure will be triggered and an error text will be returned.
*/
static const char* getEventName(uint16_t code);
#endif
/**
* The server invokes this method every time it believes that a noteworthy event has happened.
* The table of event codes can be found in the server sources.
* It is guaranteed that event code values will never change, but new ones can be added in future. This ensures
* full backward compatibility.
* @param event_code Event code, see the sources for the enum with values.
* @param event_argument Value associated with the event; its meaning depends on the event code.
*/
virtual void onEvent(uint16_t event_code, int64_t event_argument) = 0;
virtual ~IDynamicNodeIDAllocationServerEventTracer() { }
};
/**
* Internals, do not use anything from this namespace directly.
*/
namespace dynamic_node_id_server_impl
{
using namespace protocol::dynamic_node_id::server;
/**
* Raft term
*/
typedef StorageType<Entry::FieldTypes::term>::Type Term;
/**
* @ref IDynamicNodeIDAllocationServerEventTracer.
* Event codes cannot be changed, only new ones can be added.
*/
enum TraceEvent
{
// Event name Argument
// 0
TraceError, // error code (may be negated)
TraceLogLastIndexRestored, // recovered last index value
TraceLogAppend, // index of new entry
TraceLogRemove, // new last index value
TraceCurrentTermRestored, // current term
// 5
TraceCurrentTermUpdate, // current term
TraceVotedForRestored, // value of votedFor
TraceVotedForUpdate, // value of votedFor
TraceDiscoveryBroadcast, // number of known servers
TraceNewServerDiscovered, // node ID of the new server
// 10
TraceDiscoveryReceived, // node ID of the sender
TraceClusterSizeInited, // cluster size
TraceInvalidClusterSizeReceived, // received cluster size
TraceRaftCoreInited, // update interval in usec
TraceRaftStateSwitch, // 0 - Follower, 1 - Candidate, 2 - Leader
// 15
TraceRaftActiveSwitch, // 0 - Passive, 1 - Active
TraceRaftNewLogEntry, // node ID value
TraceRaftRequestIgnored, // node ID of the client
TraceRaftVoteRequestReceived, // node ID of the client
TraceRaftVoteRequestSucceeded, // node ID of the server
// 20
TraceRaftVoteRequestInitiation, // node ID of the server
TraceRaftPersistStateUpdateError, // negative error code
TraceRaftCommitIndexUpdate, // new commit index value
TraceRaftNewerTermInResponse, // new term value
TraceRaftNewEntryCommitted, // new commit index value
// 25
TraceRaftAppendEntriesCallFailure, // error code (may be negated)
NumTraceEventCodes
};
/**
* This class extends the storage backend interface with serialization/deserialization functionality.
*/
class MarshallingStorageDecorator
{
IDynamicNodeIDStorageBackend& storage_;
static uint8_t convertLowerCaseHexCharToNibble(char ch);
public:
MarshallingStorageDecorator(IDynamicNodeIDStorageBackend& storage)
: storage_(storage)
{
// Making sure that there will be no data loss during serialization.
StaticAssert<(sizeof(Term) <= sizeof(uint32_t))>::check();
}
/**
* These methods set the value and then immediately read it back.
* 1. Serialize the value.
* 2. Update the value on the backend.
* 3. Call get() with the same value argument.
* The caller then is supposed to check whether the argument has the desired value.
*/
int setAndGetBack(const IDynamicNodeIDStorageBackend::String& key, uint32_t& inout_value);
int setAndGetBack(const IDynamicNodeIDStorageBackend::String& key,
Entry::FieldTypes::unique_id& inout_value);
/**
* Getters simply read and deserialize the value.
* 1. Read the value back from the backend; return false if read fails.
* 2. Deserealize the newly read value; return false if deserialization fails.
* 3. Update the argument with deserialized value.
* 4. Return true.
*/
int get(const IDynamicNodeIDStorageBackend::String& key, uint32_t& out_value) const;
int get(const IDynamicNodeIDStorageBackend::String& key,
Entry::FieldTypes::unique_id& out_value) const;
};
/**
* Raft log.
* This class transparently replicates its state to the storage backend, keeping the most recent state in memory.
* Writes are slow, reads are instantaneous.
*/
class Log
{
public:
typedef uint8_t Index;
enum { Capacity = NodeID::Max + 1 };
private:
IDynamicNodeIDStorageBackend& storage_;
IDynamicNodeIDAllocationServerEventTracer& tracer_;
Entry entries_[Capacity];
Index last_index_; // Index zero always contains an empty entry
static IDynamicNodeIDStorageBackend::String getLastIndexKey() { return "log_last_index"; }
static IDynamicNodeIDStorageBackend::String makeEntryKey(Index index, const char* postfix);
int readEntryFromStorage(Index index, Entry& out_entry);
int writeEntryToStorage(Index index, const Entry& entry);
int initEmptyLogStorage();
public:
Log(IDynamicNodeIDStorageBackend& storage, IDynamicNodeIDAllocationServerEventTracer& tracer)
: storage_(storage)
, tracer_(tracer)
, last_index_(0)
{ }
/**
* Initialization is performed as follows (every step may fail and return an error):
* 1. Log is restored or initialized.
* 2. Current term is restored. If there was no current term stored and the log is empty, it will be initialized
* with zero.
* 3. VotedFor value is restored. If there was no VotedFor value stored, the log is empty, and the current term is
* zero, the value will be initialized with zero.
*/
int init();
/**
* This method invokes storage IO.
* Returned value indicates whether the entry was successfully appended.
*/
int append(const Entry& entry);
/**
* This method invokes storage IO.
* Returned value indicates whether the requested operation has been carried out successfully.
*/
int removeEntriesWhereIndexGreaterOrEqual(Index index);
int removeEntriesWhereIndexGreater(Index index);
/**
* Returns nullptr if there's no such index.
* This method does not use storage IO.
*/
const Entry* getEntryAtIndex(Index index) const;
Index getLastIndex() const { return last_index_; }
bool isOtherLogUpToDate(Index other_last_index, Term other_last_term) const;
};
/**
* This class is a convenient container for persistent state variables defined by Raft.
* Writes are slow, reads are instantaneous.
*/
class PersistentState
{
IDynamicNodeIDStorageBackend& storage_;
IDynamicNodeIDAllocationServerEventTracer& tracer_;
Term current_term_;
NodeID voted_for_;
Log log_;
static IDynamicNodeIDStorageBackend::String getCurrentTermKey() { return "current_term"; }
static IDynamicNodeIDStorageBackend::String getVotedForKey() { return "voted_for"; }
public:
PersistentState(IDynamicNodeIDStorageBackend& storage, IDynamicNodeIDAllocationServerEventTracer& tracer)
: storage_(storage)
, tracer_(tracer)
, current_term_(0)
, log_(storage, tracer)
{ }
int init();
Term getCurrentTerm() const { return current_term_; }
NodeID getVotedFor() const { return voted_for_; }
bool isVotedForSet() const { return voted_for_.isUnicast(); }
Log& getLog() { return log_; }
const Log& getLog() const { return log_; }
/**
* Invokes storage IO.
*/
int setCurrentTerm(Term term);
/**
* Invokes storage IO.
*/
int setVotedFor(NodeID node_id);
int resetVotedFor() { return setVotedFor(NodeID(0)); }
};
/**
* This class maintains the cluster state.
*/
class ClusterManager : private TimerBase
{
public:
enum { MaxClusterSize = Discovery::FieldTypes::known_nodes::MaxSize };
private:
typedef MethodBinder<ClusterManager*,
void (ClusterManager::*)
(const ReceivedDataStructure<Discovery>&)>
DiscoveryCallback;
struct Server
{
NodeID node_id;
Log::Index next_index;
Log::Index match_index;
Server()
: next_index(0)
, match_index(0)
{ }
void resetIndices(const Log& log);
};
IDynamicNodeIDStorageBackend& storage_;
IDynamicNodeIDAllocationServerEventTracer& tracer_;
const Log& log_;
Subscriber<Discovery, DiscoveryCallback> discovery_sub_;
mutable Publisher<Discovery> discovery_pub_;
Server servers_[MaxClusterSize - 1]; ///< Minus one because the local server is not listed there.
uint8_t cluster_size_;
uint8_t num_known_servers_;
bool had_discovery_activity_;
static IDynamicNodeIDStorageBackend::String getStorageKeyForClusterSize() { return "cluster_size"; }
INode& getNode() { return discovery_sub_.getNode(); }
const INode& getNode() const { return discovery_sub_.getNode(); }
Server* findServer(NodeID node_id);
const Server* findServer(NodeID node_id) const;
void addServer(NodeID node_id);
virtual void handleTimerEvent(const TimerEvent&);
void handleDiscovery(const ReceivedDataStructure<Discovery>& msg);
void startDiscoveryPublishingTimerIfNotRunning();
public:
enum { ClusterSizeUnknown = 0 };
/**
* @param node Needed to publish and subscribe to Discovery message
* @param storage Needed to read the cluster size parameter from the storage
* @param log Needed to initialize nextIndex[] values after elections
*/
ClusterManager(INode& node, IDynamicNodeIDStorageBackend& storage, const Log& log,
IDynamicNodeIDAllocationServerEventTracer& tracer)
: TimerBase(node)
, storage_(storage)
, tracer_(tracer)
, log_(log)
, discovery_sub_(node)
, discovery_pub_(node)
, cluster_size_(0)
, num_known_servers_(0)
, had_discovery_activity_(false)
{ }
/**
* If cluster_size is set to ClusterSizeUnknown, the class will try to read this parameter from the
* storage backend using key 'cluster_size'.
* Returns negative error code.
*/
int init(uint8_t init_cluster_size = ClusterSizeUnknown);
/**
* Whether such server has been discovered earlier.
*/
bool isKnownServer(NodeID node_id) const;
/**
* An invalid node ID will be returned if there's no such server.
* The local server is not listed there.
*/
NodeID getRemoteServerNodeIDAtIndex(uint8_t index) const;
/**
* See next_index[] in Raft paper.
*/
Log::Index getServerNextIndex(NodeID server_node_id) const;
void incrementServerNextIndexBy(NodeID server_node_id, Log::Index increment);
void decrementServerNextIndex(NodeID server_node_id);
/**
* See match_index[] in Raft paper.
*/
Log::Index getServerMatchIndex(NodeID server_node_id) const;
void setServerMatchIndex(NodeID server_node_id, Log::Index match_index);
/**
* This method must be called when the current server becomes leader.
*/
void resetAllServerIndices();
/**
* This method returns true if there was at least one Discovery message received since last call.
*/
bool hadDiscoveryActivity()
{
if (had_discovery_activity_)
{
had_discovery_activity_ = false;
return true;
}
return false;
}
/**
* Number of known servers can only grow, and it never exceeds the cluster size value.
* This number does not include the local server.
*/
uint8_t getNumKnownServers() const { return num_known_servers_; }
/**
* Cluster size and quorum size are constant.
*/
uint8_t getClusterSize() const { return cluster_size_; }
uint8_t getQuorumSize() const { return static_cast<uint8_t>(cluster_size_ / 2U + 1U); }
bool isClusterDiscovered() const { return num_known_servers_ == (cluster_size_ - 1); }
};
/**
* Allocator has to implement this interface so the RaftCore can inform it when a new entry gets committed to the log.
*/
class ILeaderLogCommitHandler
{
public:
/**
* This method will be invoked when a new log entry is committed (only if the local server is the current Leader).
*/
virtual void onEntryCommitted(const Entry& entry) = 0;
/**
* Assume false by default.
*/
virtual void onLeaderChange(bool local_node_is_leader) = 0;
virtual ~ILeaderLogCommitHandler() { }
};
/**
* This class implements log replication and voting.
* It does not implement client-server interaction at all; instead it just exposes a public method for adding
* allocation entries.
*/
class RaftCore : private TimerBase
{
typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ReceivedDataStructure<AppendEntries::Request>&,
ServiceResponseDataStructure<AppendEntries::Response>&)>
AppendEntriesCallback;
typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ServiceCallResult<AppendEntries>&)>
AppendEntriesResponseCallback;
typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ReceivedDataStructure<RequestVote::Request>&,
ServiceResponseDataStructure<RequestVote::Response>&)>
RequestVoteCallback;
typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ServiceCallResult<RequestVote>&)>
RequestVoteResponseCallback;
enum ServerState
{
ServerStateFollower,
ServerStateCandidate,
ServerStateLeader
};
struct PendingAppendEntriesFields
{
Log::Index prev_log_index;
Log::Index num_entries;
PendingAppendEntriesFields()
: prev_log_index(0)
, num_entries(0)
{ }
};
/*
* Constants
*/
const MonotonicDuration update_interval_; ///< AE requests will be issued at this rate
const MonotonicDuration base_activity_timeout_;
IDynamicNodeIDAllocationServerEventTracer& tracer_;
ILeaderLogCommitHandler& log_commit_handler_;
/*
* States
*/
PersistentState persistent_state_;
ClusterManager cluster_;
Log::Index commit_index_;
MonotonicTime last_activity_timestamp_;
bool active_mode_;
ServerState server_state_;
uint8_t next_server_index_; ///< Next server to query AE from
uint8_t num_votes_received_in_this_campaign_;
PendingAppendEntriesFields pending_append_entries_fields_;
/*
* Transport
*/
ServiceServer<AppendEntries, AppendEntriesCallback> append_entries_srv_;
ServiceClient<AppendEntries, AppendEntriesResponseCallback> append_entries_client_;
ServiceServer<RequestVote, RequestVoteCallback> request_vote_srv_;
enum { NumRequestVoteClients = ClusterManager::MaxClusterSize - 1 };
LazyConstructor<ServiceClient<RequestVote, RequestVoteResponseCallback> >
request_vote_clients_[NumRequestVoteClients];
void trace(TraceEvent event, int64_t argument) { tracer_.onEvent(event, argument); }
INode& getNode() { return append_entries_srv_.getNode(); }
const INode& getNode() const { return append_entries_srv_.getNode(); }
void registerActivity() { last_activity_timestamp_ = getNode().getMonotonicTime(); }
bool isActivityTimedOut() const;
void handlePersistentStateUpdateError(int error);
void updateFollower();
void updateCandidate();
void updateLeader();
void switchState(ServerState new_state);
void setActiveMode(bool new_active);
void tryIncrementCurrentTermFromResponse(Term new_term);
void propagateCommitIndex();
void handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntries::Request>& request,
ServiceResponseDataStructure<AppendEntries::Response>& response);
void handleAppendEntriesResponse(const ServiceCallResult<AppendEntries>& result);
void handleRequestVoteRequest(const ReceivedDataStructure<RequestVote::Request>& request,
ServiceResponseDataStructure<RequestVote::Response>& response);
void handleRequestVoteResponse(const ServiceCallResult<RequestVote>& result);
virtual void handleTimerEvent(const TimerEvent& event);
public:
RaftCore(INode& node,
IDynamicNodeIDStorageBackend& storage,
IDynamicNodeIDAllocationServerEventTracer& tracer,
ILeaderLogCommitHandler& log_commit_handler,
MonotonicDuration update_interval =
MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_REQUEST_TIMEOUT_MS),
MonotonicDuration base_activity_timeout =
MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_BASE_ELECTION_TIMEOUT_MS))
: TimerBase(node)
, update_interval_(update_interval)
, base_activity_timeout_(base_activity_timeout)
, tracer_(tracer)
, log_commit_handler_(log_commit_handler)
, persistent_state_(storage, tracer)
, cluster_(node, storage, persistent_state_.getLog(), tracer)
, commit_index_(0) // Per Raft paper, commitIndex must be initialized to zero
, last_activity_timestamp_(node.getMonotonicTime())
, active_mode_(true)
, server_state_(ServerStateFollower)
, next_server_index_(0)
, num_votes_received_in_this_campaign_(0)
, append_entries_srv_(node)
, append_entries_client_(node)
, request_vote_srv_(node)
{
for (uint8_t i = 0; i < NumRequestVoteClients; i++)
{
request_vote_clients_[i].construct<INode&>(node);
}
}
/**
* Once started, the logic runs in the background until destructor is called.
* @param cluster_size If set, this value will be used and stored in the persistent storage. If not set,
* value from the persistent storage will be used. If not set and there's no such key
* in the persistent storage, initialization will fail.
*/
int init(uint8_t cluster_size = ClusterManager::ClusterSizeUnknown);
/**
* This function is mostly needed for testing.
*/
Log::Index getCommitIndex() const { return commit_index_; }
/**
* Only the leader can call @ref appendLog().
*/
bool isLeader() const { return server_state_ == ServerStateLeader; }
/**
* Inserts one entry into log.
* Failures are tolerble because all operations are idempotent.
* This method will trigger an assertion failure and return error if the current node is not the leader.
*/
int appendLog(const Entry::FieldTypes::unique_id& unique_id, NodeID node_id);
/**
* This class is used to perform log searches.
*/
struct LogEntryInfo
{
Entry entry;
bool committed;
LogEntryInfo(const Entry& arg_entry, bool arg_committed)
: entry(arg_entry)
, committed(arg_committed)
{ }
};
/**
* This method is used by the allocator to query existence of certain entries in the Raft log.
* Predicate is a callable of the following prototype:
* bool (const LogEntryInfo& entry)
* Once the predicate returns true, the loop will be terminated and the method will return an initialized lazy
* contructor to the last visited entry; otherwise the constructor will not be initialized. In this case, lazy
* constructor is used as boost::optional.
* The log is always traversed from HIGH to LOW index values, i.e. entry 0 will be traversed last.
*/
template <typename Predicate>
inline LazyConstructor<LogEntryInfo> traverseLogFromEndUntil(const Predicate& predicate) const
{
UAVCAN_ASSERT(try_implicit_cast<bool>(predicate, true));
for (int index = static_cast<int>(persistent_state_.getLog().getLastIndex()); index >= 0; index--)
{
const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(Log::Index(index));
UAVCAN_ASSERT(entry != NULL);
const LogEntryInfo info(*entry, Log::Index(index) <= commit_index_);
if (predicate(info))
{
LazyConstructor<LogEntryInfo> ret;
ret.template construct<const LogEntryInfo&>(info);
return ret;
}
}
return LazyConstructor<LogEntryInfo>();
}
};
/**
* The main allocator must implement this interface.
*/
class IAllocationRequestHandler
{
public:
typedef protocol::dynamic_node_id::server::Entry::FieldTypes::unique_id UniqueID;
virtual void handleAllocationRequest(const UniqueID& unique_id, NodeID preferred_node_id) = 0;
virtual ~IAllocationRequestHandler() { }
};
/**
* This class manages communication with allocation clients.
* Three-stage unique ID exchange is implemented here, as well as response publication.
*/
class AllocationRequestManager
{
typedef MethodBinder<AllocationRequestManager*,
void (AllocationRequestManager::*)
(const ReceivedDataStructure<protocol::dynamic_node_id::Allocation>&)>
AllocationCallback;
const MonotonicDuration stage_timeout_;
bool active_;
MonotonicTime last_message_timestamp_;
protocol::dynamic_node_id::Allocation::FieldTypes::unique_id current_unique_id_;
IAllocationRequestHandler& handler_;
Subscriber<protocol::dynamic_node_id::Allocation, AllocationCallback> allocation_sub_;
Publisher<protocol::dynamic_node_id::Allocation> allocation_pub_;
enum { InvalidStage = 0 };
static uint8_t detectRequestStage(const protocol::dynamic_node_id::Allocation& msg);
uint8_t getExpectedStage() const;
void broadcastIntermediateAllocationResponse();
void handleAllocation(const ReceivedDataStructure<protocol::dynamic_node_id::Allocation>& msg);
public:
AllocationRequestManager(INode& node, IAllocationRequestHandler& handler)
: stage_timeout_(MonotonicDuration::fromMSec(protocol::dynamic_node_id::Allocation::DEFAULT_REQUEST_PERIOD_MS))
, active_(false)
, handler_(handler)
, allocation_sub_(node)
, allocation_pub_(node)
{ }
int init();
void setActive(bool x);
bool isActive() const { return active_; }
int broadcastAllocationResponse(const IAllocationRequestHandler::UniqueID& unique_id, NodeID allocated_node_id);
};
} // namespace dynamic_node_id_server_impl
/**
* This class implements the top-level allocation logic and server API.
*/
class DynamicNodeIDAllocationServer : private dynamic_node_id_server_impl::IAllocationRequestHandler
, private dynamic_node_id_server_impl::ILeaderLogCommitHandler
{
typedef MethodBinder<DynamicNodeIDAllocationServer*,
void (DynamicNodeIDAllocationServer::*)
(const ReceivedDataStructure<protocol::NodeStatus>&)> NodeStatusCallback;
typedef MethodBinder<DynamicNodeIDAllocationServer*,
void (DynamicNodeIDAllocationServer::*)(const ServiceCallResult<protocol::GetNodeInfo>&)>
GetNodeInfoResponseCallback;
typedef Map<NodeID, uint8_t, 10> PendingGetNodeInfoAttemptsMap;
enum { MaxGetNodeInfoAttempts = 5 };
/*
* States
*/
PendingGetNodeInfoAttemptsMap pending_get_node_info_attempts_;
dynamic_node_id_server_impl::RaftCore raft_core_;
dynamic_node_id_server_impl::AllocationRequestManager allocation_request_manager_;
/*
* Transport
*/
Subscriber<protocol::NodeStatus, NodeStatusCallback> node_status_sub_;
ServiceClient<protocol::GetNodeInfo> get_node_info_client_;
INode& getNode() { return get_node_info_client_.getNode(); }
bool isNodeIDTaken(const NodeID node_id) const;
NodeID findFreeNodeID(const NodeID node_id) const;
void allocateNewNode(const UniqueID& unique_id, const NodeID preferred_node_id);
virtual void handleAllocationRequest(const UniqueID& unique_id, const NodeID preferred_node_id);
virtual void onEntryCommitted(const protocol::dynamic_node_id::server::Entry& entry);
virtual void onLeaderChange(bool local_node_is_leader);
void tryPublishAllocationResult(const protocol::dynamic_node_id::server::Entry& entry);
public:
DynamicNodeIDAllocationServer(INode& node,
IDynamicNodeIDStorageBackend& storage,
IDynamicNodeIDAllocationServerEventTracer& tracer)
: pending_get_node_info_attempts_(node.getAllocator())
, raft_core_(node, storage, tracer, *this)
, allocation_request_manager_(node, *this)
, node_status_sub_(node)
, get_node_info_client_(node)
{ }
enum { ClusterSizeUnknown = dynamic_node_id_server_impl::ClusterManager::ClusterSizeUnknown };
int init(uint8_t cluster_size = ClusterSizeUnknown);
};
}
#endif // UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_ALLOCATION_SERVER_HPP_INCLUDED
@@ -0,0 +1,255 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_INTERNAL_ALLOCATION_REQUEST_MANAGER_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_INTERNAL_ALLOCATION_REQUEST_MANAGER_HPP_INCLUDED
#include <uavcan/build_config.hpp>
#include <uavcan/debug.hpp>
#include <uavcan/node/subscriber.hpp>
#include <uavcan/node/publisher.hpp>
#include <uavcan/util/method_binder.hpp>
#include <uavcan/protocol/dynamic_node_id_server/types.hpp>
// UAVCAN types
#include <uavcan/protocol/dynamic_node_id/Allocation.hpp>
namespace uavcan
{
namespace dynamic_node_id_server
{
/**
* The main allocator must implement this interface.
*/
class IAllocationRequestHandler
{
public:
virtual void handleAllocationRequest(const UniqueID& unique_id, NodeID preferred_node_id) = 0;
virtual ~IAllocationRequestHandler() { }
};
/**
* This class manages communication with allocation clients.
* Three-stage unique ID exchange is implemented here, as well as response publication.
*/
class AllocationRequestManager
{
typedef MethodBinder<AllocationRequestManager*,
void (AllocationRequestManager::*)
(const ReceivedDataStructure<protocol::dynamic_node_id::Allocation>&)>
AllocationCallback;
const MonotonicDuration stage_timeout_;
bool active_;
MonotonicTime last_message_timestamp_;
protocol::dynamic_node_id::Allocation::FieldTypes::unique_id current_unique_id_;
IAllocationRequestHandler& handler_;
Subscriber<protocol::dynamic_node_id::Allocation, AllocationCallback> allocation_sub_;
Publisher<protocol::dynamic_node_id::Allocation> allocation_pub_;
enum { InvalidStage = 0 };
static uint8_t detectRequestStage(const protocol::dynamic_node_id::Allocation& msg)
{
const uint8_t max_bytes_per_request = protocol::dynamic_node_id::Allocation::MAX_LENGTH_OF_UNIQUE_ID_IN_REQUEST;
if ((msg.unique_id.size() != max_bytes_per_request) &&
(msg.unique_id.size() != (msg.unique_id.capacity() - max_bytes_per_request * 2U)) &&
(msg.unique_id.size() != msg.unique_id.capacity())) // Future proofness for CAN FD
{
return InvalidStage;
}
if (msg.first_part_of_unique_id)
{
return 1; // Note that CAN FD frames can deliver the unique ID in one stage!
}
if (msg.unique_id.size() == protocol::dynamic_node_id::Allocation::MAX_LENGTH_OF_UNIQUE_ID_IN_REQUEST)
{
return 2;
}
if (msg.unique_id.size() < protocol::dynamic_node_id::Allocation::MAX_LENGTH_OF_UNIQUE_ID_IN_REQUEST)
{
return 3;
}
return InvalidStage;
}
uint8_t getExpectedStage() const
{
if (current_unique_id_.empty())
{
return 1;
}
if (current_unique_id_.size() >= (protocol::dynamic_node_id::Allocation::MAX_LENGTH_OF_UNIQUE_ID_IN_REQUEST * 2))
{
return 3;
}
if (current_unique_id_.size() >= protocol::dynamic_node_id::Allocation::MAX_LENGTH_OF_UNIQUE_ID_IN_REQUEST)
{
return 2;
}
return InvalidStage;
}
void broadcastIntermediateAllocationResponse()
{
UAVCAN_ASSERT(active_);
protocol::dynamic_node_id::Allocation msg;
msg.unique_id = current_unique_id_;
UAVCAN_ASSERT(msg.unique_id.size() < msg.unique_id.capacity());
UAVCAN_TRACE("AllocationRequestManager", "Intermediate response with %u bytes of unique ID",
unsigned(msg.unique_id.size()));
const int res = allocation_pub_.broadcast(msg);
if (res < 0)
{
allocation_pub_.getNode().registerInternalFailure("Dynamic allocation broadcast");
}
}
void handleAllocation(const ReceivedDataStructure<protocol::dynamic_node_id::Allocation>& msg)
{
if (!msg.isAnonymousTransfer())
{
return; // This is a response from another allocator, ignore
}
if (!active_)
{
return; // The local node is not the leader, ignore
}
/*
* Reset the expected stage on timeout
*/
if (msg.getMonotonicTimestamp() > (last_message_timestamp_ + stage_timeout_))
{
UAVCAN_TRACE("AllocationRequestManager", "Stage timeout, reset");
current_unique_id_.clear();
}
last_message_timestamp_ = msg.getMonotonicTimestamp();
/*
* Checking if request stage matches the expected stage
*/
const uint8_t request_stage = detectRequestStage(msg);
if (request_stage == InvalidStage)
{
return; // No way
}
const uint8_t expected_stage = getExpectedStage();
if (request_stage == InvalidStage)
{
current_unique_id_.clear();
return;
}
if (request_stage != expected_stage)
{
return; // Ignore - stage mismatch
}
const uint8_t max_expected_bytes = static_cast<uint8_t>(current_unique_id_.capacity() - current_unique_id_.size());
UAVCAN_ASSERT(max_expected_bytes > 0);
if (msg.unique_id.size() > max_expected_bytes)
{
return; // Malformed request
}
/*
* Updating the local state
*/
for (uint8_t i = 0; i < msg.unique_id.size(); i++)
{
current_unique_id_.push_back(msg.unique_id[i]);
}
/*
* Proceeding with allocation if possible
*/
if (current_unique_id_.size() == current_unique_id_.capacity())
{
UAVCAN_TRACE("AllocationRequestManager", "Allocation request received; preferred node ID: %d",
int(msg.node_id));
UniqueID unique_id;
copy(current_unique_id_.begin(), current_unique_id_.end(), unique_id.begin());
current_unique_id_.clear();
handler_.handleAllocationRequest(unique_id, msg.node_id);
}
else
{
broadcastIntermediateAllocationResponse();
}
}
public:
AllocationRequestManager(INode& node, IAllocationRequestHandler& handler)
: stage_timeout_(MonotonicDuration::fromMSec(protocol::dynamic_node_id::Allocation::DEFAULT_REQUEST_PERIOD_MS))
, active_(false)
, handler_(handler)
, allocation_sub_(node)
, allocation_pub_(node)
{ }
int init()
{
int res = allocation_pub_.init();
if (res < 0)
{
return res;
}
(void)allocation_pub_.setPriority(TransferPriorityLow);
res = allocation_sub_.start(AllocationCallback(this, &AllocationRequestManager::handleAllocation));
if (res < 0)
{
return res;
}
allocation_sub_.allowAnonymousTransfers();
return 0;
}
void setActive(bool x)
{
active_ = x;
if (!active_)
{
current_unique_id_.clear();
}
}
bool isActive() const { return active_; }
int broadcastAllocationResponse(const UniqueID& unique_id, NodeID allocated_node_id)
{
if (!active_)
{
UAVCAN_ASSERT(0);
return -ErrLogic;
}
protocol::dynamic_node_id::Allocation msg;
msg.unique_id.resize(msg.unique_id.capacity());
copy(unique_id.begin(), unique_id.end(), msg.unique_id.begin());
msg.node_id = allocated_node_id.get();
return allocation_pub_.broadcast(msg);
}
};
}
}
#endif // Include guard
@@ -0,0 +1,445 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_CLUSTER_MANAGER_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_CLUSTER_MANAGER_HPP_INCLUDED
#include <uavcan/build_config.hpp>
#include <uavcan/debug.hpp>
#include <uavcan/node/timer.hpp>
#include <uavcan/node/subscriber.hpp>
#include <uavcan/node/publisher.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/event.hpp>
#include <uavcan/protocol/dynamic_node_id_server/storage_marshaller.hpp>
// UAVCAN types
#include <uavcan/protocol/dynamic_node_id/server/Discovery.hpp>
namespace uavcan
{
namespace dynamic_node_id_server
{
namespace distributed
{
/**
* This class maintains the cluster state.
*/
class ClusterManager : private TimerBase
{
public:
enum { MaxClusterSize = Discovery::FieldTypes::known_nodes::MaxSize };
private:
typedef MethodBinder<ClusterManager*,
void (ClusterManager::*)
(const ReceivedDataStructure<Discovery>&)>
DiscoveryCallback;
struct Server
{
NodeID node_id;
Log::Index next_index;
Log::Index match_index;
Server()
: next_index(0)
, match_index(0)
{ }
void resetIndices(const Log& log)
{
next_index = Log::Index(log.getLastIndex() + 1U);
match_index = 0;
}
};
IStorageBackend& storage_;
IEventTracer& tracer_;
const Log& log_;
Subscriber<Discovery, DiscoveryCallback> discovery_sub_;
mutable Publisher<Discovery> discovery_pub_;
Server servers_[MaxClusterSize - 1]; ///< Minus one because the local server is not listed there.
uint8_t cluster_size_;
uint8_t num_known_servers_;
bool had_discovery_activity_;
static IStorageBackend::String getStorageKeyForClusterSize() { return "cluster_size"; }
INode& getNode() { return discovery_sub_.getNode(); }
const INode& getNode() const { return discovery_sub_.getNode(); }
const Server* findServer(NodeID node_id) const { return const_cast<ClusterManager*>(this)->findServer(node_id); }
Server* findServer(NodeID node_id)
{
for (uint8_t i = 0; i < num_known_servers_; i++)
{
UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
if (servers_[i].node_id == node_id)
{
return &servers_[i];
}
}
return NULL;
}
void addServer(NodeID node_id)
{
UAVCAN_ASSERT((num_known_servers_ + 1) < (MaxClusterSize - 2));
if (!isKnownServer(node_id) && node_id.isUnicast())
{
tracer_.onEvent(TraceNewServerDiscovered, node_id.get());
servers_[num_known_servers_].node_id = node_id;
servers_[num_known_servers_].resetIndices(log_);
num_known_servers_ = static_cast<uint8_t>(num_known_servers_ + 1U);
}
else
{
UAVCAN_ASSERT(0);
}
}
virtual void handleTimerEvent(const TimerEvent&)
{
UAVCAN_ASSERT(num_known_servers_ < cluster_size_);
tracer_.onEvent(TraceDiscoveryBroadcast, num_known_servers_);
/*
* Filling the message
*/
Discovery msg;
msg.configured_cluster_size = cluster_size_;
msg.known_nodes.push_back(getNode().getNodeID().get()); // Putting ourselves at index 0
for (uint8_t i = 0; i < num_known_servers_; i++)
{
UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
msg.known_nodes.push_back(servers_[i].node_id.get());
}
UAVCAN_ASSERT(msg.known_nodes.size() == (num_known_servers_ + 1));
/*
* Broadcasting
*/
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Broadcasting Discovery message; known nodes: %d of %d",
int(msg.known_nodes.size()), int(cluster_size_));
const int res = discovery_pub_.broadcast(msg);
if (res < 0)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Discovery broadcst failed: %d", res);
getNode().registerInternalFailure("Raft discovery broadcast");
}
/*
* Termination condition
*/
if (isClusterDiscovered())
{
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Discovery broadcasting timer stopped");
stop();
}
}
void handleDiscovery(const ReceivedDataStructure<Discovery>& msg)
{
tracer_.onEvent(TraceDiscoveryReceived, msg.getSrcNodeID().get());
/*
* Validating cluster configuration
* If there's a case of misconfiguration, the message will be ignored.
*/
if (msg.configured_cluster_size != cluster_size_)
{
tracer_.onEvent(TraceInvalidClusterSizeReceived, msg.configured_cluster_size);
getNode().registerInternalFailure("Bad Raft cluster size");
return;
}
had_discovery_activity_ = true;
/*
* Updating the set of known servers
*/
for (uint8_t i = 0; i < msg.known_nodes.size(); i++)
{
if (isClusterDiscovered())
{
break;
}
const NodeID node_id(msg.known_nodes[i]);
if (node_id.isUnicast() && !isKnownServer(node_id))
{
addServer(node_id);
}
}
/*
* Publishing a new Discovery request if the publishing server needs to learn about more servers.
*/
if (msg.configured_cluster_size > msg.known_nodes.size())
{
startDiscoveryPublishingTimerIfNotRunning();
}
}
void startDiscoveryPublishingTimerIfNotRunning()
{
if (!isRunning())
{
startPeriodic(MonotonicDuration::fromMSec(Discovery::BROADCASTING_INTERVAL_MS));
}
}
public:
enum { ClusterSizeUnknown = 0 };
/**
* @param node Needed to publish and subscribe to Discovery message
* @param storage Needed to read the cluster size parameter from the storage
* @param log Needed to initialize nextIndex[] values after elections
*/
ClusterManager(INode& node, IStorageBackend& storage, const Log& log, IEventTracer& tracer)
: TimerBase(node)
, storage_(storage)
, tracer_(tracer)
, log_(log)
, discovery_sub_(node)
, discovery_pub_(node)
, cluster_size_(0)
, num_known_servers_(0)
, had_discovery_activity_(false)
{ }
/**
* If cluster_size is set to ClusterSizeUnknown, the class will try to read this parameter from the
* storage backend using key 'cluster_size'.
* Returns negative error code.
*/
int init(uint8_t init_cluster_size = ClusterSizeUnknown)
{
/*
* Figuring out the cluster size
*/
if (init_cluster_size == ClusterSizeUnknown)
{
// Reading from the storage
StorageMarshaller io(storage_);
uint32_t value = 0;
int res = io.get(getStorageKeyForClusterSize(), value);
if (res < 0)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager",
"Cluster size is neither configured nor stored in the storage");
return res;
}
if ((value == 0) || (value > MaxClusterSize))
{
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Cluster size is invalid");
return -ErrFailure;
}
cluster_size_ = static_cast<uint8_t>(value);
}
else
{
if ((init_cluster_size == 0) || (init_cluster_size > MaxClusterSize))
{
return -ErrInvalidParam;
}
cluster_size_ = init_cluster_size;
// Writing the storage
StorageMarshaller io(storage_);
uint32_t value = init_cluster_size;
int res = io.setAndGetBack(getStorageKeyForClusterSize(), value);
if ((res < 0) || (value != init_cluster_size))
{
UAVCAN_TRACE("dynamic_node_id_server_impl::ClusterManager", "Failed to store cluster size");
return -ErrFailure;
}
}
tracer_.onEvent(TraceClusterSizeInited, cluster_size_);
UAVCAN_ASSERT(cluster_size_ > 0);
UAVCAN_ASSERT(cluster_size_ <= MaxClusterSize);
/*
* Initializing pub/sub and timer
*/
int res = discovery_pub_.init();
if (res < 0)
{
return res;
}
(void)discovery_pub_.setPriority(TransferPriorityLow);
res = discovery_sub_.start(DiscoveryCallback(this, &ClusterManager::handleDiscovery));
if (res < 0)
{
return res;
}
startDiscoveryPublishingTimerIfNotRunning();
/*
* Misc
*/
resetAllServerIndices();
return 0;
}
/**
* Whether such server has been discovered.
*/
bool isKnownServer(NodeID node_id) const
{
if (node_id == getNode().getNodeID())
{
return true;
}
for (uint8_t i = 0; i < num_known_servers_; i++)
{
UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
UAVCAN_ASSERT(servers_[i].node_id != getNode().getNodeID());
if (servers_[i].node_id == node_id)
{
return true;
}
}
return false;
}
/**
* An invalid node ID will be returned if there's no such server.
* The local server is not listed there.
*/
NodeID getRemoteServerNodeIDAtIndex(uint8_t index) const
{
if (index < num_known_servers_)
{
return servers_[index].node_id;
}
return NodeID();
}
/**
* See next_index[] in Raft paper.
*/
Log::Index getServerNextIndex(NodeID server_node_id) const
{
const Server* const s = findServer(server_node_id);
if (s != NULL)
{
return s->next_index;
}
UAVCAN_ASSERT(0);
return 0;
}
void incrementServerNextIndexBy(NodeID server_node_id, Log::Index increment)
{
Server* const s = findServer(server_node_id);
if (s != NULL)
{
s->next_index = Log::Index(s->next_index + increment);
}
else
{
UAVCAN_ASSERT(0);
}
}
void decrementServerNextIndex(NodeID server_node_id)
{
Server* const s = findServer(server_node_id);
if (s != NULL)
{
s->next_index--;
}
else
{
UAVCAN_ASSERT(0);
}
}
/**
* See match_index[] in Raft paper.
*/
Log::Index getServerMatchIndex(NodeID server_node_id) const
{
const Server* const s = findServer(server_node_id);
if (s != NULL)
{
return s->match_index;
}
UAVCAN_ASSERT(0);
return 0;
}
void setServerMatchIndex(NodeID server_node_id, Log::Index match_index)
{
Server* const s = findServer(server_node_id);
if (s != NULL)
{
s->match_index = match_index;
}
else
{
UAVCAN_ASSERT(0);
}
}
/**
* This method must be called when the current server becomes leader.
*/
void resetAllServerIndices()
{
for (uint8_t i = 0; i < num_known_servers_; i++)
{
UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
servers_[i].resetIndices(log_);
}
}
/**
* This method returns true if there was at least one Discovery message received since last call.
*/
bool hadDiscoveryActivity()
{
if (had_discovery_activity_)
{
had_discovery_activity_ = false;
return true;
}
return false;
}
/**
* Number of known servers can only grow, and it never exceeds the cluster size value.
* This number does not include the local server.
*/
uint8_t getNumKnownServers() const { return num_known_servers_; }
/**
* Cluster size and quorum size are constant.
*/
uint8_t getClusterSize() const { return cluster_size_; }
uint8_t getQuorumSize() const { return static_cast<uint8_t>(cluster_size_ / 2U + 1U); }
bool isClusterDiscovered() const { return num_known_servers_ == (cluster_size_ - 1); }
};
}
}
}
#endif // Include guard
@@ -0,0 +1,127 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_EVENT_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_EVENT_HPP_INCLUDED
#include <uavcan/build_config.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp>
namespace uavcan
{
namespace dynamic_node_id_server
{
namespace distributed
{
/**
* @ref IEventTracer.
* Event codes cannot be changed, only new ones can be added.
*/
enum TraceCode
{
// Event name Argument
// 0
TraceError, // error code (may be negated)
TraceLogLastIndexRestored, // recovered last index value
TraceLogAppend, // index of new entry
TraceLogRemove, // new last index value
TraceCurrentTermRestored, // current term
// 5
TraceCurrentTermUpdate, // current term
TraceVotedForRestored, // value of votedFor
TraceVotedForUpdate, // value of votedFor
TraceDiscoveryBroadcast, // number of known servers
TraceNewServerDiscovered, // node ID of the new server
// 10
TraceDiscoveryReceived, // node ID of the sender
TraceClusterSizeInited, // cluster size
TraceInvalidClusterSizeReceived, // received cluster size
TraceRaftCoreInited, // update interval in usec
TraceRaftStateSwitch, // 0 - Follower, 1 - Candidate, 2 - Leader
// 15
TraceRaftActiveSwitch, // 0 - Passive, 1 - Active
TraceRaftNewLogEntry, // node ID value
TraceRaftRequestIgnored, // node ID of the client
TraceRaftVoteRequestReceived, // node ID of the client
TraceRaftVoteRequestSucceeded, // node ID of the server
// 20
TraceRaftVoteRequestInitiation, // node ID of the server
TraceRaftPersistStateUpdateError, // negative error code
TraceRaftCommitIndexUpdate, // new commit index value
TraceRaftNewerTermInResponse, // new term value
TraceRaftNewEntryCommitted, // new commit index value
// 25
TraceRaftAppendEntriesCallFailure, // error code (may be negated)
NumTraceCodes
};
/**
* This interface allows the application to trace events that happen in the server.
*/
class IEventTracer
{
public:
#if UAVCAN_TOSTRING
/**
* It is safe to call this function with any argument.
* If the event code is out of range, an assertion failure will be triggered and an error text will be returned.
*/
static const char* getEventName(TraceCode code)
{
// import re
// make_strings = lambda s: ',\n'.join('"%s"' % x for x in re.findall(r'\ \ \ \ Trace([A-Za-z0-9]+),', s))
static const char* const Strings[NumTraceCodes] =
{
"Error",
"LogLastIndexRestored",
"LogAppend",
"LogRemove",
"CurrentTermRestored",
"CurrentTermUpdate",
"VotedForRestored",
"VotedForUpdate",
"DiscoveryBroadcast",
"NewServerDiscovered",
"DiscoveryReceived",
"ClusterSizeInited",
"InvalidClusterSizeReceived",
"RaftCoreInited",
"RaftStateSwitch",
"RaftActiveSwitch",
"RaftNewLogEntry",
"RaftRequestIgnored",
"RaftVoteRequestReceived",
"RaftVoteRequestSucceeded",
"RaftVoteRequestInitiation",
"RaftPersistStateUpdateError",
"RaftCommitIndexUpdate",
"RaftNewerTermInResponse",
"RaftNewEntryCommitted",
"RaftAppendEntriesCallFailure"
};
uavcan::StaticAssert<sizeof(Strings) / sizeof(Strings[0]) == NumTraceCodes>::check();
UAVCAN_ASSERT(code < NumTraceCodes);
return (code < NumTraceCodes) ? Strings[static_cast<unsigned>(code)] : "INVALID_EVENT_CODE";
}
#endif
/**
* The server invokes this method every time it believes that a noteworthy event has happened.
* The table of event codes can be found in the server sources.
* It is guaranteed that event code values will never change, but new ones can be added in future. This ensures
* full backward compatibility.
* @param event_code Event code, see the sources for the enum with values.
* @param event_argument Value associated with the event; its meaning depends on the event code.
*/
virtual void onEvent(TraceCode event_code, int64_t event_argument) = 0;
virtual ~IEventTracer() { }
};
}
}
}
#endif // Include guard
@@ -0,0 +1,318 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_LOG_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_LOG_HPP_INCLUDED
#include <uavcan/build_config.hpp>
#include <uavcan/debug.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/event.hpp>
#include <uavcan/protocol/dynamic_node_id_server/storage_marshaller.hpp>
namespace uavcan
{
namespace dynamic_node_id_server
{
namespace distributed
{
/**
* Raft log.
* This class transparently replicates its state to the storage backend, keeping the most recent state in memory.
* Writes are slow, reads are instantaneous.
*/
class Log
{
public:
typedef uint8_t Index;
enum { Capacity = NodeID::Max + 1 };
private:
IStorageBackend& storage_;
IEventTracer& tracer_;
Entry entries_[Capacity];
Index last_index_; // Index zero always contains an empty entry
static IStorageBackend::String getLastIndexKey() { return "log_last_index"; }
static IStorageBackend::String makeEntryKey(Index index, const char* postfix)
{
IStorageBackend::String str;
// "log0_foobar"
str += "log";
str.appendFormatted("%d", int(index));
str += "_";
str += postfix;
return str;
}
int readEntryFromStorage(Index index, Entry& out_entry)
{
const StorageMarshaller io(storage_);
// Term
if (io.get(makeEntryKey(index, "term"), out_entry.term) < 0)
{
return -ErrFailure;
}
// Unique ID
if (io.get(makeEntryKey(index, "unique_id"), out_entry.unique_id) < 0)
{
return -ErrFailure;
}
// Node ID
uint32_t node_id = 0;
if (io.get(makeEntryKey(index, "node_id"), node_id) < 0)
{
return -ErrFailure;
}
if (node_id > NodeID::Max)
{
return -ErrFailure;
}
out_entry.node_id = static_cast<uint8_t>(node_id);
return 0;
}
int writeEntryToStorage(Index index, const Entry& entry)
{
Entry temp = entry;
StorageMarshaller io(storage_);
// Term
if (io.setAndGetBack(makeEntryKey(index, "term"), temp.term) < 0)
{
return -ErrFailure;
}
// Unique ID
if (io.setAndGetBack(makeEntryKey(index, "unique_id"), temp.unique_id) < 0)
{
return -ErrFailure;
}
// Node ID
uint32_t node_id = entry.node_id;
if (io.setAndGetBack(makeEntryKey(index, "node_id"), node_id) < 0)
{
return -ErrFailure;
}
temp.node_id = static_cast<uint8_t>(node_id);
return (temp == entry) ? 0 : -ErrFailure;
}
int initEmptyLogStorage()
{
StorageMarshaller io(storage_);
/*
* Writing the zero entry - it must always be default-initialized
*/
entries_[0] = Entry();
int res = writeEntryToStorage(0, entries_[0]);
if (res < 0)
{
return res;
}
/*
* Initializing last index
* Last index must be written AFTER the zero entry, otherwise if the write fails here the storage will be
* left in an inconsistent state.
*/
last_index_ = 0;
uint32_t stored_index = 0;
res = io.setAndGetBack(getLastIndexKey(), stored_index);
if (res < 0)
{
return res;
}
if (stored_index != 0)
{
return -ErrFailure;
}
return 0;
}
public:
Log(IStorageBackend& storage, IEventTracer& tracer)
: storage_(storage)
, tracer_(tracer)
, last_index_(0)
{ }
/**
* Initialization is performed as follows (every step may fail and return an error):
* 1. Log is restored or initialized.
* 2. Current term is restored. If there was no current term stored and the log is empty, it will be initialized
* with zero.
* 3. VotedFor value is restored. If there was no VotedFor value stored, the log is empty, and the current term is
* zero, the value will be initialized with zero.
*/
int init()
{
StorageMarshaller io(storage_);
// Reading max index
{
uint32_t value = 0;
if (io.get(getLastIndexKey(), value) < 0)
{
if (storage_.get(getLastIndexKey()).empty())
{
UAVCAN_TRACE("dynamic_node_id_server_impl::Log", "Initializing empty storage");
return initEmptyLogStorage();
}
else
{
// There's some data in the storage, but it cannot be parsed - reporting an error
UAVCAN_TRACE("dynamic_node_id_server_impl::Log", "Failed to read last index");
return -ErrFailure;
}
}
if (value >= Capacity)
{
return -ErrFailure;
}
last_index_ = Index(value);
}
tracer_.onEvent(TraceLogLastIndexRestored, last_index_);
// Restoring log entries - note that index 0 always exists
for (Index index = 0; index <= last_index_; index++)
{
const int result = readEntryFromStorage(index, entries_[index]);
if (result < 0)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::Log", "Failed to read entry at index %u: %d",
unsigned(index), result);
return result;
}
}
UAVCAN_TRACE("dynamic_node_id_server_impl::Log", "Restored %u log entries", unsigned(last_index_));
return 0;
}
/**
* This method invokes storage IO.
* Returned value indicates whether the entry was successfully appended.
*/
int append(const Entry& entry)
{
if ((last_index_ + 1) >= Capacity)
{
return -ErrLogic;
}
tracer_.onEvent(TraceLogAppend, last_index_ + 1U);
// If next operations fail, we'll get a dangling entry, but it's absolutely OK.
int res = writeEntryToStorage(Index(last_index_ + 1), entry);
if (res < 0)
{
return res;
}
// Updating the last index
StorageMarshaller io(storage_);
uint32_t new_last_index = last_index_ + 1U;
res = io.setAndGetBack(getLastIndexKey(), new_last_index);
if (res < 0)
{
return res;
}
if (new_last_index != last_index_ + 1U)
{
return -ErrFailure;
}
entries_[new_last_index] = entry;
last_index_ = Index(new_last_index);
UAVCAN_TRACE("dynamic_node_id_server_impl::Log", "New entry, index %u, node ID %u, term %u",
unsigned(last_index_), unsigned(entry.node_id), unsigned(entry.term));
return 0;
}
/**
* This method invokes storage IO.
* Returned value indicates whether the requested operation has been carried out successfully.
*/
int removeEntriesWhereIndexGreaterOrEqual(Index index)
{
UAVCAN_ASSERT(last_index_ < Capacity);
if (((index) >= Capacity) || (index <= 0))
{
return -ErrLogic;
}
uint32_t new_last_index = index - 1U;
tracer_.onEvent(TraceLogRemove, new_last_index);
if (new_last_index != last_index_)
{
StorageMarshaller io(storage_);
int res = io.setAndGetBack(getLastIndexKey(), new_last_index);
if (res < 0)
{
return res;
}
if (new_last_index != index - 1U)
{
return -ErrFailure;
}
UAVCAN_TRACE("dynamic_node_id_server_impl::Log", "Entries removed, last index %u --> %u",
unsigned(last_index_), unsigned(new_last_index));
last_index_ = Index(new_last_index);
}
// Removal operation leaves dangling entries in storage, it's OK
return 0;
}
int removeEntriesWhereIndexGreater(Index index)
{
return removeEntriesWhereIndexGreaterOrEqual(Index(index + 1U));
}
/**
* Returns nullptr if there's no such index.
* This method does not use storage IO.
*/
const Entry* getEntryAtIndex(Index index) const
{
UAVCAN_ASSERT(last_index_ < Capacity);
return (index <= last_index_) ? &entries_[index] : NULL;
}
Index getLastIndex() const { return last_index_; }
bool isOtherLogUpToDate(Index other_last_index, Term other_last_term) const
{
UAVCAN_ASSERT(last_index_ < Capacity);
// Terms are different - the one with higher term is more up-to-date
if (other_last_term != entries_[last_index_].term)
{
return other_last_term > entries_[last_index_].term;
}
// Terms are equal - longer log wins
return other_last_index >= last_index_;
}
};
}
}
}
#endif // Include guard
@@ -0,0 +1,225 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_PERSISTENT_STATE_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_PERSISTENT_STATE_HPP_INCLUDED
#include <uavcan/build_config.hpp>
#include <uavcan/debug.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/event.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/log.hpp>
#include <uavcan/protocol/dynamic_node_id_server/storage_marshaller.hpp>
namespace uavcan
{
namespace dynamic_node_id_server
{
namespace distributed
{
/**
* This class is a convenient container for persistent state variables defined by Raft.
* Writes are slow, reads are instantaneous.
*/
class PersistentState
{
IStorageBackend& storage_;
IEventTracer& tracer_;
Term current_term_;
NodeID voted_for_;
Log log_;
static IStorageBackend::String getCurrentTermKey() { return "current_term"; }
static IStorageBackend::String getVotedForKey() { return "voted_for"; }
public:
PersistentState(IStorageBackend& storage, IEventTracer& tracer)
: storage_(storage)
, tracer_(tracer)
, current_term_(0)
, log_(storage, tracer)
{ }
int init()
{
/*
* Reading log
*/
int res = log_.init();
if (res < 0)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::PersistentState", "Log init failed: %d", res);
return res;
}
const Entry* const last_entry = log_.getEntryAtIndex(log_.getLastIndex());
if (last_entry == NULL)
{
UAVCAN_ASSERT(0);
return -ErrLogic;
}
const bool log_is_empty = (log_.getLastIndex() == 0) && (last_entry->term == 0);
StorageMarshaller io(storage_);
/*
* Reading currentTerm
*/
if (storage_.get(getCurrentTermKey()).empty() && log_is_empty)
{
// First initialization
current_term_ = 0;
res = io.setAndGetBack(getCurrentTermKey(), current_term_);
if (res < 0)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::PersistentState", "Failed to init current term: %d", res);
return res;
}
if (current_term_ != 0)
{
return -ErrFailure;
}
}
else
{
// Restoring
res = io.get(getCurrentTermKey(), current_term_);
if (res < 0)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::PersistentState", "Failed to read current term: %d", res);
return res;
}
}
tracer_.onEvent(TraceCurrentTermRestored, current_term_);
if (current_term_ < last_entry->term)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::PersistentState",
"Persistent storage is damaged: current term is less than term of the last log entry (%u < %u)",
unsigned(current_term_), unsigned(last_entry->term));
return -ErrLogic;
}
/*
* Reading votedFor
*/
if (storage_.get(getVotedForKey()).empty() && log_is_empty && (current_term_ == 0))
{
// First initialization
voted_for_ = NodeID(0);
uint32_t stored_voted_for = 0;
res = io.setAndGetBack(getVotedForKey(), stored_voted_for);
if (res < 0)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::PersistentState", "Failed to init votedFor: %d", res);
return res;
}
if (stored_voted_for != 0)
{
return -ErrFailure;
}
}
else
{
// Restoring
uint32_t stored_voted_for = 0;
res = io.get(getVotedForKey(), stored_voted_for);
if (res < 0)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::PersistentState", "Failed to read votedFor: %d", res);
return res;
}
if (stored_voted_for > NodeID::Max)
{
return -ErrFailure;
}
voted_for_ = NodeID(uint8_t(stored_voted_for));
}
tracer_.onEvent(TraceVotedForRestored, voted_for_.get());
return 0;
}
Term getCurrentTerm() const { return current_term_; }
NodeID getVotedFor() const { return voted_for_; }
bool isVotedForSet() const { return voted_for_.isUnicast(); }
Log& getLog() { return log_; }
const Log& getLog() const { return log_; }
/**
* Invokes storage IO.
*/
int setCurrentTerm(Term term)
{
if (term < current_term_)
{
UAVCAN_ASSERT(0);
return -ErrInvalidParam;
}
tracer_.onEvent(TraceCurrentTermUpdate, term);
StorageMarshaller io(storage_);
Term tmp = term;
int res = io.setAndGetBack(getCurrentTermKey(), tmp);
if (res < 0)
{
return res;
}
if (tmp != term)
{
return -ErrFailure;
}
current_term_ = term;
return 0;
}
/**
* Invokes storage IO.
*/
int setVotedFor(NodeID node_id)
{
if (!node_id.isValid())
{
UAVCAN_ASSERT(0);
return -ErrInvalidParam;
}
tracer_.onEvent(TraceVotedForUpdate, node_id.get());
StorageMarshaller io(storage_);
uint32_t tmp = node_id.get();
int res = io.setAndGetBack(getVotedForKey(), tmp);
if (res < 0)
{
return res;
}
if (node_id.get() != tmp)
{
return -ErrFailure;
}
voted_for_ = node_id;
return 0;
}
int resetVotedFor() { return setVotedFor(NodeID(0)); }
};
}
}
}
#endif // Include guard
@@ -0,0 +1,860 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED
#include <uavcan/build_config.hpp>
#include <uavcan/debug.hpp>
#include <uavcan/util/method_binder.hpp>
#include <uavcan/node/timer.hpp>
#include <uavcan/node/service_server.hpp>
#include <uavcan/node/service_client.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/event.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/persistent_state.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/cluster_manager.hpp>
// UAVCAN types
#include <uavcan/protocol/dynamic_node_id/server/AppendEntries.hpp>
#include <uavcan/protocol/dynamic_node_id/server/RequestVote.hpp>
namespace uavcan
{
namespace dynamic_node_id_server
{
namespace distributed
{
/**
* Allocator has to implement this interface so the RaftCore can inform it when a new entry gets committed to the log.
*/
class ILeaderLogCommitHandler
{
public:
/**
* This method will be invoked when a new log entry is committed (only if the local server is the current Leader).
*/
virtual void onEntryCommitted(const Entry& entry) = 0;
/**
* Assume false by default.
*/
virtual void onLeaderChange(bool local_node_is_leader) = 0;
virtual ~ILeaderLogCommitHandler() { }
};
/**
* This class implements log replication and voting.
* It does not implement client-server interaction at all; instead it just exposes a public method for adding
* allocation entries.
*/
class RaftCore : private TimerBase
{
typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ReceivedDataStructure<AppendEntries::Request>&,
ServiceResponseDataStructure<AppendEntries::Response>&)>
AppendEntriesCallback;
typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ServiceCallResult<AppendEntries>&)>
AppendEntriesResponseCallback;
typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ReceivedDataStructure<RequestVote::Request>&,
ServiceResponseDataStructure<RequestVote::Response>&)>
RequestVoteCallback;
typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ServiceCallResult<RequestVote>&)>
RequestVoteResponseCallback;
enum ServerState
{
ServerStateFollower,
ServerStateCandidate,
ServerStateLeader
};
struct PendingAppendEntriesFields
{
Log::Index prev_log_index;
Log::Index num_entries;
PendingAppendEntriesFields()
: prev_log_index(0)
, num_entries(0)
{ }
};
/*
* Constants
*/
const MonotonicDuration update_interval_; ///< AE requests will be issued at this rate
const MonotonicDuration base_activity_timeout_;
IEventTracer& tracer_;
ILeaderLogCommitHandler& log_commit_handler_;
/*
* States
*/
PersistentState persistent_state_;
ClusterManager cluster_;
Log::Index commit_index_;
MonotonicTime last_activity_timestamp_;
bool active_mode_;
ServerState server_state_;
uint8_t next_server_index_; ///< Next server to query AE from
uint8_t num_votes_received_in_this_campaign_;
PendingAppendEntriesFields pending_append_entries_fields_;
/*
* Transport
*/
ServiceServer<AppendEntries, AppendEntriesCallback> append_entries_srv_;
ServiceClient<AppendEntries, AppendEntriesResponseCallback> append_entries_client_;
ServiceServer<RequestVote, RequestVoteCallback> request_vote_srv_;
enum { NumRequestVoteClients = ClusterManager::MaxClusterSize - 1 };
LazyConstructor<ServiceClient<RequestVote, RequestVoteResponseCallback> >
request_vote_clients_[NumRequestVoteClients];
/*
* Methods
*/
void trace(TraceCode event, int64_t argument) { tracer_.onEvent(event, argument); }
INode& getNode() { return append_entries_srv_.getNode(); }
const INode& getNode() const { return append_entries_srv_.getNode(); }
void registerActivity() { last_activity_timestamp_ = getNode().getMonotonicTime(); }
bool isActivityTimedOut() const
{
const int multiplier = static_cast<int>(getNode().getNodeID().get()) - 1;
const MonotonicDuration activity_timeout =
MonotonicDuration::fromUSec(base_activity_timeout_.toUSec() + update_interval_.toUSec() * multiplier);
return getNode().getMonotonicTime() > (last_activity_timestamp_ + activity_timeout);
}
void handlePersistentStateUpdateError(int error)
{
UAVCAN_ASSERT(error < 0);
trace(TraceRaftPersistStateUpdateError, error);
switchState(ServerStateFollower);
setActiveMode(false); // Goodnight sweet prince
registerActivity(); // Deferring reelections
}
void updateFollower()
{
if (active_mode_ && isActivityTimedOut())
{
switchState(ServerStateCandidate);
registerActivity();
}
}
void updateCandidate()
{
UAVCAN_ASSERT(active_mode_);
if (num_votes_received_in_this_campaign_ > 0)
{
const bool won = num_votes_received_in_this_campaign_ >= cluster_.getQuorumSize();
UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Election complete, won: %d", int(won));
switchState(won ? ServerStateLeader : ServerStateFollower); // Start over or become leader
}
else
{
// Set votedFor, abort on failure
int res = persistent_state_.setVotedFor(getNode().getNodeID());
if (res < 0)
{
handlePersistentStateUpdateError(res);
return;
}
// Increment current term, abort on failure
res = persistent_state_.setCurrentTerm(persistent_state_.getCurrentTerm() + 1U);
if (res < 0)
{
handlePersistentStateUpdateError(res);
return;
}
num_votes_received_in_this_campaign_ = 1; // Voting for self
RequestVote::Request req;
req.last_log_index = persistent_state_.getLog().getLastIndex();
req.last_log_term = persistent_state_.getLog().getEntryAtIndex(req.last_log_index)->term;
req.term = persistent_state_.getCurrentTerm();
for (uint8_t i = 0; i < NumRequestVoteClients; i++)
{
const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(i);
if (!node_id.isUnicast())
{
break;
}
UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Requesting vote from %d", int(node_id.get()));
trace(TraceRaftVoteRequestInitiation, node_id.get());
res = request_vote_clients_[i]->call(node_id, req);
if (res < 0)
{
trace(TraceError, res);
}
}
}
}
void updateLeader()
{
if (cluster_.getClusterSize() == 1)
{
setActiveMode(false); // Haha
}
if (active_mode_ || (next_server_index_ > 0))
{
const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(next_server_index_);
UAVCAN_ASSERT(node_id.isUnicast());
next_server_index_++;
if (next_server_index_ >= cluster_.getNumKnownServers())
{
next_server_index_ = 0;
}
AppendEntries::Request req;
req.term = persistent_state_.getCurrentTerm();
req.leader_commit = commit_index_;
req.prev_log_index = Log::Index(cluster_.getServerNextIndex(node_id) - 1U);
const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(req.prev_log_index);
if (entry == NULL)
{
UAVCAN_ASSERT(0);
handlePersistentStateUpdateError(-ErrLogic);
return;
}
req.prev_log_term = entry->term;
for (Log::Index index = cluster_.getServerNextIndex(node_id);
index <= persistent_state_.getLog().getLastIndex();
index++)
{
req.entries.push_back(*persistent_state_.getLog().getEntryAtIndex(index));
if (req.entries.size() == req.entries.capacity())
{
break;
}
}
pending_append_entries_fields_.num_entries = req.entries.size();
pending_append_entries_fields_.prev_log_index = req.prev_log_index;
const int res = append_entries_client_.call(node_id, req);
if (res < 0)
{
trace(TraceRaftAppendEntriesCallFailure, res);
}
}
propagateCommitIndex();
}
void switchState(ServerState new_state)
{
if (server_state_ != new_state)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "State switch: %d --> %d",
int(server_state_), int(new_state));
trace(TraceRaftStateSwitch, new_state);
if ((ServerStateLeader == server_state_) ||
(ServerStateLeader == new_state))
{
log_commit_handler_.onLeaderChange(ServerStateLeader == new_state);
}
server_state_ = new_state;
cluster_.resetAllServerIndices();
next_server_index_ = 0;
num_votes_received_in_this_campaign_ = 0;
for (uint8_t i = 0; i < NumRequestVoteClients; i++)
{
request_vote_clients_[i]->cancel();
}
append_entries_client_.cancel();
}
}
void setActiveMode(bool new_active)
{
if (active_mode_ != new_active)
{
UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Active switch: %d --> %d",
int(active_mode_), int(new_active));
trace(TraceRaftActiveSwitch, new_active);
active_mode_ = new_active;
}
}
void tryIncrementCurrentTermFromResponse(Term new_term)
{
trace(TraceRaftNewerTermInResponse, new_term);
const int res = persistent_state_.setCurrentTerm(new_term);
if (res < 0)
{
trace(TraceRaftPersistStateUpdateError, res);
}
registerActivity(); // Deferring future elections
switchState(ServerStateFollower);
setActiveMode(false);
}
void propagateCommitIndex()
{
// Objective is to estimate whether we can safely increment commit index value
UAVCAN_ASSERT(server_state_ == ServerStateLeader);
UAVCAN_ASSERT(commit_index_ <= persistent_state_.getLog().getLastIndex());
if (commit_index_ == persistent_state_.getLog().getLastIndex())
{
// All local entries are committed
bool commit_index_fully_replicated = true;
for (uint8_t i = 0; i < cluster_.getNumKnownServers(); i++)
{
const Log::Index match_index = cluster_.getServerMatchIndex(cluster_.getRemoteServerNodeIDAtIndex(i));
if (match_index != commit_index_)
{
commit_index_fully_replicated = false;
break;
}
}
const bool all_done = commit_index_fully_replicated && cluster_.isClusterDiscovered();
setActiveMode(!all_done); // Enable passive mode if commit index is the same on all nodes
}
else
{
// Not all local entries are committed
setActiveMode(true);
uint8_t num_nodes_with_next_log_entry_available = 1; // Local node
for (uint8_t i = 0; i < cluster_.getNumKnownServers(); i++)
{
const Log::Index match_index = cluster_.getServerMatchIndex(cluster_.getRemoteServerNodeIDAtIndex(i));
if (match_index > commit_index_)
{
num_nodes_with_next_log_entry_available++;
}
}
if (num_nodes_with_next_log_entry_available >= cluster_.getQuorumSize())
{
commit_index_++;
UAVCAN_ASSERT(commit_index_ > 0); // Index 0 is always committed
trace(TraceRaftNewEntryCommitted, commit_index_);
// AT THIS POINT ALLOCATION IS COMPLETE
log_commit_handler_.onEntryCommitted(*persistent_state_.getLog().getEntryAtIndex(commit_index_));
}
}
}
void handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntries::Request>& request,
ServiceResponseDataStructure<AppendEntries::Response>& response)
{
if (!cluster_.isKnownServer(request.getSrcNodeID()))
{
trace(TraceRaftRequestIgnored, request.getSrcNodeID().get());
return;
}
registerActivity();
UAVCAN_ASSERT(response.isResponseEnabled()); // This is default
/*
* Checking if our current state is up to date.
* The request will be ignored if persistent state cannot be updated.
*/
if (request.term > persistent_state_.getCurrentTerm())
{
int res = persistent_state_.setCurrentTerm(request.term);
if (res < 0)
{
response.setResponseEnabled(false);
trace(TraceRaftPersistStateUpdateError, res);
}
res = persistent_state_.resetVotedFor();
if (res < 0)
{
response.setResponseEnabled(false);
trace(TraceRaftPersistStateUpdateError, res);
}
switchState(ServerStateFollower);
setActiveMode(false);
if (!response.isResponseEnabled())
{
return;
}
}
/*
* Preparing the response
*/
response.term = persistent_state_.getCurrentTerm();
response.success = false;
/*
* Step 1 (see Raft paper)
* Reject the request if the leader has stale term number.
*/
if (request.term < persistent_state_.getCurrentTerm())
{
response.setResponseEnabled(true);
return;
}
switchState(ServerStateFollower);
setActiveMode(false);
/*
* Step 2
* Reject the request if the assumed log index does not exist on the local node.
*/
const Entry* const prev_entry = persistent_state_.getLog().getEntryAtIndex(request.prev_log_index);
if (prev_entry == NULL)
{
response.setResponseEnabled(true);
return;
}
/*
* Step 3
* Drop log entries if term number does not match.
* Ignore the request if the persistent state cannot be updated.
*/
if (prev_entry->term != request.prev_log_term)
{
const int res = persistent_state_.getLog().removeEntriesWhereIndexGreaterOrEqual(request.prev_log_index);
response.setResponseEnabled(res >= 0);
if (res < 0)
{
trace(TraceRaftPersistStateUpdateError, res);
}
return;
}
/*
* Step 4
* Update the log with new entries - this will possibly require to rewrite existing entries.
* Ignore the request if the persistent state cannot be updated.
*/
if (request.prev_log_index != persistent_state_.getLog().getLastIndex())
{
const int res = persistent_state_.getLog().removeEntriesWhereIndexGreater(request.prev_log_index);
if (res < 0)
{
trace(TraceRaftPersistStateUpdateError, res);
response.setResponseEnabled(false);
return;
}
}
for (uint8_t i = 0; i < request.entries.size(); i++)
{
const int res = persistent_state_.getLog().append(request.entries[i]);
if (res < 0)
{
trace(TraceRaftPersistStateUpdateError, res);
response.setResponseEnabled(false);
return; // Response will not be sent, the server will assume that we're dead
}
}
/*
* Step 5
* Update the commit index.
*/
if (request.leader_commit > commit_index_)
{
commit_index_ = min(request.leader_commit, persistent_state_.getLog().getLastIndex());
trace(TraceRaftCommitIndexUpdate, commit_index_);
}
response.setResponseEnabled(true);
response.success = true;
}
void handleAppendEntriesResponse(const ServiceCallResult<AppendEntries>& result)
{
UAVCAN_ASSERT(server_state_ == ServerStateLeader); // When state switches, all requests must be cancelled
if (!result.isSuccessful())
{
return;
}
if (result.response.term > persistent_state_.getCurrentTerm())
{
tryIncrementCurrentTermFromResponse(result.response.term);
}
else
{
if (result.response.success)
{
cluster_.incrementServerNextIndexBy(result.server_node_id, pending_append_entries_fields_.num_entries);
cluster_.setServerMatchIndex(result.server_node_id,
Log::Index(pending_append_entries_fields_.prev_log_index +
pending_append_entries_fields_.num_entries));
}
else
{
cluster_.decrementServerNextIndex(result.server_node_id);
}
}
pending_append_entries_fields_ = PendingAppendEntriesFields();
// Rest of the logic is implemented in periodic update handlers.
}
void handleRequestVoteRequest(const ReceivedDataStructure<RequestVote::Request>& request,
ServiceResponseDataStructure<RequestVote::Response>& response)
{
trace(TraceRaftVoteRequestReceived, request.getSrcNodeID().get());
if (!cluster_.isKnownServer(request.getSrcNodeID()))
{
trace(TraceRaftRequestIgnored, request.getSrcNodeID().get());
return;
}
UAVCAN_ASSERT(response.isResponseEnabled()); // This is default
setActiveMode(true);
/*
* Checking if our current state is up to date.
* The request will be ignored if persistent state cannot be updated.
*/
if (request.term > persistent_state_.getCurrentTerm())
{
int res = persistent_state_.setCurrentTerm(request.term);
if (res < 0)
{
response.setResponseEnabled(false);
trace(TraceRaftPersistStateUpdateError, res);
}
res = persistent_state_.resetVotedFor();
if (res < 0)
{
response.setResponseEnabled(false);
trace(TraceRaftPersistStateUpdateError, res);
}
switchState(ServerStateFollower);
if (!response.isResponseEnabled())
{
return;
}
}
/*
* Preparing the response
*/
response.term = persistent_state_.getCurrentTerm();
if (request.term < response.term)
{
response.vote_granted = false;
}
else
{
const bool can_vote = !persistent_state_.isVotedForSet() ||
(persistent_state_.getVotedFor() == request.getSrcNodeID());
const bool log_is_up_to_date =
persistent_state_.getLog().isOtherLogUpToDate(request.last_log_index, request.last_log_term);
response.vote_granted = can_vote && log_is_up_to_date;
if (response.vote_granted)
{
registerActivity(); // This is necessary to avoid excessive elections
const int res = persistent_state_.setVotedFor(request.getSrcNodeID());
if (res < 0)
{
trace(TraceRaftPersistStateUpdateError, res);
response.setResponseEnabled(false);
return;
}
}
}
}
void handleRequestVoteResponse(const ServiceCallResult<RequestVote>& result)
{
UAVCAN_ASSERT(server_state_ == ServerStateCandidate); // When state switches, all requests must be cancelled
if (!result.isSuccessful())
{
return;
}
trace(TraceRaftVoteRequestSucceeded, result.server_node_id.get());
if (result.response.term > persistent_state_.getCurrentTerm())
{
tryIncrementCurrentTermFromResponse(result.response.term);
}
else
{
if (result.response.vote_granted)
{
num_votes_received_in_this_campaign_++;
}
}
// Rest of the logic is implemented in periodic update handlers.
// I'm no fan of asynchronous programming. At all.
}
virtual void handleTimerEvent(const TimerEvent&)
{
if (cluster_.hadDiscoveryActivity() && isLeader())
{
setActiveMode(true);
}
switch (server_state_)
{
case ServerStateFollower:
{
updateFollower();
break;
}
case ServerStateCandidate:
{
updateCandidate();
break;
}
case ServerStateLeader:
{
updateLeader();
break;
}
default:
{
UAVCAN_ASSERT(0);
break;
}
}
}
public:
RaftCore(INode& node,
IStorageBackend& storage,
IEventTracer& tracer,
ILeaderLogCommitHandler& log_commit_handler,
MonotonicDuration update_interval =
MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_REQUEST_TIMEOUT_MS),
MonotonicDuration base_activity_timeout =
MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_BASE_ELECTION_TIMEOUT_MS))
: TimerBase(node)
, update_interval_(update_interval)
, base_activity_timeout_(base_activity_timeout)
, tracer_(tracer)
, log_commit_handler_(log_commit_handler)
, persistent_state_(storage, tracer)
, cluster_(node, storage, persistent_state_.getLog(), tracer)
, commit_index_(0) // Per Raft paper, commitIndex must be initialized to zero
, last_activity_timestamp_(node.getMonotonicTime())
, active_mode_(true)
, server_state_(ServerStateFollower)
, next_server_index_(0)
, num_votes_received_in_this_campaign_(0)
, append_entries_srv_(node)
, append_entries_client_(node)
, request_vote_srv_(node)
{
for (uint8_t i = 0; i < NumRequestVoteClients; i++)
{
request_vote_clients_[i].construct<INode&>(node);
}
}
/**
* Once started, the logic runs in the background until destructor is called.
* @param cluster_size If set, this value will be used and stored in the persistent storage. If not set,
* value from the persistent storage will be used. If not set and there's no such key
* in the persistent storage, initialization will fail.
*/
int init(uint8_t cluster_size = ClusterManager::ClusterSizeUnknown)
{
/*
* Initializing state variables
*/
last_activity_timestamp_ = getNode().getMonotonicTime();
active_mode_ = true;
server_state_ = ServerStateFollower;
next_server_index_ = 0;
num_votes_received_in_this_campaign_ = 0;
commit_index_ = 0;
/*
* Initializing internals
*/
int res = persistent_state_.init();
if (res < 0)
{
return res;
}
res = cluster_.init(cluster_size);
if (res < 0)
{
return res;
}
res = append_entries_srv_.start(AppendEntriesCallback(this, &RaftCore::handleAppendEntriesRequest));
if (res < 0)
{
return res;
}
res = request_vote_srv_.start(RequestVoteCallback(this, &RaftCore::handleRequestVoteRequest));
if (res < 0)
{
return res;
}
res = append_entries_client_.init();
if (res < 0)
{
return res;
}
append_entries_client_.setCallback(AppendEntriesResponseCallback(this, &RaftCore::handleAppendEntriesResponse));
append_entries_client_.setRequestTimeout(update_interval_);
for (uint8_t i = 0; i < NumRequestVoteClients; i++)
{
res = request_vote_clients_[i]->init();
if (res < 0)
{
return res;
}
request_vote_clients_[i]->setCallback(RequestVoteResponseCallback(this, &RaftCore::handleRequestVoteResponse));
request_vote_clients_[i]->setRequestTimeout(update_interval_);
}
startPeriodic(update_interval_);
trace(TraceRaftCoreInited, update_interval_.toUSec());
UAVCAN_ASSERT(res >= 0);
return 0;
}
/**
* This function is mostly needed for testing.
*/
Log::Index getCommitIndex() const { return commit_index_; }
/**
* Only the leader can call @ref appendLog().
*/
bool isLeader() const { return server_state_ == ServerStateLeader; }
/**
* Inserts one entry into log.
* Failures are tolerble because all operations are idempotent.
* This method will trigger an assertion failure and return error if the current node is not the leader.
*/
int appendLog(const Entry::FieldTypes::unique_id& unique_id, NodeID node_id)
{
if (isLeader())
{
Entry entry;
entry.node_id = node_id.get();
entry.unique_id = unique_id;
entry.term = persistent_state_.getCurrentTerm();
trace(TraceRaftNewLogEntry, entry.node_id);
return persistent_state_.getLog().append(entry);
}
else
{
UAVCAN_ASSERT(0);
return -ErrLogic;
}
}
/**
* This class is used to perform log searches.
*/
struct LogEntryInfo
{
Entry entry;
bool committed;
LogEntryInfo(const Entry& arg_entry, bool arg_committed)
: entry(arg_entry)
, committed(arg_committed)
{ }
};
/**
* This method is used by the allocator to query existence of certain entries in the Raft log.
* Predicate is a callable of the following prototype:
* bool (const LogEntryInfo& entry)
* Once the predicate returns true, the loop will be terminated and the method will return an initialized lazy
* contructor to the last visited entry; otherwise the constructor will not be initialized. In this case, lazy
* constructor is used as boost::optional.
* The log is always traversed from HIGH to LOW index values, i.e. entry 0 will be traversed last.
*/
template <typename Predicate>
inline LazyConstructor<LogEntryInfo> traverseLogFromEndUntil(const Predicate& predicate) const
{
UAVCAN_ASSERT(try_implicit_cast<bool>(predicate, true));
for (int index = static_cast<int>(persistent_state_.getLog().getLastIndex()); index >= 0; index--)
{
const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(Log::Index(index));
UAVCAN_ASSERT(entry != NULL);
const LogEntryInfo info(*entry, Log::Index(index) <= commit_index_);
if (predicate(info))
{
LazyConstructor<LogEntryInfo> ret;
ret.template construct<const LogEntryInfo&>(info);
return ret;
}
}
return LazyConstructor<LogEntryInfo>();
}
};
}
}
}
#endif // Include guard
@@ -0,0 +1,201 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_SERVER_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_SERVER_HPP_INCLUDED
#include <uavcan/build_config.hpp>
#include <uavcan/debug.hpp>
#include <uavcan/node/timer.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/event.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/raft_core.hpp>
#include <uavcan/protocol/dynamic_node_id_server/allocation_request_manager.hpp>
namespace uavcan
{
namespace dynamic_node_id_server
{
namespace distributed
{
/**
* This class implements the top-level allocation logic and server API.
*/
class Server : private IAllocationRequestHandler
, private ILeaderLogCommitHandler
{
struct UniqueIDLogPredicate
{
const UniqueID unique_id;
UniqueIDLogPredicate(const UniqueID& uid)
: unique_id(uid)
{ }
bool operator()(const RaftCore::LogEntryInfo& info) const
{
return info.entry.unique_id == unique_id;
}
};
struct NodeIDLogPredicate
{
const NodeID node_id;
NodeIDLogPredicate(const NodeID& nid)
: node_id(nid)
{ }
bool operator()(const RaftCore::LogEntryInfo& info) const
{
return info.entry.node_id == node_id.get();
}
};
/*
* States
*/
INode& node_;
RaftCore raft_core_;
AllocationRequestManager allocation_request_manager_;
/*
* Methods
*/
bool isNodeIDTaken(const NodeID node_id) const
{
UAVCAN_TRACE("DynamicNodeIDServer", "Testing if node ID %d is taken", int(node_id.get()));
return raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_id));
}
NodeID findFreeNodeID(const NodeID preferred_node_id) const
{
uint8_t candidate = preferred_node_id.isUnicast() ? preferred_node_id.get() : NodeID::Max;
// Up
while (candidate <= NodeID::Max)
{
if (!isNodeIDTaken(candidate))
{
return candidate;
}
candidate++;
}
candidate = preferred_node_id.isUnicast() ? preferred_node_id.get() : NodeID::Max;
candidate--; // This has been tested already
// Down
while (candidate > 0)
{
if (!isNodeIDTaken(candidate))
{
return candidate;
}
candidate--;
}
return NodeID();
}
void allocateNewNode(const UniqueID& unique_id, const NodeID preferred_node_id)
{
const NodeID allocated_node_id = findFreeNodeID(preferred_node_id);
if (!allocated_node_id.isUnicast())
{
UAVCAN_TRACE("DynamicNodeIDServer", "Request ignored - no free node ID left");
return;
}
UAVCAN_TRACE("DynamicNodeIDServer", "New node ID allocated: %d", int(allocated_node_id.get()));
const int res = raft_core_.appendLog(unique_id, allocated_node_id);
if (res < 0)
{
node_.registerInternalFailure("Raft log append");
}
}
virtual void handleAllocationRequest(const UniqueID& unique_id, const NodeID preferred_node_id)
{
// TODO: allocation requests must not be served if the list of unidentified nodes is not empty
const LazyConstructor<RaftCore::LogEntryInfo> result =
raft_core_.traverseLogFromEndUntil(UniqueIDLogPredicate(unique_id));
if (result.isConstructed())
{
if (result->committed)
{
tryPublishAllocationResult(result->entry);
UAVCAN_TRACE("DynamicNodeIDServer",
"Allocation request served with existing allocation; node ID %d",
int(result->entry.node_id));
}
else
{
UAVCAN_TRACE("DynamicNodeIDServer",
"Allocation request ignored - allocation exists but not committed yet; node ID %d",
int(result->entry.node_id));
}
}
else
{
allocateNewNode(unique_id, preferred_node_id);
}
}
virtual void onEntryCommitted(const protocol::dynamic_node_id::server::Entry& entry)
{
tryPublishAllocationResult(entry);
}
virtual void onLeaderChange(bool local_node_is_leader)
{
UAVCAN_TRACE("DynamicNodeIDServer", "I am leader: %d", int(local_node_is_leader));
allocation_request_manager_.setActive(local_node_is_leader);
}
void tryPublishAllocationResult(const protocol::dynamic_node_id::server::Entry& entry)
{
const int res = allocation_request_manager_.broadcastAllocationResponse(entry.unique_id, entry.node_id);
if (res < 0)
{
node_.registerInternalFailure("Dynamic allocation final broadcast");
}
}
public:
Server(INode& node,
IStorageBackend& storage,
IEventTracer& tracer)
: node_(node)
, raft_core_(node, storage, tracer, *this)
, allocation_request_manager_(node, *this)
{ }
int init(uint8_t cluster_size = ClusterManager::ClusterSizeUnknown)
{
int res = raft_core_.init(cluster_size);
if (res < 0)
{
return res;
}
res = allocation_request_manager_.init();
if (res < 0)
{
return res;
}
// TODO Initialize the node info transport
return 0;
}
};
}
}
}
#endif // Include guard
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_TYPES_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_TYPES_HPP_INCLUDED
#include <uavcan/build_config.hpp>
#include <uavcan/protocol/dynamic_node_id_server/types.hpp>
namespace uavcan
{
namespace dynamic_node_id_server
{
namespace distributed
{
using namespace ::uavcan::protocol::dynamic_node_id::server;
/**
* Raft term
*/
typedef StorageType<Entry::FieldTypes::term>::Type Term;
}
}
}
#endif // Include guard
@@ -0,0 +1,62 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_STORAGE_BACKEND_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_STORAGE_BACKEND_HPP_INCLUDED
#include <uavcan/build_config.hpp>
#include <uavcan/marshal/types.hpp>
namespace uavcan
{
namespace dynamic_node_id_server
{
/**
* This interface is used by the server to read and write stable storage.
* The storage is represented as a key-value container, where keys and values are ASCII strings up to 32
* characters long, not including the termination byte. Fixed block size allows for absolutely straightforward
* and efficient implementation of storage backends, e.g. based on text files.
* Keys and values may contain only non-whitespace, non-formatting printable characters.
*/
class IStorageBackend
{
public:
/**
* Maximum length of keys and values. One pair takes twice as much space.
*/
enum { MaxStringLength = 32 };
/**
* It is guaranteed that the server will never require more than this number of key/value pairs.
* Total storage space needed is (MaxKeyValuePairs * MaxStringLength * 2), not including storage overhead.
*/
enum { MaxKeyValuePairs = 512 };
/**
* This type is used to exchange data chunks with the backend.
* It doesn't use any dynamic memory; please refer to the Array<> class for details.
*/
typedef Array<IntegerSpec<8, SignednessUnsigned, CastModeTruncate>, ArrayModeDynamic, MaxStringLength> String;
/**
* Read one value from the storage.
* If such key does not exist, or if read failed, an empty string will be returned.
* This method should not block for more than 50 ms.
*/
virtual String get(const String& key) const = 0;
/**
* Create or update value for the given key. Empty value should be regarded as a request to delete the key.
* This method should not block for more than 50 ms.
* Failures will be ignored.
*/
virtual void set(const String& key, const String& value) = 0;
virtual ~IStorageBackend() { }
};
}
}
#endif // Include guard
@@ -0,0 +1,181 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_INTERNAL_STORAGE_MARSHALLER_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_INTERNAL_STORAGE_MARSHALLER_HPP_INCLUDED
#include <uavcan/build_config.hpp>
#include <uavcan/debug.hpp>
#include <uavcan/protocol/dynamic_node_id_server/storage_backend.hpp>
#include <uavcan/protocol/dynamic_node_id_server/types.hpp>
#include <cstdlib>
#if UAVCAN_CPP_VERSION >= UAVCAN_CPP11
# include <cerrno>
#endif
namespace uavcan
{
namespace dynamic_node_id_server
{
/**
* This class extends the storage backend interface with serialization/deserialization functionality.
*/
class StorageMarshaller
{
IStorageBackend& storage_;
static uint8_t convertLowerCaseHexCharToNibble(char ch)
{
const uint8_t ret = (ch > '9') ? static_cast<uint8_t>(ch - 'a' + 10) : static_cast<uint8_t>(ch - '0');
UAVCAN_ASSERT(ret < 16);
return ret;
}
public:
StorageMarshaller(IStorageBackend& storage)
: storage_(storage)
{ }
/**
* These methods set the value and then immediately read it back.
* 1. Serialize the value.
* 2. Update the value on the backend.
* 3. Call get() with the same value argument.
* The caller then is supposed to check whether the argument has the desired value.
*/
int setAndGetBack(const IStorageBackend::String& key, uint32_t& inout_value)
{
IStorageBackend::String serialized;
serialized.appendFormatted("%llu", static_cast<unsigned long long>(inout_value));
UAVCAN_TRACE("StorageMarshaller", "Set %s = %s", key.c_str(), serialized.c_str());
storage_.set(key, serialized);
return get(key, inout_value);
}
int setAndGetBack(const IStorageBackend::String& key, UniqueID& inout_value)
{
IStorageBackend::String serialized;
for (uint8_t i = 0; i < UniqueID::MaxSize; i++)
{
serialized.appendFormatted("%02x", inout_value.at(i));
}
UAVCAN_ASSERT(serialized.size() == UniqueID::MaxSize * 2);
UAVCAN_TRACE("StorageMarshaller", "Set %s = %s", key.c_str(), serialized.c_str());
storage_.set(key, serialized);
return get(key, inout_value);
}
/**
* Getters simply read and deserialize the value.
* 1. Read the value back from the backend; return false if read fails.
* 2. Deserealize the newly read value; return false if deserialization fails.
* 3. Update the argument with deserialized value.
* 4. Return true.
*/
int get(const IStorageBackend::String& key, uint32_t& out_value) const
{
/*
* Reading the storage
*/
const IStorageBackend::String val = storage_.get(key);
if (val.empty())
{
return -ErrFailure;
}
/*
* Per MISRA C++ recommendations, checking the inputs instead of relying solely on errno.
* The value must contain only numeric characters.
*/
for (IStorageBackend::String::const_iterator it = val.begin(); it != val.end(); ++it)
{
if (static_cast<char>(*it) < '0' || static_cast<char>(*it) > '9')
{
return -ErrFailure;
}
}
if (val.size() > 10) // len(str(0xFFFFFFFF))
{
return -ErrFailure;
}
/*
* Conversion is carried out here
*/
#if UAVCAN_CPP_VERSION >= UAVCAN_CPP11
errno = 0;
#endif
#if UAVCAN_CPP_VERSION >= UAVCAN_CPP11
const unsigned long long x = std::strtoull(val.c_str(), NULL, 10);
#else
// There was no strtoull() before C++11, so we need to resort to strtoul()
StaticAssert<(sizeof(unsigned long) >= sizeof(uint32_t))>::check();
const unsigned long x = std::strtoul(val.c_str(), NULL, 10);
#endif
#if UAVCAN_CPP_VERSION >= UAVCAN_CPP11
if (errno != 0)
{
return -ErrFailure;
}
#endif
out_value = static_cast<uint32_t>(x);
return 0;
}
int get(const IStorageBackend::String& key, UniqueID& out_value) const
{
static const uint8_t NumBytes = UniqueID::MaxSize;
/*
* Reading the storage
*/
IStorageBackend::String val = storage_.get(key);
if (val.size() != NumBytes * 2)
{
return -ErrFailure;
}
/*
* The value must contain only hexadecimal numbers.
*/
val.convertToLowerCaseASCII();
for (IStorageBackend::String::const_iterator it = val.begin(); it != val.end(); ++it)
{
if ((static_cast<char>(*it) < '0' || static_cast<char>(*it) > '9') &&
(static_cast<char>(*it) < 'a' || static_cast<char>(*it) > 'f'))
{
return -ErrFailure;
}
}
/*
* Conversion is carried out here
*/
IStorageBackend::String::const_iterator it = val.begin();
for (uint8_t byte_index = 0; byte_index < NumBytes; byte_index++)
{
out_value[byte_index] =
static_cast<uint8_t>(convertLowerCaseHexCharToNibble(static_cast<char>(*it++)) << 4);
out_value[byte_index] =
static_cast<uint8_t>(convertLowerCaseHexCharToNibble(static_cast<char>(*it++)) | out_value[byte_index]);
}
return 0;
}
};
}
}
#endif // Include guard
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
*/
#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_INTERNAL_TYPES_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_INTERNAL_TYPES_HPP_INCLUDED
#include <uavcan/build_config.hpp>
// UAVCAN types
#include <uavcan/protocol/dynamic_node_id/server/Entry.hpp>
namespace uavcan
{
namespace dynamic_node_id_server
{
using namespace ::uavcan::protocol::dynamic_node_id;
/**
* Node Unique ID
*/
typedef protocol::dynamic_node_id::server::Entry::FieldTypes::unique_id UniqueID;
}
}
#endif // Include guard
File diff suppressed because it is too large Load Diff
@@ -10,11 +10,13 @@
#include <gtest/gtest.h>
#include <map>
#include <memory>
#include <uavcan/protocol/dynamic_node_id_allocation_server.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/server.hpp>
#include <uavcan/protocol/dynamic_node_id_client.hpp>
#include "helpers.hpp"
#include "../helpers.hpp"
class StorageBackend : public uavcan::IDynamicNodeIDStorageBackend
using uavcan::dynamic_node_id_server::UniqueID;
class StorageBackend : public uavcan::dynamic_node_id_server::IStorageBackend
{
typedef std::map<String, String> Container;
Container container_;
@@ -60,11 +62,11 @@ public:
};
class EventTracer : public uavcan::IDynamicNodeIDAllocationServerEventTracer
class EventTracer : public uavcan::dynamic_node_id_server::distributed::IEventTracer
{
const std::string id_;
virtual void onEvent(uavcan::uint16_t code, uavcan::int64_t argument)
virtual void onEvent(uavcan::dynamic_node_id_server::distributed::TraceCode code, uavcan::int64_t argument)
{
std::cout << "EVENT [" << id_ << "]\t" << code << "\t" << getEventName(code) << "\t" << argument << std::endl;
}
@@ -76,7 +78,7 @@ public:
};
class CommitHandler : public uavcan::dynamic_node_id_server_impl::ILeaderLogCommitHandler
class CommitHandler : public uavcan::dynamic_node_id_server::distributed::ILeaderLogCommitHandler
{
const std::string id_;
@@ -95,7 +97,7 @@ public:
};
class AllocationRequestHandler : public uavcan::dynamic_node_id_server_impl::IAllocationRequestHandler
class AllocationRequestHandler : public uavcan::dynamic_node_id_server::IAllocationRequestHandler
{
std::vector<std::pair<UniqueID, uavcan::NodeID> > requests_;
@@ -139,13 +141,13 @@ public:
static const unsigned NumEntriesInStorageWithEmptyLog = 4; // last index + 3 items per log entry
TEST(DynamicNodeIDAllocationServer, MarshallingStorageDecorator)
TEST(DynamicNodeIDServer, StorageMarshaller)
{
StorageBackend st;
uavcan::dynamic_node_id_server_impl::MarshallingStorageDecorator marshaler(st);
uavcan::dynamic_node_id_server::StorageMarshaller marshaler(st);
uavcan::IDynamicNodeIDStorageBackend::String key;
uavcan::dynamic_node_id_server::IStorageBackend::String key;
/*
* uint32
@@ -221,13 +223,15 @@ TEST(DynamicNodeIDAllocationServer, MarshallingStorageDecorator)
}
TEST(DynamicNodeIDAllocationServer, LogInitialization)
TEST(DynamicNodeIDServer, LogInitialization)
{
using namespace uavcan::dynamic_node_id_server::distributed;
EventTracer tracer;
// No log data in the storage - initializing empty log
{
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
uavcan::dynamic_node_id_server::distributed::Log log(storage, tracer);
ASSERT_EQ(0, storage.getNumKeys());
ASSERT_LE(0, log.init());
@@ -241,7 +245,7 @@ TEST(DynamicNodeIDAllocationServer, LogInitialization)
// Nonempty storage, one item
{
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
storage.set("log_last_index", "0");
ASSERT_LE(-uavcan::ErrFailure, log.init()); // Expected one entry, none found
@@ -258,7 +262,7 @@ TEST(DynamicNodeIDAllocationServer, LogInitialization)
// Nonempty storage, broken data
{
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
storage.set("log_last_index", "foobar");
ASSERT_LE(-uavcan::ErrFailure, log.init()); // Bad value
@@ -281,7 +285,7 @@ TEST(DynamicNodeIDAllocationServer, LogInitialization)
// Nonempty storage, many items
{
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
storage.set("log_last_index", "1"); // 2 items - 0, 1
storage.set("log0_term", "0");
@@ -317,11 +321,13 @@ TEST(DynamicNodeIDAllocationServer, LogInitialization)
}
TEST(DynamicNodeIDAllocationServer, LogAppend)
TEST(DynamicNodeIDServer, LogAppend)
{
using namespace uavcan::dynamic_node_id_server::distributed;
EventTracer tracer;
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
ASSERT_EQ(0, storage.getNumKeys());
ASSERT_LE(0, log.init());
@@ -390,11 +396,13 @@ TEST(DynamicNodeIDAllocationServer, LogAppend)
}
TEST(DynamicNodeIDAllocationServer, LogRemove)
TEST(DynamicNodeIDServer, LogRemove)
{
using namespace uavcan::dynamic_node_id_server::distributed;
EventTracer tracer;
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
/*
* Filling the log fully
@@ -446,15 +454,17 @@ TEST(DynamicNodeIDAllocationServer, LogRemove)
}
TEST(DynamicNodeIDAllocationServer, PersistentStorageInitialization)
TEST(DynamicNodeIDServer, PersistentStorageInitialization)
{
using namespace uavcan::dynamic_node_id_server::distributed;
EventTracer tracer;
/*
* First initialization
*/
{
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::PersistentState pers(storage, tracer);
PersistentState pers(storage, tracer);
ASSERT_EQ(0, storage.getNumKeys());
ASSERT_LE(0, pers.init());
@@ -472,12 +482,12 @@ TEST(DynamicNodeIDAllocationServer, PersistentStorageInitialization)
{
// This log is used to initialize the storage
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
ASSERT_LE(0, log.init());
}
ASSERT_LE(1, storage.getNumKeys());
uavcan::dynamic_node_id_server_impl::PersistentState pers(storage, tracer);
PersistentState pers(storage, tracer);
ASSERT_LE(0, pers.init());
@@ -494,14 +504,14 @@ TEST(DynamicNodeIDAllocationServer, PersistentStorageInitialization)
{
// This log is used to initialize the storage
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
ASSERT_LE(0, log.init());
}
ASSERT_LE(1, storage.getNumKeys());
storage.set("current_term", "1");
uavcan::dynamic_node_id_server_impl::PersistentState pers(storage, tracer);
PersistentState pers(storage, tracer);
ASSERT_GT(0, pers.init()); // Fails because current term is not zero
@@ -522,7 +532,7 @@ TEST(DynamicNodeIDAllocationServer, PersistentStorageInitialization)
{
// This log is used to initialize the storage
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
ASSERT_LE(0, log.init());
uavcan::protocol::dynamic_node_id::server::Entry entry;
@@ -533,7 +543,7 @@ TEST(DynamicNodeIDAllocationServer, PersistentStorageInitialization)
}
ASSERT_LE(4, storage.getNumKeys());
uavcan::dynamic_node_id_server_impl::PersistentState pers(storage, tracer);
PersistentState pers(storage, tracer);
ASSERT_GT(0, pers.init()); // Fails because log is not empty
@@ -556,11 +566,13 @@ TEST(DynamicNodeIDAllocationServer, PersistentStorageInitialization)
}
TEST(DynamicNodeIDAllocationServer, PersistentStorage)
TEST(DynamicNodeIDServer, PersistentStorage)
{
using namespace uavcan::dynamic_node_id_server::distributed;
EventTracer tracer;
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::PersistentState pers(storage, tracer);
PersistentState pers(storage, tracer);
/*
* Initializing
@@ -643,8 +655,10 @@ TEST(DynamicNodeIDAllocationServer, PersistentStorage)
}
TEST(DynamicNodeIDAllocationServer, ClusterManagerInitialization)
TEST(DynamicNodeIDServer, ClusterManagerInitialization)
{
using namespace uavcan::dynamic_node_id_server::distributed;
const unsigned MaxClusterSize =
uavcan::protocol::dynamic_node_id::server::Discovery::FieldTypes::known_nodes::MaxSize;
@@ -658,10 +672,10 @@ TEST(DynamicNodeIDAllocationServer, ClusterManagerInitialization)
*/
{
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
InterlinkedTestNodesWithSysClock nodes;
uavcan::dynamic_node_id_server_impl::ClusterManager mgr(nodes.a, storage, log, tracer);
ClusterManager mgr(nodes.a, storage, log, tracer);
// Too big
ASSERT_GT(0, mgr.init(MaxClusterSize + 1));
@@ -683,10 +697,10 @@ TEST(DynamicNodeIDAllocationServer, ClusterManagerInitialization)
*/
{
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
InterlinkedTestNodesWithSysClock nodes;
uavcan::dynamic_node_id_server_impl::ClusterManager mgr(nodes.a, storage, log, tracer);
ClusterManager mgr(nodes.a, storage, log, tracer);
// Not configured
ASSERT_GT(0, mgr.init());
@@ -700,17 +714,19 @@ TEST(DynamicNodeIDAllocationServer, ClusterManagerInitialization)
}
TEST(DynamicNodeIDAllocationServer, ClusterManagerOneServer)
TEST(DynamicNodeIDServer, ClusterManagerOneServer)
{
using namespace uavcan::dynamic_node_id_server::distributed;
uavcan::GlobalDataTypeRegistry::instance().reset();
uavcan::DefaultDataTypeRegistrator<uavcan::protocol::dynamic_node_id::server::Discovery> _reg1;
EventTracer tracer;
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
InterlinkedTestNodesWithSysClock nodes;
uavcan::dynamic_node_id_server_impl::ClusterManager mgr(nodes.a, storage, log, tracer);
ClusterManager mgr(nodes.a, storage, log, tracer);
/*
* Pub and sub
@@ -775,17 +791,19 @@ TEST(DynamicNodeIDAllocationServer, ClusterManagerOneServer)
}
TEST(DynamicNodeIDAllocationServer, ClusterManagerThreeServers)
TEST(DynamicNodeIDServer, ClusterManagerThreeServers)
{
using namespace uavcan::dynamic_node_id_server::distributed;
uavcan::GlobalDataTypeRegistry::instance().reset();
uavcan::DefaultDataTypeRegistrator<uavcan::protocol::dynamic_node_id::server::Discovery> _reg1;
EventTracer tracer;
StorageBackend storage;
uavcan::dynamic_node_id_server_impl::Log log(storage, tracer);
Log log(storage, tracer);
InterlinkedTestNodesWithSysClock nodes;
uavcan::dynamic_node_id_server_impl::ClusterManager mgr(nodes.a, storage, log, tracer);
ClusterManager mgr(nodes.a, storage, log, tracer);
/*
* Pub and sub
@@ -891,9 +909,9 @@ TEST(DynamicNodeIDAllocationServer, ClusterManagerThreeServers)
}
TEST(DynamicNodeIDAllocationServer, RaftCoreBasic)
TEST(DynamicNodeIDServer, RaftCoreBasic)
{
using namespace uavcan::dynamic_node_id_server_impl;
using namespace uavcan::dynamic_node_id_server::distributed;
using namespace uavcan::protocol::dynamic_node_id::server;
uavcan::GlobalDataTypeRegistry::instance().reset();
@@ -970,28 +988,24 @@ TEST(DynamicNodeIDAllocationServer, RaftCoreBasic)
}
TEST(DynamicNodeIDAllocationServer, EventCodeToString)
TEST(DynamicNodeIDServer, EventCodeToString)
{
using uavcan::IDynamicNodeIDAllocationServerEventTracer;
using namespace uavcan::dynamic_node_id_server_impl;
using namespace uavcan::dynamic_node_id_server::distributed;
using namespace uavcan::dynamic_node_id_server;
// Simply checking some error codes
ASSERT_STREQ("Error",
IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceError));
ASSERT_STREQ("RaftActiveSwitch",
IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceRaftActiveSwitch));
ASSERT_STREQ("RaftAppendEntriesCallFailure",
IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceRaftAppendEntriesCallFailure));
ASSERT_STREQ("DiscoveryReceived",
IDynamicNodeIDAllocationServerEventTracer::getEventName(TraceDiscoveryReceived));
ASSERT_STREQ("Error", IEventTracer::getEventName(TraceError));
ASSERT_STREQ("RaftActiveSwitch", IEventTracer::getEventName(TraceRaftActiveSwitch));
ASSERT_STREQ("RaftAppendEntriesCallFailure", IEventTracer::getEventName(TraceRaftAppendEntriesCallFailure));
ASSERT_STREQ("DiscoveryReceived", IEventTracer::getEventName(TraceDiscoveryReceived));
}
TEST(DynamicNodeIDAllocationServer, AllocationRequestManager)
TEST(DynamicNodeIDServer, AllocationRequestManager)
{
using namespace uavcan::protocol::dynamic_node_id;
using namespace uavcan::protocol::dynamic_node_id::server;
using namespace uavcan::dynamic_node_id_server_impl;
using namespace uavcan::dynamic_node_id_server;
uavcan::GlobalDataTypeRegistry::instance().reset();
uavcan::DefaultDataTypeRegistrator<Allocation> _reg1;
@@ -1045,9 +1059,9 @@ TEST(DynamicNodeIDAllocationServer, AllocationRequestManager)
}
TEST(DynamicNodeIDAllocationServer, Main)
TEST(DynamicNodeIDServer, Main)
{
using namespace uavcan::dynamic_node_id_server_impl;
using namespace uavcan::dynamic_node_id_server;
using namespace uavcan::protocol::dynamic_node_id;
using namespace uavcan::protocol::dynamic_node_id::server;
@@ -1066,7 +1080,7 @@ TEST(DynamicNodeIDAllocationServer, Main)
/*
* Server
*/
uavcan::DynamicNodeIDAllocationServer server(nodes.a, storage, tracer);
distributed::Server server(nodes.a, storage, tracer);
ASSERT_LE(0, server.init(1));
@@ -1092,22 +1106,14 @@ TEST(DynamicNodeIDAllocationServer, Main)
}
TEST(DynamicNodeIDAllocationServer, ObjectSizes)
TEST(DynamicNodeIDServer, ObjectSizes)
{
std::cout << "Log: "
<< sizeof(uavcan::dynamic_node_id_server_impl::Log) << std::endl;
using namespace uavcan::dynamic_node_id_server;
std::cout << "PersistentState: "
<< sizeof(uavcan::dynamic_node_id_server_impl::PersistentState) << std::endl;
std::cout << "ClusterManager: "
<< sizeof(uavcan::dynamic_node_id_server_impl::ClusterManager) << std::endl;
std::cout << "RaftCore: "
<< sizeof(uavcan::dynamic_node_id_server_impl::RaftCore) << std::endl;
std::cout << "AllocationRequestManager: "
<< sizeof(uavcan::dynamic_node_id_server_impl::AllocationRequestManager) << std::endl;
std::cout << "DynamicNodeIDAllocationServer: " << sizeof(uavcan::DynamicNodeIDAllocationServer) << std::endl;
std::cout << "distributed::Log: " << sizeof(distributed::Log) << std::endl;
std::cout << "distributed::PersistentState: " << sizeof(distributed::PersistentState) << std::endl;
std::cout << "distributed::ClusterManager: " << sizeof(distributed::ClusterManager) << std::endl;
std::cout << "distributed::RaftCore: " << sizeof(distributed::RaftCore) << std::endl;
std::cout << "distributed::Server: " << sizeof(distributed::Server) << std::endl;
std::cout << "AllocationRequestManager: " << sizeof(AllocationRequestManager) << std::endl;
}