diff --git a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp index 2f2e4fa58d..f826fd9189 100644 --- a/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp +++ b/libuavcan/include/uavcan/protocol/dynamic_node_id_allocation_server.hpp @@ -124,11 +124,17 @@ enum TraceEvent // 10 TraceDiscoveryReceived, // node ID of the sender TraceClusterSizeInited, // cluster size + TraceInvalidClusterSizeReceived, // received cluster size TraceRaftCoreInited, // update interval in usec TraceRaftStateSwitch, // 0 - Follower, 1 - Candidate, 2 - Leader - TraceRaftModeSwitch, // 0 - Passive, 1 - Active // 15 + TraceRaftModeSwitch, // 0 - Passive, 1 - Active TraceRaftNewLogEntry, // node ID value + TraceRaftRequestIgnored, // node ID of the client + TraceRaftVoteRequestReceived, // node ID of the client + TraceRaftPersistStateUpdateError, // negative error code + // 20 + TraceRaftCommitIndexUpdate, // new commit index value NumTraceEventCodes }; @@ -227,6 +233,7 @@ public: * Returned value indicates whether the requested operation has been carried out successfully. */ int removeEntriesWhereIndexGreaterOrEqual(Index index); + int removeEntriesWhereIndexGreater(Index index); /** * Returns nullptr if there's no such index. @@ -269,6 +276,7 @@ public: Term getCurrentTerm() const { return current_term_; } NodeID getVotedFor() const { return voted_for_; } + bool isVotedForSet() const { return voted_for_.isUnicast(); } Log& getLog() { return log_; } const Log& getLog() const { return log_; } @@ -282,6 +290,7 @@ public: * Invokes storage IO. */ int setVotedFor(NodeID node_id); + int resetVotedFor() { return setVotedFor(NodeID(0)); } }; /** @@ -331,7 +340,6 @@ class ClusterManager : private TimerBase Server* findServer(NodeID node_id); const Server* findServer(NodeID node_id) const; - bool isKnownServer(NodeID node_id) const; void addServer(NodeID node_id); virtual void handleTimerEvent(const TimerEvent&); @@ -368,6 +376,11 @@ public: */ int init(uint8_t init_cluster_size = ClusterSizeUnknown); + /** + * Whether such server has been discovered earlier. + */ + bool isKnownServer(NodeID node_id) const; + /** * An invalid node ID will be returned if there's no such server. * The local server is not listed there. @@ -482,11 +495,14 @@ class RaftCore : private TimerBase INode& getNode() { return append_entries_srv_.getNode(); } + void registerActivity() { last_activity_timestamp_ = getNode().getMonotonicTime(); } + void updateFollower(const MonotonicTime& current_time); void updateCandidate(const MonotonicTime& current_time); void updateLeader(const MonotonicTime& current_time); void switchState(ServerState new_state); + void setActiveMode(bool new_active); void handleAppendEntriesRequest(const ReceivedDataStructure& request, ServiceResponseDataStructure& response); diff --git a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp index 56bd9e8773..83f6a12ee0 100644 --- a/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp +++ b/libuavcan/src/protocol/uc_dynamic_node_id_allocation_server.cpp @@ -344,32 +344,41 @@ int Log::removeEntriesWhereIndexGreaterOrEqual(Index index) { UAVCAN_ASSERT(last_index_ < Capacity); - if (((index) >= Capacity) || (index == 0)) + if (((index) >= Capacity) || (index <= 0)) { return -ErrLogic; } - tracer_.onEvent(TraceLogRemove, index - 1U); - - MarshallingStorageDecorator io(storage_); uint32_t new_last_index = index - 1U; - int res = io.setAndGetBack(getLastIndexKey(), new_last_index); - if (res < 0) + + tracer_.onEvent(TraceLogRemove, new_last_index); + + if (new_last_index != last_index_) { - return res; + MarshallingStorageDecorator io(storage_); + int res = io.setAndGetBack(getLastIndexKey(), new_last_index); + if (res < 0) + { + return res; + } + if (new_last_index != index - 1U) + { + return -ErrFailure; + } + UAVCAN_TRACE("dynamic_node_id_server_impl::Log", "Entries removed, last index %u --> %u", + unsigned(last_index_), unsigned(new_last_index)); + last_index_ = Index(new_last_index); } - if (new_last_index != index - 1U) - { - return -ErrFailure; - } - UAVCAN_TRACE("dynamic_node_id_server_impl::Log", "Entries removed, last index %u --> %u", - unsigned(last_index_), unsigned(new_last_index)); - last_index_ = Index(new_last_index); // Removal operation leaves dangling entries in storage, it's OK return 0; } +int Log::removeEntriesWhereIndexGreater(Index index) +{ + return removeEntriesWhereIndexGreaterOrEqual(Index(index + 1U)); +} + const Entry* Log::getEntryAtIndex(Index index) const { UAVCAN_ASSERT(last_index_ < Capacity); @@ -577,24 +586,6 @@ const ClusterManager::Server* ClusterManager::findServer(NodeID node_id) const return const_cast(this)->findServer(node_id); } -bool ClusterManager::isKnownServer(NodeID node_id) const -{ - if (node_id == getNode().getNodeID()) - { - return true; - } - for (uint8_t i = 0; i < num_known_servers_; i++) - { - UAVCAN_ASSERT(servers_[i].node_id.isUnicast()); - UAVCAN_ASSERT(servers_[i].node_id != getNode().getNodeID()); - if (servers_[i].node_id == node_id) - { - return true; - } - } - return false; -} - void ClusterManager::addServer(NodeID node_id) { UAVCAN_ASSERT((num_known_servers_ + 1) < (MaxServers - 2)); @@ -666,6 +657,7 @@ void ClusterManager::handleDiscovery(const ReceivedDataStructure& msg */ if (msg.configured_cluster_size != cluster_size_) { + tracer_.onEvent(TraceInvalidClusterSizeReceived, msg.configured_cluster_size); getNode().registerInternalFailure("Bad Raft cluster size"); return; } @@ -778,6 +770,24 @@ int ClusterManager::init(const uint8_t init_cluster_size) return 0; } +bool ClusterManager::isKnownServer(NodeID node_id) const +{ + if (node_id == getNode().getNodeID()) + { + return true; + } + for (uint8_t i = 0; i < num_known_servers_; i++) + { + UAVCAN_ASSERT(servers_[i].node_id.isUnicast()); + UAVCAN_ASSERT(servers_[i].node_id != getNode().getNodeID()); + if (servers_[i].node_id == node_id) + { + return true; + } + } + return false; +} + NodeID ClusterManager::getRemoteServerNodeIDAtIndex(uint8_t index) const { if (index < num_known_servers_) @@ -875,19 +885,162 @@ void RaftCore::updateLeader(const MonotonicTime& current_time) (void)current_time; } -void RaftCore::switchState(ServerState new_state) +void RaftCore::switchState(const ServerState new_state) { if (server_state_ != new_state) { + UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "State switch: %d --> %d", + int(server_state_), int(new_state)); trace(TraceRaftStateSwitch, new_state); + + server_state_ = new_state; + + cluster_.resetAllServerIndices(); + + if (server_state_ == ServerStateFollower) + { + setActiveMode(false); + } + } +} + +void RaftCore::setActiveMode(const bool new_active) +{ + if (active_mode_ != new_active) + { + UAVCAN_TRACE("dynamic_node_id_server_impl::RaftCore", "Mode switch: %d --> %d", + int(active_mode_), int(new_active)); + trace(TraceRaftModeSwitch, new_active); + + active_mode_ = new_active; } } void RaftCore::handleAppendEntriesRequest(const ReceivedDataStructure& request, ServiceResponseDataStructure& response) { - (void)request; - (void)response; + if (!cluster_.isKnownServer(request.getSrcNodeID())) + { + trace(TraceRaftRequestIgnored, request.getSrcNodeID().get()); + return; + } + + registerActivity(); + + UAVCAN_ASSERT(response.isResponseEnabled()); // This is default + + /* + * Checking if our current state is up to date. + * The request will be ignored if persistent state cannot be updated. + */ + if (request.term > persistent_state_.getCurrentTerm()) + { + int res = persistent_state_.setCurrentTerm(request.term); + if (res < 0) + { + response.setResponseEnabled(false); + trace(TraceRaftPersistStateUpdateError, res); + } + + res = persistent_state_.resetVotedFor(); + if (res < 0) + { + response.setResponseEnabled(false); + trace(TraceRaftPersistStateUpdateError, res); + } + + switchState(ServerStateFollower); + + if (!response.isResponseEnabled()) + { + return; + } + } + + /* + * Preparing the response + */ + response.term = persistent_state_.getCurrentTerm(); + response.success = false; + + /* + * Step 1 (see Raft paper) + * Reject the request if the leader has stale term number. + */ + if (request.term < persistent_state_.getCurrentTerm()) + { + response.setResponseEnabled(true); + return; + } + + switchState(ServerStateFollower); + + /* + * Step 2 + * Reject the request if the assumed log index does not exist on the local node. + */ + const Entry* const prev_entry = persistent_state_.getLog().getEntryAtIndex(request.prev_log_index); + if (prev_entry == NULL) + { + response.setResponseEnabled(true); + return; + } + + /* + * Step 3 + * Drop log entries if term number does not match. + * Ignore the request if the persistent state cannot be updated. + */ + if (prev_entry->term != request.prev_log_term) + { + const int res = persistent_state_.getLog().removeEntriesWhereIndexGreaterOrEqual(request.prev_log_index); + response.setResponseEnabled(res >= 0); + if (res < 0) + { + trace(TraceRaftPersistStateUpdateError, res); + } + return; + } + + /* + * Step 4 + * Update the log with new entries - this will possibly require to rewrite existing entries. + * Ignore the request if the persistent state cannot be updated. + */ + if (request.prev_log_index != persistent_state_.getLog().getLastIndex()) + { + const int res = persistent_state_.getLog().removeEntriesWhereIndexGreater(request.prev_log_index); + if (res < 0) + { + trace(TraceRaftPersistStateUpdateError, res); + response.setResponseEnabled(false); + return; + } + } + + for (uint8_t i = 0; i < request.entries.size(); i++) + { + const int res = persistent_state_.getLog().append(request.entries[i]); + if (res < 0) + { + trace(TraceRaftPersistStateUpdateError, res); + response.setResponseEnabled(false); + return; // Response will not be sent, the server will assume that we're dead + } + } + + /* + * Step 5 + * Update the commit index. + */ + if (request.leader_commit > commit_index_) + { + commit_index_ = min(request.leader_commit, persistent_state_.getLog().getLastIndex()); + trace(TraceRaftCommitIndexUpdate, commit_index_); + } + + response.setResponseEnabled(true); + response.success = true; } void RaftCore::handleAppendEntriesResponse(const ServiceCallResult& result) @@ -898,8 +1051,77 @@ void RaftCore::handleAppendEntriesResponse(const ServiceCallResult& request, ServiceResponseDataStructure& response) { - (void)request; - (void)response; + trace(TraceRaftVoteRequestReceived, request.getSrcNodeID().get()); + + if (!cluster_.isKnownServer(request.getSrcNodeID())) + { + trace(TraceRaftRequestIgnored, request.getSrcNodeID().get()); + return; + } + + UAVCAN_ASSERT(response.isResponseEnabled()); // This is default + + setActiveMode(true); + + /* + * Checking if our current state is up to date. + * The request will be ignored if persistent state cannot be updated. + */ + if (request.term > persistent_state_.getCurrentTerm()) + { + int res = persistent_state_.setCurrentTerm(request.term); + if (res < 0) + { + response.setResponseEnabled(false); + trace(TraceRaftPersistStateUpdateError, res); + } + + res = persistent_state_.resetVotedFor(); + if (res < 0) + { + response.setResponseEnabled(false); + trace(TraceRaftPersistStateUpdateError, res); + } + + switchState(ServerStateFollower); + + if (!response.isResponseEnabled()) + { + return; + } + } + + /* + * Preparing the response + */ + response.term = persistent_state_.getCurrentTerm(); + + if (request.term < response.term) + { + response.vote_granted = false; + } + else + { + const bool can_vote = !persistent_state_.isVotedForSet() || + (persistent_state_.getVotedFor() == request.getSrcNodeID()); + const bool log_is_up_to_date = + persistent_state_.getLog().isOtherLogUpToDate(request.last_log_index, request.last_log_term); + + response.vote_granted = can_vote && log_is_up_to_date; + + if (response.vote_granted) + { + registerActivity(); // This is necessary to avoid excessive elections + + const int res = persistent_state_.setVotedFor(request.getSrcNodeID()); + if (res < 0) + { + trace(TraceRaftPersistStateUpdateError, res); + response.setResponseEnabled(false); + return; + } + } + } } void RaftCore::handleRequestVoteResponse(const ServiceCallResult& result)