mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-04 22:13:19 +00:00
storage_proxy: Pass proxy into forward_fn lambda of handle_write
It is alive there, so it is safe to pass one to lambda. Once in forward_fn, it can be used to get messaging from. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -4830,7 +4830,7 @@ void storage_proxy::init_messaging_service() {
|
||||
parallel_for_each(forward.begin(), forward.end(), [reply_to, shard, response_id, &m, &p, trace_state_ptr,
|
||||
timeout, &errors, forward_fn = std::move(forward_fn)] (gms::inet_address forward) {
|
||||
tracing::trace(trace_state_ptr, "Forwarding a mutation to /{}", forward);
|
||||
return forward_fn(netw::messaging_service::msg_addr{forward, 0}, timeout, m, reply_to, shard, response_id,
|
||||
return forward_fn(p, netw::messaging_service::msg_addr{forward, 0}, timeout, m, reply_to, shard, response_id,
|
||||
tracing::make_trace_info(trace_state_ptr))
|
||||
.then_wrapped([&p, &errors] (future<> f) {
|
||||
if (f.failed()) {
|
||||
@@ -4876,11 +4876,10 @@ void storage_proxy::init_messaging_service() {
|
||||
clock_type::time_point timeout) {
|
||||
return p->mutate_locally(std::move(s), m, std::move(tr_state), db::commitlog::force_sync::no, timeout);
|
||||
},
|
||||
/* forward_fn */ [] (netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const frozen_mutation& m,
|
||||
/* forward_fn */ [] (shared_ptr<storage_proxy>& p, netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const frozen_mutation& m,
|
||||
gms::inet_address reply_to, unsigned shard, response_id_type response_id,
|
||||
std::optional<tracing::trace_info> trace_info) {
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
return ms.send_mutation(addr, timeout, m, {}, reply_to, shard, response_id, std::move(trace_info));
|
||||
return p->_messaging.send_mutation(addr, timeout, m, {}, reply_to, shard, response_id, std::move(trace_info));
|
||||
});
|
||||
};
|
||||
ms.register_mutation(receive_mutation_handler);
|
||||
@@ -4899,11 +4898,10 @@ void storage_proxy::init_messaging_service() {
|
||||
const paxos::proposal& decision, clock_type::time_point timeout) {
|
||||
return paxos::paxos_state::learn(std::move(s), decision, timeout, tr_state);
|
||||
},
|
||||
/* forward_fn */ [] (netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const paxos::proposal& m,
|
||||
/* forward_fn */ [] (shared_ptr<storage_proxy>& p, netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const paxos::proposal& m,
|
||||
gms::inet_address reply_to, unsigned shard, response_id_type response_id,
|
||||
std::optional<tracing::trace_info> trace_info) {
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
return ms.send_paxos_learn(addr, timeout, m, {}, reply_to, shard, response_id, std::move(trace_info));
|
||||
return p->_messaging.send_paxos_learn(addr, timeout, m, {}, reply_to, shard, response_id, std::move(trace_info));
|
||||
});
|
||||
});
|
||||
ms.register_mutation_done([this] (const rpc::client_info& cinfo, unsigned shard, storage_proxy::response_id_type response_id, rpc::optional<db::view::update_backlog> backlog) {
|
||||
|
||||
Reference in New Issue
Block a user