From 29195f67f1ae0032d16359665fbd6da7387c60fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 10 Feb 2021 17:08:37 +0200 Subject: [PATCH 01/16] multishard_mutation_query: extract page reading logic into separate method The block of code moved also coincides with the scope in which the reader has to be alive, making the code more clear. --- multishard_mutation_query.cc | 59 +++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 1b7aebb265..51ff19d2cd 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -608,6 +608,40 @@ struct page_consume_result { } // anonymous namespace +static future read_page( + shared_ptr ctx, + schema_ptr s, + const query::read_command& cmd, + const dht::partition_range_vector& ranges, + tracing::trace_state_ptr trace_state, + db::timeout_clock::time_point timeout, + query::result_memory_accounter&& accounter) { + auto ms = mutation_source([&] (schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding, + mutation_reader::forwarding fwd_mr) { + return make_multishard_combining_reader(ctx, std::move(s), std::move(permit), pr, ps, pc, std::move(trace_state), fwd_mr); + }); + auto reader = make_flat_multi_range_reader(s, ctx->permit(), std::move(ms), ranges, + cmd.slice, service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no); + + auto compaction_state = make_lw_shared(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(), + cmd.partition_limit); + + return do_with(std::move(reader), std::move(compaction_state), [&, accounter = std::move(accounter), timeout] ( + flat_mutation_reader& reader, lw_shared_ptr& compaction_state) mutable { + auto rrb = reconcilable_result_builder(*reader.schema(), 
cmd.slice, std::move(accounter)); + return query::consume_page(reader, compaction_state, cmd.slice, std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, + timeout, *cmd.max_result_size).then([&] (consume_result&& result) mutable { + return make_ready_future(page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state))); + }); + }); +} + static future do_query_mutations( distributed& db, schema_ptr s, @@ -620,30 +654,7 @@ static future do_query_mutations( accounter = std::move(accounter)] (shared_ptr& ctx) mutable { return ctx->lookup_readers().then([&ctx, s = std::move(s), &cmd, &ranges, trace_state, timeout, accounter = std::move(accounter)] () mutable { - auto ms = mutation_source([&] (schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding, - mutation_reader::forwarding fwd_mr) { - return make_multishard_combining_reader(ctx, std::move(s), std::move(permit), pr, ps, pc, std::move(trace_state), fwd_mr); - }); - auto reader = make_flat_multi_range_reader(s, ctx->permit(), std::move(ms), ranges, - cmd.slice, service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no); - - auto compaction_state = make_lw_shared(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(), - cmd.partition_limit); - - return do_with(std::move(reader), std::move(compaction_state), [&, accounter = std::move(accounter), timeout] ( - flat_mutation_reader& reader, lw_shared_ptr& compaction_state) mutable { - auto rrb = reconcilable_result_builder(*reader.schema(), cmd.slice, std::move(accounter)); - return query::consume_page(reader, compaction_state, cmd.slice, std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, - timeout, *cmd.max_result_size).then([&] (consume_result&& result) mutable { - return 
make_ready_future(page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state))); - }); - }).then_wrapped([&ctx] (future&& result_fut) { + return read_page(ctx, s, cmd, ranges, trace_state, timeout, std::move(accounter)).then_wrapped([&ctx] (future&& result_fut) { if (result_fut.failed()) { return make_exception_future(std::move(result_fut.get_exception())); } From 8138bdb43426df17e5108d23cbf554829c0cf5eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 18 Feb 2021 16:24:46 +0200 Subject: [PATCH 02/16] multishard_mutation_query: read_page(): convert to coroutine In preparation to generalizing it w.r.t. the result builder used. This change will be much simpler with the coroutine code. --- multishard_mutation_query.cc | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 51ff19d2cd..fdc12360ec 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -25,6 +25,8 @@ #include "database.hh" #include "db/config.hh" +#include + #include #include @@ -632,14 +634,10 @@ static future read_page( auto compaction_state = make_lw_shared(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(), cmd.partition_limit); - return do_with(std::move(reader), std::move(compaction_state), [&, accounter = std::move(accounter), timeout] ( - flat_mutation_reader& reader, lw_shared_ptr& compaction_state) mutable { - auto rrb = reconcilable_result_builder(*reader.schema(), cmd.slice, std::move(accounter)); - return query::consume_page(reader, compaction_state, cmd.slice, std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, - timeout, *cmd.max_result_size).then([&] (consume_result&& result) mutable { - return make_ready_future(page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state))); - }); - }); + auto rrb = reconcilable_result_builder(*reader.schema(), cmd.slice, 
std::move(accounter)); + auto result = co_await query::consume_page(reader, compaction_state, cmd.slice, std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, + timeout, *cmd.max_result_size); + co_return page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state)); } static future do_query_mutations( From 5d856156983298e21c583ff2e4cd6aaa66c20e95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 18 Feb 2021 16:48:41 +0200 Subject: [PATCH 03/16] multishard_mutation_query: do_query_mutations(): convert to coroutine In preparation to generalizing it w.r.t. the result builder used. This change will be much simpler with the coroutine code. --- multishard_mutation_query.cc | 41 ++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index fdc12360ec..f539be120f 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -648,29 +648,28 @@ static future do_query_mutations( tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, query::result_memory_accounter&& accounter) { - return do_with(seastar::make_shared(db, s, cmd, ranges, trace_state), [&db, s, &cmd, &ranges, trace_state, timeout, - accounter = std::move(accounter)] (shared_ptr& ctx) mutable { - return ctx->lookup_readers().then([&ctx, s = std::move(s), &cmd, &ranges, trace_state, timeout, - accounter = std::move(accounter)] () mutable { - return read_page(ctx, s, cmd, ranges, trace_state, timeout, std::move(accounter)).then_wrapped([&ctx] (future&& result_fut) { - if (result_fut.failed()) { - return make_exception_future(std::move(result_fut.get_exception())); - } + auto ctx = seastar::make_shared(db, s, cmd, ranges, trace_state); - auto [last_ckey, result, unconsumed_buffer, compaction_state] = result_fut.get0(); - if (!compaction_state->are_limits_reached() && !result.is_short_read()) { - return 
make_ready_future(std::move(result)); - } + co_await ctx->lookup_readers(); - return ctx->save_readers(std::move(unconsumed_buffer), std::move(*compaction_state).detach_state(), - std::move(last_ckey)).then_wrapped([result = std::move(result)] (future<>&&) mutable { - return make_ready_future(std::move(result)); - }); - }).finally([&ctx] { - return ctx->stop(); - }); - }); - }); + std::exception_ptr ex; + + try { + auto [last_ckey, result, unconsumed_buffer, compaction_state] = co_await read_page(ctx, s, cmd, ranges, trace_state, timeout, std::move(accounter)); + + if (compaction_state->are_limits_reached() || result.is_short_read()) { + co_await ctx->save_readers(std::move(unconsumed_buffer), std::move(*compaction_state).detach_state(), std::move(last_ckey)); + } + + co_await ctx->stop(); + co_return std::move(result); + } catch (...) { + ex = std::current_exception(); + } + + co_await ctx->stop(); + + std::rethrow_exception(std::move(ex)); } future>, cache_temperature>> query_mutations_on_all_shards( From b0b620b501c3d8f1ea9fc62e59fc3b3d0b2cecc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 18 Feb 2021 17:14:34 +0200 Subject: [PATCH 04/16] multishard_mutation_query: query_mutations_on_all_shards(): convert to coroutine In preparation to generalizing it w.r.t. the result builder used. This change will be much simpler with the coroutine code. 
--- multishard_mutation_query.cc | 40 ++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index f539be120f..00cb783563 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -680,30 +680,26 @@ future>, cache_tempera tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) { - return make_ready_future>, cache_temperature>>( - std::tuple( + co_return std::tuple( make_foreign(make_lw_shared()), - db.local().find_column_family(s).get_global_cache_hit_rate())); + db.local().find_column_family(s).get_global_cache_hit_rate()); } + auto& local_db = db.local(); + auto& stats = local_db.get_stats(); const auto short_read_allowed = query::short_read(cmd.slice.options.contains()); - return db.local().get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allowed).then([&, s = std::move(s), - trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) mutable { - return do_query_mutations(db, s, cmd, ranges, std::move(trace_state), timeout, std::move(accounter)).then_wrapped( - [&db, s = std::move(s)] (future&& f) { - auto& local_db = db.local(); - auto& stats = local_db.get_stats(); - if (f.failed()) { - ++stats.total_reads_failed; - return make_exception_future>, cache_temperature>>(f.get_exception()); - } else { - ++stats.total_reads; - auto result = f.get0(); - stats.short_mutation_queries += bool(result.is_short_read()); - auto hit_rate = local_db.find_column_family(s).get_global_cache_hit_rate(); - return make_ready_future>, cache_temperature>>( - std::tuple(make_foreign(make_lw_shared(std::move(result))), hit_rate)); - } - }); - }); + + try { + auto accounter = co_await local_db.get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allowed); + + 
auto result = co_await do_query_mutations(db, s, cmd, ranges, std::move(trace_state), timeout, std::move(accounter)); + + ++stats.total_reads; + stats.short_mutation_queries += bool(result.is_short_read()); + auto hit_rate = local_db.find_column_family(s).get_global_cache_hit_rate(); + co_return std::tuple(make_foreign(make_lw_shared(std::move(result))), hit_rate); + } catch (...) { + ++stats.total_reads_failed; + throw; + } } From bddb0d35d605541e02761faf68a285dc3b49c247 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 18 Feb 2021 17:22:27 +0200 Subject: [PATCH 05/16] multishard_mutation_query: query_mutations_on_all_shards(): extract logic into new method In the next patches we are going to generalize the query logic w.r.t. the result builder used, so query_mutations_on_all_shards() will be just a facade parametrizing the actual query code with the right result builder. --- multishard_mutation_query.cc | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 00cb783563..ce3575fa2f 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -672,7 +672,7 @@ static future do_query_mutations( std::rethrow_exception(std::move(ex)); } -future>, cache_temperature>> query_mutations_on_all_shards( +static future>, cache_temperature>> do_query_mutations_on_all_shards( distributed& db, schema_ptr s, const query::read_command& cmd, @@ -703,3 +703,13 @@ future>, cache_tempera throw; } } + +future>, cache_temperature>> query_mutations_on_all_shards( + distributed& db, + schema_ptr s, + const query::read_command& cmd, + const dht::partition_range_vector& ranges, + tracing::trace_state_ptr trace_state, + db::timeout_clock::time_point timeout) { + return do_query_mutations_on_all_shards(db, std::move(s), cmd, ranges, std::move(trace_state), timeout); +} From f19ab5cff13cff663414f9e0f3415ee07c07510e Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 18 Feb 2021 17:44:41 +0200 Subject: [PATCH 06/16] multishard_mutation_query: generalize query code w.r.t. the result builder used We want to add support to building `query::result` directly and reuse the code path we use to build reconcilable result currently for it. So templatize said code path on the result builder used. Since the different result builders don't have a source level compatible interface an adaptor class is used. --- multishard_mutation_query.cc | 85 ++++++++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 23 deletions(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index ce3575fa2f..909040e0ef 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -62,7 +62,7 @@ using foreign_unique_ptr = foreign_ptr>; /// /// Note: /// 1) Each step can only be started when the previous phase has finished. -/// 2) This usage is implemented in the `do_query_mutations()` function below. +/// 2) This usage is implemented in the `do_query()` function below. /// 3) Both, `read_context::lookup_readers()` and `read_context::save_readers()` /// knows to do nothing when the query is not stateful and just short /// circuit. 
@@ -591,18 +591,23 @@ future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsu namespace { -using consume_result = std::tuple, reconcilable_result>; +template +using consume_result = std::tuple, ResultType>; +template +using compact_for_result_state = compact_for_query_state; + +template struct page_consume_result { std::optional last_ckey; - reconcilable_result result; + typename ResultBuilder::result_type result; flat_mutation_reader::tracked_buffer unconsumed_fragments; - lw_shared_ptr compaction_state; + lw_shared_ptr> compaction_state; - page_consume_result(consume_result&& result, flat_mutation_reader::tracked_buffer&& unconsumed_fragments, - lw_shared_ptr&& compaction_state) + page_consume_result(consume_result&& result, flat_mutation_reader::tracked_buffer&& unconsumed_fragments, + lw_shared_ptr>&& compaction_state) : last_ckey(std::get>(std::move(result))) - , result(std::get(std::move(result))) + , result(std::get(std::move(result))) , unconsumed_fragments(std::move(unconsumed_fragments)) , compaction_state(std::move(compaction_state)) { } @@ -610,14 +615,15 @@ struct page_consume_result { } // anonymous namespace -static future read_page( +template +future> read_page( shared_ptr ctx, schema_ptr s, const query::read_command& cmd, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, - query::result_memory_accounter&& accounter) { + ResultBuilder&& result_builder) { auto ms = mutation_source([&] (schema_ptr s, reader_permit permit, const dht::partition_range& pr, @@ -631,23 +637,23 @@ static future read_page( auto reader = make_flat_multi_range_reader(s, ctx->permit(), std::move(ms), ranges, cmd.slice, service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no); - auto compaction_state = make_lw_shared(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(), + auto compaction_state = make_lw_shared>(*s, cmd.timestamp, cmd.slice, 
cmd.get_row_limit(), cmd.partition_limit); - auto rrb = reconcilable_result_builder(*reader.schema(), cmd.slice, std::move(accounter)); - auto result = co_await query::consume_page(reader, compaction_state, cmd.slice, std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, - timeout, *cmd.max_result_size); - co_return page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state)); + auto result = co_await query::consume_page(reader, compaction_state, cmd.slice, std::move(result_builder), cmd.get_row_limit(), + cmd.partition_limit, cmd.timestamp, timeout, *cmd.max_result_size); + co_return page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state)); } -static future do_query_mutations( +template +future do_query( distributed& db, schema_ptr s, const query::read_command& cmd, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, - query::result_memory_accounter&& accounter) { + ResultBuilder&& result_builder) { auto ctx = seastar::make_shared(db, s, cmd, ranges, trace_state); co_await ctx->lookup_readers(); @@ -655,7 +661,8 @@ static future do_query_mutations( std::exception_ptr ex; try { - auto [last_ckey, result, unconsumed_buffer, compaction_state] = co_await read_page(ctx, s, cmd, ranges, trace_state, timeout, std::move(accounter)); + auto [last_ckey, result, unconsumed_buffer, compaction_state] = co_await read_page(ctx, s, cmd, ranges, trace_state, timeout, + std::move(result_builder)); if (compaction_state->are_limits_reached() || result.is_short_read()) { co_await ctx->save_readers(std::move(unconsumed_buffer), std::move(*compaction_state).detach_state(), std::move(last_ckey)); @@ -672,16 +679,18 @@ static future do_query_mutations( std::rethrow_exception(std::move(ex)); } -static future>, cache_temperature>> do_query_mutations_on_all_shards( +template +static future>, cache_temperature>> do_query_on_all_shards( 
distributed& db, schema_ptr s, const query::read_command& cmd, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, - db::timeout_clock::time_point timeout) { + db::timeout_clock::time_point timeout, + std::function result_builder_factory) { if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) { co_return std::tuple( - make_foreign(make_lw_shared()), + make_foreign(make_lw_shared()), db.local().find_column_family(s).get_global_cache_hit_rate()); } @@ -692,18 +701,45 @@ static future>, cache_ try { auto accounter = co_await local_db.get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allowed); - auto result = co_await do_query_mutations(db, s, cmd, ranges, std::move(trace_state), timeout, std::move(accounter)); + auto result_builder = result_builder_factory(std::move(accounter)); + + auto result = co_await do_query(db, s, cmd, ranges, std::move(trace_state), timeout, std::move(result_builder)); ++stats.total_reads; stats.short_mutation_queries += bool(result.is_short_read()); auto hit_rate = local_db.find_column_family(s).get_global_cache_hit_rate(); - co_return std::tuple(make_foreign(make_lw_shared(std::move(result))), hit_rate); + co_return std::tuple(make_foreign(make_lw_shared(std::move(result))), hit_rate); } catch (...) 
{ ++stats.total_reads_failed; throw; } } +namespace { + +class mutation_query_result_builder { +public: + using result_type = reconcilable_result; + static constexpr emit_only_live_rows only_live = emit_only_live_rows::no; + +private: + reconcilable_result_builder _builder; + +public: + mutation_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_memory_accounter&& accounter) + : _builder(s, slice, std::move(accounter)) { } + + void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); } + void consume(tombstone t) { _builder.consume(t); } + stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } + stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } + stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } + result_type consume_end_of_stream() { return _builder.consume_end_of_stream(); } +}; + +} // anonymous namespace + future>, cache_temperature>> query_mutations_on_all_shards( distributed& db, schema_ptr s, @@ -711,5 +747,8 @@ future>, cache_tempera const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { - return do_query_mutations_on_all_shards(db, std::move(s), cmd, ranges, std::move(trace_state), timeout); + return do_query_on_all_shards(db, s, cmd, ranges, std::move(trace_state), timeout, + [s, &cmd] (query::result_memory_accounter&& accounter) { + return mutation_query_result_builder(*s, cmd.slice, std::move(accounter)); + }); } From 950150c6df08a40c92222cf6eaedbbfd57b20126 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 22 Feb 2021 15:13:05 +0200 Subject: [PATCH 07/16] query_result_builder: make it a public type We will want 
to use it in multishard_mutation_query.cc. --- mutation_partition.cc | 47 +++++++----------------------------------- query-result-writer.hh | 46 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 39 deletions(-) diff --git a/mutation_partition.cc b/mutation_partition.cc index 00422b1c1c..ade961d906 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1806,30 +1806,6 @@ mutation_partition::upgrade(const schema& old_schema, const schema& new_schema) *this = std::move(tmp); } -// Adds mutation to query::result. -class mutation_querier { - const schema& _schema; - query::result_memory_accounter& _memory_accounter; - query::result::partition_writer _pw; - ser::qr_partition__static_row__cells _static_cells_wr; - bool _live_data_in_static_row{}; - uint64_t _live_clustering_rows = 0; - std::optional> _rows_wr; -private: - void query_static_row(const row& r, tombstone current_tombstone); - void prepare_writers(); -public: - mutation_querier(const schema& s, query::result::partition_writer pw, - query::result_memory_accounter& memory_accounter); - void consume(tombstone) { } - // Requires that sr.has_any_live_data() - stop_iteration consume(static_row&& sr, tombstone current_tombstone); - // Requires that cr.has_any_live_data() - stop_iteration consume(clustering_row&& cr, row_tombstone current_tombstone); - stop_iteration consume(range_tombstone&&) { return stop_iteration::no; } - uint64_t consume_end_of_stream(); -}; - mutation_querier::mutation_querier(const schema& s, query::result::partition_writer pw, query::result_memory_accounter& memory_accounter) : _schema(s) @@ -1947,37 +1923,31 @@ uint64_t mutation_querier::consume_end_of_stream() { } } -class query_result_builder { - const schema& _schema; - query::result::builder& _rb; - std::optional _mutation_consumer; - stop_iteration _stop; -public: - query_result_builder(const schema& s, query::result::builder& rb) + query_result_builder::query_result_builder(const schema& s, 
query::result::builder& rb) : _schema(s), _rb(rb) { } - void consume_new_partition(const dht::decorated_key& dk) { + void query_result_builder::consume_new_partition(const dht::decorated_key& dk) { _mutation_consumer.emplace(mutation_querier(_schema, _rb.add_partition(_schema, dk.key()), _rb.memory_accounter())); } - void consume(tombstone t) { + void query_result_builder::consume(tombstone t) { _mutation_consumer->consume(t); } - stop_iteration consume(static_row&& sr, tombstone t, bool) { + stop_iteration query_result_builder::consume(static_row&& sr, tombstone t, bool) { _stop = _mutation_consumer->consume(std::move(sr), t); return _stop; } - stop_iteration consume(clustering_row&& cr, row_tombstone t, bool) { + stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone t, bool) { _stop = _mutation_consumer->consume(std::move(cr), t); return _stop; } - stop_iteration consume(range_tombstone&& rt) { + stop_iteration query_result_builder::consume(range_tombstone&& rt) { _stop = _mutation_consumer->consume(std::move(rt)); return _stop; } - stop_iteration consume_end_of_partition() { + stop_iteration query_result_builder::consume_end_of_partition() { auto live_rows_in_partition = _mutation_consumer->consume_end_of_stream(); if (live_rows_in_partition > 0 && !_stop) { _stop = _rb.memory_accounter().check(); @@ -1988,9 +1958,8 @@ public: return _stop; } - void consume_end_of_stream() { + void query_result_builder::consume_end_of_stream() { } -}; future<> data_query( schema_ptr s, diff --git a/query-result-writer.hh b/query-result-writer.hh index a0a223b313..c2cecd8e44 100644 --- a/query-result-writer.hh +++ b/query-result-writer.hh @@ -194,3 +194,49 @@ public: }; } + +class row; +class static_row; +class clustering_row; +class range_tombstone; + +// Adds mutation to query::result. 
+class mutation_querier { + const schema& _schema; + query::result_memory_accounter& _memory_accounter; + query::result::partition_writer _pw; + ser::qr_partition__static_row__cells _static_cells_wr; + bool _live_data_in_static_row{}; + uint64_t _live_clustering_rows = 0; + std::optional> _rows_wr; +private: + void query_static_row(const row& r, tombstone current_tombstone); + void prepare_writers(); +public: + mutation_querier(const schema& s, query::result::partition_writer pw, + query::result_memory_accounter& memory_accounter); + void consume(tombstone) { } + // Requires that sr.has_any_live_data() + stop_iteration consume(static_row&& sr, tombstone current_tombstone); + // Requires that cr.has_any_live_data() + stop_iteration consume(clustering_row&& cr, row_tombstone current_tombstone); + stop_iteration consume(range_tombstone&&) { return stop_iteration::no; } + uint64_t consume_end_of_stream(); +}; + +class query_result_builder { + const schema& _schema; + query::result::builder& _rb; + std::optional _mutation_consumer; + stop_iteration _stop; +public: + query_result_builder(const schema& s, query::result::builder& rb); + + void consume_new_partition(const dht::decorated_key& dk); + void consume(tombstone t); + stop_iteration consume(static_row&& sr, tombstone t, bool); + stop_iteration consume(clustering_row&& cr, row_tombstone t, bool); + stop_iteration consume(range_tombstone&& rt); + stop_iteration consume_end_of_partition(); + void consume_end_of_stream(); +}; From df0f501ba26e7a57aa3b9ce391128d96a152b9b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 22 Feb 2021 15:13:38 +0200 Subject: [PATCH 08/16] mutation_partition.cc: fix indentation Left broken from the previous patch. 
--- mutation_partition.cc | 64 +++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/mutation_partition.cc b/mutation_partition.cc index ade961d906..0028fccc86 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1923,43 +1923,43 @@ uint64_t mutation_querier::consume_end_of_stream() { } } - query_result_builder::query_result_builder(const schema& s, query::result::builder& rb) - : _schema(s), _rb(rb) - { } +query_result_builder::query_result_builder(const schema& s, query::result::builder& rb) + : _schema(s), _rb(rb) +{ } - void query_result_builder::consume_new_partition(const dht::decorated_key& dk) { - _mutation_consumer.emplace(mutation_querier(_schema, _rb.add_partition(_schema, dk.key()), _rb.memory_accounter())); - } +void query_result_builder::consume_new_partition(const dht::decorated_key& dk) { + _mutation_consumer.emplace(mutation_querier(_schema, _rb.add_partition(_schema, dk.key()), _rb.memory_accounter())); +} - void query_result_builder::consume(tombstone t) { - _mutation_consumer->consume(t); - } - stop_iteration query_result_builder::consume(static_row&& sr, tombstone t, bool) { - _stop = _mutation_consumer->consume(std::move(sr), t); - return _stop; - } - stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone t, bool) { - _stop = _mutation_consumer->consume(std::move(cr), t); - return _stop; - } - stop_iteration query_result_builder::consume(range_tombstone&& rt) { - _stop = _mutation_consumer->consume(std::move(rt)); - return _stop; - } +void query_result_builder::consume(tombstone t) { + _mutation_consumer->consume(t); +} +stop_iteration query_result_builder::consume(static_row&& sr, tombstone t, bool) { + _stop = _mutation_consumer->consume(std::move(sr), t); + return _stop; +} +stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone t, bool) { + _stop = _mutation_consumer->consume(std::move(cr), t); + return _stop; +} 
+stop_iteration query_result_builder::consume(range_tombstone&& rt) { + _stop = _mutation_consumer->consume(std::move(rt)); + return _stop; +} - stop_iteration query_result_builder::consume_end_of_partition() { - auto live_rows_in_partition = _mutation_consumer->consume_end_of_stream(); - if (live_rows_in_partition > 0 && !_stop) { - _stop = _rb.memory_accounter().check(); - } - if (_stop) { - _rb.mark_as_short_read(); - } - return _stop; +stop_iteration query_result_builder::consume_end_of_partition() { + auto live_rows_in_partition = _mutation_consumer->consume_end_of_stream(); + if (live_rows_in_partition > 0 && !_stop) { + _stop = _rb.memory_accounter().check(); } + if (_stop) { + _rb.mark_as_short_read(); + } + return _stop; +} - void query_result_builder::consume_end_of_stream() { - } +void query_result_builder::consume_end_of_stream() { +} future<> data_query( schema_ptr s, From 034cb8132328c5ee6cbe6326e16804b182c69df5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 18 Feb 2021 18:28:11 +0200 Subject: [PATCH 09/16] multishard_mutation_query: add query_data_on_all_shards() A data query variant of the existing `query_mutations_on_all_shards()`. This variant builds a `query::result`, instead of `reconcilable_result`. This is actually the result format coordinators want when executing range scans, the reason for using the reconcilable result for these queries is historic, and it just introduces an unnecessary intermediate format. This new method allows the storage proxy to skip this intermediate format and the associated conversion to `query::result`, just like we do for single partition queries. Reverse queries are refused because they are not supported on the client API (CQL) level anyway and hence it is unspecified how they should work and more importantly: they are not tested. 
--- multishard_mutation_query.cc | 44 ++++++++++++++++++++++++++++++++++++ multishard_mutation_query.hh | 13 +++++++++++ 2 files changed, 57 insertions(+) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 909040e0ef..649ef82320 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -24,6 +24,7 @@ #include "multishard_mutation_query.hh" #include "database.hh" #include "db/config.hh" +#include "query-result-writer.hh" #include @@ -738,6 +739,32 @@ public: result_type consume_end_of_stream() { return _builder.consume_end_of_stream(); } }; +class data_query_result_builder { +public: + using result_type = query::result; + static constexpr emit_only_live_rows only_live = emit_only_live_rows::yes; + +private: + std::unique_ptr _res_builder; + query_result_builder _builder; + +public: + data_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_options opts, query::result_memory_accounter&& accounter) + : _res_builder(std::make_unique(slice, opts, std::move(accounter))) + , _builder(s, *_res_builder) { } + + void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); } + void consume(tombstone t) { _builder.consume(t); } + stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } + stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } + stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } + result_type consume_end_of_stream() { + _builder.consume_end_of_stream(); + return _res_builder->build(); + } +}; + } // anonymous namespace future>, cache_temperature>> query_mutations_on_all_shards( @@ -752,3 +779,20 @@ future>, cache_tempera return mutation_query_result_builder(*s, cmd.slice, 
std::move(accounter)); }); } + +future>, cache_temperature>> query_data_on_all_shards( + distributed& db, + schema_ptr s, + const query::read_command& cmd, + const dht::partition_range_vector& ranges, + query::result_options opts, + tracing::trace_state_ptr trace_state, + db::timeout_clock::time_point timeout) { + if (cmd.slice.options.contains(query::partition_slice::option::reversed)) { + throw std::runtime_error("query_data_on_all_shards(): reverse range scans are not supported"); + } + return do_query_on_all_shards(db, s, cmd, ranges, std::move(trace_state), timeout, + [s, &cmd, opts] (query::result_memory_accounter&& accounter) { + return data_query_result_builder(*s, cmd.slice, opts, std::move(accounter)); + }); +} diff --git a/multishard_mutation_query.hh b/multishard_mutation_query.hh index f0baed9c78..7d9a1c0233 100644 --- a/multishard_mutation_query.hh +++ b/multishard_mutation_query.hh @@ -68,3 +68,16 @@ future>, cache_tempera const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout); + +/// Run the data query on all shards. +/// +/// Identical to `query_mutations_on_all_shards()` except that it builds results +/// in the `query::result` format instead of in the `reconcilable_result` one. +future>, cache_temperature>> query_data_on_all_shards( + distributed& db, + schema_ptr s, + const query::read_command& cmd, + const dht::partition_range_vector& ranges, + query::result_options opts, + tracing::trace_state_ptr trace_state, + db::timeout_clock::time_point timeout); From 0f0c3be63e4f1bfbcdde988a7bdd7f5fa54d9c2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 1 Mar 2021 10:35:23 +0200 Subject: [PATCH 10/16] multishard_mutation_query: query_mutations_on_all_shards(): refuse reverse queries Refuse reverse queries just like in the new `query_data_on_all_shards()`. 
The reason is the same: reverse range scans are not supported on the client API level and hence they are underspecified and, more importantly, not tested. --- multishard_mutation_query.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 649ef82320..5059915ccc 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -774,6 +774,9 @@ future>, cache_tempera const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { + if (cmd.slice.options.contains(query::partition_slice::option::reversed)) { + throw std::runtime_error("query_mutations_on_all_shards(): reverse range scans are not supported"); + } return do_query_on_all_shards(db, s, cmd, ranges, std::move(trace_state), timeout, [s, &cmd] (query::result_memory_accounter&& accounter) { return mutation_query_result_builder(*s, cmd.slice, std::move(accounter)); From 5c84aa52db4b1537b65dbb74e63d62674917d6c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 22 Feb 2021 17:33:41 +0200 Subject: [PATCH 11/16] gms: add RANGE_SCAN_DATA_VARIANT cluster feature To control the transition to the data variant of range scans. As there is a difference in how the data and mutation variants calculate page sizes, the transition to the former has to happen in a controlled manner, when all nodes in the cluster support it, to avoid artificial differences in page content and subsequently triggering false-positive read repair.
--- gms/feature.hh | 1 + gms/feature_service.cc | 4 ++++ gms/feature_service.hh | 7 +++++++ 3 files changed, 12 insertions(+) diff --git a/gms/feature.hh b/gms/feature.hh index c2ae7a5c19..e32da7eb0b 100644 --- a/gms/feature.hh +++ b/gms/feature.hh @@ -145,6 +145,7 @@ extern const std::string_view PER_TABLE_CACHING; extern const std::string_view DIGEST_FOR_NULL_VALUES; extern const std::string_view CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX; extern const std::string_view ALTERNATOR_STREAMS; +extern const std::string_view RANGE_SCAN_DATA_VARIANT; } diff --git a/gms/feature_service.cc b/gms/feature_service.cc index b3c1e5c568..724734b164 100644 --- a/gms/feature_service.cc +++ b/gms/feature_service.cc @@ -64,6 +64,7 @@ constexpr std::string_view features::PER_TABLE_CACHING = "PER_TABLE_CACHING"; constexpr std::string_view features::DIGEST_FOR_NULL_VALUES = "DIGEST_FOR_NULL_VALUES"; constexpr std::string_view features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX = "CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX"; constexpr std::string_view features::ALTERNATOR_STREAMS = "ALTERNATOR_STREAMS"; +constexpr std::string_view features::RANGE_SCAN_DATA_VARIANT = "RANGE_SCAN_DATA_VARIANT"; static logging::logger logger("features"); @@ -90,6 +91,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg) , _digest_for_null_values_feature(*this, features::DIGEST_FOR_NULL_VALUES) , _correct_idx_token_in_secondary_index_feature(*this, features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX) , _alternator_streams_feature(*this, features::ALTERNATOR_STREAMS) + , _range_scan_data_variant(*this, features::RANGE_SCAN_DATA_VARIANT) {} feature_config feature_config_from_db_config(db::config& cfg, std::set disabled) { @@ -193,6 +195,7 @@ std::set feature_service::known_feature_set() { gms::features::DIGEST_FOR_NULL_VALUES, gms::features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX, gms::features::ALTERNATOR_STREAMS, + gms::features::RANGE_SCAN_DATA_VARIANT, }; for (const sstring& s : _config._disabled_features) { 
@@ -274,6 +277,7 @@ void feature_service::enable(const std::set& list) { std::ref(_digest_for_null_values_feature), std::ref(_correct_idx_token_in_secondary_index_feature), std::ref(_alternator_streams_feature), + std::ref(_range_scan_data_variant), }) { if (list.contains(f.name())) { diff --git a/gms/feature_service.hh b/gms/feature_service.hh index 3644220f30..6beb925f4c 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -94,6 +94,7 @@ private: gms::feature _digest_for_null_values_feature; gms::feature _correct_idx_token_in_secondary_index_feature; gms::feature _alternator_streams_feature; + gms::feature _range_scan_data_variant; public: bool cluster_supports_user_defined_functions() const { @@ -170,6 +171,12 @@ public: bool cluster_supports_alternator_streams() const { return bool(_alternator_streams_feature); } + + // Range scans have a data variant, which produces query::result directly, + // instead of through the intermediate reconcilable_result format. + bool cluster_supports_range_scan_data_variant() const { + return bool(_range_scan_data_variant); + } }; } // namespace gms From f15551d23a6539057aaec58d32361bf4ff2a8c28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 24 Feb 2021 18:11:12 +0200 Subject: [PATCH 12/16] query: partition_slice: add range_scan_data_variant option Switching to the data variant of range scans has to be coordinated by the coordinator to avoid replicas noticing the availability of the respective feature at different times, resulting in some using the mutation variant and some using the data variant. So the plan is that it will be the coordinator's job to check the cluster feature and set the option in the partition slice which will tell the replicas to use the data variant for the query.
--- query-request.hh | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/query-request.hh b/query-request.hh index feb818ca3c..31d404a81a 100644 --- a/query-request.hh +++ b/query-request.hh @@ -142,6 +142,10 @@ public: // key restrictions and the partition doesn't have any rows matching // the restrictions, see #589. This flag overrides this behavior. always_return_static_content, + // Use the new data range scan variant, which builds query::result + // directly, bypassing the intermediate reconcilable_result format used + // in pre 4.5 range scans. + range_scan_data_variant, }; using option_set = enum_set>; + option::always_return_static_content, + option::range_scan_data_variant>>; clustering_row_ranges _row_ranges; public: column_id_vector static_columns; // TODO: consider using bitmap From f8ce168c8ed02e8b0a6631a083f263bc24264ae2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 19 Feb 2021 09:13:42 +0200 Subject: [PATCH 13/16] storage_proxy: use query_data_on_all_shards() for data range scan queries Currently range scans build their result using the `reconcilable_result` format and then convert it to `query::result`. This is inefficient for multiple reasons: 1) it introduces an additional intermediate result format and a subsequent conversion to the final one; 2) the reconcilable result format was designed for reconciliation so it contains all data, including columns unselected by the query, dead rows and tombstones, which takes much more memory to build. There is no reason to go through all this trouble; if there ever was one in the past, it doesn't stand anymore. So switch to the newly introduced `query_data_on_all_shards()` when doing normal data range scans, but only if all the nodes in the cluster support it, to avoid artificial differences in page sizes due to how reconcilable result and query::result calculate result size and the consequent false-positive read repair.
The transition to this new more efficient method is coordinated by a cluster feature and whether to use it is decided by the coordinator (instead of each replica individually). This is to avoid needless reconciliation due to the different page sizes the two formats will produce. --- service/storage_proxy.cc | 26 +++++++++++++++++++++++--- service/storage_proxy.hh | 3 +++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 8be26bb72d..add6883d1b 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -3894,11 +3894,11 @@ storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr>, cache_temperature>&& r_ht) { + return query_nonsingular_data_locally(s, cmd, {pr}, opts, trace_state, timeout).then( + [trace_state = std::move(trace_state)] (rpc::tuple>, cache_temperature>&& r_ht) { auto&& [r, ht] = r_ht; tracing::trace(trace_state, "Querying is done"); - return make_ready_future>, cache_temperature>>( - rpc::tuple(::make_foreign(::make_lw_shared(to_data_query_result(*r, s, cmd->slice, cmd->get_row_limit(), cmd->partition_limit, opts))), ht)); + return make_ready_future>, cache_temperature>>(rpc::tuple(std::move(r), ht)); }); } } @@ -4014,6 +4014,10 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t std::unordered_map> ranges_per_exec; const auto tmptr = get_token_metadata_ptr(); + if (_features.cluster_supports_range_scan_data_variant()) { + cmd->slice.options.set(); + } + const auto preferred_replicas_for_range = [this, &preferred_replicas, tmptr] (const dht::partition_range& r) { auto it = preferred_replicas.find(r.transform(std::mem_fn(&dht::ring_position::token))); return it == preferred_replicas.end() ? 
std::vector{} : replica_ids_to_endpoints(*tmptr, it->second); @@ -5189,6 +5193,22 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, }); } +future>, cache_temperature>> +storage_proxy::query_nonsingular_data_locally(schema_ptr s, lw_shared_ptr cmd, const dht::partition_range_vector&& prs, + query::result_options opts, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout) { + auto ranges = std::move(prs); + auto local_cmd = cmd; + rpc::tuple>, cache_temperature> ret; + if (local_cmd->slice.options.contains(query::partition_slice::option::range_scan_data_variant)) { + ret = co_await query_data_on_all_shards(_db, std::move(s), *local_cmd, ranges, opts, std::move(trace_state), timeout); + } else { + auto res = co_await query_mutations_on_all_shards(_db, s, *local_cmd, ranges, std::move(trace_state), timeout); + ret = rpc::tuple(make_foreign(make_lw_shared(to_data_query_result(std::move(*std::get<0>(res)), std::move(s), local_cmd->slice, + local_cmd->get_row_limit(), local_cmd->partition_limit, opts))), std::get<1>(res)); + } + co_return ret; +} + future<> storage_proxy::start_hints_manager(shared_ptr gossiper_ptr, shared_ptr ss_ptr) { future<> f = make_ready_future<>(); if (!_hints_manager.is_disabled_for_all()) { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 98859d0d31..06628d0d3d 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -413,6 +413,9 @@ private: future>, cache_temperature>> query_nonsingular_mutations_locally( schema_ptr s, lw_shared_ptr cmd, const dht::partition_range_vector&& pr, tracing::trace_state_ptr trace_state, clock_type::time_point timeout); + future>, cache_temperature>> query_nonsingular_data_locally( + schema_ptr s, lw_shared_ptr cmd, const dht::partition_range_vector&& pr, query::result_options opts, + tracing::trace_state_ptr trace_state, clock_type::time_point timeout); future<> mutate_counters_on_leader(std::vector mutations, db::consistency_level 
cl, clock_type::time_point timeout, tracing::trace_state_ptr trace_state, service_permit permit); From fe280271a68cf121979fb0f7a8772954b21c755e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 1 Mar 2021 14:57:29 +0200 Subject: [PATCH 14/16] cql_query_test: test_query_limit: clean up scheduling groups Destroy scheduling groups created for this test, so other tests can create scheduling groups with the same name, without conflicts. --- test/boost/cql_query_test.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index 6c3dda060d..1383aa8aeb 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -4609,6 +4609,11 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) { cfg.dbcfg->statement_scheduling_group = seastar::create_scheduling_group("statement", 1000).get0(); cfg.dbcfg->streaming_scheduling_group = seastar::create_scheduling_group("streaming", 200).get0(); + auto clean_up_sched_groups = defer([dbcfg = *cfg.dbcfg] { + seastar::destroy_scheduling_group(dbcfg.statement_scheduling_group).get0(); + seastar::destroy_scheduling_group(dbcfg.streaming_scheduling_group).get0(); + }); + do_with_cql_env_thread([] (cql_test_env& e) { e.execute_cql("CREATE TABLE test (pk int, ck int, v text, PRIMARY KEY (pk, ck));").get(); auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get0(); From af0a23e75cdeccd2d3567bb06d07fc6b0b11122c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 1 Mar 2021 13:27:46 +0200 Subject: [PATCH 15/16] test/cql_test_env: do_with_cql_test_env(): add thread_attributes parameter To allow conveniently setting the scheduling group `func` is to be run in. 
--- test/lib/cql_test_env.cc | 6 +++--- test/lib/cql_test_env.hh | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 5953959318..3dade7f33b 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -685,9 +685,9 @@ future<> do_with_cql_env(std::function(cql_test_env&)> func, cql_test_c return single_node_cql_env::do_with(func, std::move(cfg_in)); } -future<> do_with_cql_env_thread(std::function func, cql_test_config cfg_in) { - return single_node_cql_env::do_with([func = std::move(func)] (auto& e) { - return seastar::async([func = std::move(func), &e] { +future<> do_with_cql_env_thread(std::function func, cql_test_config cfg_in, thread_attributes thread_attr) { + return single_node_cql_env::do_with([func = std::move(func), thread_attr] (auto& e) { + return seastar::async(thread_attr, [func = std::move(func), &e] { return func(e); }); }, std::move(cfg_in)); diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh index e3d22fe6c7..f114c27755 100644 --- a/test/lib/cql_test_env.hh +++ b/test/lib/cql_test_env.hh @@ -135,4 +135,4 @@ public: }; future<> do_with_cql_env(std::function(cql_test_env&)> func, cql_test_config = {}); -future<> do_with_cql_env_thread(std::function func, cql_test_config = {}); +future<> do_with_cql_env_thread(std::function func, cql_test_config = {}, thread_attributes thread_attr = {}); From 257c295cff4daba32309743ec500b0562ef9525a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 1 Mar 2021 10:34:22 +0200 Subject: [PATCH 16/16] cql_query_test: add unit test for the more efficient range scan result format The most user-visible aspect of this change is range scans which select a small subset of the columns. These queries work as the user expects them to work: unselected columns are not included in determining the size of the result (or that of the page). This is the aspect this test is checking for. 
While at it, also test single partition queries too. --- test/boost/cql_query_test.cc | 43 ++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index 1383aa8aeb..9b12b52f1f 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -4838,3 +4838,46 @@ SEASTAR_THREAD_TEST_CASE(test_twcs_optimal_query_path) { .is_rows().with_size(1); }).get(); } + +SEASTAR_THREAD_TEST_CASE(test_query_unselected_columns) { + cql_test_config cfg; + + cfg.db_config->max_memory_for_unlimited_query_soft_limit(1024 * 1024, utils::config_file::config_source::CommandLine); + cfg.db_config->max_memory_for_unlimited_query_hard_limit(1024 * 1024, utils::config_file::config_source::CommandLine); + + cfg.dbcfg.emplace(); + cfg.dbcfg->available_memory = memory::stats().total_memory(); + cfg.dbcfg->statement_scheduling_group = seastar::create_scheduling_group("statement", 1000).get0(); + + auto statement_sched_group = cfg.dbcfg->statement_scheduling_group; + + auto clean_up_sched_groups = defer([statement_sched_group] { + seastar::destroy_scheduling_group(statement_sched_group).get0(); + }); + + do_with_cql_env_thread([] (cql_test_env& e) { + auto now_nano = std::chrono::duration_cast(db_clock::now().time_since_epoch()).count(); + e.execute_cql("CREATE TABLE tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))").get(); + + const unsigned num_rows = 20; + const sstring val(100 * 1024, 'a'); + const auto id = e.prepare(format("INSERT INTO tbl (pk, ck, v) VALUES (0, ?, '{}')", val)).get0(); + for (int ck = 0; ck < num_rows; ++ck) { + e.execute_prepared(id, {cql3::raw_value::make_value(int32_type->decompose(ck))}).get(); + } + + { + testlog.info("Single partition scan"); + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, + cql3::query_options::specific_options{-1, nullptr, {}, api::new_timestamp()}); + assert_that(e.execute_cql("SELECT pk, ck FROM tbl WHERE pk = 0", 
std::move(qo)).get0()).is_rows().with_size(num_rows); + } + + { + testlog.info("Full scan"); + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, + cql3::query_options::specific_options{-1, nullptr, {}, api::new_timestamp()}); + assert_that(e.execute_cql("SELECT pk, ck FROM tbl", std::move(qo)).get0()).is_rows().with_size(num_rows); + } + }, std::move(cfg), thread_attributes{.sched_group = statement_sched_group}).get(); +}