From 5dff7489b173682e83f4be17b0af953306afabdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 5 Jan 2023 05:54:13 -0500 Subject: [PATCH] sstables: track decompressed buffers Convert decompressed temporary buffers into tracked buffers just before returning them to the upper layer. This ensures these buffers are known to the reader concurrency semaphore and it has an accurate view of the actual memory consumption of reads. Fixes: #12448 Closes #12454 (cherry picked from commit c4688563e331c142d56e68767e96cfe59c78a201) --- sstables/compress.cc | 23 +++++++++++++---------- sstables/compress.hh | 6 ++++-- sstables/sstables.cc | 6 +++--- test/boost/sstable_test.cc | 5 ++++- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/sstables/compress.cc b/sstables/compress.cc index af15ba9f8c..0dc886e3a8 100644 --- a/sstables/compress.cc +++ b/sstables/compress.cc @@ -21,6 +21,7 @@ #include "unimplemented.hh" #include "segmented_compress_params.hh" #include "utils/class_registrator.hh" +#include "reader_permit.hh" namespace sstables { @@ -338,16 +339,18 @@ class compressed_file_data_source_impl : public data_source_impl { sstables::compression* _compression_metadata; sstables::compression::segmented_offsets::accessor _offsets; sstables::local_compression _compression; + reader_permit _permit; uint64_t _underlying_pos; uint64_t _pos; uint64_t _beg_pos; uint64_t _end_pos; public: compressed_file_data_source_impl(file f, sstables::compression* cm, - uint64_t pos, size_t len, file_input_stream_options options) + uint64_t pos, size_t len, file_input_stream_options options, reader_permit permit) : _compression_metadata(cm) , _offsets(_compression_metadata->offsets.get_accessor()) , _compression(*cm) + , _permit(std::move(permit)) { _beg_pos = pos; if (pos > _compression_metadata->uncompressed_file_length()) { @@ -412,7 +415,7 @@ public: _pos += out.size(); _underlying_pos += addr.chunk_len; - return out; + return make_tracked_temporary_buffer(std::move(out), _permit); }); } @@ -444,9 +447,9 @@ requires ChecksumUtils class compressed_file_data_source : public data_source { public: compressed_file_data_source(file f, sstables::compression* cm, - uint64_t offset, size_t len, file_input_stream_options options) + uint64_t offset, size_t len, file_input_stream_options options, reader_permit permit) : data_source(std::make_unique>( - std::move(f), cm, offset, len, std::move(options))) + std::move(f), cm, offset, len, std::move(options), std::move(permit))) {} }; @@ -454,10 +457,10 @@ template requires ChecksumUtils inline input_stream make_compressed_file_input_stream( file f, sstables::compression *cm, uint64_t offset, size_t len, - file_input_stream_options options) + file_input_stream_options options, reader_permit permit) { return input_stream(compressed_file_data_source( - std::move(f), cm, offset, len, std::move(options))); + std::move(f), cm, offset, len, std::move(options), std::move(permit))); } // For SSTables 2.x (formats 'ka' and 'la'), the full checksum is a combination of checksums of compressed chunks. @@ -569,15 +572,15 @@ inline output_stream make_compressed_file_output_stream(output_stream sstables::make_compressed_file_k_l_format_input_stream(file f, sstables::compression* cm, uint64_t offset, size_t len, - class file_input_stream_options options) + class file_input_stream_options options, reader_permit permit) { - return make_compressed_file_input_stream(std::move(f), cm, offset, len, std::move(options)); + return make_compressed_file_input_stream(std::move(f), cm, offset, len, std::move(options), std::move(permit)); } input_stream sstables::make_compressed_file_m_format_input_stream(file f, sstables::compression *cm, uint64_t offset, size_t len, - class file_input_stream_options options) { - return make_compressed_file_input_stream(std::move(f), cm, offset, len, std::move(options)); + class file_input_stream_options options, reader_permit permit) { + return make_compressed_file_input_stream(std::move(f), cm, offset, len, std::move(options), std::move(permit)); } output_stream sstables::make_compressed_file_m_format_output_stream(output_stream out, diff --git a/sstables/compress.hh b/sstables/compress.hh index a134784aab..e6d16078d6 100644 --- a/sstables/compress.hh +++ b/sstables/compress.hh @@ -47,6 +47,8 @@ #include "checksum_utils.hh" #include "../compress.hh" +class reader_permit; + class compression_parameters; class compressor; using compressor_ptr = shared_ptr; @@ -371,11 +373,11 @@ compressor_ptr get_sstable_compressor(const compression&); // sstable alive, and the compression metadata is only a part of it. input_stream make_compressed_file_k_l_format_input_stream(file f, sstables::compression* cm, uint64_t offset, size_t len, - class file_input_stream_options options); + class file_input_stream_options options, reader_permit permit); input_stream make_compressed_file_m_format_input_stream(file f, sstables::compression* cm, uint64_t offset, size_t len, - class file_input_stream_options options); + class file_input_stream_options options, reader_permit permit); output_stream make_compressed_file_m_format_output_stream(output_stream out, sstables::compression* cm, diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 029e30cee0..5bda0aee20 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2287,7 +2287,7 @@ input_stream sstable::data_stream(uint64_t pos, size_t len, const io_prior options.read_ahead = 4; options.dynamic_adjustments = std::move(history); - file f = make_tracked_file(_data_file, std::move(permit)); + file f = make_tracked_file(_data_file, permit); if (trace_state) { f = tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", get_filename())); } @@ -2296,10 +2296,10 @@ input_stream sstable::data_stream(uint64_t pos, size_t len, const io_prior if (_components->compression) { if (_version >= sstable_version_types::mc) { return make_compressed_file_m_format_input_stream(f, &_components->compression, - pos, len, std::move(options)); + pos, len, std::move(options), permit); } else { return make_compressed_file_k_l_format_input_stream(f, &_components->compression, - pos, len, std::move(options)); + pos, len, std::move(options), permit); } } diff --git a/test/boost/sstable_test.cc b/test/boost/sstable_test.cc index 40154a295d..fd25fa7d23 100644 --- a/test/boost/sstable_test.cc +++ b/test/boost/sstable_test.cc @@ -18,6 +18,7 @@ #include "compaction/compaction_manager.hh" #include "sstables/key.hh" #include "test/lib/sstable_utils.hh" +#include "test/lib/reader_concurrency_semaphore.hh" #include #include "schema.hh" #include "compress.hh" @@ -752,6 +753,8 @@ SEASTAR_TEST_CASE(sub_partitions_read) { SEASTAR_TEST_CASE(test_skipping_in_compressed_stream) { return seastar::async([] { + tests::reader_concurrency_semaphore_wrapper semaphore; + tmpdir tmp; auto file_path = (tmp.path() / "test").string(); file f = open_file_dma(file_path, open_flags::create | open_flags::wo).get0(); @@ -787,7 +790,7 @@ SEASTAR_TEST_CASE(test_skipping_in_compressed_stream) { auto make_is = [&] { f = open_file_dma(file_path, open_flags::ro).get0(); - return make_compressed_file_m_format_input_stream(f, &c, 0, uncompressed_size, opts); + return make_compressed_file_m_format_input_stream(f, &c, 0, uncompressed_size, opts, semaphore.make_permit()); }; auto expect = [] (input_stream& in, const temporary_buffer& buf) {