From feeaae06cfbed150a53a9c0bbb76c9e9ef1bc6a6 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 11 May 2015 11:55:55 -0300 Subject: [PATCH] sstables: add support to write tombstone cells Signed-off-by: Raphael S. Carvalho --- sstables/sstables.cc | 14 +++++-- tests/urchin/sstable_test.cc | 72 ++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index e017a5b002..97b4aa392c 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -897,11 +897,9 @@ static future<> write_static_column_name(output_stream& out, const bytes& // Intended to write all cell components that follow column name. static future<> write_cell(output_stream& out, atomic_cell_view cell) { - // FIXME: deleted cell isn't supported yet. + // FIXME: range tombstone and counter cells aren't supported yet. uint64_t timestamp = cell.timestamp(); - disk_string_view cell_value; - cell_value.value = cell.value(); if (cell.is_live_and_has_ttl()) { // expiring cell @@ -909,14 +907,24 @@ static future<> write_cell(output_stream& out, atomic_cell_view cell) { column_mask mask = column_mask::expiration; uint32_t ttl = cell.ttl().count(); uint32_t expiration = cell.expiry().time_since_epoch().count(); + disk_string_view cell_value { cell.value() }; return do_with(std::move(cell_value), [&out, mask, ttl, expiration, timestamp] (auto& cell_value) { return write(out, mask, ttl, expiration, timestamp, cell_value); }); + } else if (cell.is_dead()) { + // tombstone cell + + column_mask mask = column_mask::deletion; + uint32_t deletion_time_size = sizeof(uint32_t); + uint32_t deletion_time = cell.deletion_time().time_since_epoch().count(); + + return write(out, mask, timestamp, deletion_time_size, deletion_time); } else { // regular cell column_mask mask = column_mask::none; + disk_string_view cell_value { cell.value() }; return do_with(std::move(cell_value), [&out, mask, timestamp] (auto& cell_value) { return write(out, mask, timestamp, cell_value); diff --git a/tests/urchin/sstable_test.cc b/tests/urchin/sstable_test.cc index f933335ec0..def3008afb 100644 --- a/tests/urchin/sstable_test.cc +++ b/tests/urchin/sstable_test.cc @@ -641,6 +641,78 @@ SEASTAR_TEST_CASE(datafile_generation_05) { }); } +atomic_cell make_dead_atomic_cell(uint32_t deletion_time) { + return atomic_cell::make_dead(0, gc_clock::time_point(gc_clock::duration(deletion_time))); +} + +SEASTAR_TEST_CASE(datafile_generation_06) { + // Data file with clustering key and tombstone cells. + // + // Respective CQL table and CQL insert: + // CREATE TABLE test ( + // p1 text, + // c1 text, + // r1 int, + // PRIMARY KEY (p1, c1) + // ) WITH compression = {}; + // INSERT INTO test (p1, c1, r1) VALUES ('key1', 'abc', 1); + // after flushed: + // DELETE r1 FROM test WHERE p1 = 'key1' AND c1 = 'abc'; + + auto s = make_lw_shared(schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", int32_type}}, {}, utf8_type)); + + memtable mt(s); + + const column_definition& r1_col = *s->get_column_definition("r1"); + + auto key = partition_key::from_exploded(*s, {to_bytes("key1")}); + auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")}); + + mutation m(key, s); + m.set_clustered_cell(c_key, r1_col, make_dead_atomic_cell(3600)); + mt.apply(std::move(m)); + + auto mtp = make_shared(std::move(mt)); + + return sstables::write_datafile(*mtp, "tests/urchin/sstables/Data6.tmp.db").then([mtp, s] { + return engine().open_file_dma("tests/urchin/sstables/Data6.tmp.db", open_flags::ro).then([] (file f) { + auto bufptr = allocate_aligned_buffer(4096, 4096); + + auto fut = f.dma_read(0, bufptr.get(), 4096); + return std::move(fut).then([f = std::move(f), bufptr = std::move(bufptr)] (size_t size) { + auto buf = bufptr.get(); + size_t offset = 0; + std::vector key = { 0, 4, 'k', 'e', 'y', '1' }; + BOOST_REQUIRE(::memcmp(key.data(), &buf[offset], key.size()) == 0); + offset += key.size(); + std::vector deletion_time = { 0x7f, 0xff, 0xff, 0xff, 0x80, 0, 0, 0, 0, 0, 0, 0 }; + BOOST_REQUIRE(::memcmp(deletion_time.data(), &buf[offset], deletion_time.size()) == 0); + offset += deletion_time.size(); + std::vector row_mark = { /* name */ 0, 9, 0, 3, 'a', 'b', 'c', 0, 0, 0, 0 }; + // check if there is a row mark. + if (::memcmp(row_mark.data(), &buf[offset], row_mark.size()) == 0) { + BOOST_REQUIRE(::memcmp(row_mark.data(), &buf[offset], row_mark.size()) == 0); + offset += row_mark.size(); + offset += 13; // skip mask, timestamp and expiration (value) = 13 bytes. + } + // tombstone cell + std::vector row = { /* name */ 0, 0xb, 0, 3, 'a', 'b', 'c', 0, 0, 2, 'r', '1', 0, + /* mask */ 1, /* timestamp */ 0, 0, 0, 0, 0, 0, 0, 0, + /* expiration (value) */ 0, 0, 0, 4, 0, 0, 0xe, 0x10 }; + BOOST_REQUIRE(::memcmp(row.data(), &buf[offset], row.size()) == 0); + offset += row.size(); + std::vector end_of_row = { 0, 0 }; + BOOST_REQUIRE(::memcmp(end_of_row.data(), &buf[offset], end_of_row.size()) == 0); + offset += end_of_row.size(); + BOOST_REQUIRE(size == offset); + }); + }).then([] { + return remove_file("tests/urchin/sstables/Data6.tmp.db"); + }); + }); +} + SEASTAR_TEST_CASE(uncompressed_random_access_read) { return reusable_sst("tests/urchin/sstables/uncompressed", 1).then([] (auto sstp) { // note: it's important to pass on a shared copy of sstp to prevent its