From bbef05ae3cc7813db0898429300afa5d476f84a8 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 14 Oct 2020 13:14:47 +0000 Subject: [PATCH] cdc: Add an "end-of-record" column to cdc log Fixes #7435 Adds an "eor" (end-of-record) column, named "end_of_batch", to the cdc log. It is non-null only on the last row of each timestamp group, i.e. at the end of a single source "event". A client can use this as a shortcut to know whether or not it has a full cdc "record" for a given source mutation (single row change). Closes #7436 (cherry picked from commit 46ea8c9b8b067ccebacb1a3be020c3c8068dc684) --- cdc/log.cc | 25 ++++++++++++++++++++++++- cdc/split.cc | 4 ++++ cdc/split.hh | 4 ++++ test/boost/cdc_test.cc | 4 ++++ 4 files changed, 36 insertions(+), 1 deletion(-) diff --git a/cdc/log.cc b/cdc/log.cc index 46c69ca66b..2941f3c5b8 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -519,6 +519,7 @@ static schema_ptr create_log_schema(const schema& s, std::optional b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key); b.with_column(log_meta_column_name_bytes("operation"), data_type_for()); b.with_column(log_meta_column_name_bytes("ttl"), long_type); + b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type); b.set_caching_options(caching_options::get_disabled_caching_options()); auto add_columns = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) { for (const auto& column : columns) { @@ -880,14 +881,26 @@ public: return _base_schema; } + clustering_key create_ck(int batch) const { + return clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(batch) }); + } + // Creates a new clustering row in the mutation, assigning it the next `cdc$batch_seq_no`. // The numbering of batch sequence numbers starts from 0. 
clustering_key allocate_new_log_row() { - auto log_ck = clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(_batch_no++) }); + auto log_ck = create_ck(_batch_no++); set_key_columns(log_ck, _base_schema.partition_key_columns(), _base_pk); return log_ck; } + bool has_rows() const { + return _batch_no != 0; + } + + clustering_key last_row_key() const { + return create_ck(_batch_no - 1); + } + // A common pattern is to allocate a row and then immediately set its `cdc$operation` column. clustering_key allocate_new_log_row(operation op) { auto log_ck = allocate_new_log_row(); @@ -944,6 +957,11 @@ public: _log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*log_cdef.type, _ts, deleted_elements, _ttl)); } + void end_record() { + if (has_rows()) { + _log_mut.set_cell(last_row_key(), log_meta_column_name_bytes("end_of_batch"), data_value(true), _ts, _ttl); + } + } private: void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector& key) { size_t pos = 0; @@ -1519,6 +1537,11 @@ public: cdc::inspect_mutation(m, v); } + void end_record() override { + assert(_builder); + _builder->end_record(); + } + // Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime. // The `transformer` object on which this method was called on should not be used anymore. 
std::tuple, stats::part_type_set> finish() && { diff --git a/cdc/split.cc b/cdc/split.cc index ec0f2500bc..bb441467ae 100644 --- a/cdc/split.cc +++ b/cdc/split.cc @@ -684,6 +684,8 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces processor.produce_postimage(&ck); } } + + processor.end_record(); } } @@ -731,6 +733,8 @@ void process_changes_without_splitting(const mutation& base_mutation, change_pro processor.produce_postimage(&cr.key()); } } + + processor.end_record(); } } // namespace cdc diff --git a/cdc/split.hh b/cdc/split.hh index 6fc9619ce9..4647b52a23 100644 --- a/cdc/split.hh +++ b/cdc/split.hh @@ -77,6 +77,10 @@ public: // both columns have different timestamp or TTL set. // m - the small mutation to be converted into CDC log rows. virtual void process_change(const mutation& m) = 0; + + // Tells processor we have reached end of record - last part + // of a given timestamp batch + virtual void end_record() = 0; }; bool should_split(const mutation& base_mutation); diff --git a/test/boost/cdc_test.cc b/test/boost/cdc_test.cc index 679458a309..e5cedc89ce 100644 --- a/test/boost/cdc_test.cc +++ b/test/boost/cdc_test.cc @@ -326,6 +326,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_log_schema) { // cdc log clustering key assert_has_column(cdc::log_meta_column_name("operation"), byte_type); assert_has_column(cdc::log_meta_column_name("ttl"), long_type); + assert_has_column(cdc::log_meta_column_name("end_of_batch"), boolean_type); // pk assert_has_column(cdc::log_data_column_name("pk"), int32_type); @@ -534,6 +535,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) { auto val_index = column_index(*rows, cdc::log_data_column_name("val")); auto val2_index = column_index(*rows, cdc::log_data_column_name("val2")); auto ttl_index = column_index(*rows, cdc::log_meta_column_name("ttl")); + auto eor_index = column_index(*rows, cdc::log_meta_column_name("end_of_batch")); auto val_type = int32_type; auto val = *first[0][val_index]; @@ -583,10 
+585,12 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) { if (post_enabled) { val = *post_image.back()[val_index]; val2 = *post_image.back()[val2_index]; + auto eor = *post_image.back()[eor_index]; BOOST_REQUIRE_EQUAL(int32_type->decompose(1111), *post_image.back()[ck2_index]); BOOST_REQUIRE_EQUAL(data_value(nv), val_type->deserialize(bytes_view(val))); BOOST_REQUIRE_EQUAL(data_value(22222), val_type->deserialize(bytes_view(val2))); + BOOST_REQUIRE_EQUAL(data_value(true), boolean_type->deserialize(bytes_view(eor))); } const auto& ttl_cell = second[second.size() - 2][ttl_index];