Merge "add support to snappy and deflate compressors" from Raphael

This commit is contained in:
Avi Kivity
2015-06-18 13:44:49 +03:00
2 changed files with 60 additions and 17 deletions

View File

@@ -135,8 +135,26 @@ size_t uncompress_deflate(const char* input, size_t input_len,
size_t compress_deflate(const char* input, size_t input_len,
char* output, size_t output_len) {
// FIXME: implement.
fail(unimplemented::cause::COMPRESSION);
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
zs.avail_in = 0;
zs.next_in = Z_NULL;
if (deflateInit(&zs, Z_DEFAULT_COMPRESSION) != Z_OK) {
throw std::runtime_error("deflate compression init failure");
}
zs.next_in = reinterpret_cast<unsigned char*>(const_cast<char*>(input));
zs.avail_in = input_len;
zs.next_out = reinterpret_cast<unsigned char*>(output);
zs.avail_out = output_len;
auto res = deflate(&zs, Z_FINISH);
deflateEnd(&zs);
if (res == Z_STREAM_END) {
return output_len - zs.avail_out;
} else {
throw std::runtime_error("deflate compression failure");
}
}
size_t uncompress_snappy(const char* input, size_t input_len,
@@ -151,8 +169,11 @@ size_t uncompress_snappy(const char* input, size_t input_len,
size_t compress_snappy(const char* input, size_t input_len,
char* output, size_t output_len) {
// FIXME: implement.
fail(unimplemented::cause::COMPRESSION);
auto ret = snappy_compress(input, input_len, output, &output_len);
if (ret != SNAPPY_OK) {
throw std::runtime_error("snappy compression failure: snappy_compress() failed");
}
return output_len;
}
size_t compress_max_size_lz4(size_t input_len) {
@@ -160,13 +181,22 @@ size_t compress_max_size_lz4(size_t input_len) {
}
size_t compress_max_size_deflate(size_t input_len) {
// FIXME: implement.
fail(unimplemented::cause::COMPRESSION);
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
zs.avail_in = 0;
zs.next_in = Z_NULL;
if (deflateInit(&zs, Z_DEFAULT_COMPRESSION) != Z_OK) {
throw std::runtime_error("deflate compression init failure");
}
auto res = deflateBound(&zs, input_len);
deflateEnd(&zs);
return res;
}
size_t compress_max_size_snappy(size_t input_len) {
// FIXME: implement.
fail(unimplemented::cause::COMPRESSION);
return snappy_max_compressed_length(input_len);
}
class compressed_file_data_source_impl : public data_source_impl {

View File

@@ -889,12 +889,12 @@ SEASTAR_TEST_CASE(datafile_generation_12) {
});
}
SEASTAR_TEST_CASE(datafile_generation_13) {
return test_setup::do_with_test_directory([] {
static future<> sstable_compression_test(compressor c, unsigned generation) {
return test_setup::do_with_test_directory([c, generation] {
auto& cs = *complex_schema();
auto s = make_lw_shared(schema(cs));
// NOTE: set lz4 as compressor algorithm to schema.
s->set_compressor(compressor::lz4);
// NOTE: set a given compressor algorithm to schema.
s->set_compressor(c);
auto mtp = make_shared<memtable>(s);
@@ -907,9 +907,9 @@ SEASTAR_TEST_CASE(datafile_generation_13) {
m.partition().apply_delete(*s, cp, tomb);
mtp->apply(std::move(m));
auto sst = make_lw_shared<sstable>("tests/urchin/sstables/tests-temporary", 13, la, big);
return sst->write_components(*mtp).then([s, tomb] {
return reusable_sst("tests/urchin/sstables/tests-temporary", 13).then([s, tomb] (auto sstp) mutable {
auto sst = make_lw_shared<sstable>("tests/urchin/sstables/tests-temporary", generation, la, big);
return sst->write_components(*mtp).then([s, tomb, generation] {
return reusable_sst("tests/urchin/sstables/tests-temporary", generation).then([s, tomb] (auto sstp) mutable {
return do_with(sstables::key("key1"), [sstp, s, tomb] (auto& key) {
return sstp->read_row(s, key).then([sstp, s, tomb] (auto mutation) {
auto& mp = mutation->partition();
@@ -924,7 +924,19 @@ SEASTAR_TEST_CASE(datafile_generation_13) {
});
}
SEASTAR_TEST_CASE(datafile_generation_13) {
return sstable_compression_test(compressor::lz4, 13);
}
SEASTAR_TEST_CASE(datafile_generation_14) {
return sstable_compression_test(compressor::snappy, 14);
}
SEASTAR_TEST_CASE(datafile_generation_15) {
return sstable_compression_test(compressor::deflate, 15);
}
SEASTAR_TEST_CASE(datafile_generation_16) {
return test_setup::do_with_test_directory([] {
auto s = uncompressed_schema();
@@ -940,12 +952,13 @@ SEASTAR_TEST_CASE(datafile_generation_14) {
mtp->apply(std::move(m));
}
auto sst = make_lw_shared<sstable>("tests/urchin/sstables/tests-temporary", 14, la, big);
auto sst = make_lw_shared<sstable>("tests/urchin/sstables/tests-temporary", 16, la, big);
return sst->write_components(*mtp).then([s] {
return reusable_sst("tests/urchin/sstables/tests-temporary", 14).then([] (auto s) {
return reusable_sst("tests/urchin/sstables/tests-temporary", 16).then([] (auto s) {
// Not crashing is enough
return make_ready_future<>();
});
}).then([sst, mtp] {});
});
}