From 2f565777945801f99622b3155cdbbae702d52bbd Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Mon, 7 Mar 2016 00:45:35 +0200 Subject: [PATCH] sstables: more efficient read of compressed data file Before this patch, reading large ranges from a compressed data file involved two inefficiencies: 1. The compressed data file was read one compressed chunk at a time. Such a chunk is around 30 KB in size, well below our desired sstable read-ahead size (sstable_buffer_size = 128 KB). 2. Because the compressed chunks have variable length (the uncompressed chunk has a fixed length) they are not aligned to disk blocks, so consecutive chunks have overlapping blocks which were unnecessarily read twice. The fix for both issues is to build the compressed_file_input_stream on an existing file_input_stream, instead of using direct file IO to read the individual chunks. file_input_stream takes care of doing the appropriate amount of read-ahead, and the compressed_file_input_stream layer does the decompression of the data read from the underlying layer. Fixes #992. Historical note: Implementing compressed_file_input_stream on top of file_input_stream was already tried in the past, and rejected. The problem at that time was that compressed_file_input_stream's constructor did not specify the *end* of the range to read, so that when we wanted to read only a small range we got too much read-ahead beyond the exactly one compressed chunk that we needed to read. Following the fix to issue #964, we now know on every streaming read also the intended *end* of the stream, so we can now use this to stop reading at the end of the last required chunk, even when we use a read-ahead buffer much larger than a chunk. Signed-off-by: Nadav Har'El Message-Id: <1457304335-8507-1-git-send-email-nyh@scylladb.com> --- sstables/compress.cc | 61 +++++++++++++++++++++++++++++++------------- sstables/compress.hh | 2 +- sstables/sstables.cc | 26 ++++++++----------- 3 files changed, 54 insertions(+), 35 deletions(-) diff --git a/sstables/compress.cc b/sstables/compress.cc index fef9992c6a..9ea98e8afc 100644 --- a/sstables/compress.cc +++ b/sstables/compress.cc @@ -22,8 +22,9 @@ #include #include -#include "core/align.hh" -#include "core/unaligned.hh" +#include +#include +#include #include "compress.hh" @@ -217,23 +218,46 @@ size_t compress_max_size_snappy(size_t input_len) { } class compressed_file_data_source_impl : public data_source_impl { - file _file; + input_stream _input_stream; sstables::compression* _compression_metadata; - uint64_t _pos = 0; - const io_priority_class* _pc; + uint64_t _pos; + uint64_t _beg_pos; + uint64_t _end_pos; public: - compressed_file_data_source_impl(file f, const io_priority_class& pc, - sstables::compression* cm, uint64_t pos) - : _file(std::move(f)), _compression_metadata(cm) - , _pos(pos) - , _pc(&pc) - {} + compressed_file_data_source_impl(file f, sstables::compression* cm, + uint64_t pos, size_t len, file_input_stream_options options) + : _compression_metadata(cm) + { + _beg_pos = pos; + if (pos >= _compression_metadata->data_len) { + throw std::runtime_error("attempt to uncompress beyond end"); + } + if (len <= _compression_metadata->data_len - pos) { + _end_pos = pos + len; + } else { + _end_pos = _compression_metadata->data_len; + } + // _beg_pos and _end_pos specify positions in the compressed stream. + // We need to translate them into a range of uncompressed chunks, + // and open a file_input_stream to read that range. + auto start = _compression_metadata->locate(_beg_pos); + auto end = _compression_metadata->locate(_end_pos - 1); + _input_stream = make_file_input_stream(std::move(f), + start.chunk_start, end.chunk_start + end.chunk_len, + std::move(options)); + _pos = _beg_pos; + } virtual future> get() override { - if (_pos >= _compression_metadata->data_len) { + if (_pos >= _end_pos) { return make_ready_future>(); } auto addr = _compression_metadata->locate(_pos); - return _file.dma_read_exactly(addr.chunk_start, addr.chunk_len, *_pc). + // Uncompress the next chunk. We need to skip part of the first + // chunk, but then continue to read from beginning of chunks. + if (_pos != _beg_pos && addr.offset != 0) { + throw std::runtime_error("compressed reader out of sync"); + } + return _input_stream.read_exactly(addr.chunk_len). then([this, addr](temporary_buffer buf) { // The last 4 bytes of the chunk are the adler32 checksum // of the rest of the (compressed) chunk. @@ -265,16 +289,17 @@ public: class compressed_file_data_source : public data_source { public: - compressed_file_data_source(file f, const io_priority_class& pc, - sstables::compression* cm, uint64_t offset) + compressed_file_data_source(file f, sstables::compression* cm, + uint64_t offset, size_t len, file_input_stream_options options) : data_source(std::make_unique( - std::move(f), pc, cm, offset)) + std::move(f), cm, offset, len, std::move(options))) {} }; input_stream make_compressed_file_input_stream( - file f, sstables::compression* cm, const io_priority_class& pc, uint64_t offset) + file f, sstables::compression* cm, uint64_t offset, size_t len, + file_input_stream_options options) { return input_stream(compressed_file_data_source( - std::move(f), pc, cm, offset)); + std::move(f), cm, offset, len, std::move(options))); } diff --git a/sstables/compress.hh b/sstables/compress.hh index 1820a5bed5..9af3778334 100644 --- a/sstables/compress.hh +++ b/sstables/compress.hh @@ -202,4 +202,4 @@ public: // as long as we have *sstables* work in progress, we need to keep the whole // sstable alive, and the compression metadata is only a part of it. input_stream make_compressed_file_input_stream( - file f, sstables::compression *cm, const io_priority_class& pc, uint64_t offset = 0); + file f, sstables::compression *cm, uint64_t offset, size_t len, class file_input_stream_options options); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 95cfd712ae..9904589e75 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1591,31 +1591,25 @@ sstable::component_type sstable::component_from_sstring(sstring &s) { // ahead buffer before reaching the end, but not over-read at the end, so // data_stream() is more efficient than data_stream_at(). input_stream sstable::data_stream_at(uint64_t pos, uint64_t buf_size, const io_priority_class& pc) { + file_input_stream_options options; + options.buffer_size = buf_size; + options.io_priority_class = pc; if (_compression) { - return make_compressed_file_input_stream( - _data_file, &_compression, pc, pos); + return make_compressed_file_input_stream(_data_file, &_compression, + pos, _compression.data_len - pos, std::move(options)); } else { - file_input_stream_options options; - options.buffer_size = buf_size; - options.io_priority_class = pc; return make_file_input_stream(_data_file, pos, std::move(options)); } } input_stream sstable::data_stream(uint64_t pos, size_t len, const io_priority_class& pc) { + file_input_stream_options options; + options.buffer_size = sstable_buffer_size; + options.io_priority_class = pc; if (_compression) { - // FIXME: we should pass "len" to make_compressed_file_input_stream - // to allow it to read-ahead several compressed chunks from disk, and - // stop reading ahead when reaching the intended end of the read. - // However, currently, the code in compress.cc is not as sophisticated - // as the code in fstream.cc, and it reads exactly one compressed - // chunk at a time, so the "len" parameter can't help it. - return make_compressed_file_input_stream( - _data_file, &_compression, pc, pos); + return make_compressed_file_input_stream(_data_file, &_compression, + pos, len, std::move(options)); } else { - file_input_stream_options options; - options.buffer_size = sstable_buffer_size; - options.io_priority_class = pc; return make_file_input_stream(_data_file, pos, len, std::move(options)); } }