From 3200d415da09039cf32d80cd6563ce863313fb1a Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 4 Mar 2020 15:47:48 +0100 Subject: [PATCH] cdc: use a single timeuuid value for a batch of changes If a batch update is performed with a sequence of changes with a single timestamp, they will now show up in CDC with a single timeuuid in the `time` column, distinguished by different `batch_seq_no` values. Signed-off-by: Piotr Jastrzebski --- cdc/log.cc | 17 +++++++-------- cdc/log.hh | 3 +++ cdc/split.cc | 19 +++++++++++------ cdc/split.hh | 3 ++- test/boost/cdc_test.cc | 48 ++++++++++++++++++++---------------------- 5 files changed, 48 insertions(+), 42 deletions(-) diff --git a/cdc/log.cc b/cdc/log.cc index f98ffeef10..a32c2d4f8d 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -475,7 +475,6 @@ api::timestamp_type find_timestamp(const schema& s, const mutation& m) { * If `t1` == `t2`, then generate_timeuuid(`t1`) != generate_timeuuid(`t2`), * with unspecified nondeterministic ordering. */ -// external linkage for testing utils::UUID generate_timeuuid(api::timestamp_type t) { return utils::UUID_gen::get_random_time_UUID_from_micros(t); } @@ -530,20 +529,18 @@ public: // TODO: is pre-image data based on query enough. We only have actual column data. Do we need // more details like tombstones/ttl? Probably not but keep in mind. - mutation transform(const mutation& m, const cql3::untyped_result_set* rs) const { - auto ts = find_timestamp(*_schema, m); + mutation transform(const mutation& m, const cql3::untyped_result_set* rs, api::timestamp_type ts, bytes tuuid, int& batch_no) const { auto stream_id = _ctx._cdc_metadata.get_stream(ts, m.token()); mutation res(_log_schema, stream_id.to_partition_key(*_log_schema)); - auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts)); auto& p = m.partition(); if (p.partition_tombstone()) { // Partition deletion auto log_ck = set_pk_columns(m.key(), ts, tuuid, 0, res); set_operation(log_ck, ts, operation::partition_delete, res); + ++batch_no; } else if (!p.row_tombstones().empty()) { // range deletion - int batch_no = 0; for (auto& rt : p.row_tombstones()) { auto set_bound = [&] (const clustering_key& log_ck, const clustering_key_prefix& ckp) { auto exploded = ckp.explode(*_schema); @@ -744,7 +741,6 @@ public: return ttl; }; - int batch_no = 0; if (!p.static_row().empty()) { std::optional pikey; const cql3::untyped_result_set_row * pirow = nullptr; @@ -966,11 +962,14 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, auto& m = mutations[idx]; auto& s = m.schema(); if (should_split(m, *s)) { - for_each_change(m, s, [&] (mutation mm) { - mutations.push_back(trans.transform(std::move(mm), rs.get())); + for_each_change(m, s, [&] (mutation mm, api::timestamp_type ts, bytes tuuid, int& batch_no) { + mutations.push_back(trans.transform(std::move(mm), rs.get(), ts, tuuid, batch_no)); }); } else { - mutations.push_back(trans.transform(m, rs.get())); + int batch_no = 0; + auto ts = find_timestamp(*s, m); + auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts)); + mutations.push_back(trans.transform(m, rs.get(), ts, tuuid, batch_no)); } }); }).then([](std::vector mutations) { diff --git a/cdc/log.hh b/cdc/log.hh index 005101a98e..965d71cb4a 100644 --- a/cdc/log.hh +++ b/cdc/log.hh @@ -41,6 +41,7 @@ #include "exceptions/exceptions.hh" #include "timestamp.hh" #include "cdc_options.hh" +#include "utils/UUID.hh" class schema; using schema_ptr = seastar::lw_shared_ptr; @@ -140,4 +141,6 @@ bytes log_data_column_deleted_name_bytes(const bytes& column_name); seastar::sstring log_data_column_deleted_elements_name(std::string_view column_name); bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name); +utils::UUID generate_timeuuid(api::timestamp_type t); + } // namespace cdc diff --git a/cdc/split.cc b/cdc/split.cc index c31adab07c..afd96d3a43 100644 --- a/cdc/split.cc +++ b/cdc/split.cc @@ -23,6 +23,7 @@ #include "schema.hh" #include "split.hh" +#include "log.hh" struct atomic_column_update { column_id id; @@ -376,11 +377,15 @@ bool should_split(const mutation& base_mutation, const schema& base_schema) { return found_ts == api::missing_timestamp; } -void for_each_change(const mutation& base_mutation, const schema_ptr& base_schema, seastar::noncopyable_function f) { +void for_each_change(const mutation& base_mutation, const schema_ptr& base_schema, + seastar::noncopyable_function f) { auto changes = extract_changes(base_mutation, *base_schema); auto pk = base_mutation.key(); for (auto& [change_ts, btch] : changes) { + auto tuuid = timeuuid_type->decompose(generate_timeuuid(change_ts)); + int batch_no = 0; + for (auto& sr_update : btch.static_updates) { mutation m(base_schema, pk); for (auto& atomic_update : sr_update.atomic_entries) { @@ -395,7 +400,7 @@ void for_each_change(const mutation& base_mutation, const schema_ptr& base_schem auto& cdef = base_schema->column_at(column_kind::static_column, nonatomic_update.id); m.set_static_cell(cdef, collection_mutation_description{{}, std::move(nonatomic_update.cells)}.serialize(*cdef.type)); } - f(std::move(m)); + f(std::move(m), change_ts, tuuid, batch_no); } for (auto& cr_insert : btch.clustered_inserts) { @@ -412,7 +417,7 @@ void for_each_change(const mutation& base_mutation, const schema_ptr& base_schem } row.apply(cr_insert.marker); - f(std::move(m)); + f(std::move(m), change_ts, tuuid, batch_no); } for (auto& cr_update : btch.clustered_updates) { @@ -432,25 +437,25 @@ void for_each_change(const mutation& base_mutation, const schema_ptr& base_schem row.apply(cdef, collection_mutation_description{{}, std::move(nonatomic_update.cells)}.serialize(*cdef.type)); } - f(std::move(m)); + f(std::move(m), change_ts, tuuid, batch_no); } for (auto& cr_delete : btch.clustered_row_deletions) { mutation m(base_schema, pk); m.partition().apply_delete(*base_schema, cr_delete.key, cr_delete.t); - f(std::move(m)); + f(std::move(m), change_ts, tuuid, batch_no); } for (auto& crange_delete : btch.clustered_range_deletions) { mutation m(base_schema, pk); m.partition().apply_delete(*base_schema, crange_delete.rt); - f(std::move(m)); + f(std::move(m), change_ts, tuuid, batch_no); } if (btch.partition_deletions) { mutation m(base_schema, pk); m.partition().apply(btch.partition_deletions->t); - f(std::move(m)); + f(std::move(m), change_ts, tuuid, batch_no); } } } diff --git a/cdc/split.hh b/cdc/split.hh index ba3c807862..77382ac161 100644 --- a/cdc/split.hh +++ b/cdc/split.hh @@ -30,6 +30,7 @@ class mutation; namespace cdc { bool should_split(const mutation& base_mutation, const schema& base_schema); -void for_each_change(const mutation& base_mutation, const schema_ptr& base_schema, seastar::noncopyable_function); +void for_each_change(const mutation& base_mutation, const schema_ptr& base_schema, + seastar::noncopyable_function); } diff --git a/test/boost/cdc_test.cc b/test/boost/cdc_test.cc index 574acce803..a4637764c2 100644 --- a/test/boost/cdc_test.cc +++ b/test/boost/cdc_test.cc @@ -1166,20 +1166,20 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) { { auto result = get_result( - {int32_type, int32_type, int32_type, boolean_type, m_type, keys_type, long_type}, - "select \"cdc$batch_seq_no\", v1, v2, \"cdc$deleted_v2\", m, \"cdc$deleted_elements_m\", \"cdc$ttl\"" + {int32_type, int32_type, boolean_type, m_type, keys_type, long_type}, + "select v1, v2, \"cdc$deleted_v2\", m, \"cdc$deleted_elements_m\", \"cdc$ttl\"" " from ks.t_scylla_cdc_log where pk = 0 and ck = 1 allow filtering"); BOOST_REQUIRE_EQUAL(result.size(), 4); std::vector> expected = { // The following represents the "v1 = 5" change. The "v2 = null" change gets merged with a different change, see below - {int32_t(0), int32_t(5), int_null, bool_null, map_null, keys_null, int64_t(5)}, - {int32_t(0), int_null, int_null, bool_null, vmap({{0,6},{1,6}}), keys_null, long_null /*FIXME: ttl = 6*/}, + {int32_t(5), int_null, bool_null, map_null, keys_null, int64_t(5)}, + {int_null, int_null, bool_null, vmap({{0,6},{1,6}}), keys_null, long_null /*FIXME: ttl = 6*/}, // The following represents the "m[2] = 7" change. The "m[3] = null" change gets merged with a different change, see below - {int32_t(0), int_null, int_null, bool_null, vmap({{2,7}}), keys_null, long_null /*FIXME: ttl = 7*/}, + {int_null, int_null, bool_null, vmap({{2,7}}), keys_null, long_null /*FIXME: ttl = 7*/}, // The "v2 = null" and "v[3] = null" changes get merged with the "m[4] = 0" change, because dead cells // don't have a "ttl" concept; thus we put them together with alive cells which don't have a ttl (so ttl column = null). - {int32_t(0), int_null, int_null, true, vmap({{4,0}}), vkeys({3}), long_null}, + {int_null, int_null, true, vmap({{4,0}}), vkeys({3}), long_null}, }; // These changes have the same timestamp, so their relative order in CDC log is arbitrary @@ -1190,6 +1190,13 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) { } } + { + auto result = get_result({int32_type}, + "select \"cdc$batch_seq_no\" from ks.t_scylla_cdc_log where pk = 0 and ck = 1 allow filtering"); + std::vector> expected = {{int32_t(0)}, {int32_t(1)}, {int32_t(2)}, {int32_t(3)}}; + BOOST_REQUIRE_EQUAL(expected, result); + } + cquery_nofail(e, format( "begin unlogged batch" " delete from ks.t using timestamp {} where pk = 1;" @@ -1204,16 +1211,16 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) { { auto result = get_result( - {int32_type, int32_type, m_type, boolean_type, oper_type}, - "select v1, v2, m, \"cdc$deleted_m\", \"cdc$operation\"" + {int32_type, int32_type, int32_type, m_type, boolean_type, oper_type}, + "select \"cdc$batch_seq_no\", v1, v2, m, \"cdc$deleted_m\", \"cdc$operation\"" " from ks.t_scylla_cdc_log where pk = 1 allow filtering"); BOOST_REQUIRE_EQUAL(result.size(), 7); std::vector> expected = { - {int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::partition_delete)}, - {int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::range_delete_start_inclusive)}, - {int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::range_delete_end_exclusive)}, - {int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::row_delete)}, + {int32_t(0), int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::partition_delete)}, + {int32_t(0), int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::range_delete_start_inclusive)}, + {int32_t(1), int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::range_delete_end_exclusive)}, + {int32_t(0), int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::row_delete)}, // The following sequence of operations: // insert into ks.t (pk,ck,v1) values (1,0,1) using timestamp T; @@ -1230,21 +1237,12 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) { // and a {3:3} cell with timestamp T + 1. Thus we merge the tombstone into the T update, // and we add a T + 1 update to express the addition of the {3:3} cell. // - {int32_t(1), int32_t(2), map_null, true, oper_ut(cdc::operation::update)}, - {int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::insert)}, - {int_null, int_null, vmap({{3,3}}), bool_null, oper_ut(cdc::operation::update)}, + {int32_t(0), int32_t(1), int32_t(2), map_null, true, oper_ut(cdc::operation::update)}, + {int32_t(0), int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::insert)}, + {int32_t(1), int_null, int_null, vmap({{3,3}}), bool_null, oper_ut(cdc::operation::update)}, }; - // The first 5 changes have different timestamps, so we can compare the order. - BOOST_REQUIRE(std::equal(expected.begin(), expected.begin() + 5, result.begin())); - - // The last 2 changes have a higher timestamp than the other 5, but between the two the timestamp is the same. - // Thus their relative order in the CDC log is arbitrary. - for (auto it = expected.begin() + 5; it != expected.end(); ++it) { - BOOST_REQUIRE(std::find_if(result.begin() + 5, result.end(), [&] (const std::vector& r) { - return *it == r; - }) != result.end()); - } + BOOST_REQUIRE_EQUAL(expected, result); } cquery_nofail(e, "delete from ks.t where pk = 2 and ck < 1 and ck > 2;");