Merge "Compression parameters in schema definitions" from Paweł

"When compressor_parameters was introduced it only performed properties
validation, but wasn't properly wired to the rest of the code and the
compression information never made it to the final schema object.

This patchset changes that, now compression parameters are correctly processed
by schema builder as well written and read from system tables. Updated test
case makes sure that not only incorrect values are rejected during validation,
but also that correct values really are set in the created schema."
This commit is contained in:
Avi Kivity
2015-06-29 12:00:30 +03:00
5 changed files with 78 additions and 28 deletions

View File

@@ -23,21 +23,25 @@ public:
static constexpr auto CRC_CHECK_CHANCE = "crc_check_chance";
private:
compressor _compressor = compressor::none;
int _chunk_length = DEFAULT_CHUNK_LENGTH;
double _crc_check_chance = DEFAULT_CRC_CHECK_CHANCE;
std::experimental::optional<int> _chunk_length;
std::experimental::optional<double> _crc_check_chance;
public:
compression_parameters() = default;
compression_parameters(compressor c) : _compressor(c) { }
compression_parameters(const std::map<sstring, sstring>& options) {
validate_options(options);
const auto& compressor_class = options.at(SSTABLE_COMPRESSION);
if (compressor_class == "LZ4Compressor") {
auto it = options.find(SSTABLE_COMPRESSION);
if (it == options.end()) {
return;
}
const auto& compressor_class = it->second;
if (is_compressor_class(compressor_class, "LZ4Compressor")) {
_compressor = compressor::lz4;
} else if (compressor_class == "SnappyCompressor") {
} else if (is_compressor_class(compressor_class, "SnappyCompressor")) {
_compressor = compressor::snappy;
} else if (compressor_class == "DeflateCompressor") {
_compressor = compressor::none;
} else if (is_compressor_class(compressor_class, "DeflateCompressor")) {
_compressor = compressor::deflate;
} else {
throw exceptions::configuration_exception(sstring("Unsupported compression class '") + compressor_class + "'.");
}
@@ -60,21 +64,39 @@ public:
}
compressor get_compressor() const { return _compressor; }
int32_t chunk_length() const { return _chunk_length; }
double crc_check_chance() const { return _crc_check_chance; }
int32_t chunk_length() const { return _chunk_length.value_or(int(DEFAULT_CHUNK_LENGTH)); }
double crc_check_chance() const { return _crc_check_chance.value_or(double(DEFAULT_CRC_CHECK_CHANCE)); }
void validate() {
if (_chunk_length <= 0) {
throw exceptions::configuration_exception(sstring("Invalid negative or null ") + CHUNK_LENGTH_KB);
if (_chunk_length) {
auto chunk_length = _chunk_length.value();
if (chunk_length <= 0) {
throw exceptions::configuration_exception(sstring("Invalid negative or null ") + CHUNK_LENGTH_KB);
}
// _chunk_length must be a power of two
if (chunk_length & (chunk_length - 1)) {
throw exceptions::configuration_exception(sstring(CHUNK_LENGTH_KB) + " must be a power of 2.");
}
}
// _chunk_length must be a power of two
if (_chunk_length & (_chunk_length - 1)) {
throw exceptions::configuration_exception(sstring(CHUNK_LENGTH_KB) + " must be a power of 2.");
}
if (_crc_check_chance < 0.0 || _crc_check_chance > 1.0) {
if (_crc_check_chance && (_crc_check_chance.value() < 0.0 || _crc_check_chance.value() > 1.0)) {
throw exceptions::configuration_exception(sstring(CRC_CHECK_CHANCE) + " must be between 0.0 and 1.0.");
}
}
std::map<sstring, sstring> get_options() const {
if (_compressor == compressor::none) {
return std::map<sstring, sstring>();
}
std::map<sstring, sstring> opts;
opts.emplace(sstring(SSTABLE_COMPRESSION), compressor_name());
if (_chunk_length) {
opts.emplace(sstring(CHUNK_LENGTH_KB), std::to_string(_chunk_length.value() / 1024));
}
if (_crc_check_chance) {
opts.emplace(sstring(CRC_CHECK_CHANCE), std::to_string(_crc_check_chance.value()));
}
return opts;
}
private:
void validate_options(const std::map<sstring, sstring>& options) {
// currently, there are no options specific to a particular compressor
@@ -89,4 +111,20 @@ private:
}
}
}
bool is_compressor_class(const sstring& value, const sstring& class_name) {
static const sstring namespace_prefix = "org.apache.cassandra.io.compress.";
return value == class_name || value == namespace_prefix + class_name;
}
sstring compressor_name() const {
switch (_compressor) {
case compressor::lz4:
return "org.apache.cassandra.io.compress.LZ4Compressor";
case compressor::snappy:
return "org.apache.cassandra.io.compress.SnappyCompressor";
case compressor::deflate:
return "org.apache.cassandra.io.compress.DeflateCompressor";
default:
abort();
}
}
};

View File

@@ -161,19 +161,10 @@ public:
}
#endif
// Keep this in sync with apply_to_schema().
void apply_to_builder(schema_builder& builder) {
if (has_property(KW_COMMENT)) {
builder.set_comment(get_string(KW_COMMENT, ""));
}
}
// Keep this in sync with apply_to_builder().
void apply_to_schema(schema* s) {
if (has_property(KW_COMMENT)) {
s->set_comment(get_string(KW_COMMENT, ""));
}
#if 0
cfm.readRepairChance(getDouble(KW_READREPAIRCHANCE, cfm.getReadRepairChance()));
cfm.dcLocalReadRepairChance(getDouble(KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepairChance()));
@@ -199,7 +190,7 @@ public:
cfm.bloomFilterFpChance(getDouble(KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
#endif
if (!get_compression_options().empty()) {
s->set_compressor_params(compression_parameters(get_compression_options()));
builder.set_compressor_params(compression_parameters(get_compression_options()));
}
#if 0
CachingOptions cachingOptions = getCachingOptions();

View File

@@ -1023,8 +1023,9 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
#if 0
adder.add("compaction_strategy_class", table.compactionStrategyClass.getName());
adder.add("compaction_strategy_options", json(table.compactionStrategyOptions));
adder.add("compression_parameters", json(table.compressionParameters.asThriftOptions()));
#endif
const auto& compression_options = table->get_compressor_params();
m.set_clustered_cell(ckey, "compression_parameters", json::to_json(compression_options.get_options()), timestamp);
m.set_clustered_cell(ckey, "default_time_to_live", table->default_time_to_live().count(), timestamp);
#if 0
adder.add("default_validator", table.getDefaultValidator().toString());
@@ -1259,7 +1260,11 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
if (result.has("speculative_retry"))
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(result.getString("speculative_retry")));
cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(result.getString("compaction_strategy_class")));
cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
#endif
auto comp_param = table_row.get_nonnull<sstring>("compression_parameters");
compression_parameters cp(json::to_map(comp_param));
builder.set_compressor_params(cp);
#if 0
cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
if (result.has("min_index_interval"))

View File

@@ -47,6 +47,9 @@ public:
gc_clock::duration default_time_to_live() const {
return _raw._default_time_to_live;
}
void set_compressor_params(const compression_parameters& cp) {
_raw._compressor_params = cp;
}
column_definition& find_column(const cql3::column_identifier&);
schema_builder& with_column(const column_definition& c);

View File

@@ -903,6 +903,7 @@ SEASTAR_TEST_CASE(test_table_compression) {
}).then_wrapped([&e] (auto f) {
assert(!f.failed());
e.require_table_exists("ks", "tb1");
BOOST_REQUIRE(e.local_db().find_schema("ks", "tb1")->get_compressor_params().get_compressor() == compressor::none);
return e.execute_cql("create table tb2 (foo text PRIMARY KEY, bar text) with compression = { 'sstable_compression' : 'LossyCompressor' };");
}).then_wrapped([&e] (auto f) {
assert_that_failed(f);
@@ -916,6 +917,18 @@ SEASTAR_TEST_CASE(test_table_compression) {
}).then_wrapped([&e] (auto f) {
assert(!f.failed());
e.require_table_exists("ks", "tb2");
BOOST_REQUIRE(e.local_db().find_schema("ks", "tb2")->get_compressor_params().get_compressor() == compressor::lz4);
BOOST_REQUIRE(e.local_db().find_schema("ks", "tb2")->get_compressor_params().chunk_length() == 2 * 1024);
return e.execute_cql("create table tb3 (foo text PRIMARY KEY, bar text) with compression = { 'sstable_compression' : 'DeflateCompressor' };");
}).then_wrapped([&e] (auto f) {
assert(!f.failed());
e.require_table_exists("ks", "tb3");
BOOST_REQUIRE(e.local_db().find_schema("ks", "tb3")->get_compressor_params().get_compressor() == compressor::deflate);
return e.execute_cql("create table tb4 (foo text PRIMARY KEY, bar text) with compression = { 'sstable_compression' : 'org.apache.cassandra.io.compress.DeflateCompressor' };");
}).then_wrapped([&e] (auto f) {
assert(!f.failed());
e.require_table_exists("ks", "tb4");
BOOST_REQUIRE(e.local_db().find_schema("ks", "tb4")->get_compressor_params().get_compressor() == compressor::deflate);
});
});
}