From c030e6713bdea3ba1f418a1eecbca6f39fa92d87 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 29 Oct 2015 08:58:11 +0800 Subject: [PATCH] storage_service: Serialize rebuild Do not hold the api lock while streaming the data since it might take a long time, so we need to reconcile other operations while we are in the middle of rebuild. --- service/storage_service.cc | 46 +++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index c3c7afa9ea..eb9b60947a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1778,27 +1778,31 @@ future> storage_service::get_load_map() { future<> storage_service::rebuild(sstring source_dc) { - logger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); - auto streamer = make_lw_shared(_db, _token_metadata, get_broadcast_address(), "Rebuild"); - streamer->add_source_filter(std::make_unique(gms::get_local_failure_detector())); - - if (source_dc != "") { - streamer->add_source_filter(std::make_unique(source_dc)); - } - - for (const auto& keyspace_name : _db.local().get_non_system_keyspaces()) { - streamer->add_ranges(keyspace_name, get_local_ranges(keyspace_name)); - } - - return streamer->fetch_async().then_wrapped([streamer] (auto&& f) { - try { - auto state = f.get0(); - } catch (...) { - // This is used exclusively through JMX, so log the full trace but only throw a simple RTE - logger.error("Error while rebuilding node: {}", std::current_exception()); - throw std::runtime_error(sprint("Error while rebuilding node: %s", std::current_exception())); - } - return make_ready_future<>(); + return run_with_no_api_lock([source_dc] (storage_service& ss) { + return with_lock(ss.api_lock().for_write(), [&ss, source_dc] { + logger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); + auto streamer = make_lw_shared(ss._db, ss._token_metadata, ss.get_broadcast_address(), "Rebuild"); + streamer->add_source_filter(std::make_unique(gms::get_local_failure_detector())); + if (source_dc != "") { + streamer->add_source_filter(std::make_unique(source_dc)); + } + for (const auto& keyspace_name : ss._db.local().get_non_system_keyspaces()) { + streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name)); + } + return streamer; + }).then([] (auto streamer) mutable { + // FIXME: reconcile other operations while we are in the middle of rebuild since rebuild might take a long time. + return streamer->fetch_async().then_wrapped([streamer] (auto&& f) { + try { + auto state = f.get0(); + } catch (...) { + // This is used exclusively through JMX, so log the full trace but only throw a simple RTE + logger.error("Error while rebuilding node: {}", std::current_exception()); + throw std::runtime_error(sprint("Error while rebuilding node: %s", std::current_exception())); + } + return make_ready_future<>(); + }); + }); }); }