diff --git a/sstables/compress.cc b/sstables/compress.cc index 8fbcb2ee9a..09a8de8924 100644 --- a/sstables/compress.cc +++ b/sstables/compress.cc @@ -135,8 +135,26 @@ size_t uncompress_deflate(const char* input, size_t input_len, size_t compress_deflate(const char* input, size_t input_len, char* output, size_t output_len) { - // FIXME: implement. - fail(unimplemented::cause::COMPRESSION); + z_stream zs; + zs.zalloc = Z_NULL; + zs.zfree = Z_NULL; + zs.opaque = Z_NULL; + zs.avail_in = 0; + zs.next_in = Z_NULL; + if (deflateInit(&zs, Z_DEFAULT_COMPRESSION) != Z_OK) { + throw std::runtime_error("deflate compression init failure"); + } + zs.next_in = reinterpret_cast(const_cast(input)); + zs.avail_in = input_len; + zs.next_out = reinterpret_cast(output); + zs.avail_out = output_len; + auto res = deflate(&zs, Z_FINISH); + deflateEnd(&zs); + if (res == Z_STREAM_END) { + return output_len - zs.avail_out; + } else { + throw std::runtime_error("deflate compression failure"); + } } size_t uncompress_snappy(const char* input, size_t input_len, @@ -151,8 +169,11 @@ size_t uncompress_snappy(const char* input, size_t input_len, size_t compress_snappy(const char* input, size_t input_len, char* output, size_t output_len) { - // FIXME: implement. - fail(unimplemented::cause::COMPRESSION); + auto ret = snappy_compress(input, input_len, output, &output_len); + if (ret != SNAPPY_OK) { + throw std::runtime_error("snappy compression failure: snappy_compress() failed"); + } + return output_len; } size_t compress_max_size_lz4(size_t input_len) { @@ -160,13 +181,22 @@ size_t compress_max_size_lz4(size_t input_len) { } size_t compress_max_size_deflate(size_t input_len) { - // FIXME: implement. - fail(unimplemented::cause::COMPRESSION); + z_stream zs; + zs.zalloc = Z_NULL; + zs.zfree = Z_NULL; + zs.opaque = Z_NULL; + zs.avail_in = 0; + zs.next_in = Z_NULL; + if (deflateInit(&zs, Z_DEFAULT_COMPRESSION) != Z_OK) { + throw std::runtime_error("deflate compression init failure"); + } + auto res = deflateBound(&zs, input_len); + deflateEnd(&zs); + return res; } size_t compress_max_size_snappy(size_t input_len) { - // FIXME: implement. - fail(unimplemented::cause::COMPRESSION); + return snappy_max_compressed_length(input_len); } class compressed_file_data_source_impl : public data_source_impl { diff --git a/tests/urchin/sstable_datafile_test.cc b/tests/urchin/sstable_datafile_test.cc index eac5ca50d1..ff1d4c36b6 100644 --- a/tests/urchin/sstable_datafile_test.cc +++ b/tests/urchin/sstable_datafile_test.cc @@ -889,12 +889,12 @@ SEASTAR_TEST_CASE(datafile_generation_12) { }); } -SEASTAR_TEST_CASE(datafile_generation_13) { - return test_setup::do_with_test_directory([] { +static future<> sstable_compression_test(compressor c, unsigned generation) { + return test_setup::do_with_test_directory([c, generation] { auto& cs = *complex_schema(); auto s = make_lw_shared(schema(cs)); - // NOTE: set lz4 as compressor algorithm to schema. - s->set_compressor(compressor::lz4); + // NOTE: set a given compressor algorithm to schema. + s->set_compressor(c); auto mtp = make_shared(s); @@ -907,9 +907,9 @@ SEASTAR_TEST_CASE(datafile_generation_13) { m.partition().apply_delete(*s, cp, tomb); mtp->apply(std::move(m)); - auto sst = make_lw_shared("tests/urchin/sstables/tests-temporary", 13, la, big); - return sst->write_components(*mtp).then([s, tomb] { - return reusable_sst("tests/urchin/sstables/tests-temporary", 13).then([s, tomb] (auto sstp) mutable { + auto sst = make_lw_shared("tests/urchin/sstables/tests-temporary", generation, la, big); + return sst->write_components(*mtp).then([s, tomb, generation] { + return reusable_sst("tests/urchin/sstables/tests-temporary", generation).then([s, tomb] (auto sstp) mutable { return do_with(sstables::key("key1"), [sstp, s, tomb] (auto& key) { return sstp->read_row(s, key).then([sstp, s, tomb] (auto mutation) { auto& mp = mutation->partition(); @@ -924,7 +924,19 @@ SEASTAR_TEST_CASE(datafile_generation_13) { }); } +SEASTAR_TEST_CASE(datafile_generation_13) { + return sstable_compression_test(compressor::lz4, 13); +} + SEASTAR_TEST_CASE(datafile_generation_14) { + return sstable_compression_test(compressor::snappy, 14); +} + +SEASTAR_TEST_CASE(datafile_generation_15) { + return sstable_compression_test(compressor::deflate, 15); +} + +SEASTAR_TEST_CASE(datafile_generation_16) { return test_setup::do_with_test_directory([] { auto s = uncompressed_schema(); @@ -940,12 +952,13 @@ SEASTAR_TEST_CASE(datafile_generation_14) { mtp->apply(std::move(m)); } - auto sst = make_lw_shared("tests/urchin/sstables/tests-temporary", 14, la, big); + auto sst = make_lw_shared("tests/urchin/sstables/tests-temporary", 16, la, big); return sst->write_components(*mtp).then([s] { - return reusable_sst("tests/urchin/sstables/tests-temporary", 14).then([] (auto s) { + return reusable_sst("tests/urchin/sstables/tests-temporary", 16).then([] (auto s) { // Not crashing is enough return make_ready_future<>(); }); }).then([sst, mtp] {}); }); } +