From a6f8f450babc3ebb2395b93db82cd44341ec24d5 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 24 Jul 2020 19:18:35 +0300 Subject: [PATCH] storage_service: Use local messaging reference All the places the are (and had become such with previous patches) using the global messaging service and the storage service methods, so they can access the local reference on the messaging service. Signed-off-by: Pavel Emelyanov --- service/storage_service.cc | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 788f1fa7ec..05a3d8d597 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -934,9 +934,9 @@ void storage_service::bootstrap() { set_mode(mode::JOINING, "Starting to bootstrap...", true); if (is_repair_based_node_ops_enabled()) { if (db().local().is_replacing()) { - replace_with_repair(_db, netw::get_messaging_service(), _token_metadata, _bootstrap_tokens).get(); + replace_with_repair(_db, _messaging, _token_metadata, _bootstrap_tokens).get(); } else { - bootstrap_with_repair(_db, netw::get_messaging_service(), _token_metadata, _bootstrap_tokens).get(); + bootstrap_with_repair(_db, _messaging, _token_metadata, _bootstrap_tokens).get(); } } else { dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata); @@ -1951,8 +1951,9 @@ static constexpr auto UNREACHABLE = "UNREACHABLE"; future>> storage_service::describe_schema_versions() { auto live_hosts = _gossiper.get_live_members(); std::unordered_map> results; - return map_reduce(std::move(live_hosts), [] (auto host) { - auto f0 = netw::get_messaging_service().local().send_schema_check(netw::msg_addr{ host, 0 }); + netw::messaging_service& ms = _messaging.local(); + return map_reduce(std::move(live_hosts), [&ms] (auto host) { + auto f0 = ms.send_schema_check(netw::msg_addr{ host, 0 }); return std::move(f0).then_wrapped([host] (auto f) { if (f.failed()) { f.ignore_ready_future(); @@ -2041,7 +2042,7 @@ future<> storage_service::do_stop_ms() { return make_ready_future<>(); } _ms_stopped = true; - return netw::get_messaging_service().invoke_on_all([] (auto& ms) { + return _messaging.invoke_on_all([] (auto& ms) { return ms.shutdown(); }).then([] { slogger.info("messaging_service stopped"); @@ -2298,7 +2299,7 @@ future<> storage_service::rebuild(sstring source_dc) { return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) { slogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); if (ss.is_repair_based_node_ops_enabled()) { - return rebuild_with_repair(ss._db, netw::get_messaging_service(), ss._token_metadata, std::move(source_dc)); + return rebuild_with_repair(ss._db, ss._messaging, ss._token_metadata, std::move(source_dc)); } else { auto streamer = make_lw_shared(ss._db, ss._token_metadata, ss._abort_source, ss.get_broadcast_address(), "Rebuild", streaming::stream_reason::rebuild); @@ -2408,7 +2409,7 @@ std::unordered_multimap storage_service::get_cha void storage_service::unbootstrap() { db::get_local_batchlog_manager().do_batch_log_replay().get(); if (is_repair_based_node_ops_enabled()) { - decommission_with_repair(_db, netw::get_messaging_service(), _token_metadata).get(); + decommission_with_repair(_db, _messaging, _token_metadata).get(); } else { std::unordered_map> ranges_to_stream; @@ -2450,7 +2451,7 @@ void storage_service::unbootstrap() { future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { if (is_repair_based_node_ops_enabled()) { - return removenode_with_repair(_db, netw::get_messaging_service(), _token_metadata, endpoint).finally([this, notify_endpoint] () { + return removenode_with_repair(_db, _messaging, _token_metadata, endpoint).finally([this, notify_endpoint] () { return send_replication_notification(notify_endpoint); }); } @@ -2519,11 +2520,10 @@ future<> storage_service::send_replication_notification(inet_address remote) { // the number of retries. return *done || !_gossiper.is_alive(remote) || *sent >= 3; }, - [done, sent, remote, local] { - auto& ms = netw::get_local_messaging_service(); + [this, done, sent, remote, local] { netw::msg_addr id{remote, 0}; (*sent)++; - return ms.send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) { + return _messaging.local().send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) { try { f.get(); *done = true; @@ -2808,15 +2808,13 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) { } void storage_service::init_messaging_service() { - auto& ms = netw::get_local_messaging_service(); - ms.register_replication_finished([] (gms::inet_address from) { + _messaging.local().register_replication_finished([] (gms::inet_address from) { return get_local_storage_service().confirm_replication(from); }); } future<> storage_service::uninit_messaging_service() { - auto& ms = netw::get_local_messaging_service(); - return ms.unregister_replication_finished(); + return _messaging.local().unregister_replication_finished(); } void storage_service::do_isolate_on_error(disk_error type) @@ -3024,7 +3022,7 @@ future<> deinit_storage_service() { void storage_service::notify_down(inet_address endpoint) { get_storage_service().invoke_on_all([endpoint] (auto&& ss) { - netw::get_local_messaging_service().remove_rpc_client(netw::msg_addr{endpoint, 0}); + ss._messaging.local().remove_rpc_client(netw::msg_addr{endpoint, 0}); return seastar::async([&ss, endpoint] { for (auto&& subscriber : ss._lifecycle_subscribers) { try {