service: storage_proxy: move handle_paxos_accept to remote

This commit is contained in:
Kamil Braun
2022-07-19 18:17:53 +02:00
parent 807c7f32de
commit 548767f91e
2 changed files with 34 additions and 36 deletions

View File

@@ -152,7 +152,7 @@ struct storage_proxy::remote {
ser::storage_proxy_rpc_verbs::register_truncate(&_ms, std::bind_front(&remote::handle_truncate, this));
// Register PAXOS verb handlers
ser::storage_proxy_rpc_verbs::register_paxos_prepare(&_ms, std::bind_front(&remote::handle_paxos_prepare, this));
ser::storage_proxy_rpc_verbs::register_paxos_accept(&_ms, std::bind_front(&storage_proxy::handle_paxos_accept, sp));
ser::storage_proxy_rpc_verbs::register_paxos_accept(&_ms, std::bind_front(&remote::handle_paxos_accept, this));
ser::storage_proxy_rpc_verbs::register_paxos_prune(&_ms, std::bind_front(&storage_proxy::handle_paxos_prune, sp));
}
@@ -560,6 +560,39 @@ struct storage_proxy::remote {
});
});
}
future<bool> handle_paxos_accept(
const rpc::client_info& cinfo, rpc::opt_time_point timeout,
paxos::proposal proposal, std::optional<tracing::trace_info> trace_info) {
auto src_addr = netw::messaging_service::get_source(cinfo);
auto src_ip = src_addr.addr;
tracing::trace_state_ptr tr_state;
if (trace_info) {
tr_state = tracing::tracing::get_local_tracing_instance().create_session(*trace_info);
tracing::begin(tr_state);
tracing::trace(tr_state, "paxos_accept: message received from /{} ballot {}", src_ip, proposal);
}
auto f = get_schema_for_read(proposal.update.schema_version(), src_addr).then([&sp = _sp, tr_state = std::move(tr_state),
proposal = std::move(proposal), timeout] (schema_ptr schema) mutable {
dht::token token = proposal.update.decorated_key(*schema).token();
unsigned shard = dht::shard_of(*schema, token);
bool local = shard == this_shard_id();
sp.get_stats().replica_cross_shard_ops += !local;
return sp.container().invoke_on(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
local, proposal = std::move(proposal), timeout, token] (storage_proxy& sp) {
return paxos::paxos_state::accept(sp, gt, gs, token, proposal, *timeout);
});
});
if (tr_state) {
f = f.finally([tr_state, src_ip] {
tracing::trace(tr_state, "paxos_accept: handling is done, sending a response to /{}", src_ip);
});
}
return f;
}
};
thread_local uint64_t paxos_response_handler::next_id = 0;
@@ -5624,39 +5657,6 @@ future<rpc::tuple<Elements..., replica::exception_variant>> storage_proxy::encod
return make_exception_future<final_tuple_type>(std::move(eptr));
}
future<bool>
storage_proxy::handle_paxos_accept(const rpc::client_info& cinfo, rpc::opt_time_point timeout, paxos::proposal proposal,
std::optional<tracing::trace_info> trace_info) {
auto src_addr = netw::messaging_service::get_source(cinfo);
auto src_ip = src_addr.addr;
tracing::trace_state_ptr tr_state;
if (trace_info) {
tr_state = tracing::tracing::get_local_tracing_instance().create_session(*trace_info);
tracing::begin(tr_state);
tracing::trace(tr_state, "paxos_accept: message received from /{} ballot {}", src_ip, proposal);
}
auto f = remote().get_schema_for_read(proposal.update.schema_version(), src_addr).then([this, tr_state = std::move(tr_state),
proposal = std::move(proposal), timeout] (schema_ptr schema) mutable {
dht::token token = proposal.update.decorated_key(*schema).token();
unsigned shard = dht::shard_of(*schema, token);
bool local = shard == this_shard_id();
get_stats().replica_cross_shard_ops += !local;
return container().invoke_on(shard, _write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
local, proposal = std::move(proposal), timeout, token] (storage_proxy& sp) {
return paxos::paxos_state::accept(sp, gt, gs, token, proposal, *timeout);
});
});
if (tr_state) {
f = f.finally([tr_state, src_ip] {
tracing::trace(tr_state, "paxos_accept: handling is done, sending a response to /{}", src_ip);
});
}
return f;
}
future<rpc::no_wait_type>
storage_proxy::handle_paxos_prune(const rpc::client_info& cinfo, rpc::opt_time_point timeout,
utils::UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {

View File

@@ -441,8 +441,6 @@ private:
utils::UUID schema_version, auto in, inet_address_vector_replica_set forward, gms::inet_address reply_to,
unsigned shard, storage_proxy::response_id_type response_id, std::optional<tracing::trace_info> trace_info,
auto&& apply_fn, auto&& forward_fn);
future<bool> handle_paxos_accept(const rpc::client_info& cinfo, rpc::opt_time_point timeout, paxos::proposal proposal,
std::optional<tracing::trace_info> trace_info);
future<rpc::no_wait_type> handle_paxos_prune(const rpc::client_info& cinfo, rpc::opt_time_point timeout,
utils::UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info);
public: