sstables: delegate compressor creation to the compressor factory

Remove `compressor::create()`. This enforces that compressors
are only created through the `sstable_compressor_factory`.

Unlike the synchronous `compressor::create()`, the factory will be able
to create dict-aware compressors.
This commit is contained in:
Michał Chojnowski
2025-02-14 01:39:59 +01:00
parent 30a9d471fa
commit b18ddcb92e
4 changed files with 9 additions and 21 deletions

View File

@@ -328,23 +328,6 @@ bool compressor::is_hidden_option_name(std::string_view sv) {
return sv.starts_with('.');
}
compressor_ptr compressor::create(const compression_parameters& params) {
using algorithm = compression_parameters::algorithm;
switch (params.get_algorithm()) {
case algorithm::lz4:
return lz4;
case algorithm::deflate:
return deflate;
case algorithm::snappy:
return snappy;
case algorithm::zstd: {
return seastar::make_shared<zstd_processor>(params);
}
case algorithm::none:
return nullptr;
}
}
thread_local const shared_ptr<compressor> compressor::lz4 = ::make_shared<lz4_processor>();
thread_local const shared_ptr<compressor> compressor::snappy = ::make_shared<snappy_processor>();
thread_local const shared_ptr<compressor> compressor::deflate = ::make_shared<deflate_processor>();

View File

@@ -61,8 +61,6 @@ public:
using ptr_type = shared_ptr<compressor>;
static ptr_type create(const compression_parameters& params);
static thread_local const ptr_type lz4;
static thread_local const ptr_type snappy;
static thread_local const ptr_type deflate;

View File

@@ -886,7 +886,7 @@ void writer::init_file_writers() {
if (!_compression_enabled) {
_data_writer = std::make_unique<crc32_checksummed_file_writer>(std::move(out), _sst.sstable_buffer_size, _sst.get_filename());
} else {
auto compressor = compressor::create(_sst._schema->get_compressor_params());
auto compressor = _sst.manager().get_compressor_factory().make_compressor_for_writing(_sst._schema).get();
_data_writer = std::make_unique<file_writer>(
make_compressed_file_m_format_output_stream(
output_stream<char>(std::move(out)),
@@ -1494,6 +1494,12 @@ void writer::consume_end_of_stream() {
_sst.write_filter();
_sst.write_statistics();
_sst.write_compression();
// Note: during the SSTable write, the `compressor` object in `_sst._components->compression`
// can only compress, not decompress. We have to create a decompressing `compressor` here.
// (The reason we split the two is that we don't want to keep the compressor-specific compression
// context after the write is over, because it hogs memory).
auto decompressor = _sst.manager().get_compressor_factory().make_compressor_for_reading(_sst._components->compression).get();
_sst._components->compression.set_compressor(std::move(decompressor));
run_identifier identifier{_run_identifier};
std::optional<scylla_metadata::large_data_stats> ld_stats(scylla_metadata::large_data_stats{
.map = {

View File

@@ -1053,7 +1053,8 @@ future<> sstable::read_compression() {
}
co_await read_simple<component_type::CompressionInfo>(_components->compression);
_components->compression.set_compressor(compressor::create(options_from_compression(_components->compression)));
auto compressor = co_await manager().get_compressor_factory().make_compressor_for_reading(_components->compression);
_components->compression.set_compressor(std::move(compressor));
}
void sstable::write_compression() {