diff --git a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp index 8c2ec8ce1e..e04b8e7043 100644 --- a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp +++ b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp @@ -441,6 +441,20 @@ public: bool isClusterDiscovered() const { return num_known_servers_ == (cluster_size_ - 1); } }; +/** + * Allocator has to implement this interface so the RaftCore can inform it when a new entry gets committed to the log. + */ +class ILeaderLogCommitHandler +{ +public: + /** + * This method will be invoked when a new log entry is committed (only if the local server is the current Leader). + */ + virtual void onEntryCommitted(const Entry& entry) = 0; + + virtual ~ILeaderLogCommitHandler() { } +}; + /** * This class implements log replication and voting. * It does not implement client-server interaction at all; instead it just exposes a public method for adding @@ -487,6 +501,7 @@ class RaftCore : private TimerBase const MonotonicDuration base_activity_timeout_; IDynamicNodeIDAllocationServerEventTracer& tracer_; + ILeaderLogCommitHandler& log_commit_handler_; /* * States @@ -549,7 +564,10 @@ class RaftCore : private TimerBase virtual void handleTimerEvent(const TimerEvent& event); public: - RaftCore(INode& node, IDynamicNodeIDStorageBackend& storage, IDynamicNodeIDAllocationServerEventTracer& tracer, + RaftCore(INode& node, + IDynamicNodeIDStorageBackend& storage, + IDynamicNodeIDAllocationServerEventTracer& tracer, + ILeaderLogCommitHandler& log_commit_handler, MonotonicDuration update_interval = MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_REQUEST_TIMEOUT_MS), MonotonicDuration base_activity_timeout = @@ -558,6 +576,7 @@ public: , update_interval_(update_interval) , base_activity_timeout_(base_activity_timeout) , tracer_(tracer) + , log_commit_handler_(log_commit_handler) , persistent_state_(storage, tracer) , cluster_(node, storage, persistent_state_.getLog(), tracer) , commit_index_(0) // Per Raft paper, commitIndex must be initialized to zero diff --git a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp index 6ff4bfc61c..4638ac75fb 100644 --- a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp +++ b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp @@ -1059,6 +1059,7 @@ void RaftCore::propagateCommitIndex() { // Objective is to estimate whether we can safely increment commit index value UAVCAN_ASSERT(server_state_ == ServerStateLeader); + UAVCAN_ASSERT(commit_index_ <= persistent_state_.getLog().getLastIndex()); if (commit_index_ == persistent_state_.getLog().getLastIndex()) { @@ -1098,8 +1099,11 @@ void RaftCore::propagateCommitIndex() if (num_nodes_with_next_log_entry_available >= cluster_.getQuorumSize()) { commit_index_++; + UAVCAN_ASSERT(commit_index_ > 0); // Index 0 is always committed trace(TraceRaftNewEntryCommitted, commit_index_); + // AT THIS POINT ALLOCATION IS COMPLETE + log_commit_handler_.onEntryCommitted(*persistent_state_.getLog().getEntryAtIndex(commit_index_)); } } }