diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index a587608a6c..4317925b8b 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -943,12 +943,6 @@ mutation_fragment_stream_validator::mutation_fragment_stream_validator(const sch fmr_logger.debug("[validator {}] Will validate {} monotonicity.", static_cast(this), compare_keys ? "keys" : "only partition regions"); } -mutation_fragment_stream_validator::~mutation_fragment_stream_validator() { - if (_prev_kind != mutation_fragment::kind::partition_end) { - on_internal_error(fmr_logger, format("[validator {}] Stream ended with unclosed partition: {}", static_cast(this), _prev_kind)); - } -} - bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mv) { auto kind = mv.mutation_fragment_kind(); auto pos = mv.position(); @@ -988,3 +982,9 @@ bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mv) } return true; } + +void mutation_fragment_stream_validator::on_end_of_stream() const { + if (_prev_kind != mutation_fragment::kind::partition_end) { + on_internal_error(fmr_logger, format("[validator {}] Stream ended with unclosed partition: {}", static_cast(this), _prev_kind)); + } +} diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index bad79bfce6..0546249558 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -58,13 +58,10 @@ GCC6_CONCEPT( } template - concept bool PartitionFilter = requires(T filter, const dht::decorated_key& dk) { + concept bool FlattenedConsumerFilter = requires(T filter, const dht::decorated_key& dk, const mutation_fragment& mf) { { filter(dk) } -> bool; - }; - - template - concept bool MutationFragmentFilter = requires(T filter, const mutation_fragment& mf) { { filter(mf) } -> bool; + { filter.on_end_of_stream() } -> void; }; ) @@ -182,7 +179,7 @@ public: template GCC6_CONCEPT( - requires FlatMutationReaderConsumer() && PartitionFilter && MutationFragmentFilter + requires FlatMutationReaderConsumer() && FlattenedConsumerFilter ) // A variant of consume_pausable() that expects to be run in // a seastar::thread. @@ -195,6 +192,7 @@ public: } if (is_buffer_empty()) { if (is_end_of_stream()) { + filter.on_end_of_stream(); return; } fill_buffer(timeout).get(); @@ -282,7 +280,7 @@ public: template GCC6_CONCEPT( - requires FlattenedConsumer() && PartitionFilter && MutationFragmentFilter + requires FlattenedConsumer() && FlattenedConsumerFilter ) // A variant of consumee() that expects to be run in a seastar::thread. // Partitions for which filter(decorated_key) returns false are skipped @@ -395,6 +393,8 @@ public: bool operator()(const mutation_fragment& mf) const { return _mutation_fragment_filter(mf); } + + void on_end_of_stream() const { } }; struct no_filter { @@ -405,11 +405,13 @@ public: bool operator()(const mutation_fragment& mf) const { return true; } + + void on_end_of_stream() const { } }; template GCC6_CONCEPT( - requires FlattenedConsumer() && PartitionFilter && MutationFragmentFilter + requires FlattenedConsumer() && FlattenedConsumerFilter ) auto consume_in_thread(Consumer consumer, Filter filter, db::timeout_clock::time_point timeout) { return _impl->consume_in_thread(std::move(consumer), std::move(filter), timeout); @@ -755,8 +757,8 @@ class mutation_fragment_stream_validator { dht::decorated_key _prev_partition_key; public: mutation_fragment_stream_validator(const schema& s, bool compare_keys = false); - ~mutation_fragment_stream_validator(); bool operator()(const dht::decorated_key& dk); bool operator()(const mutation_fragment& mv); + void on_end_of_stream() const; };