diff --git a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp index 8bc71db869..8260f55093 100644 --- a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp +++ b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp @@ -137,6 +137,7 @@ enum TraceEvent TraceRaftPersistStateUpdateError, // negative error code TraceRaftCommitIndexUpdate, // new commit index value TraceRaftNewerTermInResponse, // new term value + TraceRaftNewEntryCommitted, // new commit index value NumTraceEventCodes }; @@ -521,6 +522,8 @@ class RaftCore : private TimerBase void tryIncrementCurrentTermFromResponse(Term new_term); + void propagateCommitIndex(); + void handleAppendEntriesRequest(const ReceivedDataStructure& request, ServiceResponseDataStructure& response); diff --git a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp index cf23c22265..85d83010dc 100644 --- a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp +++ b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp @@ -883,6 +883,7 @@ void RaftCore::updateCandidate(const MonotonicTime& current_time) void RaftCore::updateLeader(const MonotonicTime& current_time) { (void)current_time; + propagateCommitIndex(); } void RaftCore::switchState(const ServerState new_state) @@ -931,6 +932,55 @@ void RaftCore::tryIncrementCurrentTermFromResponse(Term new_term) switchState(ServerStateFollower); } +void RaftCore::propagateCommitIndex() +{ + // Objective is to estimate whether we can safely increment commit index value + UAVCAN_ASSERT(server_state_ == ServerStateLeader); + + if (commit_index_ == persistent_state_.getLog().getLastIndex()) + { + // All local entries are committed + bool commit_index_fully_replicated = true; + + for (uint8_t i = 0; i < cluster_.getNumKnownServers(); i++) + { + const Log::Index match_index = cluster_.getServerMatchIndex(cluster_.getRemoteServerNodeIDAtIndex(i)); + if (match_index != commit_index_) + { + commit_index_fully_replicated = false; + break; + } + } + + if (commit_index_fully_replicated && cluster_.isClusterDiscovered()) + { + setActiveMode(false); // Commit index is the same on all nodes, enabling passive mode + } + } + else + { + // Not all local entries are committed + setActiveMode(true); + + uint8_t num_nodes_with_next_log_entry_available = 1; // Local node + for (uint8_t i = 0; i < cluster_.getNumKnownServers(); i++) + { + const Log::Index match_index = cluster_.getServerMatchIndex(cluster_.getRemoteServerNodeIDAtIndex(i)); + if (match_index > commit_index_) + { + num_nodes_with_next_log_entry_available++; + } + } + + if (num_nodes_with_next_log_entry_available >= cluster_.getQuorumSize()) + { + commit_index_++; + trace(TraceRaftNewEntryCommitted, commit_index_); + // AT THIS POINT ALLOCATION IS COMPLETE + } + } +} + void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure& request, ServiceResponseDataStructure& response) { diff --git a/libuavcan/test/protocol/dynamic_node_id_allocation_server.cpp b/libuavcan/test/protocol/dynamic_node_id_allocation_server.cpp index 83e85618ad..fa5bd78964 100644 --- a/libuavcan/test/protocol/dynamic_node_id_allocation_server.cpp +++ b/libuavcan/test/protocol/dynamic_node_id_allocation_server.cpp @@ -801,3 +801,12 @@ TEST(DynamicNodeIDAllocationServer, ClusterManagerThreeServers) ASSERT_EQ(log.getLastIndex() + 1, mgr.getServerNextIndex(2)); ASSERT_EQ(log.getLastIndex() + 1, mgr.getServerNextIndex(127)); } + + +TEST(DynamicNodeIDAllocationServer, ObjectSizes) +{ + std::cout << "Log: " << sizeof(uavcan::dynamic_node_id_server_impl::Log) << std::endl; + std::cout << "PersistentState: " << sizeof(uavcan::dynamic_node_id_server_impl::PersistentState) << std::endl; + std::cout << "ClusterManager: " << sizeof(uavcan::dynamic_node_id_server_impl::ClusterManager) << std::endl; + std::cout << "RaftCore: " << sizeof(uavcan::dynamic_node_id_server_impl::RaftCore) << std::endl; +}