cdc: Add an "end-of-record" column to the cdc log

Fixes #7435

Adds an "eor" (end-of-record) column to the cdc log; in the log schema the
column is named "end_of_batch". It is non-null only on the last row of each
timestamp group, i.e. at the end of a single source "event".

A client can use this as a shortcut to determine whether it has received the
full cdc "record" for a given source mutation (single row change).

Closes #7436

(cherry picked from commit 46ea8c9b8b)
This commit is contained in:
Calle Wilund
2020-10-14 13:14:47 +00:00
committed by Nadav Har'El
parent 6f324cb732
commit bbef05ae3c
4 changed files with 36 additions and 1 deletions

View File

@@ -519,6 +519,7 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key);
b.with_column(log_meta_column_name_bytes("operation"), data_type_for<operation_native_type>());
b.with_column(log_meta_column_name_bytes("ttl"), long_type);
b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type);
b.set_caching_options(caching_options::get_disabled_caching_options());
auto add_columns = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) {
for (const auto& column : columns) {
@@ -880,14 +881,26 @@ public:
return _base_schema;
}
// Builds a log-table clustering key from two exploded components, in order:
// `_tuuid`, followed by `batch` serialized with int32_type.
// batch - the batch sequence number to encode into the key.
clustering_key create_ck(int batch) const {
    auto serialized_batch = int32_type->decompose(batch);
    return clustering_key::from_exploded(_log_schema, { _tuuid, serialized_batch });
}
// Creates a new clustering row in the mutation, assigning it the next `cdc$batch_seq_no`.
// The numbering of batch sequence numbers starts from 0.
// Returns the clustering key of the newly allocated log row.
clustering_key allocate_new_log_row() {
    // Build the key through create_ck() and advance `_batch_no` exactly once per
    // allocated row; has_rows() and last_row_key() both derive their answer from
    // `_batch_no`, so a second increment here would make them point past the last row.
    auto log_ck = create_ck(_batch_no++);
    // Fill the row's key columns from the base partition key columns and `_base_pk`.
    set_key_columns(log_ck, _base_schema.partition_key_columns(), _base_pk);
    return log_ck;
}
// Reports whether any log row has been allocated so far, i.e. whether
// `_batch_no` has moved away from 0.
bool has_rows() const {
    const bool nothing_allocated = (_batch_no == 0);
    return !nothing_allocated;
}
// Returns the clustering key for batch sequence number `_batch_no - 1`,
// i.e. the row handed out by the most recent allocate_new_log_row() call.
// NOTE(review): callers are expected to check has_rows() first; with no rows
// allocated this would build a key for sequence number -1.
clustering_key last_row_key() const {
    const int last_seq_no = _batch_no - 1;
    return create_ck(last_seq_no);
}
// A common pattern is to allocate a row and then immediately set its `cdc$operation` column.
clustering_key allocate_new_log_row(operation op) {
auto log_ck = allocate_new_log_row();
@@ -944,6 +957,11 @@ public:
_log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*log_cdef.type, _ts, deleted_elements, _ttl));
}
// Marks the most recently allocated log row as the end of the record by setting
// its "end_of_batch" meta column to true, written with `_ts` and `_ttl`.
// Does nothing when no log row has been allocated.
void end_record() {
    if (!has_rows()) {
        return;
    }
    const auto final_row_ck = last_row_key();
    _log_mut.set_cell(final_row_ck, log_meta_column_name_bytes("end_of_batch"), data_value(true), _ts, _ttl);
}
private:
void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector<bytes>& key) {
size_t pos = 0;
@@ -1519,6 +1537,11 @@ public:
cdc::inspect_mutation(m, v);
}
// Signals the end of the current record by forwarding to the builder's end_record().
// A builder must already be set when this is called; the assert enforces that.
void end_record() override {
assert(_builder);
_builder->end_record();
}
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
// The `transformer` object on which this method was called should not be used anymore.
std::tuple<std::vector<mutation>, stats::part_type_set> finish() && {

View File

@@ -684,6 +684,8 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
processor.produce_postimage(&ck);
}
}
processor.end_record();
}
}
@@ -731,6 +733,8 @@ void process_changes_without_splitting(const mutation& base_mutation, change_pro
processor.produce_postimage(&cr.key());
}
}
processor.end_record();
}
} // namespace cdc

View File

@@ -77,6 +77,10 @@ public:
// both columns have different timestamp or TTL set.
// m - the small mutation to be converted into CDC log rows.
virtual void process_change(const mutation& m) = 0;
// Tells the processor that the end of a record has been reached, i.e. that the
// last part of a given timestamp batch has been handed to it.
// Implementations can use this to mark the final log row produced for that batch.
virtual void end_record() = 0;
};
bool should_split(const mutation& base_mutation);

View File

@@ -326,6 +326,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_log_schema) {
// cdc log clustering key
assert_has_column(cdc::log_meta_column_name("operation"), byte_type);
assert_has_column(cdc::log_meta_column_name("ttl"), long_type);
assert_has_column(cdc::log_meta_column_name("end_of_batch"), boolean_type);
// pk
assert_has_column(cdc::log_data_column_name("pk"), int32_type);
@@ -534,6 +535,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
auto val_index = column_index(*rows, cdc::log_data_column_name("val"));
auto val2_index = column_index(*rows, cdc::log_data_column_name("val2"));
auto ttl_index = column_index(*rows, cdc::log_meta_column_name("ttl"));
auto eor_index = column_index(*rows, cdc::log_meta_column_name("end_of_batch"));
auto val_type = int32_type;
auto val = *first[0][val_index];
@@ -583,10 +585,12 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
if (post_enabled) {
val = *post_image.back()[val_index];
val2 = *post_image.back()[val2_index];
auto eor = *post_image.back()[eor_index];
BOOST_REQUIRE_EQUAL(int32_type->decompose(1111), *post_image.back()[ck2_index]);
BOOST_REQUIRE_EQUAL(data_value(nv), val_type->deserialize(bytes_view(val)));
BOOST_REQUIRE_EQUAL(data_value(22222), val_type->deserialize(bytes_view(val2)));
BOOST_REQUIRE_EQUAL(data_value(true), boolean_type->deserialize(bytes_view(eor)));
}
const auto& ttl_cell = second[second.size() - 2][ttl_index];