Merge "Multishard combined reader" from Botond
" The multishard combined reader provides a convenient flat_mutation_reader implementation that takes care of efficiently reading a range from all shards that own data belonging to the range. All this happens transparently, the user of the reader need only pass a factory function to the multishard reader which it uses to create remote readers when needed. These remote readers will then be managed through foreign reader which abstracts away the fact that the reader is located on a remote shard. Sub readers are created for the entire read range, meaning they are free to cross shard-range limits to fill their buffer. The output of these sub readers is merged in a round-robin manner, the same way data is distributed among shards. The multishard reader will move to the next shard's reader whenever it encounters a partition whose token is after the delimiter token. To improve throughput and latency two levels of read-ahead is employed. One in foreign_reader, which will try to fill the remote shard reader's buffer in the background, in parallel to processing the results on the local shard. And one in the multishard reader itself which will exponentially increase concurrency whenever a sub-reader's buffer becomes empty. But only if this happened after crossing a shard boundary. This is important because there is no point in increasing concurrency if a single sub reader can fill the multishard readers' buffer. " * 'multishard-reader/v3' of https://github.com/denesb/scylla: Add unit tests for multishard_combined_reader Add multishard_combined_reader flat_mutation_reader: add peek_buffer() Add unit tests for foreign_reader forwardable reader: implement fast_forward_to(position_in_partition) Add foreign_reader flat_mutation_reader: add detach_buffer()
This commit is contained in:
@@ -292,6 +292,11 @@ public:
|
||||
virtual size_t buffer_size() const {
|
||||
return _buffer_size;
|
||||
}
|
||||
|
||||
circular_buffer<mutation_fragment> detach_buffer() {
|
||||
_buffer_size = 0;
|
||||
return std::exchange(_buffer, {});
|
||||
}
|
||||
};
|
||||
private:
|
||||
std::unique_ptr<impl> _impl;
|
||||
@@ -408,6 +413,9 @@ public:
|
||||
return peek();
|
||||
});
|
||||
}
|
||||
// A peek at the next fragment in the buffer.
|
||||
// Cannot be called if is_buffer_empty() returns true.
|
||||
const mutation_fragment& peek_buffer() const { return _impl->_buffer.front(); }
|
||||
// The actual buffer size of the reader.
|
||||
// Altough we consistently refer to this as buffer size throught the code
|
||||
// we really use "buffer size" as the size of the collective memory
|
||||
@@ -415,6 +423,14 @@ public:
|
||||
size_t buffer_size() const {
|
||||
return _impl->buffer_size();
|
||||
}
|
||||
// Detach the internal buffer of the reader.
|
||||
// Roughly equivalent to depleting it by calling pop_mutation_fragment()
|
||||
// until is_buffer_empty() returns true.
|
||||
// The reader will need to allocate a new buffer on the next fill_buffer()
|
||||
// call.
|
||||
circular_buffer<mutation_fragment> detach_buffer() {
|
||||
return _impl->detach_buffer();
|
||||
}
|
||||
};
|
||||
|
||||
using flat_mutation_reader_opt = optimized_optional<flat_mutation_reader>;
|
||||
@@ -493,7 +509,9 @@ flat_mutation_reader transform(flat_mutation_reader r, T t) {
|
||||
return _reader.fast_forward_to(pr, timeout);
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
|
||||
throw std::bad_function_call();
|
||||
forward_buffer_to(pr.start());
|
||||
_end_of_stream = false;
|
||||
return _reader.fast_forward_to(std::move(pr), timeout);
|
||||
}
|
||||
virtual size_t buffer_size() const override {
|
||||
return flat_mutation_reader::impl::buffer_size() + _reader.buffer_size();
|
||||
|
||||
@@ -839,3 +839,426 @@ mutation_source make_combined_mutation_source(std::vector<mutation_source> adden
|
||||
return make_combined_reader(s, std::move(rd), fwd);
|
||||
});
|
||||
}
|
||||
|
||||
/// See make_foreign_reader() for description.
|
||||
class foreign_reader : public flat_mutation_reader::impl {
|
||||
template <typename T>
|
||||
using foreign_unique_ptr = foreign_ptr<std::unique_ptr<T>>;
|
||||
|
||||
foreign_unique_ptr<flat_mutation_reader> _reader;
|
||||
foreign_unique_ptr<future<>> _read_ahead_future;
|
||||
// Increase this counter every time next_partition() is called.
|
||||
// These pending calls will be executed the next time we go to the remote
|
||||
// reader (a fill_buffer() or a fast_forward_to() call).
|
||||
unsigned _pending_next_partition = 0;
|
||||
streamed_mutation::forwarding _fwd_sm;
|
||||
|
||||
// Forward an operation to the reader on the remote shard.
|
||||
// If the remote reader has an ongoing read-ahead, bring it to the
|
||||
// foreground (wait on it) and execute the operation after.
|
||||
// After the operation completes, kick off a new read-ahead (fill_buffer())
|
||||
// and move it to the background (save it's future but don't wait on it
|
||||
// now). If all works well read-aheads complete by the next operation and
|
||||
// we don't have to wait on the remote reader filling its buffer.
|
||||
template <typename Operation, typename Result = futurize_t<std::result_of_t<Operation()>>>
|
||||
Result forward_operation(db::timeout_clock::time_point timeout, Operation op) {
|
||||
auto read_ahead_future = _read_ahead_future ? _read_ahead_future.get() : nullptr;
|
||||
return smp::submit_to(_reader.get_owner_shard(), [reader = _reader.get(), read_ahead_future,
|
||||
pending_next_partition = std::exchange(_pending_next_partition, 0), timeout, op = std::move(op)] () mutable {
|
||||
auto exec_op_and_read_ahead = [=] () mutable {
|
||||
while (pending_next_partition) {
|
||||
--pending_next_partition;
|
||||
reader->next_partition();
|
||||
}
|
||||
return op().then([=] (auto... results) {
|
||||
return make_ready_future<foreign_unique_ptr<future<>>, decltype(results)...>(
|
||||
std::make_unique<future<>>(reader->fill_buffer(timeout)), std::move(results)...);
|
||||
});
|
||||
};
|
||||
if (read_ahead_future) {
|
||||
return read_ahead_future->then(std::move(exec_op_and_read_ahead));
|
||||
} else {
|
||||
return exec_op_and_read_ahead();
|
||||
}
|
||||
}).then([this] (foreign_unique_ptr<future<>> new_read_ahead_future, auto... results) {
|
||||
_read_ahead_future = std::move(new_read_ahead_future);
|
||||
return make_ready_future<decltype(results)...>(std::move(results)...);
|
||||
});
|
||||
}
|
||||
public:
|
||||
foreign_reader(schema_ptr schema,
|
||||
foreign_unique_ptr<flat_mutation_reader> reader,
|
||||
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no);
|
||||
|
||||
~foreign_reader();
|
||||
|
||||
// this is captured.
|
||||
foreign_reader(const foreign_reader&) = delete;
|
||||
foreign_reader& operator=(const foreign_reader&) = delete;
|
||||
foreign_reader(foreign_reader&&) = delete;
|
||||
foreign_reader& operator=(foreign_reader&&) = delete;
|
||||
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
|
||||
virtual void next_partition() override;
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override;
|
||||
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override;
|
||||
};
|
||||
|
||||
foreign_reader::foreign_reader(schema_ptr schema,
|
||||
foreign_unique_ptr<flat_mutation_reader> reader,
|
||||
streamed_mutation::forwarding fwd_sm)
|
||||
: impl(std::move(schema))
|
||||
, _reader(std::move(reader))
|
||||
, _fwd_sm(fwd_sm) {
|
||||
}
|
||||
|
||||
foreign_reader::~foreign_reader() {
|
||||
smp::submit_to(_reader.get_owner_shard(), [reader = std::move(_reader), read_ahead_future = std::move(_read_ahead_future)] () mutable {
|
||||
if (read_ahead_future) {
|
||||
return read_ahead_future->finally([r = std::move(reader)] {});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
future<> foreign_reader::fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
if (_end_of_stream || is_buffer_full()) {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
using fragment_buffer = circular_buffer<mutation_fragment>;
|
||||
|
||||
return forward_operation(timeout, [reader = _reader.get(), timeout] () {
|
||||
auto f = reader->is_buffer_empty() ? reader->fill_buffer(timeout) : make_ready_future<>();
|
||||
return f.then([=] {
|
||||
return make_ready_future<foreign_unique_ptr<fragment_buffer>, bool>(
|
||||
std::make_unique<fragment_buffer>(reader->detach_buffer()),
|
||||
reader->is_end_of_stream());
|
||||
});
|
||||
}).then([this] (foreign_unique_ptr<fragment_buffer> buffer, bool end_of_steam) mutable {
|
||||
_end_of_stream = end_of_steam;
|
||||
for (const auto& mf : *buffer) {
|
||||
// Need a copy since the mf is on the remote shard.
|
||||
push_mutation_fragment(mf);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void foreign_reader::next_partition() {
|
||||
if (_fwd_sm == streamed_mutation::forwarding::yes) {
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
++_pending_next_partition;
|
||||
} else {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty()) {
|
||||
_end_of_stream = false;
|
||||
++_pending_next_partition;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<> foreign_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return forward_operation(timeout, [reader = _reader.get(), &pr, timeout] () {
|
||||
return reader->fast_forward_to(pr, timeout);
|
||||
});
|
||||
}
|
||||
|
||||
future<> foreign_reader::fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) {
|
||||
forward_buffer_to(pr.start());
|
||||
_end_of_stream = false;
|
||||
return forward_operation(timeout, [reader = _reader.get(), pr = std::move(pr), timeout] () {
|
||||
return reader->fast_forward_to(std::move(pr), timeout);
|
||||
});
|
||||
}
|
||||
|
||||
flat_mutation_reader make_foreign_reader(schema_ptr schema,
|
||||
foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader,
|
||||
streamed_mutation::forwarding fwd_sm) {
|
||||
return make_flat_mutation_reader<foreign_reader>(std::move(schema), std::move(reader), fwd_sm);
|
||||
}
|
||||
|
||||
// See make_foreign_reader() for description.
|
||||
class multishard_combining_reader : public flat_mutation_reader::impl {
|
||||
const dht::i_partitioner& _partitioner;
|
||||
const dht::partition_range* _pr;
|
||||
remote_reader_factory _reader_factory;
|
||||
const streamed_mutation::forwarding _fwd_sm;
|
||||
const mutation_reader::forwarding _fwd_mr;
|
||||
|
||||
// Thin wrapper around a flat_mutation_reader (foreign_reader) that
|
||||
// lazy-creates the reader when needed and transparently keeps track
|
||||
// of read-ahead.
|
||||
class shard_reader {
|
||||
multishard_combining_reader& _parent;
|
||||
unsigned _shard;
|
||||
// We could use an optional here but some methods (due to the context
|
||||
// they are called from) know the reader was already created and by
|
||||
// keeping a separate flag we can omit the check in these cases.
|
||||
flat_mutation_reader _reader;
|
||||
bool _reader_created = false;
|
||||
unsigned _pending_next_partition = 0;
|
||||
std::optional<future<>> _read_ahead;
|
||||
|
||||
public:
|
||||
shard_reader(multishard_combining_reader& parent, unsigned shard)
|
||||
: _parent(parent)
|
||||
, _shard(shard)
|
||||
, _reader(make_empty_flat_reader(_parent._schema)) {
|
||||
}
|
||||
|
||||
shard_reader(shard_reader&&) = default;
|
||||
shard_reader& operator=(shard_reader&&) = delete;
|
||||
|
||||
shard_reader(const shard_reader&) = delete;
|
||||
shard_reader& operator=(const shard_reader&) = delete;
|
||||
|
||||
~shard_reader() {
|
||||
if (_read_ahead) {
|
||||
// Avoid errors in the logs about ignored exceptional future.
|
||||
_read_ahead->finally([] {});
|
||||
}
|
||||
}
|
||||
|
||||
// These methods assume the reader is already created.
|
||||
bool is_end_of_stream() const {
|
||||
return _reader.is_end_of_stream();
|
||||
}
|
||||
bool is_buffer_empty() const {
|
||||
return _reader.is_buffer_empty();
|
||||
}
|
||||
mutation_fragment pop_mutation_fragment() {
|
||||
return _reader.pop_mutation_fragment();
|
||||
}
|
||||
const mutation_fragment& peek_buffer() const {
|
||||
return _reader.peek_buffer();
|
||||
}
|
||||
future<> fill_buffer(db::timeout_clock::time_point timeout);
|
||||
void read_ahead(db::timeout_clock::time_point timeout);
|
||||
bool is_read_ahead_in_progress() const {
|
||||
return _read_ahead.has_value();
|
||||
}
|
||||
|
||||
// These methods don't assume the reader is already created.
|
||||
void next_partition();
|
||||
future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout);
|
||||
future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout);
|
||||
future<> create_reader();
|
||||
explicit operator bool() const {
|
||||
return _reader_created;
|
||||
}
|
||||
bool done() const {
|
||||
return _reader_created && _reader.is_buffer_empty() && _reader.is_end_of_stream();
|
||||
}
|
||||
};
|
||||
|
||||
std::vector<shard_reader> _shard_readers;
|
||||
unsigned _current_shard;
|
||||
dht::token _next_token;
|
||||
bool _crossed_shards;
|
||||
unsigned _concurrency = 1;
|
||||
|
||||
void move_to_next_shard();
|
||||
future<> handle_empty_reader_buffer(db::timeout_clock::time_point timeout);
|
||||
|
||||
public:
|
||||
multishard_combining_reader(schema_ptr s,
|
||||
const dht::partition_range& pr,
|
||||
const dht::i_partitioner& partitioner,
|
||||
remote_reader_factory reader_factory,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr);
|
||||
|
||||
// this is captured.
|
||||
multishard_combining_reader(const multishard_combining_reader&) = delete;
|
||||
multishard_combining_reader& operator=(const multishard_combining_reader&) = delete;
|
||||
multishard_combining_reader(multishard_combining_reader&&) = delete;
|
||||
multishard_combining_reader& operator=(multishard_combining_reader&&) = delete;
|
||||
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
|
||||
virtual void next_partition() override;
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override;
|
||||
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override;
|
||||
};
|
||||
|
||||
future<> multishard_combining_reader::shard_reader::fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
if (_read_ahead) {
|
||||
return *std::exchange(_read_ahead, std::nullopt);
|
||||
}
|
||||
return _reader.fill_buffer();
|
||||
}
|
||||
|
||||
void multishard_combining_reader::shard_reader::read_ahead(db::timeout_clock::time_point timeout) {
|
||||
_read_ahead.emplace(_reader.fill_buffer(timeout));
|
||||
}
|
||||
|
||||
void multishard_combining_reader::shard_reader::next_partition() {
|
||||
if (_reader_created) {
|
||||
_reader.next_partition();
|
||||
} else {
|
||||
++_pending_next_partition;
|
||||
}
|
||||
}
|
||||
|
||||
future<> multishard_combining_reader::shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
|
||||
if (_reader_created) {
|
||||
return _reader.fast_forward_to(pr, timeout);
|
||||
}
|
||||
// No need to fast-forward uncreated readers, they will be passed the new
|
||||
// range when created.
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> multishard_combining_reader::shard_reader::fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) {
|
||||
if (_reader_created) {
|
||||
return _reader.fast_forward_to(pr, timeout);
|
||||
}
|
||||
return create_reader().then([this, pr = std::move(pr), timeout] {
|
||||
return _reader.fast_forward_to(pr, timeout);
|
||||
});
|
||||
}
|
||||
|
||||
future<> multishard_combining_reader::shard_reader::create_reader() {
|
||||
if (_reader_created) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return _parent._reader_factory(_shard, *_parent._pr, _parent._fwd_sm, _parent._fwd_mr).then(
|
||||
[this] (foreign_ptr<std::unique_ptr<flat_mutation_reader>>&& r) mutable {
|
||||
_reader = make_foreign_reader(_parent._schema, std::move(r), _parent._fwd_sm);
|
||||
while (_pending_next_partition) {
|
||||
--_pending_next_partition;
|
||||
_reader.next_partition();
|
||||
}
|
||||
_reader_created = true;
|
||||
});
|
||||
}
|
||||
|
||||
void multishard_combining_reader::move_to_next_shard() {
|
||||
_crossed_shards = true;
|
||||
_current_shard = (_current_shard + 1) % _partitioner.shard_count();
|
||||
_next_token = _partitioner.token_for_next_shard(_next_token, _current_shard);
|
||||
}
|
||||
|
||||
future<> multishard_combining_reader::handle_empty_reader_buffer(db::timeout_clock::time_point timeout) {
|
||||
auto& reader = _shard_readers[_current_shard];
|
||||
|
||||
if (reader.is_end_of_stream()) {
|
||||
if (_fwd_sm || std::all_of(_shard_readers.begin(), _shard_readers.end(), std::mem_fn(&shard_reader::done))) {
|
||||
_end_of_stream = true;
|
||||
} else {
|
||||
move_to_next_shard();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
} else if (reader.is_read_ahead_in_progress()) {
|
||||
return reader.fill_buffer(timeout);
|
||||
} else {
|
||||
// If we crossed shards and the next reader has an empty buffer we
|
||||
// double concurrency so the next time we cross shards we will have
|
||||
// more chances of hitting the reader's buffer.
|
||||
if (_crossed_shards) {
|
||||
_concurrency = std::min(_concurrency * 2, _partitioner.shard_count());
|
||||
|
||||
// If concurrency > 1 we kick-off concurrency-1 read-aheads in the
|
||||
// background. They will be brought to the foreground when we move
|
||||
// to their respective shard.
|
||||
for (unsigned i = 1; i < _concurrency; ++i) {
|
||||
_shard_readers[(_current_shard + i) % _partitioner.shard_count()].read_ahead(timeout);
|
||||
}
|
||||
}
|
||||
return reader.fill_buffer(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
multishard_combining_reader::multishard_combining_reader(schema_ptr s,
|
||||
const dht::partition_range& pr,
|
||||
const dht::i_partitioner& partitioner,
|
||||
remote_reader_factory reader_factory,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: impl(s)
|
||||
, _partitioner(partitioner)
|
||||
, _pr(&pr)
|
||||
, _reader_factory(std::move(reader_factory))
|
||||
, _fwd_sm(fwd_sm)
|
||||
, _fwd_mr(fwd_mr)
|
||||
, _current_shard(pr.start() ? _partitioner.shard_of(pr.start()->value().token()) : _partitioner.shard_of_minimum_token())
|
||||
, _next_token(_partitioner.token_for_next_shard(pr.start() ? pr.start()->value().token() : dht::minimum_token(),
|
||||
(_current_shard + 1) % _partitioner.shard_count())) {
|
||||
_shard_readers.reserve(_partitioner.shard_count());
|
||||
for (unsigned i = 0; i < _partitioner.shard_count(); ++i) {
|
||||
_shard_readers.emplace_back(*this, i);
|
||||
}
|
||||
}
|
||||
|
||||
future<> multishard_combining_reader::fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
_crossed_shards = false;
|
||||
return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this, timeout] {
|
||||
if (!_shard_readers[_current_shard]) {
|
||||
return _shard_readers[_current_shard].create_reader();
|
||||
}
|
||||
auto& reader = _shard_readers[_current_shard];
|
||||
|
||||
if (reader.is_buffer_empty()) {
|
||||
return handle_empty_reader_buffer(timeout);
|
||||
}
|
||||
|
||||
while (!reader.is_buffer_empty() && !is_buffer_full()) {
|
||||
if (const auto& mf = reader.peek_buffer(); mf.is_partition_start() && mf.as_partition_start().key().token() >= _next_token) {
|
||||
move_to_next_shard();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
push_mutation_fragment(reader.pop_mutation_fragment());
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
void multishard_combining_reader::next_partition() {
|
||||
if (_fwd_sm == streamed_mutation::forwarding::yes) {
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
_shard_readers[_current_shard].next_partition();
|
||||
} else {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty()) {
|
||||
_shard_readers[_current_shard].next_partition();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<> multishard_combining_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
|
||||
if (pr.start()) {
|
||||
auto& t = pr.start()->value().token();
|
||||
_current_shard = _partitioner.shard_of(t);
|
||||
_next_token = _partitioner.token_for_next_shard(t, (_current_shard + 1) % _partitioner.shard_count());
|
||||
} else {
|
||||
_current_shard = _partitioner.shard_of_minimum_token();
|
||||
_next_token = _partitioner.token_for_next_shard(dht::minimum_token(), (_current_shard + 1) % _partitioner.shard_count());
|
||||
}
|
||||
_pr = ≺
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return parallel_for_each(_shard_readers, [this, timeout] (shard_reader& sr) {
|
||||
return sr.fast_forward_to(*_pr, timeout);
|
||||
});
|
||||
}
|
||||
|
||||
future<> multishard_combining_reader::fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) {
|
||||
forward_buffer_to(pr.start());
|
||||
_end_of_stream = false;
|
||||
if (is_buffer_empty()) {
|
||||
return _shard_readers[_current_shard].fast_forward_to(std::move(pr), timeout);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
flat_mutation_reader make_multishard_combining_reader(schema_ptr schema,
|
||||
const dht::partition_range& pr,
|
||||
const dht::i_partitioner& partitioner,
|
||||
remote_reader_factory reader_factory,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_flat_mutation_reader<multishard_combining_reader>(schema, pr, partitioner, std::move(reader_factory), fwd_sm, fwd_mr);
|
||||
}
|
||||
|
||||
@@ -370,3 +370,46 @@ template<typename FlattenedConsumer, typename... Args>
|
||||
stable_flattened_mutations_consumer<FlattenedConsumer> make_stable_flattened_mutations_consumer(Args&&... args) {
|
||||
return { std::make_unique<FlattenedConsumer>(std::forward<Args>(args)...) };
|
||||
}
|
||||
|
||||
/// Make a foreign_reader.
|
||||
///
|
||||
/// foreign_reader is a local representant of a reader located on a remote
|
||||
/// shard. Manages its lifecycle and takes care of seamlessly transferring
|
||||
/// produced fragments. Fragments are *copied* between the shards, a
|
||||
/// bufferful at a time.
|
||||
/// To maximize throughput read-ahead is used. After each fill_buffer() or
|
||||
/// fast_forward_to() a read-ahead (a fill_buffer() on the remote reader) is
|
||||
/// issued. This read-ahead runs in the background and is brough back to
|
||||
/// foreground on the next fill_buffer() or fast_forward_to() call.
|
||||
flat_mutation_reader make_foreign_reader(schema_ptr schema,
|
||||
foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader,
|
||||
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no);
|
||||
|
||||
using remote_reader_factory = noncopyable_function<future<foreign_ptr<std::unique_ptr<flat_mutation_reader>>>(unsigned,
|
||||
const dht::partition_range&,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding)>;
|
||||
|
||||
/// Make a multishard_combining_reader.
|
||||
///
|
||||
/// multishard_combining_reader takes care of reading a range from all shards
|
||||
/// that own a subrange in the range. Readers are created on-demand with the
|
||||
/// supplied reader_factory. This factory function is expected to create an
|
||||
/// appropriate reader on the specified shard and return a foreign_ptr to it.
|
||||
///
|
||||
/// The read starts with a concurrency of one, that is the reader reads from a
|
||||
/// single shard at a time. The concurrency is exponentially increased (to a
|
||||
/// maximum of the number of shards) when a reader's buffer is empty after
|
||||
/// moving the next shard. This condition is important as we only wan't to
|
||||
/// increase concurrency for sparse tables that have little data and the reader
|
||||
/// has to move between shards often. When concurrency is > 1, the reader
|
||||
/// issues background read-aheads to the next shards so that by the time it
|
||||
/// needs to move to them they have the data ready.
|
||||
/// For dense tables (where we rarely cross shards) we rely on the
|
||||
/// foreign_reader to issue sufficient read-aheads on its own to avoid blocking.
|
||||
flat_mutation_reader make_multishard_combining_reader(schema_ptr schema,
|
||||
const dht::partition_range& pr,
|
||||
const dht::i_partitioner& partitioner,
|
||||
remote_reader_factory reader_factory,
|
||||
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no);
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <boost/range/irange.hpp>
|
||||
#include <boost/range/adaptor/uniqued.hpp>
|
||||
|
||||
#include "core/sleep.hh"
|
||||
#include "core/do_with.hh"
|
||||
@@ -35,6 +36,7 @@
|
||||
#include "tests/simple_schema.hh"
|
||||
#include "tests/test_services.hh"
|
||||
#include "tests/mutation_source_test.hh"
|
||||
#include "tests/cql_test_env.hh"
|
||||
|
||||
#include "mutation_reader.hh"
|
||||
#include "schema_builder.hh"
|
||||
@@ -42,6 +44,7 @@
|
||||
#include "sstables/sstables.hh"
|
||||
#include "database.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "schema_registry.hh"
|
||||
|
||||
static schema_ptr make_schema() {
|
||||
return schema_builder("ks", "cf")
|
||||
@@ -1380,3 +1383,202 @@ SEASTAR_TEST_CASE(test_combined_mutation_source_is_a_mutation_source) {
|
||||
run_mutation_source_tests(make_combined_populator(3));
|
||||
});
|
||||
}
|
||||
|
||||
// Best run with SMP >= 2
|
||||
SEASTAR_THREAD_TEST_CASE(test_foreign_reader_as_mutation_source) {
|
||||
do_with_cql_env([] (cql_test_env& env) -> future<> {
|
||||
auto populate = [] (schema_ptr s, const std::vector<mutation>& mutations) {
|
||||
const auto remote_shard = (engine().cpu_id() + 1) % smp::count;
|
||||
auto remote_mt = smp::submit_to(remote_shard, [s = global_schema_ptr(s), &mutations] {
|
||||
auto mt = make_lw_shared<memtable>(s.get());
|
||||
|
||||
for (auto& mut : mutations) {
|
||||
mt->apply(mut);
|
||||
}
|
||||
|
||||
return make_foreign(mt);
|
||||
}).get0();
|
||||
|
||||
auto reader_factory = [remote_shard, remote_mt = std::move(remote_mt)] (schema_ptr s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
auto remote_reader = smp::submit_to(remote_shard,
|
||||
[&, s = global_schema_ptr(s), fwd_sm, fwd_mr, trace_state = tracing::global_trace_state_ptr(trace_state)] {
|
||||
return make_foreign(std::make_unique<flat_mutation_reader>(remote_mt->make_flat_reader(s.get(),
|
||||
range,
|
||||
slice,
|
||||
pc,
|
||||
trace_state.get(),
|
||||
fwd_sm,
|
||||
fwd_mr)));
|
||||
}).get0();
|
||||
return make_foreign_reader(s, std::move(remote_reader), fwd_sm);
|
||||
};
|
||||
|
||||
auto reader_factory_ptr = make_lw_shared<decltype(reader_factory)>(std::move(reader_factory));
|
||||
|
||||
return mutation_source([reader_factory_ptr] (schema_ptr s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return (*reader_factory_ptr)(std::move(s), range, slice, pc, std::move(trace_state), fwd_sm, fwd_mr);
|
||||
});
|
||||
};
|
||||
|
||||
run_mutation_source_tests(populate);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
// Shards tokens such that mutations are owned by shards in a round-robin manner.
|
||||
class dummy_partitioner : public dht::i_partitioner {
|
||||
dht::i_partitioner& _partitioner;
|
||||
std::vector<dht::token> _tokens;
|
||||
|
||||
public:
|
||||
dummy_partitioner(dht::i_partitioner& partitioner, const std::map<dht::token, std::vector<mutation>>& mutations_by_token)
|
||||
: i_partitioner(smp::count)
|
||||
, _partitioner(partitioner)
|
||||
, _tokens(boost::copy_range<std::vector<dht::token>>(mutations_by_token | boost::adaptors::map_keys)) {
|
||||
}
|
||||
|
||||
virtual dht::token midpoint(const dht::token& left, const dht::token& right) const override { return _partitioner.midpoint(left, right); }
|
||||
virtual dht::token get_token(const schema& s, partition_key_view key) override { return _partitioner.get_token(s, key); }
|
||||
virtual dht::token get_token(const sstables::key_view& key) override { return _partitioner.get_token(key); }
|
||||
virtual sstring to_sstring(const dht::token& t) const override { return _partitioner.to_sstring(t); }
|
||||
virtual dht::token from_sstring(const sstring& t) const override { return _partitioner.from_sstring(t); }
|
||||
virtual dht::token from_bytes(bytes_view bytes) const override { return _partitioner.from_bytes(bytes); }
|
||||
virtual dht::token get_random_token() override { return _partitioner.get_random_token(); }
|
||||
virtual bool preserves_order() override { return _partitioner.preserves_order(); }
|
||||
virtual std::map<dht::token, float> describe_ownership(const std::vector<dht::token>& sorted_tokens) override { return _partitioner.describe_ownership(sorted_tokens); }
|
||||
virtual data_type get_token_validator() override { return _partitioner.get_token_validator(); }
|
||||
virtual const sstring name() const override { return _partitioner.name(); }
|
||||
virtual unsigned shard_of(const dht::token& t) const override;
|
||||
virtual dht::token token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans = 1) const override;
|
||||
virtual int tri_compare(dht::token_view t1, dht::token_view t2) const override { return _partitioner.tri_compare(t1, t2); }
|
||||
};
|
||||
|
||||
unsigned dummy_partitioner::shard_of(const dht::token& t) const {
|
||||
auto it = boost::find(_tokens, t);
|
||||
// Unknown tokens are assigned to shard 0
|
||||
return it == _tokens.end() ? 0 : std::distance(_tokens.begin(), it) % _partitioner.shard_count();
|
||||
}
|
||||
|
||||
dht::token dummy_partitioner::token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans) const {
|
||||
// Find the first token that belongs to `shard` and is larger than `t`
|
||||
auto it = std::find_if(_tokens.begin(), _tokens.end(), [this, &t, shard] (const dht::token& shard_token) {
|
||||
return shard_token > t && shard_of(shard_token) == shard;
|
||||
});
|
||||
|
||||
if (it == _tokens.end()) {
|
||||
return dht::maximum_token();
|
||||
}
|
||||
|
||||
--spans;
|
||||
|
||||
while (spans) {
|
||||
if (std::distance(it, _tokens.end()) <= _partitioner.shard_count()) {
|
||||
return dht::maximum_token();
|
||||
}
|
||||
it += _partitioner.shard_count();
|
||||
--spans;
|
||||
}
|
||||
|
||||
return *it;
|
||||
}
|
||||
|
||||
// Best run with SMP >= 2
|
||||
SEASTAR_THREAD_TEST_CASE(test_multishard_combined_reader_as_mutation_source) {
|
||||
do_with_cql_env([] (cql_test_env& env) -> future<> {
|
||||
auto populate = [] (schema_ptr s, const std::vector<mutation>& mutations) {
|
||||
// We need to group mutations that have the same token so they land on the same shard.
|
||||
std::map<dht::token, std::vector<mutation>> mutations_by_token;
|
||||
|
||||
for (const auto& mut : mutations) {
|
||||
mutations_by_token[mut.token()].push_back(mut);
|
||||
}
|
||||
|
||||
auto partitioner = make_lw_shared<dummy_partitioner>(dht::global_partitioner(), mutations_by_token);
|
||||
|
||||
auto merged_mutations = boost::copy_range<std::vector<std::vector<mutation>>>(mutations_by_token | boost::adaptors::map_values);
|
||||
|
||||
auto remote_memtables = make_lw_shared<std::vector<foreign_ptr<lw_shared_ptr<memtable>>>>();
|
||||
for (unsigned shard = 0; shard < partitioner->shard_count(); ++shard) {
|
||||
auto remote_mt = smp::submit_to(shard, [shard, s = global_schema_ptr(s), &merged_mutations, partitioner = *partitioner] {
|
||||
auto mt = make_lw_shared<memtable>(s.get());
|
||||
|
||||
for (unsigned i = shard; i < merged_mutations.size(); i += partitioner.shard_count()) {
|
||||
for (auto& mut : merged_mutations[i]) {
|
||||
mt->apply(mut);
|
||||
}
|
||||
}
|
||||
|
||||
return make_foreign(mt);
|
||||
}).get0();
|
||||
remote_memtables->emplace_back(std::move(remote_mt));
|
||||
}
|
||||
|
||||
return mutation_source([partitioner, remote_memtables] (schema_ptr s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
auto factory = [remote_memtables, s, &slice, &pc, trace_state] (unsigned shard,
|
||||
const dht::partition_range& range,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return smp::submit_to(shard, [mt = &*remote_memtables->at(shard), s = global_schema_ptr(s), &range, &slice, &pc,
|
||||
trace_state = tracing::global_trace_state_ptr(trace_state), fwd_sm, fwd_mr] () mutable {
|
||||
return make_foreign(std::make_unique<flat_mutation_reader>(mt->make_flat_reader(s.get(),
|
||||
range,
|
||||
slice,
|
||||
pc,
|
||||
trace_state.get(),
|
||||
fwd_sm,
|
||||
fwd_mr)));
|
||||
});
|
||||
};
|
||||
|
||||
return make_multishard_combining_reader(s, range, *partitioner, factory, fwd_sm, fwd_mr);
|
||||
});
|
||||
};
|
||||
|
||||
run_mutation_source_tests(populate);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
// Best run with SMP >= 2
|
||||
SEASTAR_THREAD_TEST_CASE(test_multishard_combined_reader_reading_empty_table) {
|
||||
do_with_cql_env([] (cql_test_env& env) -> future<> {
|
||||
std::vector<bool> shards_touched(smp::count, false);
|
||||
simple_schema s;
|
||||
auto factory = [&shards_touched, &s] (unsigned shard,
|
||||
const dht::partition_range& range,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
shards_touched[shard] = true;
|
||||
return smp::submit_to(shard, [gs = global_schema_ptr(s.schema())] () mutable {
|
||||
return make_foreign(std::make_unique<flat_mutation_reader>(make_empty_flat_reader(gs.get())));
|
||||
});
|
||||
};
|
||||
|
||||
assert_that(make_multishard_combining_reader(s.schema(), query::full_partition_range, dht::global_partitioner(), std::move(factory)))
|
||||
.produces_end_of_stream();
|
||||
|
||||
for (unsigned i = 0; i < smp::count; ++i) {
|
||||
BOOST_REQUIRE(shards_touched.at(i));
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user