diff --git a/db/query_context.hh b/db/query_context.hh index 64e4ae679f..e43321af77 100644 --- a/db/query_context.hh +++ b/db/query_context.hh @@ -58,13 +58,13 @@ struct query_context { // let the `storage_proxy` time out the query down the call chain db::timeout_clock::duration::zero(); - const timeout_config tcfg{d, d, d, d, d, d, d}; - - return _qp.local().process(req, - cql3::query_options::DEFAULT.get_consistency(), - tcfg, - { data_value(std::forward(args))... }, - true); + return do_with(timeout_config{d, d, d, d, d, d, d}, [this, req = std::move(req), &args...] (auto& tcfg) { + return _qp.local().process(req, + cql3::query_options::DEFAULT.get_consistency(), + tcfg, + { data_value(std::forward(args))... }, + true); + }); } database& db() { diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index ad7136f737..4ba1f7530c 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -2062,13 +2062,13 @@ future> load_view_build_progress() { }); } -future load_paxos_state(partition_key key, schema_ptr s, gc_clock::time_point now, +future load_paxos_state(const partition_key& key, schema_ptr s, gc_clock::time_point now, db::timeout_clock::time_point timeout) { static auto cql = format("SELECT * FROM system.{} WHERE row_key = ? AND cf_id = ?", PAXOS); // FIXME: we need execute_cql_with_now() (void)now; auto f = execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id()); - return f.then([key = std::move(key), s] (shared_ptr results) mutable { + return f.then([s] (shared_ptr results) mutable { if (results->empty()) { return service::paxos::paxos_state(); } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 399ae9e8ea..0dc17fff66 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -615,7 +615,7 @@ future> load_built_views(); future> load_view_build_progress(); // Paxos related functions -future load_paxos_state(partition_key key, schema_ptr s, gc_clock::time_point now, +future load_paxos_state(const partition_key& key, schema_ptr s, gc_clock::time_point now, db::timeout_clock::time_point timeout); future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout); future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout); diff --git a/idl/paxos.idl.hh b/idl/paxos.idl.hh index 0318857ca5..238b7c2041 100644 --- a/idl/paxos.idl.hh +++ b/idl/paxos.idl.hh @@ -30,6 +30,7 @@ class proposal { class promise { std::optional accepted_proposal; std::optional most_recent_commit; + std::optional> get_data_or_digest(); }; } diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 4828891c18..d48d4619d9 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1230,8 +1230,8 @@ future> messaging_service::send_rep void messaging_service::register_paxos_prepare(std::function( - const rpc::client_info&, rpc::opt_time_point, utils::UUID schema_version, partition_key key, utils::UUID ballot, - std::optional)>&& func) { + const rpc::client_info&, rpc::opt_time_point, query::read_command cmd, partition_key key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, std::optional)>&& func) { register_handler(this, messaging_verb::PAXOS_PREPARE, std::move(func)); } void messaging_service::unregister_paxos_prepare() { @@ -1239,9 +1239,10 @@ void messaging_service::unregister_paxos_prepare() { } future messaging_service::send_paxos_prepare(gms::inet_address peer, clock_type::time_point timeout, - utils::UUID schema_version, partition_key key, utils::UUID ballot, std::optional trace_info) { + const query::read_command& cmd, const partition_key& key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, std::optional trace_info) { return send_message_timeout(this, - messaging_verb::PAXOS_PREPARE, netw::msg_addr(peer), timeout, schema_version, key, ballot, std::move(trace_info)); + messaging_verb::PAXOS_PREPARE, netw::msg_addr(peer), timeout, cmd, key, ballot, only_digest, da, std::move(trace_info)); } void messaging_service::register_paxos_accept(std::function( diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 7828c0bd48..6df9c577ac 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -462,14 +462,15 @@ public: // Wrappers for PAXOS verbs void register_paxos_prepare(std::function( - const rpc::client_info&, rpc::opt_time_point, utils::UUID schema_version, partition_key key, utils::UUID ballot, - std::optional)>&& func); + const rpc::client_info&, rpc::opt_time_point, query::read_command cmd, partition_key key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, std::optional)>&& func); void unregister_paxos_prepare(); future send_paxos_prepare( - gms::inet_address peer, clock_type::time_point timeout, utils::UUID schema_version, - partition_key key, utils::UUID ballot, std::optional trace_info); + gms::inet_address peer, clock_type::time_point timeout, const query::read_command& cmd, + const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, + std::optional trace_info); void register_paxos_accept(std::function(const rpc::client_info&, rpc::opt_time_point, service::paxos::proposal proposal, std::optional)>&& func); diff --git a/serializer.hh b/serializer.hh index f5c8020a63..d31260562b 100644 --- a/serializer.hh +++ b/serializer.hh @@ -232,6 +232,11 @@ inline void serialize(Output& out, const T& v) { serializer::write(out, v); }; +template +inline void serialize(Output& out, const std::reference_wrapper v) { + serializer::write(out, v.get()); +} + template inline auto deserialize(Input& in, boost::type t) { return serializer::read(in); @@ -304,6 +309,38 @@ template struct is_equivalent : std::is_same>>::type, typename normalize>>::type> { }; +template +struct is_equivalent, U> : is_equivalent { +}; + +template +struct is_equivalent> : is_equivalent { +}; + +template +struct is_equivalent, std::optional> : is_equivalent { +}; + +template +struct is_equivalent_arity; + +template +struct is_equivalent_arity, std::tuple, false> : std::false_type { +}; + +template +struct is_equivalent_arity, std::tuple, true> { + static constexpr bool value = (is_equivalent::value && ...); +}; + +template +struct is_equivalent, std::tuple> : is_equivalent_arity, std::tuple, sizeof...(T) == sizeof...(U)> { +}; + +template +struct is_equivalent, std::variant> : is_equivalent, std::tuple> { +}; + // gc_clock duration values were serialized as 32-bit prior to 3.1, and // are serialized as 64-bit in 3.1.0. // diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 95531412e1..68fd473e1e 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -28,6 +28,8 @@ #include "schema_registry.hh" #include "database.hh" +#include "service/storage_service.hh" + namespace service::paxos { logging::logger paxos_state::logger("paxos"); @@ -44,11 +46,12 @@ void paxos_state::release_semaphore_for_key(const dht::token& key) { } } -future paxos_state::prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, - partition_key key, utils::UUID ballot, clock_type::time_point timeout) { +future paxos_state::prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, + const query::read_command& cmd, const dht::token& token, const partition_key& key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) { utils::latency_counter lc; lc.start(); - return with_locked_key(token, timeout, [ballot, key = std::move(key), schema, tr_state, timeout] () mutable { + return with_locked_key(token, timeout, [&cmd, &token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable { // When preparing, we need to use the same time as "now" (that's the time we use to decide if something // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up // on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no @@ -56,16 +59,42 @@ future paxos_state::prepare_impl(tracing::trace_state_ptr tr_s // tombstone that hides any re-submit). See CASSANDRA-12043 for details. auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot); auto f = db::system_keyspace::load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout); - return f.then([ballot, key = std::move(key), tr_state, schema, timeout] (paxos_state state) { + return f.then([&cmd, &token, &key, ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) { // If received ballot is newer that the one we already accepted it has to be accepted as well, // but we will return the previously accepted proposal so that the new coordinator will use it instead of // its own. if (ballot.timestamp() > state._promised_ballot.timestamp()) { logger.debug("Promising ballot {}", ballot); tracing::trace(tr_state, "Promising ballot {}", ballot); - return db::system_keyspace::save_paxos_promise(*schema, key, ballot, timeout).then([state = std::move(state)] { + auto f1 = futurize_apply(db::system_keyspace::save_paxos_promise, *schema, key, ballot, timeout); + auto f2 = futurize_apply([&] { + return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), + [tr_state, schema, &cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) { + return get_local_storage_proxy().get_db().local().query(schema, cmd, + {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da}, + prv, tr_state, query::result_memory_limiter::maximum_result_size, timeout); + }); + }); + return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest] (auto t) { + auto&& f1 = std::get<0>(t); + auto&& f2 = std::get<1>(t); + if (f1.failed()) { + // Failed to save promise. Nothing we can do but throw. + return make_exception_future(f1.get_exception()); + } + std::optional>, query::result_digest>> data_or_digest; + // Silently ignore any errors querying the current value as the caller is prepared to fall back + // on querying it by itself in case it's missing in the response. + if (!f2.failed()) { + auto&& [result, hit_rate] = f2.get(); + if (only_digest) { + data_or_digest = *result->digest(); + } else { + data_or_digest = std::move(make_foreign(std::move(result))); + } + } return make_ready_future(prepare_response(promise(std::move(state._accepted_proposal), - std::move(state._most_recent_commit)))); + std::move(state._most_recent_commit), std::move(data_or_digest)))); }); } else { logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); @@ -86,8 +115,9 @@ future paxos_state::prepare_impl(tracing::trace_state_ptr tr_s } // Invoke prepare on appropriate shard -future paxos_state::prepare(tracing::trace_state_ptr tr_state, schema_ptr schema, partition_key key, - utils::UUID ballot, clock_type::time_point timeout) { +future paxos_state::prepare(tracing::trace_state_ptr tr_state, schema_ptr schema, + lw_shared_ptr cmd, partition_key key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) { dht::token token = dht::global_partitioner().get_token(*schema, key); unsigned shard = get_local_storage_proxy().get_db().local().shard_of(token); // prepare_impl takes a semaphore corresponding to a key. @@ -95,20 +125,21 @@ future paxos_state::prepare(tracing::trace_state_ptr tr_state, // shards, the key won't be locked, which can lead to an invalid Paxos // consensus and, as a result, invalid CAS outcome. return get_storage_proxy().invoke_on(shard, [gt = tracing::global_trace_state_ptr(tr_state), - gs = global_schema_ptr(schema), token = std::move(token), key = std::move(key), ballot, timeout] (auto& sp) { - return paxos_state::prepare_impl(gt, gs, std::move(token), std::move(key), ballot, timeout); + gs = global_schema_ptr(schema), cmd, token = std::move(token), key = std::move(key), ballot, + only_digest, da, timeout] (auto& sp) { + return paxos_state::prepare_impl(gt, gs, *cmd, token, key, ballot, only_digest, da, timeout); }); } -future paxos_state::accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, - proposal proposal, clock_type::time_point timeout) { +future paxos_state::accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, + const dht::token& token, const proposal& proposal, clock_type::time_point timeout) { utils::latency_counter lc; lc.start(); - return with_locked_key(token, timeout, [proposal = std::move(proposal), schema, tr_state, timeout] () mutable { + return with_locked_key(token, timeout, [&proposal, schema, tr_state, timeout] () mutable { auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot); auto f = db::system_keyspace::load_paxos_state(proposal.update.decorated_key(*schema).key(), schema, gc_clock::time_point(now_in_sec), timeout); - return f.then([proposal = std::move(proposal), tr_state, schema, timeout] (paxos_state state) { + return f.then([&proposal, tr_state, schema, timeout] (paxos_state state) { // Accept the proposal if we promised to accept it or the proposal is newer than the one we promised. // Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected. if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) { @@ -140,7 +171,7 @@ future paxos_state::accept(tracing::trace_state_ptr tr_state, schema_ptr s // Make sure the key is locked on the right shard. return get_storage_proxy().invoke_on(shard, [gt = tracing::global_trace_state_ptr(tr_state), gs = global_schema_ptr(schema), token = std::move(token), proposal = std::move(proposal), timeout] (auto& sp) { - return paxos_state::accept_impl(gt, gs, std::move(token), std::move(proposal), timeout); + return paxos_state::accept_impl(gt, gs, token, proposal, timeout); }); } diff --git a/service/paxos/paxos_state.hh b/service/paxos/paxos_state.hh index a32db68923..3f4b7390c3 100644 --- a/service/paxos/paxos_state.hh +++ b/service/paxos/paxos_state.hh @@ -42,6 +42,7 @@ #pragma once #include "service/paxos/proposal.hh" #include "log.hh" +#include "digest_algorithm.hh" #include "db/timeout_clock.hh" #include #include "utils/UUID_gen.hh" @@ -79,10 +80,11 @@ class paxos_state { std::optional _accepted_proposal; std::optional _most_recent_commit; - static future prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, - partition_key key, utils::UUID ballot, clock_type::time_point timeout); - static future accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, - proposal proposal, clock_type::time_point timeout); + static future prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, + const query::read_command& cmd, const dht::token& token, const partition_key& key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, clock_type::time_point timeout); + static future accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, + const dht::token& token, const proposal& proposal, clock_type::time_point timeout); public: static logging::logger logger; @@ -95,7 +97,8 @@ public: , _most_recent_commit(std::move(commit)) {} // Replica RPC endpoint for Paxos "prepare" phase. static future prepare(tracing::trace_state_ptr tr_state, schema_ptr schema, - partition_key key, utils::UUID ballot, clock_type::time_point timeout); + lw_shared_ptr cmd, partition_key key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, clock_type::time_point timeout); // Replica RPC endpoint for Paxos "accept" phase. static future accept(tracing::trace_state_ptr tr_state, schema_ptr schema, proposal proposal, clock_type::time_point timeout); diff --git a/service/paxos/prepare_response.hh b/service/paxos/prepare_response.hh index 583352e40d..72bada2c34 100644 --- a/service/paxos/prepare_response.hh +++ b/service/paxos/prepare_response.hh @@ -42,6 +42,7 @@ #pragma once #include +#include "query-result.hh" #include "service/paxos/proposal.hh" namespace service { @@ -59,10 +60,39 @@ struct promise { // table) by the time of this prepare request. Empty on first round or if previous round data // has expired. std::optional most_recent_commit; + // Local query result with digest or just digest, depending on the caller's choice, or none + // if the query failed. It is used to skip a separate query after the prepare phase provided + // results from all PAXOS participants match. + std::optional>, query::result_digest>> data_or_digest; - promise(std::optional accepted_arg, std::optional chosen_arg) + std::optional, query::result_digest>> get_data_or_digest() const { + if (!data_or_digest) { + return std::optional, query::result_digest>>(); + } else if (std::holds_alternative>>(*data_or_digest)) { + return *std::get>>(*data_or_digest); + } else { + return std::get(*data_or_digest); + } + } + + promise(std::optional accepted_arg, std::optional chosen_arg, + std::optional>, query::result_digest>> data_or_digest_arg) : accepted_proposal(std::move(accepted_arg)) - , most_recent_commit(std::move(chosen_arg)) {} + , most_recent_commit(std::move(chosen_arg)) + , data_or_digest(std::move(data_or_digest_arg)) {} + + promise(std::optional accepted_arg, std::optional chosen_arg, + std::optional> data_or_digest_arg) + : accepted_proposal(std::move(accepted_arg)) + , most_recent_commit(std::move(chosen_arg)) { + if (data_or_digest_arg) { + if (std::holds_alternative(*data_or_digest_arg)) { + data_or_digest = make_foreign(make_lw_shared(std::move(std::get(*data_or_digest_arg)))); + } else { + data_or_digest = std::move(std::get(*data_or_digest_arg)); + } + } + } }; // If the prepare request is rejected we return timeUUID of the most recently promised ballot, so diff --git a/service/paxos/prepare_summary.hh b/service/paxos/prepare_summary.hh index 6bcfa7d827..45279accda 100644 --- a/service/paxos/prepare_summary.hh +++ b/service/paxos/prepare_summary.hh @@ -76,6 +76,10 @@ public: // only then proposing the new ballot and new mutation. May be empty if there is no such cell for // this key in the paxos table on any of the replicas. std::optional most_recent_proposal; + // Value of the requested key received from participating replicas during the prepare phase. + // May be none in case data hadn't been received before a consensus was reached or digests + // received from different replicas didn't match. + foreign_ptr> data; public: prepare_summary(size_t node_count); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 07eb288970..cbbc388504 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -106,6 +106,13 @@ distributed _the_storage_proxy; using namespace exceptions; using fbu = utils::fb_utilities; +static inline +query::digest_algorithm digest_algorithm() { + return service::get_local_storage_service().cluster_supports_xxhash_digest_algorithm() + ? query::digest_algorithm::xxHash + : query::digest_algorithm::MD5; +} + static inline const dht::token& start_token(const dht::partition_range& r) { static const dht::token min_token = dht::minimum_token(); @@ -693,9 +700,9 @@ static future<> sleep_approx_50ms() { return seastar::sleep(std::chrono::milliseconds(dist(re))); } -static future> sleep_and_restart() { +static future> sleep_and_restart() { return sleep_approx_50ms().then([] { - return std::optional(); // continue + return std::optional(); // continue }); } @@ -705,13 +712,14 @@ static future> sleep_and_restart() { * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of * nodes have seen the most recent commit. Otherwise, return null. */ -future paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write) { +future +paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write) { _proxy->get_db().local().get_config().check_experimental("Paxos"); return do_with(api::timestamp_type(0), shared_from_this(), [this, &cs, &contentions, is_write] (api::timestamp_type& min_timestamp_micros_to_use, shared_ptr& prh) { return repeat_until_value([this, &contentions, &cs, &min_timestamp_micros_to_use, is_write] { if (storage_proxy::clock_type::now() > _cas_timeout) { - return make_exception_future>( + return make_exception_future>( mutation_write_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl_for_paxos, 0, _required_participants, db::write_type::CAS) ); @@ -763,11 +771,11 @@ future paxos_response_handler::begin_and_repair_paxos(client_state& return accept_proposal(refreshed_in_progress, false).then([this, &contentions, &refreshed_in_progress] (bool is_accepted) mutable { if (is_accepted) { return learn_decision(std::move(refreshed_in_progress), false).then([] { - return make_ready_future>(std::optional()); + return make_ready_future>(std::optional()); }).handle_exception_type([] (mutation_write_timeout_exception& e) { e.type = db::write_type::CAS; // we're still doing preparation for the paxos rounds, so we want to use the CAS (see cASSANDRA-8672) - return make_exception_future>(std::move(e)); + return make_exception_future>(std::move(e)); }); } else { paxos::paxos_state::logger.debug("CAS[{}] Some replicas have already promised a higher ballot than ours; aborting", _id); @@ -808,11 +816,11 @@ future paxos_response_handler::begin_and_repair_paxos(client_state& } else { f.ignore_ready_future(); } - return std::optional(); // continue + return std::optional(); // continue }); } - return make_ready_future>(ballot); + return make_ready_future>(ballot_and_data{ballot, std::move(summary.data)}); }); }); }); @@ -825,6 +833,10 @@ template struct dependent_false : std::false_type {}; future paxos_response_handler::prepare_ballot(utils::UUID ballot) { struct { size_t errors = 0; + // Whether the value of the requested key received from participating replicas match. + bool digests_match = true; + // Digest corresponding to the value of the requested key received from participating replicas. + std::optional digest; // the promise can be set before all replies are received at which point // the optional will be disengaged so further replies are ignored std::optional> p = promise(); @@ -846,11 +858,15 @@ future paxos_response_handler::prepare_ballot(utils::UUI paxos::paxos_state::logger.trace("CAS[{}] prepare_ballot: sending ballot {} to {}", _id, ballot, _live_endpoints); return parallel_for_each(_live_endpoints, [this, &summary, ballot, &request_tracker] (gms::inet_address peer) mutable { return futurize_apply([&] { + // To generate less network traffic, only the closest replica (first one in the list of participants) + // sends query result content while other replicas send digests needed to check consistency. + bool only_digest = peer != _live_endpoints[0]; + auto da = digest_algorithm(); if (fbu::is_me(peer)) { - return paxos::paxos_state::prepare(tr_state, _schema, _key.key(), ballot, _timeout); + return paxos::paxos_state::prepare(tr_state, _schema, _cmd, _key.key(), ballot, only_digest, da, _timeout); } else { netw::messaging_service& ms = netw::get_local_messaging_service(); - return ms.send_paxos_prepare(peer, _timeout, _schema->version(), _key.key(), ballot, + return ms.send_paxos_prepare(peer, _timeout, *_cmd, _key.key(), ballot, only_digest, da, tracing::make_trace_info(tr_state)); } }).then_wrapped([this, &summary, &request_tracker, peer, ballot] @@ -920,6 +936,33 @@ future paxos_response_handler::prepare_ballot(utils::UUI } } + // Check if the query result attached to the promise matches query results received from other participants. + if (request_tracker.digests_match) { + if (response.data_or_digest) { + foreign_ptr> data; + if (std::holds_alternative>>(*response.data_or_digest)) { + data = std::move(std::get>>(*response.data_or_digest)); + } + auto& digest = data ? data->digest() : std::get(*response.data_or_digest); + if (request_tracker.digest) { + if (*request_tracker.digest != digest) { + request_tracker.digests_match = false; + } + } else { + request_tracker.digest = digest; + } + if (request_tracker.digests_match && !summary.data && data) { + summary.data = std::move(data); + } + } else { + request_tracker.digests_match = false; + } + if (!request_tracker.digests_match) { + request_tracker.digest.reset(); + summary.data.reset(); + } + } + if (summary.committed_ballots_by_replica.size() == _required_participants) { // got all replies paxos::paxos_state::logger.trace("CAS[{}] prepare_ballot: got enough replies to proceed", _id); request_tracker.set_value(std::move(summary)); @@ -1905,6 +1948,13 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token & cl_for_paxos, participants + 1, live_endpoints.size()); } + // Apart from the ballot, paxos_state::prepare() also sends the current value of the requested key. + // If the values received from different replicas match, we skip a separate query stage thus saving + // one network round trip. To generate less traffic, only closest replicas send data, others send + // digests that are used to check consistency. For this optimization to work, we need to sort the + // list of participants by proximity to this instance. + sort_endpoints_by_proximity(live_endpoints); + return paxos_participants{std::move(live_endpoints), required_participants}; } @@ -2943,12 +2993,6 @@ public: } }; -query::digest_algorithm digest_algorithm() { - return service::get_local_storage_service().cluster_supports_xxhash_digest_algorithm() - ? query::digest_algorithm::xxHash - : query::digest_algorithm::MD5; -} - class abstract_read_executor : public enable_shared_from_this { protected: using targets_iterator = std::vector::iterator; @@ -3894,7 +3938,7 @@ storage_proxy::do_query_with_paxos(schema_ptr s, try { handler = seastar::make_shared( shared_from_this(), query_options.trace_state, query_options.permit, - partition_ranges[0].start()->value().as_decorated_key(), s, cl, cl_for_learn, timeout, cas_timeout); + partition_ranges[0].start()->value().as_decorated_key(), s, cmd, cl, cl_for_learn, timeout, cas_timeout); } catch (exceptions::unavailable_exception& ex) { _stats.cas_read_unavailables.mark(); throw; @@ -3906,7 +3950,7 @@ storage_proxy::do_query_with_paxos(schema_ptr s, lc.start(); return handler->begin_and_repair_paxos(query_options.cstate, contentions, false).then_wrapped([this, s = std::move(s), cmd = std::move(cmd), partition_ranges = std::move(partition_ranges), query_options = std::move(query_options), - cl_for_learn] (future f) mutable { + cl_for_learn] (future f) mutable { if (f.failed()) { try { f.get(); @@ -3919,6 +3963,10 @@ storage_proxy::do_query_with_paxos(schema_ptr s, read_failure_exception(s->ks_name(), s->cf_name(), ex.consistency, ex.received, ex.failures, ex.block_for, false)); } } + auto v = f.get0(); + if (v.data) { + return make_ready_future(storage_proxy::coordinator_query_result(std::move(v.data))); + } return do_query(s, std::move(cmd), std::move(partition_ranges), cl_for_learn, std::move(query_options)); }).then_wrapped([this, lc, &contentions, handler = std::move(handler)] (future f) mutable { _stats.cas_read.mark(lc.stop().latency()); @@ -3988,7 +4036,7 @@ future storage_proxy::cas(schema_ptr schema, shared_ptr reque handler = seastar::make_shared(shared_from_this(), query_options.trace_state, query_options.permit, partition_ranges[0].start()->value().as_decorated_key(), - schema, cl_for_paxos, cl_for_commit, write_timeout, cas_timeout); + schema, cmd, cl_for_paxos, cl_for_commit, write_timeout, cas_timeout); } catch (exceptions::unavailable_exception& ex) { _stats.cas_write_unavailables.mark(); throw; @@ -4008,13 +4056,16 @@ future storage_proxy::cas(schema_ptr schema, shared_ptr reque // recent enough. return handler->begin_and_repair_paxos(query_options.cstate, contentions, true) .then([this, handler, schema, cmd, request, partition_ranges, query_options, cl, &contentions] - (utils::UUID ballot) mutable { + (paxos_response_handler::ballot_and_data v) mutable { // Read the current values and check they validate the conditions. paxos::paxos_state::logger.debug("CAS[{}]: Reading existing values for CAS precondition", handler->id()); tracing::trace(handler->tr_state, "Reading existing values for CAS precondition"); - return query(schema, cmd, std::move(partition_ranges), cl, query_options) - .then([this, handler, schema, cmd, request, ballot, &contentions] (coordinator_query_result&& qr) { - auto mutation = request->apply(*qr.query_result, cmd->slice, utils::UUID_gen::micros_timestamp(ballot)); + auto f = v.data ? make_ready_future>>(std::move(v.data)) : + query(schema, cmd, std::move(partition_ranges), cl, query_options).then([] (coordinator_query_result&& qr) { + return make_ready_future>>(std::move(qr.query_result)); + }); + return f.then([this, handler, schema, cmd, request, ballot = v.ballot, &contentions] (auto&& qr) { + auto mutation = request->apply(*qr, cmd->slice, utils::UUID_gen::micros_timestamp(ballot)); if (!mutation) { paxos::paxos_state::logger.debug("CAS[{}] precondition does not match current values", handler->id()); tracing::trace(handler->tr_state, "CAS precondition does not match current values"); @@ -4076,14 +4127,18 @@ std::vector storage_proxy::get_live_endpoints(keyspace& ks, c return eps; } -std::vector storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) { - auto eps = get_live_endpoints(ks, token); +void storage_proxy::sort_endpoints_by_proximity(std::vector& eps) { locator::i_endpoint_snitch::get_local_snitch_ptr()->sort_by_proximity(utils::fb_utilities::get_broadcast_address(), eps); // FIXME: before dynamic snitch is implement put local address (if present) at the beginning auto it = boost::range::find(eps, utils::fb_utilities::get_broadcast_address()); if (it != eps.end() && it != eps.begin()) { std::iter_swap(it, eps.begin()); } +} + +std::vector storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) { + auto eps = get_live_endpoints(ks, token); + sort_endpoints_by_proximity(eps); return eps; } @@ -4522,8 +4577,9 @@ void storage_proxy::init_messaging_service() { }); // Register PAXOS verb handlers - ms.register_paxos_prepare([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, utils::UUID schema_version, - partition_key key, utils::UUID ballot, std::optional trace_info) { + ms.register_paxos_prepare([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, + query::read_command cmd, partition_key key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, + std::optional trace_info) { auto src_addr = netw::messaging_service::get_source(cinfo); tracing::trace_state_ptr tr_state; if (trace_info) { @@ -4531,9 +4587,9 @@ void storage_proxy::init_messaging_service() { tracing::begin(tr_state); } - return get_schema_for_read(schema_version, src_addr).then([tr_state = std::move(tr_state), - key = std::move(key), ballot, timeout] (schema_ptr schema) { - return paxos::paxos_state::prepare(tr_state, std::move(schema), std::move(key), ballot, *timeout); + return get_schema_for_read(cmd.schema_version, src_addr).then([cmd = make_lw_shared(std::move(cmd)), + key = std::move(key), ballot, only_digest, da, timeout, tr_state = std::move(tr_state)] (schema_ptr schema) { + return paxos::paxos_state::prepare(tr_state, std::move(schema), cmd, std::move(key), ballot, only_digest, da, *timeout); }); }); ms.register_paxos_accept([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, paxos::proposal proposal, diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index ed122ce1b4..2f6f5080eb 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -301,6 +301,7 @@ private: bool hints_enabled(db::write_type type) noexcept; db::hints::manager& hints_manager_for(db::write_type type); std::vector get_live_endpoints(keyspace& ks, const dht::token& token); + static void sort_endpoints_by_proximity(std::vector& eps); std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token); db::read_repair_decision new_read_repair_decision(const schema& s); ::shared_ptr get_read_executor(lw_shared_ptr cmd, @@ -555,6 +556,8 @@ private: shared_ptr _proxy; // The schema for the table the operation works upon. schema_ptr _schema; + // Read command used by this CAS request. + lw_shared_ptr _cmd; // SERIAL or LOCAL SERIAL - influences what endpoints become Paxos protocol participants, // as well as Paxos quorum size. Is either set explicitly in the query or derived from // the value set by SERIAL CONSISTENCY [SERIAL|LOCAL SERIAL] control statement. @@ -587,12 +590,13 @@ public: public: paxos_response_handler(shared_ptr proxy_arg, tracing::trace_state_ptr tr_state_arg, service_permit permit_arg, - dht::decorated_key key_arg, schema_ptr schema_arg, + dht::decorated_key key_arg, schema_ptr schema_arg, lw_shared_ptr cmd_arg, db::consistency_level cl_for_paxos_arg, db::consistency_level cl_for_learn_arg, storage_proxy::clock_type::time_point timeout_arg, storage_proxy::clock_type::time_point cas_timeout_arg) : _proxy(proxy_arg) , _schema(std::move(schema_arg)) + , _cmd(cmd_arg) , _cl_for_paxos(cl_for_paxos_arg) , _cl_for_learn(cl_for_learn_arg) , _timeout(timeout_arg) @@ -605,8 +609,16 @@ public: _required_participants = pp.required_participants; } + // Result of PREPARE step, i.e. begin_and_repair_paxos(). + struct ballot_and_data { + // Accepted ballot. + utils::UUID ballot; + // Current value of the requested key or none. + foreign_ptr> data; + }; + // Steps of the Paxos protocol - future begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write); + future begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write); future prepare_ballot(utils::UUID ballot); future accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted = true); future<> learn_decision(paxos::proposal decision, bool allow_hints = false);