From 64cfb9b1f6ae7234cb493a0755c3307c19f5901a Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 10 Dec 2019 16:50:38 +0200 Subject: [PATCH] lwt: take raw lock for entire cas duration It will prevent parallel update by the same coordinator and should reduce contention. --- service/paxos/paxos_state.cc | 141 +++++++++++++++++------------------ service/paxos/paxos_state.hh | 10 +-- service/storage_proxy.cc | 110 +++++++++++++++------------ 3 files changed, 135 insertions(+), 126 deletions(-) diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 107bfa074d..e024cfa124 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -49,63 +49,61 @@ void paxos_state::release_semaphore_for_key(const dht::token& key) { future paxos_state::prepare(tracing::trace_state_ptr tr_state, schema_ptr schema, lw_shared_ptr cmd, partition_key key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) { - dht::token token = dht::global_partitioner().get_token(*schema, key); utils::latency_counter lc; lc.start(); - return with_locked_key(token, timeout, [cmd, token, key = std::move(key), ballot, tr_state, schema, only_digest, da, timeout] () mutable { - // When preparing, we need to use the same time as "now" (that's the time we use to decide if something - // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up - // on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no - // amount of re-submit will fix this (because the node on which the commit has expired will have a - // tombstone that hides any re-submit). See CASSANDRA-12043 for details. - auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot); - auto f = db::system_keyspace::load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout); - return f.then([cmd, token = std::move(token), key = std::move(key), ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) { - // If received ballot is newer that the one we already accepted it has to be accepted as well, - // but we will return the previously accepted proposal so that the new coordinator will use it instead of - // its own. - if (ballot.timestamp() > state._promised_ballot.timestamp()) { - logger.debug("Promising ballot {}", ballot); - tracing::trace(tr_state, "Promising ballot {}", ballot); - auto f1 = futurize_apply(db::system_keyspace::save_paxos_promise, *schema, key, ballot, timeout); - auto f2 = futurize_apply([&] { - return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), - [tr_state, schema, cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) { - return get_local_storage_proxy().get_db().local().query(schema, *cmd, - {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da}, - prv, tr_state, query::result_memory_limiter::maximum_result_size, timeout); - }); + // When preparing, we need to use the same time as "now" (that's the time we use to decide if something + // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up + // on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no + // amount of re-submit will fix this (because the node on which the commit has expired will have a + // tombstone that hides any re-submit). See CASSANDRA-12043 for details. + auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot); + auto f = db::system_keyspace::load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout); + return f.then([cmd, key = std::move(key), ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) { + // If received ballot is newer that the one we already accepted it has to be accepted as well, + // but we will return the previously accepted proposal so that the new coordinator will use it instead of + // its own. + if (ballot.timestamp() > state._promised_ballot.timestamp()) { + logger.debug("Promising ballot {}", ballot); + tracing::trace(tr_state, "Promising ballot {}", ballot); + auto f1 = futurize_apply(db::system_keyspace::save_paxos_promise, *schema, key, ballot, timeout); + auto f2 = futurize_apply([&] { + dht::token token = dht::global_partitioner().get_token(*schema, key); + return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), + [tr_state, schema, cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) { + return get_local_storage_proxy().get_db().local().query(schema, *cmd, + {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da}, + prv, tr_state, query::result_memory_limiter::maximum_result_size, timeout); }); - return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, cmd] (auto t) { - auto&& f1 = std::get<0>(t); - auto&& f2 = std::get<1>(t); - if (f1.failed()) { - // Failed to save promise. Nothing we can do but throw. - return make_exception_future(f1.get_exception()); + }); + return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, cmd] (auto t) { + auto&& f1 = std::get<0>(t); + auto&& f2 = std::get<1>(t); + if (f1.failed()) { + // Failed to save promise. Nothing we can do but throw. + return make_exception_future(f1.get_exception()); + } + std::optional>, query::result_digest>> data_or_digest; + // Silently ignore any errors querying the current value as the caller is prepared to fall back + // on querying it by itself in case it's missing in the response. + if (!f2.failed()) { + auto&& [result, hit_rate] = f2.get(); + if (only_digest) { + data_or_digest = *result->digest(); + } else { + data_or_digest = std::move(make_foreign(std::move(result))); } - std::optional>, query::result_digest>> data_or_digest; - // Silently ignore any errors querying the current value as the caller is prepared to fall back - // on querying it by itself in case it's missing in the response. - if (!f2.failed()) { - auto&& [result, hit_rate] = f2.get(); - if (only_digest) { - data_or_digest = *result->digest(); - } else { - data_or_digest = std::move(make_foreign(std::move(result))); - } - } - return make_ready_future(prepare_response(promise(std::move(state._accepted_proposal), - std::move(state._most_recent_commit), std::move(data_or_digest)))); - }); - } else { - logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); - tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); - // Return the currently promised ballot (rather than, e.g., the ballot of the last - // accepted proposal) so the coordinator can make sure it uses a newer ballot next - // time (#5667). - return make_ready_future(prepare_response(std::move(state._promised_ballot))); - } - }); + } + return make_ready_future(prepare_response(promise(std::move(state._accepted_proposal), + std::move(state._most_recent_commit), std::move(data_or_digest)))); + }); + } else { + logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); + tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); + // Return the currently promised ballot (rather than, e.g., the ballot of the last + // accepted proposal) so the coordinator can make sure it uses a newer ballot next + // time (#5667). + return make_ready_future(prepare_response(std::move(state._promised_ballot))); + } }).finally([schema, lc] () mutable { auto& stats = get_local_storage_proxy().get_db().local().find_column_family(schema).get_stats(); stats.cas_prepare.mark(lc.stop().latency()); @@ -117,28 +115,25 @@ future paxos_state::prepare(tracing::trace_state_ptr tr_state, future paxos_state::accept(tracing::trace_state_ptr tr_state, schema_ptr schema, proposal proposal, clock_type::time_point timeout) { - dht::token token = proposal.update.decorated_key(*schema).token(); utils::latency_counter lc; lc.start(); - return with_locked_key(token, timeout, [proposal = std::move(proposal), schema, tr_state, timeout] () mutable { - auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot); - auto f = db::system_keyspace::load_paxos_state(proposal.update.decorated_key(*schema).key(), schema, - gc_clock::time_point(now_in_sec), timeout); - return f.then([proposal = std::move(proposal), tr_state, schema, timeout] (paxos_state state) { - // Accept the proposal if we promised to accept it or the proposal is newer than the one we promised. - // Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected. - if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) { - logger.debug("Accepting proposal {}", proposal); - tracing::trace(tr_state, "Accepting proposal {}", proposal); - return db::system_keyspace::save_paxos_proposal(*schema, proposal, timeout).then([] { - return true; - }); - } else { - logger.debug("Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot); - tracing::trace(tr_state, "Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot); - return make_ready_future(false); - } - }); + auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot); + auto f = db::system_keyspace::load_paxos_state(proposal.update.decorated_key(*schema).key(), schema, + gc_clock::time_point(now_in_sec), timeout); + return f.then([proposal = std::move(proposal), tr_state, schema, timeout] (paxos_state state) { + // Accept the proposal if we promised to accept it or the proposal is newer than the one we promised. + // Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected. + if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) { + logger.debug("Accepting proposal {}", proposal); + tracing::trace(tr_state, "Accepting proposal {}", proposal); + return db::system_keyspace::save_paxos_proposal(*schema, proposal, timeout).then([] { + return true; + }); + } else { + logger.debug("Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot); + tracing::trace(tr_state, "Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot); + return make_ready_future(false); + } }).finally([schema, lc] () mutable { auto& stats = get_local_storage_proxy().get_db().local().find_column_family(schema).get_stats(); stats.cas_propose.mark(lc.stop().latency()); diff --git a/service/paxos/paxos_state.hh b/service/paxos/paxos_state.hh index d1e6c1d11d..d6df3b9604 100644 --- a/service/paxos/paxos_state.hh +++ b/service/paxos/paxos_state.hh @@ -63,6 +63,11 @@ class paxos_state { static key_semaphore& get_semaphore_for_key(const dht::token& key); static void release_semaphore_for_key(const dht::token& key); + utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID(0); + std::optional _accepted_proposal; + std::optional _most_recent_commit; + +public: // // A thin RAII aware wrapper around the lock map to garbage // collect the decorated key from the map on unlock if there @@ -76,11 +81,6 @@ class paxos_state { }); } - utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID(0); - std::optional _accepted_proposal; - std::optional _most_recent_commit; - -public: static logging::logger logger; paxos_state() {} diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index f7cb80d93d..d0cfae8743 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -4089,54 +4089,60 @@ future storage_proxy::cas(schema_ptr schema, shared_ptr reque db::consistency_level::LOCAL_QUORUM : db::consistency_level::QUORUM; return do_with(unsigned(0), [this, handler, schema, cmd, request, partition_ranges = std::move(partition_ranges), - query_options = std::move(query_options), cl] (unsigned& contentions) mutable { + query_options = std::move(query_options), cl, write_timeout] (unsigned& contentions) mutable { + dht::token token = partition_ranges[0].start()->value().as_decorated_key().token(); utils::latency_counter lc; lc.start(); - return repeat_until_value([this, handler, schema, cmd, request, partition_ranges = std::move(partition_ranges), - query_options = std::move(query_options), cl, &contentions] () mutable { - // Finish the previous PAXOS round, if any, and, as a side effect, compute - // a ballot (round identifier) which is a) unique b) has good chances of being - // recent enough. - return handler->begin_and_repair_paxos(query_options.cstate, contentions, true) - .then([this, handler, schema, cmd, request, partition_ranges, query_options, cl, &contentions] - (paxos_response_handler::ballot_and_data v) mutable { - // Read the current values and check they validate the conditions. - paxos::paxos_state::logger.debug("CAS[{}]: Reading existing values for CAS precondition", handler->id()); - tracing::trace(handler->tr_state, "Reading existing values for CAS precondition"); - auto f = v.data ? make_ready_future>>(std::move(v.data)) : - query(schema, cmd, std::move(partition_ranges), cl, query_options).then([] (coordinator_query_result&& qr) { + + return paxos::paxos_state::with_locked_key(token, write_timeout, [this, lc, handler, schema, cmd, request, + partition_ranges = std::move(partition_ranges), query_options = std::move(query_options), cl, + &contentions] () mutable { + return repeat_until_value([this, handler, schema, cmd, request, partition_ranges = std::move(partition_ranges), + query_options = std::move(query_options), cl, &contentions] () mutable { + // Finish the previous PAXOS round, if any, and, as a side effect, compute + // a ballot (round identifier) which is a) unique b) has good chances of being + // recent enough. + return handler->begin_and_repair_paxos(query_options.cstate, contentions, true) + .then([this, handler, schema, cmd, request, partition_ranges, query_options, cl, &contentions] + (paxos_response_handler::ballot_and_data v) mutable { + // Read the current values and check they validate the conditions. + paxos::paxos_state::logger.debug("CAS[{}]: Reading existing values for CAS precondition", handler->id()); + tracing::trace(handler->tr_state, "Reading existing values for CAS precondition"); + auto f = v.data ? make_ready_future>>(std::move(v.data)) : + query(schema, cmd, std::move(partition_ranges), cl, query_options).then([] (coordinator_query_result&& qr) { return make_ready_future>>(std::move(qr.query_result)); }); - return f.then([this, handler, schema, cmd, request, ballot = v.ballot, &contentions] (auto&& qr) { - auto mutation = request->apply(*qr, cmd->slice, utils::UUID_gen::micros_timestamp(ballot)); - if (!mutation) { - paxos::paxos_state::logger.debug("CAS[{}] precondition does not match current values", handler->id()); - tracing::trace(handler->tr_state, "CAS precondition does not match current values"); - ++_stats.cas_write_condition_not_met; - return make_ready_future>(false); - } - return do_with(paxos::proposal(ballot, freeze(*mutation)), - [handler, &contentions] (paxos::proposal& proposal) { - paxos::paxos_state::logger.debug("CAS[{}] precondition is met; proposing client-requested updates for {}", - handler->id(), proposal.ballot); - tracing::trace(handler->tr_state, "CAS precondition is met; proposing client-requested updates for {}", - proposal.ballot); - return handler->accept_proposal(proposal).then([handler, &proposal, &contentions] (bool is_accepted) { - if (is_accepted) { - // The majority (aka a QUORUM) has promised the coordinator to - // accept the action associated with the computed ballot. - // Apply the mutation. - return handler->learn_decision(std::move(proposal)).then([handler] { - paxos::paxos_state::logger.debug("CAS[{}] successful", handler->id()); - tracing::trace(handler->tr_state, "CAS successful"); - return std::optional(true); - }); - } - paxos::paxos_state::logger.debug("CAS[{}] PAXOS proposal not accepted (pre-empted by a higher ballot)", - handler->id()); - tracing::trace(handler->tr_state, "PAXOS proposal not accepted (pre-empted by a higher ballot)"); - ++contentions; - return sleep_approx_50ms().then([] { return std::optional(); }); + return f.then([this, handler, schema, cmd, request, ballot = v.ballot, &contentions] (auto&& qr) { + auto mutation = request->apply(*qr, cmd->slice, utils::UUID_gen::micros_timestamp(ballot)); + if (!mutation) { + paxos::paxos_state::logger.debug("CAS[{}] precondition does not match current values", handler->id()); + tracing::trace(handler->tr_state, "CAS precondition does not match current values"); + ++_stats.cas_write_condition_not_met; + return make_ready_future>(false); + } + return do_with(paxos::proposal(ballot, freeze(*mutation)), + [handler, &contentions] (paxos::proposal& proposal) { + paxos::paxos_state::logger.debug("CAS[{}] precondition is met; proposing client-requested updates for {}", + handler->id(), proposal.ballot); + tracing::trace(handler->tr_state, "CAS precondition is met; proposing client-requested updates for {}", + proposal.ballot); + return handler->accept_proposal(proposal).then([handler, &proposal, &contentions] (bool is_accepted) { + if (is_accepted) { + // The majority (aka a QUORUM) has promised the coordinator to + // accept the action associated with the computed ballot. + // Apply the mutation. + return handler->learn_decision(std::move(proposal)).then([handler] { + paxos::paxos_state::logger.debug("CAS[{}] successful", handler->id()); + tracing::trace(handler->tr_state, "CAS successful"); + return std::optional(true); + }); + } + paxos::paxos_state::logger.debug("CAS[{}] PAXOS proposal not accepted (pre-empted by a higher ballot)", + handler->id()); + tracing::trace(handler->tr_state, "PAXOS proposal not accepted (pre-empted by a higher ballot)"); + ++contentions; + return sleep_approx_50ms().then([] { return std::optional(); }); + }); }); }); }); @@ -4635,8 +4641,12 @@ void storage_proxy::init_messaging_service() { } return get_schema_for_read(cmd.schema_version, src_addr).then([cmd = make_lw_shared(std::move(cmd)), - key = std::move(key), ballot, only_digest, da, timeout, tr_state = std::move(tr_state)] (schema_ptr schema) { - return paxos::paxos_state::prepare(tr_state, std::move(schema), cmd, std::move(key), ballot, only_digest, da, *timeout); + key = std::move(key), ballot, only_digest, da, timeout, tr_state = std::move(tr_state)] (schema_ptr schema) mutable { + dht::token token = dht::global_partitioner().get_token(*schema, key); + return paxos::paxos_state::with_locked_key(token, *timeout, [schema = std::move(schema), cmd = std::move(cmd), + key = std::move(key), ballot, only_digest, da, timeout, tr_state = std::move(tr_state)] () mutable { + return paxos::paxos_state::prepare(tr_state, std::move(schema), cmd, std::move(key), ballot, only_digest, da, *timeout); + }); }); }); ms.register_paxos_accept([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, paxos::proposal proposal, @@ -4649,8 +4659,12 @@ void storage_proxy::init_messaging_service() { } return get_schema_for_read(proposal.update.schema_version(), src_addr).then([tr_state = std::move(tr_state), - proposal = std::move(proposal), timeout] (schema_ptr schema) { - return paxos::paxos_state::accept(tr_state, std::move(schema), std::move(proposal), *timeout); + proposal = std::move(proposal), timeout] (schema_ptr schema) mutable { + dht::token token = proposal.update.decorated_key(*schema).token(); + return paxos::paxos_state::with_locked_key(token, *timeout, [schema = std::move(schema), tr_state = std::move(tr_state), + proposal = std::move(proposal), timeout] () mutable { + return paxos::paxos_state::accept(tr_state, std::move(schema), std::move(proposal), *timeout); + }); }); }); }