From 4f546b8b7919728fd8d2d93691bde7815e9d01d5 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 27 Jun 2024 14:52:15 +0300 Subject: [PATCH 1/6] paxos: introduce get_replica_lock() function to take RAII guard for local paxos table access --- service/paxos/paxos_state.cc | 6 +++++ service/paxos/paxos_state.hh | 51 ++++++++++++++++++------------------ service/storage_proxy.cc | 2 +- 3 files changed, 33 insertions(+), 26 deletions(-) diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index e7f98b047d..400a1e3281 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -36,6 +36,12 @@ void paxos_state::key_lock_map::release_semaphore_for_key(const dht::token& key) } } +future paxos_state::get_replica_lock(const dht::token& key, clock_type::time_point timeout) { + guard m(_paxos_table_lock, key, timeout); + co_await m.lock(); + co_return m; +} + future paxos_state::get_cas_lock(const dht::token& key, clock_type::time_point timeout) { guard m(_coordinator_lock, key, timeout); co_await m.lock(); diff --git a/service/paxos/paxos_state.hh b/service/paxos/paxos_state.hh index 5d001b52c2..a91746ab9a 100644 --- a/service/paxos/paxos_state.hh +++ b/service/paxos/paxos_state.hh @@ -26,9 +26,8 @@ using clock_type = db::timeout_clock; // The state of a CAS update of a given primary key as persisted in the paxos table. class paxos_state { -public: - class guard; private: + class guard; class key_lock_map { using semaphore = basic_semaphore; @@ -55,29 +54,6 @@ private: friend class guard; }; - // Locks are local to the shard which owns the corresponding token range. - // Protects concurrent reads and writes of the same row in system.paxos table. - static thread_local key_lock_map _paxos_table_lock; - // Taken by the coordinator code to allow only one instance of PAXOS to run for each key. - // This prevents contantion between multiple clients trying to modify the - // same key through the same coordinator and stealing the ballot from - // each other. - static thread_local key_lock_map _coordinator_lock; - - - // protects access to system.paxos - template - static - futurize_t> with_locked_key(const dht::token& key, clock_type::time_point timeout, Func func) { - return _paxos_table_lock.with_locked_key(key, timeout, std::move(func)); - } - - utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID(); - std::optional _accepted_proposal; - std::optional _most_recent_commit; - -public: - class guard { key_lock_map& _map; dht::token _key; @@ -101,6 +77,31 @@ public: } }; + // Locks are local to the shard which owns the corresponding token range. + // Protects concurrent reads and writes of the same row in system.paxos table. + static thread_local key_lock_map _paxos_table_lock; + // Taken by the coordinator code to allow only one instance of PAXOS to run for each key. + // This prevents contantion between multiple clients trying to modify the + // same key through the same coordinator and stealing the ballot from + // each other. + static thread_local key_lock_map _coordinator_lock; + + + // protects access to system.paxos + template + static + futurize_t> with_locked_key(const dht::token& key, clock_type::time_point timeout, Func func) { + return _paxos_table_lock.with_locked_key(key, timeout, std::move(func)); + } + + static future get_replica_lock(const dht::token& key, clock_type::time_point timeout); + + utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID(); + std::optional _accepted_proposal; + std::optional _most_recent_commit; + +public: + static future get_cas_lock(const dht::token& key, clock_type::time_point timeout); static logging::logger logger; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 289c02da7a..1ad66dbd61 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -6244,7 +6244,7 @@ future storage_proxy::cas(schema_ptr schema, shared_ptr reque } }); - paxos::paxos_state::guard l = co_await paxos::paxos_state::get_cas_lock(token, write_timeout); + auto l = co_await paxos::paxos_state::get_cas_lock(token, write_timeout); while (true) { // Finish the previous PAXOS round, if any, and, as a side effect, compute From 58912c2cc1cd6a725743351642780abce2fe4bde Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 27 Jun 2024 14:52:45 +0300 Subject: [PATCH 2/6] paxos: co-routinize paxos_state::prepare function --- service/paxos/paxos_state.cc | 187 ++++++++++++++++++----------------- 1 file changed, 94 insertions(+), 93 deletions(-) diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 400a1e3281..244449cd21 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -7,7 +7,9 @@ /* * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include #include +#include #include "service/storage_proxy.hh" #include "service/paxos/proposal.hh" #include "service/paxos/paxos_state.hh" @@ -51,101 +53,100 @@ future paxos_state::get_cas_lock(const dht::token& key, cloc future paxos_state::prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, const query::read_command& cmd, const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) { - return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &sys_ks, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] { - dht::token token = dht::get_token(*schema, key); - utils::latency_counter lc; - lc.start(); - // FIXME: Handle tablet intra-node migration: #16594. - // The shard can change concurrently, so we cannot rely on locking on this shard. - return with_locked_key(token, timeout, [&sp, &sys_ks, &cmd, token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable { - // When preparing, we need to use the same time as "now" (that's the time we use to decide if something - // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up - // on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no - // amount of re-submit will fix this (because the node on which the commit has expired will have a - // tombstone that hides any re-submit). See CASSANDRA-12043 for details. - auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot); + co_await utils::get_local_injector().inject("paxos_prepare_timeout", timeout); + dht::token token = dht::get_token(*schema, key); + utils::latency_counter lc; + lc.start(); - auto f = sys_ks.load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout); - return f.then([&sp, &sys_ks, &cmd, token = std::move(token), &key, ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) { - // If received ballot is newer that the one we already accepted it has to be accepted as well, - // but we will return the previously accepted proposal so that the new coordinator will use it instead of - // its own. - if (ballot.timestamp() > state._promised_ballot.timestamp()) { - logger.debug("Promising ballot {}", ballot); - tracing::trace(tr_state, "Promising ballot {}", ballot); - if (utils::get_local_injector().enter("paxos_error_before_save_promise")) { - return make_exception_future(utils::injected_error("injected_error_before_save_promise")); - } - auto f1 = futurize_invoke([&] { - return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout); - }); - auto f2 = futurize_invoke([&] { - return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), - [&sp, tr_state, schema, &cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) { - return sp.get_db().local().query(schema, cmd, - {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da}, - prv, tr_state, timeout); - }); - }); - return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, schema, &sys_ks] (auto t) mutable { - if (utils::get_local_injector().enter("paxos_error_after_save_promise")) { - return make_exception_future(utils::injected_error("injected_error_after_save_promise")); - } - auto&& f1 = std::get<0>(t); - auto&& f2 = std::get<1>(t); - if (f1.failed()) { - f2.ignore_ready_future(); - // Failed to save promise. Nothing we can do but throw. - return make_exception_future(f1.get_exception()); - } - std::optional>, query::result_digest>> data_or_digest; - if (!f2.failed()) { - auto&& [result, hit_rate] = f2.get(); - if (only_digest) { - data_or_digest = *result->digest(); - } else { - data_or_digest = std::move(make_foreign(std::move(result))); - } - } else { - // Don't return errors querying the current value, just debug-log them, as the caller is prepared to fall back - // on querying it by itself in case it's missing in the response. - auto ex = f2.get_exception(); - logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex)); - } - auto upgrade_if_needed = [schema = std::move(schema), &sys_ks] (std::optional p) { - if (!p || p->update.schema_version() == schema->version()) { - return make_ready_future>(std::move(p)); - } - // In case current schema is not the same as the schema in the proposal - // try to look it up first in the local schema_registry cache and upgrade - // the mutation using schema from the cache. - // - // If there's no schema in the cache, then retrieve persisted column mapping - // for that version and upgrade the mutation with it. - logger.debug("Stored mutation references outdated schema version. " - "Trying to upgrade the accepted proposal mutation to the most recent schema version."); - return service::get_column_mapping(sys_ks, p->update.column_family_id(), p->update.schema_version()).then([schema, p = std::move(p)] (const column_mapping& cm) { - return make_ready_future>(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm)))); - }); - }; - return when_all_succeed(upgrade_if_needed(std::move(state._accepted_proposal)), upgrade_if_needed(std::move(state._most_recent_commit))).then([data_or_digest = std::move(data_or_digest)] (auto&& u) mutable { - return prepare_response(promise(std::move(std::get<0>(u)), std::move(std::get<1>(u)), std::move(data_or_digest))); - }); - }); - } else { - logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); - tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); - // Return the currently promised ballot (rather than, e.g., the ballot of the last - // accepted proposal) so the coordinator can make sure it uses a newer ballot next - // time (#5667). - return make_ready_future(prepare_response(std::move(state._promised_ballot))); - } - }); - }).finally([&sp, schema, lc] () mutable { - auto& stats = sp.get_db().local().find_column_family(schema).get_stats(); - stats.cas_prepare.mark(lc.stop().latency()); - }); + auto stats_updater = defer([&sp, schema, lc] () mutable { + auto& stats = sp.get_db().local().find_column_family(schema).get_stats(); + stats.cas_prepare.mark(lc.stop().latency()); }); + + auto guard = co_await get_replica_lock(token, timeout); + // FIXME: Handle tablet intra-node migration: #16594. + // The shard can change concurrently, so we cannot rely on locking on this shard. + + // When preparing, we need to use the same time as "now" (that's the time we use to decide if something + // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up + // on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no + // amount of re-submit will fix this (because the node on which the commit has expired will have a + // tombstone that hides any re-submit). See CASSANDRA-12043 for details. + auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot); + + paxos_state state = co_await sys_ks.load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout); + // If received ballot is newer that the one we already accepted it has to be accepted as well, + // but we will return the previously accepted proposal so that the new coordinator will use it instead of + // its own. + if (ballot.timestamp() > state._promised_ballot.timestamp()) { + logger.debug("Promising ballot {}", ballot); + tracing::trace(tr_state, "Promising ballot {}", ballot); + if (utils::get_local_injector().enter("paxos_error_before_save_promise")) { + co_await coroutine::return_exception(utils::injected_error("injected_error_before_save_promise")); + } + + auto [f1, f2] = co_await when_all( + [&] { + return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout); + }, + coroutine::lambda([&] () -> future, cache_temperature>> { + co_return co_await sp.get_db().local().query(schema, cmd, + {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da}, + dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), tr_state, timeout); + }) + ); + + if (utils::get_local_injector().enter("paxos_error_after_save_promise")) { + co_await coroutine::return_exception(utils::injected_error("injected_error_after_save_promise")); + } + + if (f1.failed()) { + f2.ignore_ready_future(); + // Failed to save promise. Nothing we can do but throw. + co_return coroutine::exception(f1.get_exception()); + } + std::optional>, query::result_digest>> data_or_digest; + if (!f2.failed()) { + auto&& [result, hit_rate] = f2.get(); + if (only_digest) { + data_or_digest = *result->digest(); + } else { + data_or_digest = make_foreign(std::move(result)); + } + } else { + // Don't return errors querying the current value, just debug-log them, as the caller is prepared to fall back + // on querying it by itself in case it's missing in the response. + auto ex = f2.get_exception(); + logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex)); + } + auto upgrade_if_needed = [schema = std::move(schema), &sys_ks] (std::optional p) -> future> { + if (!p || p->update.schema_version() == schema->version()) { + co_return std::move(p); + } + // In case current schema is not the same as the schema in the proposal + // try to look it up first in the local schema_registry cache and upgrade + // the mutation using schema from the cache. + // + // If there's no schema in the cache, then retrieve persisted column mapping + // for that version and upgrade the mutation with it. + logger.debug("Stored mutation references outdated schema version. " + "Trying to upgrade the accepted proposal mutation to the most recent schema version."); + const column_mapping& cm = co_await service::get_column_mapping(sys_ks, p->update.column_family_id(), p->update.schema_version()); + + co_return std::make_optional(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm)))); + }; + + auto [u1, u2] = co_await coroutine::all(std::bind(upgrade_if_needed, std::move(state._accepted_proposal)), std::bind(upgrade_if_needed, std::move(state._most_recent_commit))); + + co_return prepare_response(promise(std::move(u1), std::move(u2), std::move(data_or_digest))); + } else { + logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); + tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); + // Return the currently promised ballot (rather than, e.g., the ballot of the last + // accepted proposal) so the coordinator can make sure it uses a newer ballot next + // time (#5667). + co_return std::move(state._promised_ballot); + } } future paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal, From 887a5a8f6250dc1c4723911933cabda856c3b35d Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 27 Jun 2024 15:07:23 +0300 Subject: [PATCH 3/6] paxos: co-routinize paxos_state::accept function --- service/paxos/paxos_state.cc | 71 ++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 244449cd21..8f0bf51121 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -10,6 +10,7 @@ #include #include #include +#include "seastar/coroutine/exception.hh" #include "service/storage_proxy.hh" #include "service/paxos/proposal.hh" #include "service/paxos/paxos_state.hh" @@ -151,43 +152,43 @@ future paxos_state::prepare(storage_proxy& sp, db::system_keys future paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal, clock_type::time_point timeout) { - return utils::get_local_injector().inject("paxos_accept_proposal_timeout", timeout, - [&sp, &sys_ks, token = std::move(token), &proposal, schema, tr_state, timeout] { - utils::latency_counter lc; - lc.start(); - // FIXME: Handle tablet intra-node migration: #16594. - // The shard can change concurrently, so we cannot rely on locking on this shard. - return with_locked_key(token, timeout, [&sys_ks, &proposal, schema, tr_state, timeout] () mutable { - auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot); - auto f = sys_ks.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout); - return f.then([&sys_ks, &proposal, tr_state, schema, timeout] (paxos_state state) { - // Accept the proposal if we promised to accept it or the proposal is newer than the one we promised. - // Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected. - if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) { - logger.debug("Accepting proposal {}", proposal); - tracing::trace(tr_state, "Accepting proposal {}", proposal); + co_await utils::get_local_injector().inject("paxos_accept_proposal_timeout", timeout); + utils::latency_counter lc; + lc.start(); - if (utils::get_local_injector().enter("paxos_error_before_save_proposal")) { - return make_exception_future(utils::injected_error("injected_error_before_save_proposal")); - } - - return sys_ks.save_paxos_proposal(*schema, proposal, timeout).then([] { - if (utils::get_local_injector().enter("paxos_error_after_save_proposal")) { - return make_exception_future(utils::injected_error("injected_error_after_save_proposal")); - } - return make_ready_future(true); - }); - } else { - logger.debug("Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot); - tracing::trace(tr_state, "Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot); - return make_ready_future(false); - } - }); - }).finally([&sp, schema, lc] () mutable { - auto& stats = sp.get_db().local().find_column_family(schema).get_stats(); - stats.cas_accept.mark(lc.stop().latency()); - }); + auto stats_updater = defer([&sp, schema, lc] () mutable { + auto& stats = sp.get_db().local().find_column_family(schema).get_stats(); + stats.cas_accept.mark(lc.stop().latency()); }); + + // FIXME: Handle tablet intra-node migration: #16594. + // The shard can change concurrently, so we cannot rely on locking on this shard. + auto guard = co_await get_replica_lock(token, timeout); + + auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot); + paxos_state state = co_await sys_ks.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout); + + // Accept the proposal if we promised to accept it or the proposal is newer than the one we promised. + // Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected. + if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) { + logger.debug("Accepting proposal {}", proposal); + tracing::trace(tr_state, "Accepting proposal {}", proposal); + + if (utils::get_local_injector().enter("paxos_error_before_save_proposal")) { + co_await coroutine::return_exception(utils::injected_error("injected_error_before_save_proposal")); + } + + co_await sys_ks.save_paxos_proposal(*schema, proposal, timeout); + + if (utils::get_local_injector().enter("paxos_error_after_save_proposal")) { + co_await coroutine::return_exception(utils::injected_error("injected_error_after_save_proposal")); + } + co_return true; + } else { + logger.debug("Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot); + tracing::trace(tr_state, "Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot); + co_return false; + } } future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout, From 6bf307ffe833cf132c1e262629ae50f74d344929 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 27 Jun 2024 15:11:12 +0300 Subject: [PATCH 4/6] paxos: remove no longer used with_locked_key functions --- service/paxos/paxos_state.hh | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/service/paxos/paxos_state.hh b/service/paxos/paxos_state.hh index a91746ab9a..bc011542da 100644 --- a/service/paxos/paxos_state.hh +++ b/service/paxos/paxos_state.hh @@ -39,18 +39,6 @@ private: map _locks; public: - // - // A thin RAII aware wrapper around the lock map to garbage - // collect the decorated key from the map on unlock if there - // are no waiters. - /// - template - futurize_t> with_locked_key(const dht::token& key, clock_type::time_point timeout, Func func) { - return with_semaphore(get_semaphore_for_key(key), 1, timeout - clock_type::now(), std::move(func)).finally([key, this] { - release_semaphore_for_key(key); - }); - } - friend class guard; }; @@ -87,13 +75,6 @@ private: static thread_local key_lock_map _coordinator_lock; - // protects access to system.paxos - template - static - futurize_t> with_locked_key(const dht::token& key, clock_type::time_point timeout, Func func) { - return _paxos_table_lock.with_locked_key(key, timeout, std::move(func)); - } - static future get_replica_lock(const dht::token& key, clock_type::time_point timeout); utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID(); From 2b7acdb32c6deb1a6d02c4cd01326118e88f8836 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 27 Jun 2024 15:24:41 +0300 Subject: [PATCH 5/6] paxos: co-routinize paxos_state::learn function --- service/paxos/paxos_state.cc | 88 +++++++++++++++++------------------- 1 file changed, 42 insertions(+), 46 deletions(-) diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 8f0bf51121..a0e83d560f 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -194,60 +194,56 @@ future paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state) { if (utils::get_local_injector().enter("paxos_error_before_learn")) { - return make_exception_future<>(utils::injected_error("injected_error_before_learn")); + co_await coroutine::return_exception(utils::injected_error("injected_error_before_learn")); } utils::latency_counter lc; lc.start(); - return do_with(std::move(decision), [&sp, &sys_ks, tr_state = std::move(tr_state), schema, timeout] (proposal& decision) { - auto f = utils::get_local_injector().inject("paxos_state_learn_timeout", timeout); - - replica::table& cf = sp.get_db().local().find_column_family(schema); - db_clock::time_point t = cf.get_truncation_time(); - auto truncated_at = std::chrono::duration_cast(t.time_since_epoch()); - // When saving a decision, also delete the last accepted proposal. This is just an - // optimization to save space. - // Even though there is no guarantee we will see decisions in the right order, - // because messages can get delayed, so this decision can be older than our current most - // recent accepted proposal/committed decision, saving it is always safe due to column timestamps. - // Since the mutation uses the decision ballot timestamp, if cell timestamp of any current cell - // is strictly greater than the decision one, saving the decision will not erase it. - // - // The table may have been truncated since the proposal was initiated. In that case, we - // don't want to perform the mutation and potentially resurrect truncated data. - if (utils::UUID_gen::unix_timestamp(decision.ballot) >= truncated_at) { - f = f.then([&sp, schema, &decision, timeout, tr_state] { - logger.debug("Committing decision {}", decision); - tracing::trace(tr_state, "Committing decision {}", decision); - - // In case current schema is not the same as the schema in the decision - // try to look it up first in the local schema_registry cache and upgrade - // the mutation using schema from the cache. - // - // If there's no schema in the cache, then retrieve persisted column mapping - // for that version and upgrade the mutation with it. - if (decision.update.schema_version() != schema->version()) { - on_internal_error(logger, format("schema version in learn does not match current schema")); - } - - return sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout); - }); - } else { - logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision); - tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision); - } - return f.then([&sys_ks, &decision, schema, timeout] { - // We don't need to lock the partition key if there is no gap between loading paxos - // state and saving it, and here we're just blindly updating. - return utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout, [&sys_ks, &decision, schema, timeout] { - return sys_ks.save_paxos_decision(*schema, decision, timeout); - }); - }); - }).finally([&sp, schema, lc] () mutable { + auto stats_updater = defer([&sp, schema, lc] () mutable { auto& stats = sp.get_db().local().find_column_family(schema).get_stats(); stats.cas_learn.mark(lc.stop().latency()); }); + + co_await utils::get_local_injector().inject("paxos_state_learn_timeout", timeout); + + replica::table& cf = sp.get_db().local().find_column_family(schema); + db_clock::time_point t = cf.get_truncation_time(); + auto truncated_at = std::chrono::duration_cast(t.time_since_epoch()); + // When saving a decision, also delete the last accepted proposal. This is just an + // optimization to save space. + // Even though there is no guarantee we will see decisions in the right order, + // because messages can get delayed, so this decision can be older than our current most + // recent accepted proposal/committed decision, saving it is always safe due to column timestamps. + // Since the mutation uses the decision ballot timestamp, if cell timestamp of any current cell + // is strictly greater than the decision one, saving the decision will not erase it. + // + // The table may have been truncated since the proposal was initiated. In that case, we + // don't want to perform the mutation and potentially resurrect truncated data. + if (utils::UUID_gen::unix_timestamp(decision.ballot) >= truncated_at) { + logger.debug("Committing decision {}", decision); + tracing::trace(tr_state, "Committing decision {}", decision); + + // In case current schema is not the same as the schema in the decision + // try to look it up first in the local schema_registry cache and upgrade + // the mutation using schema from the cache. + // + // If there's no schema in the cache, then retrieve persisted column mapping + // for that version and upgrade the mutation with it. + if (decision.update.schema_version() != schema->version()) { + on_internal_error(logger, format("schema version in learn does not match current schema")); + } + + co_await sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout); + } else { + logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision); + tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision); + } + + // We don't need to lock the partition key if there is no gap between loading paxos + // state and saving it, and here we're just blindly updating. + co_await utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout); + co_return co_await sys_ks.save_paxos_decision(*schema, decision, timeout); } future<> paxos_state::prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout, From 5c72af7a93316f9dc63e20eb83a90066ad891df6 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 27 Jun 2024 15:46:25 +0300 Subject: [PATCH 6/6] paxos: simplify paxos_state::prepare code to not work with raw futures --- service/paxos/paxos_state.cc | 43 +++++++++++++++--------------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index a0e83d560f..8a2a21931c 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -86,40 +86,33 @@ future paxos_state::prepare(storage_proxy& sp, db::system_keys co_await coroutine::return_exception(utils::injected_error("injected_error_before_save_promise")); } - auto [f1, f2] = co_await when_all( + // The all() below throws only if save_paxos_promise fails. + // If querying the result fails we continue without read round optimization + auto [data_or_digest] = co_await coroutine::all( [&] { return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout); }, - coroutine::lambda([&] () -> future, cache_temperature>> { - co_return co_await sp.get_db().local().query(schema, cmd, - {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da}, - dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), tr_state, timeout); - }) + [&] () -> future>, query::result_digest>>> { + try { + auto&& [result, hit_rate] = co_await sp.get_db().local().query(schema, cmd, + {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da}, + dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), tr_state, timeout); + if (only_digest) { + co_return *result->digest(); + } else { + co_return make_foreign(std::move(result)); + } + } catch(...) { + logger.debug("Failed to get data or digest: {}. Ignored.", std::current_exception()); + co_return std::nullopt; + } + } ); if (utils::get_local_injector().enter("paxos_error_after_save_promise")) { co_await coroutine::return_exception(utils::injected_error("injected_error_after_save_promise")); } - if (f1.failed()) { - f2.ignore_ready_future(); - // Failed to save promise. Nothing we can do but throw. - co_return coroutine::exception(f1.get_exception()); - } - std::optional>, query::result_digest>> data_or_digest; - if (!f2.failed()) { - auto&& [result, hit_rate] = f2.get(); - if (only_digest) { - data_or_digest = *result->digest(); - } else { - data_or_digest = make_foreign(std::move(result)); - } - } else { - // Don't return errors querying the current value, just debug-log them, as the caller is prepared to fall back - // on querying it by itself in case it's missing in the response. - auto ex = f2.get_exception(); - logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex)); - } auto upgrade_if_needed = [schema = std::move(schema), &sys_ks] (std::optional p) -> future> { if (!p || p->update.schema_version() == schema->version()) { co_return std::move(p);