diff --git a/sstables/partition.cc b/sstables/partition.cc
index f300360a73..b9cfffe12b 100644
--- a/sstables/partition.cc
+++ b/sstables/partition.cc
@@ -636,6 +636,7 @@ struct sstable_data_source {
     shared_sstable _sst;
     mp_row_consumer _consumer;
     data_consume_context _context;
+    std::unique_ptr<index_reader> _index;
 
     sstable_data_source(shared_sstable sst, mp_row_consumer&& consumer)
         : _sst(std::move(sst))
@@ -643,18 +644,26 @@ struct sstable_data_source {
         , _context(_sst->data_consume_rows(_consumer))
     { }
 
-    sstable_data_source(shared_sstable sst, mp_row_consumer&& consumer, sstable::disk_read_range toread)
+    sstable_data_source(shared_sstable sst, mp_row_consumer&& consumer, sstable::disk_read_range toread, std::unique_ptr<index_reader> index)
         : _sst(std::move(sst))
         , _consumer(std::move(consumer))
         , _context(_sst->data_consume_rows(_consumer, std::move(toread)))
+        , _index(std::move(index))
     { }
 
     sstable_data_source(schema_ptr s, shared_sstable sst, const sstables::key& k, const io_priority_class& pc,
            const query::partition_slice& slice, sstable::disk_read_range toread)
         : _sst(std::move(sst))
         , _consumer(k, s, slice, pc)
-        , _context(_sst->data_consume_rows(_consumer, std::move(toread)))
+        , _context(_sst->data_consume_single_partition(_consumer, std::move(toread)))
     { }
+
+    ~sstable_data_source() {
+        if (_index) {
+            auto f = _index->close();
+            f.handle_exception([index = std::move(_index)] (auto&&) { });
+        }
+    }
 };
 
 class sstable_streamed_mutation : public streamed_mutation::impl {
@@ -983,6 +992,8 @@ sstables::sstable::find_disk_ranges(
 
 class mutation_reader::impl {
 private:
+    sstable* _sst;
+    const io_priority_class& _pc;
     schema_ptr _schema;
     lw_shared_ptr<sstable_data_source> _ds;
     // For some reason std::function requires functors to be copyable and that's
@@ -994,15 +1005,15 @@ private:
 public:
     impl(shared_sstable sst, schema_ptr schema, sstable::disk_read_range toread,
             const io_priority_class &pc)
-        : _schema(schema)
+        : _sst(&*sst), _pc(pc), _schema(schema)
         , _consumer(schema, query::full_slice, pc)
         , _get_data_source([this, sst = std::move(sst), toread] {
-            auto ds = make_lw_shared<sstable_data_source>(std::move(sst), std::move(_consumer), std::move(toread));
+            auto ds = make_lw_shared<sstable_data_source>(std::move(sst), std::move(_consumer), std::move(toread), std::unique_ptr<index_reader>());
             return make_ready_future<lw_shared_ptr<sstable_data_source>>(std::move(ds));
         }) { }
 
     impl(shared_sstable sst, schema_ptr schema, const io_priority_class &pc)
-        : _schema(schema)
+        : _sst(&*sst), _pc(pc), _schema(schema)
         , _consumer(schema, query::full_slice, pc)
         , _get_data_source([this, sst = std::move(sst)] {
             auto ds = make_lw_shared<sstable_data_source>(std::move(sst), std::move(_consumer));
@@ -1010,20 +1021,19 @@ public:
         }) { }
 
     impl(shared_sstable sst, schema_ptr schema,
-         std::function<future<uint64_t>()> start,
-         std::function<future<uint64_t>()> end,
+         const query::partition_range& pr,
          const query::partition_slice& slice,
          const io_priority_class& pc)
-        : _schema(schema)
+        : _sst(&*sst), _pc(pc), _schema(schema)
         , _consumer(schema, slice, pc)
-        , _get_data_source([this, sst = std::move(sst), start = std::move(start), end = std::move(end)] () mutable {
-            return start().then([this, sst = std::move(sst), end = std::move(end)] (uint64_t start) mutable {
-                return end().then([this, sst = std::move(sst), start] (uint64_t end) mutable {
-                    return make_lw_shared<sstable_data_source>(std::move(sst), std::move(_consumer), sstable::disk_read_range{start, end});
-                });
+        , _get_data_source([this, &pr, sst = std::move(sst)] () mutable {
+            auto index = std::make_unique<index_reader>(_sst->get_index_reader(_pc));
+            auto f = index->get_disk_read_range(*_schema, pr);
+            return f.then([this, index = std::move(index), sst = std::move(sst)] (sstable::disk_read_range drr) mutable {
+                return make_lw_shared<sstable_data_source>(std::move(sst), std::move(_consumer), std::move(drr), std::move(index));
             });
         }) { }
-    impl() : _get_data_source() { }
+    impl() : _pc(default_priority_class()), _get_data_source() { }
 
     // Reference to _consumer is passed to data_consume_rows() in the constructor so we must not allow move/copy
     impl(impl&&) = delete;
@@ -1043,6 +1053,12 @@ public:
             return do_read();
         });
     }
+    future<> fast_forward_to(const query::partition_range& pr) {
+        assert(_ds->_index);
+        return _ds->_index->get_disk_read_range(*_schema, pr).then([this] (sstable::disk_read_range drr) {
+            return _ds->_context.fast_forward_to(drr.start, drr.end);
+        });
+    }
 private:
     future<streamed_mutation_opt> do_read() {
         return _ds->_context.read().then([this] {
@@ -1072,6 +1088,9 @@ mutation_reader::mutation_reader(std::unique_ptr<impl> p)
 future<streamed_mutation_opt> mutation_reader::read() {
     return _pimpl->read();
 }
+future<> mutation_reader::fast_forward_to(const query::partition_range& pr) {
+    return _pimpl->fast_forward_to(pr);
+}
 
 mutation_reader sstable::read_rows(schema_ptr schema, const io_priority_class& pc) {
     return std::make_unique<mutation_reader::impl>(shared_from_this(), schema, pc);
@@ -1101,23 +1120,8 @@ sstable::read_range_rows(schema_ptr schema,
 
     if (query::is_wrap_around(range, *schema)) {
         fail(unimplemented::cause::WRAP_AROUND);
     }
-
-    auto start = [this, range, schema, &pc] {
-        return range.start() ? (range.start()->is_inclusive()
-                 ? lower_bound(schema, range.start()->value(), pc)
-                 : upper_bound(schema, range.start()->value(), pc))
-                 : make_ready_future<uint64_t>(0);
-    };
-
-    auto end = [this, range, schema, &pc] {
-        return range.end() ? (range.end()->is_inclusive()
-                 ? upper_bound(schema, range.end()->value(), pc)
-                 : lower_bound(schema, range.end()->value(), pc))
-                 : make_ready_future<uint64_t>(data_size());
-    };
-
     return std::make_unique<mutation_reader::impl>(
-        shared_from_this(), std::move(schema), std::move(start), std::move(end), slice, pc);
+        shared_from_this(), std::move(schema), range, slice, pc);
 }
 
diff --git a/sstables/row.cc b/sstables/row.cc
index a598891abb..6a92d5d75e 100644
--- a/sstables/row.cc
+++ b/sstables/row.cc
@@ -404,6 +404,9 @@ public:
     future<> read() {
         return _ctx->consume_input(*_ctx);
     }
+    void fast_forward_to(uint64_t begin, uint64_t end) {
+        _ctx->fast_forward_to(begin, end);
+    }
 };
 
 data_consume_context::~data_consume_context() = default;
@@ -418,19 +421,25 @@ data_consume_context::data_consume_context(std::unique_ptr<impl> p) : _pimpl(std::move(p)) { }
 future<> data_consume_context::read() {
     return _pimpl->read();
 }
+void data_consume_context::fast_forward_to(uint64_t begin, uint64_t end) {
+    return _pimpl->fast_forward_to(begin, end);
+}
 
 data_consume_context sstable::data_consume_rows(
         row_consumer& consumer, sstable::disk_read_range toread) {
-    // TODO: The second "end - start" below is redundant: The first one tells
-    // data_stream() to stop at the "end" byte, which allows optimal read-
-    // ahead and avoiding over-read at the end. The second one tells the
-    // consumer to stop at exactly the same place, and forces the consumer
-    // to maintain its own byte count.
     return std::make_unique<data_consume_context::impl>(shared_from_this(),
-            consumer, data_stream(toread.start, toread.end - toread.start,
+            consumer, data_stream(toread.start, data_size() - toread.start,
                consumer.io_priority()), toread.start, toread.end - toread.start, toread.ri);
 }
 
+data_consume_context sstable::data_consume_single_partition(
+        row_consumer& consumer, sstable::disk_read_range toread) {
+    return std::make_unique<data_consume_context::impl>(shared_from_this(),
+            consumer, data_stream(toread.start, toread.end - toread.start,
+               consumer.io_priority()), toread.start, toread.end - toread.start, toread.ri);
+}
+
+
 data_consume_context sstable::data_consume_rows(row_consumer& consumer) {
     return data_consume_rows(consumer, {0, data_size()});
 }
diff --git a/sstables/sstables.cc b/sstables/sstables.cc
index 619b0c0ff3..604bd524fa 100644
--- a/sstables/sstables.cc
+++ b/sstables/sstables.cc
@@ -2055,6 +2055,7 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_priority_class& pc) {
     options.buffer_size = sstable_buffer_size;
     options.io_priority_class = pc;
     options.read_ahead = 4;
+    options.dynamic_adjustments = make_lw_shared<file_input_stream_history>();
     if (_compression) {
         return make_compressed_file_input_stream(_data_file, &_compression,
                 pos, len, std::move(options));
diff --git a/sstables/sstables.hh b/sstables/sstables.hh
index 38db421847..97bb2b263c 100644
--- a/sstables/sstables.hh
+++ b/sstables/sstables.hh
@@ -75,6 +75,7 @@ class data_consume_context {
     friend class sstable;
 public:
     future<> read();
+    void fast_forward_to(uint64_t begin, uint64_t end);
     // Define (as defaults) the destructor and move operations in the source
     // file, so here we don't need to know the incomplete impl type.
     ~data_consume_context();
@@ -102,6 +103,7 @@ class mutation_reader {
     friend class sstable;
 public:
     future<streamed_mutation_opt> read();
+    future<> fast_forward_to(const query::partition_range&);
     // Define (as defaults) the destructor and move operations in the source
     // file, so here we don't need to know the incomplete impl type.
     ~mutation_reader();
@@ -210,6 +212,8 @@ public:
     // progress (i.e., returned a future which hasn't completed yet).
     data_consume_context data_consume_rows(row_consumer& consumer, disk_read_range toread);
 
+    data_consume_context data_consume_single_partition(row_consumer& consumer, disk_read_range toread);
+
     // Like data_consume_rows() with bounds, but iterates over whole range
     data_consume_context data_consume_rows(row_consumer& consumer);
 
@@ -662,6 +666,7 @@ public:
     friend class components_writer;
     friend class sstable_writer;
    friend class index_reader;
+    friend class mutation_reader::impl;
 };
 
 using shared_sstable = lw_shared_ptr<sstable>;