Merge 'co-routinize paxos_state functions' from Gleb

Co-routinize paxos_state functions to make them more readable.

* 'gleb/coroutineze-paxos-state' of github.com:scylladb/scylla-dev:
  paxos: simplify paxos_state::prepare code to not work with raw futures
  paxos: co-routinize paxos_state::learn function
  paxos: remove no longer used with_locked_key functions
  paxos: co-routinize paxos_state::accept function
  paxos: co-routinize paxos_state::prepare function
  paxos: introduce get_replica_lock() function to take RAII guard for local paxos table access
This commit is contained in:
Kamil Braun
2024-07-02 11:54:13 +02:00
3 changed files with 191 additions and 212 deletions

View File

@@ -7,7 +7,10 @@
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <exception>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/all.hh>
#include "seastar/coroutine/exception.hh"
#include "service/storage_proxy.hh"
#include "service/paxos/proposal.hh"
#include "service/paxos/paxos_state.hh"
@@ -36,6 +39,12 @@ void paxos_state::key_lock_map::release_semaphore_for_key(const dht::token& key)
}
}
future<paxos_state::guard> paxos_state::get_replica_lock(const dht::token& key, clock_type::time_point timeout) {
guard m(_paxos_table_lock, key, timeout);
co_await m.lock();
co_return m;
}
future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, clock_type::time_point timeout) {
guard m(_coordinator_lock, key, timeout);
co_await m.lock();
@@ -45,201 +54,189 @@ future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, cloc
future<prepare_response> paxos_state::prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema,
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &sys_ks, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] {
dht::token token = dht::get_token(*schema, key);
utils::latency_counter lc;
lc.start();
// FIXME: Handle tablet intra-node migration: #16594.
// The shard can change concurrently, so we cannot rely on locking on this shard.
return with_locked_key(token, timeout, [&sp, &sys_ks, &cmd, token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable {
// When preparing, we need to use the same time as "now" (that's the time we use to decide if something
// is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
// on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no
// amount of re-submit will fix this (because the node on which the commit has expired will have a
// tombstone that hides any re-submit). See CASSANDRA-12043 for details.
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot);
co_await utils::get_local_injector().inject("paxos_prepare_timeout", timeout);
dht::token token = dht::get_token(*schema, key);
utils::latency_counter lc;
lc.start();
auto f = sys_ks.load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout);
return f.then([&sp, &sys_ks, &cmd, token = std::move(token), &key, ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) {
// If received ballot is newer that the one we already accepted it has to be accepted as well,
// but we will return the previously accepted proposal so that the new coordinator will use it instead of
// its own.
if (ballot.timestamp() > state._promised_ballot.timestamp()) {
logger.debug("Promising ballot {}", ballot);
tracing::trace(tr_state, "Promising ballot {}", ballot);
if (utils::get_local_injector().enter("paxos_error_before_save_promise")) {
return make_exception_future<prepare_response>(utils::injected_error("injected_error_before_save_promise"));
}
auto f1 = futurize_invoke([&] {
return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout);
});
auto f2 = futurize_invoke([&] {
return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}),
[&sp, tr_state, schema, &cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) {
return sp.get_db().local().query(schema, cmd,
{only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da},
prv, tr_state, timeout);
});
});
return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, schema, &sys_ks] (auto t) mutable {
if (utils::get_local_injector().enter("paxos_error_after_save_promise")) {
return make_exception_future<prepare_response>(utils::injected_error("injected_error_after_save_promise"));
}
auto&& f1 = std::get<0>(t);
auto&& f2 = std::get<1>(t);
if (f1.failed()) {
f2.ignore_ready_future();
// Failed to save promise. Nothing we can do but throw.
return make_exception_future<prepare_response>(f1.get_exception());
}
std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>> data_or_digest;
if (!f2.failed()) {
auto&& [result, hit_rate] = f2.get();
if (only_digest) {
data_or_digest = *result->digest();
} else {
data_or_digest = std::move(make_foreign(std::move(result)));
}
} else {
// Don't return errors querying the current value, just debug-log them, as the caller is prepared to fall back
// on querying it by itself in case it's missing in the response.
auto ex = f2.get_exception();
logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex));
}
auto upgrade_if_needed = [schema = std::move(schema), &sys_ks] (std::optional<proposal> p) {
if (!p || p->update.schema_version() == schema->version()) {
return make_ready_future<std::optional<proposal>>(std::move(p));
}
// In case current schema is not the same as the schema in the proposal
// try to look it up first in the local schema_registry cache and upgrade
// the mutation using schema from the cache.
//
// If there's no schema in the cache, then retrieve persisted column mapping
// for that version and upgrade the mutation with it.
logger.debug("Stored mutation references outdated schema version. "
"Trying to upgrade the accepted proposal mutation to the most recent schema version.");
return service::get_column_mapping(sys_ks, p->update.column_family_id(), p->update.schema_version()).then([schema, p = std::move(p)] (const column_mapping& cm) {
return make_ready_future<std::optional<proposal>>(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm))));
});
};
return when_all_succeed(upgrade_if_needed(std::move(state._accepted_proposal)), upgrade_if_needed(std::move(state._most_recent_commit))).then([data_or_digest = std::move(data_or_digest)] (auto&& u) mutable {
return prepare_response(promise(std::move(std::get<0>(u)), std::move(std::get<1>(u)), std::move(data_or_digest)));
});
});
} else {
logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
// Return the currently promised ballot (rather than, e.g., the ballot of the last
// accepted proposal) so the coordinator can make sure it uses a newer ballot next
// time (#5667).
return make_ready_future<prepare_response>(prepare_response(std::move(state._promised_ballot)));
}
});
}).finally([&sp, schema, lc] () mutable {
auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
stats.cas_prepare.mark(lc.stop().latency());
});
auto stats_updater = defer([&sp, schema, lc] () mutable {
auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
stats.cas_prepare.mark(lc.stop().latency());
});
auto guard = co_await get_replica_lock(token, timeout);
// FIXME: Handle tablet intra-node migration: #16594.
// The shard can change concurrently, so we cannot rely on locking on this shard.
// When preparing, we need to use the same time as "now" (that's the time we use to decide if something
// is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
// on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no
// amount of re-submit will fix this (because the node on which the commit has expired will have a
// tombstone that hides any re-submit). See CASSANDRA-12043 for details.
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot);
paxos_state state = co_await sys_ks.load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout);
// If received ballot is newer that the one we already accepted it has to be accepted as well,
// but we will return the previously accepted proposal so that the new coordinator will use it instead of
// its own.
if (ballot.timestamp() > state._promised_ballot.timestamp()) {
logger.debug("Promising ballot {}", ballot);
tracing::trace(tr_state, "Promising ballot {}", ballot);
if (utils::get_local_injector().enter("paxos_error_before_save_promise")) {
co_await coroutine::return_exception(utils::injected_error("injected_error_before_save_promise"));
}
// The all() below throws only if save_paxos_promise fails.
// If querying the result fails we continue without read round optimization
auto [data_or_digest] = co_await coroutine::all(
[&] {
return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout);
},
[&] () -> future<std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>>> {
try {
auto&& [result, hit_rate] = co_await sp.get_db().local().query(schema, cmd,
{only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da},
dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), tr_state, timeout);
if (only_digest) {
co_return *result->digest();
} else {
co_return make_foreign(std::move(result));
}
} catch(...) {
logger.debug("Failed to get data or digest: {}. Ignored.", std::current_exception());
co_return std::nullopt;
}
}
);
if (utils::get_local_injector().enter("paxos_error_after_save_promise")) {
co_await coroutine::return_exception(utils::injected_error("injected_error_after_save_promise"));
}
auto upgrade_if_needed = [schema = std::move(schema), &sys_ks] (std::optional<proposal> p) -> future<std::optional<proposal>> {
if (!p || p->update.schema_version() == schema->version()) {
co_return std::move(p);
}
// In case current schema is not the same as the schema in the proposal
// try to look it up first in the local schema_registry cache and upgrade
// the mutation using schema from the cache.
//
// If there's no schema in the cache, then retrieve persisted column mapping
// for that version and upgrade the mutation with it.
logger.debug("Stored mutation references outdated schema version. "
"Trying to upgrade the accepted proposal mutation to the most recent schema version.");
const column_mapping& cm = co_await service::get_column_mapping(sys_ks, p->update.column_family_id(), p->update.schema_version());
co_return std::make_optional(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm))));
};
auto [u1, u2] = co_await coroutine::all(std::bind(upgrade_if_needed, std::move(state._accepted_proposal)), std::bind(upgrade_if_needed, std::move(state._most_recent_commit)));
co_return prepare_response(promise(std::move(u1), std::move(u2), std::move(data_or_digest)));
} else {
logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
// Return the currently promised ballot (rather than, e.g., the ballot of the last
// accepted proposal) so the coordinator can make sure it uses a newer ballot next
// time (#5667).
co_return std::move(state._promised_ballot);
}
}
future<bool> paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
clock_type::time_point timeout) {
return utils::get_local_injector().inject("paxos_accept_proposal_timeout", timeout,
[&sp, &sys_ks, token = std::move(token), &proposal, schema, tr_state, timeout] {
utils::latency_counter lc;
lc.start();
// FIXME: Handle tablet intra-node migration: #16594.
// The shard can change concurrently, so we cannot rely on locking on this shard.
return with_locked_key(token, timeout, [&sys_ks, &proposal, schema, tr_state, timeout] () mutable {
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
auto f = sys_ks.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);
return f.then([&sys_ks, &proposal, tr_state, schema, timeout] (paxos_state state) {
// Accept the proposal if we promised to accept it or the proposal is newer than the one we promised.
// Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected.
if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) {
logger.debug("Accepting proposal {}", proposal);
tracing::trace(tr_state, "Accepting proposal {}", proposal);
co_await utils::get_local_injector().inject("paxos_accept_proposal_timeout", timeout);
utils::latency_counter lc;
lc.start();
if (utils::get_local_injector().enter("paxos_error_before_save_proposal")) {
return make_exception_future<bool>(utils::injected_error("injected_error_before_save_proposal"));
}
return sys_ks.save_paxos_proposal(*schema, proposal, timeout).then([] {
if (utils::get_local_injector().enter("paxos_error_after_save_proposal")) {
return make_exception_future<bool>(utils::injected_error("injected_error_after_save_proposal"));
}
return make_ready_future<bool>(true);
});
} else {
logger.debug("Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
tracing::trace(tr_state, "Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
return make_ready_future<bool>(false);
}
});
}).finally([&sp, schema, lc] () mutable {
auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
stats.cas_accept.mark(lc.stop().latency());
});
auto stats_updater = defer([&sp, schema, lc] () mutable {
auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
stats.cas_accept.mark(lc.stop().latency());
});
// FIXME: Handle tablet intra-node migration: #16594.
// The shard can change concurrently, so we cannot rely on locking on this shard.
auto guard = co_await get_replica_lock(token, timeout);
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
paxos_state state = co_await sys_ks.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);
// Accept the proposal if we promised to accept it or the proposal is newer than the one we promised.
// Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected.
if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) {
logger.debug("Accepting proposal {}", proposal);
tracing::trace(tr_state, "Accepting proposal {}", proposal);
if (utils::get_local_injector().enter("paxos_error_before_save_proposal")) {
co_await coroutine::return_exception(utils::injected_error("injected_error_before_save_proposal"));
}
co_await sys_ks.save_paxos_proposal(*schema, proposal, timeout);
if (utils::get_local_injector().enter("paxos_error_after_save_proposal")) {
co_await coroutine::return_exception(utils::injected_error("injected_error_after_save_proposal"));
}
co_return true;
} else {
logger.debug("Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
tracing::trace(tr_state, "Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
co_return false;
}
}
future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
if (utils::get_local_injector().enter("paxos_error_before_learn")) {
return make_exception_future<>(utils::injected_error("injected_error_before_learn"));
co_await coroutine::return_exception(utils::injected_error("injected_error_before_learn"));
}
utils::latency_counter lc;
lc.start();
return do_with(std::move(decision), [&sp, &sys_ks, tr_state = std::move(tr_state), schema, timeout] (proposal& decision) {
auto f = utils::get_local_injector().inject("paxos_state_learn_timeout", timeout);
replica::table& cf = sp.get_db().local().find_column_family(schema);
db_clock::time_point t = cf.get_truncation_time();
auto truncated_at = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch());
// When saving a decision, also delete the last accepted proposal. This is just an
// optimization to save space.
// Even though there is no guarantee we will see decisions in the right order,
// because messages can get delayed, so this decision can be older than our current most
// recent accepted proposal/committed decision, saving it is always safe due to column timestamps.
// Since the mutation uses the decision ballot timestamp, if cell timestamp of any current cell
// is strictly greater than the decision one, saving the decision will not erase it.
//
// The table may have been truncated since the proposal was initiated. In that case, we
// don't want to perform the mutation and potentially resurrect truncated data.
if (utils::UUID_gen::unix_timestamp(decision.ballot) >= truncated_at) {
f = f.then([&sp, schema, &decision, timeout, tr_state] {
logger.debug("Committing decision {}", decision);
tracing::trace(tr_state, "Committing decision {}", decision);
// In case current schema is not the same as the schema in the decision
// try to look it up first in the local schema_registry cache and upgrade
// the mutation using schema from the cache.
//
// If there's no schema in the cache, then retrieve persisted column mapping
// for that version and upgrade the mutation with it.
if (decision.update.schema_version() != schema->version()) {
on_internal_error(logger, format("schema version in learn does not match current schema"));
}
return sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
});
} else {
logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);
tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
}
return f.then([&sys_ks, &decision, schema, timeout] {
// We don't need to lock the partition key if there is no gap between loading paxos
// state and saving it, and here we're just blindly updating.
return utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout, [&sys_ks, &decision, schema, timeout] {
return sys_ks.save_paxos_decision(*schema, decision, timeout);
});
});
}).finally([&sp, schema, lc] () mutable {
auto stats_updater = defer([&sp, schema, lc] () mutable {
auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
stats.cas_learn.mark(lc.stop().latency());
});
co_await utils::get_local_injector().inject("paxos_state_learn_timeout", timeout);
replica::table& cf = sp.get_db().local().find_column_family(schema);
db_clock::time_point t = cf.get_truncation_time();
auto truncated_at = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch());
// When saving a decision, also delete the last accepted proposal. This is just an
// optimization to save space.
// Even though there is no guarantee we will see decisions in the right order,
// because messages can get delayed, so this decision can be older than our current most
// recent accepted proposal/committed decision, saving it is always safe due to column timestamps.
// Since the mutation uses the decision ballot timestamp, if cell timestamp of any current cell
// is strictly greater than the decision one, saving the decision will not erase it.
//
// The table may have been truncated since the proposal was initiated. In that case, we
// don't want to perform the mutation and potentially resurrect truncated data.
if (utils::UUID_gen::unix_timestamp(decision.ballot) >= truncated_at) {
logger.debug("Committing decision {}", decision);
tracing::trace(tr_state, "Committing decision {}", decision);
// In case current schema is not the same as the schema in the decision
// try to look it up first in the local schema_registry cache and upgrade
// the mutation using schema from the cache.
//
// If there's no schema in the cache, then retrieve persisted column mapping
// for that version and upgrade the mutation with it.
if (decision.update.schema_version() != schema->version()) {
on_internal_error(logger, format("schema version in learn does not match current schema"));
}
co_await sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
} else {
logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);
tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
}
// We don't need to lock the partition key if there is no gap between loading paxos
// state and saving it, and here we're just blindly updating.
co_await utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout);
co_return co_await sys_ks.save_paxos_decision(*schema, decision, timeout);
}
future<> paxos_state::prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,

View File

@@ -26,9 +26,8 @@ using clock_type = db::timeout_clock;
// The state of a CAS update of a given primary key as persisted in the paxos table.
class paxos_state {
public:
class guard;
private:
class guard;
class key_lock_map {
using semaphore = basic_semaphore<semaphore_default_exception_factory, clock_type>;
@@ -40,44 +39,9 @@ private:
map _locks;
public:
//
// A thin RAII aware wrapper around the lock map to garbage
// collect the decorated key from the map on unlock if there
// are no waiters.
///
template<typename Func>
futurize_t<std::invoke_result_t<Func>> with_locked_key(const dht::token& key, clock_type::time_point timeout, Func func) {
return with_semaphore(get_semaphore_for_key(key), 1, timeout - clock_type::now(), std::move(func)).finally([key, this] {
release_semaphore_for_key(key);
});
}
friend class guard;
};
// Locks are local to the shard which owns the corresponding token range.
// Protects concurrent reads and writes of the same row in system.paxos table.
static thread_local key_lock_map _paxos_table_lock;
// Taken by the coordinator code to allow only one instance of PAXOS to run for each key.
// This prevents contantion between multiple clients trying to modify the
// same key through the same coordinator and stealing the ballot from
// each other.
static thread_local key_lock_map _coordinator_lock;
// protects access to system.paxos
template<typename Func>
static
futurize_t<std::invoke_result_t<Func>> with_locked_key(const dht::token& key, clock_type::time_point timeout, Func func) {
return _paxos_table_lock.with_locked_key(key, timeout, std::move(func));
}
utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID();
std::optional<proposal> _accepted_proposal;
std::optional<proposal> _most_recent_commit;
public:
class guard {
key_lock_map& _map;
dht::token _key;
@@ -101,6 +65,24 @@ public:
}
};
// Locks are local to the shard which owns the corresponding token range.
// Protects concurrent reads and writes of the same row in system.paxos table.
static thread_local key_lock_map _paxos_table_lock;
// Taken by the coordinator code to allow only one instance of PAXOS to run for each key.
// This prevents contantion between multiple clients trying to modify the
// same key through the same coordinator and stealing the ballot from
// each other.
static thread_local key_lock_map _coordinator_lock;
static future<guard> get_replica_lock(const dht::token& key, clock_type::time_point timeout);
utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID();
std::optional<proposal> _accepted_proposal;
std::optional<proposal> _most_recent_commit;
public:
static future<guard> get_cas_lock(const dht::token& key, clock_type::time_point timeout);
static logging::logger logger;

View File

@@ -6246,7 +6246,7 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
}
});
paxos::paxos_state::guard l = co_await paxos::paxos_state::get_cas_lock(token, write_timeout);
auto l = co_await paxos::paxos_state::get_cas_lock(token, write_timeout);
while (true) {
// Finish the previous PAXOS round, if any, and, as a side effect, compute