make_flat_multi_range_reader: add generator overload

Allows creating a multi range reader from an arbitrary callable that
return std::optional<dht::partition_range>. The callable is expected to
return a new range on each call, such that passing each successive range
to `flat_mutation_reader::fast_forward_to` is valid. When exhausted the
callable is expected to return std::nullopt.
This commit is contained in:
Botond Dénes
2018-09-25 11:09:21 +03:00
parent 8c5387890d
commit 39bfd5d1df
2 changed files with 65 additions and 5 deletions

View File

@@ -641,12 +641,17 @@ class flat_multi_range_mutation_reader : public flat_mutation_reader::impl {
}
public:
flat_multi_range_mutation_reader(schema_ptr s, mutation_source source, Generator generator,
const query::partition_slice& slice, const io_priority_class& pc,
tracing::trace_state_ptr trace_state)
flat_multi_range_mutation_reader(
schema_ptr s,
mutation_source source,
const dht::partition_range& first_range,
Generator generator,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state)
: impl(s)
, _generator(std::move(generator))
, _reader(source.make_reader(s, *(*_generator)(), slice, pc, trace_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::yes))
, _reader(source.make_reader(s, first_range, slice, pc, trace_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::yes))
{
}
@@ -723,7 +728,52 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa
return source.make_reader(std::move(s), ranges.front(), slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
} else {
return make_flat_mutation_reader<flat_multi_range_mutation_reader<adapter>>(std::move(s), std::move(source),
adapter(ranges.cbegin(), ranges.cend()), slice, pc, std::move(trace_state));
ranges.front(), adapter(std::next(ranges.cbegin()), ranges.cend()), slice, pc, std::move(trace_state));
}
}
flat_mutation_reader
make_flat_multi_range_reader(
schema_ptr s,
mutation_source source,
std::function<std::optional<dht::partition_range>()> generator,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr) {
class adapter {
std::function<std::optional<dht::partition_range>()> _generator;
std::unique_ptr<dht::partition_range> _previous;
std::unique_ptr<dht::partition_range> _current;
public:
explicit adapter(std::function<std::optional<dht::partition_range>()> generator)
: _generator(std::move(generator))
, _previous(std::make_unique<dht::partition_range>(dht::partition_range::make_singular({dht::token{}, partition_key::make_empty()})))
, _current(std::make_unique<dht::partition_range>(dht::partition_range::make_singular({dht::token{}, partition_key::make_empty()}))) {
}
const dht::partition_range* operator()() {
std::swap(_current, _previous);
if (auto next = _generator()) {
*_current = std::move(*next);
return _current.get();
} else {
return nullptr;
}
}
};
auto adapted_generator = adapter(std::move(generator));
auto* first_range = adapted_generator();
if (!first_range) {
if (fwd_mr) {
return make_flat_mutation_reader<forwardable_empty_mutation_reader>(std::move(s), std::move(source), slice, pc, std::move(trace_state));
} else {
return make_empty_flat_reader(std::move(s));
}
} else {
return make_flat_mutation_reader<flat_multi_range_mutation_reader<adapter>>(std::move(s), std::move(source),
*first_range, std::move(adapted_generator), slice, pc, std::move(trace_state));
}
}

View File

@@ -616,6 +616,16 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa
tracing::trace_state_ptr trace_state = nullptr,
flat_mutation_reader::partition_range_forwarding fwd_mr = flat_mutation_reader::partition_range_forwarding::yes);
flat_mutation_reader
make_flat_multi_range_reader(
schema_ptr s,
mutation_source source,
std::function<std::optional<dht::partition_range>()> generator,
const query::partition_slice& slice,
const io_priority_class& pc = default_priority_class(),
tracing::trace_state_ptr trace_state = nullptr,
flat_mutation_reader::partition_range_forwarding fwd_mr = flat_mutation_reader::partition_range_forwarding::yes);
flat_mutation_reader
make_flat_mutation_reader_from_fragments(schema_ptr, std::deque<mutation_fragment>);