storage_service: Serialize rebuild

Do not hold the api lock while streaming the data since it might take a
long time, so we need to reconcile other operations while we are in the
middle of rebuild.
This commit is contained in:
Asias He
2015-10-29 08:58:11 +08:00
parent cc6d4e0fc9
commit c030e6713b

View File

@@ -1778,27 +1778,31 @@ future<std::map<sstring, sstring>> storage_service::get_load_map() {
future<> storage_service::rebuild(sstring source_dc) {
logger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
auto streamer = make_lw_shared<dht::range_streamer>(_db, _token_metadata, get_broadcast_address(), "Rebuild");
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(gms::get_local_failure_detector()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
}
for (const auto& keyspace_name : _db.local().get_non_system_keyspaces()) {
streamer->add_ranges(keyspace_name, get_local_ranges(keyspace_name));
}
return streamer->fetch_async().then_wrapped([streamer] (auto&& f) {
try {
auto state = f.get0();
} catch (...) {
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
logger.error("Error while rebuilding node: {}", std::current_exception());
throw std::runtime_error(sprint("Error while rebuilding node: %s", std::current_exception()));
}
return make_ready_future<>();
return run_with_no_api_lock([source_dc] (storage_service& ss) {
return with_lock(ss.api_lock().for_write(), [&ss, source_dc] {
logger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._token_metadata, ss.get_broadcast_address(), "Rebuild");
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(gms::get_local_failure_detector()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
}
for (const auto& keyspace_name : ss._db.local().get_non_system_keyspaces()) {
streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name));
}
return streamer;
}).then([] (auto streamer) mutable {
// FIXME: reconcile other operations while we are in the middle of rebuild since rebuild might take a long time.
return streamer->fetch_async().then_wrapped([streamer] (auto&& f) {
try {
auto state = f.get0();
} catch (...) {
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
logger.error("Error while rebuilding node: {}", std::current_exception());
throw std::runtime_error(sprint("Error while rebuilding node: %s", std::current_exception()));
}
return make_ready_future<>();
});
});
});
}