diff --git a/gms/feature.hh b/gms/feature.hh index c2ae7a5c19..e32da7eb0b 100644 --- a/gms/feature.hh +++ b/gms/feature.hh @@ -145,6 +145,7 @@ extern const std::string_view PER_TABLE_CACHING; extern const std::string_view DIGEST_FOR_NULL_VALUES; extern const std::string_view CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX; extern const std::string_view ALTERNATOR_STREAMS; +extern const std::string_view RANGE_SCAN_DATA_VARIANT; } diff --git a/gms/feature_service.cc b/gms/feature_service.cc index b3c1e5c568..724734b164 100644 --- a/gms/feature_service.cc +++ b/gms/feature_service.cc @@ -64,6 +64,7 @@ constexpr std::string_view features::PER_TABLE_CACHING = "PER_TABLE_CACHING"; constexpr std::string_view features::DIGEST_FOR_NULL_VALUES = "DIGEST_FOR_NULL_VALUES"; constexpr std::string_view features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX = "CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX"; constexpr std::string_view features::ALTERNATOR_STREAMS = "ALTERNATOR_STREAMS"; +constexpr std::string_view features::RANGE_SCAN_DATA_VARIANT = "RANGE_SCAN_DATA_VARIANT"; static logging::logger logger("features"); @@ -90,6 +91,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg) , _digest_for_null_values_feature(*this, features::DIGEST_FOR_NULL_VALUES) , _correct_idx_token_in_secondary_index_feature(*this, features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX) , _alternator_streams_feature(*this, features::ALTERNATOR_STREAMS) + , _range_scan_data_variant(*this, features::RANGE_SCAN_DATA_VARIANT) {} feature_config feature_config_from_db_config(db::config& cfg, std::set disabled) { @@ -193,6 +195,7 @@ std::set feature_service::known_feature_set() { gms::features::DIGEST_FOR_NULL_VALUES, gms::features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX, gms::features::ALTERNATOR_STREAMS, + gms::features::RANGE_SCAN_DATA_VARIANT, }; for (const sstring& s : _config._disabled_features) { @@ -274,6 +277,7 @@ void feature_service::enable(const std::set& list) { std::ref(_digest_for_null_values_feature), std::ref(_correct_idx_token_in_secondary_index_feature), std::ref(_alternator_streams_feature), + std::ref(_range_scan_data_variant), }) { if (list.contains(f.name())) { diff --git a/gms/feature_service.hh b/gms/feature_service.hh index 3644220f30..6beb925f4c 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -94,6 +94,7 @@ private: gms::feature _digest_for_null_values_feature; gms::feature _correct_idx_token_in_secondary_index_feature; gms::feature _alternator_streams_feature; + gms::feature _range_scan_data_variant; public: bool cluster_supports_user_defined_functions() const { @@ -170,6 +171,12 @@ public: bool cluster_supports_alternator_streams() const { return bool(_alternator_streams_feature); } + + // Range scans have a data variant, which produces query::result directly, + // instead of through the intermediate reconcilable_result format. + bool cluster_supports_range_scan_data_variant() const { + return bool(_range_scan_data_variant); + } }; } // namespace gms diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 1b7aebb265..5059915ccc 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -24,6 +24,9 @@ #include "multishard_mutation_query.hh" #include "database.hh" #include "db/config.hh" +#include "query-result-writer.hh" + +#include #include @@ -60,7 +63,7 @@ using foreign_unique_ptr = foreign_ptr>; /// /// Note: /// 1) Each step can only be started when the previous phase has finished. -/// 2) This usage is implemented in the `do_query_mutations()` function below. +/// 2) This usage is implemented in the `do_query()` function below. /// 3) Both, `read_context::lookup_readers()` and `read_context::save_readers()` /// knows to do nothing when the query is not stateful and just short /// circuit. @@ -589,18 +592,23 @@ future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsu namespace { -using consume_result = std::tuple, reconcilable_result>; +template +using consume_result = std::tuple, ResultType>; +template +using compact_for_result_state = compact_for_query_state; + +template struct page_consume_result { std::optional last_ckey; - reconcilable_result result; + typename ResultBuilder::result_type result; flat_mutation_reader::tracked_buffer unconsumed_fragments; - lw_shared_ptr compaction_state; + lw_shared_ptr> compaction_state; - page_consume_result(consume_result&& result, flat_mutation_reader::tracked_buffer&& unconsumed_fragments, - lw_shared_ptr&& compaction_state) + page_consume_result(consume_result&& result, flat_mutation_reader::tracked_buffer&& unconsumed_fragments, + lw_shared_ptr>&& compaction_state) : last_ckey(std::get>(std::move(result))) - , result(std::get(std::move(result))) + , result(std::get(std::move(result))) , unconsumed_fragments(std::move(unconsumed_fragments)) , compaction_state(std::move(compaction_state)) { } @@ -608,62 +616,157 @@ struct page_consume_result { } // anonymous namespace -static future do_query_mutations( +template +future> read_page( + shared_ptr ctx, + schema_ptr s, + const query::read_command& cmd, + const dht::partition_range_vector& ranges, + tracing::trace_state_ptr trace_state, + db::timeout_clock::time_point timeout, + ResultBuilder&& result_builder) { + auto ms = mutation_source([&] (schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding, + mutation_reader::forwarding fwd_mr) { + return make_multishard_combining_reader(ctx, std::move(s), std::move(permit), pr, ps, pc, std::move(trace_state), fwd_mr); + }); + auto reader = make_flat_multi_range_reader(s, ctx->permit(), std::move(ms), ranges, + cmd.slice, service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no); + + auto compaction_state = make_lw_shared>(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(), + cmd.partition_limit); + + auto result = co_await query::consume_page(reader, compaction_state, cmd.slice, std::move(result_builder), cmd.get_row_limit(), + cmd.partition_limit, cmd.timestamp, timeout, *cmd.max_result_size); + co_return page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state)); +} + +template +future do_query( distributed& db, schema_ptr s, const query::read_command& cmd, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, - query::result_memory_accounter&& accounter) { - return do_with(seastar::make_shared(db, s, cmd, ranges, trace_state), [&db, s, &cmd, &ranges, trace_state, timeout, - accounter = std::move(accounter)] (shared_ptr& ctx) mutable { - return ctx->lookup_readers().then([&ctx, s = std::move(s), &cmd, &ranges, trace_state, timeout, - accounter = std::move(accounter)] () mutable { - auto ms = mutation_source([&] (schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding, - mutation_reader::forwarding fwd_mr) { - return make_multishard_combining_reader(ctx, std::move(s), std::move(permit), pr, ps, pc, std::move(trace_state), fwd_mr); - }); - auto reader = make_flat_multi_range_reader(s, ctx->permit(), std::move(ms), ranges, - cmd.slice, service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no); + ResultBuilder&& result_builder) { + auto ctx = seastar::make_shared(db, s, cmd, ranges, trace_state); - auto compaction_state = make_lw_shared(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(), - cmd.partition_limit); + co_await ctx->lookup_readers(); - return do_with(std::move(reader), std::move(compaction_state), [&, accounter = std::move(accounter), timeout] ( - flat_mutation_reader& reader, lw_shared_ptr& compaction_state) mutable { - auto rrb = reconcilable_result_builder(*reader.schema(), cmd.slice, std::move(accounter)); - return query::consume_page(reader, compaction_state, cmd.slice, std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, - timeout, *cmd.max_result_size).then([&] (consume_result&& result) mutable { - return make_ready_future(page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state))); - }); - }).then_wrapped([&ctx] (future&& result_fut) { - if (result_fut.failed()) { - return make_exception_future(std::move(result_fut.get_exception())); - } + std::exception_ptr ex; - auto [last_ckey, result, unconsumed_buffer, compaction_state] = result_fut.get0(); - if (!compaction_state->are_limits_reached() && !result.is_short_read()) { - return make_ready_future(std::move(result)); - } + try { + auto [last_ckey, result, unconsumed_buffer, compaction_state] = co_await read_page(ctx, s, cmd, ranges, trace_state, timeout, + std::move(result_builder)); - return ctx->save_readers(std::move(unconsumed_buffer), std::move(*compaction_state).detach_state(), - std::move(last_ckey)).then_wrapped([result = std::move(result)] (future<>&&) mutable { - return make_ready_future(std::move(result)); - }); - }).finally([&ctx] { - return ctx->stop(); - }); - }); - }); + if (compaction_state->are_limits_reached() || result.is_short_read()) { + co_await ctx->save_readers(std::move(unconsumed_buffer), std::move(*compaction_state).detach_state(), std::move(last_ckey)); + } + + co_await ctx->stop(); + co_return std::move(result); + } catch (...) { + ex = std::current_exception(); + } + + co_await ctx->stop(); + + std::rethrow_exception(std::move(ex)); } +template +static future>, cache_temperature>> do_query_on_all_shards( + distributed& db, + schema_ptr s, + const query::read_command& cmd, + const dht::partition_range_vector& ranges, + tracing::trace_state_ptr trace_state, + db::timeout_clock::time_point timeout, + std::function result_builder_factory) { + if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) { + co_return std::tuple( + make_foreign(make_lw_shared()), + db.local().find_column_family(s).get_global_cache_hit_rate()); + } + + auto& local_db = db.local(); + auto& stats = local_db.get_stats(); + const auto short_read_allowed = query::short_read(cmd.slice.options.contains()); + + try { + auto accounter = co_await local_db.get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allowed); + + auto result_builder = result_builder_factory(std::move(accounter)); + + auto result = co_await do_query(db, s, cmd, ranges, std::move(trace_state), timeout, std::move(result_builder)); + + ++stats.total_reads; + stats.short_mutation_queries += bool(result.is_short_read()); + auto hit_rate = local_db.find_column_family(s).get_global_cache_hit_rate(); + co_return std::tuple(make_foreign(make_lw_shared(std::move(result))), hit_rate); + } catch (...) { + ++stats.total_reads_failed; + throw; + } +} + +namespace { + +class mutation_query_result_builder { +public: + using result_type = reconcilable_result; + static constexpr emit_only_live_rows only_live = emit_only_live_rows::no; + +private: + reconcilable_result_builder _builder; + +public: + mutation_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_memory_accounter&& accounter) + : _builder(s, slice, std::move(accounter)) { } + + void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); } + void consume(tombstone t) { _builder.consume(t); } + stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } + stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } + stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } + result_type consume_end_of_stream() { return _builder.consume_end_of_stream(); } +}; + +class data_query_result_builder { +public: + using result_type = query::result; + static constexpr emit_only_live_rows only_live = emit_only_live_rows::yes; + +private: + std::unique_ptr _res_builder; + query_result_builder _builder; + +public: + data_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_options opts, query::result_memory_accounter&& accounter) + : _res_builder(std::make_unique(slice, opts, std::move(accounter))) + , _builder(s, *_res_builder) { } + + void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); } + void consume(tombstone t) { _builder.consume(t); } + stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } + stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } + stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } + result_type consume_end_of_stream() { + _builder.consume_end_of_stream(); + return _res_builder->build(); + } +}; + +} // anonymous namespace + future>, cache_temperature>> query_mutations_on_all_shards( distributed& db, schema_ptr s, @@ -671,31 +774,28 @@ future>, cache_tempera const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { - if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) { - return make_ready_future>, cache_temperature>>( - std::tuple( - make_foreign(make_lw_shared()), - db.local().find_column_family(s).get_global_cache_hit_rate())); + if (cmd.slice.options.contains(query::partition_slice::option::reversed)) { + throw std::runtime_error("query_mutations_on_all_shards(): reverse range scans are not supported"); } - - const auto short_read_allowed = query::short_read(cmd.slice.options.contains()); - return db.local().get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allowed).then([&, s = std::move(s), - trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) mutable { - return do_query_mutations(db, s, cmd, ranges, std::move(trace_state), timeout, std::move(accounter)).then_wrapped( - [&db, s = std::move(s)] (future&& f) { - auto& local_db = db.local(); - auto& stats = local_db.get_stats(); - if (f.failed()) { - ++stats.total_reads_failed; - return make_exception_future>, cache_temperature>>(f.get_exception()); - } else { - ++stats.total_reads; - auto result = f.get0(); - stats.short_mutation_queries += bool(result.is_short_read()); - auto hit_rate = local_db.find_column_family(s).get_global_cache_hit_rate(); - return make_ready_future>, cache_temperature>>( - std::tuple(make_foreign(make_lw_shared(std::move(result))), hit_rate)); - } - }); + return do_query_on_all_shards(db, s, cmd, ranges, std::move(trace_state), timeout, + [s, &cmd] (query::result_memory_accounter&& accounter) { + return mutation_query_result_builder(*s, cmd.slice, std::move(accounter)); + }); +} + +future>, cache_temperature>> query_data_on_all_shards( + distributed& db, + schema_ptr s, + const query::read_command& cmd, + const dht::partition_range_vector& ranges, + query::result_options opts, + tracing::trace_state_ptr trace_state, + db::timeout_clock::time_point timeout) { + if (cmd.slice.options.contains(query::partition_slice::option::reversed)) { + throw std::runtime_error("query_data_on_all_shards(): reverse range scans are not supported"); + } + return do_query_on_all_shards(db, s, cmd, ranges, std::move(trace_state), timeout, + [s, &cmd, opts] (query::result_memory_accounter&& accounter) { + return data_query_result_builder(*s, cmd.slice, opts, std::move(accounter)); }); } diff --git a/multishard_mutation_query.hh b/multishard_mutation_query.hh index f0baed9c78..7d9a1c0233 100644 --- a/multishard_mutation_query.hh +++ b/multishard_mutation_query.hh @@ -68,3 +68,16 @@ future>, cache_tempera const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout); + +/// Run the data query on all shards. +/// +/// Identical to `query_mutations_on_all_shards()` except that it builds results +/// in the `query::result` format instead of in the `reconcilable_result` one. +future>, cache_temperature>> query_data_on_all_shards( + distributed& db, + schema_ptr s, + const query::read_command& cmd, + const dht::partition_range_vector& ranges, + query::result_options opts, + tracing::trace_state_ptr trace_state, + db::timeout_clock::time_point timeout); diff --git a/mutation_partition.cc b/mutation_partition.cc index 00422b1c1c..0028fccc86 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1806,30 +1806,6 @@ mutation_partition::upgrade(const schema& old_schema, const schema& new_schema) *this = std::move(tmp); } -// Adds mutation to query::result. -class mutation_querier { - const schema& _schema; - query::result_memory_accounter& _memory_accounter; - query::result::partition_writer _pw; - ser::qr_partition__static_row__cells _static_cells_wr; - bool _live_data_in_static_row{}; - uint64_t _live_clustering_rows = 0; - std::optional> _rows_wr; -private: - void query_static_row(const row& r, tombstone current_tombstone); - void prepare_writers(); -public: - mutation_querier(const schema& s, query::result::partition_writer pw, - query::result_memory_accounter& memory_accounter); - void consume(tombstone) { } - // Requires that sr.has_any_live_data() - stop_iteration consume(static_row&& sr, tombstone current_tombstone); - // Requires that cr.has_any_live_data() - stop_iteration consume(clustering_row&& cr, row_tombstone current_tombstone); - stop_iteration consume(range_tombstone&&) { return stop_iteration::no; } - uint64_t consume_end_of_stream(); -}; - mutation_querier::mutation_querier(const schema& s, query::result::partition_writer pw, query::result_memory_accounter& memory_accounter) : _schema(s) @@ -1947,50 +1923,43 @@ uint64_t mutation_querier::consume_end_of_stream() { } } -class query_result_builder { - const schema& _schema; - query::result::builder& _rb; - std::optional _mutation_consumer; - stop_iteration _stop; -public: - query_result_builder(const schema& s, query::result::builder& rb) - : _schema(s), _rb(rb) - { } +query_result_builder::query_result_builder(const schema& s, query::result::builder& rb) + : _schema(s), _rb(rb) +{ } - void consume_new_partition(const dht::decorated_key& dk) { - _mutation_consumer.emplace(mutation_querier(_schema, _rb.add_partition(_schema, dk.key()), _rb.memory_accounter())); - } +void query_result_builder::consume_new_partition(const dht::decorated_key& dk) { + _mutation_consumer.emplace(mutation_querier(_schema, _rb.add_partition(_schema, dk.key()), _rb.memory_accounter())); +} - void consume(tombstone t) { - _mutation_consumer->consume(t); - } - stop_iteration consume(static_row&& sr, tombstone t, bool) { - _stop = _mutation_consumer->consume(std::move(sr), t); - return _stop; - } - stop_iteration consume(clustering_row&& cr, row_tombstone t, bool) { - _stop = _mutation_consumer->consume(std::move(cr), t); - return _stop; - } - stop_iteration consume(range_tombstone&& rt) { - _stop = _mutation_consumer->consume(std::move(rt)); - return _stop; - } +void query_result_builder::consume(tombstone t) { + _mutation_consumer->consume(t); +} +stop_iteration query_result_builder::consume(static_row&& sr, tombstone t, bool) { + _stop = _mutation_consumer->consume(std::move(sr), t); + return _stop; +} +stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone t, bool) { + _stop = _mutation_consumer->consume(std::move(cr), t); + return _stop; +} +stop_iteration query_result_builder::consume(range_tombstone&& rt) { + _stop = _mutation_consumer->consume(std::move(rt)); + return _stop; +} - stop_iteration consume_end_of_partition() { - auto live_rows_in_partition = _mutation_consumer->consume_end_of_stream(); - if (live_rows_in_partition > 0 && !_stop) { - _stop = _rb.memory_accounter().check(); - } - if (_stop) { - _rb.mark_as_short_read(); - } - return _stop; +stop_iteration query_result_builder::consume_end_of_partition() { + auto live_rows_in_partition = _mutation_consumer->consume_end_of_stream(); + if (live_rows_in_partition > 0 && !_stop) { + _stop = _rb.memory_accounter().check(); } + if (_stop) { + _rb.mark_as_short_read(); + } + return _stop; +} - void consume_end_of_stream() { - } -}; +void query_result_builder::consume_end_of_stream() { +} future<> data_query( schema_ptr s, diff --git a/query-request.hh b/query-request.hh index feb818ca3c..31d404a81a 100644 --- a/query-request.hh +++ b/query-request.hh @@ -142,6 +142,10 @@ public: // key restrictions and the partition doesn't have any rows matching // the restrictions, see #589. This flag overrides this behavior. always_return_static_content, + // Use the new data range scan variant, which builds query::result + // directly, bypassing the intermediate reconcilable_result format used + // in pre 4.5 range scans. + range_scan_data_variant, }; using option_set = enum_set>; + option::always_return_static_content, + option::range_scan_data_variant>>; clustering_row_ranges _row_ranges; public: column_id_vector static_columns; // TODO: consider using bitmap diff --git a/query-result-writer.hh b/query-result-writer.hh index a0a223b313..c2cecd8e44 100644 --- a/query-result-writer.hh +++ b/query-result-writer.hh @@ -194,3 +194,49 @@ public: }; } + +class row; +class static_row; +class clustering_row; +class range_tombstone; + +// Adds mutation to query::result. +class mutation_querier { + const schema& _schema; + query::result_memory_accounter& _memory_accounter; + query::result::partition_writer _pw; + ser::qr_partition__static_row__cells _static_cells_wr; + bool _live_data_in_static_row{}; + uint64_t _live_clustering_rows = 0; + std::optional> _rows_wr; +private: + void query_static_row(const row& r, tombstone current_tombstone); + void prepare_writers(); +public: + mutation_querier(const schema& s, query::result::partition_writer pw, + query::result_memory_accounter& memory_accounter); + void consume(tombstone) { } + // Requires that sr.has_any_live_data() + stop_iteration consume(static_row&& sr, tombstone current_tombstone); + // Requires that cr.has_any_live_data() + stop_iteration consume(clustering_row&& cr, row_tombstone current_tombstone); + stop_iteration consume(range_tombstone&&) { return stop_iteration::no; } + uint64_t consume_end_of_stream(); +}; + +class query_result_builder { + const schema& _schema; + query::result::builder& _rb; + std::optional _mutation_consumer; + stop_iteration _stop; +public: + query_result_builder(const schema& s, query::result::builder& rb); + + void consume_new_partition(const dht::decorated_key& dk); + void consume(tombstone t); + stop_iteration consume(static_row&& sr, tombstone t, bool); + stop_iteration consume(clustering_row&& cr, row_tombstone t, bool); + stop_iteration consume(range_tombstone&& rt); + stop_iteration consume_end_of_partition(); + void consume_end_of_stream(); +}; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 8be26bb72d..add6883d1b 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -3894,11 +3894,11 @@ storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr>, cache_temperature>&& r_ht) { + return query_nonsingular_data_locally(s, cmd, {pr}, opts, trace_state, timeout).then( + [trace_state = std::move(trace_state)] (rpc::tuple>, cache_temperature>&& r_ht) { auto&& [r, ht] = r_ht; tracing::trace(trace_state, "Querying is done"); - return make_ready_future>, cache_temperature>>( - rpc::tuple(::make_foreign(::make_lw_shared(to_data_query_result(*r, s, cmd->slice, cmd->get_row_limit(), cmd->partition_limit, opts))), ht)); + return make_ready_future>, cache_temperature>>(rpc::tuple(std::move(r), ht)); }); } } @@ -4014,6 +4014,10 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t std::unordered_map> ranges_per_exec; const auto tmptr = get_token_metadata_ptr(); + if (_features.cluster_supports_range_scan_data_variant()) { + cmd->slice.options.set(); + } + const auto preferred_replicas_for_range = [this, &preferred_replicas, tmptr] (const dht::partition_range& r) { auto it = preferred_replicas.find(r.transform(std::mem_fn(&dht::ring_position::token))); return it == preferred_replicas.end() ? std::vector{} : replica_ids_to_endpoints(*tmptr, it->second); @@ -5189,6 +5193,22 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, }); } +future>, cache_temperature>> +storage_proxy::query_nonsingular_data_locally(schema_ptr s, lw_shared_ptr cmd, const dht::partition_range_vector&& prs, + query::result_options opts, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout) { + auto ranges = std::move(prs); + auto local_cmd = cmd; + rpc::tuple>, cache_temperature> ret; + if (local_cmd->slice.options.contains(query::partition_slice::option::range_scan_data_variant)) { + ret = co_await query_data_on_all_shards(_db, std::move(s), *local_cmd, ranges, opts, std::move(trace_state), timeout); + } else { + auto res = co_await query_mutations_on_all_shards(_db, s, *local_cmd, ranges, std::move(trace_state), timeout); + ret = rpc::tuple(make_foreign(make_lw_shared(to_data_query_result(std::move(*std::get<0>(res)), std::move(s), local_cmd->slice, + local_cmd->get_row_limit(), local_cmd->partition_limit, opts))), std::get<1>(res)); + } + co_return ret; +} + future<> storage_proxy::start_hints_manager(shared_ptr gossiper_ptr, shared_ptr ss_ptr) { future<> f = make_ready_future<>(); if (!_hints_manager.is_disabled_for_all()) { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 98859d0d31..06628d0d3d 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -413,6 +413,9 @@ private: future>, cache_temperature>> query_nonsingular_mutations_locally( schema_ptr s, lw_shared_ptr cmd, const dht::partition_range_vector&& pr, tracing::trace_state_ptr trace_state, clock_type::time_point timeout); + future>, cache_temperature>> query_nonsingular_data_locally( + schema_ptr s, lw_shared_ptr cmd, const dht::partition_range_vector&& pr, query::result_options opts, + tracing::trace_state_ptr trace_state, clock_type::time_point timeout); future<> mutate_counters_on_leader(std::vector mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr trace_state, service_permit permit); diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index 6c3dda060d..9b12b52f1f 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -4609,6 +4609,11 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) { cfg.dbcfg->statement_scheduling_group = seastar::create_scheduling_group("statement", 1000).get0(); cfg.dbcfg->streaming_scheduling_group = seastar::create_scheduling_group("streaming", 200).get0(); + auto clean_up_sched_groups = defer([dbcfg = *cfg.dbcfg] { + seastar::destroy_scheduling_group(dbcfg.statement_scheduling_group).get0(); + seastar::destroy_scheduling_group(dbcfg.streaming_scheduling_group).get0(); + }); + do_with_cql_env_thread([] (cql_test_env& e) { e.execute_cql("CREATE TABLE test (pk int, ck int, v text, PRIMARY KEY (pk, ck));").get(); auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get0(); @@ -4833,3 +4838,46 @@ SEASTAR_THREAD_TEST_CASE(test_twcs_optimal_query_path) { .is_rows().with_size(1); }).get(); } + +SEASTAR_THREAD_TEST_CASE(test_query_unselected_columns) { + cql_test_config cfg; + + cfg.db_config->max_memory_for_unlimited_query_soft_limit(1024 * 1024, utils::config_file::config_source::CommandLine); + cfg.db_config->max_memory_for_unlimited_query_hard_limit(1024 * 1024, utils::config_file::config_source::CommandLine); + + cfg.dbcfg.emplace(); + cfg.dbcfg->available_memory = memory::stats().total_memory(); + cfg.dbcfg->statement_scheduling_group = seastar::create_scheduling_group("statement", 1000).get0(); + + auto statement_sched_group = cfg.dbcfg->statement_scheduling_group; + + auto clean_up_sched_groups = defer([statement_sched_group] { + seastar::destroy_scheduling_group(statement_sched_group).get0(); + }); + + do_with_cql_env_thread([] (cql_test_env& e) { + auto now_nano = std::chrono::duration_cast(db_clock::now().time_since_epoch()).count(); + e.execute_cql("CREATE TABLE tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))").get(); + + const unsigned num_rows = 20; + const sstring val(100 * 1024, 'a'); + const auto id = e.prepare(format("INSERT INTO tbl (pk, ck, v) VALUES (0, ?, '{}')", val)).get0(); + for (int ck = 0; ck < num_rows; ++ck) { + e.execute_prepared(id, {cql3::raw_value::make_value(int32_type->decompose(ck))}).get(); + } + + { + testlog.info("Single partition scan"); + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, + cql3::query_options::specific_options{-1, nullptr, {}, api::new_timestamp()}); + assert_that(e.execute_cql("SELECT pk, ck FROM tbl WHERE pk = 0", std::move(qo)).get0()).is_rows().with_size(num_rows); + } + + { + testlog.info("Full scan"); + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, + cql3::query_options::specific_options{-1, nullptr, {}, api::new_timestamp()}); + assert_that(e.execute_cql("SELECT pk, ck FROM tbl", std::move(qo)).get0()).is_rows().with_size(num_rows); + } + }, std::move(cfg), thread_attributes{.sched_group = statement_sched_group}).get(); +} diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 5953959318..3dade7f33b 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -685,9 +685,9 @@ future<> do_with_cql_env(std::function(cql_test_env&)> func, cql_test_c return single_node_cql_env::do_with(func, std::move(cfg_in)); } -future<> do_with_cql_env_thread(std::function func, cql_test_config cfg_in) { - return single_node_cql_env::do_with([func = std::move(func)] (auto& e) { - return seastar::async([func = std::move(func), &e] { +future<> do_with_cql_env_thread(std::function func, cql_test_config cfg_in, thread_attributes thread_attr) { + return single_node_cql_env::do_with([func = std::move(func), thread_attr] (auto& e) { + return seastar::async(thread_attr, [func = std::move(func), &e] { return func(e); }); }, std::move(cfg_in)); diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh index e3d22fe6c7..f114c27755 100644 --- a/test/lib/cql_test_env.hh +++ b/test/lib/cql_test_env.hh @@ -135,4 +135,4 @@ public: }; future<> do_with_cql_env(std::function(cql_test_env&)> func, cql_test_config = {}); -future<> do_with_cql_env_thread(std::function func, cql_test_config = {}); +future<> do_with_cql_env_thread(std::function func, cql_test_config = {}, thread_attributes thread_attr = {});