From 42fd3704e441d1563fa2c8a2955d0d66b81842a3 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 7 Mar 2023 17:36:51 +0100 Subject: [PATCH] Merge 'storage_service: Make node operations safer by detecting asymmetric abort' from Tomasz Grabiec This patch fixes a problem which affects decommission and removenode which may lead to data consistency problems under conditions which lead one of the nodes to unilaterally decide to abort the node operation without the coordinator noticing. If this happens during streaming, the node operation coordinator would proceed to make a change in the gossiper, and only later detect that one of the nodes aborted during sending of decommission_done or removenode_done command. That's too late, because the operation will be finalized by all the nodes once gossip propagates. It's unsafe to finalize the operation while another node aborted. The other node reverted to the old topology, with which it was running for some time, without considering the pending replica when handling requests. As a result, we may end up with consistency issues. Writes made by those coordinators may not be replicated to CL replicas in the new topology. Streaming may have missed to replicate those writes depending on timing. It's possible that some node aborts but streaming succeeds if the abort is not due to network problems, or if the network problems are transient and/or localized and affect only heartbeats. There is no way to revert after we commit the node operation to the gossiper, so it's ok to close node_ops sessions before making the change to the gossiper, and thus detect aborts and prevent later aborts after the change in the gossiper is made. This is already done during bootstrap (RBNO enabled) and replacenode. This patch changes removenode to also take this approach by moving sending of removenode_done earlier. 
We cannot take this approach with decommission easily, because decommission_done command includes a wait for the node to leave the ring, which won't happen before the change to the gossiper is made. Separating this from decommission_done would require protocol changes. This patch adds a second-best solution, which is to check if sessions are still there right before making a change to the gossiper, leaving decommission_done where it was. The race can still happen, but the time window is now much smaller. The PR also lays down infrastructure which enables testing the scenarios. It makes node ops watchdog periods configurable, and adds error injections. Fixes #12989 Refs #12969 Closes #13028 * github.com:scylladb/scylladb: storage_service: node ops: Extract node_ops_insert() to reduce code duplication storage_service: Make node operations safer by detecting asymmetric abort storage_service: node ops: Add error injections service: node_ops: Make watchdog and heartbeat intervals configurable (cherry picked from commit 2b44631ded8b9dd4db70a0cabdc8c87789698cc6) --- db/config.cc | 2 + db/config.hh | 2 + service/storage_service.cc | 101 +++++++++++++++++++++++++------------ service/storage_service.hh | 6 ++- 4 files changed, 79 insertions(+), 32 deletions(-) diff --git a/db/config.cc b/db/config.cc index 5af8abd7a8..30defa200e 100644 --- a/db/config.cc +++ b/db/config.cc @@ -909,6 +909,8 @@ db::config::config(std::shared_ptr exts) , force_schema_commit_log(this, "force_schema_commit_log", value_status::Used, false, "Use separate schema commit log unconditionally rater than after restart following discovery of cluster-wide support for it.") , task_ttl_seconds(this, "task_ttl_in_seconds", liveness::LiveUpdate, value_status::Used, 10, "Time for which information about finished task stays in memory.") + , nodeops_watchdog_timeout_seconds(this, "nodeops_watchdog_timeout_seconds", liveness::LiveUpdate, value_status::Used, 120, "Time in seconds after which node operations abort 
when not hearing from the coordinator") + , nodeops_heartbeat_interval_seconds(this, "nodeops_heartbeat_interval_seconds", liveness::LiveUpdate, value_status::Used, 10, "Period of heartbeat ticks in node operations") , cache_index_pages(this, "cache_index_pages", liveness::LiveUpdate, value_status::Used, false, "Keep SSTable index pages in the global cache after a SSTable read. Expected to improve performance for workloads with big partitions, but may degrade performance for workloads with small partitions.") , x_log2_compaction_groups(this, "x_log2_compaction_groups", value_status::Used, 0, "Controls static number of compaction groups per table per shard. For X groups, set the option to log (base 2) of X. Example: Value of 3 implies 8 groups.") diff --git a/db/config.hh b/db/config.hh index b8942a50b4..46dba107fc 100644 --- a/db/config.hh +++ b/db/config.hh @@ -388,6 +388,8 @@ public: named_value force_schema_commit_log; named_value task_ttl_seconds; + named_value nodeops_watchdog_timeout_seconds; + named_value nodeops_heartbeat_interval_seconds; named_value cache_index_pages; diff --git a/service/storage_service.cc b/service/storage_service.cc index 366578f553..9126f4e5c6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1924,9 +1924,11 @@ future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ } else { throw std::runtime_error(format("node_ops_cmd_heartbeat_updater: node_ops_cmd is not supported")); } - slogger.info("{}[{}]: Started heartbeat_updater", ops, uuid); + auto interval = std::chrono::seconds(_db.local().get_config().nodeops_heartbeat_interval_seconds()); + slogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", ops, uuid, interval.count()); while (!(*heartbeat_updater_done)) { auto req = node_ops_cmd_request{cmd, uuid, {}, {}, {}}; + auto next = std::chrono::steady_clock::now() + interval; try { co_await coroutine::parallel_for_each(nodes, [this, ops, uuid, &req] (const gms::inet_address& 
node) { return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([ops, uuid, node] (node_ops_cmd_response resp) { @@ -1938,14 +1940,18 @@ future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ auto ep = std::current_exception(); slogger.warn("{}[{}]: Failed to send heartbeat: {}", ops, uuid, ep); } - int nr_seconds = 10; - while (!(*heartbeat_updater_done) && nr_seconds--) { + while (!(*heartbeat_updater_done) && std::chrono::steady_clock::now() < next) { co_await sleep(std::chrono::seconds(1)); } } slogger.info("{}[{}]: Stopped heartbeat_updater", ops, uuid); } +static +void on_streaming_finished() { + utils::get_local_injector().inject("storage_service_streaming_sleep3", std::chrono::seconds{3}).get(); +} + future<> storage_service::decommission() { return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) { return seastar::async([&ss] { @@ -2056,6 +2062,7 @@ future<> storage_service::decommission() { // Step 4: Start to sync data slogger.info("DECOMMISSIONING: unbootstrap starts"); ss.unbootstrap().get(); + on_streaming_finished(); slogger.info("DECOMMISSIONING: unbootstrap done"); // Step 5: Become a group 0 non-voter before leaving the token ring. @@ -2068,13 +2075,26 @@ future<> storage_service::decommission() { slogger.info("decommission[{}]: became a group 0 non-voter", uuid); } - // Step 6: Leave the token ring + // Step 6: Verify that other nodes didn't abort in the meantime. + // See https://github.com/scylladb/scylladb/issues/12989. 
+ req.cmd = node_ops_cmd::query_pending_ops; + parallel_for_each(nodes, [&ss, req, uuid] (const gms::inet_address& node) { + return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("decommission[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", uuid, node, resp.pending_ops); + if (boost::find(resp.pending_ops, uuid) == resp.pending_ops.end()) { + throw std::runtime_error(format("decommission[{}]: Node {} no longer tracks the operation", uuid, node)); + } + return make_ready_future<>(); + }); + }).get(); + + // Step 7: Leave the token ring slogger.info("decommission[{}]: leaving token ring", uuid); ss.leave_ring().get(); left_token_ring = true; slogger.info("decommission[{}]: left token ring", uuid); - // Step 7: Finish token movement + // Step 8: Finish token movement req.cmd = node_ops_cmd::decommission_done; parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) { return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { @@ -2247,6 +2267,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set& bootstrap_tok // Step 5: Sync data for bootstrap _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), bootstrap_tokens).get(); + on_streaming_finished(); // Step 6: Finish req.cmd = node_ops_cmd::bootstrap_done; @@ -2372,7 +2393,7 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _sys_ks.local().local_dc_rack(), bootstrap_tokens, get_token_metadata_ptr()); bs.bootstrap(streaming::stream_reason::replace, _gossiper, replace_address).get(); } - + on_streaming_finished(); // Step 8: Finish req.cmd = node_ops_cmd::replace_done; @@ -2520,15 +2541,9 @@ future<> storage_service::removenode(locator::host_id host_id, std::list(); }); }).get(); + on_streaming_finished(); 
- - // Step 6: Announce the node has left - ss._gossiper.advertise_token_removed(endpoint, host_id).get(); - std::unordered_set tmp(tokens.begin(), tokens.end()); - ss.excise(std::move(tmp), endpoint).get(); - removed_from_token_ring = true; - - // Step 7: Finish token movement + // Step 6: Finish token movement req.cmd = node_ops_cmd::removenode_done; parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) { return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { @@ -2537,6 +2552,12 @@ future<> storage_service::removenode(locator::host_id host_id, std::list tmp(tokens.begin(), tokens.end()); + ss.excise(std::move(tmp), endpoint).get(); + removed_from_token_ring = true; + slogger.info("removenode[{}]: Finished token movement, node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes); } catch (...) { slogger.warn("removenode[{}]: removing node={}, sync_nodes={}, ignore_nodes={} failed, error {}", @@ -2616,6 +2637,28 @@ void storage_service::node_ops_cmd_check(gms::inet_address coordinator, const no } } +void storage_service::on_node_ops_registered(node_ops_id ops_uuid) { + utils::get_local_injector().inject("storage_service_nodeops_prepare_handler_sleep3", std::chrono::seconds{3}).get(); + utils::get_local_injector().inject("storage_service_nodeops_abort_after_1s", [this, ops_uuid] { + (void)with_gate(_async_gate, [this, ops_uuid] { + return seastar::sleep_abortable(std::chrono::seconds(1), _abort_source).then([this, ops_uuid] { + node_ops_singal_abort(ops_uuid); + }); + }); + }); +} + +void storage_service::node_ops_insert(node_ops_id ops_uuid, + gms::inet_address coordinator, + std::list ignore_nodes, + std::function()> abort_func) { + auto watchdog_interval = std::chrono::seconds(_db.local().get_config().nodeops_watchdog_timeout_seconds()); + auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ignore_nodes), watchdog_interval, 
std::move(abort_func), + [this, ops_uuid]() mutable { node_ops_singal_abort(ops_uuid); }); + _node_ops.emplace(ops_uuid, std::move(meta)); + on_node_ops_registered(ops_uuid); +} + future storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) { return seastar::async([this, coordinator, req = std::move(req)] () mutable { auto ops_uuid = req.ops_uuid; @@ -2662,7 +2705,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes)); }).get(); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { + node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { for (auto& node : req.leaving_nodes) { slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); @@ -2670,9 +2713,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes)); }); - }, - [this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); }); - _node_ops.emplace(ops_uuid, std::move(meta)); + }); } else if (req.cmd == node_ops_cmd::removenode_heartbeat) { slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); node_ops_update_heartbeat(ops_uuid).get(); @@ -2712,7 +2753,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes)); }).get(); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { + node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, 
req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { for (auto& node : req.leaving_nodes) { slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); @@ -2720,9 +2761,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes)); }); - }, - [this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); }); - _node_ops.emplace(ops_uuid, std::move(meta)); + }); } else if (req.cmd == node_ops_cmd::decommission_heartbeat) { slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); node_ops_update_heartbeat(ops_uuid).get(); @@ -2754,7 +2793,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return make_ready_future<>(); }).get(); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { + node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { for (auto& x: req.replace_nodes) { auto existing_node = x.first; @@ -2764,9 +2803,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); }); - }, - [this, ops_uuid ] { node_ops_singal_abort(ops_uuid); }); - _node_ops.emplace(ops_uuid, std::move(meta)); + }); } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) { // Wait for local node has marked replacing node as alive auto nodes = boost::copy_range>(req.replace_nodes| boost::adaptors::map_values); @@ -2808,7 +2845,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("bootstrap 
{}", req.bootstrap_nodes)); }).get(); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { + node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { for (auto& x: req.bootstrap_nodes) { auto& endpoint = x.first; @@ -2818,9 +2855,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes)); }); - }, - [this, ops_uuid ] { node_ops_singal_abort(ops_uuid); }); - _node_ops.emplace(ops_uuid, std::move(meta)); + }); } else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) { slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); node_ops_update_heartbeat(ops_uuid).get(); @@ -3771,6 +3806,7 @@ node_ops_meta_data::node_ops_meta_data( node_ops_id ops_uuid, gms::inet_address coordinator, std::list ignore_nodes, + std::chrono::seconds watchdog_interval, std::function ()> abort_func, std::function signal_func) : _ops_uuid(std::move(ops_uuid)) @@ -3779,7 +3815,10 @@ node_ops_meta_data::node_ops_meta_data( , _abort_source(seastar::make_shared()) , _signal(std::move(signal_func)) , _ops(seastar::make_shared(_ops_uuid, _abort_source, std::move(ignore_nodes))) - , _watchdog([sig = _signal] { sig(); }) { + , _watchdog([sig = _signal] { sig(); }) + , _watchdog_interval(watchdog_interval) +{ + slogger.debug("node_ops_meta_data: ops_uuid={} arm interval={}", _ops_uuid, _watchdog_interval.count()); _watchdog.arm(_watchdog_interval); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 2588772f64..4f021f6dc7 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -99,12 +99,13 @@ class node_ops_meta_data { std::function _signal; shared_ptr _ops; 
seastar::timer _watchdog; - std::chrono::seconds _watchdog_interval{120}; + std::chrono::seconds _watchdog_interval; public: explicit node_ops_meta_data( node_ops_id ops_uuid, gms::inet_address coordinator, std::list ignore_nodes, + std::chrono::seconds watchdog_interval, std::function ()> abort_func, std::function signal_func); shared_ptr get_ops_info(); @@ -161,6 +162,8 @@ private: seastar::condition_variable _node_ops_abort_cond; named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}}; future<> _node_ops_abort_thread; + void node_ops_insert(node_ops_id, gms::inet_address coordinator, std::list ignore_nodes, + std::function()> abort_func); future<> node_ops_update_heartbeat(node_ops_id ops_uuid); future<> node_ops_done(node_ops_id ops_uuid); future<> node_ops_abort(node_ops_id ops_uuid); @@ -697,6 +700,7 @@ public: future node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req); void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req); future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list nodes, lw_shared_ptr heartbeat_updater_done); + void on_node_ops_registered(node_ops_id); future get_operation_mode();