mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-01 04:26:48 +00:00
lwt: take raw lock for entire cas duration
It will prevent parallel update by the same coordinator and should reduce contention.
This commit is contained in:
@@ -49,63 +49,61 @@ void paxos_state::release_semaphore_for_key(const dht::token& key) {
|
||||
future<prepare_response> paxos_state::prepare(tracing::trace_state_ptr tr_state, schema_ptr schema,
|
||||
lw_shared_ptr<query::read_command> cmd, partition_key key, utils::UUID ballot,
|
||||
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
|
||||
dht::token token = dht::global_partitioner().get_token(*schema, key);
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
return with_locked_key(token, timeout, [cmd, token, key = std::move(key), ballot, tr_state, schema, only_digest, da, timeout] () mutable {
|
||||
// When preparing, we need to use the same time as "now" (that's the time we use to decide if something
|
||||
// is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
|
||||
// on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no
|
||||
// amount of re-submit will fix this (because the node on which the commit has expired will have a
|
||||
// tombstone that hides any re-submit). See CASSANDRA-12043 for details.
|
||||
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot);
|
||||
auto f = db::system_keyspace::load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout);
|
||||
return f.then([cmd, token = std::move(token), key = std::move(key), ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) {
|
||||
// If received ballot is newer that the one we already accepted it has to be accepted as well,
|
||||
// but we will return the previously accepted proposal so that the new coordinator will use it instead of
|
||||
// its own.
|
||||
if (ballot.timestamp() > state._promised_ballot.timestamp()) {
|
||||
logger.debug("Promising ballot {}", ballot);
|
||||
tracing::trace(tr_state, "Promising ballot {}", ballot);
|
||||
auto f1 = futurize_apply(db::system_keyspace::save_paxos_promise, *schema, key, ballot, timeout);
|
||||
auto f2 = futurize_apply([&] {
|
||||
return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}),
|
||||
[tr_state, schema, cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) {
|
||||
return get_local_storage_proxy().get_db().local().query(schema, *cmd,
|
||||
{only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da},
|
||||
prv, tr_state, query::result_memory_limiter::maximum_result_size, timeout);
|
||||
});
|
||||
// When preparing, we need to use the same time as "now" (that's the time we use to decide if something
|
||||
// is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
|
||||
// on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no
|
||||
// amount of re-submit will fix this (because the node on which the commit has expired will have a
|
||||
// tombstone that hides any re-submit). See CASSANDRA-12043 for details.
|
||||
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot);
|
||||
auto f = db::system_keyspace::load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout);
|
||||
return f.then([cmd, key = std::move(key), ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) {
|
||||
// If received ballot is newer that the one we already accepted it has to be accepted as well,
|
||||
// but we will return the previously accepted proposal so that the new coordinator will use it instead of
|
||||
// its own.
|
||||
if (ballot.timestamp() > state._promised_ballot.timestamp()) {
|
||||
logger.debug("Promising ballot {}", ballot);
|
||||
tracing::trace(tr_state, "Promising ballot {}", ballot);
|
||||
auto f1 = futurize_apply(db::system_keyspace::save_paxos_promise, *schema, key, ballot, timeout);
|
||||
auto f2 = futurize_apply([&] {
|
||||
dht::token token = dht::global_partitioner().get_token(*schema, key);
|
||||
return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}),
|
||||
[tr_state, schema, cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) {
|
||||
return get_local_storage_proxy().get_db().local().query(schema, *cmd,
|
||||
{only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da},
|
||||
prv, tr_state, query::result_memory_limiter::maximum_result_size, timeout);
|
||||
});
|
||||
return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, cmd] (auto t) {
|
||||
auto&& f1 = std::get<0>(t);
|
||||
auto&& f2 = std::get<1>(t);
|
||||
if (f1.failed()) {
|
||||
// Failed to save promise. Nothing we can do but throw.
|
||||
return make_exception_future<prepare_response>(f1.get_exception());
|
||||
});
|
||||
return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, cmd] (auto t) {
|
||||
auto&& f1 = std::get<0>(t);
|
||||
auto&& f2 = std::get<1>(t);
|
||||
if (f1.failed()) {
|
||||
// Failed to save promise. Nothing we can do but throw.
|
||||
return make_exception_future<prepare_response>(f1.get_exception());
|
||||
}
|
||||
std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>> data_or_digest;
|
||||
// Silently ignore any errors querying the current value as the caller is prepared to fall back
|
||||
// on querying it by itself in case it's missing in the response.
|
||||
if (!f2.failed()) {
|
||||
auto&& [result, hit_rate] = f2.get();
|
||||
if (only_digest) {
|
||||
data_or_digest = *result->digest();
|
||||
} else {
|
||||
data_or_digest = std::move(make_foreign(std::move(result)));
|
||||
}
|
||||
std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>> data_or_digest;
|
||||
// Silently ignore any errors querying the current value as the caller is prepared to fall back
|
||||
// on querying it by itself in case it's missing in the response.
|
||||
if (!f2.failed()) {
|
||||
auto&& [result, hit_rate] = f2.get();
|
||||
if (only_digest) {
|
||||
data_or_digest = *result->digest();
|
||||
} else {
|
||||
data_or_digest = std::move(make_foreign(std::move(result)));
|
||||
}
|
||||
}
|
||||
return make_ready_future<prepare_response>(prepare_response(promise(std::move(state._accepted_proposal),
|
||||
std::move(state._most_recent_commit), std::move(data_or_digest))));
|
||||
});
|
||||
} else {
|
||||
logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
|
||||
tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
|
||||
// Return the currently promised ballot (rather than, e.g., the ballot of the last
|
||||
// accepted proposal) so the coordinator can make sure it uses a newer ballot next
|
||||
// time (#5667).
|
||||
return make_ready_future<prepare_response>(prepare_response(std::move(state._promised_ballot)));
|
||||
}
|
||||
});
|
||||
}
|
||||
return make_ready_future<prepare_response>(prepare_response(promise(std::move(state._accepted_proposal),
|
||||
std::move(state._most_recent_commit), std::move(data_or_digest))));
|
||||
});
|
||||
} else {
|
||||
logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
|
||||
tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
|
||||
// Return the currently promised ballot (rather than, e.g., the ballot of the last
|
||||
// accepted proposal) so the coordinator can make sure it uses a newer ballot next
|
||||
// time (#5667).
|
||||
return make_ready_future<prepare_response>(prepare_response(std::move(state._promised_ballot)));
|
||||
}
|
||||
}).finally([schema, lc] () mutable {
|
||||
auto& stats = get_local_storage_proxy().get_db().local().find_column_family(schema).get_stats();
|
||||
stats.cas_prepare.mark(lc.stop().latency());
|
||||
@@ -117,28 +115,25 @@ future<prepare_response> paxos_state::prepare(tracing::trace_state_ptr tr_state,
|
||||
|
||||
future<bool> paxos_state::accept(tracing::trace_state_ptr tr_state, schema_ptr schema, proposal proposal,
|
||||
clock_type::time_point timeout) {
|
||||
dht::token token = proposal.update.decorated_key(*schema).token();
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
return with_locked_key(token, timeout, [proposal = std::move(proposal), schema, tr_state, timeout] () mutable {
|
||||
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
|
||||
auto f = db::system_keyspace::load_paxos_state(proposal.update.decorated_key(*schema).key(), schema,
|
||||
gc_clock::time_point(now_in_sec), timeout);
|
||||
return f.then([proposal = std::move(proposal), tr_state, schema, timeout] (paxos_state state) {
|
||||
// Accept the proposal if we promised to accept it or the proposal is newer than the one we promised.
|
||||
// Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected.
|
||||
if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) {
|
||||
logger.debug("Accepting proposal {}", proposal);
|
||||
tracing::trace(tr_state, "Accepting proposal {}", proposal);
|
||||
return db::system_keyspace::save_paxos_proposal(*schema, proposal, timeout).then([] {
|
||||
return true;
|
||||
});
|
||||
} else {
|
||||
logger.debug("Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
|
||||
tracing::trace(tr_state, "Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
|
||||
return make_ready_future<bool>(false);
|
||||
}
|
||||
});
|
||||
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
|
||||
auto f = db::system_keyspace::load_paxos_state(proposal.update.decorated_key(*schema).key(), schema,
|
||||
gc_clock::time_point(now_in_sec), timeout);
|
||||
return f.then([proposal = std::move(proposal), tr_state, schema, timeout] (paxos_state state) {
|
||||
// Accept the proposal if we promised to accept it or the proposal is newer than the one we promised.
|
||||
// Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected.
|
||||
if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) {
|
||||
logger.debug("Accepting proposal {}", proposal);
|
||||
tracing::trace(tr_state, "Accepting proposal {}", proposal);
|
||||
return db::system_keyspace::save_paxos_proposal(*schema, proposal, timeout).then([] {
|
||||
return true;
|
||||
});
|
||||
} else {
|
||||
logger.debug("Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
|
||||
tracing::trace(tr_state, "Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
|
||||
return make_ready_future<bool>(false);
|
||||
}
|
||||
}).finally([schema, lc] () mutable {
|
||||
auto& stats = get_local_storage_proxy().get_db().local().find_column_family(schema).get_stats();
|
||||
stats.cas_propose.mark(lc.stop().latency());
|
||||
|
||||
@@ -63,6 +63,11 @@ class paxos_state {
|
||||
static key_semaphore& get_semaphore_for_key(const dht::token& key);
|
||||
static void release_semaphore_for_key(const dht::token& key);
|
||||
|
||||
utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID(0);
|
||||
std::optional<proposal> _accepted_proposal;
|
||||
std::optional<proposal> _most_recent_commit;
|
||||
|
||||
public:
|
||||
//
|
||||
// A thin RAII aware wrapper around the lock map to garbage
|
||||
// collect the decorated key from the map on unlock if there
|
||||
@@ -76,11 +81,6 @@ class paxos_state {
|
||||
});
|
||||
}
|
||||
|
||||
utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID(0);
|
||||
std::optional<proposal> _accepted_proposal;
|
||||
std::optional<proposal> _most_recent_commit;
|
||||
|
||||
public:
|
||||
static logging::logger logger;
|
||||
|
||||
paxos_state() {}
|
||||
|
||||
@@ -4089,54 +4089,60 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
|
||||
db::consistency_level::LOCAL_QUORUM : db::consistency_level::QUORUM;
|
||||
|
||||
return do_with(unsigned(0), [this, handler, schema, cmd, request, partition_ranges = std::move(partition_ranges),
|
||||
query_options = std::move(query_options), cl] (unsigned& contentions) mutable {
|
||||
query_options = std::move(query_options), cl, write_timeout] (unsigned& contentions) mutable {
|
||||
dht::token token = partition_ranges[0].start()->value().as_decorated_key().token();
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
return repeat_until_value([this, handler, schema, cmd, request, partition_ranges = std::move(partition_ranges),
|
||||
query_options = std::move(query_options), cl, &contentions] () mutable {
|
||||
// Finish the previous PAXOS round, if any, and, as a side effect, compute
|
||||
// a ballot (round identifier) which is a) unique b) has good chances of being
|
||||
// recent enough.
|
||||
return handler->begin_and_repair_paxos(query_options.cstate, contentions, true)
|
||||
.then([this, handler, schema, cmd, request, partition_ranges, query_options, cl, &contentions]
|
||||
(paxos_response_handler::ballot_and_data v) mutable {
|
||||
// Read the current values and check they validate the conditions.
|
||||
paxos::paxos_state::logger.debug("CAS[{}]: Reading existing values for CAS precondition", handler->id());
|
||||
tracing::trace(handler->tr_state, "Reading existing values for CAS precondition");
|
||||
auto f = v.data ? make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>>(std::move(v.data)) :
|
||||
query(schema, cmd, std::move(partition_ranges), cl, query_options).then([] (coordinator_query_result&& qr) {
|
||||
|
||||
return paxos::paxos_state::with_locked_key(token, write_timeout, [this, lc, handler, schema, cmd, request,
|
||||
partition_ranges = std::move(partition_ranges), query_options = std::move(query_options), cl,
|
||||
&contentions] () mutable {
|
||||
return repeat_until_value([this, handler, schema, cmd, request, partition_ranges = std::move(partition_ranges),
|
||||
query_options = std::move(query_options), cl, &contentions] () mutable {
|
||||
// Finish the previous PAXOS round, if any, and, as a side effect, compute
|
||||
// a ballot (round identifier) which is a) unique b) has good chances of being
|
||||
// recent enough.
|
||||
return handler->begin_and_repair_paxos(query_options.cstate, contentions, true)
|
||||
.then([this, handler, schema, cmd, request, partition_ranges, query_options, cl, &contentions]
|
||||
(paxos_response_handler::ballot_and_data v) mutable {
|
||||
// Read the current values and check they validate the conditions.
|
||||
paxos::paxos_state::logger.debug("CAS[{}]: Reading existing values for CAS precondition", handler->id());
|
||||
tracing::trace(handler->tr_state, "Reading existing values for CAS precondition");
|
||||
auto f = v.data ? make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>>(std::move(v.data)) :
|
||||
query(schema, cmd, std::move(partition_ranges), cl, query_options).then([] (coordinator_query_result&& qr) {
|
||||
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>>(std::move(qr.query_result));
|
||||
});
|
||||
return f.then([this, handler, schema, cmd, request, ballot = v.ballot, &contentions] (auto&& qr) {
|
||||
auto mutation = request->apply(*qr, cmd->slice, utils::UUID_gen::micros_timestamp(ballot));
|
||||
if (!mutation) {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] precondition does not match current values", handler->id());
|
||||
tracing::trace(handler->tr_state, "CAS precondition does not match current values");
|
||||
++_stats.cas_write_condition_not_met;
|
||||
return make_ready_future<std::optional<bool>>(false);
|
||||
}
|
||||
return do_with(paxos::proposal(ballot, freeze(*mutation)),
|
||||
[handler, &contentions] (paxos::proposal& proposal) {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] precondition is met; proposing client-requested updates for {}",
|
||||
handler->id(), proposal.ballot);
|
||||
tracing::trace(handler->tr_state, "CAS precondition is met; proposing client-requested updates for {}",
|
||||
proposal.ballot);
|
||||
return handler->accept_proposal(proposal).then([handler, &proposal, &contentions] (bool is_accepted) {
|
||||
if (is_accepted) {
|
||||
// The majority (aka a QUORUM) has promised the coordinator to
|
||||
// accept the action associated with the computed ballot.
|
||||
// Apply the mutation.
|
||||
return handler->learn_decision(std::move(proposal)).then([handler] {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] successful", handler->id());
|
||||
tracing::trace(handler->tr_state, "CAS successful");
|
||||
return std::optional<bool>(true);
|
||||
});
|
||||
}
|
||||
paxos::paxos_state::logger.debug("CAS[{}] PAXOS proposal not accepted (pre-empted by a higher ballot)",
|
||||
handler->id());
|
||||
tracing::trace(handler->tr_state, "PAXOS proposal not accepted (pre-empted by a higher ballot)");
|
||||
++contentions;
|
||||
return sleep_approx_50ms().then([] { return std::optional<bool>(); });
|
||||
return f.then([this, handler, schema, cmd, request, ballot = v.ballot, &contentions] (auto&& qr) {
|
||||
auto mutation = request->apply(*qr, cmd->slice, utils::UUID_gen::micros_timestamp(ballot));
|
||||
if (!mutation) {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] precondition does not match current values", handler->id());
|
||||
tracing::trace(handler->tr_state, "CAS precondition does not match current values");
|
||||
++_stats.cas_write_condition_not_met;
|
||||
return make_ready_future<std::optional<bool>>(false);
|
||||
}
|
||||
return do_with(paxos::proposal(ballot, freeze(*mutation)),
|
||||
[handler, &contentions] (paxos::proposal& proposal) {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] precondition is met; proposing client-requested updates for {}",
|
||||
handler->id(), proposal.ballot);
|
||||
tracing::trace(handler->tr_state, "CAS precondition is met; proposing client-requested updates for {}",
|
||||
proposal.ballot);
|
||||
return handler->accept_proposal(proposal).then([handler, &proposal, &contentions] (bool is_accepted) {
|
||||
if (is_accepted) {
|
||||
// The majority (aka a QUORUM) has promised the coordinator to
|
||||
// accept the action associated with the computed ballot.
|
||||
// Apply the mutation.
|
||||
return handler->learn_decision(std::move(proposal)).then([handler] {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] successful", handler->id());
|
||||
tracing::trace(handler->tr_state, "CAS successful");
|
||||
return std::optional<bool>(true);
|
||||
});
|
||||
}
|
||||
paxos::paxos_state::logger.debug("CAS[{}] PAXOS proposal not accepted (pre-empted by a higher ballot)",
|
||||
handler->id());
|
||||
tracing::trace(handler->tr_state, "PAXOS proposal not accepted (pre-empted by a higher ballot)");
|
||||
++contentions;
|
||||
return sleep_approx_50ms().then([] { return std::optional<bool>(); });
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -4635,8 +4641,12 @@ void storage_proxy::init_messaging_service() {
|
||||
}
|
||||
|
||||
return get_schema_for_read(cmd.schema_version, src_addr).then([cmd = make_lw_shared<query::read_command>(std::move(cmd)),
|
||||
key = std::move(key), ballot, only_digest, da, timeout, tr_state = std::move(tr_state)] (schema_ptr schema) {
|
||||
return paxos::paxos_state::prepare(tr_state, std::move(schema), cmd, std::move(key), ballot, only_digest, da, *timeout);
|
||||
key = std::move(key), ballot, only_digest, da, timeout, tr_state = std::move(tr_state)] (schema_ptr schema) mutable {
|
||||
dht::token token = dht::global_partitioner().get_token(*schema, key);
|
||||
return paxos::paxos_state::with_locked_key(token, *timeout, [schema = std::move(schema), cmd = std::move(cmd),
|
||||
key = std::move(key), ballot, only_digest, da, timeout, tr_state = std::move(tr_state)] () mutable {
|
||||
return paxos::paxos_state::prepare(tr_state, std::move(schema), cmd, std::move(key), ballot, only_digest, da, *timeout);
|
||||
});
|
||||
});
|
||||
});
|
||||
ms.register_paxos_accept([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, paxos::proposal proposal,
|
||||
@@ -4649,8 +4659,12 @@ void storage_proxy::init_messaging_service() {
|
||||
}
|
||||
|
||||
return get_schema_for_read(proposal.update.schema_version(), src_addr).then([tr_state = std::move(tr_state),
|
||||
proposal = std::move(proposal), timeout] (schema_ptr schema) {
|
||||
return paxos::paxos_state::accept(tr_state, std::move(schema), std::move(proposal), *timeout);
|
||||
proposal = std::move(proposal), timeout] (schema_ptr schema) mutable {
|
||||
dht::token token = proposal.update.decorated_key(*schema).token();
|
||||
return paxos::paxos_state::with_locked_key(token, *timeout, [schema = std::move(schema), tr_state = std::move(tr_state),
|
||||
proposal = std::move(proposal), timeout] () mutable {
|
||||
return paxos::paxos_state::accept(tr_state, std::move(schema), std::move(proposal), *timeout);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user