sstables: add support to write tombstone cells
Signed-off-by: Raphael S. Carvalho <raphaelsc@cloudius-systems.com>
This commit is contained in:
@@ -897,11 +897,9 @@ static future<> write_static_column_name(output_stream<char>& out, const bytes&
|
||||
|
||||
// Intended to write all cell components that follow column name.
|
||||
static future<> write_cell(output_stream<char>& out, atomic_cell_view cell) {
|
||||
// FIXME: deleted cell isn't supported yet.
|
||||
// FIXME: range tombstone and counter cells aren't supported yet.
|
||||
|
||||
uint64_t timestamp = cell.timestamp();
|
||||
disk_string_view<uint32_t> cell_value;
|
||||
cell_value.value = cell.value();
|
||||
|
||||
if (cell.is_live_and_has_ttl()) {
|
||||
// expiring cell
|
||||
@@ -909,14 +907,24 @@ static future<> write_cell(output_stream<char>& out, atomic_cell_view cell) {
|
||||
column_mask mask = column_mask::expiration;
|
||||
uint32_t ttl = cell.ttl().count();
|
||||
uint32_t expiration = cell.expiry().time_since_epoch().count();
|
||||
disk_string_view<uint32_t> cell_value { cell.value() };
|
||||
|
||||
return do_with(std::move(cell_value), [&out, mask, ttl, expiration, timestamp] (auto& cell_value) {
|
||||
return write(out, mask, ttl, expiration, timestamp, cell_value);
|
||||
});
|
||||
} else if (cell.is_dead()) {
|
||||
// tombstone cell
|
||||
|
||||
column_mask mask = column_mask::deletion;
|
||||
uint32_t deletion_time_size = sizeof(uint32_t);
|
||||
uint32_t deletion_time = cell.deletion_time().time_since_epoch().count();
|
||||
|
||||
return write(out, mask, timestamp, deletion_time_size, deletion_time);
|
||||
} else {
|
||||
// regular cell
|
||||
|
||||
column_mask mask = column_mask::none;
|
||||
disk_string_view<uint32_t> cell_value { cell.value() };
|
||||
|
||||
return do_with(std::move(cell_value), [&out, mask, timestamp] (auto& cell_value) {
|
||||
return write(out, mask, timestamp, cell_value);
|
||||
|
||||
@@ -641,6 +641,78 @@ SEASTAR_TEST_CASE(datafile_generation_05) {
|
||||
});
|
||||
}
|
||||
|
||||
atomic_cell make_dead_atomic_cell(uint32_t deletion_time) {
|
||||
return atomic_cell::make_dead(0, gc_clock::time_point(gc_clock::duration(deletion_time)));
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(datafile_generation_06) {
|
||||
// Data file with clustering key and tombstone cells.
|
||||
//
|
||||
// Respective CQL table and CQL insert:
|
||||
// CREATE TABLE test (
|
||||
// p1 text,
|
||||
// c1 text,
|
||||
// r1 int,
|
||||
// PRIMARY KEY (p1, c1)
|
||||
// ) WITH compression = {};
|
||||
// INSERT INTO test (p1, c1, r1) VALUES ('key1', 'abc', 1);
|
||||
// after flushed:
|
||||
// DELETE r1 FROM test WHERE p1 = 'key1' AND c1 = 'abc';
|
||||
|
||||
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
|
||||
{{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", int32_type}}, {}, utf8_type));
|
||||
|
||||
memtable mt(s);
|
||||
|
||||
const column_definition& r1_col = *s->get_column_definition("r1");
|
||||
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
|
||||
auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")});
|
||||
|
||||
mutation m(key, s);
|
||||
m.set_clustered_cell(c_key, r1_col, make_dead_atomic_cell(3600));
|
||||
mt.apply(std::move(m));
|
||||
|
||||
auto mtp = make_shared<memtable>(std::move(mt));
|
||||
|
||||
return sstables::write_datafile(*mtp, "tests/urchin/sstables/Data6.tmp.db").then([mtp, s] {
|
||||
return engine().open_file_dma("tests/urchin/sstables/Data6.tmp.db", open_flags::ro).then([] (file f) {
|
||||
auto bufptr = allocate_aligned_buffer<char>(4096, 4096);
|
||||
|
||||
auto fut = f.dma_read(0, bufptr.get(), 4096);
|
||||
return std::move(fut).then([f = std::move(f), bufptr = std::move(bufptr)] (size_t size) {
|
||||
auto buf = bufptr.get();
|
||||
size_t offset = 0;
|
||||
std::vector<uint8_t> key = { 0, 4, 'k', 'e', 'y', '1' };
|
||||
BOOST_REQUIRE(::memcmp(key.data(), &buf[offset], key.size()) == 0);
|
||||
offset += key.size();
|
||||
std::vector<uint8_t> deletion_time = { 0x7f, 0xff, 0xff, 0xff, 0x80, 0, 0, 0, 0, 0, 0, 0 };
|
||||
BOOST_REQUIRE(::memcmp(deletion_time.data(), &buf[offset], deletion_time.size()) == 0);
|
||||
offset += deletion_time.size();
|
||||
std::vector<uint8_t> row_mark = { /* name */ 0, 9, 0, 3, 'a', 'b', 'c', 0, 0, 0, 0 };
|
||||
// check if there is a row mark.
|
||||
if (::memcmp(row_mark.data(), &buf[offset], row_mark.size()) == 0) {
|
||||
BOOST_REQUIRE(::memcmp(row_mark.data(), &buf[offset], row_mark.size()) == 0);
|
||||
offset += row_mark.size();
|
||||
offset += 13; // skip mask, timestamp and expiration (value) = 13 bytes.
|
||||
}
|
||||
// tombstone cell
|
||||
std::vector<uint8_t> row = { /* name */ 0, 0xb, 0, 3, 'a', 'b', 'c', 0, 0, 2, 'r', '1', 0,
|
||||
/* mask */ 1, /* timestamp */ 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
/* expiration (value) */ 0, 0, 0, 4, 0, 0, 0xe, 0x10 };
|
||||
BOOST_REQUIRE(::memcmp(row.data(), &buf[offset], row.size()) == 0);
|
||||
offset += row.size();
|
||||
std::vector<uint8_t> end_of_row = { 0, 0 };
|
||||
BOOST_REQUIRE(::memcmp(end_of_row.data(), &buf[offset], end_of_row.size()) == 0);
|
||||
offset += end_of_row.size();
|
||||
BOOST_REQUIRE(size == offset);
|
||||
});
|
||||
}).then([] {
|
||||
return remove_file("tests/urchin/sstables/Data6.tmp.db");
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(uncompressed_random_access_read) {
|
||||
return reusable_sst("tests/urchin/sstables/uncompressed", 1).then([] (auto sstp) {
|
||||
// note: it's important to pass on a shared copy of sstp to prevent its
|
||||
|
||||
Reference in New Issue
Block a user