diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index fd2cf2e88c..e33ada92a4 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -273,9 +273,9 @@ future<> range_streamer::stream_async() { nr_ranges_streamed, nr_ranges_streamed + ranges_to_stream.size(), nr_ranges_total); auto ranges_streamed = ranges_to_stream.size(); if (_nr_rx_added) { - sp.request_ranges(source, keyspace, std::move(ranges_to_stream)); + sp.request_ranges(source, keyspace, std::move(ranges_to_stream), _tables); } else if (_nr_tx_added) { - sp.transfer_ranges(source, keyspace, std::move(ranges_to_stream)); + sp.transfer_ranges(source, keyspace, std::move(ranges_to_stream), _tables); } sp.execute().discard_result().get(); // Update finished percentage diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index d0afc8d533..b95f222a9b 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -77,7 +77,8 @@ public: }; range_streamer(distributed& db, sharded& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set tokens, - inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason) + inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, + std::vector tables = {}) : _db(db) , _stream_manager(sm) , _token_metadata_ptr(std::move(tmptr)) @@ -87,13 +88,14 @@ public: , _dr(std::move(dr)) , _description(std::move(description)) , _reason(reason) + , _tables(std::move(tables)) { _abort_source.check(); } range_streamer(distributed& db, sharded& sm, const token_metadata_ptr tmptr, abort_source& abort_source, - inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason) - : range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set(), address, std::move(dr), description, reason) { + inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, std::vector tables = {}) + : range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set(), address, std::move(dr), description, reason, std::move(tables)) { } void add_source_filter(std::unique_ptr filter) { @@ -155,6 +157,7 @@ private: locator::endpoint_dc_rack _dr; sstring _description; streaming::stream_reason _reason; + std::vector _tables; std::unordered_multimap> _to_stream; std::unordered_set> _source_filters; // Number of tx and rx ranges added diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh index 8bde9ee62a..2b4eb4dbe5 100644 --- a/idl/storage_service.idl.hh +++ b/idl/storage_service.idl.hh @@ -6,6 +6,19 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +namespace locator { + +struct tablet_id final { + uint64_t value(); +}; + +struct global_tablet_id final { + ::table_id table; + locator::tablet_id tablet; +}; + +} + namespace service { struct fencing_token { service::topology::version_t topology_version; @@ -38,4 +51,5 @@ struct raft_topology_pull_params {}; verb raft_topology_cmd (raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result; verb raft_pull_topology_snapshot (service::raft_topology_pull_params) -> service::raft_topology_snapshot; +verb [[cancellable]] tablet_stream_data (locator::global_tablet_id); } diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 5238322474..984f6079d5 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -18,6 +18,7 @@ #include "gms/gossip_digest_syn.hh" #include "gms/gossip_digest_ack.hh" #include "gms/gossip_digest_ack2.hh" +#include "locator/tablets.hh" #include "query-request.hh" #include "query-result.hh" #include @@ -593,6 +594,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::REPAIR_FLUSH_HINTS_BATCHLOG: case messaging_verb::NODE_OPS_CMD: case messaging_verb::HINT_MUTATION: + case messaging_verb::TABLET_STREAM_DATA: return 1; case messaging_verb::CLIENT_ID: case messaging_verb::MUTATION: diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 3fc07b0c03..1841d5e2f5 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -182,7 +182,8 @@ enum class messaging_verb : int32_t { DIRECT_FD_PING = 63, RAFT_TOPOLOGY_CMD = 64, RAFT_PULL_TOPOLOGY_SNAPSHOT = 65, - LAST = 66, + TABLET_STREAM_DATA = 66, + LAST = 67, }; } // namespace netw diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index cf53b4d9a5..68da22d35e 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -22,6 +22,7 @@ #include "query_result_merger.hh" #include #include "message/messaging_service.hh" +#include "locator/tablets.hh" #include "gms/gossiper.hh" #include #include "db/read_repair_decision.hh" diff --git a/service/storage_service.cc b/service/storage_service.cc index 00cb1c777e..961c9b90f0 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -19,6 +19,7 @@ #include "db/system_keyspace.hh" #include "db/system_distributed_keyspace.hh" #include "db/consistency_level.hh" +#include "locator/tablets.hh" #include #include "mutation/canonical_mutation.hh" #include "seastar/core/on_internal_error.hh" @@ -5358,6 +5359,100 @@ future<> storage_service::update_fence_version(token_metadata::version_t new_ver }); } +static +locator::tablet_replica get_leaving_replica(const locator::tablet_info& tinfo, const locator::tablet_transition_info& trinfo) { + std::unordered_set leaving(tinfo.replicas.begin(), tinfo.replicas.end()); + for (auto&& r : trinfo.next) { + leaving.erase(r); + } + if (leaving.empty()) { + throw std::runtime_error(format("No leaving replicas")); + } + if (leaving.size() > 1) { + throw std::runtime_error(format("More than one leaving replica")); + } + return *leaving.begin(); +} + +inet_address storage_service::host2ip(locator::host_id host) { + auto ip = _group0->address_map().find(raft::server_id(host.uuid())); + if (!ip) { + throw std::runtime_error(::format("Cannot map host {} to ip", host)); + } + return *ip; +} + +// Streams data to the pending tablet replica of a given tablet on this node. +// The source tablet replica is determined from the current transition info of the tablet. +future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { + // The coordinator does not execute global token metadata barrier before jumping to "streaming" stage, so we need + // a barrier here to see the token metadata which is at least as recent as that of the sender. + auto& raft_server = _group0->group0_server(); + co_await raft_server.read_barrier(&_abort_source); + + auto tm = _shared_token_metadata.get(); + auto& tmap = tm->tablets().get_tablet_map(tablet.table); + auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet); + + // Check if the request is still valid. + // If there is mismatch, it means this streaming was canceled and the coordinator moved on. + if (!trinfo) { + throw std::runtime_error(format("No transition info for tablet {}", tablet)); + } + if (trinfo->stage != locator::tablet_transition_stage::streaming) { + throw std::runtime_error(format("Tablet {} stage is not at streaming", tablet)); + } + if (trinfo->pending_replica.host != tm->get_my_id()) { + throw std::runtime_error(format("Tablet {} has pending replica different than this one", tablet)); + } + + auto& tinfo = tmap.get_tablet_info(tablet.tablet); + auto range = tmap.get_token_range(tablet.tablet); + locator::tablet_replica leaving_replica = get_leaving_replica(tinfo, *trinfo); + if (leaving_replica.host == tm->get_my_id()) { + // The algorithm doesn't work with tablet migration within the same node because + // it assumes there is only one tablet replica, picked by the sharder, on local node. + throw std::runtime_error(format("Cannot stream within the same node, tablet: {}, shard {} -> {}", + tablet, leaving_replica.shard, trinfo->pending_replica.shard)); + } + auto leaving_replica_ip = host2ip(leaving_replica.host); + + if (_tablet_streaming[tablet]) { + slogger.debug("Streaming retry joining with existing session for tablet {}", tablet); + co_await _tablet_streaming[tablet]->get_future(); + co_return; + } + + auto async_gate_holder = _async_gate.hold(); + promise<> p; + _tablet_streaming[tablet] = seastar::shared_future<>(p.get_future()); + auto erase_tablet_streaming = seastar::defer([&] { + _tablet_streaming.erase(tablet); + }); + + try { + auto& table = _db.local().find_column_family(tablet.table); + std::vector tables = {table.schema()->cf_name()}; + auto streamer = make_lw_shared(_db, _stream_manager, tm, _abort_source, + get_broadcast_address(), _sys_ks.local().local_dc_rack(), + "Tablet migration", streaming::stream_reason::tablet_migration, std::move(tables)); + streamer->add_source_filter(std::make_unique( + _gossiper.get_unreachable_members())); + + std::unordered_map ranges_per_endpoint; + ranges_per_endpoint[leaving_replica_ip].emplace_back(range); + streamer->add_rx_ranges(table.schema()->ks_name(), std::move(ranges_per_endpoint)); + co_await streamer->stream_async(); + + p.set_value(); + slogger.info("Streaming for tablet migration of {} successful", tablet); + } catch (...) { + p.set_exception(std::current_exception()); + slogger.warn("Streaming for tablet migration of {} from {} failed: {}", tablet, leaving_replica, std::current_exception()); + throw; + } +} + void storage_service::init_messaging_service(sharded& proxy, sharded& sys_dist_ks) { _messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) { auto coordinator = cinfo.retrieve_auxiliary("baddr"); @@ -5421,6 +5516,9 @@ void storage_service::init_messaging_service(sharded& pr }; }); }); + ser::storage_service_rpc_verbs::register_tablet_stream_data(&_messaging.local(), [this] (locator::global_tablet_id tablet) { + return stream_tablet(tablet); + }); } future<> storage_service::uninit_messaging_service() { diff --git a/service/storage_service.hh b/service/storage_service.hh index faf85daaa8..d1562f1324 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -15,6 +15,7 @@ #include "gms/i_endpoint_state_change_subscriber.hh" #include "service/endpoint_lifecycle_subscriber.hh" #include "locator/abstract_replication_strategy.hh" +#include "locator/tablets.hh" #include "inet_address_vectors.hh" #include #include @@ -145,6 +146,8 @@ private: future<> node_ops_abort(node_ops_id ops_uuid); void node_ops_signal_abort(std::optional ops_uuid); future<> node_ops_abort_thread(); + future<> stream_tablet(locator::global_tablet_id); + inet_address host2ip(locator::host_id); public: storage_service(abort_source& as, distributed& db, gms::gossiper& gossiper, @@ -759,6 +762,7 @@ private: std::optional> _decomission_result; std::optional> _rebuild_result; std::unordered_map>> _remove_result; + std::unordered_map>> _tablet_streaming; // During decommission, the node waits for the coordinator to tell it to shut down. std::optional> _shutdown_request_promise; struct { diff --git a/streaming/stream_reason.hh b/streaming/stream_reason.hh index 5cbaad5018..f2bad65f39 100644 --- a/streaming/stream_reason.hh +++ b/streaming/stream_reason.hh @@ -22,6 +22,7 @@ enum class stream_reason : uint8_t { rebuild, repair, replace, + tablet_migration, }; } @@ -46,6 +47,8 @@ struct fmt::formatter : fmt::formatter::format("repair", ctx); case replace: return formatter::format("replace", ctx); + case tablet_migration: + return formatter::format("tablet migration", ctx); } std::abort(); }