sstables: Check for complex deletion when writing static rows.

It is possible to have collections in a static row so we need to check
for collection-wide tombstones like with clustering rows.

Signed-off-by: Vladimir Krivopalov <vladimir@scylladb.com>
This commit is contained in:
Vladimir Krivopalov
2018-10-29 14:59:19 -07:00
parent 6b7003088a
commit 6bd738ceb1

View File

@@ -2828,7 +2828,7 @@ private:
void write_collection(file_writer& writer, const column_definition& cdef, collection_mutation_view collection,
const row_time_properties& properties, bool has_complex_deletion);
void write_cells(file_writer& writer, column_kind kind, const row& row_body, const row_time_properties& properties, bool has_complex_deletion = false);
void write_cells(file_writer& writer, column_kind kind, const row& row_body, const row_time_properties& properties, bool has_complex_deletion);
void write_row_body(file_writer& writer, const clustering_row& row, bool has_complex_deletion);
void write_static_row(const row& static_row);
@@ -3208,26 +3208,26 @@ void sstable_writer_m::write_liveness_info(file_writer& writer, const row_marker
void sstable_writer_m::write_collection(file_writer& writer, const column_definition& cdef,
collection_mutation_view collection, const row_time_properties& properties, bool has_complex_deletion) {
auto& ctype = *static_pointer_cast<const collection_type_impl>(cdef.type);
collection.data.with_linearized([&] (bytes_view collection_bv) {
auto mview = ctype.deserialize_mutation_form(collection_bv);
if (has_complex_deletion) {
auto dt = to_deletion_time(mview.tomb);
write_delta_deletion_time(writer, dt);
if (mview.tomb) {
_c_stats.update_timestamp(dt.marked_for_delete_at);
_c_stats.update_local_deletion_time(dt.local_deletion_time);
collection.data.with_linearized([&] (bytes_view collection_bv) {
auto mview = ctype.deserialize_mutation_form(collection_bv);
if (has_complex_deletion) {
auto dt = to_deletion_time(mview.tomb);
write_delta_deletion_time(writer, dt);
if (mview.tomb) {
_c_stats.update_timestamp(dt.marked_for_delete_at);
_c_stats.update_local_deletion_time(dt.local_deletion_time);
}
}
}
write_vint(writer, mview.cells.size());
if (!mview.cells.empty()) {
++_c_stats.column_count;
}
for (const auto& [cell_path, cell]: mview.cells) {
++_c_stats.cells_count;
write_cell(writer, cell, cdef, properties, cell_path);
}
});
write_vint(writer, mview.cells.size());
if (!mview.cells.empty()) {
++_c_stats.column_count;
}
for (const auto& [cell_path, cell]: mview.cells) {
++_c_stats.cells_count;
write_cell(writer, cell, cdef, properties, cell_path);
}
});
}
void sstable_writer_m::write_cells(file_writer& writer, column_kind kind, const row& row_body,
@@ -3293,6 +3293,27 @@ uint64_t calculate_write_size(Func&& func) {
return written_size;
}
// Find if any collection in the row contains a collection-wide tombstone
static bool row_has_complex_deletion(const schema& s, const row& r, column_kind kind) {
bool result = false;
r.for_each_cell_until([&] (column_id id, const atomic_cell_or_collection& c) {
auto&& cdef = s.column_at(kind, id);
if (cdef.is_atomic()) {
return stop_iteration::no;
}
auto t = static_pointer_cast<const collection_type_impl>(cdef.type);
return c.as_collection_mutation().data.with_linearized([&] (bytes_view c_bv) {
auto mview = t->deserialize_mutation_form(c_bv);
if (mview.tomb) {
result = true;
}
return stop_iteration(static_cast<bool>(mview.tomb));
});
});
return result;
}
void sstable_writer_m::write_static_row(const row& static_row) {
assert(_schema.is_compound());
@@ -3302,13 +3323,16 @@ void sstable_writer_m::write_static_row(const row& static_row) {
if (static_row.size() == _schema.static_columns_count()) {
flags |= row_flags::has_all_columns;
}
bool has_complex_deletion = row_has_complex_deletion(_schema, static_row, column_kind::static_column);
if (has_complex_deletion) {
flags |= row_flags::has_complex_deletion;
}
write(_sst.get_version(), *_data_writer, flags);
write(_sst.get_version(), *_data_writer, row_extended_flags::is_static);
// Calculate the size of the row body
auto write_row = [this, &static_row] (file_writer& writer) {
write_cells(writer, column_kind::static_column, static_row, row_time_properties{});
auto write_row = [this, &static_row, has_complex_deletion] (file_writer& writer) {
write_cells(writer, column_kind::static_column, static_row, row_time_properties{}, has_complex_deletion);
};
uint64_t row_body_size = calculate_write_size(write_row) + unsigned_vint::serialized_size(0);
@@ -3329,27 +3353,6 @@ stop_iteration sstable_writer_m::consume(static_row&& sr) {
return stop_iteration::no;
}
// Find if any collection in the row contains a collection-wide tombstone
static bool row_has_complex_deletion(const schema& s, const row& r) {
bool result = false;
r.for_each_cell_until([&] (column_id id, const atomic_cell_or_collection& c) {
auto&& cdef = s.column_at(column_kind::regular_column, id);
if (cdef.is_atomic()) {
return stop_iteration::no;
}
auto t = static_pointer_cast<const collection_type_impl>(cdef.type);
return c.as_collection_mutation().data.with_linearized([&] (bytes_view c_bv) {
auto mview = t->deserialize_mutation_form(c_bv);
if (mview.tomb) {
result = true;
}
return stop_iteration(static_cast<bool>(mview.tomb));
});
});
return result;
}
void sstable_writer_m::write_clustered(const clustering_row& clustered_row, uint64_t prev_row_size) {
row_flags flags = row_flags::none;
row_extended_flags ext_flags = row_extended_flags::none;
@@ -3372,7 +3375,7 @@ void sstable_writer_m::write_clustered(const clustering_row& clustered_row, uint
if (clustered_row.cells().size() == _schema.regular_columns_count()) {
flags |= row_flags::has_all_columns;
}
bool has_complex_deletion = row_has_complex_deletion(_schema, clustered_row.cells());
bool has_complex_deletion = row_has_complex_deletion(_schema, clustered_row.cells(), column_kind::regular_column);
if (has_complex_deletion) {
flags |= row_flags::has_complex_deletion;
}