# Reviewer notes (free text ahead of the first "diff --git" header; not part of any hunk).
# - storage_service::node_ops_abort_thread(): the stop check now runs after the try/catch, so an
#   empty uuid_opt (the stop request) can reach the catch handler. The handler used to log
#   *uuid_opt, which is undefined behaviour for an empty optional. The last storage_service.cc
#   hunk resolves the id once via value_or() and uses it for both the call and the log message
#   (new-side line count of that hunk: 18 -> 20).
# - Template argument lists appear to have been stripped from this copy of the patch
#   (e.g. "shared_ptr as_", "sharded _sas"); those tokens are left exactly as found.
# - Line breaks and indentation were reconstructed from the hunk line counts.
diff --git a/repair/repair.cc b/repair/repair.cc
index cd7351577e..e1912a14c1 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -49,6 +49,12 @@
 
 logging::logger rlogger("repair");
 
+node_ops_info::node_ops_info(utils::UUID ops_uuid_, shared_ptr as_, std::list&& ignore_nodes_) noexcept
+    : ops_uuid(ops_uuid_)
+    , as(std::move(as_))
+    , ignore_nodes(std::move(ignore_nodes_))
+{}
+
 void node_ops_info::check_abort() {
     if (as && as->abort_requested()) {
         auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid);
@@ -57,6 +63,28 @@ void node_ops_info::check_abort() {
     }
 }
 
+future<> node_ops_info::start() {
+    if (as) {
+        co_await _sas.start();
+        _abort_subscription = as->subscribe([this] () noexcept {
+            _abort_done = _sas.invoke_on_all([] (abort_source& as) noexcept {
+                as.request_abort();
+            });
+        });
+    }
+}
+
+future<> node_ops_info::stop() noexcept {
+    if (as) {
+        co_await std::exchange(_abort_done, make_ready_future<>());
+        co_await _sas.stop();
+    }
+}
+
+abort_source* node_ops_info::local_abort_source() {
+    return as ? &_sas.local() : nullptr;
+}
+
 node_ops_metrics::node_ops_metrics(tracker& tracker)
     : _tracker(tracker)
 {
@@ -532,7 +560,7 @@ repair_info::repair_info(repair_service& repair,
     const std::vector& hosts_,
     const std::unordered_set& ignore_nodes_,
     streaming::stream_reason reason_,
-    shared_ptr as,
+    abort_source* as,
     bool hints_batchlog_flushed)
     : rs(repair)
     , db(repair.get_db())
@@ -1295,9 +1323,10 @@ future<> repair_service::do_sync_data_using_repair(
             auto hosts = std::vector();
             auto ignore_nodes = std::unordered_set();
             bool hints_batchlog_flushed = false;
+            abort_source* asp = ops_info ? ops_info->local_abort_source() : nullptr;
             auto ri = make_lw_shared(local_repair,
                     std::move(keyspace), std::move(ranges), std::move(table_ids),
-                    id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_info ? ops_info->as : nullptr, hints_batchlog_flushed);
+                    id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, asp, hints_batchlog_flushed);
             ri->neighbors = std::move(neighbors);
             return repair_ranges(ri);
         });
diff --git a/repair/repair.hh b/repair/repair.hh
index e894aafeab..a768337a6b 100644
--- a/repair/repair.hh
+++ b/repair/repair.hh
@@ -67,11 +67,28 @@ struct repair_uniq_id {
 };
 std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
 
-struct node_ops_info {
+class node_ops_info {
+public:
     utils::UUID ops_uuid;
     shared_ptr as;
     std::list ignore_nodes;
+
+private:
+    optimized_optional _abort_subscription;
+    sharded _sas;
+    future<> _abort_done = make_ready_future<>();
+
+public:
+    node_ops_info(utils::UUID ops_uuid_, shared_ptr as_, std::list&& ignore_nodes_) noexcept;
+    node_ops_info(const node_ops_info&) = delete;
+    node_ops_info(node_ops_info&&) = delete;
+
+    future<> start();
+    future<> stop() noexcept;
+
     void check_abort();
+
+    abort_source* local_abort_source();
 };
 
 // NOTE: repair_start() can be run on any node, but starts a node-global
@@ -179,7 +196,7 @@ public:
             const std::vector& hosts_,
             const std::unordered_set& ingore_nodes_,
             streaming::stream_reason reason_,
-            shared_ptr as,
+            abort_source* as,
             bool hints_batchlog_flushed);
     void check_failed_ranges();
     void abort() noexcept;
diff --git a/service/storage_service.cc b/service/storage_service.cc
index b0163db015..03611a7ed0 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -2539,6 +2539,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad
                 });
             },
             [this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
+            meta.start().get();
             _node_ops.emplace(ops_uuid, std::move(meta));
         } else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
             slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2587,6 +2588,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad
                 });
             },
             [this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
+            meta.start().get();
             _node_ops.emplace(ops_uuid, std::move(meta));
         } else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
             slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2631,6 +2633,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad
                 });
             },
             [this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
+            meta.start().get();
             _node_ops.emplace(ops_uuid, std::move(meta));
         } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
             // Wait for local node has marked replacing node as alive
@@ -2685,6 +2688,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad
                 });
             },
             [this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
+            meta.start().get();
             _node_ops.emplace(ops_uuid, std::move(meta));
         } else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
             slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2930,7 +2934,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
 future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
     if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
         auto ops_uuid = utils::make_random_uuid();
-        auto ops = seastar::make_shared(node_ops_info{ops_uuid, nullptr, std::list()});
+        auto ops = seastar::make_shared(ops_uuid, nullptr, std::list());
         return _repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
             return send_replication_notification(notify_endpoint);
         });
@@ -3663,11 +3667,19 @@ node_ops_meta_data::node_ops_meta_data(
     , _abort(std::move(abort_func))
     , _abort_source(seastar::make_shared())
     , _signal(std::move(signal_func))
-    , _ops(seastar::make_shared({_ops_uuid, _abort_source, std::move(ignore_nodes)}))
+    , _ops(seastar::make_shared(_ops_uuid, _abort_source, std::move(ignore_nodes)))
     , _watchdog([sig = _signal] { sig(); }) {
     _watchdog.arm(_watchdog_interval);
 }
 
+future<> node_ops_meta_data::start() {
+    return _ops ? _ops->start() : make_ready_future<>();
+}
+
+future<> node_ops_meta_data::stop() noexcept {
+    return _ops ? _ops->stop() : make_ready_future<>();
+}
+
 future<> node_ops_meta_data::abort() {
     slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid);
     _watchdog.cancel();
@@ -3713,6 +3725,7 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
     if (it != _node_ops.end()) {
         node_ops_meta_data& meta = it->second;
         meta.cancel_watchdog();
+        co_await meta.stop();
         _node_ops.erase(it);
     }
 }
@@ -3720,6 +3733,24 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
 future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
     slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
     auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
+
+    if (!ops_uuid) {
+        for (auto& [uuid, meta] : _node_ops) {
+            co_await meta.abort();
+            auto as = meta.get_abort_source();
+            if (as && !as->abort_requested()) {
+                as->request_abort();
+            }
+        }
+
+        for (auto it = _node_ops.begin(); it != _node_ops.end(); it = _node_ops.erase(it)) {
+            node_ops_meta_data& meta = it->second;
+            co_await meta.stop();
+        }
+
+        co_return;
+    }
+
     auto it = _node_ops.find(ops_uuid);
     if (it != _node_ops.end()) {
         node_ops_meta_data& meta = it->second;
@@ -3728,6 +3759,7 @@ future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
         if (as && !as->abort_requested()) {
             as->request_abort();
         }
+        co_await meta.stop();
         _node_ops.erase(it);
     }
 }
@@ -3746,17 +3778,20 @@ future<> storage_service::node_ops_abort_thread() {
         while (!_node_ops_abort_queue.empty()) {
             auto uuid_opt = _node_ops_abort_queue.front();
             _node_ops_abort_queue.pop_front();
-            if (!uuid_opt) {
-                co_return;
-            }
+            // Empty uuid_opt is the stop request: resolve it once so the catch handler never dereferences an empty optional.
+            const auto ops_uuid = uuid_opt.value_or(utils::null_uuid());
             try {
-                co_await node_ops_abort(*uuid_opt);
+                co_await node_ops_abort(ops_uuid);
             } catch (...) {
-                slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
+                slogger.warn("Failed to abort node operation ops_uuid={}: {}", ops_uuid, std::current_exception());
             }
+            if (!uuid_opt) {
+                slogger.info("Stopped node_ops_abort_thread");
+                co_return;
+            }
         }
     }
-    slogger.info("Stopped node_ops_abort_thread");
+    __builtin_unreachable();
 }
 
 } // namespace service
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 3694d85416..bdcdc02da7 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -105,6 +105,8 @@ public:
             std::list ignore_nodes,
             std::function ()> abort_func,
             std::function signal_func);
+    future<> start();
+    future<> stop() noexcept;
     shared_ptr get_ops_info();
     shared_ptr get_abort_source();
     future<> abort();