sstables: track decompressed buffers

Convert decompressed temporary buffers into tracked buffers just before
returning them to the upper layer. This ensures these buffers are known
to the reader concurrency semaphore and it has an accurate view of the
actual memory consumption of reads.

Fixes: #12448

Closes #12454

(cherry picked from commit c4688563e3)
This commit is contained in:
Botond Dénes
2023-01-05 05:54:13 -05:00
committed by Avi Kivity
parent 2775b1d136
commit 5dff7489b1
4 changed files with 24 additions and 16 deletions

View File

@@ -21,6 +21,7 @@
#include "unimplemented.hh"
#include "segmented_compress_params.hh"
#include "utils/class_registrator.hh"
#include "reader_permit.hh"
namespace sstables {
@@ -338,16 +339,18 @@ class compressed_file_data_source_impl : public data_source_impl {
sstables::compression* _compression_metadata;
sstables::compression::segmented_offsets::accessor _offsets;
sstables::local_compression _compression;
reader_permit _permit;
uint64_t _underlying_pos;
uint64_t _pos;
uint64_t _beg_pos;
uint64_t _end_pos;
public:
compressed_file_data_source_impl(file f, sstables::compression* cm,
uint64_t pos, size_t len, file_input_stream_options options)
uint64_t pos, size_t len, file_input_stream_options options, reader_permit permit)
: _compression_metadata(cm)
, _offsets(_compression_metadata->offsets.get_accessor())
, _compression(*cm)
, _permit(std::move(permit))
{
_beg_pos = pos;
if (pos > _compression_metadata->uncompressed_file_length()) {
@@ -412,7 +415,7 @@ public:
_pos += out.size();
_underlying_pos += addr.chunk_len;
return out;
return make_tracked_temporary_buffer(std::move(out), _permit);
});
}
@@ -444,9 +447,9 @@ requires ChecksumUtils<ChecksumType>
class compressed_file_data_source : public data_source {
public:
compressed_file_data_source(file f, sstables::compression* cm,
uint64_t offset, size_t len, file_input_stream_options options)
uint64_t offset, size_t len, file_input_stream_options options, reader_permit permit)
: data_source(std::make_unique<compressed_file_data_source_impl<ChecksumType>>(
std::move(f), cm, offset, len, std::move(options)))
std::move(f), cm, offset, len, std::move(options), std::move(permit)))
{}
};
@@ -454,10 +457,10 @@ template <typename ChecksumType>
requires ChecksumUtils<ChecksumType>
inline input_stream<char> make_compressed_file_input_stream(
file f, sstables::compression *cm, uint64_t offset, size_t len,
file_input_stream_options options)
file_input_stream_options options, reader_permit permit)
{
return input_stream<char>(compressed_file_data_source<ChecksumType>(
std::move(f), cm, offset, len, std::move(options)));
std::move(f), cm, offset, len, std::move(options), std::move(permit)));
}
// For SSTables 2.x (formats 'ka' and 'la'), the full checksum is a combination of checksums of compressed chunks.
@@ -569,15 +572,15 @@ inline output_stream<char> make_compressed_file_output_stream(output_stream<char
input_stream<char> sstables::make_compressed_file_k_l_format_input_stream(file f,
sstables::compression* cm, uint64_t offset, size_t len,
class file_input_stream_options options)
class file_input_stream_options options, reader_permit permit)
{
return make_compressed_file_input_stream<adler32_utils>(std::move(f), cm, offset, len, std::move(options));
return make_compressed_file_input_stream<adler32_utils>(std::move(f), cm, offset, len, std::move(options), std::move(permit));
}
input_stream<char> sstables::make_compressed_file_m_format_input_stream(file f,
sstables::compression *cm, uint64_t offset, size_t len,
class file_input_stream_options options) {
return make_compressed_file_input_stream<crc32_utils>(std::move(f), cm, offset, len, std::move(options));
class file_input_stream_options options, reader_permit permit) {
return make_compressed_file_input_stream<crc32_utils>(std::move(f), cm, offset, len, std::move(options), std::move(permit));
}
output_stream<char> sstables::make_compressed_file_m_format_output_stream(output_stream<char> out,

View File

@@ -47,6 +47,8 @@
#include "checksum_utils.hh"
#include "../compress.hh"
class reader_permit;
class compression_parameters;
class compressor;
using compressor_ptr = shared_ptr<compressor>;
@@ -371,11 +373,11 @@ compressor_ptr get_sstable_compressor(const compression&);
// sstable alive, and the compression metadata is only a part of it.
input_stream<char> make_compressed_file_k_l_format_input_stream(file f,
sstables::compression* cm, uint64_t offset, size_t len,
class file_input_stream_options options);
class file_input_stream_options options, reader_permit permit);
input_stream<char> make_compressed_file_m_format_input_stream(file f,
sstables::compression* cm, uint64_t offset, size_t len,
class file_input_stream_options options);
class file_input_stream_options options, reader_permit permit);
output_stream<char> make_compressed_file_m_format_output_stream(output_stream<char> out,
sstables::compression* cm,

View File

@@ -2287,7 +2287,7 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_prior
options.read_ahead = 4;
options.dynamic_adjustments = std::move(history);
file f = make_tracked_file(_data_file, std::move(permit));
file f = make_tracked_file(_data_file, permit);
if (trace_state) {
f = tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", get_filename()));
}
@@ -2296,10 +2296,10 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_prior
if (_components->compression) {
if (_version >= sstable_version_types::mc) {
return make_compressed_file_m_format_input_stream(f, &_components->compression,
pos, len, std::move(options));
pos, len, std::move(options), permit);
} else {
return make_compressed_file_k_l_format_input_stream(f, &_components->compression,
pos, len, std::move(options));
pos, len, std::move(options), permit);
}
}

View File

@@ -18,6 +18,7 @@
#include "compaction/compaction_manager.hh"
#include "sstables/key.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include <seastar/testing/test_case.hh>
#include "schema.hh"
#include "compress.hh"
@@ -752,6 +753,8 @@ SEASTAR_TEST_CASE(sub_partitions_read) {
SEASTAR_TEST_CASE(test_skipping_in_compressed_stream) {
return seastar::async([] {
tests::reader_concurrency_semaphore_wrapper semaphore;
tmpdir tmp;
auto file_path = (tmp.path() / "test").string();
file f = open_file_dma(file_path, open_flags::create | open_flags::wo).get0();
@@ -787,7 +790,7 @@ SEASTAR_TEST_CASE(test_skipping_in_compressed_stream) {
auto make_is = [&] {
f = open_file_dma(file_path, open_flags::ro).get0();
return make_compressed_file_m_format_input_stream(f, &c, 0, uncompressed_size, opts);
return make_compressed_file_m_format_input_stream(f, &c, 0, uncompressed_size, opts, semaphore.make_permit());
};
auto expect = [] (input_stream<char>& in, const temporary_buffer<char>& buf) {