raft_topology: add barrier_and_drain cmd

We use utils::phased_barrier. The new phase
is started each time the version is updated.
We track all instances of token_metadata,
when an instance is destroyed the
corresponding phased_barrier::operation is
released.
This commit is contained in:
Petr Gusev
2023-05-10 11:56:44 +04:00
parent 253d8a8c65
commit 4f99302c2b
6 changed files with 84 additions and 6 deletions

View File

@@ -10,6 +10,7 @@ namespace service {
struct raft_topology_cmd {
enum class command: uint8_t {
barrier,
barrier_and_drain,
stream_ranges,
fence_old_reads
};

View File

@@ -77,6 +77,7 @@ private:
// need to apply fencing or not.
// The initial valid version is 1;
token_metadata::version_t _version = 0;
token_metadata::version_tracker_t _version_tracker;
// Note: if any member is added to this class
// clone_async() must be updated to copy that member.
@@ -299,6 +300,9 @@ public:
}
_version = version;
}
void set_version_tracker(token_metadata::version_tracker_t tracker) {
_version_tracker = std::move(tracker);
}
friend class token_metadata;
};
@@ -1166,12 +1170,24 @@ void
token_metadata::set_version(version_t version) {
_impl->set_version(version);
}
void
token_metadata::set_version_tracker(version_tracker_t tracker) {
_impl->set_version_tracker(std::move(tracker));
}
void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
if (_shared->get_ring_version() >= tmptr->get_ring_version()) {
on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version()));
on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing ring_version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version()));
}
if (_shared->get_version() > tmptr->get_version()) {
on_internal_error(tlogger, format("shared_token_metadata: must not set decreasing version: {} -> {}", _shared->get_version(), tmptr->get_version()));
} else if (_shared->get_version() < tmptr->get_version()) {
_stale_versions_in_use = _versions_barrier.advance_and_await();
}
_shared = std::move(tmptr);
_shared->set_version_tracker(_versions_barrier.start());
}
future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_function<future<> (token_metadata&)> func) {

View File

@@ -22,9 +22,11 @@
#include <boost/range/iterator_range.hpp>
#include <boost/icl/interval.hpp>
#include "range.hh"
#include <seastar/core/shared_future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
#include "utils/phased_barrier.hh"
#include "service/topology_state_machine.hh"
#include "locator/types.hh"
@@ -80,6 +82,7 @@ public:
};
using inet_address = gms::inet_address;
using version_t = service::topology::version_t;
using version_tracker_t = utils::phased_barrier::operation;
private:
friend class token_metadata_ring_splitter;
class tokens_iterator {
@@ -281,6 +284,9 @@ public:
void set_version(version_t version);
friend class token_metadata_impl;
friend class shared_token_metadata;
private:
void set_version_tracker(version_tracker_t tracker);
};
struct topology_change_info {
@@ -310,13 +316,32 @@ class shared_token_metadata {
mutable_token_metadata_ptr _shared;
token_metadata_lock_func _lock_func;
// We use this barrier during the transition to a new token_metadata version to ensure that the
// system stops using previous versions. Here are the key points:
// * A new phase begins when a mutable_token_metadata_ptr passed to shared_token_metadata::set has
// a higher version than the current one.
// * Each shared_token_metadata::set call initiates an operation on the barrier. If multiple calls
// have the same version, multiple operations may be initiated with the same phase.
// * The operation is stored within the new token_metadata instance (token_metadata::set_version_tracker),
// and it completes when the instance is destroyed.
// * The method shared_token_metadata::stale_versions_in_use can be used to wait for the phase
// transition to complete. Once this future resolves, there will be no token_metadata instances
// with versions lower than the current one.
// * Multiple new phases (version upgrades) can be started before accessing stale_versions_in_use.
// However, stale_versions_in_use waits for all previous phases to finish, as advance_and_await
// includes its own invocation as an operation in the new phase.
utils::phased_barrier _versions_barrier;
shared_future<> _stale_versions_in_use{make_ready_future<>()};
public:
// used to construct the shared object as a sharded<> instance
// lock_func returns semaphore_units<>
explicit shared_token_metadata(token_metadata_lock_func lock_func, token_metadata::config cfg)
: _shared(make_token_metadata_ptr(std::move(cfg)))
, _lock_func(std::move(lock_func))
{ }
{
_shared->set_version_tracker(_versions_barrier.start());
}
shared_token_metadata(const shared_token_metadata& x) = delete;
shared_token_metadata(shared_token_metadata&& x) = default;
@@ -327,6 +352,10 @@ public:
void set(mutable_token_metadata_ptr tmptr) noexcept;
future<> stale_versions_in_use() const {
return _stale_versions_in_use.get_future();
}
// Token metadata changes are serialized
// using the schema_tables merge_lock.
//

View File

@@ -4913,6 +4913,34 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(shar
// we already did read barrier above
result.status = raft_topology_cmd_result::command_status::success;
break;
case raft_topology_cmd::command::barrier_and_drain: {
const auto version = _topology_state_machine._topology.version;
co_await container().invoke_on_all([version] (storage_service& ss) -> future<> {
const auto current_version = ss._shared_token_metadata.get()->get_version();
slogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}",
version, current_version);
// This shouldn't happen under normal operation, it's only plausible
// if the topology change coordinator has
// moved to another node and managed to update the topology
// parallel to this method. The previous coordinator
// should be inactive now, so it won't observe this
// exception. By returning exception we aim
// to reveal any other conditions where this may arise.
if (current_version != version) {
co_await coroutine::return_exception(std::runtime_error(
::format("raft topology: command::barrier_and_drain, the version has changed, "
"version {}, current_version {}, the topology change coordinator "
" had probably migrated to another node",
version, current_version)));
}
co_await ss._shared_token_metadata.stale_versions_in_use();
slogger.debug("raft_topology_cmd::barrier_and_drain done");
});
result.status = raft_topology_cmd_result::command_status::success;
}
break;
case raft_topology_cmd::command::stream_ranges: {
const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second;
auto tstate = _topology_state_machine._topology.tstate;

View File

@@ -137,6 +137,9 @@ std::ostream& operator<<(std::ostream& os, const raft_topology_cmd::command& cmd
case raft_topology_cmd::command::barrier:
os << "barrier";
break;
case raft_topology_cmd::command::barrier_and_drain:
os << "barrier_and_drain";
break;
case raft_topology_cmd::command::stream_ranges:
os << "stream_ranges";
break;

View File

@@ -134,10 +134,11 @@ struct topology_state_machine {
// Raft leader uses this command to drive bootstrap process on other nodes
struct raft_topology_cmd {
enum class command: uint16_t {
barrier, // request to wait for the latest topology
stream_ranges, // reqeust to stream data, return when streaming is
// done
fence_old_reads // wait for all reads started before to complete
barrier, // request to wait for the latest topology
barrier_and_drain, // same + drain requests which use previous versions
stream_ranges, // reqeust to stream data, return when streaming is
// done
fence_old_reads // wait for all reads started before to complete
};
command cmd;
};