sstables: more efficient read of compressed data file

Before this patch, reading large ranges from a compressed data file involved
two inefficiencies:

 1.  The compressed data file was read one compressed chunk at a time.
     Such a chunk is around 30 KB in size, well below our desired sstable
     read-ahead size (sstable_buffer_size = 128 KB).

 2.  Because the compressed chunks have variable length (the uncompressed
     chunk has a fixed length) they are not aligned to disk blocks, so
     consecutive chunks have overlapping blocks which were unnecessarily
     read twice.

The fix for both issues is to build the compressed_file_input_stream on
an existing file_input_stream, instead of using direct file IO to read the
individual chunks. file_input_stream takes care of doing the appropriate
amount of read-ahead, and the compressed_file_input_stream layer does the
decompression of the data read from the underlying layer.

Fixes #992.

Historical note: Implementing compressed_file_input_stream on top of
file_input_stream was already tried in the past, and rejected. The problem
at that time was that compressed_file_input_stream's constructor did not
specify the *end* of the range to read, so when we wanted to read only a
small range we got too much read-ahead — reading well beyond the single
compressed chunk that we actually needed. Following the fix to issue #964,
every streaming read now also specifies the intended *end* of the stream,
so we can use this to stop reading at the end of the last required chunk,
even when we use a read-ahead buffer much larger than a chunk.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1457304335-8507-1-git-send-email-nyh@scylladb.com>
This commit is contained in:
Nadav Har'El
2016-03-07 00:45:35 +02:00
committed by Pekka Enberg
parent 8260b8fc6f
commit 2f56577794
3 changed files with 54 additions and 35 deletions

View File

@@ -22,8 +22,9 @@
#include <stdexcept>
#include <cstdlib>
#include "core/align.hh"
#include "core/unaligned.hh"
#include <seastar/core/align.hh>
#include <seastar/core/unaligned.hh>
#include <seastar/core/fstream.hh>
#include "compress.hh"
@@ -217,23 +218,46 @@ size_t compress_max_size_snappy(size_t input_len) {
}
class compressed_file_data_source_impl : public data_source_impl {
file _file;
input_stream<char> _input_stream;
sstables::compression* _compression_metadata;
uint64_t _pos = 0;
const io_priority_class* _pc;
uint64_t _pos;
uint64_t _beg_pos;
uint64_t _end_pos;
public:
compressed_file_data_source_impl(file f, const io_priority_class& pc,
sstables::compression* cm, uint64_t pos)
: _file(std::move(f)), _compression_metadata(cm)
, _pos(pos)
, _pc(&pc)
{}
compressed_file_data_source_impl(file f, sstables::compression* cm,
uint64_t pos, size_t len, file_input_stream_options options)
: _compression_metadata(cm)
{
_beg_pos = pos;
if (pos >= _compression_metadata->data_len) {
throw std::runtime_error("attempt to uncompress beyond end");
}
if (len <= _compression_metadata->data_len - pos) {
_end_pos = pos + len;
} else {
_end_pos = _compression_metadata->data_len;
}
// _beg_pos and _end_pos specify positions in the compressed stream.
// We need to translate them into a range of uncompressed chunks,
// and open a file_input_stream to read that range.
auto start = _compression_metadata->locate(_beg_pos);
auto end = _compression_metadata->locate(_end_pos - 1);
_input_stream = make_file_input_stream(std::move(f),
start.chunk_start, end.chunk_start + end.chunk_len,
std::move(options));
_pos = _beg_pos;
}
virtual future<temporary_buffer<char>> get() override {
if (_pos >= _compression_metadata->data_len) {
if (_pos >= _end_pos) {
return make_ready_future<temporary_buffer<char>>();
}
auto addr = _compression_metadata->locate(_pos);
return _file.dma_read_exactly<char>(addr.chunk_start, addr.chunk_len, *_pc).
// Uncompress the next chunk. We need to skip part of the first
// chunk, but then continue to read from beginning of chunks.
if (_pos != _beg_pos && addr.offset != 0) {
throw std::runtime_error("compressed reader out of sync");
}
return _input_stream.read_exactly(addr.chunk_len).
then([this, addr](temporary_buffer<char> buf) {
// The last 4 bytes of the chunk are the adler32 checksum
// of the rest of the (compressed) chunk.
@@ -265,16 +289,17 @@ public:
class compressed_file_data_source : public data_source {
public:
compressed_file_data_source(file f, const io_priority_class& pc,
sstables::compression* cm, uint64_t offset)
compressed_file_data_source(file f, sstables::compression* cm,
uint64_t offset, size_t len, file_input_stream_options options)
: data_source(std::make_unique<compressed_file_data_source_impl>(
std::move(f), pc, cm, offset))
std::move(f), cm, offset, len, std::move(options)))
{}
};
input_stream<char> make_compressed_file_input_stream(
file f, sstables::compression* cm, const io_priority_class& pc, uint64_t offset)
file f, sstables::compression* cm, uint64_t offset, size_t len,
file_input_stream_options options)
{
return input_stream<char>(compressed_file_data_source(
std::move(f), pc, cm, offset));
std::move(f), cm, offset, len, std::move(options)));
}

View File

@@ -202,4 +202,4 @@ public:
// as long as we have *sstables* work in progress, we need to keep the whole
// sstable alive, and the compression metadata is only a part of it.
input_stream<char> make_compressed_file_input_stream(
file f, sstables::compression *cm, const io_priority_class& pc, uint64_t offset = 0);
file f, sstables::compression *cm, uint64_t offset, size_t len, class file_input_stream_options options);

View File

@@ -1591,31 +1591,25 @@ sstable::component_type sstable::component_from_sstring(sstring &s) {
// ahead buffer before reaching the end, but not over-read at the end, so
// data_stream() is more efficient than data_stream_at().
input_stream<char> sstable::data_stream_at(uint64_t pos, uint64_t buf_size, const io_priority_class& pc) {
file_input_stream_options options;
options.buffer_size = buf_size;
options.io_priority_class = pc;
if (_compression) {
return make_compressed_file_input_stream(
_data_file, &_compression, pc, pos);
return make_compressed_file_input_stream(_data_file, &_compression,
pos, _compression.data_len - pos, std::move(options));
} else {
file_input_stream_options options;
options.buffer_size = buf_size;
options.io_priority_class = pc;
return make_file_input_stream(_data_file, pos, std::move(options));
}
}
input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_priority_class& pc) {
file_input_stream_options options;
options.buffer_size = sstable_buffer_size;
options.io_priority_class = pc;
if (_compression) {
// FIXME: we should pass "len" to make_compressed_file_input_stream
// to allow it to read-ahead several compressed chunks from disk, and
// stop reading ahead when reaching the intended end of the read.
// However, currently, the code in compress.cc is not as sophisticated
// as the code in fstream.cc, and it reads exactly one compressed
// chunk at a time, so the "len" parameter can't help it.
return make_compressed_file_input_stream(
_data_file, &_compression, pc, pos);
return make_compressed_file_input_stream(_data_file, &_compression,
pos, len, std::move(options));
} else {
file_input_stream_options options;
options.buffer_size = sstable_buffer_size;
options.io_priority_class = pc;
return make_file_input_stream(_data_file, pos, len, std::move(options));
}
}