multishard_mutation_query: add query_data_on_all_shards()

A data query variant of the existing `query_mutations_on_all_shards()`.
This variant builds a `query::result`, instead of `reconcilable_result`.
This is actually the result format coordinators want when executing
range scans; the reason for using the reconcilable result for these
queries is historical, and it just introduces an unnecessary intermediate
format.
This new method allows the storage proxy to skip this intermediate
format and the associated conversion to `query::result`, just like we do
for single partition queries.

Reverse queries are refused because they are not supported at the client
API (CQL) level anyway, hence it is unspecified how they should work,
and, more importantly, they are not tested.
This commit is contained in:
Botond Dénes
2021-02-18 18:28:11 +02:00
parent df0f501ba2
commit 034cb81323
2 changed files with 57 additions and 0 deletions

View File

@@ -24,6 +24,7 @@
#include "multishard_mutation_query.hh"
#include "database.hh"
#include "db/config.hh"
#include "query-result-writer.hh"
#include <seastar/core/coroutine.hh>
@@ -738,6 +739,32 @@ public:
result_type consume_end_of_stream() { return _builder.consume_end_of_stream(); }
};
class data_query_result_builder {
public:
using result_type = query::result;
static constexpr emit_only_live_rows only_live = emit_only_live_rows::yes;
private:
std::unique_ptr<query::result::builder> _res_builder;
query_result_builder _builder;
public:
data_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_options opts, query::result_memory_accounter&& accounter)
: _res_builder(std::make_unique<query::result::builder>(slice, opts, std::move(accounter)))
, _builder(s, *_res_builder) { }
void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); }
void consume(tombstone t) { _builder.consume(t); }
stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); }
stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); }
stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); }
stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); }
result_type consume_end_of_stream() {
_builder.consume_end_of_stream();
return _res_builder->build();
}
};
} // anonymous namespace
future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> query_mutations_on_all_shards(
@@ -752,3 +779,20 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_tempera
return mutation_query_result_builder(*s, cmd.slice, std::move(accounter));
});
}
/// Runs a data query on all shards, producing a `query::result`.
///
/// Delegates to `do_query_on_all_shards()` with `data_query_result_builder`
/// as the result builder. Reversed slices are rejected up front by throwing
/// `std::runtime_error` before any work is dispatched.
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_data_on_all_shards(
        distributed<database>& db,
        schema_ptr s,
        const query::read_command& cmd,
        const dht::partition_range_vector& ranges,
        query::result_options opts,
        tracing::trace_state_ptr trace_state,
        db::timeout_clock::time_point timeout) {
    const auto& slice_options = cmd.slice.options;
    if (slice_options.contains(query::partition_slice::option::reversed)) {
        throw std::runtime_error("query_data_on_all_shards(): reverse range scans are not supported");
    }

    // Factory handed to do_query_on_all_shards(); it creates a result builder
    // for a given memory accounter. The schema pointer and result options are
    // captured by value, the read command by reference.
    // NOTE(review): `cmd` presumably has to outlive the returned future --
    // confirm against do_query_on_all_shards() and the callers.
    auto make_builder = [s, &cmd, opts] (query::result_memory_accounter&& accounter) {
        return data_query_result_builder(*s, cmd.slice, opts, std::move(accounter));
    };

    return do_query_on_all_shards<data_query_result_builder>(
            db, s, cmd, ranges, std::move(trace_state), timeout, std::move(make_builder));
}

View File

@@ -68,3 +68,16 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_tempera
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout);
/// Run the data query on all shards.
///
/// Counterpart of `query_mutations_on_all_shards()` that builds results
/// in the `query::result` format instead of in the `reconcilable_result` one.
///
/// `opts` is passed to the `query::result::builder` that assembles the result.
///
/// Reverse range scans are not supported: if `cmd.slice` has the `reversed`
/// option set, a `std::runtime_error` is thrown directly from this call
/// (not delivered through the returned future).
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_data_on_all_shards(
distributed<database>& db,
schema_ptr s,
const query::read_command& cmd,
const dht::partition_range_vector& ranges,
query::result_options opts,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout);