service/paxos: Add db::system_keyspace& argument to some methods

The paxos_state's .prepare(), .accept(), .learn() and .prune() methods
access system keyspace via its static methods. The only caller of those
(storage_proxy::remote) already has the sharded system k.s. reference
and can pass its .local() one as argument

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2023-05-31 18:45:14 +03:00
parent b4fc1076e3
commit d9ba8eb8df
3 changed files with 24 additions and 23 deletions

View File

@@ -43,7 +43,7 @@ future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, cloc
co_return m;
}
future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema,
future<prepare_response> paxos_state::prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema,
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] {
@@ -139,7 +139,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_
});
}
future<bool> paxos_state::accept(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
future<bool> paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
clock_type::time_point timeout) {
return utils::get_local_injector().inject("paxos_accept_proposal_timeout", timeout,
[&sp, token = std::move(token), &proposal, schema, tr_state, timeout] {
@@ -178,7 +178,7 @@ future<bool> paxos_state::accept(storage_proxy& sp, tracing::trace_state_ptr tr_
});
}
future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decision, clock_type::time_point timeout,
future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
if (utils::get_local_injector().enter("paxos_error_before_learn")) {
return make_exception_future<>(utils::injected_error("injected_error_before_learn"));
@@ -237,7 +237,7 @@ future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decis
});
}
future<> paxos_state::prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
future<> paxos_state::prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
logger.debug("Delete paxos state for ballot {}", ballot);
tracing::trace(tr_state, "Delete paxos state for ballot {}", ballot);

View File

@@ -18,6 +18,7 @@
namespace service {
class storage_proxy;
}
namespace db { class system_keyspace; }
namespace service::paxos {
@@ -111,16 +112,16 @@ public:
, _accepted_proposal(std::move(accepted))
, _most_recent_commit(std::move(commit)) {}
// Replica RPC endpoint for Paxos "prepare" phase.
static future<prepare_response> prepare(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema,
static future<prepare_response> prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema,
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout);
// Replica RPC endpoint for Paxos "accept" phase.
static future<bool> accept(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
static future<bool> accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
clock_type::time_point timeout);
// Replica RPC endpoint for Paxos "learn".
static future<> learn(storage_proxy& sp, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
static future<> learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
// Replica RPC endpoint for pruning Paxos table
static future<> prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
static future<> prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state);
};

View File

@@ -567,9 +567,9 @@ private:
return handle_write(src_addr, t, schema_version, std::move(decision), forward, reply_to, shard,
response_id, trace_info,
fencing_token{},
/* apply_fn */ [] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s,
/* apply_fn */ [this] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s,
const paxos::proposal& decision, clock_type::time_point timeout, fencing_token) {
return paxos::paxos_state::learn(*p, std::move(s), decision, timeout, tr_state);
return paxos::paxos_state::learn(*p, _sys_ks.local(), std::move(s), decision, timeout, tr_state);
},
/* forward_fn */ [this] (shared_ptr<storage_proxy>&, netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const paxos::proposal& m,
gms::inet_address reply_to, unsigned shard, response_id_type response_id,
@@ -770,7 +770,7 @@ private:
cmd.max_result_size.emplace(cinfo.retrieve_auxiliary<uint64_t>("max_result_size"));
}
return get_schema_for_read(cmd.schema_version, src_addr, *timeout).then([&sp = _sp, cmd = std::move(cmd), key = std::move(key), ballot,
return get_schema_for_read(cmd.schema_version, src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, cmd = std::move(cmd), key = std::move(key), ballot,
only_digest, da, timeout, tr_state = std::move(tr_state), src_ip] (schema_ptr schema) mutable {
dht::token token = dht::get_token(*schema, key);
unsigned shard = schema->table().shard_of(token);
@@ -778,9 +778,9 @@ private:
sp.get_stats().replica_cross_shard_ops += !local;
return sp.container().invoke_on(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
cmd = make_lw_shared<query::read_command>(std::move(cmd)), key = std::move(key),
ballot, only_digest, da, timeout, src_ip] (storage_proxy& sp) {
ballot, only_digest, da, timeout, src_ip, &sys_ks] (storage_proxy& sp) {
tracing::trace_state_ptr tr_state = gt;
return paxos::paxos_state::prepare(sp, tr_state, gs, *cmd, key, ballot, only_digest, da, *timeout).then([src_ip, tr_state] (paxos::prepare_response r) {
return paxos::paxos_state::prepare(sp, sys_ks.local(), tr_state, gs, *cmd, key, ballot, only_digest, da, *timeout).then([src_ip, tr_state] (paxos::prepare_response r) {
tracing::trace(tr_state, "paxos_prepare: handling is done, sending a response to /{}", src_ip);
return make_foreign(std::make_unique<paxos::prepare_response>(std::move(r)));
});
@@ -800,15 +800,15 @@ private:
tracing::trace(tr_state, "paxos_accept: message received from /{} ballot {}", src_ip, proposal);
}
auto f = get_schema_for_read(proposal.update.schema_version(), src_addr, *timeout).then([&sp = _sp, tr_state = std::move(tr_state),
auto f = get_schema_for_read(proposal.update.schema_version(), src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, tr_state = std::move(tr_state),
proposal = std::move(proposal), timeout] (schema_ptr schema) mutable {
dht::token token = proposal.update.decorated_key(*schema).token();
unsigned shard = schema->table().shard_of(token);
bool local = shard == this_shard_id();
sp.get_stats().replica_cross_shard_ops += !local;
return sp.container().invoke_on(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
proposal = std::move(proposal), timeout, token] (storage_proxy& sp) {
return paxos::paxos_state::accept(sp, gt, gs, token, proposal, *timeout);
proposal = std::move(proposal), timeout, token, &sys_ks] (storage_proxy& sp) {
return paxos::paxos_state::accept(sp, sys_ks.local(), gt, gs, token, proposal, *timeout);
});
});
@@ -843,16 +843,16 @@ private:
pruning++;
auto d = defer([] { pruning--; });
return get_schema_for_read(schema_id, src_addr, *timeout).then([&sp = _sp, key = std::move(key), ballot,
return get_schema_for_read(schema_id, src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, key = std::move(key), ballot,
timeout, tr_state = std::move(tr_state), src_ip, d = std::move(d)] (schema_ptr schema) mutable {
dht::token token = dht::get_token(*schema, key);
unsigned shard = schema->table().shard_of(token);
bool local = shard == this_shard_id();
sp.get_stats().replica_cross_shard_ops += !local;
return smp::submit_to(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
key = std::move(key), ballot, timeout, src_ip, d = std::move(d)] () {
key = std::move(key), ballot, timeout, src_ip, d = std::move(d), &sys_ks] () {
tracing::trace_state_ptr tr_state = gt;
return paxos::paxos_state::prune(gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () {
return paxos::paxos_state::prune(sys_ks.local(), gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () {
tracing::trace(tr_state, "paxos_prune: handling is done, sending a response to /{}", src_ip);
return netw::messaging_service::no_wait();
});
@@ -1205,7 +1205,7 @@ public:
fencing_token) override {
tracing::trace(tr_state, "Executing a learn locally");
// TODO: Enforce per partition rate limiting in paxos
return paxos::paxos_state::learn(sp, _schema, *_proposal, timeout, tr_state);
return paxos::paxos_state::learn(sp, sp.remote().system_keyspace(), _schema, *_proposal, timeout, tr_state);
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, const inet_address_vector_replica_set& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
@@ -1891,7 +1891,7 @@ future<paxos::prepare_summary> paxos_response_handler::prepare_ballot(utils::UUI
auto da = digest_algorithm(*_proxy);
if (fbu::is_me(peer)) {
tracing::trace(tr_state, "prepare_ballot: prepare {} locally", ballot);
response = co_await paxos::paxos_state::prepare(*_proxy, tr_state, _schema, *_cmd, _key.key(), ballot, only_digest, da, _timeout);
response = co_await paxos::paxos_state::prepare(*_proxy, _proxy->remote().system_keyspace(), tr_state, _schema, *_cmd, _key.key(), ballot, only_digest, da, _timeout);
} else {
response = co_await _proxy->remote().send_paxos_prepare(netw::msg_addr(peer), _timeout, tr_state, *_cmd, _key.key(), ballot, only_digest, da);
}
@@ -2050,7 +2050,7 @@ future<bool> paxos_response_handler::accept_proposal(lw_shared_ptr<paxos::propos
try {
if (fbu::is_me(peer)) {
tracing::trace(tr_state, "accept_proposal: accept {} locally", *proposal);
accepted = co_await paxos::paxos_state::accept(*_proxy, tr_state, _schema, proposal->update.decorated_key(*_schema).token(), *proposal, _timeout);
accepted = co_await paxos::paxos_state::accept(*_proxy, _proxy->remote().system_keyspace(), tr_state, _schema, proposal->update.decorated_key(*_schema).token(), *proposal, _timeout);
} else {
accepted = co_await _proxy->remote().send_paxos_accept(netw::msg_addr(peer), _timeout, tr_state, *proposal);
}
@@ -2206,7 +2206,7 @@ void paxos_response_handler::prune(utils::UUID ballot) {
(void)parallel_for_each(_live_endpoints, [this, ballot] (gms::inet_address peer) mutable {
if (fbu::is_me(peer)) {
tracing::trace(tr_state, "prune: prune {} locally", ballot);
return paxos::paxos_state::prune(_schema, _key.key(), ballot, _timeout, tr_state);
return paxos::paxos_state::prune(_proxy->remote().system_keyspace(), _schema, _key.key(), ballot, _timeout, tr_state);
} else {
tracing::trace(tr_state, "prune: send prune of {} to {}", ballot, peer);
return _proxy->remote().send_paxos_prune(netw::msg_addr(peer), _timeout, tr_state, _schema->version(), _key.key(), ballot);