From d397d7e734359b7189bddded0b0a3544355aaff4 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 24 Jul 2020 15:35:45 +0300 Subject: [PATCH] storage_proxy: Pass proxy into forward_fn lambda of handle_write It is alive there, so it is safe to pass one to lambda. Once in forward_fn, it can be used to get messaging from. Signed-off-by: Pavel Emelyanov --- service/storage_proxy.cc | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 7d41777e5d..6195aa59d9 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -4830,7 +4830,7 @@ void storage_proxy::init_messaging_service() { parallel_for_each(forward.begin(), forward.end(), [reply_to, shard, response_id, &m, &p, trace_state_ptr, timeout, &errors, forward_fn = std::move(forward_fn)] (gms::inet_address forward) { tracing::trace(trace_state_ptr, "Forwarding a mutation to /{}", forward); - return forward_fn(netw::messaging_service::msg_addr{forward, 0}, timeout, m, reply_to, shard, response_id, + return forward_fn(p, netw::messaging_service::msg_addr{forward, 0}, timeout, m, reply_to, shard, response_id, tracing::make_trace_info(trace_state_ptr)) .then_wrapped([&p, &errors] (future<> f) { if (f.failed()) { @@ -4876,11 +4876,10 @@ void storage_proxy::init_messaging_service() { clock_type::time_point timeout) { return p->mutate_locally(std::move(s), m, std::move(tr_state), db::commitlog::force_sync::no, timeout); }, - /* forward_fn */ [] (netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const frozen_mutation& m, + /* forward_fn */ [] (shared_ptr& p, netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const frozen_mutation& m, gms::inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { - auto& ms = netw::get_local_messaging_service(); - return ms.send_mutation(addr, timeout, m, {}, reply_to, shard, response_id, std::move(trace_info)); + return p->_messaging.send_mutation(addr, timeout, m, {}, reply_to, shard, response_id, std::move(trace_info)); }); }; ms.register_mutation(receive_mutation_handler); @@ -4899,11 +4898,10 @@ void storage_proxy::init_messaging_service() { const paxos::proposal& decision, clock_type::time_point timeout) { return paxos::paxos_state::learn(std::move(s), decision, timeout, tr_state); }, - /* forward_fn */ [] (netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const paxos::proposal& m, + /* forward_fn */ [] (shared_ptr& p, netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const paxos::proposal& m, gms::inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { - auto& ms = netw::get_local_messaging_service(); - return ms.send_paxos_learn(addr, timeout, m, {}, reply_to, shard, response_id, std::move(trace_info)); + return p->_messaging.send_paxos_learn(addr, timeout, m, {}, reply_to, shard, response_id, std::move(trace_info)); }); }); ms.register_mutation_done([this] (const rpc::client_info& cinfo, unsigned shard, storage_proxy::response_id_type response_id, rpc::optional backlog) {