paxos: co-routinize paxos_state::prepare function
This commit is contained in:
@@ -7,7 +7,9 @@
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
#include <exception>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/all.hh>
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/paxos/proposal.hh"
|
||||
#include "service/paxos/paxos_state.hh"
|
||||
@@ -51,101 +53,100 @@ future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, cloc
|
||||
future<prepare_response> paxos_state::prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema,
|
||||
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
|
||||
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
|
||||
return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &sys_ks, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] {
|
||||
dht::token token = dht::get_token(*schema, key);
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
// FIXME: Handle tablet intra-node migration: #16594.
|
||||
// The shard can change concurrently, so we cannot rely on locking on this shard.
|
||||
return with_locked_key(token, timeout, [&sp, &sys_ks, &cmd, token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable {
|
||||
// When preparing, we need to use the same time as "now" (that's the time we use to decide if something
|
||||
// is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
|
||||
// on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no
|
||||
// amount of re-submit will fix this (because the node on which the commit has expired will have a
|
||||
// tombstone that hides any re-submit). See CASSANDRA-12043 for details.
|
||||
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot);
|
||||
co_await utils::get_local_injector().inject("paxos_prepare_timeout", timeout);
|
||||
dht::token token = dht::get_token(*schema, key);
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
auto f = sys_ks.load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout);
|
||||
return f.then([&sp, &sys_ks, &cmd, token = std::move(token), &key, ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) {
|
||||
// If received ballot is newer that the one we already accepted it has to be accepted as well,
|
||||
// but we will return the previously accepted proposal so that the new coordinator will use it instead of
|
||||
// its own.
|
||||
if (ballot.timestamp() > state._promised_ballot.timestamp()) {
|
||||
logger.debug("Promising ballot {}", ballot);
|
||||
tracing::trace(tr_state, "Promising ballot {}", ballot);
|
||||
if (utils::get_local_injector().enter("paxos_error_before_save_promise")) {
|
||||
return make_exception_future<prepare_response>(utils::injected_error("injected_error_before_save_promise"));
|
||||
}
|
||||
auto f1 = futurize_invoke([&] {
|
||||
return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout);
|
||||
});
|
||||
auto f2 = futurize_invoke([&] {
|
||||
return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}),
|
||||
[&sp, tr_state, schema, &cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) {
|
||||
return sp.get_db().local().query(schema, cmd,
|
||||
{only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da},
|
||||
prv, tr_state, timeout);
|
||||
});
|
||||
});
|
||||
return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, schema, &sys_ks] (auto t) mutable {
|
||||
if (utils::get_local_injector().enter("paxos_error_after_save_promise")) {
|
||||
return make_exception_future<prepare_response>(utils::injected_error("injected_error_after_save_promise"));
|
||||
}
|
||||
auto&& f1 = std::get<0>(t);
|
||||
auto&& f2 = std::get<1>(t);
|
||||
if (f1.failed()) {
|
||||
f2.ignore_ready_future();
|
||||
// Failed to save promise. Nothing we can do but throw.
|
||||
return make_exception_future<prepare_response>(f1.get_exception());
|
||||
}
|
||||
std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>> data_or_digest;
|
||||
if (!f2.failed()) {
|
||||
auto&& [result, hit_rate] = f2.get();
|
||||
if (only_digest) {
|
||||
data_or_digest = *result->digest();
|
||||
} else {
|
||||
data_or_digest = std::move(make_foreign(std::move(result)));
|
||||
}
|
||||
} else {
|
||||
// Don't return errors querying the current value, just debug-log them, as the caller is prepared to fall back
|
||||
// on querying it by itself in case it's missing in the response.
|
||||
auto ex = f2.get_exception();
|
||||
logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex));
|
||||
}
|
||||
auto upgrade_if_needed = [schema = std::move(schema), &sys_ks] (std::optional<proposal> p) {
|
||||
if (!p || p->update.schema_version() == schema->version()) {
|
||||
return make_ready_future<std::optional<proposal>>(std::move(p));
|
||||
}
|
||||
// In case current schema is not the same as the schema in the proposal
|
||||
// try to look it up first in the local schema_registry cache and upgrade
|
||||
// the mutation using schema from the cache.
|
||||
//
|
||||
// If there's no schema in the cache, then retrieve persisted column mapping
|
||||
// for that version and upgrade the mutation with it.
|
||||
logger.debug("Stored mutation references outdated schema version. "
|
||||
"Trying to upgrade the accepted proposal mutation to the most recent schema version.");
|
||||
return service::get_column_mapping(sys_ks, p->update.column_family_id(), p->update.schema_version()).then([schema, p = std::move(p)] (const column_mapping& cm) {
|
||||
return make_ready_future<std::optional<proposal>>(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm))));
|
||||
});
|
||||
};
|
||||
return when_all_succeed(upgrade_if_needed(std::move(state._accepted_proposal)), upgrade_if_needed(std::move(state._most_recent_commit))).then([data_or_digest = std::move(data_or_digest)] (auto&& u) mutable {
|
||||
return prepare_response(promise(std::move(std::get<0>(u)), std::move(std::get<1>(u)), std::move(data_or_digest)));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
|
||||
tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
|
||||
// Return the currently promised ballot (rather than, e.g., the ballot of the last
|
||||
// accepted proposal) so the coordinator can make sure it uses a newer ballot next
|
||||
// time (#5667).
|
||||
return make_ready_future<prepare_response>(prepare_response(std::move(state._promised_ballot)));
|
||||
}
|
||||
});
|
||||
}).finally([&sp, schema, lc] () mutable {
|
||||
auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
|
||||
stats.cas_prepare.mark(lc.stop().latency());
|
||||
});
|
||||
auto stats_updater = defer([&sp, schema, lc] () mutable {
|
||||
auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
|
||||
stats.cas_prepare.mark(lc.stop().latency());
|
||||
});
|
||||
|
||||
auto guard = co_await get_replica_lock(token, timeout);
|
||||
// FIXME: Handle tablet intra-node migration: #16594.
|
||||
// The shard can change concurrently, so we cannot rely on locking on this shard.
|
||||
|
||||
// When preparing, we need to use the same time as "now" (that's the time we use to decide if something
|
||||
// is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
|
||||
// on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no
|
||||
// amount of re-submit will fix this (because the node on which the commit has expired will have a
|
||||
// tombstone that hides any re-submit). See CASSANDRA-12043 for details.
|
||||
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot);
|
||||
|
||||
paxos_state state = co_await sys_ks.load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout);
|
||||
// If received ballot is newer that the one we already accepted it has to be accepted as well,
|
||||
// but we will return the previously accepted proposal so that the new coordinator will use it instead of
|
||||
// its own.
|
||||
if (ballot.timestamp() > state._promised_ballot.timestamp()) {
|
||||
logger.debug("Promising ballot {}", ballot);
|
||||
tracing::trace(tr_state, "Promising ballot {}", ballot);
|
||||
if (utils::get_local_injector().enter("paxos_error_before_save_promise")) {
|
||||
co_await coroutine::return_exception(utils::injected_error("injected_error_before_save_promise"));
|
||||
}
|
||||
|
||||
auto [f1, f2] = co_await when_all(
|
||||
[&] {
|
||||
return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout);
|
||||
},
|
||||
coroutine::lambda([&] () -> future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>> {
|
||||
co_return co_await sp.get_db().local().query(schema, cmd,
|
||||
{only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da},
|
||||
dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), tr_state, timeout);
|
||||
})
|
||||
);
|
||||
|
||||
if (utils::get_local_injector().enter("paxos_error_after_save_promise")) {
|
||||
co_await coroutine::return_exception(utils::injected_error("injected_error_after_save_promise"));
|
||||
}
|
||||
|
||||
if (f1.failed()) {
|
||||
f2.ignore_ready_future();
|
||||
// Failed to save promise. Nothing we can do but throw.
|
||||
co_return coroutine::exception(f1.get_exception());
|
||||
}
|
||||
std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>> data_or_digest;
|
||||
if (!f2.failed()) {
|
||||
auto&& [result, hit_rate] = f2.get();
|
||||
if (only_digest) {
|
||||
data_or_digest = *result->digest();
|
||||
} else {
|
||||
data_or_digest = make_foreign(std::move(result));
|
||||
}
|
||||
} else {
|
||||
// Don't return errors querying the current value, just debug-log them, as the caller is prepared to fall back
|
||||
// on querying it by itself in case it's missing in the response.
|
||||
auto ex = f2.get_exception();
|
||||
logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex));
|
||||
}
|
||||
auto upgrade_if_needed = [schema = std::move(schema), &sys_ks] (std::optional<proposal> p) -> future<std::optional<proposal>> {
|
||||
if (!p || p->update.schema_version() == schema->version()) {
|
||||
co_return std::move(p);
|
||||
}
|
||||
// In case current schema is not the same as the schema in the proposal
|
||||
// try to look it up first in the local schema_registry cache and upgrade
|
||||
// the mutation using schema from the cache.
|
||||
//
|
||||
// If there's no schema in the cache, then retrieve persisted column mapping
|
||||
// for that version and upgrade the mutation with it.
|
||||
logger.debug("Stored mutation references outdated schema version. "
|
||||
"Trying to upgrade the accepted proposal mutation to the most recent schema version.");
|
||||
const column_mapping& cm = co_await service::get_column_mapping(sys_ks, p->update.column_family_id(), p->update.schema_version());
|
||||
|
||||
co_return std::make_optional(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm))));
|
||||
};
|
||||
|
||||
auto [u1, u2] = co_await coroutine::all(std::bind(upgrade_if_needed, std::move(state._accepted_proposal)), std::bind(upgrade_if_needed, std::move(state._most_recent_commit)));
|
||||
|
||||
co_return prepare_response(promise(std::move(u1), std::move(u2), std::move(data_or_digest)));
|
||||
} else {
|
||||
logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
|
||||
tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
|
||||
// Return the currently promised ballot (rather than, e.g., the ballot of the last
|
||||
// accepted proposal) so the coordinator can make sure it uses a newer ballot next
|
||||
// time (#5667).
|
||||
co_return std::move(state._promised_ballot);
|
||||
}
|
||||
}
|
||||
|
||||
future<bool> paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
|
||||
|
||||
Reference in New Issue
Block a user