mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-07 15:33:15 +00:00
Merge 'storage_service: Respect --enable-repair-based-node-ops flag during removenode' from Asias He
In commit 829b4c1 (repair: Make removenode safe by default), removenode
was changed to use repair based node operations unconditionally. Since
repair based node operations is not enabled by default, we should
respect the flag to use stream to sync data if the flag is false.
Fixes #8700
Closes #8701
* github.com:scylladb/scylla:
storage_service: Add removenode_add_ranges helper
storage_service: Respect --enable-repair-based-node-ops flag during removenode
This commit is contained in:
@@ -2506,9 +2506,15 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
throw std::runtime_error(format("removenode[{}]: Can not find ops_uuid={}", ops_uuid, ops_uuid));
|
||||
}
|
||||
auto ops = it->second.get_ops_info();
|
||||
auto as = it->second.get_abort_source();
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Started to sync data for removing node={}, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
ss._repair.local().removenode_with_repair(ss.get_token_metadata_ptr(), node, ops).get();
|
||||
if (ss.is_repair_based_node_ops_enabled()) {
|
||||
slogger.info("removenode[{}]: Started to sync data for removing node={} using repair, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
ss._repair.local().removenode_with_repair(ss.get_token_metadata_ptr(), node, ops).get();
|
||||
} else {
|
||||
slogger.info("removenode[{}]: Started to sync data for removing node={} using stream, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
ss.removenode_with_stream(node, as).get();
|
||||
}
|
||||
}
|
||||
} else if (req.cmd == node_ops_cmd::removenode_abort) {
|
||||
ss.node_ops_abort(ops_uuid);
|
||||
@@ -2893,6 +2899,56 @@ void storage_service::unbootstrap() {
|
||||
leave_ring();
|
||||
}
|
||||
|
||||
future<> storage_service::removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node) {
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
auto my_address = get_broadcast_address();
|
||||
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
std::unordered_multimap<dht::token_range, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, leaving_node);
|
||||
dht::token_range_vector my_new_ranges;
|
||||
for (auto& x : changed_ranges) {
|
||||
if (x.second == my_address) {
|
||||
my_new_ranges.emplace_back(x.first);
|
||||
}
|
||||
}
|
||||
std::unordered_multimap<inet_address, dht::token_range> source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges, *tmptr);
|
||||
std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint;
|
||||
for (auto& x : source_ranges) {
|
||||
ranges_per_endpoint[x.first].emplace_back(x.second);
|
||||
}
|
||||
streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, shared_ptr<abort_source> as_ptr) {
|
||||
return seastar::async([this, leaving_node, as_ptr] {
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
abort_source as;
|
||||
auto sub = _abort_source.subscribe([&as] () noexcept {
|
||||
if (!as.abort_requested()) {
|
||||
as.request_abort();
|
||||
}
|
||||
});
|
||||
if (!as_ptr) {
|
||||
throw std::runtime_error("removenode_with_stream: abort_source is nullptr");
|
||||
}
|
||||
auto as_ptr_sub = as_ptr->subscribe([&as] () noexcept {
|
||||
if (!as.abort_requested()) {
|
||||
as.request_abort();
|
||||
}
|
||||
});
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, tmptr, as, get_broadcast_address(), "Removenode", streaming::stream_reason::removenode);
|
||||
removenode_add_ranges(streamer, leaving_node).get();
|
||||
try {
|
||||
streamer->stream_async().get();
|
||||
} catch (...) {
|
||||
slogger.warn("removenode_with_stream: stream failed: {}", std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
|
||||
if (is_repair_based_node_ops_enabled()) {
|
||||
auto ops_uuid = utils::make_random_uuid();
|
||||
@@ -2910,23 +2966,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
|
||||
}
|
||||
});
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, tmptr, as, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode);
|
||||
auto my_address = get_broadcast_address();
|
||||
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
std::unordered_multimap<dht::token_range, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint);
|
||||
dht::token_range_vector my_new_ranges;
|
||||
for (auto& x : changed_ranges) {
|
||||
if (x.second == my_address) {
|
||||
my_new_ranges.emplace_back(x.first);
|
||||
}
|
||||
}
|
||||
std::unordered_multimap<inet_address, dht::token_range> source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges, *tmptr);
|
||||
std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint;
|
||||
for (auto& x : source_ranges) {
|
||||
ranges_per_endpoint[x.first].emplace_back(x.second);
|
||||
}
|
||||
streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint));
|
||||
}
|
||||
removenode_add_ranges(streamer, endpoint).get();
|
||||
auto status_checker = seastar::async([this, endpoint, &as] {
|
||||
slogger.info("restore_replica_count: Started status checker for removing node {}", endpoint);
|
||||
while (!as.abort_requested()) {
|
||||
@@ -3888,6 +3928,7 @@ node_ops_meta_data::node_ops_meta_data(
|
||||
: _ops_uuid(std::move(ops_uuid))
|
||||
, _coordinator(std::move(coordinator))
|
||||
, _abort(std::move(abort_func))
|
||||
, _abort_source(seastar::make_shared<abort_source>())
|
||||
, _signal(std::move(signal_func))
|
||||
, _ops(std::move(ops))
|
||||
, _watchdog([sig = _signal] { sig(); }) {
|
||||
@@ -3922,6 +3963,10 @@ shared_ptr<node_ops_info> node_ops_meta_data::get_ops_info() {
|
||||
return _ops;
|
||||
}
|
||||
|
||||
shared_ptr<abort_source> node_ops_meta_data::get_abort_source() {
|
||||
return _abort_source;
|
||||
}
|
||||
|
||||
void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
@@ -3950,6 +3995,10 @@ void storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
meta.abort().get();
|
||||
auto as = meta.get_abort_source();
|
||||
if (as && !as->abort_requested()) {
|
||||
as->request_abort();
|
||||
}
|
||||
abort_repair_node_ops(ops_uuid).get();
|
||||
_node_ops.erase(it);
|
||||
}
|
||||
|
||||
@@ -89,6 +89,7 @@ class view_update_generator;
|
||||
|
||||
namespace dht {
|
||||
class boot_strapper;
|
||||
class range_streamer;
|
||||
}
|
||||
|
||||
namespace gms {
|
||||
@@ -126,6 +127,7 @@ class node_ops_meta_data {
|
||||
utils::UUID _ops_uuid;
|
||||
gms::inet_address _coordinator;
|
||||
std::function<future<> ()> _abort;
|
||||
shared_ptr<abort_source> _abort_source;
|
||||
std::function<void ()> _signal;
|
||||
shared_ptr<node_ops_info> _ops;
|
||||
seastar::timer<lowres_clock> _watchdog;
|
||||
@@ -139,6 +141,7 @@ public:
|
||||
std::function<future<> ()> abort_func,
|
||||
std::function<void ()> signal_func);
|
||||
shared_ptr<node_ops_info> get_ops_info();
|
||||
shared_ptr<abort_source> get_abort_source();
|
||||
future<> abort();
|
||||
void update_watchdog();
|
||||
void cancel_watchdog();
|
||||
@@ -692,6 +695,8 @@ private:
|
||||
* @param endpoint the node that left
|
||||
*/
|
||||
future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint);
|
||||
future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr<abort_source> as_ptr);
|
||||
future<> removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);
|
||||
|
||||
// needs to be modified to accept either a keyspace or ARS.
|
||||
std::unordered_multimap<dht::token_range, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
|
||||
|
||||
Reference in New Issue
Block a user