diff --git a/sstables/compress.cc b/sstables/compress.cc index fef9992c6a..9ea98e8afc 100644 --- a/sstables/compress.cc +++ b/sstables/compress.cc @@ -22,8 +22,9 @@ #include #include -#include "core/align.hh" -#include "core/unaligned.hh" +#include +#include +#include #include "compress.hh" @@ -217,23 +218,46 @@ size_t compress_max_size_snappy(size_t input_len) { } class compressed_file_data_source_impl : public data_source_impl { - file _file; + input_stream _input_stream; sstables::compression* _compression_metadata; - uint64_t _pos = 0; - const io_priority_class* _pc; + uint64_t _pos; + uint64_t _beg_pos; + uint64_t _end_pos; public: - compressed_file_data_source_impl(file f, const io_priority_class& pc, - sstables::compression* cm, uint64_t pos) - : _file(std::move(f)), _compression_metadata(cm) - , _pos(pos) - , _pc(&pc) - {} + compressed_file_data_source_impl(file f, sstables::compression* cm, + uint64_t pos, size_t len, file_input_stream_options options) + : _compression_metadata(cm) + { + _beg_pos = pos; + if (pos >= _compression_metadata->data_len) { + throw std::runtime_error("attempt to uncompress beyond end"); + } + if (len <= _compression_metadata->data_len - pos) { + _end_pos = pos + len; + } else { + _end_pos = _compression_metadata->data_len; + } + // _beg_pos and _end_pos specify positions in the compressed stream. + // We need to translate them into a range of uncompressed chunks, + // and open a file_input_stream to read that range. + auto start = _compression_metadata->locate(_beg_pos); + auto end = _compression_metadata->locate(_end_pos - 1); + _input_stream = make_file_input_stream(std::move(f), + start.chunk_start, end.chunk_start + end.chunk_len, + std::move(options)); + _pos = _beg_pos; + } virtual future> get() override { - if (_pos >= _compression_metadata->data_len) { + if (_pos >= _end_pos) { return make_ready_future>(); } auto addr = _compression_metadata->locate(_pos); - return _file.dma_read_exactly(addr.chunk_start, addr.chunk_len, *_pc). + // Uncompress the next chunk. We need to skip part of the first + // chunk, but then continue to read from beginning of chunks. + if (_pos != _beg_pos && addr.offset != 0) { + throw std::runtime_error("compressed reader out of sync"); + } + return _input_stream.read_exactly(addr.chunk_len). then([this, addr](temporary_buffer buf) { // The last 4 bytes of the chunk are the adler32 checksum // of the rest of the (compressed) chunk. @@ -265,16 +289,17 @@ public: class compressed_file_data_source : public data_source { public: - compressed_file_data_source(file f, const io_priority_class& pc, - sstables::compression* cm, uint64_t offset) + compressed_file_data_source(file f, sstables::compression* cm, + uint64_t offset, size_t len, file_input_stream_options options) : data_source(std::make_unique( - std::move(f), pc, cm, offset)) + std::move(f), cm, offset, len, std::move(options))) {} }; input_stream make_compressed_file_input_stream( - file f, sstables::compression* cm, const io_priority_class& pc, uint64_t offset) + file f, sstables::compression* cm, uint64_t offset, size_t len, + file_input_stream_options options) { return input_stream(compressed_file_data_source( - std::move(f), pc, cm, offset)); + std::move(f), cm, offset, len, std::move(options))); } diff --git a/sstables/compress.hh b/sstables/compress.hh index 1820a5bed5..9af3778334 100644 --- a/sstables/compress.hh +++ b/sstables/compress.hh @@ -202,4 +202,4 @@ public: // as long as we have *sstables* work in progress, we need to keep the whole // sstable alive, and the compression metadata is only a part of it. input_stream make_compressed_file_input_stream( - file f, sstables::compression *cm, const io_priority_class& pc, uint64_t offset = 0); + file f, sstables::compression *cm, uint64_t offset, size_t len, class file_input_stream_options options); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 95cfd712ae..9904589e75 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1591,31 +1591,25 @@ sstable::component_type sstable::component_from_sstring(sstring &s) { // ahead buffer before reaching the end, but not over-read at the end, so // data_stream() is more efficient than data_stream_at(). input_stream sstable::data_stream_at(uint64_t pos, uint64_t buf_size, const io_priority_class& pc) { + file_input_stream_options options; + options.buffer_size = buf_size; + options.io_priority_class = pc; if (_compression) { - return make_compressed_file_input_stream( - _data_file, &_compression, pc, pos); + return make_compressed_file_input_stream(_data_file, &_compression, + pos, _compression.data_len - pos, std::move(options)); } else { - file_input_stream_options options; - options.buffer_size = buf_size; - options.io_priority_class = pc; return make_file_input_stream(_data_file, pos, std::move(options)); } } input_stream sstable::data_stream(uint64_t pos, size_t len, const io_priority_class& pc) { + file_input_stream_options options; + options.buffer_size = sstable_buffer_size; + options.io_priority_class = pc; if (_compression) { - // FIXME: we should pass "len" to make_compressed_file_input_stream - // to allow it to read-ahead several compressed chunks from disk, and - // stop reading ahead when reaching the intended end of the read. - // However, currently, the code in compress.cc is not as sophisticated - // as the code in fstream.cc, and it reads exactly one compressed - // chunk at a time, so the "len" parameter can't help it. - return make_compressed_file_input_stream( - _data_file, &_compression, pc, pos); + return make_compressed_file_input_stream(_data_file, &_compression, + pos, len, std::move(options)); } else { - file_input_stream_options options; - options.buffer_size = sstable_buffer_size; - options.io_priority_class = pc; return make_file_input_stream(_data_file, pos, len, std::move(options)); } }