From c63e88d5561b7efc8ba16db6cfc54ede358ad7df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 2 Aug 2016 12:41:37 +0100 Subject: [PATCH] sstables: implement mutation_reader::impl::fast_forward_to() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch allows sstable readers to be fast forwarded without making it necessary to recreate the reader (and dropping all buffers in the process). It is built on top of index_reader and ability of data_consume_context to be fast forwarded. Signed-off-by: Paweł Dziepak --- sstables/partition.cc | 64 +++++++++++++++++++++++-------------------- sstables/row.cc | 21 ++++++++++---- sstables/sstables.cc | 1 + sstables/sstables.hh | 5 ++++ 4 files changed, 55 insertions(+), 36 deletions(-) diff --git a/sstables/partition.cc b/sstables/partition.cc index f300360a73..b9cfffe12b 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -636,6 +636,7 @@ struct sstable_data_source { shared_sstable _sst; mp_row_consumer _consumer; data_consume_context _context; + std::unique_ptr _index; sstable_data_source(shared_sstable sst, mp_row_consumer&& consumer) : _sst(std::move(sst)) @@ -643,18 +644,26 @@ struct sstable_data_source { , _context(_sst->data_consume_rows(_consumer)) { } - sstable_data_source(shared_sstable sst, mp_row_consumer&& consumer, sstable::disk_read_range toread) + sstable_data_source(shared_sstable sst, mp_row_consumer&& consumer, sstable::disk_read_range toread, std::unique_ptr index) : _sst(std::move(sst)) , _consumer(std::move(consumer)) , _context(_sst->data_consume_rows(_consumer, std::move(toread))) + , _index(std::move(index)) { } sstable_data_source(schema_ptr s, shared_sstable sst, const sstables::key& k, const io_priority_class& pc, const query::partition_slice& slice, sstable::disk_read_range toread) : _sst(std::move(sst)) , _consumer(k, s, slice, pc) - , _context(_sst->data_consume_rows(_consumer, std::move(toread))) + , _context(_sst->data_consume_single_partition(_consumer, std::move(toread))) { } + + ~sstable_data_source() { + if (_index) { + auto f = _index->close(); + f.handle_exception([index = std::move(_index)] (auto&&) { }); + } + } }; class sstable_streamed_mutation : public streamed_mutation::impl { @@ -983,6 +992,8 @@ sstables::sstable::find_disk_ranges( class mutation_reader::impl { private: + sstable* _sst; + const io_priority_class& _pc; schema_ptr _schema; lw_shared_ptr _ds; // For some reason std::function requires functors to be copyable and that's @@ -994,15 +1005,15 @@ private: public: impl(shared_sstable sst, schema_ptr schema, sstable::disk_read_range toread, const io_priority_class &pc) - : _schema(schema) + : _sst(&*sst), _pc(pc), _schema(schema) , _consumer(schema, query::full_slice, pc) , _get_data_source([this, sst = std::move(sst), toread] { - auto ds = make_lw_shared(std::move(sst), std::move(_consumer), std::move(toread)); + auto ds = make_lw_shared(std::move(sst), std::move(_consumer), std::move(toread), std::unique_ptr()); return make_ready_future>(std::move(ds)); }) { } impl(shared_sstable sst, schema_ptr schema, const io_priority_class &pc) - : _schema(schema) + : _sst(&*sst), _pc(pc), _schema(schema) , _consumer(schema, query::full_slice, pc) , _get_data_source([this, sst = std::move(sst)] { auto ds = make_lw_shared(std::move(sst), std::move(_consumer)); @@ -1010,20 +1021,19 @@ public: }) { } impl(shared_sstable sst, schema_ptr schema, - std::function()> start, - std::function()> end, + const query::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc) - : _schema(schema) + : _sst(&*sst), _pc(pc), _schema(schema) , _consumer(schema, slice, pc) - , _get_data_source([this, sst = std::move(sst), start = std::move(start), end = std::move(end)] () mutable { - return start().then([this, sst = std::move(sst), end = std::move(end)] (uint64_t start) mutable { - return end().then([this, sst = std::move(sst), start] (uint64_t end) mutable { - return make_lw_shared(std::move(sst), std::move(_consumer), sstable::disk_read_range{start, end}); - }); + , _get_data_source([this, &pr, sst = std::move(sst)] () mutable { + auto index = std::make_unique(_sst->get_index_reader(_pc)); + auto f = index->get_disk_read_range(*_schema, pr); + return f.then([this, index = std::move(index), sst = std::move(sst)] (sstable::disk_read_range drr) mutable { + return make_lw_shared(std::move(sst), std::move(_consumer), std::move(drr), std::move(index)); }); }) { } - impl() : _get_data_source() { } + impl() : _pc(default_priority_class()), _get_data_source() { } // Reference to _consumer is passed to data_consume_rows() in the constructor so we must not allow move/copy impl(impl&&) = delete; @@ -1043,6 +1053,12 @@ public: return do_read(); }); } + future<> fast_forward_to(const query::partition_range& pr) { + assert(_ds->_index); + return _ds->_index->get_disk_read_range(*_schema, pr).then([this] (sstable::disk_read_range drr) { + return _ds->_context.fast_forward_to(drr.start, drr.end); + }); + } private: future do_read() { return _ds->_context.read().then([this] { @@ -1072,6 +1088,9 @@ mutation_reader::mutation_reader(std::unique_ptr p) future mutation_reader::read() { return _pimpl->read(); } +future<> mutation_reader::fast_forward_to(const query::partition_range& pr) { + return _pimpl->fast_forward_to(pr); +} mutation_reader sstable::read_rows(schema_ptr schema, const io_priority_class& pc) { return std::make_unique(shared_from_this(), schema, pc); @@ -1101,23 +1120,8 @@ sstable::read_range_rows(schema_ptr schema, if (query::is_wrap_around(range, *schema)) { fail(unimplemented::cause::WRAP_AROUND); } - - auto start = [this, range, schema, &pc] { - return range.start() ? (range.start()->is_inclusive() - ? lower_bound(schema, range.start()->value(), pc) - : upper_bound(schema, range.start()->value(), pc)) - : make_ready_future(0); - }; - - auto end = [this, range, schema, &pc] { - return range.end() ? (range.end()->is_inclusive() - ? upper_bound(schema, range.end()->value(), pc) - : lower_bound(schema, range.end()->value(), pc)) - : make_ready_future(data_size()); - }; - return std::make_unique( - shared_from_this(), std::move(schema), std::move(start), std::move(end), slice, pc); + shared_from_this(), std::move(schema), range, slice, pc); } diff --git a/sstables/row.cc b/sstables/row.cc index a598891abb..6a92d5d75e 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -404,6 +404,9 @@ public: future<> read() { return _ctx->consume_input(*_ctx); } + void fast_forward_to(uint64_t begin, uint64_t end) { + _ctx->fast_forward_to(begin, end); + } }; data_consume_context::~data_consume_context() = default; @@ -418,19 +421,25 @@ data_consume_context::data_consume_context(std::unique_ptr p) : _pimpl(std future<> data_consume_context::read() { return _pimpl->read(); } +void data_consume_context::fast_forward_to(uint64_t begin, uint64_t end) { + return _pimpl->fast_forward_to(begin, end); +} data_consume_context sstable::data_consume_rows( row_consumer& consumer, sstable::disk_read_range toread) { - // TODO: The second "end - start" below is redundant: The first one tells - // data_stream() to stop at the "end" byte, which allows optimal read- - // ahead and avoiding over-read at the end. The second one tells the - // consumer to stop at exactly the same place, and forces the consumer - // to maintain its own byte count. return std::make_unique(shared_from_this(), - consumer, data_stream(toread.start, toread.end - toread.start, + consumer, data_stream(toread.start, data_size() - toread.start, consumer.io_priority()), toread.start, toread.end - toread.start, toread.ri); } +data_consume_context sstable::data_consume_single_partition( + row_consumer& consumer, sstable::disk_read_range toread) { + return std::make_unique(shared_from_this(), + consumer, data_stream(toread.start, toread.end - toread.start, + consumer.io_priority()), toread.start, toread.end - toread.start, toread.ri); +} + + data_consume_context sstable::data_consume_rows(row_consumer& consumer) { return data_consume_rows(consumer, {0, data_size()}); } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 619b0c0ff3..604bd524fa 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2055,6 +2055,7 @@ input_stream sstable::data_stream(uint64_t pos, size_t len, const io_prior options.buffer_size = sstable_buffer_size; options.io_priority_class = pc; options.read_ahead = 4; + options.dynamic_adjustments = make_lw_shared(); if (_compression) { return make_compressed_file_input_stream(_data_file, &_compression, pos, len, std::move(options)); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 38db421847..97bb2b263c 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -75,6 +75,7 @@ class data_consume_context { friend class sstable; public: future<> read(); + void fast_forward_to(uint64_t begin, uint64_t end); // Define (as defaults) the destructor and move operations in the source // file, so here we don't need to know the incomplete impl type. ~data_consume_context(); @@ -102,6 +103,7 @@ class mutation_reader { friend class sstable; public: future read(); + future<> fast_forward_to(const query::partition_range&); // Define (as defaults) the destructor and move operations in the source // file, so here we don't need to know the incomplete impl type. ~mutation_reader(); @@ -210,6 +212,8 @@ public: // progress (i.e., returned a future which hasn't completed yet). data_consume_context data_consume_rows(row_consumer& consumer, disk_read_range toread); + data_consume_context data_consume_single_partition(row_consumer& consumer, disk_read_range toread); + // Like data_consume_rows() with bounds, but iterates over whole range data_consume_context data_consume_rows(row_consumer& consumer); @@ -662,6 +666,7 @@ public: friend class components_writer; friend class sstable_writer; friend class index_reader; + friend class mutation_reader::impl; }; using shared_sstable = lw_shared_ptr;