diff --git a/service/storage_service.cc b/service/storage_service.cc index aeb25e4fb0..67bbe416cd 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2931,11 +2931,11 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) { auto ops_uuid = node_ops_id::create_random_id(); auto ops = seastar::make_shared(ops_uuid, nullptr, std::list()); - return _repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () { - return send_replication_notification(notify_endpoint); - }); + auto f = co_await coroutine::as_future(_repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops)); + co_await send_replication_notification(notify_endpoint); + co_return co_await std::move(f); } - return seastar::async([this, endpoint, notify_endpoint] { + auto tmptr = get_token_metadata_ptr(); abort_source as; auto sub = _abort_source.subscribe([&as] () noexcept { @@ -2968,14 +2968,14 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr auto status_checker = check_status_loop(); std::exception_ptr ex; try { - streamer->stream_async().get(); + co_await streamer->stream_async(); } catch (...) { ex = std::current_exception(); slogger.debug("Streaming to restore replica count failed: {}.", ex); // We still want to send the notification } try { - this->send_replication_notification(notify_endpoint).get(); + co_await this->send_replication_notification(notify_endpoint); } catch (...) { auto ex2 = std::current_exception(); slogger.debug("Sending replication notification to {} failed: {}", notify_endpoint, ex2); @@ -2988,7 +2988,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr if (!as.abort_requested()) { as.request_abort(); } - status_checker.get(); + co_await std::move(status_checker); } catch (const seastar::sleep_aborted& ignored) { slogger.debug("restore_replica_count: Got sleep_abort to stop status checker for removing node {}: {}", endpoint, ignored); } catch (...) { @@ -2997,9 +2997,8 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr } slogger.info("restore_replica_count: Finished to stop status checker for removing node {}", endpoint); if (ex) { - std::rethrow_exception(std::move(ex)); + co_await coroutine::return_exception_ptr(std::move(ex)); } - }); } future<> storage_service::excise(std::unordered_set tokens, inet_address endpoint) {