mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-13 11:22:01 +00:00
mutation_query: drop querying_reader
Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
@@ -62,51 +62,6 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
querying_reader::querying_reader(schema_ptr s,
|
||||
const mutation_source& source,
|
||||
const query::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
uint32_t row_limit,
|
||||
gc_clock::time_point query_time,
|
||||
std::function<void(uint32_t, mutation&&)> consumer)
|
||||
: _schema(std::move(s))
|
||||
, _range(range)
|
||||
, _slice(slice)
|
||||
, _requested_limit(row_limit)
|
||||
, _query_time(query_time)
|
||||
, _limit(row_limit)
|
||||
, _source(source)
|
||||
, _consumer(std::move(consumer))
|
||||
{ }
|
||||
|
||||
future<> querying_reader::read() {
|
||||
_reader = _source(_schema, _range, query::clustering_key_filtering_context::create(_schema, _slice),
|
||||
service::get_local_sstable_query_read_priority());
|
||||
return consume(*_reader, [this](mutation&& m) {
|
||||
// FIXME: Make data sources respect row_ranges so that we don't have to filter them out here.
|
||||
auto is_distinct = _slice.options.contains(query::partition_slice::option::distinct);
|
||||
auto is_reversed = _slice.options.contains(query::partition_slice::option::reversed);
|
||||
auto limit = !is_distinct ? _limit : 1;
|
||||
auto rows_left = m.partition().compact_for_query(*m.schema(), _query_time,
|
||||
_slice.row_ranges(*m.schema(), m.key()),
|
||||
is_reversed, limit);
|
||||
_limit -= rows_left;
|
||||
|
||||
if (rows_left || !m.partition().empty()) {
|
||||
// NOTE: We must return all columns, regardless of what's in
|
||||
// partition_slice, for the results to be reconcilable with tombstones.
|
||||
// That's because row's presence depends on existence of any
|
||||
// column in a row (See mutation_partition::query). We could
|
||||
// optimize this case and only send cell timestamps, without data,
|
||||
// for the cells which are not queried for (TODO).
|
||||
_consumer(rows_left, std::move(m));
|
||||
}
|
||||
|
||||
return _limit ? stop_iteration::no : stop_iteration::yes;
|
||||
});
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const reconcilable_result::printer& pr) {
|
||||
out << "{rows=" << pr.self.row_count() << ", [";
|
||||
bool first = true;
|
||||
|
||||
@@ -118,25 +118,3 @@ future<reconcilable_result> mutation_query(
|
||||
future<uint32_t> data_query(schema_ptr s, const mutation_source& source, const query::partition_range& range,
|
||||
const query::partition_slice& slice, uint32_t row_limit, gc_clock::time_point query_time,
|
||||
query::result::builder& builder);
|
||||
|
||||
class querying_reader {
|
||||
schema_ptr _schema;
|
||||
const query::partition_range& _range;
|
||||
const query::partition_slice& _slice;
|
||||
uint32_t _requested_limit;
|
||||
gc_clock::time_point _query_time;
|
||||
uint32_t _limit;
|
||||
const mutation_source& _source;
|
||||
std::function<void(uint32_t, mutation&&)> _consumer;
|
||||
std::experimental::optional<mutation_reader> _reader;
|
||||
public:
|
||||
querying_reader(schema_ptr s,
|
||||
const mutation_source& source,
|
||||
const query::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
uint32_t row_limit,
|
||||
gc_clock::time_point query_time,
|
||||
std::function<void(uint32_t, mutation&&)> consumer);
|
||||
|
||||
future<> read();
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user