From 79d5fed40b034f772445ee02a19e1f51911ffc0c Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 24 Oct 2019 14:23:54 +0300 Subject: [PATCH] mutation_fragment_stream_validator: validate end of stream in partition_key filter Currently end of stream validation is done in the destructor, but the validator may be destructed prematurely, e.g. on exception, as seen in https://github.com/scylladb/scylla/issues/5215 This patch adds a on_end_of_stream() method explicitly called by consume_pausable_in_thread. Also, the respective concepts for ParitionFilter, MutationFragmentFilter and a new on for the on_end_of_stream method were unified as FlattenedConsumerFilter. Refs #5215 Signed-off-by: Benny Halevy (cherry picked from commit 506ff40bd447f00158c24859819d4bb06436c996) --- flat_mutation_reader.cc | 12 ++++++------ flat_mutation_reader.hh | 20 +++++++++++--------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index a587608a6c..4317925b8b 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -943,12 +943,6 @@ mutation_fragment_stream_validator::mutation_fragment_stream_validator(const sch fmr_logger.debug("[validator {}] Will validate {} monotonicity.", static_cast(this), compare_keys ? "keys" : "only partition regions"); } -mutation_fragment_stream_validator::~mutation_fragment_stream_validator() { - if (_prev_kind != mutation_fragment::kind::partition_end) { - on_internal_error(fmr_logger, format("[validator {}] Stream ended with unclosed partition: {}", static_cast(this), _prev_kind)); - } -} - bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mv) { auto kind = mv.mutation_fragment_kind(); auto pos = mv.position(); @@ -988,3 +982,9 @@ bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mv) } return true; } + +void mutation_fragment_stream_validator::on_end_of_stream() const { + if (_prev_kind != mutation_fragment::kind::partition_end) { + on_internal_error(fmr_logger, format("[validator {}] Stream ended with unclosed partition: {}", static_cast(this), _prev_kind)); + } +} diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index bad79bfce6..0546249558 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -58,13 +58,10 @@ GCC6_CONCEPT( } template - concept bool PartitionFilter = requires(T filter, const dht::decorated_key& dk) { + concept bool FlattenedConsumerFilter = requires(T filter, const dht::decorated_key& dk, const mutation_fragment& mf) { { filter(dk) } -> bool; - }; - - template - concept bool MutationFragmentFilter = requires(T filter, const mutation_fragment& mf) { { filter(mf) } -> bool; + { filter.on_end_of_stream() } -> void; }; ) @@ -182,7 +179,7 @@ public: template GCC6_CONCEPT( - requires FlatMutationReaderConsumer() && PartitionFilter && MutationFragmentFilter + requires FlatMutationReaderConsumer() && FlattenedConsumerFilter ) // A variant of consume_pausable() that expects to be run in // a seastar::thread. @@ -195,6 +192,7 @@ public: } if (is_buffer_empty()) { if (is_end_of_stream()) { + filter.on_end_of_stream(); return; } fill_buffer(timeout).get(); @@ -282,7 +280,7 @@ public: template GCC6_CONCEPT( - requires FlattenedConsumer() && PartitionFilter && MutationFragmentFilter + requires FlattenedConsumer() && FlattenedConsumerFilter ) // A variant of consumee() that expects to be run in a seastar::thread. // Partitions for which filter(decorated_key) returns false are skipped @@ -395,6 +393,8 @@ public: bool operator()(const mutation_fragment& mf) const { return _mutation_fragment_filter(mf); } + + void on_end_of_stream() const { } }; struct no_filter { @@ -405,11 +405,13 @@ public: bool operator()(const mutation_fragment& mf) const { return true; } + + void on_end_of_stream() const { } }; template GCC6_CONCEPT( - requires FlattenedConsumer() && PartitionFilter && MutationFragmentFilter + requires FlattenedConsumer() && FlattenedConsumerFilter ) auto consume_in_thread(Consumer consumer, Filter filter, db::timeout_clock::time_point timeout) { return _impl->consume_in_thread(std::move(consumer), std::move(filter), timeout); @@ -755,8 +757,8 @@ class mutation_fragment_stream_validator { dht::decorated_key _prev_partition_key; public: mutation_fragment_stream_validator(const schema& s, bool compare_keys = false); - ~mutation_fragment_stream_validator(); bool operator()(const dht::decorated_key& dk); bool operator()(const mutation_fragment& mv); + void on_end_of_stream() const; };