From a285bd28e228d42b1c815ee5fb147a020738443c Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 21 May 2021 15:30:56 +0800 Subject: [PATCH 1/2] storage_service: Respect --enable-repair-based-node-ops flag during removenode In commit 829b4c1 (repair: Make removenode safe by default), removenode was changed to use repair based node operations unconditionally. Since repair based node operations is not enabled by default, we should respect the flag to use stream to sync data if the flag is false. Fixes #8700 --- service/storage_service.cc | 63 ++++++++++++++++++++++++++++++++++++-- service/storage_service.hh | 3 ++ 2 files changed, 64 insertions(+), 2 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 6f1a906659..4875ff0806 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2503,9 +2503,15 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad throw std::runtime_error(format("removenode[{}]: Can not find ops_uuid={}", ops_uuid, ops_uuid)); } auto ops = it->second.get_ops_info(); + auto as = it->second.get_abort_source(); for (auto& node : req.leaving_nodes) { - slogger.info("removenode[{}]: Started to sync data for removing node={}, coordinator={}", req.ops_uuid, node, coordinator); - ss._repair.local().removenode_with_repair(ss.get_token_metadata_ptr(), node, ops).get(); + if (ss.is_repair_based_node_ops_enabled()) { + slogger.info("removenode[{}]: Started to sync data for removing node={} using repair, coordinator={}", req.ops_uuid, node, coordinator); + ss._repair.local().removenode_with_repair(ss.get_token_metadata_ptr(), node, ops).get(); + } else { + slogger.info("removenode[{}]: Started to sync data for removing node={} using stream, coordinator={}", req.ops_uuid, node, coordinator); + ss.removenode_with_stream(node, as).get(); + } } } else if (req.cmd == node_ops_cmd::removenode_abort) { ss.node_ops_abort(ops_uuid); @@ -2890,6 +2896,50 @@ void storage_service::unbootstrap() { leave_ring(); } +future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, shared_ptr as_ptr) { + return seastar::async([this, leaving_node, as_ptr] { + auto tmptr = get_token_metadata_ptr(); + abort_source as; + auto sub = _abort_source.subscribe([&as] () noexcept { + if (!as.abort_requested()) { + as.request_abort(); + } + }); + if (!as_ptr) { + throw std::runtime_error("removenode_with_stream: abort_source is nullptr"); + } + auto as_ptr_sub = as_ptr->subscribe([&as] () noexcept { + if (!as.abort_requested()) { + as.request_abort(); + } + }); + auto streamer = make_lw_shared(_db, tmptr, as, get_broadcast_address(), "Removenode", streaming::stream_reason::removenode); + auto my_address = get_broadcast_address(); + auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); + for (const auto& keyspace_name : non_system_keyspaces) { + std::unordered_multimap changed_ranges = get_changed_ranges_for_leaving(keyspace_name, leaving_node); + dht::token_range_vector my_new_ranges; + for (auto& x : changed_ranges) { + if (x.second == my_address) { + my_new_ranges.emplace_back(x.first); + } + } + std::unordered_multimap source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges, *tmptr); + std::unordered_map ranges_per_endpoint; + for (auto& x : source_ranges) { + ranges_per_endpoint[x.first].emplace_back(x.second); + } + streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint)); + } + try { + streamer->stream_async().get(); + } catch (...) { + slogger.warn("removenode_with_stream: stream failed: {}", std::current_exception()); + throw; + } + }); +} + future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { if (is_repair_based_node_ops_enabled()) { auto ops_uuid = utils::make_random_uuid(); @@ -3885,6 +3935,7 @@ node_ops_meta_data::node_ops_meta_data( : _ops_uuid(std::move(ops_uuid)) , _coordinator(std::move(coordinator)) , _abort(std::move(abort_func)) + , _abort_source(seastar::make_shared()) , _signal(std::move(signal_func)) , _ops(std::move(ops)) , _watchdog([sig = _signal] { sig(); }) { @@ -3919,6 +3970,10 @@ shared_ptr node_ops_meta_data::get_ops_info() { return _ops; } +shared_ptr node_ops_meta_data::get_abort_source() { + return _abort_source; +} + void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) { slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid); auto permit = seastar::get_units(_node_ops_abort_sem, 1); @@ -3947,6 +4002,10 @@ void storage_service::node_ops_abort(utils::UUID ops_uuid) { if (it != _node_ops.end()) { node_ops_meta_data& meta = it->second; meta.abort().get(); + auto as = meta.get_abort_source(); + if (as && !as->abort_requested()) { + as->request_abort(); + } abort_repair_node_ops(ops_uuid).get(); _node_ops.erase(it); } diff --git a/service/storage_service.hh b/service/storage_service.hh index e386721a0b..4252406a44 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -126,6 +126,7 @@ class node_ops_meta_data { utils::UUID _ops_uuid; gms::inet_address _coordinator; std::function ()> _abort; + shared_ptr _abort_source; std::function _signal; shared_ptr _ops; seastar::timer _watchdog; @@ -139,6 +140,7 @@ public: std::function ()> abort_func, std::function signal_func); shared_ptr get_ops_info(); + shared_ptr get_abort_source(); future<> abort(); void update_watchdog(); void cancel_watchdog(); @@ -692,6 +694,7 @@ private: * @param endpoint the node that left */ future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint); + future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr as_ptr); // needs to be modified to accept either a keyspace or ARS. std::unordered_multimap get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint); From 70147dcb5ab1dfd8fd5714ca4de7748526a1e6b4 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 May 2021 09:15:21 +0800 Subject: [PATCH 2/2] storage_service: Add removenode_add_ranges helper Share the code between restore_replica_count and removenode_with_stream to reduce duplication. Refs #8700 --- service/storage_service.cc | 58 ++++++++++++++++---------------------- service/storage_service.hh | 2 ++ 2 files changed, 26 insertions(+), 34 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 4875ff0806..fb23e9f2cf 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2896,6 +2896,28 @@ void storage_service::unbootstrap() { leave_ring(); } +future<> storage_service::removenode_add_ranges(lw_shared_ptr streamer, gms::inet_address leaving_node) { + auto tmptr = get_token_metadata_ptr(); + auto my_address = get_broadcast_address(); + auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); + for (const auto& keyspace_name : non_system_keyspaces) { + std::unordered_multimap changed_ranges = get_changed_ranges_for_leaving(keyspace_name, leaving_node); + dht::token_range_vector my_new_ranges; + for (auto& x : changed_ranges) { + if (x.second == my_address) { + my_new_ranges.emplace_back(x.first); + } + } + std::unordered_multimap source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges, *tmptr); + std::unordered_map ranges_per_endpoint; + for (auto& x : source_ranges) { + ranges_per_endpoint[x.first].emplace_back(x.second); + } + streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint)); + } + return make_ready_future<>(); +} + future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, shared_ptr as_ptr) { return seastar::async([this, leaving_node, as_ptr] { auto tmptr = get_token_metadata_ptr(); @@ -2914,23 +2936,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, } }); auto streamer = make_lw_shared(_db, tmptr, as, get_broadcast_address(), "Removenode", streaming::stream_reason::removenode); - auto my_address = get_broadcast_address(); - auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); - for (const auto& keyspace_name : non_system_keyspaces) { - std::unordered_multimap changed_ranges = get_changed_ranges_for_leaving(keyspace_name, leaving_node); - dht::token_range_vector my_new_ranges; - for (auto& x : changed_ranges) { - if (x.second == my_address) { - my_new_ranges.emplace_back(x.first); - } - } - std::unordered_multimap source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges, *tmptr); - std::unordered_map ranges_per_endpoint; - for (auto& x : source_ranges) { - ranges_per_endpoint[x.first].emplace_back(x.second); - } - streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint)); - } + removenode_add_ranges(streamer, leaving_node).get(); try { streamer->stream_async().get(); } catch (...) { @@ -2957,23 +2963,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr } }); auto streamer = make_lw_shared(_db, tmptr, as, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode); - auto my_address = get_broadcast_address(); - auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); - for (const auto& keyspace_name : non_system_keyspaces) { - std::unordered_multimap changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint); - dht::token_range_vector my_new_ranges; - for (auto& x : changed_ranges) { - if (x.second == my_address) { - my_new_ranges.emplace_back(x.first); - } - } - std::unordered_multimap source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges, *tmptr); - std::unordered_map ranges_per_endpoint; - for (auto& x : source_ranges) { - ranges_per_endpoint[x.first].emplace_back(x.second); - } - streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint)); - } + removenode_add_ranges(streamer, endpoint).get(); auto status_checker = seastar::async([this, endpoint, &as] { slogger.info("restore_replica_count: Started status checker for removing node {}", endpoint); while (!as.abort_requested()) { diff --git a/service/storage_service.hh b/service/storage_service.hh index 4252406a44..a6c7392c86 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -89,6 +89,7 @@ class view_update_generator; namespace dht { class boot_strapper; +class range_streamer; } namespace gms { @@ -695,6 +696,7 @@ private: */ future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint); future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr as_ptr); + future<> removenode_add_ranges(lw_shared_ptr streamer, gms::inet_address leaving_node); // needs to be modified to accept either a keyspace or ARS. std::unordered_multimap get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);