sstables: implement mutation_reader::impl::fast_forward_to()
This patch allows sstable readers to be fast-forwarded without having to recreate the reader (and drop all of its buffers in the process). It is built on top of index_reader and the ability of data_consume_context to be fast-forwarded. Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
@@ -636,6 +636,7 @@ struct sstable_data_source {
|
||||
shared_sstable _sst;
|
||||
mp_row_consumer _consumer;
|
||||
data_consume_context _context;
|
||||
std::unique_ptr<index_reader> _index;
|
||||
|
||||
sstable_data_source(shared_sstable sst, mp_row_consumer&& consumer)
|
||||
: _sst(std::move(sst))
|
||||
@@ -643,18 +644,26 @@ struct sstable_data_source {
|
||||
, _context(_sst->data_consume_rows(_consumer))
|
||||
{ }
|
||||
|
||||
sstable_data_source(shared_sstable sst, mp_row_consumer&& consumer, sstable::disk_read_range toread)
|
||||
sstable_data_source(shared_sstable sst, mp_row_consumer&& consumer, sstable::disk_read_range toread, std::unique_ptr<index_reader> index)
|
||||
: _sst(std::move(sst))
|
||||
, _consumer(std::move(consumer))
|
||||
, _context(_sst->data_consume_rows(_consumer, std::move(toread)))
|
||||
, _index(std::move(index))
|
||||
{ }
|
||||
|
||||
sstable_data_source(schema_ptr s, shared_sstable sst, const sstables::key& k, const io_priority_class& pc,
|
||||
const query::partition_slice& slice, sstable::disk_read_range toread)
|
||||
: _sst(std::move(sst))
|
||||
, _consumer(k, s, slice, pc)
|
||||
, _context(_sst->data_consume_rows(_consumer, std::move(toread)))
|
||||
, _context(_sst->data_consume_single_partition(_consumer, std::move(toread)))
|
||||
{ }
|
||||
|
||||
// Destructor: when this data source holds an index reader, start closing it.
// The value returned by close() gets a no-op exception handler attached, and
// that handler takes ownership of the reader through its move-capture.
// NOTE(review): presumably this keeps the reader alive until the close has
// finished and deliberately ignores any close failure — confirm.
~sstable_data_source() {
|
||||
if (_index) {
|
||||
auto f = _index->close();
|
||||
f.handle_exception([index = std::move(_index)] (auto&&) { });
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class sstable_streamed_mutation : public streamed_mutation::impl {
|
||||
@@ -983,6 +992,8 @@ sstables::sstable::find_disk_ranges(
|
||||
|
||||
class mutation_reader::impl {
|
||||
private:
|
||||
sstable* _sst;
|
||||
const io_priority_class& _pc;
|
||||
schema_ptr _schema;
|
||||
lw_shared_ptr<sstable_data_source> _ds;
|
||||
// For some reason std::function requires functors to be copyable and that's
|
||||
@@ -994,15 +1005,15 @@ private:
|
||||
public:
|
||||
impl(shared_sstable sst, schema_ptr schema, sstable::disk_read_range toread,
|
||||
const io_priority_class &pc)
|
||||
: _schema(schema)
|
||||
: _sst(&*sst), _pc(pc), _schema(schema)
|
||||
, _consumer(schema, query::full_slice, pc)
|
||||
, _get_data_source([this, sst = std::move(sst), toread] {
|
||||
auto ds = make_lw_shared<sstable_data_source>(std::move(sst), std::move(_consumer), std::move(toread));
|
||||
auto ds = make_lw_shared<sstable_data_source>(std::move(sst), std::move(_consumer), std::move(toread), std::unique_ptr<index_reader>());
|
||||
return make_ready_future<lw_shared_ptr<sstable_data_source>>(std::move(ds));
|
||||
}) { }
|
||||
impl(shared_sstable sst, schema_ptr schema,
|
||||
const io_priority_class &pc)
|
||||
: _schema(schema)
|
||||
: _sst(&*sst), _pc(pc), _schema(schema)
|
||||
, _consumer(schema, query::full_slice, pc)
|
||||
, _get_data_source([this, sst = std::move(sst)] {
|
||||
auto ds = make_lw_shared<sstable_data_source>(std::move(sst), std::move(_consumer));
|
||||
@@ -1010,20 +1021,19 @@ public:
|
||||
}) { }
|
||||
impl(shared_sstable sst,
|
||||
schema_ptr schema,
|
||||
std::function<future<uint64_t>()> start,
|
||||
std::function<future<uint64_t>()> end,
|
||||
const query::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc)
|
||||
: _schema(schema)
|
||||
: _sst(&*sst), _pc(pc), _schema(schema)
|
||||
, _consumer(schema, slice, pc)
|
||||
, _get_data_source([this, sst = std::move(sst), start = std::move(start), end = std::move(end)] () mutable {
|
||||
return start().then([this, sst = std::move(sst), end = std::move(end)] (uint64_t start) mutable {
|
||||
return end().then([this, sst = std::move(sst), start] (uint64_t end) mutable {
|
||||
return make_lw_shared<sstable_data_source>(std::move(sst), std::move(_consumer), sstable::disk_read_range{start, end});
|
||||
});
|
||||
, _get_data_source([this, &pr, sst = std::move(sst)] () mutable {
|
||||
auto index = std::make_unique<index_reader>(_sst->get_index_reader(_pc));
|
||||
auto f = index->get_disk_read_range(*_schema, pr);
|
||||
return f.then([this, index = std::move(index), sst = std::move(sst)] (sstable::disk_read_range drr) mutable {
|
||||
return make_lw_shared<sstable_data_source>(std::move(sst), std::move(_consumer), std::move(drr), std::move(index));
|
||||
});
|
||||
}) { }
|
||||
impl() : _get_data_source() { }
|
||||
impl() : _pc(default_priority_class()), _get_data_source() { }
|
||||
|
||||
// Reference to _consumer is passed to data_consume_rows() in the constructor so we must not allow move/copy
|
||||
impl(impl&&) = delete;
|
||||
@@ -1043,6 +1053,12 @@ public:
|
||||
return do_read();
|
||||
});
|
||||
}
|
||||
// Repositions this reader onto the partition range `pr` without recreating it.
// Asks the data source's index reader for the disk read range matching `pr`
// and then forwards the data consume context to drr.start / drr.end.
// The returned future resolves once the context has been forwarded.
// Precondition (asserted): the data source owns an index reader.
// NOTE(review): the assert dereferences `_ds` itself; if the data source is
// only created lazily, calling this before the first read() would dereference
// a null pointer — confirm against read().
future<> fast_forward_to(const query::partition_range& pr) {
|
||||
assert(_ds->_index);
|
||||
return _ds->_index->get_disk_read_range(*_schema, pr).then([this] (sstable::disk_read_range drr) {
|
||||
return _ds->_context.fast_forward_to(drr.start, drr.end);
|
||||
});
|
||||
}
|
||||
private:
|
||||
future<streamed_mutation_opt> do_read() {
|
||||
return _ds->_context.read().then([this] {
|
||||
@@ -1072,6 +1088,9 @@ mutation_reader::mutation_reader(std::unique_ptr<impl> p)
|
||||
// Public entry point: forwards to the implementation object's read() and
// returns its future unchanged.
future<streamed_mutation_opt> mutation_reader::read() {
|
||||
return _pimpl->read();
|
||||
}
|
||||
// Public entry point: forwards the request to reposition the reader onto `pr`
// to the implementation object and returns its future unchanged.
future<> mutation_reader::fast_forward_to(const query::partition_range& pr) {
|
||||
return _pimpl->fast_forward_to(pr);
|
||||
}
|
||||
|
||||
mutation_reader sstable::read_rows(schema_ptr schema, const io_priority_class& pc) {
|
||||
return std::make_unique<mutation_reader::impl>(shared_from_this(), schema, pc);
|
||||
@@ -1101,23 +1120,8 @@ sstable::read_range_rows(schema_ptr schema,
|
||||
if (query::is_wrap_around(range, *schema)) {
|
||||
fail(unimplemented::cause::WRAP_AROUND);
|
||||
}
|
||||
|
||||
auto start = [this, range, schema, &pc] {
|
||||
return range.start() ? (range.start()->is_inclusive()
|
||||
? lower_bound(schema, range.start()->value(), pc)
|
||||
: upper_bound(schema, range.start()->value(), pc))
|
||||
: make_ready_future<uint64_t>(0);
|
||||
};
|
||||
|
||||
auto end = [this, range, schema, &pc] {
|
||||
return range.end() ? (range.end()->is_inclusive()
|
||||
? upper_bound(schema, range.end()->value(), pc)
|
||||
: lower_bound(schema, range.end()->value(), pc))
|
||||
: make_ready_future<uint64_t>(data_size());
|
||||
};
|
||||
|
||||
return std::make_unique<mutation_reader::impl>(
|
||||
shared_from_this(), std::move(schema), std::move(start), std::move(end), slice, pc);
|
||||
shared_from_this(), std::move(schema), range, slice, pc);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -404,6 +404,9 @@ public:
|
||||
// Invokes consume_input() on the wrapped context, passing the context itself
// as the argument, and returns the resulting future.
future<> read() {
|
||||
return _ctx->consume_input(*_ctx);
|
||||
}
|
||||
// Forwards `begin` and `end` unchanged to the wrapped context's
// fast_forward_to().
void fast_forward_to(uint64_t begin, uint64_t end) {
|
||||
_ctx->fast_forward_to(begin, end);
|
||||
}
|
||||
};
|
||||
|
||||
data_consume_context::~data_consume_context() = default;
|
||||
@@ -418,19 +421,25 @@ data_consume_context::data_consume_context(std::unique_ptr<impl> p) : _pimpl(std
|
||||
// Public entry point: forwards to the implementation object's read() and
// returns its future unchanged.
future<> data_consume_context::read() {
|
||||
return _pimpl->read();
|
||||
}
|
||||
// Public entry point: forwards `begin` and `end` unchanged to the
// implementation object's fast_forward_to(). Nothing is returned to the caller.
void data_consume_context::fast_forward_to(uint64_t begin, uint64_t end) {
|
||||
return _pimpl->fast_forward_to(begin, end);
|
||||
}
|
||||
|
||||
data_consume_context sstable::data_consume_rows(
|
||||
row_consumer& consumer, sstable::disk_read_range toread) {
|
||||
// TODO: The second "end - start" below is redundant: The first one tells
|
||||
// data_stream() to stop at the "end" byte, which allows optimal read-
|
||||
// ahead and avoiding over-read at the end. The second one tells the
|
||||
// consumer to stop at exactly the same place, and forces the consumer
|
||||
// to maintain its own byte count.
|
||||
return std::make_unique<data_consume_context::impl>(shared_from_this(),
|
||||
consumer, data_stream(toread.start, toread.end - toread.start,
|
||||
consumer, data_stream(toread.start, data_size() - toread.start,
|
||||
consumer.io_priority()), toread.start, toread.end - toread.start, toread.ri);
|
||||
}
|
||||
|
||||
// Builds a data_consume_context that feeds `consumer` from the disk range
// `toread`. The input stream is opened at toread.start with a length of
// toread.end - toread.start, using the consumer's I/O priority; the same
// start offset and length are also handed to the context, together with
// toread.ri.
data_consume_context sstable::data_consume_single_partition(
|
||||
row_consumer& consumer, sstable::disk_read_range toread) {
|
||||
return std::make_unique<data_consume_context::impl>(shared_from_this(),
|
||||
consumer, data_stream(toread.start, toread.end - toread.start,
|
||||
consumer.io_priority()), toread.start, toread.end - toread.start, toread.ri);
|
||||
}
|
||||
|
||||
|
||||
// Convenience overload: delegates to the ranged data_consume_rows() with the
// range {0, data_size()}.
data_consume_context sstable::data_consume_rows(row_consumer& consumer) {
|
||||
return data_consume_rows(consumer, {0, data_size()});
|
||||
}
|
||||
|
||||
@@ -2055,6 +2055,7 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_prior
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
options.io_priority_class = pc;
|
||||
options.read_ahead = 4;
|
||||
options.dynamic_adjustments = make_lw_shared<file_input_stream_history>();
|
||||
if (_compression) {
|
||||
return make_compressed_file_input_stream(_data_file, &_compression,
|
||||
pos, len, std::move(options));
|
||||
|
||||
@@ -75,6 +75,7 @@ class data_consume_context {
|
||||
friend class sstable;
|
||||
public:
|
||||
future<> read();
|
||||
void fast_forward_to(uint64_t begin, uint64_t end);
|
||||
// Define (as defaults) the destructor and move operations in the source
|
||||
// file, so here we don't need to know the incomplete impl type.
|
||||
~data_consume_context();
|
||||
@@ -102,6 +103,7 @@ class mutation_reader {
|
||||
friend class sstable;
|
||||
public:
|
||||
future<streamed_mutation_opt> read();
|
||||
future<> fast_forward_to(const query::partition_range&);
|
||||
// Define (as defaults) the destructor and move operations in the source
|
||||
// file, so here we don't need to know the incomplete impl type.
|
||||
~mutation_reader();
|
||||
@@ -210,6 +212,8 @@ public:
|
||||
// progress (i.e., returned a future which hasn't completed yet).
|
||||
data_consume_context data_consume_rows(row_consumer& consumer, disk_read_range toread);
|
||||
|
||||
data_consume_context data_consume_single_partition(row_consumer& consumer, disk_read_range toread);
|
||||
|
||||
// Like data_consume_rows() with bounds, but iterates over whole range
|
||||
data_consume_context data_consume_rows(row_consumer& consumer);
|
||||
|
||||
@@ -662,6 +666,7 @@ public:
|
||||
friend class components_writer;
|
||||
friend class sstable_writer;
|
||||
friend class index_reader;
|
||||
friend class mutation_reader::impl;
|
||||
};
|
||||
|
||||
using shared_sstable = lw_shared_ptr<sstable>;
|
||||
|
||||
Reference in New Issue
Block a user