diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 9632cda9ca..6e0417e59d 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -610,7 +610,7 @@ future<> read_context::save_readers(flat_mutation_reader_v2::tracked_buffer unco namespace { template -using compact_for_result_state = compact_for_query_state; +using compact_for_result_state = compact_for_query_state_v2; template requires std::is_nothrow_move_constructible_v @@ -821,7 +821,7 @@ public: void consume(tombstone t) { _builder.consume(t); } stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } - stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume(range_tombstone_change&& rtc) { return _builder.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } result_type consume_end_of_stream() { return _builder.consume_end_of_stream(); } }; @@ -844,7 +844,7 @@ public: void consume(tombstone t) { _builder.consume(t); } stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } - stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume(range_tombstone_change&& rtc) { return _builder.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } result_type consume_end_of_stream() { _builder.consume_end_of_stream(); diff --git a/querier.hh b/querier.hh index 55256c173d..ef924d4a35 100644 --- a/querier.hh +++ b/querier.hh @@ -44,8 +44,8 @@ public: *_last_ckey = cr.key(); return _consumer.consume(std::move(cr), std::move(t), is_live); } - stop_iteration consume(range_tombstone&& rt) { - return _consumer.consume(std::move(rt)); + stop_iteration consume(range_tombstone_change&& rtc) { + return _consumer.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _consumer.consume_end_of_partition(); @@ -63,42 +63,9 @@ public: /// or std::nullopt if the last row wasn't a clustering row, and whatever the /// consumer's `consume_end_of_stream()` method returns. template -requires CompactedFragmentsConsumer -auto consume_page(flat_mutation_reader& reader, - lw_shared_ptr> compaction_state, - const query::partition_slice& slice, - Consumer&& consumer, - uint64_t row_limit, - uint32_t partition_limit, - gc_clock::time_point query_time) { - return reader.peek().then([=, &reader, consumer = std::move(consumer), &slice] ( - mutation_fragment* next_fragment) mutable { - const auto next_fragment_region = next_fragment ? next_fragment->position().region() : partition_region::partition_end; - compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); - - auto last_ckey = make_lw_shared>(); - auto reader_consumer = compact_for_query>( - compaction_state, - clustering_position_tracker(std::move(consumer), last_ckey)); - - return reader.consume(std::move(reader_consumer)).then([last_ckey] (auto&&... results) mutable { - static_assert(sizeof...(results) <= 1); - return make_ready_future, std::decay_t...>>(std::tuple(std::move(*last_ckey), std::move(results)...)); - }); - }); -} - -/// Consume a page worth of data from the reader. -/// -/// Uses `compaction_state` for compacting the fragments and `consumer` for -/// building the results. -/// Returns a future containing a tuple with the last consumed clustering key, -/// or std::nullopt if the last row wasn't a clustering row, and whatever the -/// consumer's `consume_end_of_stream()` method returns. -template -requires CompactedFragmentsConsumer +requires CompactedFragmentsConsumerV2 auto consume_page(flat_mutation_reader_v2& reader, - lw_shared_ptr> compaction_state, + lw_shared_ptr> compaction_state, const query::partition_slice& slice, Consumer&& consumer, uint64_t row_limit, @@ -110,7 +77,7 @@ auto consume_page(flat_mutation_reader_v2& reader, compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); auto last_ckey = make_lw_shared>(); - auto reader_consumer = compact_for_query>( + auto reader_consumer = compact_for_query_v2>( compaction_state, clustering_position_tracker(std::move(consumer), last_ckey)); @@ -211,7 +178,7 @@ public: /// instead. template class querier : public querier_base { - lw_shared_ptr> _compaction_state; + lw_shared_ptr> _compaction_state; std::optional _last_ckey; public: @@ -223,7 +190,7 @@ public: const io_priority_class& pc, tracing::trace_state_ptr trace_ptr) : querier_base(schema, permit, std::move(range), std::move(slice), ms, pc, std::move(trace_ptr)) - , _compaction_state(make_lw_shared>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) { + , _compaction_state(make_lw_shared>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) { } bool are_limits_reached() const { @@ -231,7 +198,7 @@ public: } template - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumerV2 auto consume_page(Consumer&& consumer, uint64_t row_limit, uint32_t partition_limit, diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index 3dce71cac1..f82cffd676 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -45,7 +45,7 @@ public: _ck = cr.key(); return stop_iteration::no; } - stop_iteration consume(range_tombstone&& rt) { + stop_iteration consume(range_tombstone_change&& rtc) { return stop_iteration::no; } stop_iteration consume_end_of_partition() {