diff --git a/compress.hh b/compress.hh index b92de65d1d..887aacef44 100644 --- a/compress.hh +++ b/compress.hh @@ -23,21 +23,25 @@ public: static constexpr auto CRC_CHECK_CHANCE = "crc_check_chance"; private: compressor _compressor = compressor::none; - int _chunk_length = DEFAULT_CHUNK_LENGTH; - double _crc_check_chance = DEFAULT_CRC_CHECK_CHANCE; + std::experimental::optional _chunk_length; + std::experimental::optional _crc_check_chance; public: compression_parameters() = default; compression_parameters(compressor c) : _compressor(c) { } compression_parameters(const std::map& options) { validate_options(options); - const auto& compressor_class = options.at(SSTABLE_COMPRESSION); - if (compressor_class == "LZ4Compressor") { + auto it = options.find(SSTABLE_COMPRESSION); + if (it == options.end()) { + return; + } + const auto& compressor_class = it->second; + if (is_compressor_class(compressor_class, "LZ4Compressor")) { _compressor = compressor::lz4; - } else if (compressor_class == "SnappyCompressor") { + } else if (is_compressor_class(compressor_class, "SnappyCompressor")) { _compressor = compressor::snappy; - } else if (compressor_class == "DeflateCompressor") { - _compressor = compressor::none; + } else if (is_compressor_class(compressor_class, "DeflateCompressor")) { + _compressor = compressor::deflate; } else { throw exceptions::configuration_exception(sstring("Unsupported compression class '") + compressor_class + "'."); } @@ -60,21 +64,39 @@ public: } compressor get_compressor() const { return _compressor; } - int32_t chunk_length() const { return _chunk_length; } - double crc_check_chance() const { return _crc_check_chance; } + int32_t chunk_length() const { return _chunk_length.value_or(int(DEFAULT_CHUNK_LENGTH)); } + double crc_check_chance() const { return _crc_check_chance.value_or(double(DEFAULT_CRC_CHECK_CHANCE)); } void validate() { - if (_chunk_length <= 0) { - throw exceptions::configuration_exception(sstring("Invalid negative or null ") + CHUNK_LENGTH_KB); + if (_chunk_length) { + auto chunk_length = _chunk_length.value(); + if (chunk_length <= 0) { + throw exceptions::configuration_exception(sstring("Invalid negative or null ") + CHUNK_LENGTH_KB); + } + // _chunk_length must be a power of two + if (chunk_length & (chunk_length - 1)) { + throw exceptions::configuration_exception(sstring(CHUNK_LENGTH_KB) + " must be a power of 2."); + } } - // _chunk_length must be a power of two - if (_chunk_length & (_chunk_length - 1)) { - throw exceptions::configuration_exception(sstring(CHUNK_LENGTH_KB) + " must be a power of 2."); - } - if (_crc_check_chance < 0.0 || _crc_check_chance > 1.0) { + if (_crc_check_chance && (_crc_check_chance.value() < 0.0 || _crc_check_chance.value() > 1.0)) { throw exceptions::configuration_exception(sstring(CRC_CHECK_CHANCE) + " must be between 0.0 and 1.0."); } } + + std::map get_options() const { + if (_compressor == compressor::none) { + return std::map(); + } + std::map opts; + opts.emplace(sstring(SSTABLE_COMPRESSION), compressor_name()); + if (_chunk_length) { + opts.emplace(sstring(CHUNK_LENGTH_KB), std::to_string(_chunk_length.value() / 1024)); + } + if (_crc_check_chance) { + opts.emplace(sstring(CRC_CHECK_CHANCE), std::to_string(_crc_check_chance.value())); + } + return opts; + } private: void validate_options(const std::map& options) { // currently, there are no options specific to a particular compressor @@ -89,4 +111,20 @@ private: } } } + bool is_compressor_class(const sstring& value, const sstring& class_name) { + static const sstring namespace_prefix = "org.apache.cassandra.io.compress."; + return value == class_name || value == namespace_prefix + class_name; + } + sstring compressor_name() const { + switch (_compressor) { + case compressor::lz4: + return "org.apache.cassandra.io.compress.LZ4Compressor"; + case compressor::snappy: + return "org.apache.cassandra.io.compress.SnappyCompressor"; + case compressor::deflate: + return "org.apache.cassandra.io.compress.DeflateCompressor"; + default: + abort(); + } + } }; diff --git a/cql3/statements/cf_prop_defs.hh b/cql3/statements/cf_prop_defs.hh index c9fd953091..2bd0de27ef 100644 --- a/cql3/statements/cf_prop_defs.hh +++ b/cql3/statements/cf_prop_defs.hh @@ -161,19 +161,10 @@ public: } #endif - // Keep this in sync with apply_to_schema(). void apply_to_builder(schema_builder& builder) { if (has_property(KW_COMMENT)) { builder.set_comment(get_string(KW_COMMENT, "")); } - } - - // Keep this in sync with apply_to_builder(). - void apply_to_schema(schema* s) { - if (has_property(KW_COMMENT)) { - s->set_comment(get_string(KW_COMMENT, "")); - } - #if 0 cfm.readRepairChance(getDouble(KW_READREPAIRCHANCE, cfm.getReadRepairChance())); cfm.dcLocalReadRepairChance(getDouble(KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepairChance())); @@ -199,7 +190,7 @@ public: cfm.bloomFilterFpChance(getDouble(KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance())); #endif if (!get_compression_options().empty()) { - s->set_compressor_params(compression_parameters(get_compression_options())); + builder.set_compressor_params(compression_parameters(get_compression_options())); } #if 0 CachingOptions cachingOptions = getCachingOptions(); diff --git a/db/legacy_schema_tables.cc b/db/legacy_schema_tables.cc index c5a9e95dea..463210cba9 100644 --- a/db/legacy_schema_tables.cc +++ b/db/legacy_schema_tables.cc @@ -1023,8 +1023,9 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE #if 0 adder.add("compaction_strategy_class", table.compactionStrategyClass.getName()); adder.add("compaction_strategy_options", json(table.compactionStrategyOptions)); - adder.add("compression_parameters", json(table.compressionParameters.asThriftOptions())); #endif + const auto& compression_options = table->get_compressor_params(); + m.set_clustered_cell(ckey, "compression_parameters", json::to_json(compression_options.get_options()), timestamp); m.set_clustered_cell(ckey, "default_time_to_live", table->default_time_to_live().count(), timestamp); #if 0 adder.add("default_validator", table.getDefaultValidator().toString()); @@ -1259,7 +1260,11 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE if (result.has("speculative_retry")) cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(result.getString("speculative_retry"))); cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(result.getString("compaction_strategy_class"))); - cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters")))); +#endif + auto comp_param = table_row.get_nonnull("compression_parameters"); + compression_parameters cp(json::to_map(comp_param)); + builder.set_compressor_params(cp); +#if 0 cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options"))); if (result.has("min_index_interval")) diff --git a/schema_builder.hh b/schema_builder.hh index 3204b4b056..176360d689 100644 --- a/schema_builder.hh +++ b/schema_builder.hh @@ -47,6 +47,9 @@ public: gc_clock::duration default_time_to_live() const { return _raw._default_time_to_live; } + void set_compressor_params(const compression_parameters& cp) { + _raw._compressor_params = cp; + } column_definition& find_column(const cql3::column_identifier&); schema_builder& with_column(const column_definition& c); diff --git a/tests/urchin/cql_query_test.cc b/tests/urchin/cql_query_test.cc index 85a1cc3cf2..8dd5e8631d 100644 --- a/tests/urchin/cql_query_test.cc +++ b/tests/urchin/cql_query_test.cc @@ -903,6 +903,7 @@ SEASTAR_TEST_CASE(test_table_compression) { }).then_wrapped([&e] (auto f) { assert(!f.failed()); e.require_table_exists("ks", "tb1"); + BOOST_REQUIRE(e.local_db().find_schema("ks", "tb1")->get_compressor_params().get_compressor() == compressor::none); return e.execute_cql("create table tb2 (foo text PRIMARY KEY, bar text) with compression = { 'sstable_compression' : 'LossyCompressor' };"); }).then_wrapped([&e] (auto f) { assert_that_failed(f); @@ -916,6 +917,18 @@ SEASTAR_TEST_CASE(test_table_compression) { }).then_wrapped([&e] (auto f) { assert(!f.failed()); e.require_table_exists("ks", "tb2"); + BOOST_REQUIRE(e.local_db().find_schema("ks", "tb2")->get_compressor_params().get_compressor() == compressor::lz4); + BOOST_REQUIRE(e.local_db().find_schema("ks", "tb2")->get_compressor_params().chunk_length() == 2 * 1024); + return e.execute_cql("create table tb3 (foo text PRIMARY KEY, bar text) with compression = { 'sstable_compression' : 'DeflateCompressor' };"); + }).then_wrapped([&e] (auto f) { + assert(!f.failed()); + e.require_table_exists("ks", "tb3"); + BOOST_REQUIRE(e.local_db().find_schema("ks", "tb3")->get_compressor_params().get_compressor() == compressor::deflate); + return e.execute_cql("create table tb4 (foo text PRIMARY KEY, bar text) with compression = { 'sstable_compression' : 'org.apache.cassandra.io.compress.DeflateCompressor' };"); + }).then_wrapped([&e] (auto f) { + assert(!f.failed()); + e.require_table_exists("ks", "tb4"); + BOOST_REQUIRE(e.local_db().find_schema("ks", "tb4")->get_compressor_params().get_compressor() == compressor::deflate); }); }); } \ No newline at end of file