service: storage_proxy: move handle_paxos_accept to remote
This commit is contained in:
@@ -152,7 +152,7 @@ struct storage_proxy::remote {
|
||||
ser::storage_proxy_rpc_verbs::register_truncate(&_ms, std::bind_front(&remote::handle_truncate, this));
|
||||
// Register PAXOS verb handlers
|
||||
ser::storage_proxy_rpc_verbs::register_paxos_prepare(&_ms, std::bind_front(&remote::handle_paxos_prepare, this));
|
||||
ser::storage_proxy_rpc_verbs::register_paxos_accept(&_ms, std::bind_front(&storage_proxy::handle_paxos_accept, sp));
|
||||
ser::storage_proxy_rpc_verbs::register_paxos_accept(&_ms, std::bind_front(&remote::handle_paxos_accept, this));
|
||||
ser::storage_proxy_rpc_verbs::register_paxos_prune(&_ms, std::bind_front(&storage_proxy::handle_paxos_prune, sp));
|
||||
}
|
||||
|
||||
@@ -560,6 +560,39 @@ struct storage_proxy::remote {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<bool> handle_paxos_accept(
|
||||
const rpc::client_info& cinfo, rpc::opt_time_point timeout,
|
||||
paxos::proposal proposal, std::optional<tracing::trace_info> trace_info) {
|
||||
auto src_addr = netw::messaging_service::get_source(cinfo);
|
||||
auto src_ip = src_addr.addr;
|
||||
tracing::trace_state_ptr tr_state;
|
||||
if (trace_info) {
|
||||
tr_state = tracing::tracing::get_local_tracing_instance().create_session(*trace_info);
|
||||
tracing::begin(tr_state);
|
||||
tracing::trace(tr_state, "paxos_accept: message received from /{} ballot {}", src_ip, proposal);
|
||||
}
|
||||
|
||||
auto f = get_schema_for_read(proposal.update.schema_version(), src_addr).then([&sp = _sp, tr_state = std::move(tr_state),
|
||||
proposal = std::move(proposal), timeout] (schema_ptr schema) mutable {
|
||||
dht::token token = proposal.update.decorated_key(*schema).token();
|
||||
unsigned shard = dht::shard_of(*schema, token);
|
||||
bool local = shard == this_shard_id();
|
||||
sp.get_stats().replica_cross_shard_ops += !local;
|
||||
return sp.container().invoke_on(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
|
||||
local, proposal = std::move(proposal), timeout, token] (storage_proxy& sp) {
|
||||
return paxos::paxos_state::accept(sp, gt, gs, token, proposal, *timeout);
|
||||
});
|
||||
});
|
||||
|
||||
if (tr_state) {
|
||||
f = f.finally([tr_state, src_ip] {
|
||||
tracing::trace(tr_state, "paxos_accept: handling is done, sending a response to /{}", src_ip);
|
||||
});
|
||||
}
|
||||
|
||||
return f;
|
||||
}
|
||||
};
|
||||
|
||||
thread_local uint64_t paxos_response_handler::next_id = 0;
|
||||
@@ -5624,39 +5657,6 @@ future<rpc::tuple<Elements..., replica::exception_variant>> storage_proxy::encod
|
||||
return make_exception_future<final_tuple_type>(std::move(eptr));
|
||||
}
|
||||
|
||||
future<bool>
|
||||
storage_proxy::handle_paxos_accept(const rpc::client_info& cinfo, rpc::opt_time_point timeout, paxos::proposal proposal,
|
||||
std::optional<tracing::trace_info> trace_info) {
|
||||
auto src_addr = netw::messaging_service::get_source(cinfo);
|
||||
auto src_ip = src_addr.addr;
|
||||
tracing::trace_state_ptr tr_state;
|
||||
if (trace_info) {
|
||||
tr_state = tracing::tracing::get_local_tracing_instance().create_session(*trace_info);
|
||||
tracing::begin(tr_state);
|
||||
tracing::trace(tr_state, "paxos_accept: message received from /{} ballot {}", src_ip, proposal);
|
||||
}
|
||||
|
||||
auto f = remote().get_schema_for_read(proposal.update.schema_version(), src_addr).then([this, tr_state = std::move(tr_state),
|
||||
proposal = std::move(proposal), timeout] (schema_ptr schema) mutable {
|
||||
dht::token token = proposal.update.decorated_key(*schema).token();
|
||||
unsigned shard = dht::shard_of(*schema, token);
|
||||
bool local = shard == this_shard_id();
|
||||
get_stats().replica_cross_shard_ops += !local;
|
||||
return container().invoke_on(shard, _write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
|
||||
local, proposal = std::move(proposal), timeout, token] (storage_proxy& sp) {
|
||||
return paxos::paxos_state::accept(sp, gt, gs, token, proposal, *timeout);
|
||||
});
|
||||
});
|
||||
|
||||
if (tr_state) {
|
||||
f = f.finally([tr_state, src_ip] {
|
||||
tracing::trace(tr_state, "paxos_accept: handling is done, sending a response to /{}", src_ip);
|
||||
});
|
||||
}
|
||||
|
||||
return f;
|
||||
}
|
||||
|
||||
future<rpc::no_wait_type>
|
||||
storage_proxy::handle_paxos_prune(const rpc::client_info& cinfo, rpc::opt_time_point timeout,
|
||||
utils::UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
|
||||
|
||||
@@ -441,8 +441,6 @@ private:
|
||||
utils::UUID schema_version, auto in, inet_address_vector_replica_set forward, gms::inet_address reply_to,
|
||||
unsigned shard, storage_proxy::response_id_type response_id, std::optional<tracing::trace_info> trace_info,
|
||||
auto&& apply_fn, auto&& forward_fn);
|
||||
future<bool> handle_paxos_accept(const rpc::client_info& cinfo, rpc::opt_time_point timeout, paxos::proposal proposal,
|
||||
std::optional<tracing::trace_info> trace_info);
|
||||
future<rpc::no_wait_type> handle_paxos_prune(const rpc::client_info& cinfo, rpc::opt_time_point timeout,
|
||||
utils::UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info);
|
||||
public:
|
||||
|
||||
Reference in New Issue
Block a user