diff --git a/sstables/row.cc b/sstables/row.cc index e67efcd17a..8fcc378992 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -532,8 +532,9 @@ future<> data_consume_context::read() { data_consume_context sstable::data_consume_rows( row_consumer& consumer, uint64_t start, uint64_t end) { + auto estimated_size = std::min(uint64_t(sstable_buffer_size), align_up(end - start, uint64_t(8 << 10))); return std::make_unique( - consumer, data_stream_at(start), end - start); + consumer, data_stream_at(start, estimated_size), end - start); } data_consume_context sstable::data_consume_rows(row_consumer& consumer) { diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 8713239c8a..59a33c0f21 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -21,6 +21,7 @@ #include "unimplemented.hh" #include #include +#include namespace sstables { @@ -752,14 +753,23 @@ future sstable::read_indexes(uint64_t summary_idx) { uint64_t position = _summary.entries[summary_idx].position; uint64_t quantity = _summary.header.sampling_level; + uint64_t estimated_size; + if (++summary_idx >= _summary.header.size) { + estimated_size = index_size() - position; + } else { + estimated_size = _summary.entries[summary_idx].position - position; + } + + estimated_size = std::min(uint64_t(sstable_buffer_size), align_up(estimated_size, uint64_t(8 << 10))); + struct reader { uint64_t count = 0; std::vector indexes; shared_file_random_access_reader stream; - reader(file f, uint64_t quantity) : stream(f) { indexes.reserve(quantity); } + reader(file f, uint64_t quantity, uint64_t estimated_size) : stream(f, estimated_size) { indexes.reserve(quantity); } }; - auto r = make_lw_shared(_index_file, quantity); + auto r = make_lw_shared(_index_file, quantity, estimated_size); r->stream.seek(position); @@ -1470,12 +1480,12 @@ sstable::component_type sstable::component_from_sstring(sstring &s) { return reverse_map(s, _component_map); } -input_stream sstable::data_stream_at(uint64_t pos) { +input_stream sstable::data_stream_at(uint64_t pos, uint64_t buf_size) { if (_compression) { return make_compressed_file_input_stream( _data_file, &_compression, pos); } else { - return make_file_input_stream(_data_file, pos); + return make_file_input_stream(_data_file, pos, buf_size); } } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 27e4b96d86..69e53068cd 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -334,7 +334,7 @@ private: future read_indexes(uint64_t summary_idx); - input_stream data_stream_at(uint64_t pos); + input_stream data_stream_at(uint64_t pos, uint64_t buf_size = 8192); // Read exactly the specific byte range from the data file (after // uncompression, if the file is compressed). This can be used to read