Merge 'Coroutinize some storage_service member functions' from Pavel Solodovnikov
These trivial changes are mostly intended to reduce the use of `seastar::async`. Closes #10416 * github.com:scylladb/scylla: service: storage_service: coroutinize `start_gossiping()` service: storage_service: coroutinize `node_ops_cmd_heartbeat_updater()` service: storage_service: coroutinize `node_ops_abort_thread()` service: storage_service: coroutinize `node_ops_abort()` service: storage_service: coroutinize `node_ops_done()` service: storage_service: coroutinize `node_ops_update_heartbeat()` service: storage_service: coroutinize `force_remove_completion()` service: storage_service: coroutinize `start_leaving()` service: storage_service: coroutinize `start_sys_dist_ks()` service: storage_service: coroutinize `prepare_to_join()` service: storage_service: coroutinize `removenode_add_ranges()` service: storage_service: coroutinize `unbootstrap()` service: storage_service: coroutinize `get_changed_ranges_for_leaving()`
This commit is contained in:
@@ -222,8 +222,7 @@ future<> storage_service::snitch_reconfigured() {
|
||||
return update_topology(utils::fb_utilities::get_broadcast_address());
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void storage_service::prepare_to_join(
|
||||
future<> storage_service::prepare_to_join(
|
||||
std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
std::unordered_set<gms::inet_address> loaded_endpoints,
|
||||
std::unordered_map<gms::inet_address, sstring> loaded_peer_features) {
|
||||
@@ -231,7 +230,7 @@ void storage_service::prepare_to_join(
|
||||
if (_sys_ks.local().was_decommissioned()) {
|
||||
if (_db.local().get_config().override_decommission()) {
|
||||
slogger.warn("This node was decommissioned, but overriding by operator request.");
|
||||
_sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get();
|
||||
co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED);
|
||||
} else {
|
||||
auto msg = sstring("This node was decommissioned and will not rejoin the ring unless override_decommission=true has been set,"
|
||||
"or all existing data is removed and the node is bootstrapped again");
|
||||
@@ -242,13 +241,13 @@ void storage_service::prepare_to_join(
|
||||
|
||||
bool replacing_a_node_with_same_ip = false;
|
||||
bool replacing_a_node_with_diff_ip = false;
|
||||
auto tmlock = std::make_unique<token_metadata_lock>(get_token_metadata_lock().get0());
|
||||
auto tmptr = get_mutable_token_metadata_ptr().get0();
|
||||
auto tmlock = std::make_unique<token_metadata_lock>(co_await get_token_metadata_lock());
|
||||
auto tmptr = co_await get_mutable_token_metadata_ptr();
|
||||
if (is_replacing()) {
|
||||
if (_sys_ks.local().bootstrap_complete()) {
|
||||
throw std::runtime_error("Cannot replace address with a node that is already bootstrapped");
|
||||
}
|
||||
_bootstrap_tokens = prepare_replacement_info(initial_contact_nodes, loaded_peer_features).get0();
|
||||
_bootstrap_tokens = co_await prepare_replacement_info(initial_contact_nodes, loaded_peer_features);
|
||||
auto replace_address = get_replace_address();
|
||||
replacing_a_node_with_same_ip = *replace_address == get_broadcast_address();
|
||||
replacing_a_node_with_diff_ip = *replace_address != get_broadcast_address();
|
||||
@@ -256,18 +255,18 @@ void storage_service::prepare_to_join(
|
||||
slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}",
|
||||
get_broadcast_address() == *replace_address ? "the same" : "a different",
|
||||
get_broadcast_address(), *replace_address);
|
||||
tmptr->update_normal_tokens(_bootstrap_tokens, *replace_address).get();
|
||||
co_await tmptr->update_normal_tokens(_bootstrap_tokens, *replace_address);
|
||||
} else if (should_bootstrap()) {
|
||||
check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features).get();
|
||||
co_await check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features);
|
||||
} else {
|
||||
auto local_features = _feature_service.known_feature_set();
|
||||
slogger.info("Checking remote features with gossip, initial_contact_nodes={}", initial_contact_nodes);
|
||||
_gossiper.do_shadow_round(initial_contact_nodes).get();
|
||||
co_await _gossiper.do_shadow_round(initial_contact_nodes);
|
||||
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
|
||||
_gossiper.check_snitch_name_matches();
|
||||
_gossiper.reset_endpoint_state_map().get();
|
||||
co_await _gossiper.reset_endpoint_state_map();
|
||||
for (auto ep : loaded_endpoints) {
|
||||
_gossiper.add_saved_endpoint(ep).get();
|
||||
co_await _gossiper.add_saved_endpoint(ep);
|
||||
}
|
||||
}
|
||||
auto features = _feature_service.supported_feature_set();
|
||||
@@ -275,19 +274,19 @@ void storage_service::prepare_to_join(
|
||||
// Save the advertised feature set to system.local table after
|
||||
// all remote feature checks are complete and after gossip shadow rounds are done.
|
||||
// At this point, the final feature set is already determined before the node joins the ring.
|
||||
db::system_keyspace::save_local_supported_features(features).get0();
|
||||
co_await db::system_keyspace::save_local_supported_features(features);
|
||||
|
||||
// If this is a restarting node, we should update tokens before gossip starts
|
||||
auto my_tokens = db::system_keyspace::get_saved_tokens().get0();
|
||||
auto my_tokens = co_await db::system_keyspace::get_saved_tokens();
|
||||
bool restarting_normal_node = _sys_ks.local().bootstrap_complete() && !is_replacing() && !my_tokens.empty();
|
||||
if (restarting_normal_node) {
|
||||
slogger.info("Restarting a node in NORMAL status");
|
||||
// This node must know about its chosen tokens before other nodes do
|
||||
// since they may start sending writes to this node after it gossips status = NORMAL.
|
||||
// Therefore we update _token_metadata now, before gossip starts.
|
||||
tmptr->update_normal_tokens(my_tokens, get_broadcast_address()).get();
|
||||
co_await tmptr->update_normal_tokens(my_tokens, get_broadcast_address());
|
||||
|
||||
_cdc_gen_id = db::system_keyspace::get_cdc_generation_id().get0();
|
||||
_cdc_gen_id = co_await db::system_keyspace::get_cdc_generation_id();
|
||||
if (!_cdc_gen_id) {
|
||||
// We could not have completed joining if we didn't generate and persist a CDC streams timestamp,
|
||||
// unless we are restarting after upgrading from non-CDC supported version.
|
||||
@@ -314,13 +313,13 @@ void storage_service::prepare_to_join(
|
||||
// Replicate the tokens early because once gossip runs other nodes
|
||||
// might send reads/writes to this node. Replicate it early to make
|
||||
// sure the tokens are valid on all the shards.
|
||||
replicate_to_all_cores(std::move(tmptr)).get();
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
tmlock.reset();
|
||||
|
||||
auto broadcast_rpc_address = utils::fb_utilities::get_broadcast_rpc_address();
|
||||
auto& proxy = service::get_storage_proxy();
|
||||
// Ensure we know our own actual Schema UUID in preparation for updates
|
||||
db::schema_tables::recalculate_schema_version(_sys_ks, proxy, _feature_service).get0();
|
||||
co_await db::schema_tables::recalculate_schema_version(_sys_ks, proxy, _feature_service);
|
||||
app_states.emplace(gms::application_state::NET_VERSION, versioned_value::network_version());
|
||||
app_states.emplace(gms::application_state::HOST_ID, versioned_value::host_id(local_host_id));
|
||||
app_states.emplace(gms::application_state::RPC_ADDRESS, versioned_value::rpcaddress(broadcast_rpc_address));
|
||||
@@ -352,14 +351,14 @@ void storage_service::prepare_to_join(
|
||||
|
||||
slogger.info("Starting up server gossip");
|
||||
|
||||
auto generation_number = db::system_keyspace::increment_and_get_generation().get0();
|
||||
auto generation_number = co_await db::system_keyspace::increment_and_get_generation();
|
||||
auto advertise = gms::advertise_myself(!replacing_a_node_with_same_ip);
|
||||
_gossiper.start_gossiping(generation_number, app_states, advertise).get();
|
||||
co_await _gossiper.start_gossiping(generation_number, app_states, advertise);
|
||||
}
|
||||
|
||||
void storage_service::start_sys_dist_ks() {
|
||||
future<> storage_service::start_sys_dist_ks() {
|
||||
supervisor::notify("starting system distributed keyspace");
|
||||
_sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start).get();
|
||||
co_await _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
|
||||
}
|
||||
|
||||
/* Broadcasts the chosen tokens through gossip,
|
||||
@@ -490,12 +489,12 @@ void storage_service::join_token_ring(std::chrono::milliseconds delay) {
|
||||
slogger.info("Replacing a node with token(s): {}", _bootstrap_tokens);
|
||||
// _bootstrap_tokens was previously set in prepare_to_join using tokens gossiped by the replaced node
|
||||
}
|
||||
start_sys_dist_ks();
|
||||
start_sys_dist_ks().get();
|
||||
mark_existing_views_as_built();
|
||||
_sys_ks.local().update_tokens(_bootstrap_tokens).get();
|
||||
bootstrap(); // blocks until finished
|
||||
} else {
|
||||
start_sys_dist_ks();
|
||||
start_sys_dist_ks().get();
|
||||
_bootstrap_tokens = db::system_keyspace::get_saved_tokens().get0();
|
||||
if (_bootstrap_tokens.empty()) {
|
||||
_bootstrap_tokens = boot_strapper::get_bootstrap_tokens(get_token_metadata_ptr(), _db.local().get_config(), dht::check_token_endpoint::no);
|
||||
@@ -1362,7 +1361,7 @@ future<> storage_service::init_server(cql3::query_processor& qp) {
|
||||
for (auto& x : loaded_peer_features) {
|
||||
slogger.info("peer={}, supported_features={}", x.first, x.second);
|
||||
}
|
||||
prepare_to_join(std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features));
|
||||
prepare_to_join(std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features)).get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1750,24 +1749,28 @@ future<bool> storage_service::is_gossip_running() {
|
||||
}
|
||||
|
||||
future<> storage_service::start_gossiping() {
|
||||
return run_with_api_lock(sstring("start_gossiping"), [] (storage_service& ss) {
|
||||
return seastar::async([&ss] {
|
||||
if (!ss._gossiper.is_enabled()) {
|
||||
slogger.warn("Starting gossip by operator request");
|
||||
ss._gossiper.container().invoke_on_all(&gms::gossiper::start).get();
|
||||
auto undo = defer([&ss] { ss._gossiper.container().invoke_on_all(&gms::gossiper::stop).get(); });
|
||||
auto cdc_gen_ts = db::system_keyspace::get_cdc_generation_id().get0();
|
||||
return run_with_api_lock(sstring("start_gossiping"), [] (storage_service& ss) -> future<> {
|
||||
if (!ss._gossiper.is_enabled()) {
|
||||
slogger.warn("Starting gossip by operator request");
|
||||
co_await ss._gossiper.container().invoke_on_all(&gms::gossiper::start);
|
||||
bool should_stop_gossiper = false; // undo action
|
||||
try {
|
||||
auto cdc_gen_ts = co_await db::system_keyspace::get_cdc_generation_id();
|
||||
if (!cdc_gen_ts) {
|
||||
cdc_log.warn("CDC generation timestamp missing when starting gossip");
|
||||
}
|
||||
set_gossip_tokens(ss._gossiper,
|
||||
db::system_keyspace::get_local_tokens().get0(),
|
||||
cdc_gen_ts).get();
|
||||
co_await set_gossip_tokens(ss._gossiper,
|
||||
co_await db::system_keyspace::get_local_tokens(),
|
||||
cdc_gen_ts);
|
||||
ss._gossiper.force_newer_generation();
|
||||
ss._gossiper.start_gossiping(utils::get_generation_number()).get();
|
||||
undo.cancel();
|
||||
co_await ss._gossiper.start_gossiping(utils::get_generation_number());
|
||||
} catch (...) {
|
||||
should_stop_gossiper = true;
|
||||
}
|
||||
});
|
||||
if (should_stop_gossiper) {
|
||||
co_await ss._gossiper.container().invoke_on_all(&gms::gossiper::stop);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1789,38 +1792,36 @@ future<> storage_service::do_stop_ms() {
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::node_ops_cmd_heartbeat_updater(const node_ops_cmd& cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done) {
|
||||
return seastar::async([this, cmd, uuid, nodes = std::move(nodes), heartbeat_updater_done] {
|
||||
std::string ops;
|
||||
if (cmd == node_ops_cmd::decommission_heartbeat) {
|
||||
ops = "decommission";
|
||||
} else if (cmd == node_ops_cmd::removenode_heartbeat) {
|
||||
ops = "removenode";
|
||||
} else if (cmd == node_ops_cmd::replace_heartbeat) {
|
||||
ops = "replace";
|
||||
} else if (cmd == node_ops_cmd::bootstrap_heartbeat) {
|
||||
ops = "bootstrap";
|
||||
} else {
|
||||
throw std::runtime_error(format("node_ops_cmd_heartbeat_updater: node_ops_cmd is not supported"));
|
||||
future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done) {
|
||||
std::string ops;
|
||||
if (cmd == node_ops_cmd::decommission_heartbeat) {
|
||||
ops = "decommission";
|
||||
} else if (cmd == node_ops_cmd::removenode_heartbeat) {
|
||||
ops = "removenode";
|
||||
} else if (cmd == node_ops_cmd::replace_heartbeat) {
|
||||
ops = "replace";
|
||||
} else if (cmd == node_ops_cmd::bootstrap_heartbeat) {
|
||||
ops = "bootstrap";
|
||||
} else {
|
||||
throw std::runtime_error(format("node_ops_cmd_heartbeat_updater: node_ops_cmd is not supported"));
|
||||
}
|
||||
slogger.info("{}[{}]: Started heartbeat_updater", ops, uuid);
|
||||
while (!(*heartbeat_updater_done)) {
|
||||
auto req = node_ops_cmd_request{cmd, uuid, {}, {}, {}};
|
||||
co_await parallel_for_each(nodes, [this, ops, uuid, &req] (const gms::inet_address& node) {
|
||||
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([ops, uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("{}[{}]: Got heartbeat response from node={}", ops, uuid, node);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).handle_exception([ops, uuid] (std::exception_ptr ep) {
|
||||
slogger.warn("{}[{}]: Failed to send heartbeat: {}", ops, uuid, ep);
|
||||
});
|
||||
int nr_seconds = 10;
|
||||
while (!(*heartbeat_updater_done) && nr_seconds--) {
|
||||
co_await sleep(std::chrono::seconds(1));
|
||||
}
|
||||
slogger.info("{}[{}]: Started heartbeat_updater", ops, uuid);
|
||||
while (!(*heartbeat_updater_done)) {
|
||||
auto req = node_ops_cmd_request{cmd, uuid, {}, {}, {}};
|
||||
parallel_for_each(nodes, [this, ops, uuid, &req] (const gms::inet_address& node) {
|
||||
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([ops, uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("{}[{}]: Got heartbeat response from node={}", ops, uuid, node);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).handle_exception([ops, uuid] (std::exception_ptr ep) {
|
||||
slogger.warn("{}[{}]: Failed to send heartbeat: {}", ops, uuid, ep);
|
||||
}).get();
|
||||
int nr_seconds = 10;
|
||||
while (!(*heartbeat_updater_done) && nr_seconds--) {
|
||||
sleep(std::chrono::seconds(1)).get();
|
||||
}
|
||||
}
|
||||
slogger.info("{}[{}]: Stopped heartbeat_updater", ops, uuid);
|
||||
});
|
||||
}
|
||||
slogger.info("{}[{}]: Stopped heartbeat_updater", ops, uuid);
|
||||
}
|
||||
|
||||
future<> storage_service::decommission() {
|
||||
@@ -1906,7 +1907,7 @@ future<> storage_service::decommission() {
|
||||
|
||||
// Step 5: Start to sync data
|
||||
slogger.info("DECOMMISSIONING: unbootstrap starts");
|
||||
ss.unbootstrap();
|
||||
ss.unbootstrap().get();
|
||||
slogger.info("DECOMMISSIONING: unbootstrap done");
|
||||
|
||||
// Step 6: Finish
|
||||
@@ -2400,10 +2401,10 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
|
||||
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_update_heartbeat(ops_uuid);
|
||||
node_ops_update_heartbeat(ops_uuid).get();
|
||||
} else if (req.cmd == node_ops_cmd::removenode_done) {
|
||||
slogger.info("removenode[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_done(ops_uuid);
|
||||
node_ops_done(ops_uuid).get();
|
||||
} else if (req.cmd == node_ops_cmd::removenode_sync_data) {
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it == _node_ops.end()) {
|
||||
@@ -2421,7 +2422,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
}
|
||||
} else if (req.cmd == node_ops_cmd::removenode_abort) {
|
||||
node_ops_abort(ops_uuid);
|
||||
node_ops_abort(ops_uuid).get();
|
||||
} else if (req.cmd == node_ops_cmd::decommission_prepare) {
|
||||
if (req.leaving_nodes.size() > 1) {
|
||||
auto msg = format("decommission[{}]: Could not decommission more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
|
||||
@@ -2449,7 +2450,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
|
||||
slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_update_heartbeat(ops_uuid);
|
||||
node_ops_update_heartbeat(ops_uuid).get();
|
||||
} else if (req.cmd == node_ops_cmd::decommission_done) {
|
||||
slogger.info("decommission[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
slogger.debug("Triggering off-strategy compaction for all non-system tables on decommission completion");
|
||||
@@ -2458,9 +2459,9 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
table->trigger_offstrategy_compaction();
|
||||
}
|
||||
}).get();
|
||||
node_ops_done(ops_uuid);
|
||||
node_ops_done(ops_uuid).get();
|
||||
} else if (req.cmd == node_ops_cmd::decommission_abort) {
|
||||
node_ops_abort(ops_uuid);
|
||||
node_ops_abort(ops_uuid).get();
|
||||
} else if (req.cmd == node_ops_cmd::replace_prepare) {
|
||||
// Mark the replacing node as replacing
|
||||
if (req.replace_nodes.size() > 1) {
|
||||
@@ -2509,12 +2510,12 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}).get();
|
||||
} else if (req.cmd == node_ops_cmd::replace_heartbeat) {
|
||||
slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_update_heartbeat(ops_uuid);
|
||||
node_ops_update_heartbeat(ops_uuid).get();
|
||||
} else if (req.cmd == node_ops_cmd::replace_done) {
|
||||
slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_done(ops_uuid);
|
||||
node_ops_done(ops_uuid).get();
|
||||
} else if (req.cmd == node_ops_cmd::replace_abort) {
|
||||
node_ops_abort(ops_uuid);
|
||||
node_ops_abort(ops_uuid).get();
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_prepare) {
|
||||
// Mark the bootstrap node as bootstrapping
|
||||
if (req.bootstrap_nodes.size() > 1) {
|
||||
@@ -2547,12 +2548,12 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
|
||||
slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_update_heartbeat(ops_uuid);
|
||||
node_ops_update_heartbeat(ops_uuid).get();
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_done) {
|
||||
slogger.info("bootstrap[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_done(ops_uuid);
|
||||
node_ops_done(ops_uuid).get();
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_abort) {
|
||||
node_ops_abort(ops_uuid);
|
||||
node_ops_abort(ops_uuid).get();
|
||||
} else {
|
||||
auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, req.cmd);
|
||||
slogger.warn("{}", msg);
|
||||
@@ -2631,8 +2632,7 @@ int32_t storage_service::get_exception_count() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
std::unordered_multimap<dht::token_range, inet_address> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) {
|
||||
future<std::unordered_multimap<dht::token_range, inet_address>> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) {
|
||||
// First get all ranges the leaving endpoint is responsible for
|
||||
auto ranges = get_ranges_for_endpoint(keyspace_name, endpoint);
|
||||
|
||||
@@ -2647,10 +2647,10 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto eps = erm->get_natural_endpoints(end_token);
|
||||
current_replica_endpoints.emplace(r, std::move(eps));
|
||||
seastar::thread::maybe_yield();
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
auto temp = get_token_metadata_ptr()->clone_after_all_left().get0();
|
||||
auto temp = co_await get_token_metadata_ptr()->clone_after_all_left();
|
||||
|
||||
// endpoint might or might not be 'leaving'. If it was not leaving (that is, removenode
|
||||
// command was used), it is still present in temp and must be removed.
|
||||
@@ -2668,7 +2668,7 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
for (auto& r : ranges) {
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto new_replica_endpoints = rs.calculate_natural_endpoints(end_token, temp).get0();
|
||||
auto new_replica_endpoints = co_await rs.calculate_natural_endpoints(end_token, temp);
|
||||
|
||||
auto rg = current_replica_endpoints.equal_range(r);
|
||||
for (auto it = rg.first; it != rg.second; it++) {
|
||||
@@ -2694,24 +2694,23 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
|
||||
}
|
||||
// Replication strategy doesn't necessarily yield in calculate_natural_endpoints.
|
||||
// E.g. everywhere_replication_strategy
|
||||
seastar::thread::maybe_yield();
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
temp.clear_gently().get();
|
||||
co_await temp.clear_gently();
|
||||
|
||||
return changed_ranges;
|
||||
co_return changed_ranges;
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void storage_service::unbootstrap() {
|
||||
get_batchlog_manager().local().do_batch_log_replay().get();
|
||||
future<> storage_service::unbootstrap() {
|
||||
co_await get_batchlog_manager().local().do_batch_log_replay();
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::decommission)) {
|
||||
_repair.local().decommission_with_repair(get_token_metadata_ptr()).get();
|
||||
co_await _repair.local().decommission_with_repair(get_token_metadata_ptr());
|
||||
} else {
|
||||
std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream;
|
||||
|
||||
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address());
|
||||
auto ranges_mm = co_await get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address());
|
||||
if (slogger.is_enabled(logging::log_level::debug)) {
|
||||
std::vector<range<token>> ranges;
|
||||
for (auto& x : ranges_mm) {
|
||||
@@ -2728,29 +2727,28 @@ void storage_service::unbootstrap() {
|
||||
// Wait for batch log to complete before streaming hints.
|
||||
slogger.debug("waiting for batch log processing.");
|
||||
// Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint.
|
||||
get_batchlog_manager().local().do_batch_log_replay().get();
|
||||
co_await get_batchlog_manager().local().do_batch_log_replay();
|
||||
|
||||
slogger.info("streaming hints to other nodes");
|
||||
|
||||
// wait for the transfer runnables to signal the latch.
|
||||
slogger.debug("waiting for stream acks.");
|
||||
try {
|
||||
stream_success.get();
|
||||
co_await std::move(stream_success);
|
||||
} catch (...) {
|
||||
slogger.warn("unbootstrap fails to stream : {}", std::current_exception());
|
||||
throw;
|
||||
}
|
||||
slogger.debug("stream acks all received.");
|
||||
}
|
||||
leave_ring().get();
|
||||
co_await leave_ring();
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void storage_service::removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node) {
|
||||
future<> storage_service::removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node) {
|
||||
auto my_address = get_broadcast_address();
|
||||
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
std::unordered_multimap<dht::token_range, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, leaving_node);
|
||||
std::unordered_multimap<dht::token_range, inet_address> changed_ranges = co_await get_changed_ranges_for_leaving(keyspace_name, leaving_node);
|
||||
dht::token_range_vector my_new_ranges;
|
||||
for (auto& x : changed_ranges) {
|
||||
if (x.second == my_address) {
|
||||
@@ -2784,7 +2782,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
|
||||
}
|
||||
});
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), "Removenode", streaming::stream_reason::removenode);
|
||||
removenode_add_ranges(streamer, leaving_node);
|
||||
removenode_add_ranges(streamer, leaving_node).get();
|
||||
try {
|
||||
streamer->stream_async().get();
|
||||
} catch (...) {
|
||||
@@ -2811,7 +2809,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
|
||||
}
|
||||
});
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode);
|
||||
removenode_add_ranges(streamer, endpoint);
|
||||
removenode_add_ranges(streamer, endpoint).get();
|
||||
auto status_checker = seastar::async([this, endpoint, &as] {
|
||||
slogger.info("restore_replica_count: Started status checker for removing node {}", endpoint);
|
||||
while (!as.abort_requested()) {
|
||||
@@ -2971,12 +2969,11 @@ storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multim
|
||||
}
|
||||
|
||||
future<> storage_service::start_leaving() {
|
||||
return _gossiper.add_local_application_state(application_state::STATUS, versioned_value::leaving(db::system_keyspace::get_local_tokens().get0())).then([this] {
|
||||
return mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) {
|
||||
auto endpoint = get_broadcast_address();
|
||||
tmptr->add_leaving_endpoint(endpoint);
|
||||
return update_pending_ranges(std::move(tmptr), format("start_leaving {}", endpoint));
|
||||
});
|
||||
co_await _gossiper.add_local_application_state(application_state::STATUS, versioned_value::leaving(co_await db::system_keyspace::get_local_tokens()));
|
||||
co_await mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) {
|
||||
auto endpoint = get_broadcast_address();
|
||||
tmptr->add_leaving_endpoint(endpoint);
|
||||
return update_pending_ranges(std::move(tmptr), format("start_leaving {}", endpoint));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3274,50 +3271,48 @@ future<sstring> storage_service::get_removal_status() {
|
||||
}
|
||||
|
||||
future<> storage_service::force_remove_completion() {
|
||||
return run_with_no_api_lock([] (storage_service& ss) {
|
||||
return seastar::async([&ss] {
|
||||
while (!ss._operation_in_progress.empty()) {
|
||||
if (ss._operation_in_progress != sstring("removenode")) {
|
||||
throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress));
|
||||
}
|
||||
|
||||
// This flag will make removenode stop waiting for the confirmation,
|
||||
// wait it to complete
|
||||
slogger.info("Operation removenode is in progress, wait for it to complete");
|
||||
sleep_abortable(std::chrono::seconds(1), ss._abort_source).get();
|
||||
return run_with_no_api_lock([] (storage_service& ss) -> future<> {
|
||||
while (!ss._operation_in_progress.empty()) {
|
||||
if (ss._operation_in_progress != sstring("removenode")) {
|
||||
throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress));
|
||||
}
|
||||
ss._operation_in_progress = sstring("removenode_force");
|
||||
|
||||
try {
|
||||
const auto& tm = ss.get_token_metadata();
|
||||
if (!ss._replicating_nodes.empty() || !tm.get_leaving_endpoints().empty()) {
|
||||
auto leaving = tm.get_leaving_endpoints();
|
||||
slogger.warn("Removal not confirmed for {}, Leaving={}", join(",", ss._replicating_nodes), leaving);
|
||||
for (auto endpoint : leaving) {
|
||||
utils::UUID host_id;
|
||||
auto tokens = tm.get_tokens(endpoint);
|
||||
try {
|
||||
host_id = tm.get_host_id(endpoint);
|
||||
} catch (...) {
|
||||
slogger.warn("No host_id is found for endpoint {}", endpoint);
|
||||
continue;
|
||||
}
|
||||
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
|
||||
std::unordered_set<token> tokens_set(tokens.begin(), tokens.end());
|
||||
ss.excise(tokens_set, endpoint).get();
|
||||
ss._group0->leave_group0(endpoint).get();
|
||||
// This flag will make removenode stop waiting for the confirmation,
|
||||
// wait it to complete
|
||||
slogger.info("Operation removenode is in progress, wait for it to complete");
|
||||
co_await sleep_abortable(std::chrono::seconds(1), ss._abort_source);
|
||||
}
|
||||
ss._operation_in_progress = sstring("removenode_force");
|
||||
|
||||
try {
|
||||
const auto& tm = ss.get_token_metadata();
|
||||
if (!ss._replicating_nodes.empty() || !tm.get_leaving_endpoints().empty()) {
|
||||
auto leaving = tm.get_leaving_endpoints();
|
||||
slogger.warn("Removal not confirmed for {}, Leaving={}", join(",", ss._replicating_nodes), leaving);
|
||||
for (auto endpoint : leaving) {
|
||||
utils::UUID host_id;
|
||||
auto tokens = tm.get_tokens(endpoint);
|
||||
try {
|
||||
host_id = tm.get_host_id(endpoint);
|
||||
} catch (...) {
|
||||
slogger.warn("No host_id is found for endpoint {}", endpoint);
|
||||
continue;
|
||||
}
|
||||
ss._replicating_nodes.clear();
|
||||
ss._removing_node = std::nullopt;
|
||||
} else {
|
||||
slogger.warn("No tokens to force removal on, call 'removenode' first");
|
||||
co_await ss._gossiper.advertise_token_removed(endpoint, host_id);
|
||||
std::unordered_set<token> tokens_set(tokens.begin(), tokens.end());
|
||||
co_await ss.excise(tokens_set, endpoint);
|
||||
co_await ss._group0->leave_group0(endpoint);
|
||||
}
|
||||
ss._operation_in_progress = {};
|
||||
} catch (...) {
|
||||
ss._operation_in_progress = {};
|
||||
throw;
|
||||
ss._replicating_nodes.clear();
|
||||
ss._removing_node = std::nullopt;
|
||||
} else {
|
||||
slogger.warn("No tokens to force removal on, call 'removenode' first");
|
||||
}
|
||||
});
|
||||
ss._operation_in_progress = {};
|
||||
} catch (...) {
|
||||
ss._operation_in_progress = {};
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3603,9 +3598,9 @@ shared_ptr<abort_source> node_ops_meta_data::get_abort_source() {
|
||||
return _abort_source;
|
||||
}
|
||||
|
||||
void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
|
||||
future<> storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
|
||||
auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
@@ -3613,9 +3608,9 @@ void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_done: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
|
||||
auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
@@ -3624,18 +3619,18 @@ void storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
|
||||
auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
meta.abort().get();
|
||||
co_await meta.abort();
|
||||
auto as = meta.get_abort_source();
|
||||
if (as && !as->abort_requested()) {
|
||||
as->request_abort();
|
||||
}
|
||||
_repair.local().abort_repair_node_ops(ops_uuid).get();
|
||||
co_await _repair.local().abort_repair_node_ops(ops_uuid);
|
||||
_node_ops.erase(it);
|
||||
}
|
||||
}
|
||||
@@ -3647,26 +3642,24 @@ void storage_service::node_ops_singal_abort(std::optional<utils::UUID> ops_uuid)
|
||||
}
|
||||
|
||||
future<> storage_service::node_ops_abort_thread() {
|
||||
return seastar::async([this] {
|
||||
slogger.info("Started node_ops_abort_thread");
|
||||
for (;;) {
|
||||
_node_ops_abort_cond.wait([this] { return !_node_ops_abort_queue.empty(); }).get();
|
||||
slogger.debug("Awoke node_ops_abort_thread: node_ops_abort_queue={}", _node_ops_abort_queue);
|
||||
while (!_node_ops_abort_queue.empty()) {
|
||||
auto uuid_opt = _node_ops_abort_queue.front();
|
||||
_node_ops_abort_queue.pop_front();
|
||||
if (!uuid_opt) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
storage_service::node_ops_abort(*uuid_opt);
|
||||
} catch (...) {
|
||||
slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
|
||||
}
|
||||
slogger.info("Started node_ops_abort_thread");
|
||||
for (;;) {
|
||||
co_await _node_ops_abort_cond.wait([this] { return !_node_ops_abort_queue.empty(); });
|
||||
slogger.debug("Awoke node_ops_abort_thread: node_ops_abort_queue={}", _node_ops_abort_queue);
|
||||
while (!_node_ops_abort_queue.empty()) {
|
||||
auto uuid_opt = _node_ops_abort_queue.front();
|
||||
_node_ops_abort_queue.pop_front();
|
||||
if (!uuid_opt) {
|
||||
co_return;
|
||||
}
|
||||
try {
|
||||
co_await node_ops_abort(*uuid_opt);
|
||||
} catch (...) {
|
||||
slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
|
||||
}
|
||||
}
|
||||
slogger.info("Stopped node_ops_abort_thread");
|
||||
});
|
||||
}
|
||||
slogger.info("Stopped node_ops_abort_thread");
|
||||
}
|
||||
|
||||
future<> storage_service::join_group0() {
|
||||
|
||||
@@ -163,9 +163,9 @@ private:
|
||||
seastar::condition_variable _node_ops_abort_cond;
|
||||
named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}};
|
||||
future<> _node_ops_abort_thread;
|
||||
void node_ops_update_heartbeat(utils::UUID ops_uuid);
|
||||
void node_ops_done(utils::UUID ops_uuid);
|
||||
void node_ops_abort(utils::UUID ops_uuid);
|
||||
future<> node_ops_update_heartbeat(utils::UUID ops_uuid);
|
||||
future<> node_ops_done(utils::UUID ops_uuid);
|
||||
future<> node_ops_abort(utils::UUID ops_uuid);
|
||||
void node_ops_singal_abort(std::optional<utils::UUID> ops_uuid);
|
||||
future<> node_ops_abort_thread();
|
||||
public:
|
||||
@@ -387,12 +387,12 @@ private:
|
||||
std::optional<gms::inet_address> get_replace_address();
|
||||
bool is_replacing();
|
||||
bool is_first_node();
|
||||
void prepare_to_join(
|
||||
future<> prepare_to_join(
|
||||
std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
std::unordered_set<gms::inet_address> loaded_endpoints,
|
||||
std::unordered_map<gms::inet_address, sstring> loaded_peer_features);
|
||||
void join_token_ring(std::chrono::milliseconds);
|
||||
void start_sys_dist_ks();
|
||||
future<> start_sys_dist_ks();
|
||||
public:
|
||||
|
||||
future<> rebuild(sstring source_dc);
|
||||
@@ -636,10 +636,10 @@ private:
|
||||
*/
|
||||
future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint);
|
||||
future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr<abort_source> as_ptr);
|
||||
void removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);
|
||||
future<> removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);
|
||||
|
||||
// needs to be modified to accept either a keyspace or ARS.
|
||||
std::unordered_multimap<dht::token_range, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
|
||||
future<std::unordered_multimap<dht::token_range, inet_address>> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
|
||||
|
||||
public:
|
||||
|
||||
@@ -704,7 +704,7 @@ private:
|
||||
*/
|
||||
future<> start_leaving();
|
||||
future<> leave_ring();
|
||||
void unbootstrap();
|
||||
future<> unbootstrap();
|
||||
|
||||
public:
|
||||
future<> move(sstring new_token) {
|
||||
@@ -748,7 +748,7 @@ public:
|
||||
future<> removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes);
|
||||
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req);
|
||||
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
|
||||
future<> node_ops_cmd_heartbeat_updater(const node_ops_cmd& cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
|
||||
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
|
||||
|
||||
future<mode> get_operation_mode();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user