From 5c25066ea724f2738bae24c343df07b6c913d947 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 24 Oct 2022 17:35:02 +0300 Subject: [PATCH 1/6] repair: node_ops_info: prevent accidental copy Delete node_ops_info copy and move constructors before we add a sharded member for the per-shard repairs in the next patch. Signed-off-by: Benny Halevy --- repair/repair.cc | 6 ++++++ repair/repair.hh | 5 +++++ service/storage_service.cc | 4 ++-- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index cd7351577e..05b0fe5356 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -49,6 +49,12 @@ logging::logger rlogger("repair"); +node_ops_info::node_ops_info(utils::UUID ops_uuid_, shared_ptr as_, std::list&& ignore_nodes_) noexcept + : ops_uuid(ops_uuid_) + , as(std::move(as_)) + , ignore_nodes(std::move(ignore_nodes_)) +{} + void node_ops_info::check_abort() { if (as && as->abort_requested()) { auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid); diff --git a/repair/repair.hh b/repair/repair.hh index e894aafeab..53ada83eb4 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -71,6 +71,11 @@ struct node_ops_info { utils::UUID ops_uuid; shared_ptr as; std::list ignore_nodes; + + node_ops_info(utils::UUID ops_uuid_, shared_ptr as_, std::list&& ignore_nodes_) noexcept; + node_ops_info(const node_ops_info&) = delete; + node_ops_info(node_ops_info&&) = delete; + void check_abort(); }; diff --git a/service/storage_service.cc b/service/storage_service.cc index 453ccbb1fe..8e6f23f205 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2925,7 +2925,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) { auto ops_uuid = utils::make_random_uuid(); - auto ops = 
seastar::make_shared(node_ops_info{ops_uuid, nullptr, std::list()}); + auto ops = seastar::make_shared(ops_uuid, nullptr, std::list()); return _repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () { return send_replication_notification(notify_endpoint); }); @@ -3658,7 +3658,7 @@ node_ops_meta_data::node_ops_meta_data( , _abort(std::move(abort_func)) , _abort_source(seastar::make_shared()) , _signal(std::move(signal_func)) - , _ops(seastar::make_shared({_ops_uuid, _abort_source, std::move(ignore_nodes)})) + , _ops(seastar::make_shared(_ops_uuid, _abort_source, std::move(ignore_nodes))) , _watchdog([sig = _signal] { sig(); }) { _watchdog.arm(_watchdog_interval); } From 47e4761b4e0d3618a8b6dd5b9bf7b5fdf5af86db Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 26 Oct 2022 15:34:41 +0300 Subject: [PATCH 2/6] storage_service: node_ops_meta_data: add start and stop methods Prepare for starting and stopping repair node_ops_info Signed-off-by: Benny Halevy --- service/storage_service.cc | 14 ++++++++++++++ service/storage_service.hh | 2 ++ 2 files changed, 16 insertions(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index 8e6f23f205..68d20674b6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2534,6 +2534,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad }); }, [this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); }); + meta.start().get(); _node_ops.emplace(ops_uuid, std::move(meta)); } else if (req.cmd == node_ops_cmd::removenode_heartbeat) { slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); @@ -2582,6 +2583,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad }); }, [this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); }); + meta.start().get(); _node_ops.emplace(ops_uuid, std::move(meta)); } else if (req.cmd == node_ops_cmd::decommission_heartbeat) { 
slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); @@ -2626,6 +2628,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad }); }, [this, ops_uuid ] { node_ops_singal_abort(ops_uuid); }); + meta.start().get(); _node_ops.emplace(ops_uuid, std::move(meta)); } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) { // Wait for local node has marked replacing node as alive @@ -2680,6 +2683,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad }); }, [this, ops_uuid ] { node_ops_singal_abort(ops_uuid); }); + meta.start().get(); _node_ops.emplace(ops_uuid, std::move(meta)); } else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) { slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); @@ -3663,6 +3667,14 @@ node_ops_meta_data::node_ops_meta_data( _watchdog.arm(_watchdog_interval); } +future<> node_ops_meta_data::start() { + return make_ready_future<>(); // for now +} + +future<> node_ops_meta_data::stop() noexcept { + return make_ready_future<>(); // for now +} + future<> node_ops_meta_data::abort() { slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid); _watchdog.cancel(); @@ -3708,6 +3720,7 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) { if (it != _node_ops.end()) { node_ops_meta_data& meta = it->second; meta.cancel_watchdog(); + co_await meta.stop(); _node_ops.erase(it); } } @@ -3723,6 +3736,7 @@ future<> storage_service::node_ops_abort(utils::UUID ops_uuid) { if (as && !as->abort_requested()) { as->request_abort(); } + co_await meta.stop(); _node_ops.erase(it); } } diff --git a/service/storage_service.hh b/service/storage_service.hh index 47e25c478b..6ed787f258 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -105,6 +105,8 @@ public: std::list ignore_nodes, std::function ()> abort_func, std::function signal_func); + future<> start(); + future<> stop() noexcept; shared_ptr get_ops_info(); 
shared_ptr get_abort_source(); future<> abort(); From 0efd290378a991ef5f2aff9870954a9e652fe072 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 26 Oct 2022 14:29:14 +0300 Subject: [PATCH 3/6] storage_service: node_ops_abort_thread: co_return only after printing log message Currently the function co_returns if (!uuid_opt) so the log info message indicating it's stopped is not printed. Signed-off-by: Benny Halevy --- service/storage_service.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 68d20674b6..4837225b99 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3756,6 +3756,7 @@ future<> storage_service::node_ops_abort_thread() { auto uuid_opt = _node_ops_abort_queue.front(); _node_ops_abort_queue.pop_front(); if (!uuid_opt) { + slogger.info("Stopped node_ops_abort_thread"); co_return; } try { @@ -3765,7 +3766,7 @@ future<> storage_service::node_ops_abort_thread() { } } } - slogger.info("Stopped node_ops_abort_thread"); + __builtin_unreachable(); } } // namespace service From c2f384093d9a2c690fe4a2991fab83f5d4183ebf Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 26 Oct 2022 14:36:45 +0300 Subject: [PATCH 4/6] storage_service: node_ops_abort_thread: abort all node ops on shutdown A later patch adds a sharded<abort_source> to node_ops_info. On shutdown, we must stop it in an orderly manner, so use node_ops_abort_thread shutdown path (where node_ops_singal_abort is called with a nullopt) to abort (and stop) all outstanding node_ops by passing a null_uuid to node_ops_abort, and let it iterate over all node ops to abort and stop them.
Signed-off-by: Benny Halevy --- service/storage_service.cc | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 4837225b99..b97e578aac 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3728,6 +3728,24 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) { future<> storage_service::node_ops_abort(utils::UUID ops_uuid) { slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid); auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1); + + if (!ops_uuid) { + for (auto& [uuid, meta] : _node_ops) { + co_await meta.abort(); + auto as = meta.get_abort_source(); + if (as && !as->abort_requested()) { + as->request_abort(); + } + } + + for (auto it = _node_ops.begin(); it != _node_ops.end(); it = _node_ops.erase(it)) { + node_ops_meta_data& meta = it->second; + co_await meta.stop(); + } + + co_return; + } + auto it = _node_ops.find(ops_uuid); if (it != _node_ops.end()) { node_ops_meta_data& meta = it->second; @@ -3755,15 +3773,15 @@ future<> storage_service::node_ops_abort_thread() { while (!_node_ops_abort_queue.empty()) { auto uuid_opt = _node_ops_abort_queue.front(); _node_ops_abort_queue.pop_front(); + try { + co_await node_ops_abort(uuid_opt.value_or(utils::null_uuid())); + } catch (...) { + slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception()); + } if (!uuid_opt) { slogger.info("Stopped node_ops_abort_thread"); co_return; } - try { - co_await node_ops_abort(*uuid_opt); - } catch (...) { - slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception()); - } } } __builtin_unreachable(); From 88f993e5ed37e4a5c5c4ef6b07fbdeac32e1eaa9 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 26 Oct 2022 15:41:39 +0300 Subject: [PATCH 5/6] repair: node_ops_info: add start and stop methods Prepare for adding a sharded member. 
Wire start/stop in storage_service::node_ops_meta_data. Signed-off-by: Benny Halevy --- repair/repair.cc | 8 ++++++++ repair/repair.hh | 3 +++ service/storage_service.cc | 4 ++-- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 05b0fe5356..40fbb69f44 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -63,6 +63,14 @@ void node_ops_info::check_abort() { } } +future<> node_ops_info::start() { + co_return; // for now +} + +future<> node_ops_info::stop() noexcept { + co_return; // for now +} + node_ops_metrics::node_ops_metrics(tracker& tracker) : _tracker(tracker) { diff --git a/repair/repair.hh b/repair/repair.hh index 53ada83eb4..11540ec9cb 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -76,6 +76,9 @@ struct node_ops_info { node_ops_info(const node_ops_info&) = delete; node_ops_info(node_ops_info&&) = delete; + future<> start(); + future<> stop() noexcept; + void check_abort(); }; diff --git a/service/storage_service.cc b/service/storage_service.cc index b97e578aac..89ec16fac9 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3668,11 +3668,11 @@ node_ops_meta_data::node_ops_meta_data( } future<> node_ops_meta_data::start() { - return make_ready_future<>(); // for now + return _ops ? _ops->start() : make_ready_future<>(); } future<> node_ops_meta_data::stop() noexcept { - return make_ready_future<>(); // for now + return _ops ? _ops->stop() : make_ready_future<>(); } future<> node_ops_meta_data::abort() { From 0ea8250e83acc6e13e8b93afb6c911341349a307 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 24 Oct 2022 21:01:32 +0300 Subject: [PATCH 6/6] repair: use sharded abort_source to abort repair_info Currently we use a single shared_ptr<abort_source> that can't be copied across shards. Instead, use a sharded<abort_source> in node_ops_info so that each repair_info instance will use an (optional) abort_source* on its own shard.
Added respective start and stop methods, plus a local_abort_source getter to get the shard-local abort_source (if available). Fixes #11826 Signed-off-by: Benny Halevy --- repair/repair.cc | 23 +++++++++++++++++++---- repair/repair.hh | 13 +++++++++++-- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 40fbb69f44..e1912a14c1 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -64,11 +64,25 @@ void node_ops_info::check_abort() { } future<> node_ops_info::start() { - co_return; // for now + if (as) { + co_await _sas.start(); + _abort_subscription = as->subscribe([this] () noexcept { + _abort_done = _sas.invoke_on_all([] (abort_source& as) noexcept { + as.request_abort(); + }); + }); + } } future<> node_ops_info::stop() noexcept { - co_return; // for now + if (as) { + co_await std::exchange(_abort_done, make_ready_future<>()); + co_await _sas.stop(); + } +} + +abort_source* node_ops_info::local_abort_source() { + return as ? &_sas.local() : nullptr; } node_ops_metrics::node_ops_metrics(tracker& tracker) @@ -546,7 +560,7 @@ repair_info::repair_info(repair_service& repair, const std::vector& hosts_, const std::unordered_set& ignore_nodes_, streaming::stream_reason reason_, - shared_ptr as, + abort_source* as, bool hints_batchlog_flushed) : rs(repair) , db(repair.get_db()) @@ -1309,9 +1323,10 @@ future<> repair_service::do_sync_data_using_repair( auto hosts = std::vector(); auto ignore_nodes = std::unordered_set(); bool hints_batchlog_flushed = false; + abort_source* asp = ops_info ? ops_info->local_abort_source() : nullptr; auto ri = make_lw_shared(local_repair, std::move(keyspace), std::move(ranges), std::move(table_ids), - id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_info ? 
ops_info->as : nullptr, hints_batchlog_flushed); + id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, asp, hints_batchlog_flushed); ri->neighbors = std::move(neighbors); return repair_ranges(ri); }); diff --git a/repair/repair.hh b/repair/repair.hh index 11540ec9cb..a768337a6b 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -67,11 +67,18 @@ struct repair_uniq_id { }; std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x); -struct node_ops_info { +class node_ops_info { +public: utils::UUID ops_uuid; shared_ptr as; std::list ignore_nodes; +private: + optimized_optional _abort_subscription; + sharded _sas; + future<> _abort_done = make_ready_future<>(); + +public: node_ops_info(utils::UUID ops_uuid_, shared_ptr as_, std::list&& ignore_nodes_) noexcept; node_ops_info(const node_ops_info&) = delete; node_ops_info(node_ops_info&&) = delete; @@ -80,6 +87,8 @@ struct node_ops_info { future<> stop() noexcept; void check_abort(); + + abort_source* local_abort_source(); }; // NOTE: repair_start() can be run on any node, but starts a node-global @@ -187,7 +196,7 @@ public: const std::vector& hosts_, const std::unordered_set& ingore_nodes_, streaming::stream_reason reason_, - shared_ptr as, + abort_source* as, bool hints_batchlog_flushed); void check_failed_ranges(); void abort() noexcept;