From 1452653fbced79e5dead4f74996ebb0610eb6141 Mon Sep 17 00:00:00 2001 From: Vladimir Davydov Date: Fri, 22 Nov 2019 16:38:59 +0300 Subject: [PATCH 1/6] query_context: fix use after free of timeout_config in execute_cql_with_timeout timeout_config is used by reference by cql3::query_processor::process(), see cql3::query_options, so the caller must make sure it doesn't go away. --- db/query_context.hh | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/db/query_context.hh b/db/query_context.hh index 64e4ae679f..e43321af77 100644 --- a/db/query_context.hh +++ b/db/query_context.hh @@ -58,13 +58,13 @@ struct query_context { // let the `storage_proxy` time out the query down the call chain db::timeout_clock::duration::zero(); - const timeout_config tcfg{d, d, d, d, d, d, d}; - - return _qp.local().process(req, - cql3::query_options::DEFAULT.get_consistency(), - tcfg, - { data_value(std::forward(args))... }, - true); + return do_with(timeout_config{d, d, d, d, d, d, d}, [this, req = std::move(req), &args...] (auto& tcfg) { + return _qp.local().process(req, + cql3::query_options::DEFAULT.get_consistency(), + tcfg, + { data_value(std::forward(args))... }, + true); + }); } database& db() { From f5c2a231184c06345d39f1eddc9a67bd7ef3c603 Mon Sep 17 00:00:00 2001 From: Vladimir Davydov Date: Wed, 20 Nov 2019 13:41:55 +0300 Subject: [PATCH 2/6] serializer: add reference_wrapper handling Serialize reference_wrapper as T and make sure is_equivalent<> treats reference_wrapper wrapped in std::optional<> or std::variant<>, or std::tuple<> as T. We need it to avoid copying query::result while serializing paxos::promise. 
--- serializer.hh | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/serializer.hh b/serializer.hh index f5c8020a63..d31260562b 100644 --- a/serializer.hh +++ b/serializer.hh @@ -232,6 +232,11 @@ inline void serialize(Output& out, const T& v) { serializer::write(out, v); }; +template +inline void serialize(Output& out, const std::reference_wrapper v) { + serializer::write(out, v.get()); +} + template inline auto deserialize(Input& in, boost::type t) { return serializer::read(in); @@ -304,6 +309,38 @@ template struct is_equivalent : std::is_same>>::type, typename normalize>>::type> { }; +template +struct is_equivalent, U> : is_equivalent { +}; + +template +struct is_equivalent> : is_equivalent { +}; + +template +struct is_equivalent, std::optional> : is_equivalent { +}; + +template +struct is_equivalent_arity; + +template +struct is_equivalent_arity, std::tuple, false> : std::false_type { +}; + +template +struct is_equivalent_arity, std::tuple, true> { + static constexpr bool value = (is_equivalent::value && ...); +}; + +template +struct is_equivalent, std::tuple> : is_equivalent_arity, std::tuple, sizeof...(T) == sizeof...(U)> { +}; + +template +struct is_equivalent, std::variant> : is_equivalent, std::tuple> { +}; + // gc_clock duration values were serialized as 32-bit prior to 3.1, and // are serialized as 64-bit in 3.1.0. // From 63d4590336e9d6c1a842454516e028b6df59db01 Mon Sep 17 00:00:00 2001 From: Vladimir Davydov Date: Fri, 15 Nov 2019 17:32:01 +0300 Subject: [PATCH 3/6] storage_proxy: move digest_algorithm upper We need it for PAXOS. Mark it as static inline while we are at it. 
--- service/storage_proxy.cc | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 07eb288970..2e51eed69a 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -106,6 +106,13 @@ distributed _the_storage_proxy; using namespace exceptions; using fbu = utils::fb_utilities; +static inline +query::digest_algorithm digest_algorithm() { + return service::get_local_storage_service().cluster_supports_xxhash_digest_algorithm() + ? query::digest_algorithm::xxHash + : query::digest_algorithm::MD5; +} + static inline const dht::token& start_token(const dht::partition_range& r) { static const dht::token min_token = dht::minimum_token(); @@ -2943,12 +2950,6 @@ public: } }; -query::digest_algorithm digest_algorithm() { - return service::get_local_storage_service().cluster_supports_xxhash_digest_algorithm() - ? query::digest_algorithm::xxHash - : query::digest_algorithm::MD5; -} - class abstract_read_executor : public enable_shared_from_this { protected: using targets_iterator = std::vector::iterator; From ef2e96c47c22012bc2b1120895d6c37fd6ee900b Mon Sep 17 00:00:00 2001 From: Vladimir Davydov Date: Mon, 18 Nov 2019 13:10:01 +0300 Subject: [PATCH 4/6] storage_proxy: factor out helper to sort endpoints by proximity We need it for PAXOS. 
--- service/storage_proxy.cc | 8 ++++++-- service/storage_proxy.hh | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 2e51eed69a..052056716b 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -4077,14 +4077,18 @@ std::vector storage_proxy::get_live_endpoints(keyspace& ks, c return eps; } -std::vector storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) { - auto eps = get_live_endpoints(ks, token); +void storage_proxy::sort_endpoints_by_proximity(std::vector& eps) { locator::i_endpoint_snitch::get_local_snitch_ptr()->sort_by_proximity(utils::fb_utilities::get_broadcast_address(), eps); // FIXME: before dynamic snitch is implement put local address (if present) at the beginning auto it = boost::range::find(eps, utils::fb_utilities::get_broadcast_address()); if (it != eps.end() && it != eps.begin()) { std::iter_swap(it, eps.begin()); } +} + +std::vector storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) { + auto eps = get_live_endpoints(ks, token); + sort_endpoints_by_proximity(eps); return eps; } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index ed122ce1b4..bcc9721f49 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -301,6 +301,7 @@ private: bool hints_enabled(db::write_type type) noexcept; db::hints::manager& hints_manager_for(db::write_type type); std::vector get_live_endpoints(keyspace& ks, const dht::token& token); + static void sort_endpoints_by_proximity(std::vector& eps); std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token); db::read_repair_decision new_read_repair_decision(const schema& s); ::shared_ptr get_read_executor(lw_shared_ptr cmd, From 3d1d4b018f22264be0f67212fa61b0ba20467ffe Mon Sep 17 00:00:00 2001 From: Vladimir Davydov Date: Mon, 18 Nov 2019 10:11:31 +0300 Subject: [PATCH 5/6] paxos: remove unnecessary move constructor invocations 
invoke_on() guarantees that captures object won't be destroyed until the future returned by the invoked function is resolved so there's no need to move key, token, proposal for calling paxos_state::*_impl helpers. --- db/system_keyspace.cc | 4 ++-- db/system_keyspace.hh | 2 +- message/messaging_service.cc | 2 +- message/messaging_service.hh | 2 +- service/paxos/paxos_state.cc | 20 ++++++++++---------- service/paxos/paxos_state.hh | 8 ++++---- 6 files changed, 19 insertions(+), 19 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index ad7136f737..4ba1f7530c 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -2062,13 +2062,13 @@ future> load_view_build_progress() { }); } -future load_paxos_state(partition_key key, schema_ptr s, gc_clock::time_point now, +future load_paxos_state(const partition_key& key, schema_ptr s, gc_clock::time_point now, db::timeout_clock::time_point timeout) { static auto cql = format("SELECT * FROM system.{} WHERE row_key = ? AND cf_id = ?", PAXOS); // FIXME: we need execute_cql_with_now() (void)now; auto f = execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id()); - return f.then([key = std::move(key), s] (shared_ptr results) mutable { + return f.then([s] (shared_ptr results) mutable { if (results->empty()) { return service::paxos::paxos_state(); } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 399ae9e8ea..0dc17fff66 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -615,7 +615,7 @@ future> load_built_views(); future> load_view_build_progress(); // Paxos related functions -future load_paxos_state(partition_key key, schema_ptr s, gc_clock::time_point now, +future load_paxos_state(const partition_key& key, schema_ptr s, gc_clock::time_point now, db::timeout_clock::time_point timeout); future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout); 
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 4828891c18..2a88ff3492 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1239,7 +1239,7 @@ void messaging_service::unregister_paxos_prepare() { } future messaging_service::send_paxos_prepare(gms::inet_address peer, clock_type::time_point timeout, - utils::UUID schema_version, partition_key key, utils::UUID ballot, std::optional trace_info) { + utils::UUID schema_version, const partition_key& key, utils::UUID ballot, std::optional trace_info) { return send_message_timeout(this, messaging_verb::PAXOS_PREPARE, netw::msg_addr(peer), timeout, schema_version, key, ballot, std::move(trace_info)); } diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 7828c0bd48..08c56cfb08 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -469,7 +469,7 @@ public: future send_paxos_prepare( gms::inet_address peer, clock_type::time_point timeout, utils::UUID schema_version, - partition_key key, utils::UUID ballot, std::optional trace_info); + const partition_key& key, utils::UUID ballot, std::optional trace_info); void register_paxos_accept(std::function(const rpc::client_info&, rpc::opt_time_point, service::paxos::proposal proposal, std::optional)>&& func); diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 95531412e1..78fef73627 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -44,11 +44,11 @@ void paxos_state::release_semaphore_for_key(const dht::token& key) { } } -future paxos_state::prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, - partition_key key, utils::UUID ballot, clock_type::time_point timeout) { +future paxos_state::prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, + const 
dht::token& token, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout) { utils::latency_counter lc; lc.start(); - return with_locked_key(token, timeout, [ballot, key = std::move(key), schema, tr_state, timeout] () mutable { + return with_locked_key(token, timeout, [&key, ballot, schema, tr_state, timeout] () mutable { // When preparing, we need to use the same time as "now" (that's the time we use to decide if something // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up // on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no @@ -56,7 +56,7 @@ future paxos_state::prepare_impl(tracing::trace_state_ptr tr_s // tombstone that hides any re-submit). See CASSANDRA-12043 for details. auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot); auto f = db::system_keyspace::load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout); - return f.then([ballot, key = std::move(key), tr_state, schema, timeout] (paxos_state state) { + return f.then([&key, ballot, tr_state, schema, timeout] (paxos_state state) { // If received ballot is newer that the one we already accepted it has to be accepted as well, // but we will return the previously accepted proposal so that the new coordinator will use it instead of // its own. @@ -96,19 +96,19 @@ future paxos_state::prepare(tracing::trace_state_ptr tr_state, // consensus and, as a result, invalid CAS outcome. 
return get_storage_proxy().invoke_on(shard, [gt = tracing::global_trace_state_ptr(tr_state), gs = global_schema_ptr(schema), token = std::move(token), key = std::move(key), ballot, timeout] (auto& sp) { - return paxos_state::prepare_impl(gt, gs, std::move(token), std::move(key), ballot, timeout); + return paxos_state::prepare_impl(gt, gs, token, key, ballot, timeout); }); } -future paxos_state::accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, - proposal proposal, clock_type::time_point timeout) { +future paxos_state::accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, + const dht::token& token, const proposal& proposal, clock_type::time_point timeout) { utils::latency_counter lc; lc.start(); - return with_locked_key(token, timeout, [proposal = std::move(proposal), schema, tr_state, timeout] () mutable { + return with_locked_key(token, timeout, [&proposal, schema, tr_state, timeout] () mutable { auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot); auto f = db::system_keyspace::load_paxos_state(proposal.update.decorated_key(*schema).key(), schema, gc_clock::time_point(now_in_sec), timeout); - return f.then([proposal = std::move(proposal), tr_state, schema, timeout] (paxos_state state) { + return f.then([&proposal, tr_state, schema, timeout] (paxos_state state) { // Accept the proposal if we promised to accept it or the proposal is newer than the one we promised. // Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected. if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) { @@ -140,7 +140,7 @@ future paxos_state::accept(tracing::trace_state_ptr tr_state, schema_ptr s // Make sure the key is locked on the right shard. 
return get_storage_proxy().invoke_on(shard, [gt = tracing::global_trace_state_ptr(tr_state), gs = global_schema_ptr(schema), token = std::move(token), proposal = std::move(proposal), timeout] (auto& sp) { - return paxos_state::accept_impl(gt, gs, std::move(token), std::move(proposal), timeout); + return paxos_state::accept_impl(gt, gs, token, proposal, timeout); }); } diff --git a/service/paxos/paxos_state.hh b/service/paxos/paxos_state.hh index a32db68923..ca193adf03 100644 --- a/service/paxos/paxos_state.hh +++ b/service/paxos/paxos_state.hh @@ -79,10 +79,10 @@ class paxos_state { std::optional _accepted_proposal; std::optional _most_recent_commit; - static future prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, - partition_key key, utils::UUID ballot, clock_type::time_point timeout); - static future accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, - proposal proposal, clock_type::time_point timeout); + static future prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, + const dht::token& token, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout); + static future accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, + const dht::token& token, const proposal& proposal, clock_type::time_point timeout); public: static logging::logger logger; From bf5f864d80f18a2839fd0df0ce2534910eeaa1e7 Mon Sep 17 00:00:00 2001 From: Vladimir Davydov Date: Fri, 15 Nov 2019 10:53:56 +0300 Subject: [PATCH 6/6] paxos: piggyback result query on prepare response Current LWT implementation uses at least three network round trips: - first, execute PAXOS prepare phase - second, query the current value of the updated key - third, propose the change to participating replicas (there's also learn phase, but we don't wait for it to complete). 
The idea behind the optimization implemented by this patch is simple:
piggyback the current value of the updated key on the prepare response
to eliminate one round trip.

To generate less network traffic, only the replica closest to the
coordinator sends data, while the other participating replicas send
digests, which are used to check data consistency.

Note, this patch changes the API of some RPC calls used by PAXOS, but
this should be okay as long as the feature is in the early development
stage and marked experimental.

To assess the impact of this optimization on LWT performance, I ran a
simple benchmark that starts a number of concurrent clients, each of
which updates its own key (uncontended case) stored in a cluster of
three AWS i3.2xlarge nodes located in the same region (us-west-1), and
measures the aggregate bandwidth and latency. The test uses the
shard-aware gocql driver. Here are the results:

             latency 99% (ms)    bandwidth (rq/s)    timeouts (rq/s)
    clients   before    after     before    after     before    after
          1        2        2        626      637          0        0
          5        4        3       2616     2843          0        0
         10        3        3       4493     4767          0        0
         50        7        7      10567    10833          0        0
        100       15       15      12265    12934          0        0
        200       48       30      13593    14317          0        0
        400      185       60      14796    15549          0        0
        600      290       94      14416    15669          0        0
        800      568      118      14077    15820          2        0
       1000      710      118      13088    15830          9        0
       2000     1388      232      13342    15658         85        0
       3000     1110      363      13282    15422        233        0
       4000     1735      454      13387    15385        329        0

That is, this optimization improves max LWT bandwidth by about 15% and
allows running 3-4x more clients while maintaining the same level of
system responsiveness.
--- idl/paxos.idl.hh | 1 + message/messaging_service.cc | 9 +-- message/messaging_service.hh | 9 +-- service/paxos/paxos_state.cc | 49 +++++++++++++--- service/paxos/paxos_state.hh | 7 ++- service/paxos/prepare_response.hh | 34 ++++++++++- service/paxos/prepare_summary.hh | 4 ++ service/storage_proxy.cc | 95 ++++++++++++++++++++++++------- service/storage_proxy.hh | 15 ++++- 9 files changed, 178 insertions(+), 45 deletions(-) diff --git a/idl/paxos.idl.hh b/idl/paxos.idl.hh index 0318857ca5..238b7c2041 100644 --- a/idl/paxos.idl.hh +++ b/idl/paxos.idl.hh @@ -30,6 +30,7 @@ class proposal { class promise { std::optional accepted_proposal; std::optional most_recent_commit; + std::optional> get_data_or_digest(); }; } diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 2a88ff3492..d48d4619d9 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1230,8 +1230,8 @@ future> messaging_service::send_rep void messaging_service::register_paxos_prepare(std::function( - const rpc::client_info&, rpc::opt_time_point, utils::UUID schema_version, partition_key key, utils::UUID ballot, - std::optional)>&& func) { + const rpc::client_info&, rpc::opt_time_point, query::read_command cmd, partition_key key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, std::optional)>&& func) { register_handler(this, messaging_verb::PAXOS_PREPARE, std::move(func)); } void messaging_service::unregister_paxos_prepare() { @@ -1239,9 +1239,10 @@ void messaging_service::unregister_paxos_prepare() { } future messaging_service::send_paxos_prepare(gms::inet_address peer, clock_type::time_point timeout, - utils::UUID schema_version, const partition_key& key, utils::UUID ballot, std::optional trace_info) { + const query::read_command& cmd, const partition_key& key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, std::optional trace_info) { return send_message_timeout(this, - messaging_verb::PAXOS_PREPARE, 
netw::msg_addr(peer), timeout, schema_version, key, ballot, std::move(trace_info)); + messaging_verb::PAXOS_PREPARE, netw::msg_addr(peer), timeout, cmd, key, ballot, only_digest, da, std::move(trace_info)); } void messaging_service::register_paxos_accept(std::function( diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 08c56cfb08..6df9c577ac 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -462,14 +462,15 @@ public: // Wrappers for PAXOS verbs void register_paxos_prepare(std::function( - const rpc::client_info&, rpc::opt_time_point, utils::UUID schema_version, partition_key key, utils::UUID ballot, - std::optional)>&& func); + const rpc::client_info&, rpc::opt_time_point, query::read_command cmd, partition_key key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, std::optional)>&& func); void unregister_paxos_prepare(); future send_paxos_prepare( - gms::inet_address peer, clock_type::time_point timeout, utils::UUID schema_version, - const partition_key& key, utils::UUID ballot, std::optional trace_info); + gms::inet_address peer, clock_type::time_point timeout, const query::read_command& cmd, + const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, + std::optional trace_info); void register_paxos_accept(std::function(const rpc::client_info&, rpc::opt_time_point, service::paxos::proposal proposal, std::optional)>&& func); diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 78fef73627..68fd473e1e 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -28,6 +28,8 @@ #include "schema_registry.hh" #include "database.hh" +#include "service/storage_service.hh" + namespace service::paxos { logging::logger paxos_state::logger("paxos"); @@ -45,10 +47,11 @@ void paxos_state::release_semaphore_for_key(const dht::token& key) { } future paxos_state::prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, 
- const dht::token& token, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout) { + const query::read_command& cmd, const dht::token& token, const partition_key& key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) { utils::latency_counter lc; lc.start(); - return with_locked_key(token, timeout, [&key, ballot, schema, tr_state, timeout] () mutable { + return with_locked_key(token, timeout, [&cmd, &token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable { // When preparing, we need to use the same time as "now" (that's the time we use to decide if something // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up // on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no @@ -56,16 +59,42 @@ future paxos_state::prepare_impl(tracing::trace_state_ptr tr_s // tombstone that hides any re-submit). See CASSANDRA-12043 for details. auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot); auto f = db::system_keyspace::load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout); - return f.then([&key, ballot, tr_state, schema, timeout] (paxos_state state) { + return f.then([&cmd, &token, &key, ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) { // If received ballot is newer that the one we already accepted it has to be accepted as well, // but we will return the previously accepted proposal so that the new coordinator will use it instead of // its own. 
if (ballot.timestamp() > state._promised_ballot.timestamp()) { logger.debug("Promising ballot {}", ballot); tracing::trace(tr_state, "Promising ballot {}", ballot); - return db::system_keyspace::save_paxos_promise(*schema, key, ballot, timeout).then([state = std::move(state)] { + auto f1 = futurize_apply(db::system_keyspace::save_paxos_promise, *schema, key, ballot, timeout); + auto f2 = futurize_apply([&] { + return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), + [tr_state, schema, &cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) { + return get_local_storage_proxy().get_db().local().query(schema, cmd, + {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da}, + prv, tr_state, query::result_memory_limiter::maximum_result_size, timeout); + }); + }); + return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest] (auto t) { + auto&& f1 = std::get<0>(t); + auto&& f2 = std::get<1>(t); + if (f1.failed()) { + // Failed to save promise. Nothing we can do but throw. + return make_exception_future(f1.get_exception()); + } + std::optional>, query::result_digest>> data_or_digest; + // Silently ignore any errors querying the current value as the caller is prepared to fall back + // on querying it by itself in case it's missing in the response. 
+ if (!f2.failed()) { + auto&& [result, hit_rate] = f2.get(); + if (only_digest) { + data_or_digest = *result->digest(); + } else { + data_or_digest = std::move(make_foreign(std::move(result))); + } + } return make_ready_future(prepare_response(promise(std::move(state._accepted_proposal), - std::move(state._most_recent_commit)))); + std::move(state._most_recent_commit), std::move(data_or_digest)))); }); } else { logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); @@ -86,8 +115,9 @@ future paxos_state::prepare_impl(tracing::trace_state_ptr tr_s } // Invoke prepare on appropriate shard -future paxos_state::prepare(tracing::trace_state_ptr tr_state, schema_ptr schema, partition_key key, - utils::UUID ballot, clock_type::time_point timeout) { +future paxos_state::prepare(tracing::trace_state_ptr tr_state, schema_ptr schema, + lw_shared_ptr cmd, partition_key key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) { dht::token token = dht::global_partitioner().get_token(*schema, key); unsigned shard = get_local_storage_proxy().get_db().local().shard_of(token); // prepare_impl takes a semaphore corresponding to a key. @@ -95,8 +125,9 @@ future paxos_state::prepare(tracing::trace_state_ptr tr_state, // shards, the key won't be locked, which can lead to an invalid Paxos // consensus and, as a result, invalid CAS outcome. 
return get_storage_proxy().invoke_on(shard, [gt = tracing::global_trace_state_ptr(tr_state), - gs = global_schema_ptr(schema), token = std::move(token), key = std::move(key), ballot, timeout] (auto& sp) { - return paxos_state::prepare_impl(gt, gs, token, key, ballot, timeout); + gs = global_schema_ptr(schema), cmd, token = std::move(token), key = std::move(key), ballot, + only_digest, da, timeout] (auto& sp) { + return paxos_state::prepare_impl(gt, gs, *cmd, token, key, ballot, only_digest, da, timeout); }); } diff --git a/service/paxos/paxos_state.hh b/service/paxos/paxos_state.hh index ca193adf03..3f4b7390c3 100644 --- a/service/paxos/paxos_state.hh +++ b/service/paxos/paxos_state.hh @@ -42,6 +42,7 @@ #pragma once #include "service/paxos/proposal.hh" #include "log.hh" +#include "digest_algorithm.hh" #include "db/timeout_clock.hh" #include #include "utils/UUID_gen.hh" @@ -80,7 +81,8 @@ class paxos_state { std::optional _most_recent_commit; static future prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, - const dht::token& token, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout); + const query::read_command& cmd, const dht::token& token, const partition_key& key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, clock_type::time_point timeout); static future accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, const dht::token& token, const proposal& proposal, clock_type::time_point timeout); @@ -95,7 +97,8 @@ public: , _most_recent_commit(std::move(commit)) {} // Replica RPC endpoint for Paxos "prepare" phase. static future prepare(tracing::trace_state_ptr tr_state, schema_ptr schema, - partition_key key, utils::UUID ballot, clock_type::time_point timeout); + lw_shared_ptr cmd, partition_key key, utils::UUID ballot, + bool only_digest, query::digest_algorithm da, clock_type::time_point timeout); // Replica RPC endpoint for Paxos "accept" phase. 
static future accept(tracing::trace_state_ptr tr_state, schema_ptr schema, proposal proposal, clock_type::time_point timeout); diff --git a/service/paxos/prepare_response.hh b/service/paxos/prepare_response.hh index 583352e40d..72bada2c34 100644 --- a/service/paxos/prepare_response.hh +++ b/service/paxos/prepare_response.hh @@ -42,6 +42,7 @@ #pragma once #include +#include "query-result.hh" #include "service/paxos/proposal.hh" namespace service { @@ -59,10 +60,39 @@ struct promise { // table) by the time of this prepare request. Empty on first round or if previous round data // has expired. std::optional most_recent_commit; + // Local query result with digest or just digest, depending on the caller's choice, or none + // if the query failed. It is used to skip a separate query after the prepare phase provided + // results from all PAXOS participants match. + std::optional>, query::result_digest>> data_or_digest; - promise(std::optional accepted_arg, std::optional chosen_arg) + std::optional, query::result_digest>> get_data_or_digest() const { + if (!data_or_digest) { + return std::optional, query::result_digest>>(); + } else if (std::holds_alternative>>(*data_or_digest)) { + return *std::get>>(*data_or_digest); + } else { + return std::get(*data_or_digest); + } + } + + promise(std::optional accepted_arg, std::optional chosen_arg, + std::optional>, query::result_digest>> data_or_digest_arg) : accepted_proposal(std::move(accepted_arg)) - , most_recent_commit(std::move(chosen_arg)) {} + , most_recent_commit(std::move(chosen_arg)) + , data_or_digest(std::move(data_or_digest_arg)) {} + + promise(std::optional accepted_arg, std::optional chosen_arg, + std::optional> data_or_digest_arg) + : accepted_proposal(std::move(accepted_arg)) + , most_recent_commit(std::move(chosen_arg)) { + if (data_or_digest_arg) { + if (std::holds_alternative(*data_or_digest_arg)) { + data_or_digest = make_foreign(make_lw_shared(std::move(std::get(*data_or_digest_arg)))); + } else { + 
data_or_digest = std::move(std::get(*data_or_digest_arg)); + } + } + } }; // If the prepare request is rejected we return timeUUID of the most recently promised ballot, so diff --git a/service/paxos/prepare_summary.hh b/service/paxos/prepare_summary.hh index 6bcfa7d827..45279accda 100644 --- a/service/paxos/prepare_summary.hh +++ b/service/paxos/prepare_summary.hh @@ -76,6 +76,10 @@ public: // only then proposing the new ballot and new mutation. May be empty if there is no such cell for // this key in the paxos table on any of the replicas. std::optional most_recent_proposal; + // Value of the requested key received from participating replicas during the prepare phase. + // May be none in case data hadn't been received before a consensus was reached or digests + // received from different replicas didn't match. + foreign_ptr> data; public: prepare_summary(size_t node_count); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 052056716b..cbbc388504 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -700,9 +700,9 @@ static future<> sleep_approx_50ms() { return seastar::sleep(std::chrono::milliseconds(dist(re))); } -static future> sleep_and_restart() { +static future> sleep_and_restart() { return sleep_approx_50ms().then([] { - return std::optional(); // continue + return std::optional(); // continue }); } @@ -712,13 +712,14 @@ static future> sleep_and_restart() { * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of * nodes have seen the most recent commit. Otherwise, return null. 
*/ -future paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write) { +future +paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write) { _proxy->get_db().local().get_config().check_experimental("Paxos"); return do_with(api::timestamp_type(0), shared_from_this(), [this, &cs, &contentions, is_write] (api::timestamp_type& min_timestamp_micros_to_use, shared_ptr& prh) { return repeat_until_value([this, &contentions, &cs, &min_timestamp_micros_to_use, is_write] { if (storage_proxy::clock_type::now() > _cas_timeout) { - return make_exception_future>( + return make_exception_future>( mutation_write_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl_for_paxos, 0, _required_participants, db::write_type::CAS) ); @@ -770,11 +771,11 @@ future paxos_response_handler::begin_and_repair_paxos(client_state& return accept_proposal(refreshed_in_progress, false).then([this, &contentions, &refreshed_in_progress] (bool is_accepted) mutable { if (is_accepted) { return learn_decision(std::move(refreshed_in_progress), false).then([] { - return make_ready_future>(std::optional()); + return make_ready_future>(std::optional()); }).handle_exception_type([] (mutation_write_timeout_exception& e) { e.type = db::write_type::CAS; // we're still doing preparation for the paxos rounds, so we want to use the CAS (see cASSANDRA-8672) - return make_exception_future>(std::move(e)); + return make_exception_future>(std::move(e)); }); } else { paxos::paxos_state::logger.debug("CAS[{}] Some replicas have already promised a higher ballot than ours; aborting", _id); @@ -815,11 +816,11 @@ future paxos_response_handler::begin_and_repair_paxos(client_state& } else { f.ignore_ready_future(); } - return std::optional(); // continue + return std::optional(); // continue }); } - return make_ready_future>(ballot); + return make_ready_future>(ballot_and_data{ballot, std::move(summary.data)}); }); }); }); @@ -832,6 
+833,10 @@ template struct dependent_false : std::false_type {}; future paxos_response_handler::prepare_ballot(utils::UUID ballot) { struct { size_t errors = 0; + // Whether the values of the requested key received from participating replicas match. + bool digests_match = true; + // Digest corresponding to the value of the requested key received from participating replicas. + std::optional digest; // the promise can be set before all replies are received at which point // the optional will be disengaged so further replies are ignored std::optional> p = promise(); @@ -853,11 +858,15 @@ future paxos_response_handler::prepare_ballot(utils::UUI paxos::paxos_state::logger.trace("CAS[{}] prepare_ballot: sending ballot {} to {}", _id, ballot, _live_endpoints); return parallel_for_each(_live_endpoints, [this, &summary, ballot, &request_tracker] (gms::inet_address peer) mutable { return futurize_apply([&] { + // To generate less network traffic, only the closest replica (first one in the list of participants) + // sends query result content while other replicas send digests needed to check consistency. + bool only_digest = peer != _live_endpoints[0]; + auto da = digest_algorithm(); if (fbu::is_me(peer)) { - return paxos::paxos_state::prepare(tr_state, _schema, _key.key(), ballot, _timeout); + return paxos::paxos_state::prepare(tr_state, _schema, _cmd, _key.key(), ballot, only_digest, da, _timeout); } else { netw::messaging_service& ms = netw::get_local_messaging_service(); - return ms.send_paxos_prepare(peer, _timeout, _schema->version(), _key.key(), ballot, + return ms.send_paxos_prepare(peer, _timeout, *_cmd, _key.key(), ballot, only_digest, da, tracing::make_trace_info(tr_state)); } }).then_wrapped([this, &summary, &request_tracker, peer, ballot] @@ -927,6 +936,33 @@ future paxos_response_handler::prepare_ballot(utils::UUI } } + // Check if the query result attached to the promise matches query results received from other participants.
+ if (request_tracker.digests_match) { + if (response.data_or_digest) { + foreign_ptr> data; + if (std::holds_alternative>>(*response.data_or_digest)) { + data = std::move(std::get>>(*response.data_or_digest)); + } + auto& digest = data ? data->digest() : std::get(*response.data_or_digest); + if (request_tracker.digest) { + if (*request_tracker.digest != digest) { + request_tracker.digests_match = false; + } + } else { + request_tracker.digest = digest; + } + if (request_tracker.digests_match && !summary.data && data) { + summary.data = std::move(data); + } + } else { + request_tracker.digests_match = false; + } + if (!request_tracker.digests_match) { + request_tracker.digest.reset(); + summary.data.reset(); + } + } + if (summary.committed_ballots_by_replica.size() == _required_participants) { // got all replies paxos::paxos_state::logger.trace("CAS[{}] prepare_ballot: got enough replies to proceed", _id); request_tracker.set_value(std::move(summary)); @@ -1912,6 +1948,13 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token & cl_for_paxos, participants + 1, live_endpoints.size()); } + // Apart from the ballot, paxos_state::prepare() also sends the current value of the requested key. + // If the values received from different replicas match, we skip a separate query stage thus saving + // one network round trip. To generate less traffic, only the closest replica sends data, others send + // digests that are used to check consistency. For this optimization to work, we need to sort the + // list of participants by proximity to this instance.
+ sort_endpoints_by_proximity(live_endpoints); + return paxos_participants{std::move(live_endpoints), required_participants}; } @@ -3895,7 +3938,7 @@ storage_proxy::do_query_with_paxos(schema_ptr s, try { handler = seastar::make_shared( shared_from_this(), query_options.trace_state, query_options.permit, - partition_ranges[0].start()->value().as_decorated_key(), s, cl, cl_for_learn, timeout, cas_timeout); + partition_ranges[0].start()->value().as_decorated_key(), s, cmd, cl, cl_for_learn, timeout, cas_timeout); } catch (exceptions::unavailable_exception& ex) { _stats.cas_read_unavailables.mark(); throw; @@ -3907,7 +3950,7 @@ storage_proxy::do_query_with_paxos(schema_ptr s, lc.start(); return handler->begin_and_repair_paxos(query_options.cstate, contentions, false).then_wrapped([this, s = std::move(s), cmd = std::move(cmd), partition_ranges = std::move(partition_ranges), query_options = std::move(query_options), - cl_for_learn] (future f) mutable { + cl_for_learn] (future f) mutable { if (f.failed()) { try { f.get(); @@ -3920,6 +3963,10 @@ storage_proxy::do_query_with_paxos(schema_ptr s, read_failure_exception(s->ks_name(), s->cf_name(), ex.consistency, ex.received, ex.failures, ex.block_for, false)); } } + auto v = f.get0(); + if (v.data) { + return make_ready_future(storage_proxy::coordinator_query_result(std::move(v.data))); + } return do_query(s, std::move(cmd), std::move(partition_ranges), cl_for_learn, std::move(query_options)); }).then_wrapped([this, lc, &contentions, handler = std::move(handler)] (future f) mutable { _stats.cas_read.mark(lc.stop().latency()); @@ -3989,7 +4036,7 @@ future storage_proxy::cas(schema_ptr schema, shared_ptr reque handler = seastar::make_shared(shared_from_this(), query_options.trace_state, query_options.permit, partition_ranges[0].start()->value().as_decorated_key(), - schema, cl_for_paxos, cl_for_commit, write_timeout, cas_timeout); + schema, cmd, cl_for_paxos, cl_for_commit, write_timeout, cas_timeout); } catch 
(exceptions::unavailable_exception& ex) { _stats.cas_write_unavailables.mark(); throw; @@ -4009,13 +4056,16 @@ future storage_proxy::cas(schema_ptr schema, shared_ptr reque // recent enough. return handler->begin_and_repair_paxos(query_options.cstate, contentions, true) .then([this, handler, schema, cmd, request, partition_ranges, query_options, cl, &contentions] - (utils::UUID ballot) mutable { + (paxos_response_handler::ballot_and_data v) mutable { // Read the current values and check they validate the conditions. paxos::paxos_state::logger.debug("CAS[{}]: Reading existing values for CAS precondition", handler->id()); tracing::trace(handler->tr_state, "Reading existing values for CAS precondition"); - return query(schema, cmd, std::move(partition_ranges), cl, query_options) - .then([this, handler, schema, cmd, request, ballot, &contentions] (coordinator_query_result&& qr) { - auto mutation = request->apply(*qr.query_result, cmd->slice, utils::UUID_gen::micros_timestamp(ballot)); + auto f = v.data ? 
make_ready_future>>(std::move(v.data)) : + query(schema, cmd, std::move(partition_ranges), cl, query_options).then([] (coordinator_query_result&& qr) { + return make_ready_future>>(std::move(qr.query_result)); + }); + return f.then([this, handler, schema, cmd, request, ballot = v.ballot, &contentions] (auto&& qr) { + auto mutation = request->apply(*qr, cmd->slice, utils::UUID_gen::micros_timestamp(ballot)); if (!mutation) { paxos::paxos_state::logger.debug("CAS[{}] precondition does not match current values", handler->id()); tracing::trace(handler->tr_state, "CAS precondition does not match current values"); @@ -4527,8 +4577,9 @@ void storage_proxy::init_messaging_service() { }); // Register PAXOS verb handlers - ms.register_paxos_prepare([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, utils::UUID schema_version, - partition_key key, utils::UUID ballot, std::optional trace_info) { + ms.register_paxos_prepare([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, + query::read_command cmd, partition_key key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, + std::optional trace_info) { auto src_addr = netw::messaging_service::get_source(cinfo); tracing::trace_state_ptr tr_state; if (trace_info) { @@ -4536,9 +4587,9 @@ void storage_proxy::init_messaging_service() { tracing::begin(tr_state); } - return get_schema_for_read(schema_version, src_addr).then([tr_state = std::move(tr_state), - key = std::move(key), ballot, timeout] (schema_ptr schema) { - return paxos::paxos_state::prepare(tr_state, std::move(schema), std::move(key), ballot, *timeout); + return get_schema_for_read(cmd.schema_version, src_addr).then([cmd = make_lw_shared(std::move(cmd)), + key = std::move(key), ballot, only_digest, da, timeout, tr_state = std::move(tr_state)] (schema_ptr schema) { + return paxos::paxos_state::prepare(tr_state, std::move(schema), cmd, std::move(key), ballot, only_digest, da, *timeout); }); }); ms.register_paxos_accept([this] 
(const rpc::client_info& cinfo, rpc::opt_time_point timeout, paxos::proposal proposal, diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index bcc9721f49..2f6f5080eb 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -556,6 +556,8 @@ private: shared_ptr _proxy; // The schema for the table the operation works upon. schema_ptr _schema; + // Read command used by this CAS request. + lw_shared_ptr _cmd; // SERIAL or LOCAL SERIAL - influences what endpoints become Paxos protocol participants, // as well as Paxos quorum size. Is either set explicitly in the query or derived from // the value set by SERIAL CONSISTENCY [SERIAL|LOCAL SERIAL] control statement. @@ -588,12 +590,13 @@ public: public: paxos_response_handler(shared_ptr proxy_arg, tracing::trace_state_ptr tr_state_arg, service_permit permit_arg, - dht::decorated_key key_arg, schema_ptr schema_arg, + dht::decorated_key key_arg, schema_ptr schema_arg, lw_shared_ptr cmd_arg, db::consistency_level cl_for_paxos_arg, db::consistency_level cl_for_learn_arg, storage_proxy::clock_type::time_point timeout_arg, storage_proxy::clock_type::time_point cas_timeout_arg) : _proxy(proxy_arg) , _schema(std::move(schema_arg)) + , _cmd(cmd_arg) , _cl_for_paxos(cl_for_paxos_arg) , _cl_for_learn(cl_for_learn_arg) , _timeout(timeout_arg) @@ -606,8 +609,16 @@ public: _required_participants = pp.required_participants; } + // Result of PREPARE step, i.e. begin_and_repair_paxos(). + struct ballot_and_data { + // Ballot promised by the replicas. + utils::UUID ballot; + // Current value of the requested key or none.
+ foreign_ptr> data; + }; + // Steps of the Paxos protocol - future begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write); + future begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write); future prepare_ballot(utils::UUID ballot); future accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted = true); future<> learn_decision(paxos::proposal decision, bool allow_hints = false);