storage_service: Use local messaging reference

All the places the are (and had become such with previous patches) using
the global messaging service and the storage service methods, so they
can access the local reference on the messaging service.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2020-07-24 19:18:35 +03:00
parent 4ea3c2797c
commit a6f8f450ba

View File

@@ -934,9 +934,9 @@ void storage_service::bootstrap() {
set_mode(mode::JOINING, "Starting to bootstrap...", true);
if (is_repair_based_node_ops_enabled()) {
if (db().local().is_replacing()) {
replace_with_repair(_db, netw::get_messaging_service(), _token_metadata, _bootstrap_tokens).get();
replace_with_repair(_db, _messaging, _token_metadata, _bootstrap_tokens).get();
} else {
bootstrap_with_repair(_db, netw::get_messaging_service(), _token_metadata, _bootstrap_tokens).get();
bootstrap_with_repair(_db, _messaging, _token_metadata, _bootstrap_tokens).get();
}
} else {
dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata);
@@ -1951,8 +1951,9 @@ static constexpr auto UNREACHABLE = "UNREACHABLE";
future<std::unordered_map<sstring, std::vector<sstring>>> storage_service::describe_schema_versions() {
auto live_hosts = _gossiper.get_live_members();
std::unordered_map<sstring, std::vector<sstring>> results;
return map_reduce(std::move(live_hosts), [] (auto host) {
auto f0 = netw::get_messaging_service().local().send_schema_check(netw::msg_addr{ host, 0 });
netw::messaging_service& ms = _messaging.local();
return map_reduce(std::move(live_hosts), [&ms] (auto host) {
auto f0 = ms.send_schema_check(netw::msg_addr{ host, 0 });
return std::move(f0).then_wrapped([host] (auto f) {
if (f.failed()) {
f.ignore_ready_future();
@@ -2041,7 +2042,7 @@ future<> storage_service::do_stop_ms() {
return make_ready_future<>();
}
_ms_stopped = true;
return netw::get_messaging_service().invoke_on_all([] (auto& ms) {
return _messaging.invoke_on_all([] (auto& ms) {
return ms.shutdown();
}).then([] {
slogger.info("messaging_service stopped");
@@ -2298,7 +2299,7 @@ future<> storage_service::rebuild(sstring source_dc) {
return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) {
slogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
if (ss.is_repair_based_node_ops_enabled()) {
return rebuild_with_repair(ss._db, netw::get_messaging_service(), ss._token_metadata, std::move(source_dc));
return rebuild_with_repair(ss._db, ss._messaging, ss._token_metadata, std::move(source_dc));
} else {
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._token_metadata, ss._abort_source,
ss.get_broadcast_address(), "Rebuild", streaming::stream_reason::rebuild);
@@ -2408,7 +2409,7 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
void storage_service::unbootstrap() {
db::get_local_batchlog_manager().do_batch_log_replay().get();
if (is_repair_based_node_ops_enabled()) {
decommission_with_repair(_db, netw::get_messaging_service(), _token_metadata).get();
decommission_with_repair(_db, _messaging, _token_metadata).get();
} else {
std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream;
@@ -2450,7 +2451,7 @@ void storage_service::unbootstrap() {
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
if (is_repair_based_node_ops_enabled()) {
return removenode_with_repair(_db, netw::get_messaging_service(), _token_metadata, endpoint).finally([this, notify_endpoint] () {
return removenode_with_repair(_db, _messaging, _token_metadata, endpoint).finally([this, notify_endpoint] () {
return send_replication_notification(notify_endpoint);
});
}
@@ -2519,11 +2520,10 @@ future<> storage_service::send_replication_notification(inet_address remote) {
// the number of retries.
return *done || !_gossiper.is_alive(remote) || *sent >= 3;
},
[done, sent, remote, local] {
auto& ms = netw::get_local_messaging_service();
[this, done, sent, remote, local] {
netw::msg_addr id{remote, 0};
(*sent)++;
return ms.send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) {
return _messaging.local().send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) {
try {
f.get();
*done = true;
@@ -2808,15 +2808,13 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
}
void storage_service::init_messaging_service() {
auto& ms = netw::get_local_messaging_service();
ms.register_replication_finished([] (gms::inet_address from) {
_messaging.local().register_replication_finished([] (gms::inet_address from) {
return get_local_storage_service().confirm_replication(from);
});
}
future<> storage_service::uninit_messaging_service() {
auto& ms = netw::get_local_messaging_service();
return ms.unregister_replication_finished();
return _messaging.local().unregister_replication_finished();
}
void storage_service::do_isolate_on_error(disk_error type)
@@ -3024,7 +3022,7 @@ future<> deinit_storage_service() {
void storage_service::notify_down(inet_address endpoint) {
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
netw::get_local_messaging_service().remove_rpc_client(netw::msg_addr{endpoint, 0});
ss._messaging.local().remove_rpc_client(netw::msg_addr{endpoint, 0});
return seastar::async([&ss, endpoint] {
for (auto&& subscriber : ss._lifecycle_subscribers) {
try {