Log commit callback - needed by the main allocator class to broadcast allocation responses

This commit is contained in:
Pavel Kirienko
2015-05-08 14:57:27 +03:00
parent a7c09ed714
commit 43f849cc10
2 changed files with 24 additions and 1 deletions
@@ -441,6 +441,20 @@ public:
bool isClusterDiscovered() const { return num_known_servers_ == (cluster_size_ - 1); }
};
/**
* Allocator has to implement this interface so the RaftCore can inform it when a new entry gets committed to the log.
*/
class ILeaderLogCommitHandler
{
public:
/**
* This method will be invoked when a new log entry is committed (only if the local server is the current Leader).
*/
virtual void onEntryCommitted(const Entry& entry) = 0;
virtual ~ILeaderLogCommitHandler() { }
};
/**
* This class implements log replication and voting.
* It does not implement client-server interaction at all; instead it just exposes a public method for adding
@@ -487,6 +501,7 @@ class RaftCore : private TimerBase
const MonotonicDuration base_activity_timeout_;
IDynamicNodeIDAllocationServerEventTracer& tracer_;
ILeaderLogCommitHandler& log_commit_handler_;
/*
* States
@@ -549,7 +564,10 @@ class RaftCore : private TimerBase
virtual void handleTimerEvent(const TimerEvent& event);
public:
RaftCore(INode& node, IDynamicNodeIDStorageBackend& storage, IDynamicNodeIDAllocationServerEventTracer& tracer,
RaftCore(INode& node,
IDynamicNodeIDStorageBackend& storage,
IDynamicNodeIDAllocationServerEventTracer& tracer,
ILeaderLogCommitHandler& log_commit_handler,
MonotonicDuration update_interval =
MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_REQUEST_TIMEOUT_MS),
MonotonicDuration base_activity_timeout =
@@ -558,6 +576,7 @@ public:
, update_interval_(update_interval)
, base_activity_timeout_(base_activity_timeout)
, tracer_(tracer)
, log_commit_handler_(log_commit_handler)
, persistent_state_(storage, tracer)
, cluster_(node, storage, persistent_state_.getLog(), tracer)
, commit_index_(0) // Per Raft paper, commitIndex must be initialized to zero
@@ -1059,6 +1059,7 @@ void RaftCore::propagateCommitIndex()
{
// Objective is to estimate whether we can safely increment commit index value
UAVCAN_ASSERT(server_state_ == ServerStateLeader);
UAVCAN_ASSERT(commit_index_ <= persistent_state_.getLog().getLastIndex());
if (commit_index_ == persistent_state_.getLog().getLastIndex())
{
@@ -1098,8 +1099,11 @@ void RaftCore::propagateCommitIndex()
if (num_nodes_with_next_log_entry_available >= cluster_.getQuorumSize())
{
commit_index_++;
UAVCAN_ASSERT(commit_index_ > 0); // Index 0 is always committed
trace(TraceRaftNewEntryCommitted, commit_index_);
// AT THIS POINT ALLOCATION IS COMPLETE
log_commit_handler_.onEntryCommitted(*persistent_state_.getLog().getEntryAtIndex(commit_index_));
}
}
}