mutation_fragment_stream_validator: validate end of stream in partition_key filter

Currently end of stream validation is done in the destructor,
but the validator may be destructed prematurely, e.g. on
exception, as seen in https://github.com/scylladb/scylla/issues/5215

This patch adds a on_end_of_stream() method explicitly called by
consume_pausable_in_thread.  Also, the respective concepts for
ParitionFilter, MutationFragmentFilter and a new on for the
on_end_of_stream method were unified as FlattenedConsumerFilter.

Refs #5215

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 506ff40bd447f00158c24859819d4bb06436c996)
This commit is contained in:
Benny Halevy
2019-10-24 14:23:54 +03:00
committed by Tomasz Grabiec
parent d5f53bc307
commit 79d5fed40b
2 changed files with 17 additions and 15 deletions

View File

@@ -943,12 +943,6 @@ mutation_fragment_stream_validator::mutation_fragment_stream_validator(const sch
fmr_logger.debug("[validator {}] Will validate {} monotonicity.", static_cast<void*>(this), compare_keys ? "keys" : "only partition regions");
}
mutation_fragment_stream_validator::~mutation_fragment_stream_validator() {
if (_prev_kind != mutation_fragment::kind::partition_end) {
on_internal_error(fmr_logger, format("[validator {}] Stream ended with unclosed partition: {}", static_cast<void*>(this), _prev_kind));
}
}
bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mv) {
auto kind = mv.mutation_fragment_kind();
auto pos = mv.position();
@@ -988,3 +982,9 @@ bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mv)
}
return true;
}
void mutation_fragment_stream_validator::on_end_of_stream() const {
if (_prev_kind != mutation_fragment::kind::partition_end) {
on_internal_error(fmr_logger, format("[validator {}] Stream ended with unclosed partition: {}", static_cast<const void*>(this), _prev_kind));
}
}

View File

@@ -58,13 +58,10 @@ GCC6_CONCEPT(
}
template<typename T>
concept bool PartitionFilter = requires(T filter, const dht::decorated_key& dk) {
concept bool FlattenedConsumerFilter = requires(T filter, const dht::decorated_key& dk, const mutation_fragment& mf) {
{ filter(dk) } -> bool;
};
template<typename T>
concept bool MutationFragmentFilter = requires(T filter, const mutation_fragment& mf) {
{ filter(mf) } -> bool;
{ filter.on_end_of_stream() } -> void;
};
)
@@ -182,7 +179,7 @@ public:
template<typename Consumer, typename Filter>
GCC6_CONCEPT(
requires FlatMutationReaderConsumer<Consumer>() && PartitionFilter<Filter> && MutationFragmentFilter<Filter>
requires FlatMutationReaderConsumer<Consumer>() && FlattenedConsumerFilter<Filter>
)
// A variant of consume_pausable() that expects to be run in
// a seastar::thread.
@@ -195,6 +192,7 @@ public:
}
if (is_buffer_empty()) {
if (is_end_of_stream()) {
filter.on_end_of_stream();
return;
}
fill_buffer(timeout).get();
@@ -282,7 +280,7 @@ public:
template<typename Consumer, typename Filter>
GCC6_CONCEPT(
requires FlattenedConsumer<Consumer>() && PartitionFilter<Filter> && MutationFragmentFilter<Filter>
requires FlattenedConsumer<Consumer>() && FlattenedConsumerFilter<Filter>
)
// A variant of consumee() that expects to be run in a seastar::thread.
// Partitions for which filter(decorated_key) returns false are skipped
@@ -395,6 +393,8 @@ public:
bool operator()(const mutation_fragment& mf) const {
return _mutation_fragment_filter(mf);
}
void on_end_of_stream() const { }
};
struct no_filter {
@@ -405,11 +405,13 @@ public:
bool operator()(const mutation_fragment& mf) const {
return true;
}
void on_end_of_stream() const { }
};
template<typename Consumer, typename Filter>
GCC6_CONCEPT(
requires FlattenedConsumer<Consumer>() && PartitionFilter<Filter> && MutationFragmentFilter<Filter>
requires FlattenedConsumer<Consumer>() && FlattenedConsumerFilter<Filter>
)
auto consume_in_thread(Consumer consumer, Filter filter, db::timeout_clock::time_point timeout) {
return _impl->consume_in_thread(std::move(consumer), std::move(filter), timeout);
@@ -755,8 +757,8 @@ class mutation_fragment_stream_validator {
dht::decorated_key _prev_partition_key;
public:
mutation_fragment_stream_validator(const schema& s, bool compare_keys = false);
~mutation_fragment_stream_validator();
bool operator()(const dht::decorated_key& dk);
bool operator()(const mutation_fragment& mv);
void on_end_of_stream() const;
};