storage_service: Fix run with api lock

Start with coarse control:

1) converting the run_with_write_api_lock operations:

join_ring, start_gossiping, stop_gossiping, start_rpc_server,
stop_rpc_server, start_native_transport, stop_native_transport,
decommission, remove_node, drain, move, rebuild

to use run_with_api_lock which uses a flag to indicate current operation
in progress.

If one of the above operation is in progress when admin issues another
opeartion we return a "try again" exception to avoid running two
operations in parallel.

2) converting the run_with_read_api_lock to use no lock.

Fixes #850.

Message-Id: <00782b601028ed87437e5decae382f72dff634f6.1456758391.git.asias@scylladb.com>
This commit is contained in:
Asias He
2016-02-29 15:06:20 +00:00
committed by Avi Kivity
parent 4a4d288bba
commit a41bcad585
2 changed files with 48 additions and 61 deletions

View File

@@ -395,7 +395,7 @@ void storage_service::join_token_ring(int delay) {
}
future<> storage_service::join_ring() {
return run_with_write_api_lock([] (storage_service& ss) {
return run_with_api_lock(sstring("join_ring"), [] (storage_service& ss) {
return seastar::async([&ss] {
if (!ss._joined) {
logger.info("Joining ring by operator request");
@@ -418,7 +418,7 @@ future<> storage_service::join_ring() {
}
future<bool> storage_service::is_joined() {
return run_with_read_api_lock([] (storage_service& ss) {
return run_with_no_api_lock([] (storage_service& ss) {
return ss._joined && !ss._is_survey_mode;
});
}
@@ -1272,7 +1272,7 @@ future<std::unordered_set<token>> storage_service::prepare_replacement_info() {
}
future<std::map<gms::inet_address, float>> storage_service::get_ownership() {
return run_with_read_api_lock([] (storage_service& ss) {
return run_with_no_api_lock([] (storage_service& ss) {
auto token_map = dht::global_partitioner().describe_ownership(ss._token_metadata.sorted_tokens());
// describeOwnership returns tokens in an unspecified order, let's re-order them
std::map<gms::inet_address, float> ownership;
@@ -1286,7 +1286,7 @@ future<std::map<gms::inet_address, float>> storage_service::get_ownership() {
}
future<std::map<gms::inet_address, float>> storage_service::effective_ownership(sstring keyspace_name) {
return run_with_read_api_lock([keyspace_name] (storage_service& ss) mutable {
return run_with_no_api_lock([keyspace_name] (storage_service& ss) mutable {
if (keyspace_name != "") {
//find throws no such keyspace if it is missing
const keyspace& ks = ss._db.local().find_keyspace(keyspace_name);
@@ -1379,27 +1379,27 @@ sstring storage_service::get_schema_version() {
}
future<sstring> storage_service::get_operation_mode() {
return run_with_read_api_lock([] (storage_service& ss) {
return run_with_no_api_lock([] (storage_service& ss) {
auto mode = ss._operation_mode;
return make_ready_future<sstring>(sprint("%s", mode));
});
}
future<bool> storage_service::is_starting() {
return run_with_read_api_lock([] (storage_service& ss) {
return run_with_no_api_lock([] (storage_service& ss) {
auto mode = ss._operation_mode;
return mode == storage_service::mode::STARTING;
});
}
future<bool> storage_service::is_gossip_running() {
return run_with_read_api_lock([] (storage_service& ss) {
return run_with_no_api_lock([] (storage_service& ss) {
return gms::get_local_gossiper().is_enabled();
});
}
future<> storage_service::start_gossiping() {
return run_with_write_api_lock([] (storage_service& ss) {
return run_with_api_lock(sstring("start_gossiping"), [] (storage_service& ss) {
return seastar::async([&ss] {
if (!ss._initialized) {
logger.warn("Starting gossip by operator request");
@@ -1414,7 +1414,7 @@ future<> storage_service::start_gossiping() {
}
future<> storage_service::stop_gossiping() {
return run_with_write_api_lock([] (storage_service& ss) {
return run_with_api_lock(sstring("stop_gossiping"), [] (storage_service& ss) {
if (ss._initialized) {
logger.warn("Stopping gossip by operator request");
return gms::get_local_gossiper().stop_gossiping().then([&ss] {
@@ -1585,7 +1585,7 @@ future<int64_t> storage_service::true_snapshots_size() {
}
future<> storage_service::start_rpc_server() {
return run_with_write_api_lock([] (storage_service& ss) {
return run_with_api_lock(sstring("start_rpc_server"), [] (storage_service& ss) {
if (ss._thrift_server) {
return make_ready_future<>();
}
@@ -1625,19 +1625,19 @@ future<> storage_service::do_stop_rpc_server() {
}
future<> storage_service::stop_rpc_server() {
return run_with_write_api_lock([] (storage_service& ss) {
return run_with_api_lock(sstring("stop_rpc_server"), [] (storage_service& ss) {
return ss.do_stop_rpc_server();
});
}
future<bool> storage_service::is_rpc_server_running() {
return run_with_read_api_lock([] (storage_service& ss) {
return run_with_no_api_lock([] (storage_service& ss) {
return bool(ss._thrift_server);
});
}
future<> storage_service::start_native_transport() {
return run_with_write_api_lock([] (storage_service& ss) {
return run_with_api_lock(sstring("start_native_transport"), [] (storage_service& ss) {
if (ss._cql_server) {
return make_ready_future<>();
}
@@ -1690,19 +1690,19 @@ future<> storage_service::do_stop_native_transport() {
}
future<> storage_service::stop_native_transport() {
return run_with_write_api_lock([] (storage_service& ss) {
return run_with_api_lock(sstring("stop_native_transport"), [] (storage_service& ss) {
return ss.do_stop_native_transport();
});
}
future<bool> storage_service::is_native_transport_running() {
return run_with_read_api_lock([] (storage_service& ss) {
return run_with_no_api_lock([] (storage_service& ss) {
return bool(ss._cql_server);
});
}
future<> storage_service::decommission() {
return run_with_write_api_lock([] (storage_service& ss) {
return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) {
return seastar::async([&ss] {
auto& tm = ss.get_token_metadata();
auto& db = ss.db().local();
@@ -1756,7 +1756,7 @@ future<> storage_service::decommission() {
}
future<> storage_service::remove_node(sstring host_id_string) {
return run_with_write_api_lock([host_id_string] (storage_service& ss) mutable {
return run_with_api_lock(sstring("remove_node"), [host_id_string] (storage_service& ss) mutable {
return seastar::async([&ss, host_id_string] {
logger.debug("remove_node: host_id = {}", host_id_string);
auto my_address = ss.get_broadcast_address();
@@ -1879,7 +1879,7 @@ void storage_service::flush_column_families() {
}
future<> storage_service::drain() {
return run_with_write_api_lock([] (storage_service& ss) {
return run_with_api_lock(sstring("drain"), [] (storage_service& ss) {
return seastar::async([&ss] {
if (ss._operation_mode == mode::DRAINED) {
logger.warn("Cannot drain node (did it already happen?)");
@@ -1974,30 +1974,25 @@ future<std::map<sstring, double>> storage_service::get_load_map() {
future<> storage_service::rebuild(sstring source_dc) {
return run_with_no_api_lock([source_dc] (storage_service& ss) {
return with_lock(ss.api_lock().for_write(), [&ss, source_dc] {
logger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._token_metadata, ss.get_broadcast_address(), "Rebuild");
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(gms::get_local_failure_detector()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) {
logger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._token_metadata, ss.get_broadcast_address(), "Rebuild");
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(gms::get_local_failure_detector()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
}
for (const auto& keyspace_name : ss._db.local().get_non_system_keyspaces()) {
streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name));
}
return streamer->fetch_async().then_wrapped([streamer] (auto&& f) {
try {
auto state = f.get0();
} catch (...) {
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
logger.error("Error while rebuilding node: {}", std::current_exception());
throw std::runtime_error(sprint("Error while rebuilding node: %s", std::current_exception()));
}
for (const auto& keyspace_name : ss._db.local().get_non_system_keyspaces()) {
streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name));
}
return streamer;
}).then([] (auto streamer) mutable {
// FIXME: reconcile other operations while we are in the middle of rebuild since rebuild might take a long time.
return streamer->fetch_async().then_wrapped([streamer] (auto&& f) {
try {
auto state = f.get0();
} catch (...) {
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
logger.error("Error while rebuilding node: {}", std::current_exception());
throw std::runtime_error(sprint("Error while rebuilding node: %s", std::current_exception()));
}
return make_ready_future<>();
});
return make_ready_future<>();
});
});
}
@@ -2012,7 +2007,7 @@ int32_t storage_service::get_exception_count() {
}
future<bool> storage_service::is_initialized() {
return run_with_read_api_lock([] (storage_service& ss) {
return run_with_no_api_lock([] (storage_service& ss) {
return ss._initialized;
});
}
@@ -2683,7 +2678,7 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
}
future<> storage_service::move(token new_token) {
return run_with_write_api_lock([new_token] (storage_service& ss) mutable {
return run_with_api_lock(sstring("move"), [new_token] (storage_service& ss) mutable {
return seastar::async([new_token, &ss] {
auto tokens = ss._token_metadata.sorted_tokens();
if (std::find(tokens.begin(), tokens.end(), new_token) != tokens.end()) {

View File

@@ -57,7 +57,6 @@
#include "streaming/stream_state.hh"
#include "streaming/stream_plan.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/rwlock.hh>
namespace transport {
class cql_server;
@@ -119,7 +118,7 @@ private:
shared_ptr<load_broadcaster> _lb;
shared_ptr<distributed<transport::cql_server>> _cql_server;
shared_ptr<distributed<thrift_server>> _thrift_server;
rwlock _api_lock;
sstring _operation_in_progress;
public:
storage_service(distributed<database>& db)
: _db(db) {
@@ -145,9 +144,6 @@ public:
return _db;
}
rwlock& api_lock() {
return _api_lock;
};
private:
bool is_auto_bootstrap();
inet_address get_broadcast_address() const {
@@ -2290,19 +2286,15 @@ public:
#endif
template <typename Func>
inline auto run_with_write_api_lock(Func&& func) {
return get_storage_service().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable {
return with_lock(ss.api_lock().for_write(), [&ss, func = std::forward<Func>(func)] () mutable {
return func(ss);
});
});
}
template <typename Func>
inline auto run_with_read_api_lock(Func&& func) {
return get_storage_service().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable {
return with_lock(ss.api_lock().for_read(), [&ss, func = std::forward<Func>(func)] () mutable {
return func(ss);
inline auto run_with_api_lock(sstring operation, Func&& func) {
return get_storage_service().invoke_on(0, [operation = std::move(operation),
func = std::forward<Func>(func)] (storage_service& ss) mutable {
if (!ss._operation_in_progress.empty()) {
throw std::runtime_error(sprint("Operation %s is in progress, try again", ss._operation_in_progress));
}
ss._operation_in_progress = std::move(operation);
return func(ss).finally([&ss] {
ss._operation_in_progress = sstring();
});
});
}