diff --git a/service/storage_service.cc b/service/storage_service.cc index 788f1fa7ec..05a3d8d597 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -934,9 +934,9 @@ void storage_service::bootstrap() { set_mode(mode::JOINING, "Starting to bootstrap...", true); if (is_repair_based_node_ops_enabled()) { if (db().local().is_replacing()) { - replace_with_repair(_db, netw::get_messaging_service(), _token_metadata, _bootstrap_tokens).get(); + replace_with_repair(_db, _messaging, _token_metadata, _bootstrap_tokens).get(); } else { - bootstrap_with_repair(_db, netw::get_messaging_service(), _token_metadata, _bootstrap_tokens).get(); + bootstrap_with_repair(_db, _messaging, _token_metadata, _bootstrap_tokens).get(); } } else { dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata); @@ -1951,8 +1951,9 @@ static constexpr auto UNREACHABLE = "UNREACHABLE"; future>> storage_service::describe_schema_versions() { auto live_hosts = _gossiper.get_live_members(); std::unordered_map> results; - return map_reduce(std::move(live_hosts), [] (auto host) { - auto f0 = netw::get_messaging_service().local().send_schema_check(netw::msg_addr{ host, 0 }); + netw::messaging_service& ms = _messaging.local(); + return map_reduce(std::move(live_hosts), [&ms] (auto host) { + auto f0 = ms.send_schema_check(netw::msg_addr{ host, 0 }); return std::move(f0).then_wrapped([host] (auto f) { if (f.failed()) { f.ignore_ready_future(); @@ -2041,7 +2042,7 @@ future<> storage_service::do_stop_ms() { return make_ready_future<>(); } _ms_stopped = true; - return netw::get_messaging_service().invoke_on_all([] (auto& ms) { + return _messaging.invoke_on_all([] (auto& ms) { return ms.shutdown(); }).then([] { slogger.info("messaging_service stopped"); @@ -2298,7 +2299,7 @@ future<> storage_service::rebuild(sstring source_dc) { return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) { slogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); if (ss.is_repair_based_node_ops_enabled()) { - return rebuild_with_repair(ss._db, netw::get_messaging_service(), ss._token_metadata, std::move(source_dc)); + return rebuild_with_repair(ss._db, ss._messaging, ss._token_metadata, std::move(source_dc)); } else { auto streamer = make_lw_shared(ss._db, ss._token_metadata, ss._abort_source, ss.get_broadcast_address(), "Rebuild", streaming::stream_reason::rebuild); @@ -2408,7 +2409,7 @@ std::unordered_multimap storage_service::get_cha void storage_service::unbootstrap() { db::get_local_batchlog_manager().do_batch_log_replay().get(); if (is_repair_based_node_ops_enabled()) { - decommission_with_repair(_db, netw::get_messaging_service(), _token_metadata).get(); + decommission_with_repair(_db, _messaging, _token_metadata).get(); } else { std::unordered_map> ranges_to_stream; @@ -2450,7 +2451,7 @@ void storage_service::unbootstrap() { future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { if (is_repair_based_node_ops_enabled()) { - return removenode_with_repair(_db, netw::get_messaging_service(), _token_metadata, endpoint).finally([this, notify_endpoint] () { + return removenode_with_repair(_db, _messaging, _token_metadata, endpoint).finally([this, notify_endpoint] () { return send_replication_notification(notify_endpoint); }); } @@ -2519,11 +2520,10 @@ future<> storage_service::send_replication_notification(inet_address remote) { // the number of retries. return *done || !_gossiper.is_alive(remote) || *sent >= 3; }, - [done, sent, remote, local] { - auto& ms = netw::get_local_messaging_service(); + [this, done, sent, remote, local] { netw::msg_addr id{remote, 0}; (*sent)++; - return ms.send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) { + return _messaging.local().send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) { try { f.get(); *done = true; @@ -2808,15 +2808,13 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) { } void storage_service::init_messaging_service() { - auto& ms = netw::get_local_messaging_service(); - ms.register_replication_finished([] (gms::inet_address from) { + _messaging.local().register_replication_finished([] (gms::inet_address from) { return get_local_storage_service().confirm_replication(from); }); } future<> storage_service::uninit_messaging_service() { - auto& ms = netw::get_local_messaging_service(); - return ms.unregister_replication_finished(); + return _messaging.local().unregister_replication_finished(); } void storage_service::do_isolate_on_error(disk_error type) @@ -3024,7 +3022,7 @@ future<> deinit_storage_service() { void storage_service::notify_down(inet_address endpoint) { get_storage_service().invoke_on_all([endpoint] (auto&& ss) { - netw::get_local_messaging_service().remove_rpc_client(netw::msg_addr{endpoint, 0}); + ss._messaging.local().remove_rpc_client(netw::msg_addr{endpoint, 0}); return seastar::async([&ss, endpoint] { for (auto&& subscriber : ss._lifecycle_subscribers) { try {