diff --git a/db/config.cc b/db/config.cc index 5af8abd7a8..30defa200e 100644 --- a/db/config.cc +++ b/db/config.cc @@ -909,6 +909,8 @@ db::config::config(std::shared_ptr exts) , force_schema_commit_log(this, "force_schema_commit_log", value_status::Used, false, "Use separate schema commit log unconditionally rater than after restart following discovery of cluster-wide support for it.") , task_ttl_seconds(this, "task_ttl_in_seconds", liveness::LiveUpdate, value_status::Used, 10, "Time for which information about finished task stays in memory.") + , nodeops_watchdog_timeout_seconds(this, "nodeops_watchdog_timeout_seconds", liveness::LiveUpdate, value_status::Used, 120, "Time in seconds after which node operations abort when not hearing from the coordinator") + , nodeops_heartbeat_interval_seconds(this, "nodeops_heartbeat_interval_seconds", liveness::LiveUpdate, value_status::Used, 10, "Period of heartbeat ticks in node operations") , cache_index_pages(this, "cache_index_pages", liveness::LiveUpdate, value_status::Used, false, "Keep SSTable index pages in the global cache after a SSTable read. Expected to improve performance for workloads with big partitions, but may degrade performance for workloads with small partitions.") , x_log2_compaction_groups(this, "x_log2_compaction_groups", value_status::Used, 0, "Controls static number of compaction groups per table per shard. For X groups, set the option to log (base 2) of X. Example: Value of 3 implies 8 groups.") diff --git a/db/config.hh b/db/config.hh index b8942a50b4..46dba107fc 100644 --- a/db/config.hh +++ b/db/config.hh @@ -388,6 +388,8 @@ public: named_value force_schema_commit_log; named_value task_ttl_seconds; + named_value nodeops_watchdog_timeout_seconds; + named_value nodeops_heartbeat_interval_seconds; named_value cache_index_pages; diff --git a/service/storage_service.cc b/service/storage_service.cc index 366578f553..9126f4e5c6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1924,9 +1924,11 @@ future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ } else { throw std::runtime_error(format("node_ops_cmd_heartbeat_updater: node_ops_cmd is not supported")); } - slogger.info("{}[{}]: Started heartbeat_updater", ops, uuid); + auto interval = std::chrono::seconds(_db.local().get_config().nodeops_heartbeat_interval_seconds()); + slogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", ops, uuid, interval.count()); while (!(*heartbeat_updater_done)) { auto req = node_ops_cmd_request{cmd, uuid, {}, {}, {}}; + auto next = std::chrono::steady_clock::now() + interval; try { co_await coroutine::parallel_for_each(nodes, [this, ops, uuid, &req] (const gms::inet_address& node) { return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([ops, uuid, node] (node_ops_cmd_response resp) { @@ -1938,14 +1940,18 @@ future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ auto ep = std::current_exception(); slogger.warn("{}[{}]: Failed to send heartbeat: {}", ops, uuid, ep); } - int nr_seconds = 10; - while (!(*heartbeat_updater_done) && nr_seconds--) { + while (!(*heartbeat_updater_done) && std::chrono::steady_clock::now() < next) { co_await sleep(std::chrono::seconds(1)); } } slogger.info("{}[{}]: Stopped heartbeat_updater", ops, uuid); } +static +void on_streaming_finished() { + utils::get_local_injector().inject("storage_service_streaming_sleep3", std::chrono::seconds{3}).get(); +} + future<> storage_service::decommission() { return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) { return seastar::async([&ss] { @@ -2056,6 +2062,7 @@ future<> storage_service::decommission() { // Step 4: Start to sync data slogger.info("DECOMMISSIONING: unbootstrap starts"); ss.unbootstrap().get(); + on_streaming_finished(); slogger.info("DECOMMISSIONING: unbootstrap done"); // Step 5: Become a group 0 non-voter before leaving the token ring. @@ -2068,13 +2075,26 @@ future<> storage_service::decommission() { slogger.info("decommission[{}]: became a group 0 non-voter", uuid); } - // Step 6: Leave the token ring + // Step 6: Verify that other nodes didn't abort in the meantime. + // See https://github.com/scylladb/scylladb/issues/12989. + req.cmd = node_ops_cmd::query_pending_ops; + parallel_for_each(nodes, [&ss, req, uuid] (const gms::inet_address& node) { + return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("decommission[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", uuid, node, resp.pending_ops); + if (boost::find(resp.pending_ops, uuid) == resp.pending_ops.end()) { + throw std::runtime_error(format("decommission[{}]: Node {} no longer tracks the operation", uuid, node)); + } + return make_ready_future<>(); + }); + }).get(); + + // Step 7: Leave the token ring slogger.info("decommission[{}]: leaving token ring", uuid); ss.leave_ring().get(); left_token_ring = true; slogger.info("decommission[{}]: left token ring", uuid); - // Step 7: Finish token movement + // Step 8: Finish token movement req.cmd = node_ops_cmd::decommission_done; parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) { return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { @@ -2247,6 +2267,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set& bootstrap_tok // Step 5: Sync data for bootstrap _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), bootstrap_tokens).get(); + on_streaming_finished(); // Step 6: Finish req.cmd = node_ops_cmd::bootstrap_done; @@ -2372,7 +2393,7 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _sys_ks.local().local_dc_rack(), bootstrap_tokens, get_token_metadata_ptr()); bs.bootstrap(streaming::stream_reason::replace, _gossiper, replace_address).get(); } - + on_streaming_finished(); // Step 8: Finish req.cmd = node_ops_cmd::replace_done; @@ -2520,15 +2541,9 @@ future<> storage_service::removenode(locator::host_id host_id, std::list(); }); }).get(); + on_streaming_finished(); - - // Step 6: Announce the node has left - ss._gossiper.advertise_token_removed(endpoint, host_id).get(); - std::unordered_set tmp(tokens.begin(), tokens.end()); - ss.excise(std::move(tmp), endpoint).get(); - removed_from_token_ring = true; - - // Step 7: Finish token movement + // Step 6: Finish token movement req.cmd = node_ops_cmd::removenode_done; parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) { return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { @@ -2537,6 +2552,12 @@ future<> storage_service::removenode(locator::host_id host_id, std::list tmp(tokens.begin(), tokens.end()); + ss.excise(std::move(tmp), endpoint).get(); + removed_from_token_ring = true; + slogger.info("removenode[{}]: Finished token movement, node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes); } catch (...) { slogger.warn("removenode[{}]: removing node={}, sync_nodes={}, ignore_nodes={} failed, error {}", @@ -2616,6 +2637,28 @@ void storage_service::node_ops_cmd_check(gms::inet_address coordinator, const no } } +void storage_service::on_node_ops_registered(node_ops_id ops_uuid) { + utils::get_local_injector().inject("storage_service_nodeops_prepare_handler_sleep3", std::chrono::seconds{3}).get(); + utils::get_local_injector().inject("storage_service_nodeops_abort_after_1s", [this, ops_uuid] { + (void)with_gate(_async_gate, [this, ops_uuid] { + return seastar::sleep_abortable(std::chrono::seconds(1), _abort_source).then([this, ops_uuid] { + node_ops_singal_abort(ops_uuid); + }); + }); + }); +} + +void storage_service::node_ops_insert(node_ops_id ops_uuid, + gms::inet_address coordinator, + std::list ignore_nodes, + std::function()> abort_func) { + auto watchdog_interval = std::chrono::seconds(_db.local().get_config().nodeops_watchdog_timeout_seconds()); + auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ignore_nodes), watchdog_interval, std::move(abort_func), + [this, ops_uuid]() mutable { node_ops_singal_abort(ops_uuid); }); + _node_ops.emplace(ops_uuid, std::move(meta)); + on_node_ops_registered(ops_uuid); +} + future storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) { return seastar::async([this, coordinator, req = std::move(req)] () mutable { auto ops_uuid = req.ops_uuid; @@ -2662,7 +2705,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes)); }).get(); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { + node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { for (auto& node : req.leaving_nodes) { slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); @@ -2670,9 +2713,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes)); }); - }, - [this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); }); - _node_ops.emplace(ops_uuid, std::move(meta)); + }); } else if (req.cmd == node_ops_cmd::removenode_heartbeat) { slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); node_ops_update_heartbeat(ops_uuid).get(); @@ -2712,7 +2753,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes)); }).get(); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { + node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { for (auto& node : req.leaving_nodes) { slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); @@ -2720,9 +2761,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes)); }); - }, - [this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); }); - _node_ops.emplace(ops_uuid, std::move(meta)); + }); } else if (req.cmd == node_ops_cmd::decommission_heartbeat) { slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); node_ops_update_heartbeat(ops_uuid).get(); @@ -2754,7 +2793,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return make_ready_future<>(); }).get(); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { + node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { for (auto& x: req.replace_nodes) { auto existing_node = x.first; @@ -2764,9 +2803,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); }); - }, - [this, ops_uuid ] { node_ops_singal_abort(ops_uuid); }); - _node_ops.emplace(ops_uuid, std::move(meta)); + }); } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) { // Wait for local node has marked replacing node as alive auto nodes = boost::copy_range>(req.replace_nodes| boost::adaptors::map_values); @@ -2808,7 +2845,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes)); }).get(); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { + node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { for (auto& x: req.bootstrap_nodes) { auto& endpoint = x.first; @@ -2818,9 +2855,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes)); }); - }, - [this, ops_uuid ] { node_ops_singal_abort(ops_uuid); }); - _node_ops.emplace(ops_uuid, std::move(meta)); + }); } else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) { slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); node_ops_update_heartbeat(ops_uuid).get(); @@ -3771,6 +3806,7 @@ node_ops_meta_data::node_ops_meta_data( node_ops_id ops_uuid, gms::inet_address coordinator, std::list ignore_nodes, + std::chrono::seconds watchdog_interval, std::function ()> abort_func, std::function signal_func) : _ops_uuid(std::move(ops_uuid)) @@ -3779,7 +3815,10 @@ node_ops_meta_data::node_ops_meta_data( , _abort_source(seastar::make_shared()) , _signal(std::move(signal_func)) , _ops(seastar::make_shared(_ops_uuid, _abort_source, std::move(ignore_nodes))) - , _watchdog([sig = _signal] { sig(); }) { + , _watchdog([sig = _signal] { sig(); }) + , _watchdog_interval(watchdog_interval) +{ + slogger.debug("node_ops_meta_data: ops_uuid={} arm interval={}", _ops_uuid, _watchdog_interval.count()); _watchdog.arm(_watchdog_interval); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 2588772f64..4f021f6dc7 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -99,12 +99,13 @@ class node_ops_meta_data { std::function _signal; shared_ptr _ops; seastar::timer _watchdog; - std::chrono::seconds _watchdog_interval{120}; + std::chrono::seconds _watchdog_interval; public: explicit node_ops_meta_data( node_ops_id ops_uuid, gms::inet_address coordinator, std::list ignore_nodes, + std::chrono::seconds watchdog_interval, std::function ()> abort_func, std::function signal_func); shared_ptr get_ops_info(); @@ -161,6 +162,8 @@ private: seastar::condition_variable _node_ops_abort_cond; named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}}; future<> _node_ops_abort_thread; + void node_ops_insert(node_ops_id, gms::inet_address coordinator, std::list ignore_nodes, + std::function()> abort_func); future<> node_ops_update_heartbeat(node_ops_id ops_uuid); future<> node_ops_done(node_ops_id ops_uuid); future<> node_ops_abort(node_ops_id ops_uuid); @@ -697,6 +700,7 @@ public: future node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req); void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req); future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list nodes, lw_shared_ptr heartbeat_updater_done); + void on_node_ops_registered(node_ops_id); future get_operation_mode();