diff --git a/sstables/compress.cc b/sstables/compress.cc index af15ba9f8c..0dc886e3a8 100644 --- a/sstables/compress.cc +++ b/sstables/compress.cc @@ -21,6 +21,7 @@ #include "unimplemented.hh" #include "segmented_compress_params.hh" #include "utils/class_registrator.hh" +#include "reader_permit.hh" namespace sstables { @@ -338,16 +339,18 @@ class compressed_file_data_source_impl : public data_source_impl { sstables::compression* _compression_metadata; sstables::compression::segmented_offsets::accessor _offsets; sstables::local_compression _compression; + reader_permit _permit; uint64_t _underlying_pos; uint64_t _pos; uint64_t _beg_pos; uint64_t _end_pos; public: compressed_file_data_source_impl(file f, sstables::compression* cm, - uint64_t pos, size_t len, file_input_stream_options options) + uint64_t pos, size_t len, file_input_stream_options options, reader_permit permit) : _compression_metadata(cm) , _offsets(_compression_metadata->offsets.get_accessor()) , _compression(*cm) + , _permit(std::move(permit)) { _beg_pos = pos; if (pos > _compression_metadata->uncompressed_file_length()) { @@ -412,7 +415,7 @@ public: _pos += out.size(); _underlying_pos += addr.chunk_len; - return out; + return make_tracked_temporary_buffer(std::move(out), _permit); }); } @@ -444,9 +447,9 @@ requires ChecksumUtils class compressed_file_data_source : public data_source { public: compressed_file_data_source(file f, sstables::compression* cm, - uint64_t offset, size_t len, file_input_stream_options options) + uint64_t offset, size_t len, file_input_stream_options options, reader_permit permit) : data_source(std::make_unique>( - std::move(f), cm, offset, len, std::move(options))) + std::move(f), cm, offset, len, std::move(options), std::move(permit))) {} }; @@ -454,10 +457,10 @@ template requires ChecksumUtils inline input_stream make_compressed_file_input_stream( file f, sstables::compression *cm, uint64_t offset, size_t len, - file_input_stream_options options) + file_input_stream_options options, reader_permit permit) { return input_stream(compressed_file_data_source( - std::move(f), cm, offset, len, std::move(options))); + std::move(f), cm, offset, len, std::move(options), std::move(permit))); } // For SSTables 2.x (formats 'ka' and 'la'), the full checksum is a combination of checksums of compressed chunks. @@ -569,15 +572,15 @@ inline output_stream make_compressed_file_output_stream(output_stream sstables::make_compressed_file_k_l_format_input_stream(file f, sstables::compression* cm, uint64_t offset, size_t len, - class file_input_stream_options options) + class file_input_stream_options options, reader_permit permit) { - return make_compressed_file_input_stream(std::move(f), cm, offset, len, std::move(options)); + return make_compressed_file_input_stream(std::move(f), cm, offset, len, std::move(options), std::move(permit)); } input_stream sstables::make_compressed_file_m_format_input_stream(file f, sstables::compression *cm, uint64_t offset, size_t len, - class file_input_stream_options options) { - return make_compressed_file_input_stream(std::move(f), cm, offset, len, std::move(options)); + class file_input_stream_options options, reader_permit permit) { + return make_compressed_file_input_stream(std::move(f), cm, offset, len, std::move(options), std::move(permit)); } output_stream sstables::make_compressed_file_m_format_output_stream(output_stream out, diff --git a/sstables/compress.hh b/sstables/compress.hh index a134784aab..e6d16078d6 100644 --- a/sstables/compress.hh +++ b/sstables/compress.hh @@ -47,6 +47,8 @@ #include "checksum_utils.hh" #include "../compress.hh" +class reader_permit; + class compression_parameters; class compressor; using compressor_ptr = shared_ptr; @@ -371,11 +373,11 @@ compressor_ptr get_sstable_compressor(const compression&); // sstable alive, and the compression metadata is only a part of it. input_stream make_compressed_file_k_l_format_input_stream(file f, sstables::compression* cm, uint64_t offset, size_t len, - class file_input_stream_options options); + class file_input_stream_options options, reader_permit permit); input_stream make_compressed_file_m_format_input_stream(file f, sstables::compression* cm, uint64_t offset, size_t len, - class file_input_stream_options options); + class file_input_stream_options options, reader_permit permit); output_stream make_compressed_file_m_format_output_stream(output_stream out, sstables::compression* cm, diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 029e30cee0..5bda0aee20 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2287,7 +2287,7 @@ input_stream sstable::data_stream(uint64_t pos, size_t len, const io_prior options.read_ahead = 4; options.dynamic_adjustments = std::move(history); - file f = make_tracked_file(_data_file, std::move(permit)); + file f = make_tracked_file(_data_file, permit); if (trace_state) { f = tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", get_filename())); } @@ -2296,10 +2296,10 @@ input_stream sstable::data_stream(uint64_t pos, size_t len, const io_prior if (_components->compression) { if (_version >= sstable_version_types::mc) { return make_compressed_file_m_format_input_stream(f, &_components->compression, - pos, len, std::move(options)); + pos, len, std::move(options), permit); } else { return make_compressed_file_k_l_format_input_stream(f, &_components->compression, - pos, len, std::move(options)); + pos, len, std::move(options), permit); } } diff --git a/test/boost/sstable_test.cc b/test/boost/sstable_test.cc index 40154a295d..fd25fa7d23 100644 --- a/test/boost/sstable_test.cc +++ b/test/boost/sstable_test.cc @@ -18,6 +18,7 @@ #include "compaction/compaction_manager.hh" #include "sstables/key.hh" #include "test/lib/sstable_utils.hh" +#include "test/lib/reader_concurrency_semaphore.hh" #include #include "schema.hh" #include "compress.hh" @@ -752,6 +753,8 @@ SEASTAR_TEST_CASE(sub_partitions_read) { SEASTAR_TEST_CASE(test_skipping_in_compressed_stream) { return seastar::async([] { + tests::reader_concurrency_semaphore_wrapper semaphore; + tmpdir tmp; auto file_path = (tmp.path() / "test").string(); file f = open_file_dma(file_path, open_flags::create | open_flags::wo).get0(); @@ -787,7 +790,7 @@ SEASTAR_TEST_CASE(test_skipping_in_compressed_stream) { auto make_is = [&] { f = open_file_dma(file_path, open_flags::ro).get0(); - return make_compressed_file_m_format_input_stream(f, &c, 0, uncompressed_size, opts); + return make_compressed_file_m_format_input_stream(f, &c, 0, uncompressed_size, opts, semaphore.make_permit()); }; auto expect = [] (input_stream& in, const temporary_buffer& buf) {