Merge "Optimize LWT query phase" from Vladimir Davydov

This patch implements a simple optimization for LWT: it makes PAXOS
prepare phase query locally and return the current value of the modified
key so that a separate query is not necessary. For more details see
patch 6. Patch 1 fixes a bug in next. Patches 2-5 contain trivial
preparatory refactoring.
This commit is contained in:
Tomasz Grabiec
2019-11-22 15:43:24 +01:00
committed by Avi Kivity
13 changed files with 248 additions and 72 deletions

View File

@@ -58,13 +58,13 @@ struct query_context {
// let the `storage_proxy` time out the query down the call chain
db::timeout_clock::duration::zero();
const timeout_config tcfg{d, d, d, d, d, d, d};
return _qp.local().process(req,
cql3::query_options::DEFAULT.get_consistency(),
tcfg,
{ data_value(std::forward<Args>(args))... },
true);
return do_with(timeout_config{d, d, d, d, d, d, d}, [this, req = std::move(req), &args...] (auto& tcfg) {
return _qp.local().process(req,
cql3::query_options::DEFAULT.get_consistency(),
tcfg,
{ data_value(std::forward<Args>(args))... },
true);
});
}
database& db() {

View File

@@ -2062,13 +2062,13 @@ future<std::vector<view_build_progress>> load_view_build_progress() {
});
}
future<service::paxos::paxos_state> load_paxos_state(partition_key key, schema_ptr s, gc_clock::time_point now,
future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, schema_ptr s, gc_clock::time_point now,
db::timeout_clock::time_point timeout) {
static auto cql = format("SELECT * FROM system.{} WHERE row_key = ? AND cf_id = ?", PAXOS);
// FIXME: we need execute_cql_with_now()
(void)now;
auto f = execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id());
return f.then([key = std::move(key), s] (shared_ptr<cql3::untyped_result_set> results) mutable {
return f.then([s] (shared_ptr<cql3::untyped_result_set> results) mutable {
if (results->empty()) {
return service::paxos::paxos_state();
}

View File

@@ -615,7 +615,7 @@ future<std::vector<view_name>> load_built_views();
future<std::vector<view_build_progress>> load_view_build_progress();
// Paxos related functions
future<service::paxos::paxos_state> load_paxos_state(partition_key key, schema_ptr s, gc_clock::time_point now,
future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, schema_ptr s, gc_clock::time_point now,
db::timeout_clock::time_point timeout);
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout);

View File

@@ -30,6 +30,7 @@ class proposal {
// NOTE(review): this looks like the serialization (IDL) description of
// service::paxos::promise rather than the C++ definition itself -- confirm.
class promise {
// Optional proposal carried in the promise under the name accepted_proposal.
std::optional<service::paxos::proposal> accepted_proposal;
// Optional proposal carried in the promise under the name most_recent_commit.
std::optional<service::paxos::proposal> most_recent_commit;
// Optional query result or query result digest. Unlike the two members above,
// this one is declared as a call to get_data_or_digest() rather than as a plain field.
std::optional<std::variant<query::result, query::result_digest>> get_data_or_digest();
};
}

View File

@@ -1230,8 +1230,8 @@ future<std::vector<row_level_diff_detect_algorithm>> messaging_service::send_rep
void
messaging_service::register_paxos_prepare(std::function<future<service::paxos::prepare_response>(
const rpc::client_info&, rpc::opt_time_point, utils::UUID schema_version, partition_key key, utils::UUID ballot,
std::optional<tracing::trace_info>)>&& func) {
const rpc::client_info&, rpc::opt_time_point, query::read_command cmd, partition_key key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info>)>&& func) {
register_handler(this, messaging_verb::PAXOS_PREPARE, std::move(func));
}
void messaging_service::unregister_paxos_prepare() {
@@ -1239,9 +1239,10 @@ void messaging_service::unregister_paxos_prepare() {
}
future<service::paxos::prepare_response>
messaging_service::send_paxos_prepare(gms::inet_address peer, clock_type::time_point timeout,
utils::UUID schema_version, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info> trace_info) {
return send_message_timeout<service::paxos::prepare_response>(this,
messaging_verb::PAXOS_PREPARE, netw::msg_addr(peer), timeout, schema_version, key, ballot, std::move(trace_info));
messaging_verb::PAXOS_PREPARE, netw::msg_addr(peer), timeout, cmd, key, ballot, only_digest, da, std::move(trace_info));
}
void messaging_service::register_paxos_accept(std::function<future<bool>(

View File

@@ -462,14 +462,15 @@ public:
// Wrappers for PAXOS verbs
void register_paxos_prepare(std::function<future<service::paxos::prepare_response>(
const rpc::client_info&, rpc::opt_time_point, utils::UUID schema_version, partition_key key, utils::UUID ballot,
std::optional<tracing::trace_info>)>&& func);
const rpc::client_info&, rpc::opt_time_point, query::read_command cmd, partition_key key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info>)>&& func);
void unregister_paxos_prepare();
future<service::paxos::prepare_response> send_paxos_prepare(
gms::inet_address peer, clock_type::time_point timeout, utils::UUID schema_version,
partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info);
gms::inet_address peer, clock_type::time_point timeout, const query::read_command& cmd,
const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da,
std::optional<tracing::trace_info> trace_info);
void register_paxos_accept(std::function<future<bool>(const rpc::client_info&, rpc::opt_time_point,
service::paxos::proposal proposal, std::optional<tracing::trace_info>)>&& func);

View File

@@ -232,6 +232,11 @@ inline void serialize(Output& out, const T& v) {
serializer<T>::write(out, v);
};
// Serializes the object a std::reference_wrapper refers to, exactly as if the
// object itself had been passed: the wrapper is unwrapped and the call is
// forwarded to serializer<T>::write().
template<typename T, typename Output>
inline void serialize(Output& out, const std::reference_wrapper<T> v) {
    T& referent = v.get();
    serializer<T>::write(out, referent);
}
template<typename T, typename Input>
inline auto deserialize(Input& in, boost::type<T> t) {
return serializer<T>::read(in);
@@ -304,6 +309,38 @@ template <typename T, typename U>
struct is_equivalent : std::is_same<typename normalize<std::remove_const_t<std::remove_reference_t<T>>>::type, typename normalize<std::remove_const_t <std::remove_reference_t<U>>>::type> {
};
// std::reference_wrapper is transparent for equivalence checks: a wrapped type
// on the left-hand side is compared as the type it wraps.
template <typename T, typename U>
struct is_equivalent<std::reference_wrapper<T>, U> : is_equivalent<T, U> {
};
// Same as above, for a wrapped type on the right-hand side.
template <typename T, typename U>
struct is_equivalent<T, std::reference_wrapper<U>> : is_equivalent<T, U> {
};
// Both sides wrapped. The two specializations above match this case equally
// well and neither is more specialized than the other, so without this
// tie-breaker instantiating is_equivalent with two reference_wrappers would be
// an ambiguous-partial-specialization error.
template <typename T, typename U>
struct is_equivalent<std::reference_wrapper<T>, std::reference_wrapper<U>> : is_equivalent<T, U> {
};
// Two optionals are equivalent when their value types are equivalent.
template <typename T, typename U>
struct is_equivalent<std::optional<T>, std::optional<U>> : is_equivalent<T, U> {
};
template <typename T, typename U, bool>
struct is_equivalent_arity;
template <typename ...T, typename ...U>
struct is_equivalent_arity<std::tuple<T...>, std::tuple<U...>, false> : std::false_type {
};
template <typename ...T, typename ...U>
struct is_equivalent_arity<std::tuple<T...>, std::tuple<U...>, true> {
static constexpr bool value = (is_equivalent<T, U>::value && ...);
};
template <typename ...T, typename ...U>
struct is_equivalent<std::tuple<T...>, std::tuple<U...>> : is_equivalent_arity<std::tuple<T...>, std::tuple<U...>, sizeof...(T) == sizeof...(U)> {
};
template <typename ...T, typename ...U>
struct is_equivalent<std::variant<T...>, std::variant<U...>> : is_equivalent<std::tuple<T...>, std::tuple<U...>> {
};
// gc_clock duration values were serialized as 32-bit prior to 3.1, and
// are serialized as 64-bit in 3.1.0.
//

View File

@@ -28,6 +28,8 @@
#include "schema_registry.hh"
#include "database.hh"
#include "service/storage_service.hh"
namespace service::paxos {
logging::logger paxos_state::logger("paxos");
@@ -44,11 +46,12 @@ void paxos_state::release_semaphore_for_key(const dht::token& key) {
}
}
future<prepare_response> paxos_state::prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token,
partition_key key, utils::UUID ballot, clock_type::time_point timeout) {
future<prepare_response> paxos_state::prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema,
const query::read_command& cmd, const dht::token& token, const partition_key& key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
utils::latency_counter lc;
lc.start();
return with_locked_key(token, timeout, [ballot, key = std::move(key), schema, tr_state, timeout] () mutable {
return with_locked_key(token, timeout, [&cmd, &token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable {
// When preparing, we need to use the same time as "now" (that's the time we use to decide if something
// is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
// on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no
@@ -56,16 +59,42 @@ future<prepare_response> paxos_state::prepare_impl(tracing::trace_state_ptr tr_s
// tombstone that hides any re-submit). See CASSANDRA-12043 for details.
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot);
auto f = db::system_keyspace::load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout);
return f.then([ballot, key = std::move(key), tr_state, schema, timeout] (paxos_state state) {
return f.then([&cmd, &token, &key, ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) {
// If received ballot is newer than the one we already accepted it has to be accepted as well,
// but we will return the previously accepted proposal so that the new coordinator will use it instead of
// its own.
if (ballot.timestamp() > state._promised_ballot.timestamp()) {
logger.debug("Promising ballot {}", ballot);
tracing::trace(tr_state, "Promising ballot {}", ballot);
return db::system_keyspace::save_paxos_promise(*schema, key, ballot, timeout).then([state = std::move(state)] {
auto f1 = futurize_apply(db::system_keyspace::save_paxos_promise, *schema, key, ballot, timeout);
auto f2 = futurize_apply([&] {
return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}),
[tr_state, schema, &cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) {
return get_local_storage_proxy().get_db().local().query(schema, cmd,
{only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da},
prv, tr_state, query::result_memory_limiter::maximum_result_size, timeout);
});
});
return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest] (auto t) {
auto&& f1 = std::get<0>(t);
auto&& f2 = std::get<1>(t);
if (f1.failed()) {
// Failed to save promise. Nothing we can do but throw.
return make_exception_future<prepare_response>(f1.get_exception());
}
std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>> data_or_digest;
// Silently ignore any errors querying the current value as the caller is prepared to fall back
// on querying it by itself in case it's missing in the response.
if (!f2.failed()) {
auto&& [result, hit_rate] = f2.get();
if (only_digest) {
data_or_digest = *result->digest();
} else {
data_or_digest = std::move(make_foreign(std::move(result)));
}
}
return make_ready_future<prepare_response>(prepare_response(promise(std::move(state._accepted_proposal),
std::move(state._most_recent_commit))));
std::move(state._most_recent_commit), std::move(data_or_digest))));
});
} else {
logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
@@ -86,8 +115,9 @@ future<prepare_response> paxos_state::prepare_impl(tracing::trace_state_ptr tr_s
}
// Invoke prepare on appropriate shard
future<prepare_response> paxos_state::prepare(tracing::trace_state_ptr tr_state, schema_ptr schema, partition_key key,
utils::UUID ballot, clock_type::time_point timeout) {
future<prepare_response> paxos_state::prepare(tracing::trace_state_ptr tr_state, schema_ptr schema,
lw_shared_ptr<query::read_command> cmd, partition_key key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
dht::token token = dht::global_partitioner().get_token(*schema, key);
unsigned shard = get_local_storage_proxy().get_db().local().shard_of(token);
// prepare_impl takes a semaphore corresponding to a key.
@@ -95,20 +125,21 @@ future<prepare_response> paxos_state::prepare(tracing::trace_state_ptr tr_state,
// shards, the key won't be locked, which can lead to an invalid Paxos
// consensus and, as a result, invalid CAS outcome.
return get_storage_proxy().invoke_on(shard, [gt = tracing::global_trace_state_ptr(tr_state),
gs = global_schema_ptr(schema), token = std::move(token), key = std::move(key), ballot, timeout] (auto& sp) {
return paxos_state::prepare_impl(gt, gs, std::move(token), std::move(key), ballot, timeout);
gs = global_schema_ptr(schema), cmd, token = std::move(token), key = std::move(key), ballot,
only_digest, da, timeout] (auto& sp) {
return paxos_state::prepare_impl(gt, gs, *cmd, token, key, ballot, only_digest, da, timeout);
});
}
future<bool> paxos_state::accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token,
proposal proposal, clock_type::time_point timeout) {
future<bool> paxos_state::accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema,
const dht::token& token, const proposal& proposal, clock_type::time_point timeout) {
utils::latency_counter lc;
lc.start();
return with_locked_key(token, timeout, [proposal = std::move(proposal), schema, tr_state, timeout] () mutable {
return with_locked_key(token, timeout, [&proposal, schema, tr_state, timeout] () mutable {
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
auto f = db::system_keyspace::load_paxos_state(proposal.update.decorated_key(*schema).key(), schema,
gc_clock::time_point(now_in_sec), timeout);
return f.then([proposal = std::move(proposal), tr_state, schema, timeout] (paxos_state state) {
return f.then([&proposal, tr_state, schema, timeout] (paxos_state state) {
// Accept the proposal if we promised to accept it or the proposal is newer than the one we promised.
// Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected.
if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) {
@@ -140,7 +171,7 @@ future<bool> paxos_state::accept(tracing::trace_state_ptr tr_state, schema_ptr s
// Make sure the key is locked on the right shard.
return get_storage_proxy().invoke_on(shard, [gt = tracing::global_trace_state_ptr(tr_state),
gs = global_schema_ptr(schema), token = std::move(token), proposal = std::move(proposal), timeout] (auto& sp) {
return paxos_state::accept_impl(gt, gs, std::move(token), std::move(proposal), timeout);
return paxos_state::accept_impl(gt, gs, token, proposal, timeout);
});
}

View File

@@ -42,6 +42,7 @@
#pragma once
#include "service/paxos/proposal.hh"
#include "log.hh"
#include "digest_algorithm.hh"
#include "db/timeout_clock.hh"
#include <unordered_map>
#include "utils/UUID_gen.hh"
@@ -79,10 +80,11 @@ class paxos_state {
std::optional<proposal> _accepted_proposal;
std::optional<proposal> _most_recent_commit;
static future<prepare_response> prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token,
partition_key key, utils::UUID ballot, clock_type::time_point timeout);
static future<bool> accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token,
proposal proposal, clock_type::time_point timeout);
static future<prepare_response> prepare_impl(tracing::trace_state_ptr tr_state, schema_ptr schema,
const query::read_command& cmd, const dht::token& token, const partition_key& key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout);
static future<bool> accept_impl(tracing::trace_state_ptr tr_state, schema_ptr schema,
const dht::token& token, const proposal& proposal, clock_type::time_point timeout);
public:
static logging::logger logger;
@@ -95,7 +97,8 @@ public:
, _most_recent_commit(std::move(commit)) {}
// Replica RPC endpoint for Paxos "prepare" phase.
static future<prepare_response> prepare(tracing::trace_state_ptr tr_state, schema_ptr schema,
partition_key key, utils::UUID ballot, clock_type::time_point timeout);
lw_shared_ptr<query::read_command> cmd, partition_key key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout);
// Replica RPC endpoint for Paxos "accept" phase.
static future<bool> accept(tracing::trace_state_ptr tr_state, schema_ptr schema, proposal proposal,
clock_type::time_point timeout);

View File

@@ -42,6 +42,7 @@
#pragma once
#include <variant>
#include "query-result.hh"
#include "service/paxos/proposal.hh"
namespace service {
@@ -59,10 +60,39 @@ struct promise {
// table) by the time of this prepare request. Empty on first round or if previous round data
// has expired.
std::optional<proposal> most_recent_commit;
// Local query result with digest or just digest, depending on the caller's choice, or none
// if the query failed. It is used to skip a separate query after the prepare phase provided
// results from all PAXOS participants match.
std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>> data_or_digest;
promise(std::optional<proposal> accepted_arg, std::optional<proposal> chosen_arg)
// Returns a non-owning view of data_or_digest:
//  - an empty optional when nothing is stored,
//  - a reference to the stored query::result when the full result is held,
//  - the stored digest otherwise.
std::optional<std::variant<std::reference_wrapper<query::result>, query::result_digest>> get_data_or_digest() const {
    using view_type = std::optional<std::variant<std::reference_wrapper<query::result>, query::result_digest>>;
    if (!data_or_digest) {
        return view_type();
    }
    if (auto* full_result = std::get_if<foreign_ptr<lw_shared_ptr<query::result>>>(&*data_or_digest)) {
        // Dereference the pointer to the alternative, then the foreign_ptr itself.
        return **full_result;
    }
    return std::get<query::result_digest>(*data_or_digest);
}
promise(std::optional<proposal> accepted_arg, std::optional<proposal> chosen_arg,
std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>> data_or_digest_arg)
: accepted_proposal(std::move(accepted_arg))
, most_recent_commit(std::move(chosen_arg)) {}
, most_recent_commit(std::move(chosen_arg))
, data_or_digest(std::move(data_or_digest_arg)) {}
promise(std::optional<proposal> accepted_arg, std::optional<proposal> chosen_arg,
std::optional<std::variant<query::result, query::result_digest>> data_or_digest_arg)
: accepted_proposal(std::move(accepted_arg))
, most_recent_commit(std::move(chosen_arg)) {
if (data_or_digest_arg) {
if (std::holds_alternative<query::result>(*data_or_digest_arg)) {
data_or_digest = make_foreign(make_lw_shared<query::result>(std::move(std::get<query::result>(*data_or_digest_arg))));
} else {
data_or_digest = std::move(std::get<query::result_digest>(*data_or_digest_arg));
}
}
}
};
// If the prepare request is rejected we return timeUUID of the most recently promised ballot, so

View File

@@ -76,6 +76,10 @@ public:
// only then proposing the new ballot and new mutation. May be empty if there is no such cell for
// this key in the paxos table on any of the replicas.
std::optional<paxos::proposal> most_recent_proposal;
// Value of the requested key received from participating replicas during the prepare phase.
// May be none in case data hadn't been received before a consensus was reached or digests
// received from different replicas didn't match.
foreign_ptr<lw_shared_ptr<query::result>> data;
public:
prepare_summary(size_t node_count);

View File

@@ -106,6 +106,13 @@ distributed<service::storage_proxy> _the_storage_proxy;
using namespace exceptions;
using fbu = utils::fb_utilities;
// Picks the digest algorithm: xxHash when the local storage service reports
// that the cluster supports it, MD5 otherwise.
static inline
query::digest_algorithm digest_algorithm() {
    if (service::get_local_storage_service().cluster_supports_xxhash_digest_algorithm()) {
        return query::digest_algorithm::xxHash;
    }
    return query::digest_algorithm::MD5;
}
static inline
const dht::token& start_token(const dht::partition_range& r) {
static const dht::token min_token = dht::minimum_token();
@@ -693,9 +700,9 @@ static future<> sleep_approx_50ms() {
return seastar::sleep(std::chrono::milliseconds(dist(re)));
}
static future<std::optional<utils::UUID>> sleep_and_restart() {
static future<std::optional<paxos_response_handler::ballot_and_data>> sleep_and_restart() {
return sleep_approx_50ms().then([] {
return std::optional<utils::UUID>(); // continue
return std::optional<paxos_response_handler::ballot_and_data>(); // continue
});
}
@@ -705,13 +712,14 @@ static future<std::optional<utils::UUID>> sleep_and_restart() {
* @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of
* nodes have seen the most recent commit. Otherwise, return null.
*/
future<utils::UUID> paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write) {
future<paxos_response_handler::ballot_and_data>
paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write) {
_proxy->get_db().local().get_config().check_experimental("Paxos");
return do_with(api::timestamp_type(0), shared_from_this(), [this, &cs, &contentions, is_write]
(api::timestamp_type& min_timestamp_micros_to_use, shared_ptr<paxos_response_handler>& prh) {
return repeat_until_value([this, &contentions, &cs, &min_timestamp_micros_to_use, is_write] {
if (storage_proxy::clock_type::now() > _cas_timeout) {
return make_exception_future<std::optional<utils::UUID>>(
return make_exception_future<std::optional<ballot_and_data>>(
mutation_write_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl_for_paxos, 0,
_required_participants, db::write_type::CAS)
);
@@ -763,11 +771,11 @@ future<utils::UUID> paxos_response_handler::begin_and_repair_paxos(client_state&
return accept_proposal(refreshed_in_progress, false).then([this, &contentions, &refreshed_in_progress] (bool is_accepted) mutable {
if (is_accepted) {
return learn_decision(std::move(refreshed_in_progress), false).then([] {
return make_ready_future<std::optional<utils::UUID>>(std::optional<utils::UUID>());
return make_ready_future<std::optional<ballot_and_data>>(std::optional<ballot_and_data>());
}).handle_exception_type([] (mutation_write_timeout_exception& e) {
e.type = db::write_type::CAS;
// we're still doing preparation for the paxos rounds, so we want to use the CAS (see CASSANDRA-8672)
return make_exception_future<std::optional<utils::UUID>>(std::move(e));
return make_exception_future<std::optional<ballot_and_data>>(std::move(e));
});
} else {
paxos::paxos_state::logger.debug("CAS[{}] Some replicas have already promised a higher ballot than ours; aborting", _id);
@@ -808,11 +816,11 @@ future<utils::UUID> paxos_response_handler::begin_and_repair_paxos(client_state&
} else {
f.ignore_ready_future();
}
return std::optional<utils::UUID>(); // continue
return std::optional<ballot_and_data>(); // continue
});
}
return make_ready_future<std::optional<utils::UUID>>(ballot);
return make_ready_future<std::optional<ballot_and_data>>(ballot_and_data{ballot, std::move(summary.data)});
});
});
});
@@ -825,6 +833,10 @@ template<class T> struct dependent_false : std::false_type {};
future<paxos::prepare_summary> paxos_response_handler::prepare_ballot(utils::UUID ballot) {
struct {
size_t errors = 0;
// Whether the values of the requested key received from participating replicas match.
bool digests_match = true;
// Digest corresponding to the value of the requested key received from participating replicas.
std::optional<query::result_digest> digest;
// the promise can be set before all replies are received at which point
// the optional will be disengaged so further replies are ignored
std::optional<promise<paxos::prepare_summary>> p = promise<paxos::prepare_summary>();
@@ -846,11 +858,15 @@ future<paxos::prepare_summary> paxos_response_handler::prepare_ballot(utils::UUI
paxos::paxos_state::logger.trace("CAS[{}] prepare_ballot: sending ballot {} to {}", _id, ballot, _live_endpoints);
return parallel_for_each(_live_endpoints, [this, &summary, ballot, &request_tracker] (gms::inet_address peer) mutable {
return futurize_apply([&] {
// To generate less network traffic, only the closest replica (first one in the list of participants)
// sends query result content while other replicas send digests needed to check consistency.
bool only_digest = peer != _live_endpoints[0];
auto da = digest_algorithm();
if (fbu::is_me(peer)) {
return paxos::paxos_state::prepare(tr_state, _schema, _key.key(), ballot, _timeout);
return paxos::paxos_state::prepare(tr_state, _schema, _cmd, _key.key(), ballot, only_digest, da, _timeout);
} else {
netw::messaging_service& ms = netw::get_local_messaging_service();
return ms.send_paxos_prepare(peer, _timeout, _schema->version(), _key.key(), ballot,
return ms.send_paxos_prepare(peer, _timeout, *_cmd, _key.key(), ballot, only_digest, da,
tracing::make_trace_info(tr_state));
}
}).then_wrapped([this, &summary, &request_tracker, peer, ballot]
@@ -920,6 +936,33 @@ future<paxos::prepare_summary> paxos_response_handler::prepare_ballot(utils::UUI
}
}
// Check if the query result attached to the promise matches query results received from other participants.
if (request_tracker.digests_match) {
if (response.data_or_digest) {
foreign_ptr<lw_shared_ptr<query::result>> data;
if (std::holds_alternative<foreign_ptr<lw_shared_ptr<query::result>>>(*response.data_or_digest)) {
data = std::move(std::get<foreign_ptr<lw_shared_ptr<query::result>>>(*response.data_or_digest));
}
auto& digest = data ? data->digest() : std::get<query::result_digest>(*response.data_or_digest);
if (request_tracker.digest) {
if (*request_tracker.digest != digest) {
request_tracker.digests_match = false;
}
} else {
request_tracker.digest = digest;
}
if (request_tracker.digests_match && !summary.data && data) {
summary.data = std::move(data);
}
} else {
request_tracker.digests_match = false;
}
if (!request_tracker.digests_match) {
request_tracker.digest.reset();
summary.data.reset();
}
}
if (summary.committed_ballots_by_replica.size() == _required_participants) { // got all replies
paxos::paxos_state::logger.trace("CAS[{}] prepare_ballot: got enough replies to proceed", _id);
request_tracker.set_value(std::move(summary));
@@ -1905,6 +1948,13 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
cl_for_paxos, participants + 1, live_endpoints.size());
}
// Apart from the ballot, paxos_state::prepare() also sends the current value of the requested key.
// If the values received from different replicas match, we skip a separate query stage thus saving
// one network round trip. To generate less traffic, only closest replicas send data, others send
// digests that are used to check consistency. For this optimization to work, we need to sort the
// list of participants by proximity to this instance.
sort_endpoints_by_proximity(live_endpoints);
return paxos_participants{std::move(live_endpoints), required_participants};
}
@@ -2943,12 +2993,6 @@ public:
}
};
query::digest_algorithm digest_algorithm() {
return service::get_local_storage_service().cluster_supports_xxhash_digest_algorithm()
? query::digest_algorithm::xxHash
: query::digest_algorithm::MD5;
}
class abstract_read_executor : public enable_shared_from_this<abstract_read_executor> {
protected:
using targets_iterator = std::vector<gms::inet_address>::iterator;
@@ -3894,7 +3938,7 @@ storage_proxy::do_query_with_paxos(schema_ptr s,
try {
handler = seastar::make_shared<service::paxos_response_handler>(
shared_from_this(), query_options.trace_state, query_options.permit,
partition_ranges[0].start()->value().as_decorated_key(), s, cl, cl_for_learn, timeout, cas_timeout);
partition_ranges[0].start()->value().as_decorated_key(), s, cmd, cl, cl_for_learn, timeout, cas_timeout);
} catch (exceptions::unavailable_exception& ex) {
_stats.cas_read_unavailables.mark();
throw;
@@ -3906,7 +3950,7 @@ storage_proxy::do_query_with_paxos(schema_ptr s,
lc.start();
return handler->begin_and_repair_paxos(query_options.cstate, contentions, false).then_wrapped([this, s = std::move(s),
cmd = std::move(cmd), partition_ranges = std::move(partition_ranges), query_options = std::move(query_options),
cl_for_learn] (future<utils::UUID> f) mutable {
cl_for_learn] (future<paxos_response_handler::ballot_and_data> f) mutable {
if (f.failed()) {
try {
f.get();
@@ -3919,6 +3963,10 @@ storage_proxy::do_query_with_paxos(schema_ptr s,
read_failure_exception(s->ks_name(), s->cf_name(), ex.consistency, ex.received, ex.failures, ex.block_for, false));
}
}
auto v = f.get0();
if (v.data) {
return make_ready_future<storage_proxy::coordinator_query_result>(storage_proxy::coordinator_query_result(std::move(v.data)));
}
return do_query(s, std::move(cmd), std::move(partition_ranges), cl_for_learn, std::move(query_options));
}).then_wrapped([this, lc, &contentions, handler = std::move(handler)] (future<storage_proxy::coordinator_query_result> f) mutable {
_stats.cas_read.mark(lc.stop().latency());
@@ -3988,7 +4036,7 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
handler = seastar::make_shared<paxos_response_handler>(shared_from_this(),
query_options.trace_state, query_options.permit,
partition_ranges[0].start()->value().as_decorated_key(),
schema, cl_for_paxos, cl_for_commit, write_timeout, cas_timeout);
schema, cmd, cl_for_paxos, cl_for_commit, write_timeout, cas_timeout);
} catch (exceptions::unavailable_exception& ex) {
_stats.cas_write_unavailables.mark();
throw;
@@ -4008,13 +4056,16 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
// recent enough.
return handler->begin_and_repair_paxos(query_options.cstate, contentions, true)
.then([this, handler, schema, cmd, request, partition_ranges, query_options, cl, &contentions]
(utils::UUID ballot) mutable {
(paxos_response_handler::ballot_and_data v) mutable {
// Read the current values and check they validate the conditions.
paxos::paxos_state::logger.debug("CAS[{}]: Reading existing values for CAS precondition", handler->id());
tracing::trace(handler->tr_state, "Reading existing values for CAS precondition");
return query(schema, cmd, std::move(partition_ranges), cl, query_options)
.then([this, handler, schema, cmd, request, ballot, &contentions] (coordinator_query_result&& qr) {
auto mutation = request->apply(*qr.query_result, cmd->slice, utils::UUID_gen::micros_timestamp(ballot));
auto f = v.data ? make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>>(std::move(v.data)) :
query(schema, cmd, std::move(partition_ranges), cl, query_options).then([] (coordinator_query_result&& qr) {
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>>(std::move(qr.query_result));
});
return f.then([this, handler, schema, cmd, request, ballot = v.ballot, &contentions] (auto&& qr) {
auto mutation = request->apply(*qr, cmd->slice, utils::UUID_gen::micros_timestamp(ballot));
if (!mutation) {
paxos::paxos_state::logger.debug("CAS[{}] precondition does not match current values", handler->id());
tracing::trace(handler->tr_state, "CAS precondition does not match current values");
@@ -4076,14 +4127,18 @@ std::vector<gms::inet_address> storage_proxy::get_live_endpoints(keyspace& ks, c
return eps;
}
std::vector<gms::inet_address> storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) {
auto eps = get_live_endpoints(ks, token);
// Reorders `eps` in place: first lets the local snitch sort the endpoints by
// proximity to this node's broadcast address, then, if this node's own address
// is in the list but not already first, swaps it with the first element.
void storage_proxy::sort_endpoints_by_proximity(std::vector<gms::inet_address>& eps) {
    locator::i_endpoint_snitch::get_local_snitch_ptr()->sort_by_proximity(utils::fb_utilities::get_broadcast_address(), eps);
    // FIXME: until dynamic snitch is implemented, put the local address (if present) at the beginning
    auto local = boost::range::find(eps, utils::fb_utilities::get_broadcast_address());
    if (local == eps.end() || local == eps.begin()) {
        return;
    }
    std::iter_swap(local, eps.begin());
}
// Returns the live endpoints for `token` in keyspace `ks`, ordered by
// sort_endpoints_by_proximity().
std::vector<gms::inet_address> storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) {
    std::vector<gms::inet_address> live = get_live_endpoints(ks, token);
    sort_endpoints_by_proximity(live);
    return live;
}
@@ -4522,8 +4577,9 @@ void storage_proxy::init_messaging_service() {
});
// Register PAXOS verb handlers
ms.register_paxos_prepare([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, utils::UUID schema_version,
partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
ms.register_paxos_prepare([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
query::read_command cmd, partition_key key, utils::UUID ballot, bool only_digest, query::digest_algorithm da,
std::optional<tracing::trace_info> trace_info) {
auto src_addr = netw::messaging_service::get_source(cinfo);
tracing::trace_state_ptr tr_state;
if (trace_info) {
@@ -4531,9 +4587,9 @@ void storage_proxy::init_messaging_service() {
tracing::begin(tr_state);
}
return get_schema_for_read(schema_version, src_addr).then([tr_state = std::move(tr_state),
key = std::move(key), ballot, timeout] (schema_ptr schema) {
return paxos::paxos_state::prepare(tr_state, std::move(schema), std::move(key), ballot, *timeout);
return get_schema_for_read(cmd.schema_version, src_addr).then([cmd = make_lw_shared<query::read_command>(std::move(cmd)),
key = std::move(key), ballot, only_digest, da, timeout, tr_state = std::move(tr_state)] (schema_ptr schema) {
return paxos::paxos_state::prepare(tr_state, std::move(schema), cmd, std::move(key), ballot, only_digest, da, *timeout);
});
});
ms.register_paxos_accept([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, paxos::proposal proposal,

View File

@@ -301,6 +301,7 @@ private:
bool hints_enabled(db::write_type type) noexcept;
db::hints::manager& hints_manager_for(db::write_type type);
std::vector<gms::inet_address> get_live_endpoints(keyspace& ks, const dht::token& token);
static void sort_endpoints_by_proximity(std::vector<gms::inet_address>& eps);
std::vector<gms::inet_address> get_live_sorted_endpoints(keyspace& ks, const dht::token& token);
db::read_repair_decision new_read_repair_decision(const schema& s);
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd,
@@ -555,6 +556,8 @@ private:
shared_ptr<storage_proxy> _proxy;
// The schema for the table the operation works upon.
schema_ptr _schema;
// Read command used by this CAS request.
lw_shared_ptr<query::read_command> _cmd;
// SERIAL or LOCAL SERIAL - influences what endpoints become Paxos protocol participants,
// as well as Paxos quorum size. Is either set explicitly in the query or derived from
// the value set by SERIAL CONSISTENCY [SERIAL|LOCAL SERIAL] control statement.
@@ -587,12 +590,13 @@ public:
public:
paxos_response_handler(shared_ptr<storage_proxy> proxy_arg, tracing::trace_state_ptr tr_state_arg,
service_permit permit_arg,
dht::decorated_key key_arg, schema_ptr schema_arg,
dht::decorated_key key_arg, schema_ptr schema_arg, lw_shared_ptr<query::read_command> cmd_arg,
db::consistency_level cl_for_paxos_arg, db::consistency_level cl_for_learn_arg,
storage_proxy::clock_type::time_point timeout_arg, storage_proxy::clock_type::time_point cas_timeout_arg)
: _proxy(proxy_arg)
, _schema(std::move(schema_arg))
, _cmd(cmd_arg)
, _cl_for_paxos(cl_for_paxos_arg)
, _cl_for_learn(cl_for_learn_arg)
, _timeout(timeout_arg)
@@ -605,8 +609,16 @@ public:
_required_participants = pp.required_participants;
}
// Result of PREPARE step, i.e. begin_and_repair_paxos().
// begin_and_repair_paxos() resolves to this pair once the step has completed.
struct ballot_and_data {
// Accepted ballot.
utils::UUID ballot;
// Current value of the requested key or none.
// NOTE(review): a null pointer here appears to be the "none" case - the CAS
// path tests `v.data` and falls back to a separate query() when it is empty;
// confirm that `v` there is a ballot_and_data.
foreign_ptr<lw_shared_ptr<query::result>> data;
};
// Steps of the Paxos protocol
future<utils::UUID> begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write);
future<ballot_and_data> begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write);
future<paxos::prepare_summary> prepare_ballot(utils::UUID ballot);
future<bool> accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted = true);
future<> learn_decision(paxos::proposal decision, bool allow_hints = false);