Merge 'storage_service: Make node operations safer by detecting asymmetric abort' from Tomasz Grabiec
This patch fixes a problem which affects decommission and removenode
which may lead to data consistency problems under conditions which
lead one of the nodes to unliaterally decide to abort the node
operation without the coordinator noticing.
If this happens during streaming, the node operation coordinator would
proceed to make a change in the gossiper, and only later dectect that
one of the nodes aborted during sending of decommission_done or
removenode_done command. That's too late, because the operation will
be finalized by all the nodes once gossip propagates.
It's unsafe to finalize the operation while another node aborted. The
other node reverted to the old topolgy, with which they were running
for some time, without considering the pending replica when handling
requests. As a result, we may end up with consistency issues. Writes
made by those coordinators may not be replicated to CL replicas in the
new topology. Streaming may have missed to replicate those writes
depending on timing.
It's possible that some node aborts but streaming succeeds if the
abort is not due to network problems, or if the network problems are
transient and/or localized and affect only heartbeats.
There is no way to revert after we commit the node operation to the
gossiper, so it's ok to close node_ops sessions before making the
change to the gossiper, and thus detect aborts and prevent later aborts
after the change in the gossiper is made. This is already done during
bootstrap (RBNO enabled) and replacenode. This patch canges removenode
to also take this approach by moving sending of remove_done earlier.
We cannot take this approach with decommission easily, because
decommission_done command includes a wait for the node to leave the
ring, which won't happen before the change to the gossiper is
made. Separating this from decommission_done would require protocol
changes. This patch adds a second-best solution, which is to check if
sessions are still there right before making a change to the gossiper,
leaving decommission_done where it was.
The race can still happen, but the time window is now much smaller.
The PR also lays down infrastructure which enables testing the scenarios. It makes node ops
watchdog periods configurable, and adds error injections.
Fixes #12989
Refs #12969
Closes #13028
* github.com:scylladb/scylladb:
storage_service: node ops: Extract node_ops_insert() to reduce code duplication
storage_service: Make node operations safer by detecting asymmetric abort
storage_service: node ops: Add error injections
service: node_ops: Make watchdog and heartbeat intervals configurable
(cherry picked from commit 2b44631ded)
This commit is contained in:
committed by
Benny Halevy
parent
c9d19b3595
commit
42fd3704e4
@@ -909,6 +909,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, force_schema_commit_log(this, "force_schema_commit_log", value_status::Used, false,
|
||||
"Use separate schema commit log unconditionally rater than after restart following discovery of cluster-wide support for it.")
|
||||
, task_ttl_seconds(this, "task_ttl_in_seconds", liveness::LiveUpdate, value_status::Used, 10, "Time for which information about finished task stays in memory.")
|
||||
, nodeops_watchdog_timeout_seconds(this, "nodeops_watchdog_timeout_seconds", liveness::LiveUpdate, value_status::Used, 120, "Time in seconds after which node operations abort when not hearing from the coordinator")
|
||||
, nodeops_heartbeat_interval_seconds(this, "nodeops_heartbeat_interval_seconds", liveness::LiveUpdate, value_status::Used, 10, "Period of heartbeat ticks in node operations")
|
||||
, cache_index_pages(this, "cache_index_pages", liveness::LiveUpdate, value_status::Used, false,
|
||||
"Keep SSTable index pages in the global cache after a SSTable read. Expected to improve performance for workloads with big partitions, but may degrade performance for workloads with small partitions.")
|
||||
, x_log2_compaction_groups(this, "x_log2_compaction_groups", value_status::Used, 0, "Controls static number of compaction groups per table per shard. For X groups, set the option to log (base 2) of X. Example: Value of 3 implies 8 groups.")
|
||||
|
||||
@@ -388,6 +388,8 @@ public:
|
||||
named_value<bool> force_schema_commit_log;
|
||||
|
||||
named_value<uint32_t> task_ttl_seconds;
|
||||
named_value<uint32_t> nodeops_watchdog_timeout_seconds;
|
||||
named_value<uint32_t> nodeops_heartbeat_interval_seconds;
|
||||
|
||||
named_value<bool> cache_index_pages;
|
||||
|
||||
|
||||
@@ -1924,9 +1924,11 @@ future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_
|
||||
} else {
|
||||
throw std::runtime_error(format("node_ops_cmd_heartbeat_updater: node_ops_cmd is not supported"));
|
||||
}
|
||||
slogger.info("{}[{}]: Started heartbeat_updater", ops, uuid);
|
||||
auto interval = std::chrono::seconds(_db.local().get_config().nodeops_heartbeat_interval_seconds());
|
||||
slogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", ops, uuid, interval.count());
|
||||
while (!(*heartbeat_updater_done)) {
|
||||
auto req = node_ops_cmd_request{cmd, uuid, {}, {}, {}};
|
||||
auto next = std::chrono::steady_clock::now() + interval;
|
||||
try {
|
||||
co_await coroutine::parallel_for_each(nodes, [this, ops, uuid, &req] (const gms::inet_address& node) {
|
||||
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([ops, uuid, node] (node_ops_cmd_response resp) {
|
||||
@@ -1938,14 +1940,18 @@ future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_
|
||||
auto ep = std::current_exception();
|
||||
slogger.warn("{}[{}]: Failed to send heartbeat: {}", ops, uuid, ep);
|
||||
}
|
||||
int nr_seconds = 10;
|
||||
while (!(*heartbeat_updater_done) && nr_seconds--) {
|
||||
while (!(*heartbeat_updater_done) && std::chrono::steady_clock::now() < next) {
|
||||
co_await sleep(std::chrono::seconds(1));
|
||||
}
|
||||
}
|
||||
slogger.info("{}[{}]: Stopped heartbeat_updater", ops, uuid);
|
||||
}
|
||||
|
||||
static
|
||||
void on_streaming_finished() {
|
||||
utils::get_local_injector().inject("storage_service_streaming_sleep3", std::chrono::seconds{3}).get();
|
||||
}
|
||||
|
||||
future<> storage_service::decommission() {
|
||||
return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) {
|
||||
return seastar::async([&ss] {
|
||||
@@ -2056,6 +2062,7 @@ future<> storage_service::decommission() {
|
||||
// Step 4: Start to sync data
|
||||
slogger.info("DECOMMISSIONING: unbootstrap starts");
|
||||
ss.unbootstrap().get();
|
||||
on_streaming_finished();
|
||||
slogger.info("DECOMMISSIONING: unbootstrap done");
|
||||
|
||||
// Step 5: Become a group 0 non-voter before leaving the token ring.
|
||||
@@ -2068,13 +2075,26 @@ future<> storage_service::decommission() {
|
||||
slogger.info("decommission[{}]: became a group 0 non-voter", uuid);
|
||||
}
|
||||
|
||||
// Step 6: Leave the token ring
|
||||
// Step 6: Verify that other nodes didn't abort in the meantime.
|
||||
// See https://github.com/scylladb/scylladb/issues/12989.
|
||||
req.cmd = node_ops_cmd::query_pending_ops;
|
||||
parallel_for_each(nodes, [&ss, req, uuid] (const gms::inet_address& node) {
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("decommission[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", uuid, node, resp.pending_ops);
|
||||
if (boost::find(resp.pending_ops, uuid) == resp.pending_ops.end()) {
|
||||
throw std::runtime_error(format("decommission[{}]: Node {} no longer tracks the operation", uuid, node));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).get();
|
||||
|
||||
// Step 7: Leave the token ring
|
||||
slogger.info("decommission[{}]: leaving token ring", uuid);
|
||||
ss.leave_ring().get();
|
||||
left_token_ring = true;
|
||||
slogger.info("decommission[{}]: left token ring", uuid);
|
||||
|
||||
// Step 7: Finish token movement
|
||||
// Step 8: Finish token movement
|
||||
req.cmd = node_ops_cmd::decommission_done;
|
||||
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
@@ -2247,6 +2267,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
|
||||
|
||||
// Step 5: Sync data for bootstrap
|
||||
_repair.local().bootstrap_with_repair(get_token_metadata_ptr(), bootstrap_tokens).get();
|
||||
on_streaming_finished();
|
||||
|
||||
// Step 6: Finish
|
||||
req.cmd = node_ops_cmd::bootstrap_done;
|
||||
@@ -2372,7 +2393,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
|
||||
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _sys_ks.local().local_dc_rack(), bootstrap_tokens, get_token_metadata_ptr());
|
||||
bs.bootstrap(streaming::stream_reason::replace, _gossiper, replace_address).get();
|
||||
}
|
||||
|
||||
on_streaming_finished();
|
||||
|
||||
// Step 8: Finish
|
||||
req.cmd = node_ops_cmd::replace_done;
|
||||
@@ -2520,15 +2541,9 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).get();
|
||||
on_streaming_finished();
|
||||
|
||||
|
||||
// Step 6: Announce the node has left
|
||||
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
|
||||
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
|
||||
ss.excise(std::move(tmp), endpoint).get();
|
||||
removed_from_token_ring = true;
|
||||
|
||||
// Step 7: Finish token movement
|
||||
// Step 6: Finish token movement
|
||||
req.cmd = node_ops_cmd::removenode_done;
|
||||
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
@@ -2537,6 +2552,12 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
|
||||
});
|
||||
}).get();
|
||||
|
||||
// Step 7: Announce the node has left
|
||||
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
|
||||
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
|
||||
ss.excise(std::move(tmp), endpoint).get();
|
||||
removed_from_token_ring = true;
|
||||
|
||||
slogger.info("removenode[{}]: Finished token movement, node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);
|
||||
} catch (...) {
|
||||
slogger.warn("removenode[{}]: removing node={}, sync_nodes={}, ignore_nodes={} failed, error {}",
|
||||
@@ -2616,6 +2637,28 @@ void storage_service::node_ops_cmd_check(gms::inet_address coordinator, const no
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::on_node_ops_registered(node_ops_id ops_uuid) {
|
||||
utils::get_local_injector().inject("storage_service_nodeops_prepare_handler_sleep3", std::chrono::seconds{3}).get();
|
||||
utils::get_local_injector().inject("storage_service_nodeops_abort_after_1s", [this, ops_uuid] {
|
||||
(void)with_gate(_async_gate, [this, ops_uuid] {
|
||||
return seastar::sleep_abortable(std::chrono::seconds(1), _abort_source).then([this, ops_uuid] {
|
||||
node_ops_singal_abort(ops_uuid);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void storage_service::node_ops_insert(node_ops_id ops_uuid,
|
||||
gms::inet_address coordinator,
|
||||
std::list<inet_address> ignore_nodes,
|
||||
std::function<future<>()> abort_func) {
|
||||
auto watchdog_interval = std::chrono::seconds(_db.local().get_config().nodeops_watchdog_timeout_seconds());
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ignore_nodes), watchdog_interval, std::move(abort_func),
|
||||
[this, ops_uuid]() mutable { node_ops_singal_abort(ops_uuid); });
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
on_node_ops_registered(ops_uuid);
|
||||
}
|
||||
|
||||
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) {
|
||||
return seastar::async([this, coordinator, req = std::move(req)] () mutable {
|
||||
auto ops_uuid = req.ops_uuid;
|
||||
@@ -2662,7 +2705,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
|
||||
}).get();
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
@@ -2670,9 +2713,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
|
||||
});
|
||||
},
|
||||
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
});
|
||||
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
|
||||
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_update_heartbeat(ops_uuid).get();
|
||||
@@ -2712,7 +2753,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes));
|
||||
}).get();
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
@@ -2720,9 +2761,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes));
|
||||
});
|
||||
},
|
||||
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
});
|
||||
} else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
|
||||
slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_update_heartbeat(ops_uuid).get();
|
||||
@@ -2754,7 +2793,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.replace_nodes) {
|
||||
auto existing_node = x.first;
|
||||
@@ -2764,9 +2803,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("replace {}", req.replace_nodes));
|
||||
});
|
||||
},
|
||||
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
});
|
||||
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
|
||||
// Wait for local node has marked replacing node as alive
|
||||
auto nodes = boost::copy_range<std::vector<inet_address>>(req.replace_nodes| boost::adaptors::map_values);
|
||||
@@ -2808,7 +2845,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));
|
||||
}).get();
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.bootstrap_nodes) {
|
||||
auto& endpoint = x.first;
|
||||
@@ -2818,9 +2855,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));
|
||||
});
|
||||
},
|
||||
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
});
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
|
||||
slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_update_heartbeat(ops_uuid).get();
|
||||
@@ -3771,6 +3806,7 @@ node_ops_meta_data::node_ops_meta_data(
|
||||
node_ops_id ops_uuid,
|
||||
gms::inet_address coordinator,
|
||||
std::list<gms::inet_address> ignore_nodes,
|
||||
std::chrono::seconds watchdog_interval,
|
||||
std::function<future<> ()> abort_func,
|
||||
std::function<void ()> signal_func)
|
||||
: _ops_uuid(std::move(ops_uuid))
|
||||
@@ -3779,7 +3815,10 @@ node_ops_meta_data::node_ops_meta_data(
|
||||
, _abort_source(seastar::make_shared<abort_source>())
|
||||
, _signal(std::move(signal_func))
|
||||
, _ops(seastar::make_shared<node_ops_info>(_ops_uuid, _abort_source, std::move(ignore_nodes)))
|
||||
, _watchdog([sig = _signal] { sig(); }) {
|
||||
, _watchdog([sig = _signal] { sig(); })
|
||||
, _watchdog_interval(watchdog_interval)
|
||||
{
|
||||
slogger.debug("node_ops_meta_data: ops_uuid={} arm interval={}", _ops_uuid, _watchdog_interval.count());
|
||||
_watchdog.arm(_watchdog_interval);
|
||||
}
|
||||
|
||||
|
||||
@@ -99,12 +99,13 @@ class node_ops_meta_data {
|
||||
std::function<void ()> _signal;
|
||||
shared_ptr<node_ops_info> _ops;
|
||||
seastar::timer<lowres_clock> _watchdog;
|
||||
std::chrono::seconds _watchdog_interval{120};
|
||||
std::chrono::seconds _watchdog_interval;
|
||||
public:
|
||||
explicit node_ops_meta_data(
|
||||
node_ops_id ops_uuid,
|
||||
gms::inet_address coordinator,
|
||||
std::list<gms::inet_address> ignore_nodes,
|
||||
std::chrono::seconds watchdog_interval,
|
||||
std::function<future<> ()> abort_func,
|
||||
std::function<void ()> signal_func);
|
||||
shared_ptr<node_ops_info> get_ops_info();
|
||||
@@ -161,6 +162,8 @@ private:
|
||||
seastar::condition_variable _node_ops_abort_cond;
|
||||
named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}};
|
||||
future<> _node_ops_abort_thread;
|
||||
void node_ops_insert(node_ops_id, gms::inet_address coordinator, std::list<inet_address> ignore_nodes,
|
||||
std::function<future<>()> abort_func);
|
||||
future<> node_ops_update_heartbeat(node_ops_id ops_uuid);
|
||||
future<> node_ops_done(node_ops_id ops_uuid);
|
||||
future<> node_ops_abort(node_ops_id ops_uuid);
|
||||
@@ -697,6 +700,7 @@ public:
|
||||
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req);
|
||||
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
|
||||
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
|
||||
void on_node_ops_registered(node_ops_id);
|
||||
|
||||
future<mode> get_operation_mode();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user