storage_service: coroutinize restore_replica_count

and unwrap the async thread started for streaming.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-01-02 17:57:17 +02:00
parent d1eadc39c1
commit afece5bdc4

View File

@@ -2931,11 +2931,11 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
auto ops_uuid = node_ops_id::create_random_id();
auto ops = seastar::make_shared<node_ops_info>(ops_uuid, nullptr, std::list<gms::inet_address>());
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
return send_replication_notification(notify_endpoint);
});
auto f = co_await coroutine::as_future(_repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops));
co_await send_replication_notification(notify_endpoint);
co_return co_await std::move(f);
}
return seastar::async([this, endpoint, notify_endpoint] {
auto tmptr = get_token_metadata_ptr();
abort_source as;
auto sub = _abort_source.subscribe([&as] () noexcept {
@@ -2968,14 +2968,14 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
auto status_checker = check_status_loop();
std::exception_ptr ex;
try {
streamer->stream_async().get();
co_await streamer->stream_async();
} catch (...) {
ex = std::current_exception();
slogger.debug("Streaming to restore replica count failed: {}.", ex);
// We still want to send the notification
}
try {
this->send_replication_notification(notify_endpoint).get();
co_await this->send_replication_notification(notify_endpoint);
} catch (...) {
auto ex2 = std::current_exception();
slogger.debug("Sending replication notification to {} failed: {}", notify_endpoint, ex2);
@@ -2988,7 +2988,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
if (!as.abort_requested()) {
as.request_abort();
}
status_checker.get();
co_await std::move(status_checker);
} catch (const seastar::sleep_aborted& ignored) {
slogger.debug("restore_replica_count: Got sleep_abort to stop status checker for removing node {}: {}", endpoint, ignored);
} catch (...) {
@@ -2997,9 +2997,8 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
}
slogger.info("restore_replica_count: Finished to stop status checker for removing node {}", endpoint);
if (ex) {
std::rethrow_exception(std::move(ex));
co_await coroutine::return_exception_ptr(std::move(ex));
}
});
}
future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint) {