Add reader_selector to combined_mutation_reader

combined_mutation_reader now accepts as a constructor argument a
reader_selector instance whoose task is to create new readers on
each call to operator()() if needed and possible.
This way it is possible to control how readers are created through
different specializations of reader_selector.

The previous logic is refactored into list_reader_selector which
is using a pre-provided mutation_reader list and forwards all of them to
combined_mutation_reader at once.
This commit is contained in:
Botond Dénes
2017-08-01 15:40:42 +03:00
parent 94fc550e68
commit a6b9186cab
2 changed files with 92 additions and 15 deletions

View File

@@ -28,12 +28,75 @@
#include "utils/move.hh"
#include "stdx.hh"
// Dumb selector implementation for combined_mutation_reader that simply
// forwards it's list of readers.
class list_reader_selector : public reader_selector {
std::vector<mutation_reader> _readers;
public:
explicit list_reader_selector(std::vector<mutation_reader> readers)
: _readers(std::move(readers)) {
_selector_position = dht::minimum_token();
}
list_reader_selector(const list_reader_selector&) = delete;
list_reader_selector& operator=(const list_reader_selector&) = delete;
list_reader_selector(list_reader_selector&&) = default;
list_reader_selector& operator=(list_reader_selector&&) = default;
virtual std::vector<mutation_reader> create_new_readers(const dht::token* const) override {
if (_readers.empty()) {
return {};
}
_selector_position = dht::maximum_token();
return std::exchange(_readers, {});
}
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range&) override {
return {};
}
};
void combined_mutation_reader::maybe_add_readers(const dht::token* const t) {
if (!_selector->has_new_readers(t)) {
return;
}
add_readers(_selector->create_new_readers(t));
}
void combined_mutation_reader::add_readers(std::vector<mutation_reader> new_readers) {
for (auto&& new_reader : new_readers) {
_readers.emplace_back(std::move(new_reader));
auto* r = &_readers.back();
_all_readers.emplace_back(r);
_next.emplace_back(r);
}
}
const dht::token* combined_mutation_reader::current_position() const {
if (_ptables.empty()) {
return nullptr;
}
return &_ptables.front().m.decorated_key().token();
}
future<> combined_mutation_reader::prepare_next() {
maybe_add_readers(current_position());
return parallel_for_each(_next, [this] (mutation_reader* mr) {
return (*mr)().then([this, mr] (streamed_mutation_opt next) {
if (next) {
_ptables.emplace_back(mutation_and_reader { std::move(*next), mr });
boost::range::push_heap(_ptables, &heap_compare);
} else {
auto it = std::remove(_all_readers.begin(), _all_readers.end(), mr);
_all_readers.erase(it);
_readers.remove_if([mr](auto& r) { return &r == mr; });
}
});
}).then([this] {
@@ -42,12 +105,12 @@ future<> combined_mutation_reader::prepare_next() {
}
future<streamed_mutation_opt> combined_mutation_reader::next() {
if (_current.empty() && !_next.empty()) {
if ((_current.empty() && !_next.empty()) || _selector->has_new_readers(current_position())) {
return prepare_next().then([this] { return next(); });
}
if (_ptables.empty()) {
return make_ready_future<streamed_mutation_opt>();
};
}
while (!_ptables.empty()) {
boost::range::pop_heap(_ptables, &heap_compare);
@@ -95,24 +158,19 @@ future<> combined_mutation_reader::fast_forward_to(std::vector<mutation_reader*>
});
}
combined_mutation_reader::combined_mutation_reader(std::vector<mutation_reader> readers)
: _readers(std::move(readers))
combined_mutation_reader::combined_mutation_reader(std::unique_ptr<reader_selector> selector)
: _selector(std::move(selector))
{
_next.reserve(_readers.size());
_current.reserve(_readers.size());
_ptables.reserve(_readers.size());
for (auto&& r : _readers) {
_next.emplace_back(&r);
}
_all_readers.assign(_next.begin(), _next.end());
}
future<> combined_mutation_reader::fast_forward_to(const dht::partition_range& pr) {
_ptables.clear();
_next.assign(_all_readers.begin(), _all_readers.end());
return parallel_for_each(_next, [this, &pr] (mutation_reader* mr) {
return mr->fast_forward_to(pr);
}).then([this, pr] {
add_readers(_selector->fast_forward_to(pr));
});
}
@@ -122,7 +180,7 @@ future<streamed_mutation_opt> combined_mutation_reader::operator()() {
mutation_reader
make_combined_reader(std::vector<mutation_reader> readers) {
return make_mutation_reader<combined_mutation_reader>(std::move(readers));
return make_mutation_reader<combined_mutation_reader>(std::make_unique<list_reader_selector>(std::move(readers)));
}
mutation_reader

View File

@@ -105,9 +105,25 @@ make_mutation_reader(Args&&... args) {
return mutation_reader(std::make_unique<Impl>(std::forward<Args>(args)...));
}
class reader_selector {
protected:
dht::token _selector_position;
public:
virtual ~reader_selector() = default;
// Call only if has_new_readers() returned true.
virtual std::vector<mutation_reader> create_new_readers(const dht::token* const t) = 0;
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range& pr) = 0;
// Can be false-positive but never false-negative!
bool has_new_readers(const dht::token* const t) const noexcept {
return !_selector_position.is_maximum() && (!t || *t >= _selector_position);
}
};
// Combines multiple mutation_readers into one.
class combined_mutation_reader : public mutation_reader::impl {
std::vector<mutation_reader> _readers;
std::unique_ptr<reader_selector> _selector;
std::list<mutation_reader> _readers;
std::vector<mutation_reader*> _all_readers;
struct mutation_and_reader {
@@ -140,6 +156,9 @@ class combined_mutation_reader : public mutation_reader::impl {
std::vector<streamed_mutation> _current;
std::vector<mutation_reader*> _next;
private:
const dht::token* current_position() const;
void maybe_add_readers(const dht::token* const t);
void add_readers(std::vector<mutation_reader> new_readers);
future<> prepare_next();
// Produces next mutation or disengaged optional if there are no more.
future<streamed_mutation_opt> next();
@@ -148,7 +167,7 @@ protected:
void init_mutation_reader_set(std::vector<mutation_reader*>);
future<> fast_forward_to(std::vector<mutation_reader*> to_add, std::vector<mutation_reader*> to_remove, const dht::partition_range& pr);
public:
combined_mutation_reader(std::vector<mutation_reader> readers);
combined_mutation_reader(std::unique_ptr<reader_selector> selector);
virtual future<streamed_mutation_opt> operator()() override;
virtual future<> fast_forward_to(const dht::partition_range& pr) override;
};