From c51a430020e302add172092e4ce0334e7e05ab75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Fri, 26 Jun 2015 22:11:47 +0200 Subject: [PATCH 1/9] compress: "DeflateCompressor" is not compressor::none MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- compress.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compress.hh b/compress.hh index b92de65d1d..62f977d6b1 100644 --- a/compress.hh +++ b/compress.hh @@ -37,7 +37,7 @@ public: } else if (compressor_class == "SnappyCompressor") { _compressor = compressor::snappy; } else if (compressor_class == "DeflateCompressor") { - _compressor = compressor::none; + _compressor = compressor::deflate; } else { throw exceptions::configuration_exception(sstring("Unsupported compression class '") + compressor_class + "'."); } From 9134381638f3d39c11dab418a285ddfb52bbb027 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Mon, 29 Jun 2015 09:05:50 +0200 Subject: [PATCH 2/9] compress: accept both qualified and unqualified class names MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- compress.hh | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/compress.hh b/compress.hh index 62f977d6b1..1e8c47d4fc 100644 --- a/compress.hh +++ b/compress.hh @@ -32,11 +32,11 @@ public: validate_options(options); const auto& compressor_class = options.at(SSTABLE_COMPRESSION); - if (compressor_class == "LZ4Compressor") { + if (is_compressor_class(compressor_class, "LZ4Compressor")) { _compressor = compressor::lz4; - } else if (compressor_class == "SnappyCompressor") { + } else if (is_compressor_class(compressor_class, "SnappyCompressor")) { _compressor = compressor::snappy; - } else if (compressor_class == "DeflateCompressor") { + } else if (is_compressor_class(compressor_class, "DeflateCompressor")) { _compressor = compressor::deflate; } else { throw exceptions::configuration_exception(sstring("Unsupported compression class '") + compressor_class + "'."); @@ -89,4 +89,8 @@ private: } } } + bool is_compressor_class(const sstring& value, const sstring& class_name) { + static const sstring namespace_prefix = "org.apache.cassandra.io.compress."; + return value == class_name || value == namespace_prefix + class_name; + } }; From f4ce125422c295b11dff13e920d998c8369d51d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Mon, 29 Jun 2015 09:30:31 +0200 Subject: [PATCH 3/9] compress: use std::optional for chunk length and crc check chance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- compress.hh | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/compress.hh b/compress.hh index 1e8c47d4fc..2a207bab81 100644 --- a/compress.hh +++ b/compress.hh @@ -23,8 +23,8 @@ public: static constexpr auto CRC_CHECK_CHANCE = "crc_check_chance"; private: compressor _compressor = compressor::none; - int _chunk_length = DEFAULT_CHUNK_LENGTH; - double _crc_check_chance = DEFAULT_CRC_CHECK_CHANCE; + std::experimental::optional _chunk_length; + std::experimental::optional _crc_check_chance; public: compression_parameters() = default; compression_parameters(compressor c) : _compressor(c) { } @@ -60,18 +60,21 @@ public: } compressor get_compressor() const { return _compressor; } - int32_t chunk_length() const { return _chunk_length; } - double crc_check_chance() const { return _crc_check_chance; } + int32_t chunk_length() const { return _chunk_length.value_or(int(DEFAULT_CHUNK_LENGTH)); } + double crc_check_chance() const { return _crc_check_chance.value_or(double(DEFAULT_CRC_CHECK_CHANCE)); } void validate() { - if (_chunk_length <= 0) { - throw exceptions::configuration_exception(sstring("Invalid negative or null ") + CHUNK_LENGTH_KB); + if (_chunk_length) { + auto chunk_length = _chunk_length.value(); + if (chunk_length <= 0) { + throw exceptions::configuration_exception(sstring("Invalid negative or null ") + CHUNK_LENGTH_KB); + } + // _chunk_length must be a power of two + if (chunk_length & (chunk_length - 1)) { + throw exceptions::configuration_exception(sstring(CHUNK_LENGTH_KB) + " must be a power of 2."); + } } - // _chunk_length must be a power of two - if (_chunk_length & (_chunk_length - 1)) { - throw exceptions::configuration_exception(sstring(CHUNK_LENGTH_KB) + " must be a power of 2."); - } - if (_crc_check_chance < 0.0 || _crc_check_chance > 1.0) { + if (_crc_check_chance && (_crc_check_chance.value() < 0.0 || _crc_check_chance.value() > 1.0)) { throw exceptions::configuration_exception(sstring(CRC_CHECK_CHANCE) + " must be between 0.0 and 1.0."); } } From b520ef6172b032e537b8c247275eba5eb09508c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Fri, 26 Jun 2015 22:37:23 +0200 Subject: [PATCH 4/9] compress: generate a std::map of options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- compress.hh | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/compress.hh b/compress.hh index 2a207bab81..9b2ba05d05 100644 --- a/compress.hh +++ b/compress.hh @@ -78,6 +78,21 @@ public: throw exceptions::configuration_exception(sstring(CRC_CHECK_CHANCE) + " must be between 0.0 and 1.0."); } } + + std::map get_options() const { + if (_compressor == compressor::none) { + return std::map(); + } + std::map opts; + opts.emplace(sstring(SSTABLE_COMPRESSION), compressor_name()); + if (_chunk_length) { + opts.emplace(sstring(CHUNK_LENGTH_KB), std::to_string(_chunk_length.value() / 1024)); + } + if (_crc_check_chance) { + opts.emplace(sstring(CRC_CHECK_CHANCE), std::to_string(_crc_check_chance.value())); + } + return opts; + } private: void validate_options(const std::map& options) { // currently, there are no options specific to a particular compressor @@ -96,4 +111,16 @@ private: static const sstring namespace_prefix = "org.apache.cassandra.io.compress."; return value == class_name || value == namespace_prefix + class_name; } + sstring compressor_name() const { + switch (_compressor) { + case compressor::lz4: + return "org.apache.cassandra.io.compress.LZ4Compressor"; + case compressor::snappy: + return "org.apache.cassandra.io.compress.SnappyCompressor"; + case compressor::deflate: + return "org.apache.cassandra.io.compress.DeflateCompressor"; + default: + abort(); + } + } }; From a0424d5d2734c6e67e76677eed8c19782346d321 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Fri, 26 Jun 2015 22:50:43 +0200 Subject: [PATCH 5/9] compressor: allow an empty map of options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- compress.hh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/compress.hh b/compress.hh index 9b2ba05d05..887aacef44 100644 --- a/compress.hh +++ b/compress.hh @@ -31,7 +31,11 @@ public: compression_parameters(const std::map& options) { validate_options(options); - const auto& compressor_class = options.at(SSTABLE_COMPRESSION); + auto it = options.find(SSTABLE_COMPRESSION); + if (it == options.end()) { + return; + } + const auto& compressor_class = it->second; if (is_compressor_class(compressor_class, "LZ4Compressor")) { _compressor = compressor::lz4; } else if (is_compressor_class(compressor_class, "SnappyCompressor")) { From 9606b3f9111a52195ae7ab0f5a2d41ca692a94c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Fri, 26 Jun 2015 22:39:25 +0200 Subject: [PATCH 6/9] schema_builder: add set_compressor_params() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- schema_builder.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/schema_builder.hh b/schema_builder.hh index 3204b4b056..176360d689 100644 --- a/schema_builder.hh +++ b/schema_builder.hh @@ -47,6 +47,9 @@ public: gc_clock::duration default_time_to_live() const { return _raw._default_time_to_live; } + void set_compressor_params(const compression_parameters& cp) { + _raw._compressor_params = cp; + } column_definition& find_column(const cql3::column_identifier&); schema_builder& with_column(const column_definition& c); From bad8f7cc72853c6fdee25caffedcf6a3cf02c94d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Fri, 26 Jun 2015 22:39:41 +0200 Subject: [PATCH 7/9] db/legacy_schema_tables: support compression_parameters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- db/legacy_schema_tables.cc | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/db/legacy_schema_tables.cc b/db/legacy_schema_tables.cc index c5a9e95dea..463210cba9 100644 --- a/db/legacy_schema_tables.cc +++ b/db/legacy_schema_tables.cc @@ -1023,8 +1023,9 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE #if 0 adder.add("compaction_strategy_class", table.compactionStrategyClass.getName()); adder.add("compaction_strategy_options", json(table.compactionStrategyOptions)); - adder.add("compression_parameters", json(table.compressionParameters.asThriftOptions())); #endif + const auto& compression_options = table->get_compressor_params(); + m.set_clustered_cell(ckey, "compression_parameters", json::to_json(compression_options.get_options()), timestamp); m.set_clustered_cell(ckey, "default_time_to_live", table->default_time_to_live().count(), timestamp); #if 0 adder.add("default_validator", table.getDefaultValidator().toString()); @@ -1259,7 +1260,11 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE if (result.has("speculative_retry")) cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(result.getString("speculative_retry"))); cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(result.getString("compaction_strategy_class"))); - cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters")))); +#endif + auto comp_param = table_row.get_nonnull("compression_parameters"); + compression_parameters cp(json::to_map(comp_param)); + builder.set_compressor_params(cp); +#if 0 cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options"))); if (result.has("min_index_interval")) From 9164e20a5d1ad803a2fcfb0ad67ee398fcef027a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Fri, 26 Jun 2015 22:51:59 +0200 Subject: [PATCH 8/9] cql3: remove cf_prop_defs::apply_to_schema() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This function isn't called by anything, all schema creation logic should be in apply_to_builder(). Signed-off-by: Paweł Dziepak --- cql3/statements/cf_prop_defs.hh | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/cql3/statements/cf_prop_defs.hh b/cql3/statements/cf_prop_defs.hh index c9fd953091..2bd0de27ef 100644 --- a/cql3/statements/cf_prop_defs.hh +++ b/cql3/statements/cf_prop_defs.hh @@ -161,19 +161,10 @@ public: } #endif - // Keep this in sync with apply_to_schema(). void apply_to_builder(schema_builder& builder) { if (has_property(KW_COMMENT)) { builder.set_comment(get_string(KW_COMMENT, "")); } - } - - // Keep this in sync with apply_to_builder(). - void apply_to_schema(schema* s) { - if (has_property(KW_COMMENT)) { - s->set_comment(get_string(KW_COMMENT, "")); - } - #if 0 cfm.readRepairChance(getDouble(KW_READREPAIRCHANCE, cfm.getReadRepairChance())); cfm.dcLocalReadRepairChance(getDouble(KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepairChance())); @@ -199,7 +190,7 @@ public: cfm.bloomFilterFpChance(getDouble(KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance())); #endif if (!get_compression_options().empty()) { - s->set_compressor_params(compression_parameters(get_compression_options())); + builder.set_compressor_params(compression_parameters(get_compression_options())); } #if 0 CachingOptions cachingOptions = getCachingOptions(); From 1c84a7231ac84dd61ce8593105028af95909110f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Mon, 29 Jun 2015 09:43:14 +0200 Subject: [PATCH 9/9] tests/cql: improve compression tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Check whether the created tables have actually the appropriate compression parameters set. Signed-off-by: Paweł Dziepak --- tests/urchin/cql_query_test.cc | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/urchin/cql_query_test.cc b/tests/urchin/cql_query_test.cc index 85a1cc3cf2..8dd5e8631d 100644 --- a/tests/urchin/cql_query_test.cc +++ b/tests/urchin/cql_query_test.cc @@ -903,6 +903,7 @@ SEASTAR_TEST_CASE(test_table_compression) { }).then_wrapped([&e] (auto f) { assert(!f.failed()); e.require_table_exists("ks", "tb1"); + BOOST_REQUIRE(e.local_db().find_schema("ks", "tb1")->get_compressor_params().get_compressor() == compressor::none); return e.execute_cql("create table tb2 (foo text PRIMARY KEY, bar text) with compression = { 'sstable_compression' : 'LossyCompressor' };"); }).then_wrapped([&e] (auto f) { assert_that_failed(f); @@ -916,6 +917,18 @@ SEASTAR_TEST_CASE(test_table_compression) { }).then_wrapped([&e] (auto f) { assert(!f.failed()); e.require_table_exists("ks", "tb2"); + BOOST_REQUIRE(e.local_db().find_schema("ks", "tb2")->get_compressor_params().get_compressor() == compressor::lz4); + BOOST_REQUIRE(e.local_db().find_schema("ks", "tb2")->get_compressor_params().chunk_length() == 2 * 1024); + return e.execute_cql("create table tb3 (foo text PRIMARY KEY, bar text) with compression = { 'sstable_compression' : 'DeflateCompressor' };"); + }).then_wrapped([&e] (auto f) { + assert(!f.failed()); + e.require_table_exists("ks", "tb3"); + BOOST_REQUIRE(e.local_db().find_schema("ks", "tb3")->get_compressor_params().get_compressor() == compressor::deflate); + return e.execute_cql("create table tb4 (foo text PRIMARY KEY, bar text) with compression = { 'sstable_compression' : 'org.apache.cassandra.io.compress.DeflateCompressor' };"); + }).then_wrapped([&e] (auto f) { + assert(!f.failed()); + e.require_table_exists("ks", "tb4"); + BOOST_REQUIRE(e.local_db().find_schema("ks", "tb4")->get_compressor_params().get_compressor() == compressor::deflate); }); }); } \ No newline at end of file