Merge 'Coroutinize some storage_service member functions' from Pavel Solodovnikov

These mostly trivial changes are intended to reduce the use of `seastar::async`; the general conversion pattern is sketched after the commit list.

Closes #10416

* github.com:scylladb/scylla:
  service: storage_service: coroutinize `start_gossiping()`
  service: storage_service: coroutinize `node_ops_cmd_heartbeat_updater()`
  service: storage_service: coroutinize `node_ops_abort_thread()`
  service: storage_service: coroutinize `node_ops_abort()`
  service: storage_service: coroutinize `node_ops_done()`
  service: storage_service: coroutinize `node_ops_update_heartbeat()`
  service: storage_service: coroutinize `force_remove_completion()`
  service: storage_service: coroutinize `start_leaving()`
  service: storage_service: coroutinize `start_sys_dist_ks()`
  service: storage_service: coroutinize `prepare_to_join()`
  service: storage_service: coroutinize `removenode_add_ranges()`
  service: storage_service: coroutinize `unbootstrap()`
  service: storage_service: coroutinize `get_changed_ranges_for_leaving()`
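The general shape of each conversion is sketched below. This is a hypothetical illustration, not code from the patch: `do_work()` and `other_work()` stand in for arbitrary future-returning calls.

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/thread.hh>

seastar::future<> do_work();     // hypothetical stand-ins, not from this patch
seastar::future<> other_work();

// Before: seastar::async spawns a stackful seastar thread and each
// .get() blocks that thread until the future resolves.
seastar::future<> example_before() {
    return seastar::async([] {
        do_work().get();
        other_work().get();
    });
}

// After: a coroutine suspends at each co_await, with no thread stack
// and no seastar::async setup/teardown cost.
seastar::future<> example_after() {
    co_await do_work();
    co_await other_work();
}
```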
Committed by Avi Kivity, 2022-05-02 12:59:36 +03:00
2 changed files with 175 additions and 182 deletions

service/storage_service.cc

@@ -222,8 +222,7 @@ future<> storage_service::snitch_reconfigured() {
     return update_topology(utils::fb_utilities::get_broadcast_address());
 }
 
-// Runs inside seastar::async context
-void storage_service::prepare_to_join(
+future<> storage_service::prepare_to_join(
         std::unordered_set<gms::inet_address> initial_contact_nodes,
         std::unordered_set<gms::inet_address> loaded_endpoints,
         std::unordered_map<gms::inet_address, sstring> loaded_peer_features) {
@@ -231,7 +230,7 @@ void storage_service::prepare_to_join(
     if (_sys_ks.local().was_decommissioned()) {
         if (_db.local().get_config().override_decommission()) {
             slogger.warn("This node was decommissioned, but overriding by operator request.");
-            _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get();
+            co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED);
         } else {
             auto msg = sstring("This node was decommissioned and will not rejoin the ring unless override_decommission=true has been set,"
                                "or all existing data is removed and the node is bootstrapped again");
@@ -242,13 +241,13 @@ void storage_service::prepare_to_join(
 
     bool replacing_a_node_with_same_ip = false;
     bool replacing_a_node_with_diff_ip = false;
-    auto tmlock = std::make_unique<token_metadata_lock>(get_token_metadata_lock().get0());
-    auto tmptr = get_mutable_token_metadata_ptr().get0();
+    auto tmlock = std::make_unique<token_metadata_lock>(co_await get_token_metadata_lock());
+    auto tmptr = co_await get_mutable_token_metadata_ptr();
     if (is_replacing()) {
         if (_sys_ks.local().bootstrap_complete()) {
             throw std::runtime_error("Cannot replace address with a node that is already bootstrapped");
         }
-        _bootstrap_tokens = prepare_replacement_info(initial_contact_nodes, loaded_peer_features).get0();
+        _bootstrap_tokens = co_await prepare_replacement_info(initial_contact_nodes, loaded_peer_features);
         auto replace_address = get_replace_address();
         replacing_a_node_with_same_ip = *replace_address == get_broadcast_address();
         replacing_a_node_with_diff_ip = *replace_address != get_broadcast_address();
@@ -256,18 +255,18 @@ void storage_service::prepare_to_join(
         slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}",
             get_broadcast_address() == *replace_address ? "the same" : "a different",
             get_broadcast_address(), *replace_address);
-        tmptr->update_normal_tokens(_bootstrap_tokens, *replace_address).get();
+        co_await tmptr->update_normal_tokens(_bootstrap_tokens, *replace_address);
     } else if (should_bootstrap()) {
-        check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features).get();
+        co_await check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features);
     } else {
         auto local_features = _feature_service.known_feature_set();
         slogger.info("Checking remote features with gossip, initial_contact_nodes={}", initial_contact_nodes);
-        _gossiper.do_shadow_round(initial_contact_nodes).get();
+        co_await _gossiper.do_shadow_round(initial_contact_nodes);
         _gossiper.check_knows_remote_features(local_features, loaded_peer_features);
         _gossiper.check_snitch_name_matches();
-        _gossiper.reset_endpoint_state_map().get();
+        co_await _gossiper.reset_endpoint_state_map();
         for (auto ep : loaded_endpoints) {
-            _gossiper.add_saved_endpoint(ep).get();
+            co_await _gossiper.add_saved_endpoint(ep);
         }
     }
     auto features = _feature_service.supported_feature_set();
@@ -275,19 +274,19 @@ void storage_service::prepare_to_join(
     // Save the advertised feature set to system.local table after
     // all remote feature checks are complete and after gossip shadow rounds are done.
     // At this point, the final feature set is already determined before the node joins the ring.
-    db::system_keyspace::save_local_supported_features(features).get0();
+    co_await db::system_keyspace::save_local_supported_features(features);
 
     // If this is a restarting node, we should update tokens before gossip starts
-    auto my_tokens = db::system_keyspace::get_saved_tokens().get0();
+    auto my_tokens = co_await db::system_keyspace::get_saved_tokens();
     bool restarting_normal_node = _sys_ks.local().bootstrap_complete() && !is_replacing() && !my_tokens.empty();
     if (restarting_normal_node) {
         slogger.info("Restarting a node in NORMAL status");
         // This node must know about its chosen tokens before other nodes do
         // since they may start sending writes to this node after it gossips status = NORMAL.
         // Therefore we update _token_metadata now, before gossip starts.
-        tmptr->update_normal_tokens(my_tokens, get_broadcast_address()).get();
+        co_await tmptr->update_normal_tokens(my_tokens, get_broadcast_address());
 
-        _cdc_gen_id = db::system_keyspace::get_cdc_generation_id().get0();
+        _cdc_gen_id = co_await db::system_keyspace::get_cdc_generation_id();
         if (!_cdc_gen_id) {
             // We could not have completed joining if we didn't generate and persist a CDC streams timestamp,
             // unless we are restarting after upgrading from non-CDC supported version.
@@ -314,13 +313,13 @@ void storage_service::prepare_to_join(
     // Replicate the tokens early because once gossip runs other nodes
     // might send reads/writes to this node. Replicate it early to make
     // sure the tokens are valid on all the shards.
-    replicate_to_all_cores(std::move(tmptr)).get();
+    co_await replicate_to_all_cores(std::move(tmptr));
     tmlock.reset();
 
     auto broadcast_rpc_address = utils::fb_utilities::get_broadcast_rpc_address();
     auto& proxy = service::get_storage_proxy();
     // Ensure we know our own actual Schema UUID in preparation for updates
-    db::schema_tables::recalculate_schema_version(_sys_ks, proxy, _feature_service).get0();
+    co_await db::schema_tables::recalculate_schema_version(_sys_ks, proxy, _feature_service);
     app_states.emplace(gms::application_state::NET_VERSION, versioned_value::network_version());
     app_states.emplace(gms::application_state::HOST_ID, versioned_value::host_id(local_host_id));
     app_states.emplace(gms::application_state::RPC_ADDRESS, versioned_value::rpcaddress(broadcast_rpc_address));
@@ -352,14 +351,14 @@ void storage_service::prepare_to_join(
     slogger.info("Starting up server gossip");
 
-    auto generation_number = db::system_keyspace::increment_and_get_generation().get0();
+    auto generation_number = co_await db::system_keyspace::increment_and_get_generation();
     auto advertise = gms::advertise_myself(!replacing_a_node_with_same_ip);
-    _gossiper.start_gossiping(generation_number, app_states, advertise).get();
+    co_await _gossiper.start_gossiping(generation_number, app_states, advertise);
 }
 
-void storage_service::start_sys_dist_ks() {
+future<> storage_service::start_sys_dist_ks() {
     supervisor::notify("starting system distributed keyspace");
-    _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start).get();
+    co_await _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
 }
 
 /* Broadcasts the chosen tokens through gossip,
@@ -490,12 +489,12 @@ void storage_service::join_token_ring(std::chrono::milliseconds delay) {
             slogger.info("Replacing a node with token(s): {}", _bootstrap_tokens);
             // _bootstrap_tokens was previously set in prepare_to_join using tokens gossiped by the replaced node
         }
-        start_sys_dist_ks();
+        start_sys_dist_ks().get();
         mark_existing_views_as_built();
         _sys_ks.local().update_tokens(_bootstrap_tokens).get();
         bootstrap(); // blocks until finished
     } else {
-        start_sys_dist_ks();
+        start_sys_dist_ks().get();
        _bootstrap_tokens = db::system_keyspace::get_saved_tokens().get0();
         if (_bootstrap_tokens.empty()) {
             _bootstrap_tokens = boot_strapper::get_bootstrap_tokens(get_token_metadata_ptr(), _db.local().get_config(), dht::check_token_endpoint::no);
@@ -1362,7 +1361,7 @@ future<> storage_service::init_server(cql3::query_processor& qp) {
         for (auto& x : loaded_peer_features) {
             slogger.info("peer={}, supported_features={}", x.first, x.second);
         }
-        prepare_to_join(std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features));
+        prepare_to_join(std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features)).get();
     });
 }
@@ -1750,24 +1749,28 @@ future<bool> storage_service::is_gossip_running() {
 }
 
 future<> storage_service::start_gossiping() {
-    return run_with_api_lock(sstring("start_gossiping"), [] (storage_service& ss) {
-        return seastar::async([&ss] {
-            if (!ss._gossiper.is_enabled()) {
-                slogger.warn("Starting gossip by operator request");
-                ss._gossiper.container().invoke_on_all(&gms::gossiper::start).get();
-                auto undo = defer([&ss] { ss._gossiper.container().invoke_on_all(&gms::gossiper::stop).get(); });
-                auto cdc_gen_ts = db::system_keyspace::get_cdc_generation_id().get0();
+    return run_with_api_lock(sstring("start_gossiping"), [] (storage_service& ss) -> future<> {
+        if (!ss._gossiper.is_enabled()) {
+            slogger.warn("Starting gossip by operator request");
+            co_await ss._gossiper.container().invoke_on_all(&gms::gossiper::start);
+            bool should_stop_gossiper = false; // undo action
+            try {
+                auto cdc_gen_ts = co_await db::system_keyspace::get_cdc_generation_id();
                 if (!cdc_gen_ts) {
                     cdc_log.warn("CDC generation timestamp missing when starting gossip");
                 }
-                set_gossip_tokens(ss._gossiper,
-                        db::system_keyspace::get_local_tokens().get0(),
-                        cdc_gen_ts).get();
+                co_await set_gossip_tokens(ss._gossiper,
+                        co_await db::system_keyspace::get_local_tokens(),
+                        cdc_gen_ts);
                 ss._gossiper.force_newer_generation();
-                ss._gossiper.start_gossiping(utils::get_generation_number()).get();
-                undo.cancel();
-            }
-        });
+                co_await ss._gossiper.start_gossiping(utils::get_generation_number());
+            } catch (...) {
+                should_stop_gossiper = true;
+            }
+            if (should_stop_gossiper) {
+                co_await ss._gossiper.container().invoke_on_all(&gms::gossiper::stop);
+            }
+        }
     });
 }
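Note how the conversion above replaces the `defer()`-based undo: the old callback blocked with `.get()`, which is only legal on a seastar thread, and a destructor cannot `co_await`, so the coroutine records the failure in a flag and performs the cleanup explicitly after the `try`/`catch`. A minimal sketch of the same idea, assuming hypothetical `begin()`/`step()`/`rollback()` stand-ins and adding an explicit rethrow (the patch itself only sets `should_stop_gossiper`):

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>
#include <exception>

seastar::future<> begin();      // hypothetical stand-ins, not from this patch
seastar::future<> step();
seastar::future<> rollback();

seastar::future<> run_with_undo() {
    co_await begin();
    std::exception_ptr ex;
    try {
        co_await step();                  // may fail
    } catch (...) {
        ex = std::current_exception();    // remember the failure
    }
    if (ex) {
        co_await rollback();              // undo begin()
        std::rethrow_exception(ex);       // propagate the original error
    }
}
```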
@@ -1789,38 +1792,36 @@ future<> storage_service::do_stop_ms() {
     });
 }
 
-future<> storage_service::node_ops_cmd_heartbeat_updater(const node_ops_cmd& cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done) {
-    return seastar::async([this, cmd, uuid, nodes = std::move(nodes), heartbeat_updater_done] {
-        std::string ops;
-        if (cmd == node_ops_cmd::decommission_heartbeat) {
-            ops = "decommission";
-        } else if (cmd == node_ops_cmd::removenode_heartbeat) {
-            ops = "removenode";
-        } else if (cmd == node_ops_cmd::replace_heartbeat) {
-            ops = "replace";
-        } else if (cmd == node_ops_cmd::bootstrap_heartbeat) {
-            ops = "bootstrap";
-        } else {
-            throw std::runtime_error(format("node_ops_cmd_heartbeat_updater: node_ops_cmd is not supported"));
-        }
-        slogger.info("{}[{}]: Started heartbeat_updater", ops, uuid);
-        while (!(*heartbeat_updater_done)) {
-            auto req = node_ops_cmd_request{cmd, uuid, {}, {}, {}};
-            parallel_for_each(nodes, [this, ops, uuid, &req] (const gms::inet_address& node) {
-                return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([ops, uuid, node] (node_ops_cmd_response resp) {
-                    slogger.debug("{}[{}]: Got heartbeat response from node={}", ops, uuid, node);
-                    return make_ready_future<>();
-                });
-            }).handle_exception([ops, uuid] (std::exception_ptr ep) {
-                slogger.warn("{}[{}]: Failed to send heartbeat: {}", ops, uuid, ep);
-            }).get();
-            int nr_seconds = 10;
-            while (!(*heartbeat_updater_done) && nr_seconds--) {
-                sleep(std::chrono::seconds(1)).get();
-            }
-        }
-        slogger.info("{}[{}]: Stopped heartbeat_updater", ops, uuid);
-    });
-}
+future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done) {
+    std::string ops;
+    if (cmd == node_ops_cmd::decommission_heartbeat) {
+        ops = "decommission";
+    } else if (cmd == node_ops_cmd::removenode_heartbeat) {
+        ops = "removenode";
+    } else if (cmd == node_ops_cmd::replace_heartbeat) {
+        ops = "replace";
+    } else if (cmd == node_ops_cmd::bootstrap_heartbeat) {
+        ops = "bootstrap";
+    } else {
+        throw std::runtime_error(format("node_ops_cmd_heartbeat_updater: node_ops_cmd is not supported"));
+    }
+    slogger.info("{}[{}]: Started heartbeat_updater", ops, uuid);
+    while (!(*heartbeat_updater_done)) {
+        auto req = node_ops_cmd_request{cmd, uuid, {}, {}, {}};
+        co_await parallel_for_each(nodes, [this, ops, uuid, &req] (const gms::inet_address& node) {
+            return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([ops, uuid, node] (node_ops_cmd_response resp) {
+                slogger.debug("{}[{}]: Got heartbeat response from node={}", ops, uuid, node);
+                return make_ready_future<>();
+            });
+        }).handle_exception([ops, uuid] (std::exception_ptr ep) {
+            slogger.warn("{}[{}]: Failed to send heartbeat: {}", ops, uuid, ep);
+        });
+        int nr_seconds = 10;
+        while (!(*heartbeat_updater_done) && nr_seconds--) {
+            co_await sleep(std::chrono::seconds(1));
+        }
+    }
+    slogger.info("{}[{}]: Stopped heartbeat_updater", ops, uuid);
+}
 
 future<> storage_service::decommission() {
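The coroutinized updater awaits one fan-out round per iteration and then sleeps, all on the caller's task without a seastar thread underneath. A minimal sketch of that loop shape, with a hypothetical `ping()` in place of `send_node_ops_cmd()`:

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/shared_ptr.hh>
#include <chrono>
#include <list>

seastar::future<> ping(int node);   // hypothetical stand-in, not from this patch

seastar::future<> heartbeat_loop(std::list<int> nodes, seastar::lw_shared_ptr<bool> done) {
    while (!*done) {
        // One concurrent request per node; the coroutine resumes
        // once every request has completed.
        co_await seastar::parallel_for_each(nodes, [] (int node) {
            return ping(node);
        });
        co_await seastar::sleep(std::chrono::seconds(1));
    }
}
```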
@@ -1906,7 +1907,7 @@ future<> storage_service::decommission() {
 
             // Step 5: Start to sync data
             slogger.info("DECOMMISSIONING: unbootstrap starts");
-            ss.unbootstrap();
+            ss.unbootstrap().get();
             slogger.info("DECOMMISSIONING: unbootstrap done");
 
             // Step 6: Finish
@@ -2400,10 +2401,10 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
             _node_ops.emplace(ops_uuid, std::move(meta));
         } else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
             slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
-            node_ops_update_heartbeat(ops_uuid);
+            node_ops_update_heartbeat(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::removenode_done) {
             slogger.info("removenode[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
-            node_ops_done(ops_uuid);
+            node_ops_done(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::removenode_sync_data) {
             auto it = _node_ops.find(ops_uuid);
             if (it == _node_ops.end()) {
@@ -2421,7 +2422,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
                 }
             }
         } else if (req.cmd == node_ops_cmd::removenode_abort) {
-            node_ops_abort(ops_uuid);
+            node_ops_abort(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::decommission_prepare) {
             if (req.leaving_nodes.size() > 1) {
                 auto msg = format("decommission[{}]: Could not decommission more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
@@ -2449,7 +2450,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
             _node_ops.emplace(ops_uuid, std::move(meta));
         } else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
             slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
-            node_ops_update_heartbeat(ops_uuid);
+            node_ops_update_heartbeat(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::decommission_done) {
             slogger.info("decommission[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
             slogger.debug("Triggering off-strategy compaction for all non-system tables on decommission completion");
@@ -2458,9 +2459,9 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
                     table->trigger_offstrategy_compaction();
                 }
             }).get();
-            node_ops_done(ops_uuid);
+            node_ops_done(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::decommission_abort) {
-            node_ops_abort(ops_uuid);
+            node_ops_abort(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::replace_prepare) {
             // Mark the replacing node as replacing
             if (req.replace_nodes.size() > 1) {
@@ -2509,12 +2510,12 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
             }).get();
         } else if (req.cmd == node_ops_cmd::replace_heartbeat) {
             slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
-            node_ops_update_heartbeat(ops_uuid);
+            node_ops_update_heartbeat(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::replace_done) {
             slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
-            node_ops_done(ops_uuid);
+            node_ops_done(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::replace_abort) {
-            node_ops_abort(ops_uuid);
+            node_ops_abort(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::bootstrap_prepare) {
             // Mark the bootstrap node as bootstrapping
             if (req.bootstrap_nodes.size() > 1) {
@@ -2547,12 +2548,12 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
             _node_ops.emplace(ops_uuid, std::move(meta));
         } else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
             slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
-            node_ops_update_heartbeat(ops_uuid);
+            node_ops_update_heartbeat(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::bootstrap_done) {
             slogger.info("bootstrap[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
-            node_ops_done(ops_uuid);
+            node_ops_done(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::bootstrap_abort) {
-            node_ops_abort(ops_uuid);
+            node_ops_abort(ops_uuid).get();
         } else {
             auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, req.cmd);
             slogger.warn("{}", msg);
@@ -2631,8 +2632,7 @@ int32_t storage_service::get_exception_count() {
     return 0;
 }
 
-// Runs inside seastar::async context
-std::unordered_multimap<dht::token_range, inet_address> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) {
+future<std::unordered_multimap<dht::token_range, inet_address>> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) {
     // First get all ranges the leaving endpoint is responsible for
     auto ranges = get_ranges_for_endpoint(keyspace_name, endpoint);
@@ -2647,10 +2647,10 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
         auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
         auto eps = erm->get_natural_endpoints(end_token);
         current_replica_endpoints.emplace(r, std::move(eps));
-        seastar::thread::maybe_yield();
+        co_await coroutine::maybe_yield();
     }
 
-    auto temp = get_token_metadata_ptr()->clone_after_all_left().get0();
+    auto temp = co_await get_token_metadata_ptr()->clone_after_all_left();
 
     // endpoint might or might not be 'leaving'. If it was not leaving (that is, removenode
     // command was used), it is still present in temp and must be removed.
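The yield call changes form here: `seastar::thread::maybe_yield()` is a plain blocking call that only works on a seastar thread, while a coroutine awaits `coroutine::maybe_yield()` (from `<seastar/coroutine/maybe_yield.hh>`), which suspends only when the reactor's task quota is exhausted. A minimal sketch with a hypothetical summation loop:

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <vector>

seastar::future<long> sum(std::vector<long> v) {
    long total = 0;
    for (auto x : v) {
        total += x;
        // Cheap in the common case; suspends only if the reactor
        // wants the CPU back, keeping the loop preemptable.
        co_await seastar::coroutine::maybe_yield();
    }
    co_return total;
}
```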
@@ -2668,7 +2668,7 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
     auto& rs = ks.get_replication_strategy();
     for (auto& r : ranges) {
         auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
-        auto new_replica_endpoints = rs.calculate_natural_endpoints(end_token, temp).get0();
+        auto new_replica_endpoints = co_await rs.calculate_natural_endpoints(end_token, temp);
 
         auto rg = current_replica_endpoints.equal_range(r);
         for (auto it = rg.first; it != rg.second; it++) {
@@ -2694,24 +2694,23 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
         }
         // Replication strategy doesn't necessarily yield in calculate_natural_endpoints.
         // E.g. everywhere_replication_strategy
-        seastar::thread::maybe_yield();
+        co_await coroutine::maybe_yield();
     }
 
-    temp.clear_gently().get();
-    return changed_ranges;
+    co_await temp.clear_gently();
+    co_return changed_ranges;
 }
 
-// Runs inside seastar::async context
-void storage_service::unbootstrap() {
-    get_batchlog_manager().local().do_batch_log_replay().get();
+future<> storage_service::unbootstrap() {
+    co_await get_batchlog_manager().local().do_batch_log_replay();
 
     if (is_repair_based_node_ops_enabled(streaming::stream_reason::decommission)) {
-        _repair.local().decommission_with_repair(get_token_metadata_ptr()).get();
+        co_await _repair.local().decommission_with_repair(get_token_metadata_ptr());
     } else {
         std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream;
 
         auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
         for (const auto& keyspace_name : non_system_keyspaces) {
-            auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address());
+            auto ranges_mm = co_await get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address());
             if (slogger.is_enabled(logging::log_level::debug)) {
                 std::vector<range<token>> ranges;
                 for (auto& x : ranges_mm) {
@@ -2728,29 +2727,28 @@ void storage_service::unbootstrap() {
         // Wait for batch log to complete before streaming hints.
         slogger.debug("waiting for batch log processing.");
         // Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint.
-        get_batchlog_manager().local().do_batch_log_replay().get();
+        co_await get_batchlog_manager().local().do_batch_log_replay();
 
         slogger.info("streaming hints to other nodes");
 
         // wait for the transfer runnables to signal the latch.
         slogger.debug("waiting for stream acks.");
         try {
-            stream_success.get();
+            co_await std::move(stream_success);
         } catch (...) {
             slogger.warn("unbootstrap fails to stream : {}", std::current_exception());
             throw;
         }
         slogger.debug("stream acks all received.");
     }
-    leave_ring().get();
+    co_await leave_ring();
 }
 
-// Runs inside seastar::async context
-void storage_service::removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node) {
+future<> storage_service::removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node) {
     auto my_address = get_broadcast_address();
     auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
     for (const auto& keyspace_name : non_system_keyspaces) {
-        std::unordered_multimap<dht::token_range, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, leaving_node);
+        std::unordered_multimap<dht::token_range, inet_address> changed_ranges = co_await get_changed_ranges_for_leaving(keyspace_name, leaving_node);
         dht::token_range_vector my_new_ranges;
         for (auto& x : changed_ranges) {
             if (x.second == my_address) {
@@ -2784,7 +2782,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
             }
         });
         auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), "Removenode", streaming::stream_reason::removenode);
-        removenode_add_ranges(streamer, leaving_node);
+        removenode_add_ranges(streamer, leaving_node).get();
         try {
             streamer->stream_async().get();
         } catch (...) {
@@ -2811,7 +2809,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
             }
         });
         auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode);
-        removenode_add_ranges(streamer, endpoint);
+        removenode_add_ranges(streamer, endpoint).get();
         auto status_checker = seastar::async([this, endpoint, &as] {
             slogger.info("restore_replica_count: Started status checker for removing node {}", endpoint);
             while (!as.abort_requested()) {
@@ -2971,12 +2969,11 @@ storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multim
 }
 
 future<> storage_service::start_leaving() {
-    return _gossiper.add_local_application_state(application_state::STATUS, versioned_value::leaving(db::system_keyspace::get_local_tokens().get0())).then([this] {
-        return mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) {
-            auto endpoint = get_broadcast_address();
-            tmptr->add_leaving_endpoint(endpoint);
-            return update_pending_ranges(std::move(tmptr), format("start_leaving {}", endpoint));
-        });
+    co_await _gossiper.add_local_application_state(application_state::STATUS, versioned_value::leaving(co_await db::system_keyspace::get_local_tokens()));
+    co_await mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) {
+        auto endpoint = get_broadcast_address();
+        tmptr->add_leaving_endpoint(endpoint);
+        return update_pending_ranges(std::move(tmptr), format("start_leaving {}", endpoint));
     });
 }
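`start_leaving()` shows the other recurring shape in this series: a `.then()` continuation chain flattened into straight-line `co_await`s, including an await nested in argument position. A minimal sketch with hypothetical `load_value()`/`publish()` stand-ins:

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>

seastar::future<int> load_value();   // hypothetical stand-ins, not from this patch
seastar::future<> publish(int v);

// Before: the data dependency is expressed through a continuation.
seastar::future<> chained() {
    return load_value().then([] (int v) {
        return publish(v);
    });
}

// After: sequential code; note the co_await in argument position,
// mirroring versioned_value::leaving(co_await get_local_tokens()) above.
seastar::future<> coroutinized() {
    co_await publish(co_await load_value());
}
```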
@@ -3274,50 +3271,48 @@ future<sstring> storage_service::get_removal_status() {
 }
 
 future<> storage_service::force_remove_completion() {
-    return run_with_no_api_lock([] (storage_service& ss) {
-        return seastar::async([&ss] {
-            while (!ss._operation_in_progress.empty()) {
-                if (ss._operation_in_progress != sstring("removenode")) {
-                    throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress));
-                }
-                // This flag will make removenode stop waiting for the confirmation,
-                // wait it to complete
-                slogger.info("Operation removenode is in progress, wait for it to complete");
-                sleep_abortable(std::chrono::seconds(1), ss._abort_source).get();
-            }
-            ss._operation_in_progress = sstring("removenode_force");
-            try {
-                const auto& tm = ss.get_token_metadata();
-                if (!ss._replicating_nodes.empty() || !tm.get_leaving_endpoints().empty()) {
-                    auto leaving = tm.get_leaving_endpoints();
-                    slogger.warn("Removal not confirmed for {}, Leaving={}", join(",", ss._replicating_nodes), leaving);
-                    for (auto endpoint : leaving) {
-                        utils::UUID host_id;
-                        auto tokens = tm.get_tokens(endpoint);
-                        try {
-                            host_id = tm.get_host_id(endpoint);
-                        } catch (...) {
-                            slogger.warn("No host_id is found for endpoint {}", endpoint);
-                            continue;
-                        }
-                        ss._gossiper.advertise_token_removed(endpoint, host_id).get();
-                        std::unordered_set<token> tokens_set(tokens.begin(), tokens.end());
-                        ss.excise(tokens_set, endpoint).get();
-                        ss._group0->leave_group0(endpoint).get();
-                    }
-                    ss._replicating_nodes.clear();
-                    ss._removing_node = std::nullopt;
-                } else {
-                    slogger.warn("No tokens to force removal on, call 'removenode' first");
-                }
-                ss._operation_in_progress = {};
-            } catch (...) {
-                ss._operation_in_progress = {};
-                throw;
-            }
-        });
+    return run_with_no_api_lock([] (storage_service& ss) -> future<> {
+        while (!ss._operation_in_progress.empty()) {
+            if (ss._operation_in_progress != sstring("removenode")) {
+                throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress));
+            }
+            // This flag will make removenode stop waiting for the confirmation,
+            // wait it to complete
+            slogger.info("Operation removenode is in progress, wait for it to complete");
+            co_await sleep_abortable(std::chrono::seconds(1), ss._abort_source);
+        }
+        ss._operation_in_progress = sstring("removenode_force");
+        try {
+            const auto& tm = ss.get_token_metadata();
+            if (!ss._replicating_nodes.empty() || !tm.get_leaving_endpoints().empty()) {
+                auto leaving = tm.get_leaving_endpoints();
+                slogger.warn("Removal not confirmed for {}, Leaving={}", join(",", ss._replicating_nodes), leaving);
+                for (auto endpoint : leaving) {
+                    utils::UUID host_id;
+                    auto tokens = tm.get_tokens(endpoint);
+                    try {
+                        host_id = tm.get_host_id(endpoint);
+                    } catch (...) {
+                        slogger.warn("No host_id is found for endpoint {}", endpoint);
+                        continue;
+                    }
+                    co_await ss._gossiper.advertise_token_removed(endpoint, host_id);
+                    std::unordered_set<token> tokens_set(tokens.begin(), tokens.end());
+                    co_await ss.excise(tokens_set, endpoint);
+                    co_await ss._group0->leave_group0(endpoint);
+                }
+                ss._replicating_nodes.clear();
+                ss._removing_node = std::nullopt;
+            } else {
+                slogger.warn("No tokens to force removal on, call 'removenode' first");
+            }
+            ss._operation_in_progress = {};
+        } catch (...) {
+            ss._operation_in_progress = {};
+            throw;
+        }
     });
 }
@@ -3603,9 +3598,9 @@ shared_ptr<abort_source> node_ops_meta_data::get_abort_source() {
     return _abort_source;
 }
 
-void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
+future<> storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
     slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid);
-    auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
+    auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
     auto it = _node_ops.find(ops_uuid);
     if (it != _node_ops.end()) {
         node_ops_meta_data& meta = it->second;
@@ -3613,9 +3608,9 @@ void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
     }
 }
 
-void storage_service::node_ops_done(utils::UUID ops_uuid) {
+future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
     slogger.debug("node_ops_done: ops_uuid={}", ops_uuid);
-    auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
+    auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
     auto it = _node_ops.find(ops_uuid);
     if (it != _node_ops.end()) {
         node_ops_meta_data& meta = it->second;
@@ -3624,18 +3619,18 @@ void storage_service::node_ops_done(utils::UUID ops_uuid) {
     }
 }
 
-void storage_service::node_ops_abort(utils::UUID ops_uuid) {
+future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
     slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
-    auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
+    auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
     auto it = _node_ops.find(ops_uuid);
     if (it != _node_ops.end()) {
         node_ops_meta_data& meta = it->second;
-        meta.abort().get();
+        co_await meta.abort();
         auto as = meta.get_abort_source();
         if (as && !as->abort_requested()) {
             as->request_abort();
         }
-        _repair.local().abort_repair_node_ops(ops_uuid).get();
+        co_await _repair.local().abort_repair_node_ops(ops_uuid);
         _node_ops.erase(it);
     }
 }
@@ -3647,26 +3642,24 @@ void storage_service::node_ops_singal_abort(std::optional<utils::UUID> ops_uuid)
 }
 
 future<> storage_service::node_ops_abort_thread() {
-    return seastar::async([this] {
-        slogger.info("Started node_ops_abort_thread");
-        for (;;) {
-            _node_ops_abort_cond.wait([this] { return !_node_ops_abort_queue.empty(); }).get();
-            slogger.debug("Awoke node_ops_abort_thread: node_ops_abort_queue={}", _node_ops_abort_queue);
-            while (!_node_ops_abort_queue.empty()) {
-                auto uuid_opt = _node_ops_abort_queue.front();
-                _node_ops_abort_queue.pop_front();
-                if (!uuid_opt) {
-                    return;
-                }
-                try {
-                    storage_service::node_ops_abort(*uuid_opt);
-                } catch (...) {
-                    slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
-                }
-            }
-        }
-        slogger.info("Stopped node_ops_abort_thread");
-    });
+    slogger.info("Started node_ops_abort_thread");
+    for (;;) {
+        co_await _node_ops_abort_cond.wait([this] { return !_node_ops_abort_queue.empty(); });
+        slogger.debug("Awoke node_ops_abort_thread: node_ops_abort_queue={}", _node_ops_abort_queue);
+        while (!_node_ops_abort_queue.empty()) {
+            auto uuid_opt = _node_ops_abort_queue.front();
+            _node_ops_abort_queue.pop_front();
+            if (!uuid_opt) {
+                co_return;
+            }
+            try {
+                co_await node_ops_abort(*uuid_opt);
+            } catch (...) {
+                slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
+            }
+        }
+    }
+    slogger.info("Stopped node_ops_abort_thread");
 }
 
 future<> storage_service::join_group0() {
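`node_ops_abort_thread()` also leans on `seastar::condition_variable::wait(pred)` returning a future, so the consumer loop no longer needs a dedicated seastar thread. A minimal self-contained sketch of that consumer shape (hypothetical, not code from this patch):

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/condition-variable.hh>
#include <deque>
#include <optional>

struct consumer {
    seastar::condition_variable cond;
    std::deque<std::optional<int>> queue;   // std::nullopt is the stop sentinel

    seastar::future<> process(int v);       // hypothetical stand-in

    seastar::future<> run() {
        for (;;) {
            // Suspends until the predicate holds; no thread is parked.
            co_await cond.wait([this] { return !queue.empty(); });
            while (!queue.empty()) {
                auto item = queue.front();
                queue.pop_front();
                if (!item) {
                    co_return;              // sentinel: stop consuming
                }
                co_await process(*item);
            }
        }
    }
};
```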

service/storage_service.hh

@@ -163,9 +163,9 @@ private:
     seastar::condition_variable _node_ops_abort_cond;
     named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}};
     future<> _node_ops_abort_thread;
-    void node_ops_update_heartbeat(utils::UUID ops_uuid);
-    void node_ops_done(utils::UUID ops_uuid);
-    void node_ops_abort(utils::UUID ops_uuid);
+    future<> node_ops_update_heartbeat(utils::UUID ops_uuid);
+    future<> node_ops_done(utils::UUID ops_uuid);
+    future<> node_ops_abort(utils::UUID ops_uuid);
     void node_ops_singal_abort(std::optional<utils::UUID> ops_uuid);
     future<> node_ops_abort_thread();
 public:
@@ -387,12 +387,12 @@ private:
     std::optional<gms::inet_address> get_replace_address();
     bool is_replacing();
     bool is_first_node();
-    void prepare_to_join(
+    future<> prepare_to_join(
         std::unordered_set<gms::inet_address> initial_contact_nodes,
         std::unordered_set<gms::inet_address> loaded_endpoints,
         std::unordered_map<gms::inet_address, sstring> loaded_peer_features);
     void join_token_ring(std::chrono::milliseconds);
-    void start_sys_dist_ks();
+    future<> start_sys_dist_ks();
 public:
 
     future<> rebuild(sstring source_dc);
@@ -636,10 +636,10 @@ private:
      */
     future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint);
     future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr<abort_source> as_ptr);
-    void removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);
+    future<> removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);
     // needs to be modified to accept either a keyspace or ARS.
-    std::unordered_multimap<dht::token_range, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
+    future<std::unordered_multimap<dht::token_range, inet_address>> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
 public:
@@ -704,7 +704,7 @@ private:
      */
     future<> start_leaving();
     future<> leave_ring();
-    void unbootstrap();
+    future<> unbootstrap();
 public:
 
     future<> move(sstring new_token) {
@@ -748,7 +748,7 @@ public:
     future<> removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes);
     future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req);
     void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
-    future<> node_ops_cmd_heartbeat_updater(const node_ops_cmd& cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
+    future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
     future<mode> get_operation_mode();