sstables: more efficient read of compressed data file
Before this patch, reading large ranges from a compressed data file involved
two inefficiencies:
1. The compressed data file was read one compressed chunk at a time.
Such a chunk is around 30 KB in size, well below our desired sstable
read-ahead size (sstable_buffer_size = 128 KB).
2. Because the compressed chunks have variable length (the uncompressed
chunk has a fixed length), they are not aligned to disk blocks, so
consecutive chunks share a disk block at their boundary, and that block
was unnecessarily read twice.
The fix for both issues is to build the compressed_file_input_stream on
an existing file_input_stream, instead of using direct file IO to read the
individual chunks. file_input_stream takes care of doing the appropriate
amount of read-ahead, and the compressed_file_input_stream layer does the
decompression of the data read from the underlying layer.
Fixes #992.
Historical note: Implementing compressed_file_input_stream on top of
file_input_stream was already tried in the past, and rejected. The problem
at that time was that compressed_file_input_stream's constructor did not
specify the *end* of the range to read, so when we wanted to read only a
small range, the read-ahead went far beyond the single compressed chunk
that we actually needed. Following the fix to issue #964, we now also know,
on every streaming read, the intended *end* of the stream, so we can use it
to stop reading at the end of the last required chunk, even when the
read-ahead buffer is much larger than a chunk.
Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1457304335-8507-1-git-send-email-nyh@scylladb.com>
This commit is contained in:
committed by
Pekka Enberg
parent
8260b8fc6f
commit
2f56577794
@@ -22,8 +22,9 @@
|
||||
#include <stdexcept>
|
||||
#include <cstdlib>
|
||||
|
||||
#include "core/align.hh"
|
||||
#include "core/unaligned.hh"
|
||||
#include <seastar/core/align.hh>
|
||||
#include <seastar/core/unaligned.hh>
|
||||
#include <seastar/core/fstream.hh>
|
||||
|
||||
#include "compress.hh"
|
||||
|
||||
@@ -217,23 +218,46 @@ size_t compress_max_size_snappy(size_t input_len) {
|
||||
}
|
||||
|
||||
class compressed_file_data_source_impl : public data_source_impl {
|
||||
file _file;
|
||||
input_stream<char> _input_stream;
|
||||
sstables::compression* _compression_metadata;
|
||||
uint64_t _pos = 0;
|
||||
const io_priority_class* _pc;
|
||||
uint64_t _pos;
|
||||
uint64_t _beg_pos;
|
||||
uint64_t _end_pos;
|
||||
public:
|
||||
compressed_file_data_source_impl(file f, const io_priority_class& pc,
|
||||
sstables::compression* cm, uint64_t pos)
|
||||
: _file(std::move(f)), _compression_metadata(cm)
|
||||
, _pos(pos)
|
||||
, _pc(&pc)
|
||||
{}
|
||||
compressed_file_data_source_impl(file f, sstables::compression* cm,
|
||||
uint64_t pos, size_t len, file_input_stream_options options)
|
||||
: _compression_metadata(cm)
|
||||
{
|
||||
_beg_pos = pos;
|
||||
if (pos >= _compression_metadata->data_len) {
|
||||
throw std::runtime_error("attempt to uncompress beyond end");
|
||||
}
|
||||
if (len <= _compression_metadata->data_len - pos) {
|
||||
_end_pos = pos + len;
|
||||
} else {
|
||||
_end_pos = _compression_metadata->data_len;
|
||||
}
|
||||
// _beg_pos and _end_pos specify positions in the uncompressed stream.
|
||||
// We need to translate them into a range of compressed chunks,
|
||||
// and open a file_input_stream to read that range.
|
||||
auto start = _compression_metadata->locate(_beg_pos);
|
||||
auto end = _compression_metadata->locate(_end_pos - 1);
|
||||
_input_stream = make_file_input_stream(std::move(f),
|
||||
start.chunk_start, end.chunk_start + end.chunk_len,
|
||||
std::move(options));
|
||||
_pos = _beg_pos;
|
||||
}
|
||||
virtual future<temporary_buffer<char>> get() override {
|
||||
if (_pos >= _compression_metadata->data_len) {
|
||||
if (_pos >= _end_pos) {
|
||||
return make_ready_future<temporary_buffer<char>>();
|
||||
}
|
||||
auto addr = _compression_metadata->locate(_pos);
|
||||
return _file.dma_read_exactly<char>(addr.chunk_start, addr.chunk_len, *_pc).
|
||||
// Uncompress the next chunk. We need to skip part of the first
|
||||
// chunk, but then continue to read from beginning of chunks.
|
||||
if (_pos != _beg_pos && addr.offset != 0) {
|
||||
throw std::runtime_error("compressed reader out of sync");
|
||||
}
|
||||
return _input_stream.read_exactly(addr.chunk_len).
|
||||
then([this, addr](temporary_buffer<char> buf) {
|
||||
// The last 4 bytes of the chunk are the adler32 checksum
|
||||
// of the rest of the (compressed) chunk.
|
||||
@@ -265,16 +289,17 @@ public:
|
||||
|
||||
class compressed_file_data_source : public data_source {
|
||||
public:
|
||||
compressed_file_data_source(file f, const io_priority_class& pc,
|
||||
sstables::compression* cm, uint64_t offset)
|
||||
compressed_file_data_source(file f, sstables::compression* cm,
|
||||
uint64_t offset, size_t len, file_input_stream_options options)
|
||||
: data_source(std::make_unique<compressed_file_data_source_impl>(
|
||||
std::move(f), pc, cm, offset))
|
||||
std::move(f), cm, offset, len, std::move(options)))
|
||||
{}
|
||||
};
|
||||
|
||||
input_stream<char> make_compressed_file_input_stream(
|
||||
file f, sstables::compression* cm, const io_priority_class& pc, uint64_t offset)
|
||||
file f, sstables::compression* cm, uint64_t offset, size_t len,
|
||||
file_input_stream_options options)
|
||||
{
|
||||
return input_stream<char>(compressed_file_data_source(
|
||||
std::move(f), pc, cm, offset));
|
||||
std::move(f), cm, offset, len, std::move(options)));
|
||||
}
|
||||
|
||||
@@ -202,4 +202,4 @@ public:
|
||||
// as long as we have *sstables* work in progress, we need to keep the whole
|
||||
// sstable alive, and the compression metadata is only a part of it.
|
||||
input_stream<char> make_compressed_file_input_stream(
|
||||
file f, sstables::compression *cm, const io_priority_class& pc, uint64_t offset = 0);
|
||||
file f, sstables::compression *cm, uint64_t offset, size_t len, class file_input_stream_options options);
|
||||
|
||||
@@ -1591,31 +1591,25 @@ sstable::component_type sstable::component_from_sstring(sstring &s) {
|
||||
// ahead buffer before reaching the end, but not over-read at the end, so
|
||||
// data_stream() is more efficient than data_stream_at().
|
||||
input_stream<char> sstable::data_stream_at(uint64_t pos, uint64_t buf_size, const io_priority_class& pc) {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = buf_size;
|
||||
options.io_priority_class = pc;
|
||||
if (_compression) {
|
||||
return make_compressed_file_input_stream(
|
||||
_data_file, &_compression, pc, pos);
|
||||
return make_compressed_file_input_stream(_data_file, &_compression,
|
||||
pos, _compression.data_len - pos, std::move(options));
|
||||
} else {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = buf_size;
|
||||
options.io_priority_class = pc;
|
||||
return make_file_input_stream(_data_file, pos, std::move(options));
|
||||
}
|
||||
}
|
||||
|
||||
input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_priority_class& pc) {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
options.io_priority_class = pc;
|
||||
if (_compression) {
|
||||
// FIXME: we should pass "len" to make_compressed_file_input_stream
|
||||
// to allow it to read-ahead several compressed chunks from disk, and
|
||||
// stop reading ahead when reaching the intended end of the read.
|
||||
// However, currently, the code in compress.cc is not as sophisticated
|
||||
// as the code in fstream.cc, and it reads exactly one compressed
|
||||
// chunk at a time, so the "len" parameter can't help it.
|
||||
return make_compressed_file_input_stream(
|
||||
_data_file, &_compression, pc, pos);
|
||||
return make_compressed_file_input_stream(_data_file, &_compression,
|
||||
pos, len, std::move(options));
|
||||
} else {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
options.io_priority_class = pc;
|
||||
return make_file_input_stream(_data_file, pos, len, std::move(options));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user