From 75b3dbf7eaa2ac7c35c88b0f4e49b242bd6f9f4c Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Thu, 7 Sep 2023 15:52:59 +0200 Subject: [PATCH 1/8] transport: add support for setting custom payload A custom payload can now be added to response_message. If it is set, it will be sent to client and the custom_payload flag will be set. write_string_bytes_map method is added to response class and a missing custom_payload flag is added to cql_frame_flags. --- transport/messages/result_message_base.hh | 13 +++++++++++++ transport/response.hh | 1 + transport/server.cc | 13 +++++++++++++ transport/server.hh | 1 + 4 files changed, 28 insertions(+) diff --git a/transport/messages/result_message_base.hh b/transport/messages/result_message_base.hh index c8132a6671..fc7a003ff3 100644 --- a/transport/messages/result_message_base.hh +++ b/transport/messages/result_message_base.hh @@ -9,6 +9,7 @@ #pragma once +#include #include #include @@ -19,6 +20,7 @@ namespace messages { class result_message { std::vector _warnings; + std::optional> _custom_payload; public: class visitor; class visitor_base; @@ -35,6 +37,17 @@ public: return _warnings; } + void add_custom_payload(sstring key, bytes value) { + if (!_custom_payload) { + _custom_payload = std::optional>{std::unordered_map()}; + } + _custom_payload.value()[key] = value; + } + + const std::optional>& custom_payload() const { + return _custom_payload; + } + virtual std::optional move_to_shard() const { return std::nullopt; } diff --git a/transport/response.hh b/transport/response.hh index b88760dd05..81823a30e8 100644 --- a/transport/response.hh +++ b/transport/response.hh @@ -73,6 +73,7 @@ public: void write_consistency(db::consistency_level c); void write_string_map(std::map string_map); void write_string_multimap(std::multimap string_map); + void write_string_bytes_map(const std::unordered_map& map); void write_value(bytes_opt value); void write_value(std::optional value); void write(const cql3::metadata& m, bool skip = false); diff --git a/transport/server.cc b/transport/server.cc index 44bd0c8e79..aea5a5c2d3 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -1539,6 +1539,10 @@ make_result(int16_t stream, messages::result_message& msg, const tracing::trace_ response->set_frame_flag(cql_frame_flags::warning); response->write_string_list(msg.warnings()); } + if (msg.custom_payload()) { + response->set_frame_flag(cql_frame_flags::custom_payload); + response->write_string_bytes_map(msg.custom_payload().value()); + } cql_server::fmt_visitor fmt{version, *response, skip_metadata}; msg.accept(fmt); return response; @@ -1788,6 +1792,15 @@ void cql_server::response::write_string_multimap(std::multimap } } +void cql_server::response::write_string_bytes_map(const std::unordered_map& map) +{ + write_short(cast_if_fits(map.size())); + for (auto&& s : map) { + write_string(s.first); + write_bytes(s.second); + } +} + void cql_server::response::write_value(bytes_opt value) { if (!value) { diff --git a/transport/server.hh b/transport/server.hh index 7a82baf02a..e27d9fa0c4 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -75,6 +75,7 @@ enum class cql_compression { enum cql_frame_flags { compression = 0x01, tracing = 0x02, + custom_payload = 0x04, warning = 0x08, }; From 93420353f4784d6ca42503544e28427d5a87b633 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 13 Sep 2023 15:26:54 +0200 Subject: [PATCH 2/8] transport: add function to add tablet info to the result_message --- replica/tablets.cc | 5 ++++- replica/tablets.hh | 7 +++++++ transport/messages/result_message_base.hh | 16 ++++++++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/replica/tablets.cc b/replica/tablets.cc index 25d2bf3ebb..2f437a5873 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -25,6 +25,10 @@ using namespace locator; static thread_local auto replica_type = tuple_type_impl::get_instance({uuid_type, int32_type}); static thread_local auto replica_set_type = list_type_impl::get_instance(replica_type, false); +data_type get_replica_set_type() { + return replica_set_type; +} + schema_ptr make_tablets_schema() { // FIXME: Allow UDTs in system keyspace: // CREATE TYPE tablet_replica (replica_id uuid, shard int); @@ -43,7 +47,6 @@ schema_ptr make_tablets_schema() { .build(); } -static std::vector replicas_to_data_value(const tablet_replica_set& replicas) { std::vector result; result.reserve(replicas.size()); diff --git a/replica/tablets.hh b/replica/tablets.hh index de00c332ef..12627f29b9 100644 --- a/replica/tablets.hh +++ b/replica/tablets.hh @@ -8,6 +8,9 @@ #pragma once +#include "types/types.hh" +#include "types/tuple.hh" +#include "types/list.hh" #include "timestamp.hh" #include "locator/tablets.hh" #include "schema/schema_fwd.hh" @@ -28,8 +31,12 @@ class query_processor; namespace replica { +data_type get_replica_set_type(); + schema_ptr make_tablets_schema(); +std::vector replicas_to_data_value(const locator::tablet_replica_set& replicas); + /// Converts information in tablet_map to mutations of system.tablets. /// /// The mutations will delete any older tablet information for the same table. diff --git a/transport/messages/result_message_base.hh b/transport/messages/result_message_base.hh index fc7a003ff3..db4647c726 100644 --- a/transport/messages/result_message_base.hh +++ b/transport/messages/result_message_base.hh @@ -14,6 +14,8 @@ #include #include "seastarx.hh" +#include "locator/tablets.hh" +#include "replica/tablets.hh" namespace cql_transport { namespace messages { @@ -44,6 +46,20 @@ public: _custom_payload.value()[key] = value; } + void add_tablet_info(locator::tablet_replica_set tablet_replicas, std::pair token_range) { + if (!tablet_replicas.empty()) { + auto replicas_values = make_list_value(replica::get_replica_set_type(), replica::replicas_to_data_value(tablet_replicas)); + this->add_custom_payload("tablet_replicas", replicas_values.serialize_nonnull()); + auto v1 = data_value(dht::token::to_int64(token_range.first)); + auto v2 = data_value(dht::token::to_int64(token_range.second)); + bytes token_bytes(bytes::initialized_later(), v1.serialized_size() + v2.serialized_size()); + auto i = token_bytes.begin(); + v1.serialize(i); + v2.serialize(i); + this->add_custom_payload("token_range", token_bytes); + } + } + const std::optional>& custom_payload() const { return _custom_payload; } From a0c85318756a983453feb3d8cdd0b687ab1ffd37 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Fri, 22 Sep 2023 13:01:16 +0200 Subject: [PATCH 3/8] locator: add function to check if host is local --- locator/tablets.hh | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/locator/tablets.hh b/locator/tablets.hh index 29abb4edb5..b04225ee2f 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -114,6 +114,16 @@ tablet_replica_set replace_replica(const tablet_replica_set& rs, tablet_replica return result; } +inline +bool contains(const tablet_replica_set& rs, host_id host) { + for (auto replica : rs) { + if (replica.host == host) { + return true; + } + } + return false; +} + /// Stores information about a single tablet. struct tablet_info { tablet_replica_set replicas; From 954d51389cb1a83897625e824b98b945067043a6 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 13 Sep 2023 15:31:58 +0200 Subject: [PATCH 4/8] locator: add function to check locality --- locator/abstract_replication_strategy.cc | 4 +++ locator/abstract_replication_strategy.hh | 5 ++++ locator/tablets.cc | 36 ++++++++++++++++++++++++ locator/tablets.hh | 5 ++++ 4 files changed, 50 insertions(+) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 475771a863..39621b48cb 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -120,6 +120,10 @@ inet_address_vector_replica_set vnode_effective_replication_map::get_endpoints_f return inet_address_vector_replica_set(endpoints->begin(), endpoints->end()); } +std::optional vnode_effective_replication_map::check_locality(const token& token) const { + return {}; +} + bool vnode_effective_replication_map::has_pending_ranges(inet_address endpoint) const { for (const auto& item : _pending_endpoints) { const auto& nodes = item.second; diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 6df2e36ff3..b9a0419fb0 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -24,6 +24,7 @@ #include "utils/maybe_yield.hh" #include "utils/sequenced_set.hh" #include "utils/simple_hashers.hh" +#include "tablets.hh" // forward declaration since replica/database.hh includes this file namespace replica { @@ -215,6 +216,9 @@ public: /// Returns a list of nodes to which a read request should be directed. virtual inet_address_vector_replica_set get_endpoints_for_reading(const token& search_token) const = 0; + virtual std::optional check_locality(const token& token) const = 0; + + /// Returns true if there are any pending ranges for this endpoint. /// This operation is expensive, for vnode_erm it iterates /// over all pending ranges which is O(number of tokens). @@ -290,6 +294,7 @@ public: // effective_replication_map inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override; inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override; inet_address_vector_replica_set get_endpoints_for_reading(const token& search_token) const override; + std::optional check_locality(const token& token) const override; bool has_pending_ranges(inet_address endpoint) const override; std::unique_ptr make_splitter() const override; const dht::sharder& get_sharder(const schema& s) const override; diff --git a/locator/tablets.cc b/locator/tablets.cc index d016639570..9a31bd8180 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -430,6 +430,42 @@ public: return result; } + std::optional check_locality(const token& search_token) const override { + auto&& tablets = get_tablet_map(); + auto tid = tablets.get_tablet_id(search_token); + auto&& info = tablets.get_tablet_info(tid); + auto host = get_token_metadata().get_my_id(); + auto shard = this_shard_id(); + + auto make_tablet_routing_info = [&] { + dht::token first_token; + if (tid == tablets.first_tablet()) { + first_token = dht::minimum_token(); + } else { + first_token = tablets.get_last_token(tablet_id(size_t(tid) - 1)); + } + auto token_range = std::make_pair(first_token, tablets.get_last_token(tid)); + return tablet_routing_info{info.replicas, token_range}; + }; + + for (auto&& r : info.replicas) { + if (r.host == host) { + if (r.shard == shard) { + return std::nullopt; // routed correctly + } else { + return make_tablet_routing_info(); + } + } + } + + auto tinfo = tablets.get_tablet_transition_info(tid); + if (tinfo && tinfo->pending_replica.host == host && tinfo->pending_replica.shard == shard) { + return std::nullopt; // routed correctly + } + + return make_tablet_routing_info(); + } + virtual bool has_pending_ranges(inet_address endpoint) const override { const auto host_id = _tmptr->get_host_id_if_known(endpoint); if (!host_id.has_value()) { diff --git a/locator/tablets.hh b/locator/tablets.hh index b04225ee2f..260aa5a983 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -349,6 +349,11 @@ public: friend std::ostream& operator<<(std::ostream&, const tablet_metadata&); }; +struct tablet_routing_info { + tablet_replica_set tablet_replicas; + std::pair token_range; +}; + } template <> From 54f22927a36533536da4dd2603a5b088b2dd45bd Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 13 Sep 2023 15:25:54 +0200 Subject: [PATCH 5/8] cql3: send tablet if wrong node/shard is used during select statement --- cql3/statements/select_statement.cc | 32 +++++++++++++++++++++++++---- cql3/statements/select_statement.hh | 1 + 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 8c766c4d81..dfc40d6b3c 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -404,12 +404,25 @@ select_statement::do_execute(query_processor& qp, auto key_ranges = _restrictions->get_partition_key_ranges(options); + auto token = dht::token(); + std::optional tablet_info = {}; + + auto&& table = _schema->table(); + if (_may_use_token_aware_routing && table.uses_tablets()) { + if (key_ranges.size() == 1 && query::is_single_partition(key_ranges.front())) { + token = key_ranges[0].start()->value().as_decorated_key().token(); + + auto erm = table.get_effective_replication_map(); + tablet_info = erm->check_locality(token); + } + } + if (db::is_serial_consistency(options.get_consistency())) { if (key_ranges.size() != 1 || !query::is_single_partition(key_ranges.front())) { throw exceptions::invalid_request_exception( "SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time"); } - unsigned shard = _schema->table().shard_of(key_ranges[0].start()->value().as_decorated_key().token()); + unsigned shard = table.shard_of(key_ranges[0].start()->value().as_decorated_key().token()); if (this_shard_id() != shard) { return make_ready_future>( qp.bounce_to_shard(shard, std::move(const_cast(options).take_cached_pk_function_calls())) @@ -417,13 +430,24 @@ select_statement::do_execute(query_processor& qp, } } + auto f = make_ready_future>(); + if (!aggregate && !_restrictions_need_filtering && (page_size <= 0 || !service::pager::query_pagers::may_need_paging(*_schema, page_size, *command, key_ranges))) { - return execute_without_checking_exception_message_non_aggregate_unpaged(qp, command, std::move(key_ranges), state, options, now); + f = execute_without_checking_exception_message_non_aggregate_unpaged(qp, command, std::move(key_ranges), state, options, now); } else { - return execute_without_checking_exception_message_aggregate_or_paged(qp, command, std::move(key_ranges), state, options, now, page_size, aggregate, nonpaged_filtering); + f = execute_without_checking_exception_message_aggregate_or_paged(qp, command, std::move(key_ranges), state, options, now, page_size, aggregate, nonpaged_filtering); } + + if (!tablet_info.has_value()) { + return f; + } + + return f.then([tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (auto res) mutable { + res->add_tablet_info(std::move(tablet_replicas), token_range); + return res; + }); } future<::shared_ptr> @@ -2077,7 +2101,7 @@ std::unique_ptr select_statement::prepare(data_dictionary::d } auto partition_key_bind_indices = ctx.get_partition_key_bind_indexes(*schema); - + stmt->_may_use_token_aware_routing = partition_key_bind_indices.size() != 0; return make_unique(std::move(stmt), ctx, std::move(partition_key_bind_indices), std::move(warnings)); } diff --git a/cql3/statements/select_statement.hh b/cql3/statements/select_statement.hh index 267888ca8e..261ac89b18 100644 --- a/cql3/statements/select_statement.hh +++ b/cql3/statements/select_statement.hh @@ -55,6 +55,7 @@ public: using parameters = raw::select_statement::parameters; using ordering_comparator_type = raw::select_statement::ordering_comparator_type; static constexpr int DEFAULT_COUNT_PAGE_SIZE = 10000; + bool _may_use_token_aware_routing; protected: static thread_local const lw_shared_ptr _default_parameters; schema_ptr _schema; From cea4c40685b64d39c453e74e432be7e00e80e0d2 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Thu, 14 Sep 2023 10:14:40 +0200 Subject: [PATCH 6/8] cql3: send tablet if wrong node/shard is used during modification statement --- cql3/statements/batch_statement.cc | 4 +- cql3/statements/modification_statement.cc | 63 +++++++++++++++++------ cql3/statements/modification_statement.hh | 6 +-- test/lib/cql_test_env.cc | 4 +- 4 files changed, 56 insertions(+), 21 deletions(-) diff --git a/cql3/statements/batch_statement.cc b/cql3/statements/batch_statement.cc index 125f056d04..5911ad18a2 100644 --- a/cql3/statements/batch_statement.cc +++ b/cql3/statements/batch_statement.cc @@ -168,7 +168,9 @@ future> batch_statement::get_mutations(query_processor& qp statement->inc_cql_stats(query_state.get_client_state().is_internal()); auto&& statement_options = options.for_statement(i); auto timestamp = _attrs->get_timestamp(now, statement_options); - auto more = co_await statement->get_mutations(qp, statement_options, timeout, local, timestamp, query_state); + modification_statement::json_cache_opt json_cache = statement->maybe_prepare_json_cache(statement_options); + std::vector keys = statement->build_partition_keys(statement_options, json_cache); + auto more = co_await statement->get_mutations(qp, statement_options, timeout, local, timestamp, query_state, json_cache, std::move(keys)); for (auto&& m : more) { // We want unordered_set::try_emplace(), but we don't have it diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index a609c34b4d..58bfd19cb3 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -114,10 +114,8 @@ future<> modification_statement::check_access(query_processor& qp, const service } future> -modification_statement::get_mutations(query_processor& qp, const query_options& options, db::timeout_clock::time_point timeout, bool local, int64_t now, service::query_state& qs) const { +modification_statement::get_mutations(query_processor& qp, const query_options& options, db::timeout_clock::time_point timeout, bool local, int64_t now, service::query_state& qs, json_cache_opt& json_cache, std::vector keys) const { auto cl = options.get_consistency(); - auto json_cache = maybe_prepare_json_cache(options); - auto keys = build_partition_keys(options, json_cache); auto ranges = create_clustering_ranges(options, json_cache); auto f = make_ready_future(s); @@ -272,24 +270,44 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs _restrictions->validate_primary_key(options); if (has_conditions()) { - return execute_with_condition(qp, qs, options); + co_return co_await execute_with_condition(qp, qs, options); } - return execute_without_condition(qp, qs, options).then([] (coordinator_result<> res) { - if (!res) { - return make_ready_future<::shared_ptr>( - seastar::make_shared(std::move(res).assume_error())); + json_cache_opt json_cache = maybe_prepare_json_cache(options); + std::vector keys = build_partition_keys(options, json_cache); + + bool keys_size_one = keys.size() == 1; + auto token = dht::token(); + if (keys_size_one) { + token = keys[0].start()->value().token(); + } + + auto res = co_await execute_without_condition(qp, qs, options, json_cache, std::move(keys)); + + if (!res) { + co_return seastar::make_shared(std::move(res).assume_error()); + } + + auto result = seastar::make_shared(); + if (keys_size_one) { + auto&& table = s->table(); + if (_may_use_token_aware_routing && table.uses_tablets()) { + auto erm = table.get_effective_replication_map(); + auto tablet_info = erm->check_locality(token); + if (tablet_info.has_value()) { + result->add_tablet_info(tablet_info->tablet_replicas, tablet_info->token_range); + } } - return make_ready_future<::shared_ptr>( - ::shared_ptr{}); - }); + } + + co_return std::move(result); } future> -modification_statement::execute_without_condition(query_processor& qp, service::query_state& qs, const query_options& options) const { +modification_statement::execute_without_condition(query_processor& qp, service::query_state& qs, const query_options& options, json_cache_opt& json_cache, std::vector keys) const { auto cl = options.get_consistency(); auto timeout = db::timeout_clock::now() + get_timeout(qs.get_client_state(), options); - return get_mutations(qp, options, timeout, false, options.get_timestamp(qs), qs).then([this, cl, timeout, &qp, &qs] (auto mutations) { + return get_mutations(qp, options, timeout, false, options.get_timestamp(qs), qs, json_cache, std::move(keys)).then([this, cl, timeout, &qp, &qs] (auto mutations) { if (mutations.empty()) { return make_ready_future>(bo::success()); } @@ -328,17 +346,29 @@ modification_statement::execute_with_condition(query_processor& qp, service::que // modification in the list of CAS commands, since we're handling single-statement execution. request->add_row_update(*this, std::move(ranges), std::move(json_cache), options); - auto shard = service::storage_proxy::cas_shard(*s, request->key()[0].start()->value().as_decorated_key().token()); + auto token = request->key()[0].start()->value().as_decorated_key().token(); + + auto shard = service::storage_proxy::cas_shard(*s, token); if (shard != this_shard_id()) { return make_ready_future>( qp.bounce_to_shard(shard, std::move(const_cast(options).take_cached_pk_function_calls())) ); } + std::optional tablet_info = locator::tablet_routing_info{locator::tablet_replica_set(), std::pair()}; + + auto&& table = s->table(); + if (_may_use_token_aware_routing && table.uses_tablets()) { + auto erm = table.get_effective_replication_map(); + tablet_info = erm->check_locality(token); + } + return qp.proxy().cas(s, request, request->read_command(qp), request->key(), {read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()}, - cl_for_paxos, cl_for_learn, statement_timeout, cas_timeout).then([this, request] (bool is_applied) { - return request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied); + cl_for_paxos, cl_for_learn, statement_timeout, cas_timeout).then([this, request, tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) { + auto result = request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied); + result->add_tablet_info(tablet_replicas, token_range); + return result; }); } @@ -509,6 +539,7 @@ modification_statement::prepare(data_dictionary::database db, prepare_context& c if (!prepared_stmt->has_conditions() && prepared_stmt->_restrictions.has_value()) { ctx.clear_pk_function_calls_cache(); } + prepared_stmt->_may_use_token_aware_routing = ctx.get_partition_key_bind_indexes(*schema).size() != 0; return prepared_stmt; } diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index 70b96b0dc6..84240bc12f 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -42,7 +42,7 @@ namespace raw { class modification_statement; } class modification_statement : public cql_statement_opt_metadata { public: const statement_type type; - + bool _may_use_token_aware_routing; private: const uint32_t _bound_terms; // If we have operation on list entries, such as adding or @@ -236,7 +236,7 @@ public: private: future> - execute_without_condition(query_processor& qp, service::query_state& qs, const query_options& options) const; + execute_without_condition(query_processor& qp, service::query_state& qs, const query_options& options, json_cache_opt& json_cache, std::vector keys) const; future<::shared_ptr> execute_with_condition(query_processor& qp, service::query_state& qs, const query_options& options) const; @@ -252,7 +252,7 @@ public: * @return vector of the mutations * @throws invalid_request_exception on invalid requests */ - future> get_mutations(query_processor& qp, const query_options& options, db::timeout_clock::time_point timeout, bool local, int64_t now, service::query_state& qs) const; + future> get_mutations(query_processor& qp, const query_options& options, db::timeout_clock::time_point timeout, bool local, int64_t now, service::query_state& qs, json_cache_opt& json_cache, std::vector keys) const; virtual json_cache_opt maybe_prepare_json_cache(const query_options& options) const; diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 38856bf426..4fa5e102d7 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -290,8 +290,10 @@ public: } auto& qo = cql3::query_options::DEFAULT; auto timeout = db::timeout_clock::now() + qs->get_client_state().get_timeout_config().write_timeout; + cql3::statements::modification_statement::json_cache_opt json_cache = modif_stmt->maybe_prepare_json_cache(qo); + std::vector keys = modif_stmt->build_partition_keys(qo, json_cache); - return modif_stmt->get_mutations(local_qp(), qo, timeout, false, qo.get_timestamp(*qs), *qs) + return modif_stmt->get_mutations(local_qp(), qo, timeout, false, qo.get_timestamp(*qs), *qs, json_cache, keys) .finally([qs, modif_stmt = std::move(modif_stmt)] {}); } From 207d673ad633699d35003519fefc581b416b341d Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Tue, 7 Nov 2023 10:21:49 +0100 Subject: [PATCH 7/8] Add tests for sending tablet info --- test/boost/cql_query_test.cc | 140 +++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index 33068d68c4..93bf0897a5 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -5707,3 +5707,143 @@ SEASTAR_TEST_CASE(test_setting_synchronous_updates_property) { ); }); } + +static +cql_test_config tablet_cql_test_config() { + cql_test_config c; + c.db_config->experimental_features({ + db::experimental_features_t::feature::TABLETS, + db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES, + }, db::config::config_source::CommandLine); + c.db_config->consistent_cluster_management(true); + return c; +} + +static +bool has_tablet_routing(::shared_ptr result) { + auto custom_payload = result->custom_payload(); + if (!custom_payload.has_value() || custom_payload->find("tablet_replicas") == custom_payload->end() || custom_payload->find("token_range") == custom_payload->end()) { + return false; + } + return true; +} + +SEASTAR_TEST_CASE(test_sending_tablet_info_unprepared_insert) { + BOOST_ASSERT(smp::count == 2); + return do_with_cql_env_thread([](cql_test_env& e) { + e.execute_cql("create keyspace ks_tablet with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 8};").get(); + e.execute_cql("create table ks_tablet.test_tablet (pk int, ck int, v int, PRIMARY KEY (pk, ck));").get(); + + smp::submit_to(0, [&] { + return seastar::async([&] { + auto result = e.execute_cql("insert into ks_tablet.test_tablet (pk, ck, v) VALUES (1, 2, 3);").get(); + BOOST_ASSERT(!has_tablet_routing(result)); + }); + }).get(); + + smp::submit_to(1, [&] { + return seastar::async([&] { + auto result = e.execute_cql("insert into ks_tablet.test_tablet (pk, ck, v) VALUES (1, 2, 3);").get(); + BOOST_ASSERT(!has_tablet_routing(result)); + }); + }).get(); + }, tablet_cql_test_config()); +} + +SEASTAR_TEST_CASE(test_sending_tablet_info_unprepared_select) { + return do_with_cql_env_thread([](cql_test_env& e) { + e.execute_cql("create keyspace ks_tablet with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 8};").get(); + e.execute_cql("create table ks_tablet.test_tablet (pk int, ck int, v int, PRIMARY KEY (pk, ck));").get(); + e.execute_cql("insert into ks_tablet.test_tablet (pk, ck, v) VALUES (1, 2, 3);").get(); + + smp::submit_to(0, [&] { + return seastar::async([&] { + auto result = e.execute_cql("select pk, ck, v FROM ks_tablet.test_tablet WHERE pk = 1;").get(); + BOOST_ASSERT(!has_tablet_routing(result)); + }); + }).get(); + + smp::submit_to(1, [&] { + return seastar::async([&] { + auto result = e.execute_cql("select pk, ck, v FROM ks_tablet.test_tablet WHERE pk = 1;").get(); + BOOST_ASSERT(!has_tablet_routing(result)); + }); + }).get(); + }, tablet_cql_test_config()); +} + +SEASTAR_TEST_CASE(test_sending_tablet_info_insert) { + return do_with_cql_env_thread([](cql_test_env& e) { + e.execute_cql("create keyspace ks_tablet with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 8};").get(); + e.execute_cql("create table ks_tablet.test_tablet (pk int, ck int, v int, PRIMARY KEY (pk, ck));").get(); + auto insert = e.prepare("insert into ks_tablet.test_tablet (pk, ck, v) VALUES (?, ?, ?);").get0(); + + std::vector raw_values; + raw_values.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{1}))); + raw_values.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{2}))); + raw_values.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{3}))); + + const auto sptr = e.local_db().find_schema("ks_tablet", "test_tablet"); + + auto pk = partition_key::from_singular(*sptr, int32_t(1)); + + unsigned local_shard = sptr->table().shard_of(dht::get_token(*sptr, pk.view())); + + smp::submit_to(local_shard, [&] { + return seastar::async([&] { + auto result = e.execute_prepared(insert, raw_values).get0(); + BOOST_ASSERT(!has_tablet_routing(result)); + }); + }).get(); + + std::vector raw_values2; + raw_values2.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{2}))); + raw_values2.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{3}))); + raw_values2.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{4}))); + + auto pk2 = partition_key::from_singular(*sptr, int32_t(2)); + + unsigned local_shard2 = sptr->table().shard_of(dht::get_token(*sptr, pk2.view())); + unsigned foreign_shard = (local_shard2 + 1) % smp::count; + + smp::submit_to(foreign_shard, [&] { + return seastar::async([&] { + auto result = e.execute_prepared(insert, raw_values2).get0(); + BOOST_ASSERT(has_tablet_routing(result)); + }); + }).get(); + }, tablet_cql_test_config()); +} + +SEASTAR_TEST_CASE(test_sending_tablet_info_select) { + return do_with_cql_env_thread([](cql_test_env& e) { + e.execute_cql("create keyspace ks_tablet with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 8};").get(); + e.execute_cql("create table ks_tablet.test_tablet (pk int, ck int, v int, PRIMARY KEY (pk, ck));").get(); + e.execute_cql("insert into ks_tablet.test_tablet (pk, ck, v) VALUES (1, 2, 3);").get(); + + auto select = e.prepare("select pk, ck, v FROM ks_tablet.test_tablet WHERE pk = ?;").get0(); + std::vector raw_values; + raw_values.emplace_back(cql3::raw_value::make_value(int32_type->decompose(int32_t{1}))); + + const auto sptr = e.local_db().find_schema("ks_tablet", "test_tablet"); + + auto pk = partition_key::from_singular(*sptr, int32_t(1)); + + unsigned local_shard = sptr->table().shard_of(dht::get_token(*sptr, pk.view())); + unsigned foreign_shard = (local_shard + 1) % smp::count; + + smp::submit_to(local_shard, [&] { + return seastar::async([&] { + auto result = e.execute_prepared(select, raw_values).get0(); + BOOST_ASSERT(!has_tablet_routing(result)); + }); + }).get(); + + smp::submit_to(foreign_shard, [&] { + return seastar::async([&] { + auto result = e.execute_prepared(select, raw_values).get0(); + BOOST_ASSERT(has_tablet_routing(result)); + }); + }).get(); + }, tablet_cql_test_config()); +} From ac51c417ea652efd0056f08aedce5998e4518d75 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Tue, 21 Nov 2023 20:25:27 +0100 Subject: [PATCH 8/8] docs: add documentation about sending tablet info to protocol extensions --- docs/dev/protocol-extensions.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/docs/dev/protocol-extensions.md b/docs/dev/protocol-extensions.md index 74a379333e..60feae653f 100644 --- a/docs/dev/protocol-extensions.md +++ b/docs/dev/protocol-extensions.md @@ -180,3 +180,22 @@ The string map in the SUPPORTED response will contain the following parameters: - `ERROR_CODE`: a 32-bit signed decimal integer which Scylla will use as the error code for the rate limit exception. + +## Sending tablet info to the drivers + +This extension adds support for sending tablet info to the drivers if the +request was routed to the wrong node/shard. + +There is a need for sending tablet info to the drivers so they can be +tablet aware. +For the best performance we want to get this info lazily only when it is +needed. + +The info is send when driver asks about the information that the specific +tablet contains and it is directed to the wrong node/shard so it could +use that information for every subsequent query. +If we send the query to the wrong node/shard, we want to send the RESULT +message with additional information about the tablet in `custom_payload`: + + - `tablet_replicas` - information about tablet replicas, for every replica there is information about the host and shard. + - `token_range` - information about token range for that tablet in format `(first_token, last_token]`.