diff --git a/compress.cc b/compress.cc index 56a4599ddb..614414c722 100644 --- a/compress.cc +++ b/compress.cc @@ -328,23 +328,6 @@ bool compressor::is_hidden_option_name(std::string_view sv) { return sv.starts_with('.'); } -compressor_ptr compressor::create(const compression_parameters& params) { - using algorithm = compression_parameters::algorithm; - switch (params.get_algorithm()) { - case algorithm::lz4: - return lz4; - case algorithm::deflate: - return deflate; - case algorithm::snappy: - return snappy; - case algorithm::zstd: { - return seastar::make_shared(params); - } - case algorithm::none: - return nullptr; - } -} - thread_local const shared_ptr compressor::lz4 = ::make_shared(); thread_local const shared_ptr compressor::snappy = ::make_shared(); thread_local const shared_ptr compressor::deflate = ::make_shared(); diff --git a/compress.hh b/compress.hh index 0e94f22ad9..7e7b1afa4a 100644 --- a/compress.hh +++ b/compress.hh @@ -61,8 +61,6 @@ public: using ptr_type = shared_ptr; - static ptr_type create(const compression_parameters& params); - static thread_local const ptr_type lz4; static thread_local const ptr_type snappy; static thread_local const ptr_type deflate; diff --git a/sstables/mx/writer.cc b/sstables/mx/writer.cc index 3246a8323e..743e12bd01 100644 --- a/sstables/mx/writer.cc +++ b/sstables/mx/writer.cc @@ -886,7 +886,7 @@ void writer::init_file_writers() { if (!_compression_enabled) { _data_writer = std::make_unique(std::move(out), _sst.sstable_buffer_size, _sst.get_filename()); } else { - auto compressor = compressor::create(_sst._schema->get_compressor_params()); + auto compressor = _sst.manager().get_compressor_factory().make_compressor_for_writing(_sst._schema).get(); _data_writer = std::make_unique( make_compressed_file_m_format_output_stream( output_stream(std::move(out)), @@ -1494,6 +1494,12 @@ void writer::consume_end_of_stream() { _sst.write_filter(); _sst.write_statistics(); _sst.write_compression(); + // Note: during the SSTable write, the `compressor` object in `_sst._components->compression` + // can only compress, not decompress. We have to create a decompressing `compressor` here. + // (The reason we split the two is that we don't want to keep the compressor-specific compression + // context after the write is over, because it hogs memory). + auto decompressor = _sst.manager().get_compressor_factory().make_compressor_for_reading(_sst._components->compression).get(); + _sst._components->compression.set_compressor(std::move(decompressor)); run_identifier identifier{_run_identifier}; std::optional ld_stats(scylla_metadata::large_data_stats{ .map = { diff --git a/sstables/sstables.cc b/sstables/sstables.cc index ace11554f8..3dc771c808 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1053,7 +1053,8 @@ future<> sstable::read_compression() { } co_await read_simple(_components->compression); - _components->compression.set_compressor(compressor::create(options_from_compression(_components->compression))); + auto compressor = co_await manager().get_compressor_factory().make_compressor_for_reading(_components->compression); + _components->compression.set_compressor(std::move(compressor)); } void sstable::write_compression() {