diff --git a/service/storage_service.cc b/service/storage_service.cc index 71c57b300e..9245bb4d04 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -395,7 +395,7 @@ void storage_service::join_token_ring(int delay) { } future<> storage_service::join_ring() { - return run_with_write_api_lock([] (storage_service& ss) { + return run_with_api_lock(sstring("join_ring"), [] (storage_service& ss) { return seastar::async([&ss] { if (!ss._joined) { logger.info("Joining ring by operator request"); @@ -418,7 +418,7 @@ future<> storage_service::join_ring() { } future storage_service::is_joined() { - return run_with_read_api_lock([] (storage_service& ss) { + return run_with_no_api_lock([] (storage_service& ss) { return ss._joined && !ss._is_survey_mode; }); } @@ -1272,7 +1272,7 @@ future> storage_service::prepare_replacement_info() { } future> storage_service::get_ownership() { - return run_with_read_api_lock([] (storage_service& ss) { + return run_with_no_api_lock([] (storage_service& ss) { auto token_map = dht::global_partitioner().describe_ownership(ss._token_metadata.sorted_tokens()); // describeOwnership returns tokens in an unspecified order, let's re-order them std::map ownership; @@ -1286,7 +1286,7 @@ future> storage_service::get_ownership() { } future> storage_service::effective_ownership(sstring keyspace_name) { - return run_with_read_api_lock([keyspace_name] (storage_service& ss) mutable { + return run_with_no_api_lock([keyspace_name] (storage_service& ss) mutable { if (keyspace_name != "") { //find throws no such keyspace if it is missing const keyspace& ks = ss._db.local().find_keyspace(keyspace_name); @@ -1379,27 +1379,27 @@ sstring storage_service::get_schema_version() { } future storage_service::get_operation_mode() { - return run_with_read_api_lock([] (storage_service& ss) { + return run_with_no_api_lock([] (storage_service& ss) { auto mode = ss._operation_mode; return make_ready_future(sprint("%s", mode)); }); } future storage_service::is_starting() { - return run_with_read_api_lock([] (storage_service& ss) { + return run_with_no_api_lock([] (storage_service& ss) { auto mode = ss._operation_mode; return mode == storage_service::mode::STARTING; }); } future storage_service::is_gossip_running() { - return run_with_read_api_lock([] (storage_service& ss) { + return run_with_no_api_lock([] (storage_service& ss) { return gms::get_local_gossiper().is_enabled(); }); } future<> storage_service::start_gossiping() { - return run_with_write_api_lock([] (storage_service& ss) { + return run_with_api_lock(sstring("start_gossiping"), [] (storage_service& ss) { return seastar::async([&ss] { if (!ss._initialized) { logger.warn("Starting gossip by operator request"); @@ -1414,7 +1414,7 @@ future<> storage_service::start_gossiping() { } future<> storage_service::stop_gossiping() { - return run_with_write_api_lock([] (storage_service& ss) { + return run_with_api_lock(sstring("stop_gossiping"), [] (storage_service& ss) { if (ss._initialized) { logger.warn("Stopping gossip by operator request"); return gms::get_local_gossiper().stop_gossiping().then([&ss] { @@ -1585,7 +1585,7 @@ future storage_service::true_snapshots_size() { } future<> storage_service::start_rpc_server() { - return run_with_write_api_lock([] (storage_service& ss) { + return run_with_api_lock(sstring("start_rpc_server"), [] (storage_service& ss) { if (ss._thrift_server) { return make_ready_future<>(); } @@ -1625,19 +1625,19 @@ future<> storage_service::do_stop_rpc_server() { } future<> storage_service::stop_rpc_server() { - return run_with_write_api_lock([] (storage_service& ss) { + return run_with_api_lock(sstring("stop_rpc_server"), [] (storage_service& ss) { return ss.do_stop_rpc_server(); }); } future storage_service::is_rpc_server_running() { - return run_with_read_api_lock([] (storage_service& ss) { + return run_with_no_api_lock([] (storage_service& ss) { return bool(ss._thrift_server); }); } future<> storage_service::start_native_transport() { - return run_with_write_api_lock([] (storage_service& ss) { + return run_with_api_lock(sstring("start_native_transport"), [] (storage_service& ss) { if (ss._cql_server) { return make_ready_future<>(); } @@ -1690,19 +1690,19 @@ future<> storage_service::do_stop_native_transport() { } future<> storage_service::stop_native_transport() { - return run_with_write_api_lock([] (storage_service& ss) { + return run_with_api_lock(sstring("stop_native_transport"), [] (storage_service& ss) { return ss.do_stop_native_transport(); }); } future storage_service::is_native_transport_running() { - return run_with_read_api_lock([] (storage_service& ss) { + return run_with_no_api_lock([] (storage_service& ss) { return bool(ss._cql_server); }); } future<> storage_service::decommission() { - return run_with_write_api_lock([] (storage_service& ss) { + return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) { return seastar::async([&ss] { auto& tm = ss.get_token_metadata(); auto& db = ss.db().local(); @@ -1756,7 +1756,7 @@ future<> storage_service::decommission() { } future<> storage_service::remove_node(sstring host_id_string) { - return run_with_write_api_lock([host_id_string] (storage_service& ss) mutable { + return run_with_api_lock(sstring("remove_node"), [host_id_string] (storage_service& ss) mutable { return seastar::async([&ss, host_id_string] { logger.debug("remove_node: host_id = {}", host_id_string); auto my_address = ss.get_broadcast_address(); @@ -1879,7 +1879,7 @@ void storage_service::flush_column_families() { } future<> storage_service::drain() { - return run_with_write_api_lock([] (storage_service& ss) { + return run_with_api_lock(sstring("drain"), [] (storage_service& ss) { return seastar::async([&ss] { if (ss._operation_mode == mode::DRAINED) { logger.warn("Cannot drain node (did it already happen?)"); @@ -1974,30 +1974,25 @@ future> storage_service::get_load_map() { future<> storage_service::rebuild(sstring source_dc) { - return run_with_no_api_lock([source_dc] (storage_service& ss) { - return with_lock(ss.api_lock().for_write(), [&ss, source_dc] { - logger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); - auto streamer = make_lw_shared(ss._db, ss._token_metadata, ss.get_broadcast_address(), "Rebuild"); - streamer->add_source_filter(std::make_unique(gms::get_local_failure_detector())); - if (source_dc != "") { - streamer->add_source_filter(std::make_unique(source_dc)); + return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) { + logger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); + auto streamer = make_lw_shared(ss._db, ss._token_metadata, ss.get_broadcast_address(), "Rebuild"); + streamer->add_source_filter(std::make_unique(gms::get_local_failure_detector())); + if (source_dc != "") { + streamer->add_source_filter(std::make_unique(source_dc)); + } + for (const auto& keyspace_name : ss._db.local().get_non_system_keyspaces()) { + streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name)); + } + return streamer->fetch_async().then_wrapped([streamer] (auto&& f) { + try { + auto state = f.get0(); + } catch (...) { + // This is used exclusively through JMX, so log the full trace but only throw a simple RTE + logger.error("Error while rebuilding node: {}", std::current_exception()); + throw std::runtime_error(sprint("Error while rebuilding node: %s", std::current_exception())); } - for (const auto& keyspace_name : ss._db.local().get_non_system_keyspaces()) { - streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name)); - } - return streamer; - }).then([] (auto streamer) mutable { - // FIXME: reconcile other operations while we are in the middle of rebuild since rebuild might take a long time. - return streamer->fetch_async().then_wrapped([streamer] (auto&& f) { - try { - auto state = f.get0(); - } catch (...) { - // This is used exclusively through JMX, so log the full trace but only throw a simple RTE - logger.error("Error while rebuilding node: {}", std::current_exception()); - throw std::runtime_error(sprint("Error while rebuilding node: %s", std::current_exception())); - } - return make_ready_future<>(); - }); + return make_ready_future<>(); }); }); } @@ -2012,7 +2007,7 @@ int32_t storage_service::get_exception_count() { } future storage_service::is_initialized() { - return run_with_read_api_lock([] (storage_service& ss) { + return run_with_no_api_lock([] (storage_service& ss) { return ss._initialized; }); } @@ -2683,7 +2678,7 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_ } future<> storage_service::move(token new_token) { - return run_with_write_api_lock([new_token] (storage_service& ss) mutable { + return run_with_api_lock(sstring("move"), [new_token] (storage_service& ss) mutable { return seastar::async([new_token, &ss] { auto tokens = ss._token_metadata.sorted_tokens(); if (std::find(tokens.begin(), tokens.end(), new_token) != tokens.end()) { diff --git a/service/storage_service.hh b/service/storage_service.hh index 6f0b64f783..27288c1054 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -57,7 +57,6 @@ #include "streaming/stream_state.hh" #include "streaming/stream_plan.hh" #include -#include namespace transport { class cql_server; @@ -119,7 +118,7 @@ private: shared_ptr _lb; shared_ptr> _cql_server; shared_ptr> _thrift_server; - rwlock _api_lock; + sstring _operation_in_progress; public: storage_service(distributed& db) : _db(db) { @@ -145,9 +144,6 @@ public: return _db; } - rwlock& api_lock() { - return _api_lock; - }; private: bool is_auto_bootstrap(); inet_address get_broadcast_address() const { @@ -2290,19 +2286,15 @@ public: #endif template - inline auto run_with_write_api_lock(Func&& func) { - return get_storage_service().invoke_on(0, [func = std::forward(func)] (storage_service& ss) mutable { - return with_lock(ss.api_lock().for_write(), [&ss, func = std::forward(func)] () mutable { - return func(ss); - }); - }); - } - - template - inline auto run_with_read_api_lock(Func&& func) { - return get_storage_service().invoke_on(0, [func = std::forward(func)] (storage_service& ss) mutable { - return with_lock(ss.api_lock().for_read(), [&ss, func = std::forward(func)] () mutable { - return func(ss); + inline auto run_with_api_lock(sstring operation, Func&& func) { + return get_storage_service().invoke_on(0, [operation = std::move(operation), + func = std::forward(func)] (storage_service& ss) mutable { + if (!ss._operation_in_progress.empty()) { + throw std::runtime_error(sprint("Operation %s is in progress, try again", ss._operation_in_progress)); + } + ss._operation_in_progress = std::move(operation); + return func(ss).finally([&ss] { + ss._operation_in_progress = sstring(); }); }); }