Merge "Build query::result directly in range scan queries" from Botond

"
Currently range scans build their results on the replica in the
`reconcilable_result` format, that -- as its name suggests -- is
normally used for reconciliation (read repair). As such this result
format is quite inefficient for normal queries: it contains all columns
and all tombstones in the requested range. These are all unnecessary for
normal queries which only want live data and only those columns that are
requested by the user.
Furthermore, as the coordinator works in terms of `query::result` for
normal queries anyway, this intermediate result has to be converted to
the final `query::result` format adding an unnecessary intermediate
conversion step.
This series gets rid of this problem by introducing
`query_data_on_all_shards()`, a variant of
`query_mutations_on_all_shards()` that builds `query::result` directly.
Reverse queries still use the old intermediate method behind the scenes.

Fixes #8061
Refs #7434

Tests: unit(release, debug)
"

* 'range-scan-data-variant/v5-rebased' of https://github.com/denesb/scylla:
  cql_query_test: add unit test for the more efficient range scan result format
  test/cql_test_env: do_with_cql_test_env(): add thread_attributes parameter
  cql_query_test: test_query_limit: clean up scheduling groups
  storage_proxy: use query_data_on_all_shards() for data range scan queries
  query: partition_slice: add range_scan_data_variant option
  gms: add RANGE_SCAN_DATA_VARIANT cluster feature
  multishard_mutation_query: query_mutations_on_all_shards(): refuse reverse queries
  multishard_mutation_query: add query_data_on_all_shards()
  mutation_partition.cc: fix indentation
  query_result_builder: make it a public type
  multishard_mutation_query: generalize query code w.r.t. the result builder used
  multishard_mutation_query: query_mutations_on_all_shards(): extract logic into new method
  multishard_mutation_query: query_mutations_on_all_shards(): convert to coroutine
  multishar_mutation_query: do_query_mutations(): convert to coroutine
  multishard_mutation_query: read_page(): convert to coroutine
  multishard_mutation_query: extract page reading logic into separate method
This commit is contained in:
Avi Kivity
2021-03-02 08:54:41 +02:00
13 changed files with 363 additions and 147 deletions

View File

@@ -145,6 +145,7 @@ extern const std::string_view PER_TABLE_CACHING;
extern const std::string_view DIGEST_FOR_NULL_VALUES;
extern const std::string_view CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX;
extern const std::string_view ALTERNATOR_STREAMS;
extern const std::string_view RANGE_SCAN_DATA_VARIANT;
}

View File

@@ -64,6 +64,7 @@ constexpr std::string_view features::PER_TABLE_CACHING = "PER_TABLE_CACHING";
constexpr std::string_view features::DIGEST_FOR_NULL_VALUES = "DIGEST_FOR_NULL_VALUES";
constexpr std::string_view features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX = "CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX";
constexpr std::string_view features::ALTERNATOR_STREAMS = "ALTERNATOR_STREAMS";
constexpr std::string_view features::RANGE_SCAN_DATA_VARIANT = "RANGE_SCAN_DATA_VARIANT";
static logging::logger logger("features");
@@ -90,6 +91,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg)
, _digest_for_null_values_feature(*this, features::DIGEST_FOR_NULL_VALUES)
, _correct_idx_token_in_secondary_index_feature(*this, features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX)
, _alternator_streams_feature(*this, features::ALTERNATOR_STREAMS)
, _range_scan_data_variant(*this, features::RANGE_SCAN_DATA_VARIANT)
{}
feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring> disabled) {
@@ -193,6 +195,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
gms::features::DIGEST_FOR_NULL_VALUES,
gms::features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX,
gms::features::ALTERNATOR_STREAMS,
gms::features::RANGE_SCAN_DATA_VARIANT,
};
for (const sstring& s : _config._disabled_features) {
@@ -274,6 +277,7 @@ void feature_service::enable(const std::set<std::string_view>& list) {
std::ref(_digest_for_null_values_feature),
std::ref(_correct_idx_token_in_secondary_index_feature),
std::ref(_alternator_streams_feature),
std::ref(_range_scan_data_variant),
})
{
if (list.contains(f.name())) {

View File

@@ -94,6 +94,7 @@ private:
gms::feature _digest_for_null_values_feature;
gms::feature _correct_idx_token_in_secondary_index_feature;
gms::feature _alternator_streams_feature;
gms::feature _range_scan_data_variant;
public:
bool cluster_supports_user_defined_functions() const {
@@ -170,6 +171,12 @@ public:
bool cluster_supports_alternator_streams() const {
return bool(_alternator_streams_feature);
}
// Range scans have a data variant, which produces query::result directly,
// instead of through the intermediate reconcilable_result format.
bool cluster_supports_range_scan_data_variant() const {
return bool(_range_scan_data_variant);
}
};
} // namespace gms

View File

@@ -24,6 +24,9 @@
#include "multishard_mutation_query.hh"
#include "database.hh"
#include "db/config.hh"
#include "query-result-writer.hh"
#include <seastar/core/coroutine.hh>
#include <boost/range/adaptor/reversed.hpp>
@@ -60,7 +63,7 @@ using foreign_unique_ptr = foreign_ptr<std::unique_ptr<T>>;
///
/// Note:
/// 1) Each step can only be started when the previous phase has finished.
/// 2) This usage is implemented in the `do_query_mutations()` function below.
/// 2) This usage is implemented in the `do_query()` function below.
/// 3) Both, `read_context::lookup_readers()` and `read_context::save_readers()`
/// knows to do nothing when the query is not stateful and just short
/// circuit.
@@ -589,18 +592,23 @@ future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsu
namespace {
using consume_result = std::tuple<std::optional<clustering_key_prefix>, reconcilable_result>;
template <typename ResultType>
using consume_result = std::tuple<std::optional<clustering_key_prefix>, ResultType>;
template <typename ResultType>
using compact_for_result_state = compact_for_query_state<ResultType::only_live>;
template <typename ResultBuilder>
struct page_consume_result {
std::optional<clustering_key_prefix> last_ckey;
reconcilable_result result;
typename ResultBuilder::result_type result;
flat_mutation_reader::tracked_buffer unconsumed_fragments;
lw_shared_ptr<compact_for_mutation_query_state> compaction_state;
lw_shared_ptr<compact_for_result_state<ResultBuilder>> compaction_state;
page_consume_result(consume_result&& result, flat_mutation_reader::tracked_buffer&& unconsumed_fragments,
lw_shared_ptr<compact_for_mutation_query_state>&& compaction_state)
page_consume_result(consume_result<typename ResultBuilder::result_type>&& result, flat_mutation_reader::tracked_buffer&& unconsumed_fragments,
lw_shared_ptr<compact_for_result_state<ResultBuilder>>&& compaction_state)
: last_ckey(std::get<std::optional<clustering_key_prefix>>(std::move(result)))
, result(std::get<reconcilable_result>(std::move(result)))
, result(std::get<typename ResultBuilder::result_type>(std::move(result)))
, unconsumed_fragments(std::move(unconsumed_fragments))
, compaction_state(std::move(compaction_state)) {
}
@@ -608,62 +616,157 @@ struct page_consume_result {
} // anonymous namespace
static future<reconcilable_result> do_query_mutations(
template <typename ResultBuilder>
future<page_consume_result<ResultBuilder>> read_page(
shared_ptr<read_context> ctx,
schema_ptr s,
const query::read_command& cmd,
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout,
ResultBuilder&& result_builder) {
auto ms = mutation_source([&] (schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
return make_multishard_combining_reader(ctx, std::move(s), std::move(permit), pr, ps, pc, std::move(trace_state), fwd_mr);
});
auto reader = make_flat_multi_range_reader(s, ctx->permit(), std::move(ms), ranges,
cmd.slice, service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no);
auto compaction_state = make_lw_shared<compact_for_result_state<ResultBuilder>>(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(),
cmd.partition_limit);
auto result = co_await query::consume_page(reader, compaction_state, cmd.slice, std::move(result_builder), cmd.get_row_limit(),
cmd.partition_limit, cmd.timestamp, timeout, *cmd.max_result_size);
co_return page_consume_result<ResultBuilder>(std::move(result), reader.detach_buffer(), std::move(compaction_state));
}
template <typename ResultBuilder>
future<typename ResultBuilder::result_type> do_query(
distributed<database>& db,
schema_ptr s,
const query::read_command& cmd,
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout,
query::result_memory_accounter&& accounter) {
return do_with(seastar::make_shared<read_context>(db, s, cmd, ranges, trace_state), [&db, s, &cmd, &ranges, trace_state, timeout,
accounter = std::move(accounter)] (shared_ptr<read_context>& ctx) mutable {
return ctx->lookup_readers().then([&ctx, s = std::move(s), &cmd, &ranges, trace_state, timeout,
accounter = std::move(accounter)] () mutable {
auto ms = mutation_source([&] (schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
return make_multishard_combining_reader(ctx, std::move(s), std::move(permit), pr, ps, pc, std::move(trace_state), fwd_mr);
});
auto reader = make_flat_multi_range_reader(s, ctx->permit(), std::move(ms), ranges,
cmd.slice, service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no);
ResultBuilder&& result_builder) {
auto ctx = seastar::make_shared<read_context>(db, s, cmd, ranges, trace_state);
auto compaction_state = make_lw_shared<compact_for_mutation_query_state>(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(),
cmd.partition_limit);
co_await ctx->lookup_readers();
return do_with(std::move(reader), std::move(compaction_state), [&, accounter = std::move(accounter), timeout] (
flat_mutation_reader& reader, lw_shared_ptr<compact_for_mutation_query_state>& compaction_state) mutable {
auto rrb = reconcilable_result_builder(*reader.schema(), cmd.slice, std::move(accounter));
return query::consume_page(reader, compaction_state, cmd.slice, std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp,
timeout, *cmd.max_result_size).then([&] (consume_result&& result) mutable {
return make_ready_future<page_consume_result>(page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state)));
});
}).then_wrapped([&ctx] (future<page_consume_result>&& result_fut) {
if (result_fut.failed()) {
return make_exception_future<reconcilable_result>(std::move(result_fut.get_exception()));
}
std::exception_ptr ex;
auto [last_ckey, result, unconsumed_buffer, compaction_state] = result_fut.get0();
if (!compaction_state->are_limits_reached() && !result.is_short_read()) {
return make_ready_future<reconcilable_result>(std::move(result));
}
try {
auto [last_ckey, result, unconsumed_buffer, compaction_state] = co_await read_page<ResultBuilder>(ctx, s, cmd, ranges, trace_state, timeout,
std::move(result_builder));
return ctx->save_readers(std::move(unconsumed_buffer), std::move(*compaction_state).detach_state(),
std::move(last_ckey)).then_wrapped([result = std::move(result)] (future<>&&) mutable {
return make_ready_future<reconcilable_result>(std::move(result));
});
}).finally([&ctx] {
return ctx->stop();
});
});
});
if (compaction_state->are_limits_reached() || result.is_short_read()) {
co_await ctx->save_readers(std::move(unconsumed_buffer), std::move(*compaction_state).detach_state(), std::move(last_ckey));
}
co_await ctx->stop();
co_return std::move(result);
} catch (...) {
ex = std::current_exception();
}
co_await ctx->stop();
std::rethrow_exception(std::move(ex));
}
template <typename ResultBuilder>
static future<std::tuple<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>, cache_temperature>> do_query_on_all_shards(
distributed<database>& db,
schema_ptr s,
const query::read_command& cmd,
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout,
std::function<ResultBuilder(query::result_memory_accounter&&)> result_builder_factory) {
if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
co_return std::tuple(
make_foreign(make_lw_shared<typename ResultBuilder::result_type>()),
db.local().find_column_family(s).get_global_cache_hit_rate());
}
auto& local_db = db.local();
auto& stats = local_db.get_stats();
const auto short_read_allowed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
try {
auto accounter = co_await local_db.get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allowed);
auto result_builder = result_builder_factory(std::move(accounter));
auto result = co_await do_query<ResultBuilder>(db, s, cmd, ranges, std::move(trace_state), timeout, std::move(result_builder));
++stats.total_reads;
stats.short_mutation_queries += bool(result.is_short_read());
auto hit_rate = local_db.find_column_family(s).get_global_cache_hit_rate();
co_return std::tuple(make_foreign(make_lw_shared<typename ResultBuilder::result_type>(std::move(result))), hit_rate);
} catch (...) {
++stats.total_reads_failed;
throw;
}
}
namespace {
class mutation_query_result_builder {
public:
using result_type = reconcilable_result;
static constexpr emit_only_live_rows only_live = emit_only_live_rows::no;
private:
reconcilable_result_builder _builder;
public:
mutation_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_memory_accounter&& accounter)
: _builder(s, slice, std::move(accounter)) { }
void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); }
void consume(tombstone t) { _builder.consume(t); }
stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); }
stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); }
stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); }
stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); }
result_type consume_end_of_stream() { return _builder.consume_end_of_stream(); }
};
class data_query_result_builder {
public:
using result_type = query::result;
static constexpr emit_only_live_rows only_live = emit_only_live_rows::yes;
private:
std::unique_ptr<query::result::builder> _res_builder;
query_result_builder _builder;
public:
data_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_options opts, query::result_memory_accounter&& accounter)
: _res_builder(std::make_unique<query::result::builder>(slice, opts, std::move(accounter)))
, _builder(s, *_res_builder) { }
void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); }
void consume(tombstone t) { _builder.consume(t); }
stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); }
stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); }
stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); }
stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); }
result_type consume_end_of_stream() {
_builder.consume_end_of_stream();
return _res_builder->build();
}
};
} // anonymous namespace
future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> query_mutations_on_all_shards(
distributed<database>& db,
schema_ptr s,
@@ -671,31 +774,28 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_tempera
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout) {
if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
return make_ready_future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>>(
std::tuple(
make_foreign(make_lw_shared<reconcilable_result>()),
db.local().find_column_family(s).get_global_cache_hit_rate()));
if (cmd.slice.options.contains(query::partition_slice::option::reversed)) {
throw std::runtime_error("query_mutations_on_all_shards(): reverse range scans are not supported");
}
const auto short_read_allowed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
return db.local().get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allowed).then([&, s = std::move(s),
trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) mutable {
return do_query_mutations(db, s, cmd, ranges, std::move(trace_state), timeout, std::move(accounter)).then_wrapped(
[&db, s = std::move(s)] (future<reconcilable_result>&& f) {
auto& local_db = db.local();
auto& stats = local_db.get_stats();
if (f.failed()) {
++stats.total_reads_failed;
return make_exception_future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>>(f.get_exception());
} else {
++stats.total_reads;
auto result = f.get0();
stats.short_mutation_queries += bool(result.is_short_read());
auto hit_rate = local_db.find_column_family(s).get_global_cache_hit_rate();
return make_ready_future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>>(
std::tuple(make_foreign(make_lw_shared<reconcilable_result>(std::move(result))), hit_rate));
}
});
return do_query_on_all_shards<mutation_query_result_builder>(db, s, cmd, ranges, std::move(trace_state), timeout,
[s, &cmd] (query::result_memory_accounter&& accounter) {
return mutation_query_result_builder(*s, cmd.slice, std::move(accounter));
});
}
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_data_on_all_shards(
distributed<database>& db,
schema_ptr s,
const query::read_command& cmd,
const dht::partition_range_vector& ranges,
query::result_options opts,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout) {
if (cmd.slice.options.contains(query::partition_slice::option::reversed)) {
throw std::runtime_error("query_data_on_all_shards(): reverse range scans are not supported");
}
return do_query_on_all_shards<data_query_result_builder>(db, s, cmd, ranges, std::move(trace_state), timeout,
[s, &cmd, opts] (query::result_memory_accounter&& accounter) {
return data_query_result_builder(*s, cmd.slice, opts, std::move(accounter));
});
}

View File

@@ -68,3 +68,16 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_tempera
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout);
/// Run the data query on all shards.
///
/// Identical to `query_mutations_on_all_shards()` except that it builds results
/// in the `query::result` format instead of in the `reconcilable_result` one.
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_data_on_all_shards(
distributed<database>& db,
schema_ptr s,
const query::read_command& cmd,
const dht::partition_range_vector& ranges,
query::result_options opts,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout);

View File

@@ -1806,30 +1806,6 @@ mutation_partition::upgrade(const schema& old_schema, const schema& new_schema)
*this = std::move(tmp);
}
// Adds mutation to query::result.
class mutation_querier {
const schema& _schema;
query::result_memory_accounter& _memory_accounter;
query::result::partition_writer _pw;
ser::qr_partition__static_row__cells<bytes_ostream> _static_cells_wr;
bool _live_data_in_static_row{};
uint64_t _live_clustering_rows = 0;
std::optional<ser::qr_partition__rows<bytes_ostream>> _rows_wr;
private:
void query_static_row(const row& r, tombstone current_tombstone);
void prepare_writers();
public:
mutation_querier(const schema& s, query::result::partition_writer pw,
query::result_memory_accounter& memory_accounter);
void consume(tombstone) { }
// Requires that sr.has_any_live_data()
stop_iteration consume(static_row&& sr, tombstone current_tombstone);
// Requires that cr.has_any_live_data()
stop_iteration consume(clustering_row&& cr, row_tombstone current_tombstone);
stop_iteration consume(range_tombstone&&) { return stop_iteration::no; }
uint64_t consume_end_of_stream();
};
mutation_querier::mutation_querier(const schema& s, query::result::partition_writer pw,
query::result_memory_accounter& memory_accounter)
: _schema(s)
@@ -1947,50 +1923,43 @@ uint64_t mutation_querier::consume_end_of_stream() {
}
}
class query_result_builder {
const schema& _schema;
query::result::builder& _rb;
std::optional<mutation_querier> _mutation_consumer;
stop_iteration _stop;
public:
query_result_builder(const schema& s, query::result::builder& rb)
: _schema(s), _rb(rb)
{ }
query_result_builder::query_result_builder(const schema& s, query::result::builder& rb)
: _schema(s), _rb(rb)
{ }
void consume_new_partition(const dht::decorated_key& dk) {
_mutation_consumer.emplace(mutation_querier(_schema, _rb.add_partition(_schema, dk.key()), _rb.memory_accounter()));
}
void query_result_builder::consume_new_partition(const dht::decorated_key& dk) {
_mutation_consumer.emplace(mutation_querier(_schema, _rb.add_partition(_schema, dk.key()), _rb.memory_accounter()));
}
void consume(tombstone t) {
_mutation_consumer->consume(t);
}
stop_iteration consume(static_row&& sr, tombstone t, bool) {
_stop = _mutation_consumer->consume(std::move(sr), t);
return _stop;
}
stop_iteration consume(clustering_row&& cr, row_tombstone t, bool) {
_stop = _mutation_consumer->consume(std::move(cr), t);
return _stop;
}
stop_iteration consume(range_tombstone&& rt) {
_stop = _mutation_consumer->consume(std::move(rt));
return _stop;
}
void query_result_builder::consume(tombstone t) {
_mutation_consumer->consume(t);
}
stop_iteration query_result_builder::consume(static_row&& sr, tombstone t, bool) {
_stop = _mutation_consumer->consume(std::move(sr), t);
return _stop;
}
stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone t, bool) {
_stop = _mutation_consumer->consume(std::move(cr), t);
return _stop;
}
stop_iteration query_result_builder::consume(range_tombstone&& rt) {
_stop = _mutation_consumer->consume(std::move(rt));
return _stop;
}
stop_iteration consume_end_of_partition() {
auto live_rows_in_partition = _mutation_consumer->consume_end_of_stream();
if (live_rows_in_partition > 0 && !_stop) {
_stop = _rb.memory_accounter().check();
}
if (_stop) {
_rb.mark_as_short_read();
}
return _stop;
stop_iteration query_result_builder::consume_end_of_partition() {
auto live_rows_in_partition = _mutation_consumer->consume_end_of_stream();
if (live_rows_in_partition > 0 && !_stop) {
_stop = _rb.memory_accounter().check();
}
if (_stop) {
_rb.mark_as_short_read();
}
return _stop;
}
void consume_end_of_stream() {
}
};
void query_result_builder::consume_end_of_stream() {
}
future<> data_query(
schema_ptr s,

View File

@@ -142,6 +142,10 @@ public:
// key restrictions and the partition doesn't have any rows matching
// the restrictions, see #589. This flag overrides this behavior.
always_return_static_content,
// Use the new data range scan variant, which builds query::result
// directly, bypassing the intermediate reconcilable_result format used
// in pre 4.5 range scans.
range_scan_data_variant,
};
using option_set = enum_set<super_enum<option,
option::send_clustering_key,
@@ -155,7 +159,8 @@ public:
option::allow_short_read,
option::with_digest,
option::bypass_cache,
option::always_return_static_content>>;
option::always_return_static_content,
option::range_scan_data_variant>>;
clustering_row_ranges _row_ranges;
public:
column_id_vector static_columns; // TODO: consider using bitmap

View File

@@ -194,3 +194,49 @@ public:
};
}
class row;
class static_row;
class clustering_row;
class range_tombstone;
// Adds mutation to query::result.
class mutation_querier {
const schema& _schema;
query::result_memory_accounter& _memory_accounter;
query::result::partition_writer _pw;
ser::qr_partition__static_row__cells<bytes_ostream> _static_cells_wr;
bool _live_data_in_static_row{};
uint64_t _live_clustering_rows = 0;
std::optional<ser::qr_partition__rows<bytes_ostream>> _rows_wr;
private:
void query_static_row(const row& r, tombstone current_tombstone);
void prepare_writers();
public:
mutation_querier(const schema& s, query::result::partition_writer pw,
query::result_memory_accounter& memory_accounter);
void consume(tombstone) { }
// Requires that sr.has_any_live_data()
stop_iteration consume(static_row&& sr, tombstone current_tombstone);
// Requires that cr.has_any_live_data()
stop_iteration consume(clustering_row&& cr, row_tombstone current_tombstone);
stop_iteration consume(range_tombstone&&) { return stop_iteration::no; }
uint64_t consume_end_of_stream();
};
class query_result_builder {
const schema& _schema;
query::result::builder& _rb;
std::optional<mutation_querier> _mutation_consumer;
stop_iteration _stop;
public:
query_result_builder(const schema& s, query::result::builder& rb);
void consume_new_partition(const dht::decorated_key& dk);
void consume(tombstone t);
stop_iteration consume(static_row&& sr, tombstone t, bool);
stop_iteration consume(clustering_row&& cr, row_tombstone t, bool);
stop_iteration consume(range_tombstone&& rt);
stop_iteration consume_end_of_partition();
void consume_end_of_stream();
};

View File

@@ -3894,11 +3894,11 @@ storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_comman
} else {
// FIXME: adjust multishard_mutation_query to accept an smp_service_group and propagate it there
tracing::trace(trace_state, "Start querying token range {}", pr);
return query_nonsingular_mutations_locally(s, cmd, {pr}, trace_state, timeout).then([s, cmd, opts, trace_state = std::move(trace_state)] (rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>&& r_ht) {
return query_nonsingular_data_locally(s, cmd, {pr}, opts, trace_state, timeout).then(
[trace_state = std::move(trace_state)] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>&& r_ht) {
auto&& [r, ht] = r_ht;
tracing::trace(trace_state, "Querying is done");
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>(
rpc::tuple(::make_foreign(::make_lw_shared<query::result>(to_data_query_result(*r, s, cmd->slice, cmd->get_row_limit(), cmd->partition_limit, opts))), ht));
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>(rpc::tuple(std::move(r), ht));
});
}
}
@@ -4014,6 +4014,10 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
std::unordered_map<abstract_read_executor*, std::vector<dht::token_range>> ranges_per_exec;
const auto tmptr = get_token_metadata_ptr();
if (_features.cluster_supports_range_scan_data_variant()) {
cmd->slice.options.set<query::partition_slice::option::range_scan_data_variant>();
}
const auto preferred_replicas_for_range = [this, &preferred_replicas, tmptr] (const dht::partition_range& r) {
auto it = preferred_replicas.find(r.transform(std::mem_fn(&dht::ring_position::token)));
return it == preferred_replicas.end() ? std::vector<gms::inet_address>{} : replica_ids_to_endpoints(*tmptr, it->second);
@@ -5189,6 +5193,22 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s,
});
}
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>
storage_proxy::query_nonsingular_data_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range_vector&& prs,
query::result_options opts, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout) {
auto ranges = std::move(prs);
auto local_cmd = cmd;
rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> ret;
if (local_cmd->slice.options.contains(query::partition_slice::option::range_scan_data_variant)) {
ret = co_await query_data_on_all_shards(_db, std::move(s), *local_cmd, ranges, opts, std::move(trace_state), timeout);
} else {
auto res = co_await query_mutations_on_all_shards(_db, s, *local_cmd, ranges, std::move(trace_state), timeout);
ret = rpc::tuple(make_foreign(make_lw_shared<query::result>(to_data_query_result(std::move(*std::get<0>(res)), std::move(s), local_cmd->slice,
local_cmd->get_row_limit(), local_cmd->partition_limit, opts))), std::get<1>(res));
}
co_return ret;
}
future<> storage_proxy::start_hints_manager(shared_ptr<gms::gossiper> gossiper_ptr, shared_ptr<service::storage_service> ss_ptr) {
future<> f = make_ready_future<>();
if (!_hints_manager.is_disabled_for_all()) {

View File

@@ -413,6 +413,9 @@ private:
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> query_nonsingular_mutations_locally(
schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range_vector&& pr, tracing::trace_state_ptr trace_state,
clock_type::time_point timeout);
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_nonsingular_data_locally(
schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range_vector&& pr, query::result_options opts,
tracing::trace_state_ptr trace_state, clock_type::time_point timeout);
future<> mutate_counters_on_leader(std::vector<frozen_mutation_and_schema> mutations, db::consistency_level cl, clock_type::time_point timeout,
tracing::trace_state_ptr trace_state, service_permit permit);

View File

@@ -4609,6 +4609,11 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) {
cfg.dbcfg->statement_scheduling_group = seastar::create_scheduling_group("statement", 1000).get0();
cfg.dbcfg->streaming_scheduling_group = seastar::create_scheduling_group("streaming", 200).get0();
auto clean_up_sched_groups = defer([dbcfg = *cfg.dbcfg] {
seastar::destroy_scheduling_group(dbcfg.statement_scheduling_group).get0();
seastar::destroy_scheduling_group(dbcfg.streaming_scheduling_group).get0();
});
do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("CREATE TABLE test (pk int, ck int, v text, PRIMARY KEY (pk, ck));").get();
auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get0();
@@ -4833,3 +4838,46 @@ SEASTAR_THREAD_TEST_CASE(test_twcs_optimal_query_path) {
.is_rows().with_size(1);
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_query_unselected_columns) {
cql_test_config cfg;
cfg.db_config->max_memory_for_unlimited_query_soft_limit(1024 * 1024, utils::config_file::config_source::CommandLine);
cfg.db_config->max_memory_for_unlimited_query_hard_limit(1024 * 1024, utils::config_file::config_source::CommandLine);
cfg.dbcfg.emplace();
cfg.dbcfg->available_memory = memory::stats().total_memory();
cfg.dbcfg->statement_scheduling_group = seastar::create_scheduling_group("statement", 1000).get0();
auto statement_sched_group = cfg.dbcfg->statement_scheduling_group;
auto clean_up_sched_groups = defer([statement_sched_group] {
seastar::destroy_scheduling_group(statement_sched_group).get0();
});
do_with_cql_env_thread([] (cql_test_env& e) {
auto now_nano = std::chrono::duration_cast<std::chrono::nanoseconds>(db_clock::now().time_since_epoch()).count();
e.execute_cql("CREATE TABLE tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))").get();
const unsigned num_rows = 20;
const sstring val(100 * 1024, 'a');
const auto id = e.prepare(format("INSERT INTO tbl (pk, ck, v) VALUES (0, ?, '{}')", val)).get0();
for (int ck = 0; ck < num_rows; ++ck) {
e.execute_prepared(id, {cql3::raw_value::make_value(int32_type->decompose(ck))}).get();
}
{
testlog.info("Single partition scan");
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{-1, nullptr, {}, api::new_timestamp()});
assert_that(e.execute_cql("SELECT pk, ck FROM tbl WHERE pk = 0", std::move(qo)).get0()).is_rows().with_size(num_rows);
}
{
testlog.info("Full scan");
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{-1, nullptr, {}, api::new_timestamp()});
assert_that(e.execute_cql("SELECT pk, ck FROM tbl", std::move(qo)).get0()).is_rows().with_size(num_rows);
}
}, std::move(cfg), thread_attributes{.sched_group = statement_sched_group}).get();
}

View File

@@ -685,9 +685,9 @@ future<> do_with_cql_env(std::function<future<>(cql_test_env&)> func, cql_test_c
return single_node_cql_env::do_with(func, std::move(cfg_in));
}
future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_test_config cfg_in) {
return single_node_cql_env::do_with([func = std::move(func)] (auto& e) {
return seastar::async([func = std::move(func), &e] {
future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_test_config cfg_in, thread_attributes thread_attr) {
return single_node_cql_env::do_with([func = std::move(func), thread_attr] (auto& e) {
return seastar::async(thread_attr, [func = std::move(func), &e] {
return func(e);
});
}, std::move(cfg_in));

View File

@@ -135,4 +135,4 @@ public:
};
future<> do_with_cql_env(std::function<future<>(cql_test_env&)> func, cql_test_config = {});
future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_test_config = {});
future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_test_config = {}, thread_attributes thread_attr = {});