From d9ba8eb8df25c334b0889e781fa4ef3361d9c46e Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 31 May 2023 18:45:14 +0300 Subject: [PATCH] service/paxos: Add db::system_keyspace& argument to some methods The paxos_state's .prepare(), .accept(), .learn() and .prune() methods access system keyspace via its static methods. The only caller of those (storage_proxy::remote) already has the sharded system k.s. reference and can pass its .local() one as argument Signed-off-by: Pavel Emelyanov --- service/paxos/paxos_state.cc | 8 ++++---- service/paxos/paxos_state.hh | 9 +++++---- service/storage_proxy.cc | 30 +++++++++++++++--------------- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index a0d0ab66fe..5569403c3b 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -43,7 +43,7 @@ future paxos_state::get_cas_lock(const dht::token& key, cloc co_return m; } -future paxos_state::prepare(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema, +future paxos_state::prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, const query::read_command& cmd, const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) { return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] { @@ -139,7 +139,7 @@ future paxos_state::prepare(storage_proxy& sp, tracing::trace_ }); } -future paxos_state::accept(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal, +future paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal, clock_type::time_point timeout) { return utils::get_local_injector().inject("paxos_accept_proposal_timeout", timeout, [&sp, token = std::move(token), &proposal, schema, tr_state, timeout] { @@ -178,7 +178,7 @@ future paxos_state::accept(storage_proxy& sp, tracing::trace_state_ptr tr_ }); } -future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decision, clock_type::time_point timeout, +future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state) { if (utils::get_local_injector().enter("paxos_error_before_learn")) { return make_exception_future<>(utils::injected_error("injected_error_before_learn")); @@ -237,7 +237,7 @@ future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decis }); } -future<> paxos_state::prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout, +future<> paxos_state::prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout, tracing::trace_state_ptr tr_state) { logger.debug("Delete paxos state for ballot {}", ballot); tracing::trace(tr_state, "Delete paxos state for ballot {}", ballot); diff --git a/service/paxos/paxos_state.hh b/service/paxos/paxos_state.hh index e582841838..c92d549e96 100644 --- a/service/paxos/paxos_state.hh +++ b/service/paxos/paxos_state.hh @@ -18,6 +18,7 @@ namespace service { class storage_proxy; } +namespace db { class system_keyspace; } namespace service::paxos { @@ -111,16 +112,16 @@ public: , _accepted_proposal(std::move(accepted)) , _most_recent_commit(std::move(commit)) {} // Replica RPC endpoint for Paxos "prepare" phase. - static future prepare(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema, + static future prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, const query::read_command& cmd, const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, clock_type::time_point timeout); // Replica RPC endpoint for Paxos "accept" phase. - static future accept(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal, + static future accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal, clock_type::time_point timeout); // Replica RPC endpoint for Paxos "learn". - static future<> learn(storage_proxy& sp, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state); + static future<> learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state); // Replica RPC endpoint for pruning Paxos table - static future<> prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout, + static future<> prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout, tracing::trace_state_ptr tr_state); }; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 3ebf69b759..3db4cc9e9b 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -567,9 +567,9 @@ private: return handle_write(src_addr, t, schema_version, std::move(decision), forward, reply_to, shard, response_id, trace_info, fencing_token{}, - /* apply_fn */ [] (shared_ptr& p, tracing::trace_state_ptr tr_state, schema_ptr s, + /* apply_fn */ [this] (shared_ptr& p, tracing::trace_state_ptr tr_state, schema_ptr s, const paxos::proposal& decision, clock_type::time_point timeout, fencing_token) { - return paxos::paxos_state::learn(*p, std::move(s), decision, timeout, tr_state); + return paxos::paxos_state::learn(*p, _sys_ks.local(), std::move(s), decision, timeout, tr_state); }, /* forward_fn */ [this] (shared_ptr&, netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const paxos::proposal& m, gms::inet_address reply_to, unsigned shard, response_id_type response_id, @@ -770,7 +770,7 @@ private: cmd.max_result_size.emplace(cinfo.retrieve_auxiliary("max_result_size")); } - return get_schema_for_read(cmd.schema_version, src_addr, *timeout).then([&sp = _sp, cmd = std::move(cmd), key = std::move(key), ballot, + return get_schema_for_read(cmd.schema_version, src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, cmd = std::move(cmd), key = std::move(key), ballot, only_digest, da, timeout, tr_state = std::move(tr_state), src_ip] (schema_ptr schema) mutable { dht::token token = dht::get_token(*schema, key); unsigned shard = schema->table().shard_of(token); @@ -778,9 +778,9 @@ private: sp.get_stats().replica_cross_shard_ops += !local; return sp.container().invoke_on(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)), cmd = make_lw_shared(std::move(cmd)), key = std::move(key), - ballot, only_digest, da, timeout, src_ip] (storage_proxy& sp) { + ballot, only_digest, da, timeout, src_ip, &sys_ks] (storage_proxy& sp) { tracing::trace_state_ptr tr_state = gt; - return paxos::paxos_state::prepare(sp, tr_state, gs, *cmd, key, ballot, only_digest, da, *timeout).then([src_ip, tr_state] (paxos::prepare_response r) { + return paxos::paxos_state::prepare(sp, sys_ks.local(), tr_state, gs, *cmd, key, ballot, only_digest, da, *timeout).then([src_ip, tr_state] (paxos::prepare_response r) { tracing::trace(tr_state, "paxos_prepare: handling is done, sending a response to /{}", src_ip); return make_foreign(std::make_unique(std::move(r))); }); @@ -800,15 +800,15 @@ private: tracing::trace(tr_state, "paxos_accept: message received from /{} ballot {}", src_ip, proposal); } - auto f = get_schema_for_read(proposal.update.schema_version(), src_addr, *timeout).then([&sp = _sp, tr_state = std::move(tr_state), + auto f = get_schema_for_read(proposal.update.schema_version(), src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, tr_state = std::move(tr_state), proposal = std::move(proposal), timeout] (schema_ptr schema) mutable { dht::token token = proposal.update.decorated_key(*schema).token(); unsigned shard = schema->table().shard_of(token); bool local = shard == this_shard_id(); sp.get_stats().replica_cross_shard_ops += !local; return sp.container().invoke_on(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)), - proposal = std::move(proposal), timeout, token] (storage_proxy& sp) { - return paxos::paxos_state::accept(sp, gt, gs, token, proposal, *timeout); + proposal = std::move(proposal), timeout, token, &sys_ks] (storage_proxy& sp) { + return paxos::paxos_state::accept(sp, sys_ks.local(), gt, gs, token, proposal, *timeout); }); }); @@ -843,16 +843,16 @@ private: pruning++; auto d = defer([] { pruning--; }); - return get_schema_for_read(schema_id, src_addr, *timeout).then([&sp = _sp, key = std::move(key), ballot, + return get_schema_for_read(schema_id, src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, key = std::move(key), ballot, timeout, tr_state = std::move(tr_state), src_ip, d = std::move(d)] (schema_ptr schema) mutable { dht::token token = dht::get_token(*schema, key); unsigned shard = schema->table().shard_of(token); bool local = shard == this_shard_id(); sp.get_stats().replica_cross_shard_ops += !local; return smp::submit_to(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)), - key = std::move(key), ballot, timeout, src_ip, d = std::move(d)] () { + key = std::move(key), ballot, timeout, src_ip, d = std::move(d), &sys_ks] () { tracing::trace_state_ptr tr_state = gt; - return paxos::paxos_state::prune(gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () { + return paxos::paxos_state::prune(sys_ks.local(), gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () { tracing::trace(tr_state, "paxos_prune: handling is done, sending a response to /{}", src_ip); return netw::messaging_service::no_wait(); }); @@ -1205,7 +1205,7 @@ public: fencing_token) override { tracing::trace(tr_state, "Executing a learn locally"); // TODO: Enforce per partition rate limiting in paxos - return paxos::paxos_state::learn(sp, _schema, *_proposal, timeout, tr_state); + return paxos::paxos_state::learn(sp, sp.remote().system_keyspace(), _schema, *_proposal, timeout, tr_state); } virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, const inet_address_vector_replica_set& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, @@ -1891,7 +1891,7 @@ future paxos_response_handler::prepare_ballot(utils::UUI auto da = digest_algorithm(*_proxy); if (fbu::is_me(peer)) { tracing::trace(tr_state, "prepare_ballot: prepare {} locally", ballot); - response = co_await paxos::paxos_state::prepare(*_proxy, tr_state, _schema, *_cmd, _key.key(), ballot, only_digest, da, _timeout); + response = co_await paxos::paxos_state::prepare(*_proxy, _proxy->remote().system_keyspace(), tr_state, _schema, *_cmd, _key.key(), ballot, only_digest, da, _timeout); } else { response = co_await _proxy->remote().send_paxos_prepare(netw::msg_addr(peer), _timeout, tr_state, *_cmd, _key.key(), ballot, only_digest, da); } @@ -2050,7 +2050,7 @@ future paxos_response_handler::accept_proposal(lw_shared_ptrupdate.decorated_key(*_schema).token(), *proposal, _timeout); + accepted = co_await paxos::paxos_state::accept(*_proxy, _proxy->remote().system_keyspace(), tr_state, _schema, proposal->update.decorated_key(*_schema).token(), *proposal, _timeout); } else { accepted = co_await _proxy->remote().send_paxos_accept(netw::msg_addr(peer), _timeout, tr_state, *proposal); } @@ -2206,7 +2206,7 @@ void paxos_response_handler::prune(utils::UUID ballot) { (void)parallel_for_each(_live_endpoints, [this, ballot] (gms::inet_address peer) mutable { if (fbu::is_me(peer)) { tracing::trace(tr_state, "prune: prune {} locally", ballot); - return paxos::paxos_state::prune(_schema, _key.key(), ballot, _timeout, tr_state); + return paxos::paxos_state::prune(_proxy->remote().system_keyspace(), _schema, _key.key(), ballot, _timeout, tr_state); } else { tracing::trace(tr_state, "prune: send prune of {} to {}", ballot, peer); return _proxy->remote().send_paxos_prune(netw::msg_addr(peer), _timeout, tr_state, _schema->version(), _key.key(), ballot);