multishard_mutation_query: add query_data_on_all_shards()
A data query variant of the existing `query_mutations_on_all_shards()`. This variant builds a `query::result`, instead of `reconcilable_result`. This is actually the result format coordinators want when executing range scans, the reason for using the reconcilable result for these queries is historic, and it just introduces an unnecessary intermediate format. This new method allows the storage proxy to skip this intermediate format and the associated conversion to `query::result`, just like we do for single partition queries. Reverse queries are refused because they are not supported on the client API (CQL) level anyway and hence it is unspecified how they should work and more importantly: they are not tested.
This commit is contained in:
@@ -24,6 +24,7 @@
|
||||
#include "multishard_mutation_query.hh"
|
||||
#include "database.hh"
|
||||
#include "db/config.hh"
|
||||
#include "query-result-writer.hh"
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
@@ -738,6 +739,32 @@ public:
|
||||
result_type consume_end_of_stream() { return _builder.consume_end_of_stream(); }
|
||||
};
|
||||
|
||||
class data_query_result_builder {
|
||||
public:
|
||||
using result_type = query::result;
|
||||
static constexpr emit_only_live_rows only_live = emit_only_live_rows::yes;
|
||||
|
||||
private:
|
||||
std::unique_ptr<query::result::builder> _res_builder;
|
||||
query_result_builder _builder;
|
||||
|
||||
public:
|
||||
data_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_options opts, query::result_memory_accounter&& accounter)
|
||||
: _res_builder(std::make_unique<query::result::builder>(slice, opts, std::move(accounter)))
|
||||
, _builder(s, *_res_builder) { }
|
||||
|
||||
void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); }
|
||||
void consume(tombstone t) { _builder.consume(t); }
|
||||
stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); }
|
||||
stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); }
|
||||
stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); }
|
||||
stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); }
|
||||
result_type consume_end_of_stream() {
|
||||
_builder.consume_end_of_stream();
|
||||
return _res_builder->build();
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> query_mutations_on_all_shards(
|
||||
@@ -752,3 +779,20 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_tempera
|
||||
return mutation_query_result_builder(*s, cmd.slice, std::move(accounter));
|
||||
});
|
||||
}
|
||||
|
||||
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_data_on_all_shards(
|
||||
distributed<database>& db,
|
||||
schema_ptr s,
|
||||
const query::read_command& cmd,
|
||||
const dht::partition_range_vector& ranges,
|
||||
query::result_options opts,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
if (cmd.slice.options.contains(query::partition_slice::option::reversed)) {
|
||||
throw std::runtime_error("query_data_on_all_shards(): reverse range scans are not supported");
|
||||
}
|
||||
return do_query_on_all_shards<data_query_result_builder>(db, s, cmd, ranges, std::move(trace_state), timeout,
|
||||
[s, &cmd, opts] (query::result_memory_accounter&& accounter) {
|
||||
return data_query_result_builder(*s, cmd.slice, opts, std::move(accounter));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -68,3 +68,16 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_tempera
|
||||
const dht::partition_range_vector& ranges,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout);
|
||||
|
||||
/// Run the data query on all shards.
|
||||
///
|
||||
/// Identical to `query_mutations_on_all_shards()` except that it builds results
|
||||
/// in the `query::result` format instead of in the `reconcilable_result` one.
|
||||
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_data_on_all_shards(
|
||||
distributed<database>& db,
|
||||
schema_ptr s,
|
||||
const query::read_command& cmd,
|
||||
const dht::partition_range_vector& ranges,
|
||||
query::result_options opts,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout);
|
||||
|
||||
Reference in New Issue
Block a user