diff --git a/service/storage_service.cc b/service/storage_service.cc index 83ecb32dbd..76555136d6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2506,9 +2506,15 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad throw std::runtime_error(format("removenode[{}]: Can not find ops_uuid={}", ops_uuid, ops_uuid)); } auto ops = it->second.get_ops_info(); + auto as = it->second.get_abort_source(); for (auto& node : req.leaving_nodes) { - slogger.info("removenode[{}]: Started to sync data for removing node={}, coordinator={}", req.ops_uuid, node, coordinator); - ss._repair.local().removenode_with_repair(ss.get_token_metadata_ptr(), node, ops).get(); + if (ss.is_repair_based_node_ops_enabled()) { + slogger.info("removenode[{}]: Started to sync data for removing node={} using repair, coordinator={}", req.ops_uuid, node, coordinator); + ss._repair.local().removenode_with_repair(ss.get_token_metadata_ptr(), node, ops).get(); + } else { + slogger.info("removenode[{}]: Started to sync data for removing node={} using stream, coordinator={}", req.ops_uuid, node, coordinator); + ss.removenode_with_stream(node, as).get(); + } } } else if (req.cmd == node_ops_cmd::removenode_abort) { ss.node_ops_abort(ops_uuid); @@ -2893,6 +2899,56 @@ void storage_service::unbootstrap() { leave_ring(); } +future<> storage_service::removenode_add_ranges(lw_shared_ptr streamer, gms::inet_address leaving_node) { + auto tmptr = get_token_metadata_ptr(); + auto my_address = get_broadcast_address(); + auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); + for (const auto& keyspace_name : non_system_keyspaces) { + std::unordered_multimap changed_ranges = get_changed_ranges_for_leaving(keyspace_name, leaving_node); + dht::token_range_vector my_new_ranges; + for (auto& x : changed_ranges) { + if (x.second == my_address) { + my_new_ranges.emplace_back(x.first); + } + } + std::unordered_multimap source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges, *tmptr); + std::unordered_map ranges_per_endpoint; + for (auto& x : source_ranges) { + ranges_per_endpoint[x.first].emplace_back(x.second); + } + streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint)); + } + return make_ready_future<>(); +} + +future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, shared_ptr as_ptr) { + return seastar::async([this, leaving_node, as_ptr] { + auto tmptr = get_token_metadata_ptr(); + abort_source as; + auto sub = _abort_source.subscribe([&as] () noexcept { + if (!as.abort_requested()) { + as.request_abort(); + } + }); + if (!as_ptr) { + throw std::runtime_error("removenode_with_stream: abort_source is nullptr"); + } + auto as_ptr_sub = as_ptr->subscribe([&as] () noexcept { + if (!as.abort_requested()) { + as.request_abort(); + } + }); + auto streamer = make_lw_shared(_db, tmptr, as, get_broadcast_address(), "Removenode", streaming::stream_reason::removenode); + removenode_add_ranges(streamer, leaving_node).get(); + try { + streamer->stream_async().get(); + } catch (...) { + slogger.warn("removenode_with_stream: stream failed: {}", std::current_exception()); + throw; + } + }); +} + future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { if (is_repair_based_node_ops_enabled()) { auto ops_uuid = utils::make_random_uuid(); @@ -2910,23 +2966,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr } }); auto streamer = make_lw_shared(_db, tmptr, as, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode); - auto my_address = get_broadcast_address(); - auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); - for (const auto& keyspace_name : non_system_keyspaces) { - std::unordered_multimap changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint); - dht::token_range_vector my_new_ranges; - for (auto& x : changed_ranges) { - if (x.second == my_address) { - my_new_ranges.emplace_back(x.first); - } - } - std::unordered_multimap source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges, *tmptr); - std::unordered_map ranges_per_endpoint; - for (auto& x : source_ranges) { - ranges_per_endpoint[x.first].emplace_back(x.second); - } - streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint)); - } + removenode_add_ranges(streamer, endpoint).get(); auto status_checker = seastar::async([this, endpoint, &as] { slogger.info("restore_replica_count: Started status checker for removing node {}", endpoint); while (!as.abort_requested()) { @@ -3888,6 +3928,7 @@ node_ops_meta_data::node_ops_meta_data( : _ops_uuid(std::move(ops_uuid)) , _coordinator(std::move(coordinator)) , _abort(std::move(abort_func)) + , _abort_source(seastar::make_shared()) , _signal(std::move(signal_func)) , _ops(std::move(ops)) , _watchdog([sig = _signal] { sig(); }) { @@ -3922,6 +3963,10 @@ shared_ptr node_ops_meta_data::get_ops_info() { return _ops; } +shared_ptr node_ops_meta_data::get_abort_source() { + return _abort_source; +} + void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) { slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid); auto permit = seastar::get_units(_node_ops_abort_sem, 1); @@ -3950,6 +3995,10 @@ void storage_service::node_ops_abort(utils::UUID ops_uuid) { if (it != _node_ops.end()) { node_ops_meta_data& meta = it->second; meta.abort().get(); + auto as = meta.get_abort_source(); + if (as && !as->abort_requested()) { + as->request_abort(); + } abort_repair_node_ops(ops_uuid).get(); _node_ops.erase(it); } diff --git a/service/storage_service.hh b/service/storage_service.hh index e386721a0b..a6c7392c86 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -89,6 +89,7 @@ class view_update_generator; namespace dht { class boot_strapper; +class range_streamer; } namespace gms { @@ -126,6 +127,7 @@ class node_ops_meta_data { utils::UUID _ops_uuid; gms::inet_address _coordinator; std::function ()> _abort; + shared_ptr _abort_source; std::function _signal; shared_ptr _ops; seastar::timer _watchdog; @@ -139,6 +141,7 @@ public: std::function ()> abort_func, std::function signal_func); shared_ptr get_ops_info(); + shared_ptr get_abort_source(); future<> abort(); void update_watchdog(); void cancel_watchdog(); @@ -692,6 +695,8 @@ private: * @param endpoint the node that left */ future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint); + future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr as_ptr); + future<> removenode_add_ranges(lw_shared_ptr streamer, gms::inet_address leaving_node); // needs to be modified to accept either a keyspace or ARS. std::unordered_multimap get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);