From 4f99302c2b27c4d8fb886b4e1ab179ea24a3f709 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Wed, 10 May 2023 11:56:44 +0400 Subject: [PATCH] raft_topology: add barrier_and_drain cmd We use utils::phased_barrier. The new phase is started each time the version is updated. We track all instances of token_metadata, when an instance is destroyed the corresponding phased_barrier::operation is released. --- idl/storage_service.idl.hh | 1 + locator/token_metadata.cc | 18 +++++++++++++++++- locator/token_metadata.hh | 31 ++++++++++++++++++++++++++++++- service/storage_service.cc | 28 ++++++++++++++++++++++++++++ service/topology_state_machine.cc | 3 +++ service/topology_state_machine.hh | 9 +++++---- 6 files changed, 84 insertions(+), 6 deletions(-) diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh index ecfd53ec48..0073caea18 100644 --- a/idl/storage_service.idl.hh +++ b/idl/storage_service.idl.hh @@ -10,6 +10,7 @@ namespace service { struct raft_topology_cmd { enum class command: uint8_t { barrier, + barrier_and_drain, stream_ranges, fence_old_reads }; diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 8e86572d90..391616b8ed 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -77,6 +77,7 @@ private: // need to apply fencing or not. // The initial valid version is 1; token_metadata::version_t _version = 0; + token_metadata::version_tracker_t _version_tracker; // Note: if any member is added to this class // clone_async() must be updated to copy that member. @@ -299,6 +300,9 @@ public: } _version = version; } + void set_version_tracker(token_metadata::version_tracker_t tracker) { + _version_tracker = std::move(tracker); + } friend class token_metadata; }; @@ -1166,12 +1170,24 @@ void token_metadata::set_version(version_t version) { _impl->set_version(version); } +void +token_metadata::set_version_tracker(version_tracker_t tracker) { + _impl->set_version_tracker(std::move(tracker)); +} void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { if (_shared->get_ring_version() >= tmptr->get_ring_version()) { - on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version())); + on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing ring_version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version())); } + + if (_shared->get_version() > tmptr->get_version()) { + on_internal_error(tlogger, format("shared_token_metadata: must not set decreasing version: {} -> {}", _shared->get_version(), tmptr->get_version())); + } else if (_shared->get_version() < tmptr->get_version()) { + _stale_versions_in_use = _versions_barrier.advance_and_await(); + } + _shared = std::move(tmptr); + _shared->set_version_tracker(_versions_barrier.start()); } future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_function (token_metadata&)> func) { diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 638545907b..2ff8a94a87 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -22,9 +22,11 @@ #include #include #include "range.hh" +#include #include #include #include +#include "utils/phased_barrier.hh" #include "service/topology_state_machine.hh" #include "locator/types.hh" @@ -80,6 +82,7 @@ public: }; using inet_address = gms::inet_address; using version_t = service::topology::version_t; + using version_tracker_t = utils::phased_barrier::operation; private: friend class token_metadata_ring_splitter; class tokens_iterator { @@ -281,6 +284,9 @@ public: void set_version(version_t version); friend class token_metadata_impl; + friend class shared_token_metadata; +private: + void set_version_tracker(version_tracker_t tracker); }; struct topology_change_info { @@ -310,13 +316,32 @@ class shared_token_metadata { mutable_token_metadata_ptr _shared; token_metadata_lock_func _lock_func; + // We use this barrier during the transition to a new token_metadata version to ensure that the + // system stops using previous versions. Here are the key points: + // * A new phase begins when a mutable_token_metadata_ptr passed to shared_token_metadata::set has + // a higher version than the current one. + // * Each shared_token_metadata::set call initiates an operation on the barrier. If multiple calls + // have the same version, multiple operations may be initiated with the same phase. + // * The operation is stored within the new token_metadata instance (token_metadata::set_version_tracker), + // and it completes when the instance is destroyed. + // * The method shared_token_metadata::stale_versions_in_use can be used to wait for the phase + // transition to complete. Once this future resolves, there will be no token_metadata instances + // with versions lower than the current one. + // * Multiple new phases (version upgrades) can be started before accessing stale_versions_in_use. + // However, stale_versions_in_use waits for all previous phases to finish, as advance_and_await + // includes its own invocation as an operation in the new phase. + utils::phased_barrier _versions_barrier; + shared_future<> _stale_versions_in_use{make_ready_future<>()}; + public: // used to construct the shared object as a sharded<> instance // lock_func returns semaphore_units<> explicit shared_token_metadata(token_metadata_lock_func lock_func, token_metadata::config cfg) : _shared(make_token_metadata_ptr(std::move(cfg))) , _lock_func(std::move(lock_func)) - { } + { + _shared->set_version_tracker(_versions_barrier.start()); + } shared_token_metadata(const shared_token_metadata& x) = delete; shared_token_metadata(shared_token_metadata&& x) = default; @@ -327,6 +352,10 @@ public: void set(mutable_token_metadata_ptr tmptr) noexcept; + future<> stale_versions_in_use() const { + return _stale_versions_in_use.get_future(); + } + // Token metadata changes are serialized // using the schema_tables merge_lock. // diff --git a/service/storage_service.cc b/service/storage_service.cc index 558171115a..dcd5c82c78 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -4913,6 +4913,34 @@ future storage_service::raft_topology_cmd_handler(shar // we already did read barrier above result.status = raft_topology_cmd_result::command_status::success; break; + case raft_topology_cmd::command::barrier_and_drain: { + const auto version = _topology_state_machine._topology.version; + co_await container().invoke_on_all([version] (storage_service& ss) -> future<> { + const auto current_version = ss._shared_token_metadata.get()->get_version(); + slogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}", + version, current_version); + + // This shouldn't happen under normal operation, it's only plausible + // if the topology change coordinator has + // moved to another node and managed to update the topology + // parallel to this method. The previous coordinator + // should be inactive now, so it won't observe this + // exception. By returning exception we aim + // to reveal any other conditions where this may arise. + if (current_version != version) { + co_await coroutine::return_exception(std::runtime_error( + ::format("raft topology: command::barrier_and_drain, the version has changed, " + "version {}, current_version {}, the topology change coordinator " + " had probably migrated to another node", + version, current_version))); + } + + co_await ss._shared_token_metadata.stale_versions_in_use(); + slogger.debug("raft_topology_cmd::barrier_and_drain done"); + }); + result.status = raft_topology_cmd_result::command_status::success; + } + break; case raft_topology_cmd::command::stream_ranges: { const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second; auto tstate = _topology_state_machine._topology.tstate; diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index f39cfb0aab..1692975c55 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -137,6 +137,9 @@ std::ostream& operator<<(std::ostream& os, const raft_topology_cmd::command& cmd case raft_topology_cmd::command::barrier: os << "barrier"; break; + case raft_topology_cmd::command::barrier_and_drain: + os << "barrier_and_drain"; + break; case raft_topology_cmd::command::stream_ranges: os << "stream_ranges"; break; diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 5b74248ba4..6562b3e723 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -134,10 +134,11 @@ struct topology_state_machine { // Raft leader uses this command to drive bootstrap process on other nodes struct raft_topology_cmd { enum class command: uint16_t { - barrier, // request to wait for the latest topology - stream_ranges, // reqeust to stream data, return when streaming is - // done - fence_old_reads // wait for all reads started before to complete + barrier, // request to wait for the latest topology + barrier_and_drain, // same + drain requests which use previous versions + stream_ranges, // reqeust to stream data, return when streaming is + // done + fence_old_reads // wait for all reads started before to complete }; command cmd; };