Add reader_selector to combined_mutation_reader
combined_mutation_reader now accepts as a constructor argument a reader_selector instance whoose task is to create new readers on each call to operator()() if needed and possible. This way it is possible to control how readers are created through different specializations of reader_selector. The previous logic is refactored into list_reader_selector which is using a pre-provided mutation_reader list and forwards all of them to combined_mutation_reader at once.
This commit is contained in:
@@ -28,12 +28,75 @@
|
||||
#include "utils/move.hh"
|
||||
#include "stdx.hh"
|
||||
|
||||
// Dumb selector implementation for combined_mutation_reader that simply
|
||||
// forwards it's list of readers.
|
||||
class list_reader_selector : public reader_selector {
|
||||
std::vector<mutation_reader> _readers;
|
||||
|
||||
public:
|
||||
explicit list_reader_selector(std::vector<mutation_reader> readers)
|
||||
: _readers(std::move(readers)) {
|
||||
_selector_position = dht::minimum_token();
|
||||
}
|
||||
|
||||
list_reader_selector(const list_reader_selector&) = delete;
|
||||
list_reader_selector& operator=(const list_reader_selector&) = delete;
|
||||
|
||||
list_reader_selector(list_reader_selector&&) = default;
|
||||
list_reader_selector& operator=(list_reader_selector&&) = default;
|
||||
|
||||
virtual std::vector<mutation_reader> create_new_readers(const dht::token* const) override {
|
||||
if (_readers.empty()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
_selector_position = dht::maximum_token();
|
||||
return std::exchange(_readers, {});
|
||||
}
|
||||
|
||||
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range&) override {
|
||||
return {};
|
||||
}
|
||||
};
|
||||
|
||||
void combined_mutation_reader::maybe_add_readers(const dht::token* const t) {
|
||||
if (!_selector->has_new_readers(t)) {
|
||||
return;
|
||||
}
|
||||
|
||||
add_readers(_selector->create_new_readers(t));
|
||||
}
|
||||
|
||||
void combined_mutation_reader::add_readers(std::vector<mutation_reader> new_readers) {
|
||||
for (auto&& new_reader : new_readers) {
|
||||
_readers.emplace_back(std::move(new_reader));
|
||||
|
||||
auto* r = &_readers.back();
|
||||
_all_readers.emplace_back(r);
|
||||
_next.emplace_back(r);
|
||||
}
|
||||
}
|
||||
|
||||
const dht::token* combined_mutation_reader::current_position() const {
|
||||
if (_ptables.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
return &_ptables.front().m.decorated_key().token();
|
||||
}
|
||||
|
||||
future<> combined_mutation_reader::prepare_next() {
|
||||
maybe_add_readers(current_position());
|
||||
|
||||
return parallel_for_each(_next, [this] (mutation_reader* mr) {
|
||||
return (*mr)().then([this, mr] (streamed_mutation_opt next) {
|
||||
if (next) {
|
||||
_ptables.emplace_back(mutation_and_reader { std::move(*next), mr });
|
||||
boost::range::push_heap(_ptables, &heap_compare);
|
||||
} else {
|
||||
auto it = std::remove(_all_readers.begin(), _all_readers.end(), mr);
|
||||
_all_readers.erase(it);
|
||||
_readers.remove_if([mr](auto& r) { return &r == mr; });
|
||||
}
|
||||
});
|
||||
}).then([this] {
|
||||
@@ -42,12 +105,12 @@ future<> combined_mutation_reader::prepare_next() {
|
||||
}
|
||||
|
||||
future<streamed_mutation_opt> combined_mutation_reader::next() {
|
||||
if (_current.empty() && !_next.empty()) {
|
||||
if ((_current.empty() && !_next.empty()) || _selector->has_new_readers(current_position())) {
|
||||
return prepare_next().then([this] { return next(); });
|
||||
}
|
||||
if (_ptables.empty()) {
|
||||
return make_ready_future<streamed_mutation_opt>();
|
||||
};
|
||||
}
|
||||
|
||||
while (!_ptables.empty()) {
|
||||
boost::range::pop_heap(_ptables, &heap_compare);
|
||||
@@ -95,24 +158,19 @@ future<> combined_mutation_reader::fast_forward_to(std::vector<mutation_reader*>
|
||||
});
|
||||
}
|
||||
|
||||
combined_mutation_reader::combined_mutation_reader(std::vector<mutation_reader> readers)
|
||||
: _readers(std::move(readers))
|
||||
combined_mutation_reader::combined_mutation_reader(std::unique_ptr<reader_selector> selector)
|
||||
: _selector(std::move(selector))
|
||||
{
|
||||
_next.reserve(_readers.size());
|
||||
_current.reserve(_readers.size());
|
||||
_ptables.reserve(_readers.size());
|
||||
|
||||
for (auto&& r : _readers) {
|
||||
_next.emplace_back(&r);
|
||||
}
|
||||
_all_readers.assign(_next.begin(), _next.end());
|
||||
}
|
||||
|
||||
future<> combined_mutation_reader::fast_forward_to(const dht::partition_range& pr) {
|
||||
_ptables.clear();
|
||||
_next.assign(_all_readers.begin(), _all_readers.end());
|
||||
|
||||
return parallel_for_each(_next, [this, &pr] (mutation_reader* mr) {
|
||||
return mr->fast_forward_to(pr);
|
||||
}).then([this, pr] {
|
||||
add_readers(_selector->fast_forward_to(pr));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -122,7 +180,7 @@ future<streamed_mutation_opt> combined_mutation_reader::operator()() {
|
||||
|
||||
mutation_reader
|
||||
make_combined_reader(std::vector<mutation_reader> readers) {
|
||||
return make_mutation_reader<combined_mutation_reader>(std::move(readers));
|
||||
return make_mutation_reader<combined_mutation_reader>(std::make_unique<list_reader_selector>(std::move(readers)));
|
||||
}
|
||||
|
||||
mutation_reader
|
||||
|
||||
@@ -105,9 +105,25 @@ make_mutation_reader(Args&&... args) {
|
||||
return mutation_reader(std::make_unique<Impl>(std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
class reader_selector {
|
||||
protected:
|
||||
dht::token _selector_position;
|
||||
public:
|
||||
virtual ~reader_selector() = default;
|
||||
// Call only if has_new_readers() returned true.
|
||||
virtual std::vector<mutation_reader> create_new_readers(const dht::token* const t) = 0;
|
||||
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range& pr) = 0;
|
||||
|
||||
// Can be false-positive but never false-negative!
|
||||
bool has_new_readers(const dht::token* const t) const noexcept {
|
||||
return !_selector_position.is_maximum() && (!t || *t >= _selector_position);
|
||||
}
|
||||
};
|
||||
|
||||
// Combines multiple mutation_readers into one.
|
||||
class combined_mutation_reader : public mutation_reader::impl {
|
||||
std::vector<mutation_reader> _readers;
|
||||
std::unique_ptr<reader_selector> _selector;
|
||||
std::list<mutation_reader> _readers;
|
||||
std::vector<mutation_reader*> _all_readers;
|
||||
|
||||
struct mutation_and_reader {
|
||||
@@ -140,6 +156,9 @@ class combined_mutation_reader : public mutation_reader::impl {
|
||||
std::vector<streamed_mutation> _current;
|
||||
std::vector<mutation_reader*> _next;
|
||||
private:
|
||||
const dht::token* current_position() const;
|
||||
void maybe_add_readers(const dht::token* const t);
|
||||
void add_readers(std::vector<mutation_reader> new_readers);
|
||||
future<> prepare_next();
|
||||
// Produces next mutation or disengaged optional if there are no more.
|
||||
future<streamed_mutation_opt> next();
|
||||
@@ -148,7 +167,7 @@ protected:
|
||||
void init_mutation_reader_set(std::vector<mutation_reader*>);
|
||||
future<> fast_forward_to(std::vector<mutation_reader*> to_add, std::vector<mutation_reader*> to_remove, const dht::partition_range& pr);
|
||||
public:
|
||||
combined_mutation_reader(std::vector<mutation_reader> readers);
|
||||
combined_mutation_reader(std::unique_ptr<reader_selector> selector);
|
||||
virtual future<streamed_mutation_opt> operator()() override;
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr) override;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user