Merge 'Support for sending tablet info to the drivers' from Sylwia Szunejko

There is a need for sending tablet info to the drivers so they can be tablet aware. For the best performance we want to get this info lazily only when it is needed.

The info is send when driver asks about the information that the specific tablet contains and it is directed to the wrong node/shard so it could use that information for every subsequent query. If we send the query to the wrong node/shard, we want to send the RESULT message with additional information about the tablet (replicas and token range) in custom_payload.

Mechanism for sending custom_payload added.

Sending custom_payload tested using three node cluster and cqlsh queries. I used RF=1 so choosing wrong node was testable.

I also manually tested it with the python-driver and confirmed that the tablet info can be deserialized properly.

Automatic tests added.

Closes scylladb/scylladb#15410

* github.com:scylladb/scylladb:
  docs: add documentation about sending tablet info to protocol extensions
  Add tests for sending tablet info
  cql3: send tablet if wrong node/shard is used during modification statement
  cql3: send tablet if wrong node/shard is used during select statement
  locator: add function to check locality
  locator: add function to check if host is local
  transport: add function to add tablet info to the result_message
  transport: add support for setting custom payload
This commit is contained in:
Tomasz Grabiec
2023-11-22 12:04:14 +01:00
committed by Botond Dénes
18 changed files with 359 additions and 26 deletions

View File

@@ -168,7 +168,9 @@ future<std::vector<mutation>> batch_statement::get_mutations(query_processor& qp
statement->inc_cql_stats(query_state.get_client_state().is_internal());
auto&& statement_options = options.for_statement(i);
auto timestamp = _attrs->get_timestamp(now, statement_options);
auto more = co_await statement->get_mutations(qp, statement_options, timeout, local, timestamp, query_state);
modification_statement::json_cache_opt json_cache = statement->maybe_prepare_json_cache(statement_options);
std::vector<dht::partition_range> keys = statement->build_partition_keys(statement_options, json_cache);
auto more = co_await statement->get_mutations(qp, statement_options, timeout, local, timestamp, query_state, json_cache, std::move(keys));
for (auto&& m : more) {
// We want unordered_set::try_emplace(), but we don't have it

View File

@@ -114,10 +114,8 @@ future<> modification_statement::check_access(query_processor& qp, const service
}
future<std::vector<mutation>>
modification_statement::get_mutations(query_processor& qp, const query_options& options, db::timeout_clock::time_point timeout, bool local, int64_t now, service::query_state& qs) const {
modification_statement::get_mutations(query_processor& qp, const query_options& options, db::timeout_clock::time_point timeout, bool local, int64_t now, service::query_state& qs, json_cache_opt& json_cache, std::vector<dht::partition_range> keys) const {
auto cl = options.get_consistency();
auto json_cache = maybe_prepare_json_cache(options);
auto keys = build_partition_keys(options, json_cache);
auto ranges = create_clustering_ranges(options, json_cache);
auto f = make_ready_future<update_parameters::prefetch_data>(s);
@@ -272,24 +270,44 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
_restrictions->validate_primary_key(options);
if (has_conditions()) {
return execute_with_condition(qp, qs, options);
co_return co_await execute_with_condition(qp, qs, options);
}
return execute_without_condition(qp, qs, options).then([] (coordinator_result<> res) {
if (!res) {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(
seastar::make_shared<cql_transport::messages::result_message::exception>(std::move(res).assume_error()));
json_cache_opt json_cache = maybe_prepare_json_cache(options);
std::vector<dht::partition_range> keys = build_partition_keys(options, json_cache);
bool keys_size_one = keys.size() == 1;
auto token = dht::token();
if (keys_size_one) {
token = keys[0].start()->value().token();
}
auto res = co_await execute_without_condition(qp, qs, options, json_cache, std::move(keys));
if (!res) {
co_return seastar::make_shared<cql_transport::messages::result_message::exception>(std::move(res).assume_error());
}
auto result = seastar::make_shared<cql_transport::messages::result_message::void_message>();
if (keys_size_one) {
auto&& table = s->table();
if (_may_use_token_aware_routing && table.uses_tablets()) {
auto erm = table.get_effective_replication_map();
auto tablet_info = erm->check_locality(token);
if (tablet_info.has_value()) {
result->add_tablet_info(tablet_info->tablet_replicas, tablet_info->token_range);
}
}
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(
::shared_ptr<cql_transport::messages::result_message>{});
});
}
co_return std::move(result);
}
future<coordinator_result<>>
modification_statement::execute_without_condition(query_processor& qp, service::query_state& qs, const query_options& options) const {
modification_statement::execute_without_condition(query_processor& qp, service::query_state& qs, const query_options& options, json_cache_opt& json_cache, std::vector<dht::partition_range> keys) const {
auto cl = options.get_consistency();
auto timeout = db::timeout_clock::now() + get_timeout(qs.get_client_state(), options);
return get_mutations(qp, options, timeout, false, options.get_timestamp(qs), qs).then([this, cl, timeout, &qp, &qs] (auto mutations) {
return get_mutations(qp, options, timeout, false, options.get_timestamp(qs), qs, json_cache, std::move(keys)).then([this, cl, timeout, &qp, &qs] (auto mutations) {
if (mutations.empty()) {
return make_ready_future<coordinator_result<>>(bo::success());
}
@@ -328,17 +346,29 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
// modification in the list of CAS commands, since we're handling single-statement execution.
request->add_row_update(*this, std::move(ranges), std::move(json_cache), options);
auto shard = service::storage_proxy::cas_shard(*s, request->key()[0].start()->value().as_decorated_key().token());
auto token = request->key()[0].start()->value().as_decorated_key().token();
auto shard = service::storage_proxy::cas_shard(*s, token);
if (shard != this_shard_id()) {
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
qp.bounce_to_shard(shard, std::move(const_cast<cql3::query_options&>(options).take_cached_pk_function_calls()))
);
}
std::optional<locator::tablet_routing_info> tablet_info = locator::tablet_routing_info{locator::tablet_replica_set(), std::pair<dht::token, dht::token>()};
auto&& table = s->table();
if (_may_use_token_aware_routing && table.uses_tablets()) {
auto erm = table.get_effective_replication_map();
tablet_info = erm->check_locality(token);
}
return qp.proxy().cas(s, request, request->read_command(qp), request->key(),
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
cl_for_paxos, cl_for_learn, statement_timeout, cas_timeout).then([this, request] (bool is_applied) {
return request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
cl_for_paxos, cl_for_learn, statement_timeout, cas_timeout).then([this, request, tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
auto result = request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
result->add_tablet_info(tablet_replicas, token_range);
return result;
});
}
@@ -509,6 +539,7 @@ modification_statement::prepare(data_dictionary::database db, prepare_context& c
if (!prepared_stmt->has_conditions() && prepared_stmt->_restrictions.has_value()) {
ctx.clear_pk_function_calls_cache();
}
prepared_stmt->_may_use_token_aware_routing = ctx.get_partition_key_bind_indexes(*schema).size() != 0;
return prepared_stmt;
}

View File

@@ -42,7 +42,7 @@ namespace raw { class modification_statement; }
class modification_statement : public cql_statement_opt_metadata {
public:
const statement_type type;
bool _may_use_token_aware_routing;
private:
const uint32_t _bound_terms;
// If we have operation on list entries, such as adding or
@@ -236,7 +236,7 @@ public:
private:
future<exceptions::coordinator_result<>>
execute_without_condition(query_processor& qp, service::query_state& qs, const query_options& options) const;
execute_without_condition(query_processor& qp, service::query_state& qs, const query_options& options, json_cache_opt& json_cache, std::vector<dht::partition_range> keys) const;
future<::shared_ptr<cql_transport::messages::result_message>>
execute_with_condition(query_processor& qp, service::query_state& qs, const query_options& options) const;
@@ -252,7 +252,7 @@ public:
* @return vector of the mutations
* @throws invalid_request_exception on invalid requests
*/
future<std::vector<mutation>> get_mutations(query_processor& qp, const query_options& options, db::timeout_clock::time_point timeout, bool local, int64_t now, service::query_state& qs) const;
future<std::vector<mutation>> get_mutations(query_processor& qp, const query_options& options, db::timeout_clock::time_point timeout, bool local, int64_t now, service::query_state& qs, json_cache_opt& json_cache, std::vector<dht::partition_range> keys) const;
virtual json_cache_opt maybe_prepare_json_cache(const query_options& options) const;

View File

@@ -404,12 +404,25 @@ select_statement::do_execute(query_processor& qp,
auto key_ranges = _restrictions->get_partition_key_ranges(options);
auto token = dht::token();
std::optional<locator::tablet_routing_info> tablet_info = {};
auto&& table = _schema->table();
if (_may_use_token_aware_routing && table.uses_tablets()) {
if (key_ranges.size() == 1 && query::is_single_partition(key_ranges.front())) {
token = key_ranges[0].start()->value().as_decorated_key().token();
auto erm = table.get_effective_replication_map();
tablet_info = erm->check_locality(token);
}
}
if (db::is_serial_consistency(options.get_consistency())) {
if (key_ranges.size() != 1 || !query::is_single_partition(key_ranges.front())) {
throw exceptions::invalid_request_exception(
"SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time");
}
unsigned shard = _schema->table().shard_of(key_ranges[0].start()->value().as_decorated_key().token());
unsigned shard = table.shard_of(key_ranges[0].start()->value().as_decorated_key().token());
if (this_shard_id() != shard) {
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
qp.bounce_to_shard(shard, std::move(const_cast<cql3::query_options&>(options).take_cached_pk_function_calls()))
@@ -417,13 +430,24 @@ select_statement::do_execute(query_processor& qp,
}
}
auto f = make_ready_future<shared_ptr<cql_transport::messages::result_message>>();
if (!aggregate && !_restrictions_need_filtering && (page_size <= 0
|| !service::pager::query_pagers::may_need_paging(*_schema, page_size,
*command, key_ranges))) {
return execute_without_checking_exception_message_non_aggregate_unpaged(qp, command, std::move(key_ranges), state, options, now);
f = execute_without_checking_exception_message_non_aggregate_unpaged(qp, command, std::move(key_ranges), state, options, now);
} else {
return execute_without_checking_exception_message_aggregate_or_paged(qp, command, std::move(key_ranges), state, options, now, page_size, aggregate, nonpaged_filtering);
f = execute_without_checking_exception_message_aggregate_or_paged(qp, command, std::move(key_ranges), state, options, now, page_size, aggregate, nonpaged_filtering);
}
if (!tablet_info.has_value()) {
return f;
}
return f.then([tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (auto res) mutable {
res->add_tablet_info(std::move(tablet_replicas), token_range);
return res;
});
}
future<::shared_ptr<cql_transport::messages::result_message>>
@@ -2077,7 +2101,7 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
}
auto partition_key_bind_indices = ctx.get_partition_key_bind_indexes(*schema);
stmt->_may_use_token_aware_routing = partition_key_bind_indices.size() != 0;
return make_unique<prepared_statement>(std::move(stmt), ctx, std::move(partition_key_bind_indices), std::move(warnings));
}

View File

@@ -55,6 +55,7 @@ public:
using parameters = raw::select_statement::parameters;
using ordering_comparator_type = raw::select_statement::ordering_comparator_type;
static constexpr int DEFAULT_COUNT_PAGE_SIZE = 10000;
bool _may_use_token_aware_routing;
protected:
static thread_local const lw_shared_ptr<const parameters> _default_parameters;
schema_ptr _schema;

View File

@@ -180,3 +180,22 @@ The string map in the SUPPORTED response will contain the following parameters:
- `ERROR_CODE`: a 32-bit signed decimal integer which Scylla
will use as the error code for the rate limit exception.
## Sending tablet info to the drivers
This extension adds support for sending tablet info to the drivers if the
request was routed to the wrong node/shard.
There is a need for sending tablet info to the drivers so they can be
tablet aware.
For the best performance we want to get this info lazily only when it is
needed.
The info is send when driver asks about the information that the specific
tablet contains and it is directed to the wrong node/shard so it could
use that information for every subsequent query.
If we send the query to the wrong node/shard, we want to send the RESULT
message with additional information about the tablet in `custom_payload`:
- `tablet_replicas` - information about tablet replicas, for every replica there is information about the host and shard.
- `token_range` - information about token range for that tablet in format `(first_token, last_token]`.

View File

@@ -120,6 +120,10 @@ inet_address_vector_replica_set vnode_effective_replication_map::get_endpoints_f
return inet_address_vector_replica_set(endpoints->begin(), endpoints->end());
}
std::optional<tablet_routing_info> vnode_effective_replication_map::check_locality(const token& token) const {
return {};
}
bool vnode_effective_replication_map::has_pending_ranges(inet_address endpoint) const {
for (const auto& item : _pending_endpoints) {
const auto& nodes = item.second;

View File

@@ -24,6 +24,7 @@
#include "utils/maybe_yield.hh"
#include "utils/sequenced_set.hh"
#include "utils/simple_hashers.hh"
#include "tablets.hh"
// forward declaration since replica/database.hh includes this file
namespace replica {
@@ -215,6 +216,9 @@ public:
/// Returns a list of nodes to which a read request should be directed.
virtual inet_address_vector_replica_set get_endpoints_for_reading(const token& search_token) const = 0;
virtual std::optional<tablet_routing_info> check_locality(const token& token) const = 0;
/// Returns true if there are any pending ranges for this endpoint.
/// This operation is expensive, for vnode_erm it iterates
/// over all pending ranges which is O(number of tokens).
@@ -290,6 +294,7 @@ public: // effective_replication_map
inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override;
inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override;
inet_address_vector_replica_set get_endpoints_for_reading(const token& search_token) const override;
std::optional<tablet_routing_info> check_locality(const token& token) const override;
bool has_pending_ranges(inet_address endpoint) const override;
std::unique_ptr<token_range_splitter> make_splitter() const override;
const dht::sharder& get_sharder(const schema& s) const override;

View File

@@ -429,6 +429,42 @@ public:
return result;
}
std::optional<tablet_routing_info> check_locality(const token& search_token) const override {
auto&& tablets = get_tablet_map();
auto tid = tablets.get_tablet_id(search_token);
auto&& info = tablets.get_tablet_info(tid);
auto host = get_token_metadata().get_my_id();
auto shard = this_shard_id();
auto make_tablet_routing_info = [&] {
dht::token first_token;
if (tid == tablets.first_tablet()) {
first_token = dht::minimum_token();
} else {
first_token = tablets.get_last_token(tablet_id(size_t(tid) - 1));
}
auto token_range = std::make_pair(first_token, tablets.get_last_token(tid));
return tablet_routing_info{info.replicas, token_range};
};
for (auto&& r : info.replicas) {
if (r.host == host) {
if (r.shard == shard) {
return std::nullopt; // routed correctly
} else {
return make_tablet_routing_info();
}
}
}
auto tinfo = tablets.get_tablet_transition_info(tid);
if (tinfo && tinfo->pending_replica.host == host && tinfo->pending_replica.shard == shard) {
return std::nullopt; // routed correctly
}
return make_tablet_routing_info();
}
virtual bool has_pending_ranges(inet_address endpoint) const override {
const auto host_id = _tmptr->get_host_id_if_known(endpoint);
if (!host_id.has_value()) {

View File

@@ -114,6 +114,16 @@ tablet_replica_set replace_replica(const tablet_replica_set& rs, tablet_replica
return result;
}
inline
bool contains(const tablet_replica_set& rs, host_id host) {
for (auto replica : rs) {
if (replica.host == host) {
return true;
}
}
return false;
}
/// Stores information about a single tablet.
struct tablet_info {
tablet_replica_set replicas;
@@ -339,6 +349,11 @@ public:
friend std::ostream& operator<<(std::ostream&, const tablet_metadata&);
};
struct tablet_routing_info {
tablet_replica_set tablet_replicas;
std::pair<dht::token, dht::token> token_range;
};
}
template <>

View File

@@ -25,6 +25,10 @@ using namespace locator;
static thread_local auto replica_type = tuple_type_impl::get_instance({uuid_type, int32_type});
static thread_local auto replica_set_type = list_type_impl::get_instance(replica_type, false);
data_type get_replica_set_type() {
return replica_set_type;
}
schema_ptr make_tablets_schema() {
// FIXME: Allow UDTs in system keyspace:
// CREATE TYPE tablet_replica (replica_id uuid, shard int);
@@ -43,7 +47,6 @@ schema_ptr make_tablets_schema() {
.build();
}
static
std::vector<data_value> replicas_to_data_value(const tablet_replica_set& replicas) {
std::vector<data_value> result;
result.reserve(replicas.size());

View File

@@ -8,6 +8,9 @@
#pragma once
#include "types/types.hh"
#include "types/tuple.hh"
#include "types/list.hh"
#include "timestamp.hh"
#include "locator/tablets.hh"
#include "schema/schema_fwd.hh"
@@ -28,8 +31,12 @@ class query_processor;
namespace replica {
data_type get_replica_set_type();
schema_ptr make_tablets_schema();
std::vector<data_value> replicas_to_data_value(const locator::tablet_replica_set& replicas);
/// Converts information in tablet_map to mutations of system.tablets.
///
/// The mutations will delete any older tablet information for the same table.

View File

@@ -5707,3 +5707,143 @@ SEASTAR_TEST_CASE(test_setting_synchronous_updates_property) {
);
});
}
static
cql_test_config tablet_cql_test_config() {
cql_test_config c;
c.db_config->experimental_features({
db::experimental_features_t::feature::TABLETS,
db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES,
}, db::config::config_source::CommandLine);
c.db_config->consistent_cluster_management(true);
return c;
}
static
bool has_tablet_routing(::shared_ptr<cql_transport::messages::result_message> result) {
auto custom_payload = result->custom_payload();
if (!custom_payload.has_value() || custom_payload->find("tablet_replicas") == custom_payload->end() || custom_payload->find("token_range") == custom_payload->end()) {
return false;
}
return true;
}
SEASTAR_TEST_CASE(test_sending_tablet_info_unprepared_insert) {
BOOST_ASSERT(smp::count == 2);
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("create keyspace ks_tablet with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 8};").get();
e.execute_cql("create table ks_tablet.test_tablet (pk int, ck int, v int, PRIMARY KEY (pk, ck));").get();
smp::submit_to(0, [&] {
return seastar::async([&] {
auto result = e.execute_cql("insert into ks_tablet.test_tablet (pk, ck, v) VALUES (1, 2, 3);").get();
BOOST_ASSERT(!has_tablet_routing(result));
});
}).get();
smp::submit_to(1, [&] {
return seastar::async([&] {
auto result = e.execute_cql("insert into ks_tablet.test_tablet (pk, ck, v) VALUES (1, 2, 3);").get();
BOOST_ASSERT(!has_tablet_routing(result));
});
}).get();
}, tablet_cql_test_config());
}
SEASTAR_TEST_CASE(test_sending_tablet_info_unprepared_select) {
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("create keyspace ks_tablet with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 8};").get();
e.execute_cql("create table ks_tablet.test_tablet (pk int, ck int, v int, PRIMARY KEY (pk, ck));").get();
e.execute_cql("insert into ks_tablet.test_tablet (pk, ck, v) VALUES (1, 2, 3);").get();
smp::submit_to(0, [&] {
return seastar::async([&] {
auto result = e.execute_cql("select pk, ck, v FROM ks_tablet.test_tablet WHERE pk = 1;").get();
BOOST_ASSERT(!has_tablet_routing(result));
});
}).get();
smp::submit_to(1, [&] {
return seastar::async([&] {
auto result = e.execute_cql("select pk, ck, v FROM ks_tablet.test_tablet WHERE pk = 1;").get();
BOOST_ASSERT(!has_tablet_routing(result));
});
}).get();
}, tablet_cql_test_config());
}
SEASTAR_TEST_CASE(test_sending_tablet_info_insert) {
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("create keyspace ks_tablet with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 8};").get();
e.execute_cql("create table ks_tablet.test_tablet (pk int, ck int, v int, PRIMARY KEY (pk, ck));").get();
auto insert = e.prepare("insert into ks_tablet.test_tablet (pk, ck, v) VALUES (?, ?, ?);").get0();
std::vector<cql3::raw_value> raw_values;
raw_values.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{1})));
raw_values.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{2})));
raw_values.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{3})));
const auto sptr = e.local_db().find_schema("ks_tablet", "test_tablet");
auto pk = partition_key::from_singular(*sptr, int32_t(1));
unsigned local_shard = sptr->table().shard_of(dht::get_token(*sptr, pk.view()));
smp::submit_to(local_shard, [&] {
return seastar::async([&] {
auto result = e.execute_prepared(insert, raw_values).get0();
BOOST_ASSERT(!has_tablet_routing(result));
});
}).get();
std::vector<cql3::raw_value> raw_values2;
raw_values2.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{2})));
raw_values2.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{3})));
raw_values2.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{4})));
auto pk2 = partition_key::from_singular(*sptr, int32_t(2));
unsigned local_shard2 = sptr->table().shard_of(dht::get_token(*sptr, pk2.view()));
unsigned foreign_shard = (local_shard2 + 1) % smp::count;
smp::submit_to(foreign_shard, [&] {
return seastar::async([&] {
auto result = e.execute_prepared(insert, raw_values2).get0();
BOOST_ASSERT(has_tablet_routing(result));
});
}).get();
}, tablet_cql_test_config());
}
SEASTAR_TEST_CASE(test_sending_tablet_info_select) {
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("create keyspace ks_tablet with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 8};").get();
e.execute_cql("create table ks_tablet.test_tablet (pk int, ck int, v int, PRIMARY KEY (pk, ck));").get();
e.execute_cql("insert into ks_tablet.test_tablet (pk, ck, v) VALUES (1, 2, 3);").get();
auto select = e.prepare("select pk, ck, v FROM ks_tablet.test_tablet WHERE pk = ?;").get0();
std::vector<cql3::raw_value> raw_values;
raw_values.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{1})));
const auto sptr = e.local_db().find_schema("ks_tablet", "test_tablet");
auto pk = partition_key::from_singular(*sptr, int32_t(1));
unsigned local_shard = sptr->table().shard_of(dht::get_token(*sptr, pk.view()));
unsigned foreign_shard = (local_shard + 1) % smp::count;
smp::submit_to(local_shard, [&] {
return seastar::async([&] {
auto result = e.execute_prepared(select, raw_values).get0();
BOOST_ASSERT(!has_tablet_routing(result));
});
}).get();
smp::submit_to(foreign_shard, [&] {
return seastar::async([&] {
auto result = e.execute_prepared(select, raw_values).get0();
BOOST_ASSERT(has_tablet_routing(result));
});
}).get();
}, tablet_cql_test_config());
}

View File

@@ -290,8 +290,10 @@ public:
}
auto& qo = cql3::query_options::DEFAULT;
auto timeout = db::timeout_clock::now() + qs->get_client_state().get_timeout_config().write_timeout;
cql3::statements::modification_statement::json_cache_opt json_cache = modif_stmt->maybe_prepare_json_cache(qo);
std::vector<dht::partition_range> keys = modif_stmt->build_partition_keys(qo, json_cache);
return modif_stmt->get_mutations(local_qp(), qo, timeout, false, qo.get_timestamp(*qs), *qs)
return modif_stmt->get_mutations(local_qp(), qo, timeout, false, qo.get_timestamp(*qs), *qs, json_cache, keys)
.finally([qs, modif_stmt = std::move(modif_stmt)] {});
}

View File

@@ -9,16 +9,20 @@
#pragma once
#include <map>
#include <vector>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
#include "locator/tablets.hh"
#include "replica/tablets.hh"
namespace cql_transport {
namespace messages {
class result_message {
std::vector<sstring> _warnings;
std::optional<std::unordered_map<sstring, bytes>> _custom_payload;
public:
class visitor;
class visitor_base;
@@ -35,6 +39,31 @@ public:
return _warnings;
}
void add_custom_payload(sstring key, bytes value) {
if (!_custom_payload) {
_custom_payload = std::optional<std::unordered_map<sstring, bytes>>{std::unordered_map<sstring, bytes>()};
}
_custom_payload.value()[key] = value;
}
void add_tablet_info(locator::tablet_replica_set tablet_replicas, std::pair<dht::token, dht::token> token_range) {
if (!tablet_replicas.empty()) {
auto replicas_values = make_list_value(replica::get_replica_set_type(), replica::replicas_to_data_value(tablet_replicas));
this->add_custom_payload("tablet_replicas", replicas_values.serialize_nonnull());
auto v1 = data_value(dht::token::to_int64(token_range.first));
auto v2 = data_value(dht::token::to_int64(token_range.second));
bytes token_bytes(bytes::initialized_later(), v1.serialized_size() + v2.serialized_size());
auto i = token_bytes.begin();
v1.serialize(i);
v2.serialize(i);
this->add_custom_payload("token_range", token_bytes);
}
}
const std::optional<std::unordered_map<sstring, bytes>>& custom_payload() const {
return _custom_payload;
}
virtual std::optional<unsigned> move_to_shard() const {
return std::nullopt;
}

View File

@@ -73,6 +73,7 @@ public:
void write_consistency(db::consistency_level c);
void write_string_map(std::map<sstring, sstring> string_map);
void write_string_multimap(std::multimap<sstring, sstring> string_map);
void write_string_bytes_map(const std::unordered_map<sstring, bytes>& map);
void write_value(bytes_opt value);
void write_value(std::optional<managed_bytes_view> value);
void write(const cql3::metadata& m, bool skip = false);

View File

@@ -1539,6 +1539,10 @@ make_result(int16_t stream, messages::result_message& msg, const tracing::trace_
response->set_frame_flag(cql_frame_flags::warning);
response->write_string_list(msg.warnings());
}
if (msg.custom_payload()) {
response->set_frame_flag(cql_frame_flags::custom_payload);
response->write_string_bytes_map(msg.custom_payload().value());
}
cql_server::fmt_visitor fmt{version, *response, skip_metadata};
msg.accept(fmt);
return response;
@@ -1788,6 +1792,15 @@ void cql_server::response::write_string_multimap(std::multimap<sstring, sstring>
}
}
void cql_server::response::write_string_bytes_map(const std::unordered_map<sstring, bytes>& map)
{
write_short(cast_if_fits<uint16_t>(map.size()));
for (auto&& s : map) {
write_string(s.first);
write_bytes(s.second);
}
}
void cql_server::response::write_value(bytes_opt value)
{
if (!value) {

View File

@@ -75,6 +75,7 @@ enum class cql_compression {
enum cql_frame_flags {
compression = 0x01,
tracing = 0x02,
custom_payload = 0x04,
warning = 0x08,
};