storage_service: Use sleep_abortable instead of sleep (#4697)

Make the sleeps abortable so that the wait loops can be interrupted
during shutdown.

Fixes #4885
This commit is contained in:
Asias He
2019-08-26 18:35:44 +08:00
committed by Avi Kivity
parent b60d201a11
commit 3ea1255020

View File

@@ -635,13 +635,13 @@ void storage_service::join_token_ring(int delay) {
auto& gossiper = gms::get_gossiper().local();
// first sleep the delay to make sure we see *at least* one other node
for (int i = 0; i < delay && gossiper.get_live_members().size() < 2; i += 1000) {
-sleep(std::chrono::seconds(1)).get();
+sleep_abortable(std::chrono::seconds(1), _abort_source).get();
}
// if our schema hasn't matched yet, keep sleeping until it does
// (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
while (!get_local_migration_manager().have_schema_agreement()) {
set_mode(mode::JOINING, "waiting for schema information to complete", true);
-sleep(std::chrono::seconds(1)).get();
+sleep_abortable(std::chrono::seconds(1), _abort_source).get();
}
set_mode(mode::JOINING, "schema complete, ready to bootstrap", true);
set_mode(mode::JOINING, "waiting for pending range calculation", true);
@@ -659,7 +659,7 @@ void storage_service::join_token_ring(int delay) {
_token_metadata.get_leaving_endpoints().size(),
elapsed);
-sleep(std::chrono::seconds(1)).get();
+sleep_abortable(std::chrono::seconds(1), _abort_source).get();
if (gms::gossiper::clk::now() > t + std::chrono::seconds(60)) {
throw std::runtime_error("Other bootstrapping/leaving nodes detected, cannot bootstrap while consistent_rangemovement is true");
@@ -668,7 +668,7 @@ void storage_service::join_token_ring(int delay) {
// Check the schema and pending range again
while (!get_local_migration_manager().have_schema_agreement()) {
set_mode(mode::JOINING, "waiting for schema information to complete", true);
-sleep(std::chrono::seconds(1)).get();
+sleep_abortable(std::chrono::seconds(1), _abort_source).get();
}
update_pending_ranges().get();
}
@@ -685,7 +685,7 @@ void storage_service::join_token_ring(int delay) {
if (replace_addr && *replace_addr != get_broadcast_address()) {
// Sleep additionally to make sure that the server actually is not alive
// and giving it more time to gossip if alive.
-sleep(service::load_broadcaster::BROADCAST_INTERVAL).get();
+sleep_abortable(service::load_broadcaster::BROADCAST_INTERVAL, _abort_source).get();
// check for operator errors...
for (auto token : _bootstrap_tokens) {
@@ -701,7 +701,7 @@ void storage_service::join_token_ring(int delay) {
}
}
} else {
-sleep(get_ring_delay()).get();
+sleep_abortable(get_ring_delay(), _abort_source).get();
}
std::stringstream ss;
ss << _bootstrap_tokens;
@@ -1723,7 +1723,7 @@ future<> storage_service::check_for_endpoint_collision(const std::unordered_map<
found_bootstrapping_node = true;
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(gms::gossiper::clk::now() - t).count();
slogger.info("Checking bootstrapping/leaving/moving nodes: node={}, status={}, sleep 1 second and check again ({} seconds elapsed) (check_for_endpoint_collision)", addr, state, elapsed);
-sleep(std::chrono::seconds(1)).get();
+sleep_abortable(std::chrono::seconds(1), _abort_source).get();
break;
}
}
@@ -2390,7 +2390,7 @@ future<> storage_service::decommission() {
// FIXME: long timeout = Math.max(RING_DELAY, BatchlogManager.instance.getBatchlogTimeout());
auto timeout = ss.get_ring_delay();
ss.set_mode(mode::LEAVING, format("sleeping {} ms for batch processing and pending range setup", timeout.count()), true);
-sleep(timeout).get();
+sleep_abortable(timeout, ss._abort_source).get();
slogger.info("DECOMMISSIONING: unbootstrap starts");
ss.unbootstrap();
@@ -2495,7 +2495,7 @@ future<> storage_service::removenode(sstring host_id_string) {
// wait for ReplicationFinishedVerbHandler to signal we're done
while (!(ss._replicating_nodes.empty() || ss._force_remove_completion)) {
-sleep(std::chrono::milliseconds(100)).get();
+sleep_abortable(std::chrono::milliseconds(100), ss._abort_source).get();
}
if (ss._force_remove_completion) {
@@ -2887,7 +2887,7 @@ void storage_service::leave_ring() {
_gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.left(get_local_tokens().get0(), expire_time)).get();
auto delay = std::max(get_ring_delay(), gms::gossiper::INTERVAL);
slogger.info("Announcing that I have left the ring for {}ms", delay.count());
-sleep(delay).get();
+sleep_abortable(delay, _abort_source).get();
}
future<>
@@ -3265,7 +3265,7 @@ future<> storage_service::force_remove_completion() {
while (!ss._operation_in_progress.empty()) {
// Wait removenode operation to complete
slogger.info("Operation {} is in progress, wait for it to complete", ss._operation_in_progress);
-sleep(std::chrono::seconds(1)).get();
+sleep_abortable(std::chrono::seconds(1), ss._abort_source).get();
}
ss._force_remove_completion = false;
}