cdc: use a single timeuuid value for a batch of changes

If a batch update is performed as a sequence of changes sharing a single
timestamp, those changes will now show up in CDC with a single timeuuid
in the `time` column, distinguished by different `batch_seq_no` values.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
Kamil Braun
2020-03-04 15:47:48 +01:00
committed by Piotr Jastrzebski
parent 292eba9da0
commit 3200d415da
5 changed files with 48 additions and 42 deletions

View File

@@ -475,7 +475,6 @@ api::timestamp_type find_timestamp(const schema& s, const mutation& m) {
* If `t1` == `t2`, then generate_timeuuid(`t1`) != generate_timeuuid(`t2`),
* with unspecified nondeterministic ordering.
*/
// external linkage for testing
utils::UUID generate_timeuuid(api::timestamp_type t) {
return utils::UUID_gen::get_random_time_UUID_from_micros(t);
}
@@ -530,20 +529,18 @@ public:
// TODO: is pre-image data based on query enough. We only have actual column data. Do we need
// more details like tombstones/ttl? Probably not but keep in mind.
mutation transform(const mutation& m, const cql3::untyped_result_set* rs) const {
auto ts = find_timestamp(*_schema, m);
mutation transform(const mutation& m, const cql3::untyped_result_set* rs, api::timestamp_type ts, bytes tuuid, int& batch_no) const {
auto stream_id = _ctx._cdc_metadata.get_stream(ts, m.token());
mutation res(_log_schema, stream_id.to_partition_key(*_log_schema));
auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts));
auto& p = m.partition();
if (p.partition_tombstone()) {
// Partition deletion
auto log_ck = set_pk_columns(m.key(), ts, tuuid, 0, res);
set_operation(log_ck, ts, operation::partition_delete, res);
++batch_no;
} else if (!p.row_tombstones().empty()) {
// range deletion
int batch_no = 0;
for (auto& rt : p.row_tombstones()) {
auto set_bound = [&] (const clustering_key& log_ck, const clustering_key_prefix& ckp) {
auto exploded = ckp.explode(*_schema);
@@ -744,7 +741,6 @@ public:
return ttl;
};
int batch_no = 0;
if (!p.static_row().empty()) {
std::optional<clustering_key> pikey;
const cql3::untyped_result_set_row * pirow = nullptr;
@@ -966,11 +962,14 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
auto& m = mutations[idx];
auto& s = m.schema();
if (should_split(m, *s)) {
for_each_change(m, s, [&] (mutation mm) {
mutations.push_back(trans.transform(std::move(mm), rs.get()));
for_each_change(m, s, [&] (mutation mm, api::timestamp_type ts, bytes tuuid, int& batch_no) {
mutations.push_back(trans.transform(std::move(mm), rs.get(), ts, tuuid, batch_no));
});
} else {
mutations.push_back(trans.transform(m, rs.get()));
int batch_no = 0;
auto ts = find_timestamp(*s, m);
auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts));
mutations.push_back(trans.transform(m, rs.get(), ts, tuuid, batch_no));
}
});
}).then([](std::vector<mutation> mutations) {

View File

@@ -41,6 +41,7 @@
#include "exceptions/exceptions.hh"
#include "timestamp.hh"
#include "cdc_options.hh"
#include "utils/UUID.hh"
class schema;
using schema_ptr = seastar::lw_shared_ptr<const schema>;
@@ -140,4 +141,6 @@ bytes log_data_column_deleted_name_bytes(const bytes& column_name);
seastar::sstring log_data_column_deleted_elements_name(std::string_view column_name);
bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name);
utils::UUID generate_timeuuid(api::timestamp_type t);
} // namespace cdc

View File

@@ -23,6 +23,7 @@
#include "schema.hh"
#include "split.hh"
#include "log.hh"
struct atomic_column_update {
column_id id;
@@ -376,11 +377,15 @@ bool should_split(const mutation& base_mutation, const schema& base_schema) {
return found_ts == api::missing_timestamp;
}
void for_each_change(const mutation& base_mutation, const schema_ptr& base_schema, seastar::noncopyable_function<void(mutation)> f) {
void for_each_change(const mutation& base_mutation, const schema_ptr& base_schema,
seastar::noncopyable_function<void(mutation, api::timestamp_type, bytes, int&)> f) {
auto changes = extract_changes(base_mutation, *base_schema);
auto pk = base_mutation.key();
for (auto& [change_ts, btch] : changes) {
auto tuuid = timeuuid_type->decompose(generate_timeuuid(change_ts));
int batch_no = 0;
for (auto& sr_update : btch.static_updates) {
mutation m(base_schema, pk);
for (auto& atomic_update : sr_update.atomic_entries) {
@@ -395,7 +400,7 @@ void for_each_change(const mutation& base_mutation, const schema_ptr& base_schem
auto& cdef = base_schema->column_at(column_kind::static_column, nonatomic_update.id);
m.set_static_cell(cdef, collection_mutation_description{{}, std::move(nonatomic_update.cells)}.serialize(*cdef.type));
}
f(std::move(m));
f(std::move(m), change_ts, tuuid, batch_no);
}
for (auto& cr_insert : btch.clustered_inserts) {
@@ -412,7 +417,7 @@ void for_each_change(const mutation& base_mutation, const schema_ptr& base_schem
}
row.apply(cr_insert.marker);
f(std::move(m));
f(std::move(m), change_ts, tuuid, batch_no);
}
for (auto& cr_update : btch.clustered_updates) {
@@ -432,25 +437,25 @@ void for_each_change(const mutation& base_mutation, const schema_ptr& base_schem
row.apply(cdef, collection_mutation_description{{}, std::move(nonatomic_update.cells)}.serialize(*cdef.type));
}
f(std::move(m));
f(std::move(m), change_ts, tuuid, batch_no);
}
for (auto& cr_delete : btch.clustered_row_deletions) {
mutation m(base_schema, pk);
m.partition().apply_delete(*base_schema, cr_delete.key, cr_delete.t);
f(std::move(m));
f(std::move(m), change_ts, tuuid, batch_no);
}
for (auto& crange_delete : btch.clustered_range_deletions) {
mutation m(base_schema, pk);
m.partition().apply_delete(*base_schema, crange_delete.rt);
f(std::move(m));
f(std::move(m), change_ts, tuuid, batch_no);
}
if (btch.partition_deletions) {
mutation m(base_schema, pk);
m.partition().apply(btch.partition_deletions->t);
f(std::move(m));
f(std::move(m), change_ts, tuuid, batch_no);
}
}
}

View File

@@ -30,6 +30,7 @@ class mutation;
namespace cdc {
bool should_split(const mutation& base_mutation, const schema& base_schema);
void for_each_change(const mutation& base_mutation, const schema_ptr& base_schema, seastar::noncopyable_function<void(mutation)>);
void for_each_change(const mutation& base_mutation, const schema_ptr& base_schema,
seastar::noncopyable_function<void(mutation, api::timestamp_type, bytes, int&)>);
}

View File

@@ -1166,20 +1166,20 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) {
{
auto result = get_result(
{int32_type, int32_type, int32_type, boolean_type, m_type, keys_type, long_type},
"select \"cdc$batch_seq_no\", v1, v2, \"cdc$deleted_v2\", m, \"cdc$deleted_elements_m\", \"cdc$ttl\""
{int32_type, int32_type, boolean_type, m_type, keys_type, long_type},
"select v1, v2, \"cdc$deleted_v2\", m, \"cdc$deleted_elements_m\", \"cdc$ttl\""
" from ks.t_scylla_cdc_log where pk = 0 and ck = 1 allow filtering");
BOOST_REQUIRE_EQUAL(result.size(), 4);
std::vector<std::vector<data_value>> expected = {
// The following represents the "v1 = 5" change. The "v2 = null" change gets merged with a different change, see below
{int32_t(0), int32_t(5), int_null, bool_null, map_null, keys_null, int64_t(5)},
{int32_t(0), int_null, int_null, bool_null, vmap({{0,6},{1,6}}), keys_null, long_null /*FIXME: ttl = 6*/},
{int32_t(5), int_null, bool_null, map_null, keys_null, int64_t(5)},
{int_null, int_null, bool_null, vmap({{0,6},{1,6}}), keys_null, long_null /*FIXME: ttl = 6*/},
// The following represents the "m[2] = 7" change. The "m[3] = null" change gets merged with a different change, see below
{int32_t(0), int_null, int_null, bool_null, vmap({{2,7}}), keys_null, long_null /*FIXME: ttl = 7*/},
{int_null, int_null, bool_null, vmap({{2,7}}), keys_null, long_null /*FIXME: ttl = 7*/},
// The "v2 = null" and "v[3] = null" changes get merged with the "m[4] = 0" change, because dead cells
// don't have a "ttl" concept; thus we put them together with alive cells which don't have a ttl (so ttl column = null).
{int32_t(0), int_null, int_null, true, vmap({{4,0}}), vkeys({3}), long_null},
{int_null, int_null, true, vmap({{4,0}}), vkeys({3}), long_null},
};
// These changes have the same timestamp, so their relative order in CDC log is arbitrary
@@ -1190,6 +1190,13 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) {
}
}
{
auto result = get_result({int32_type},
"select \"cdc$batch_seq_no\" from ks.t_scylla_cdc_log where pk = 0 and ck = 1 allow filtering");
std::vector<std::vector<data_value>> expected = {{int32_t(0)}, {int32_t(1)}, {int32_t(2)}, {int32_t(3)}};
BOOST_REQUIRE_EQUAL(expected, result);
}
cquery_nofail(e, format(
"begin unlogged batch"
" delete from ks.t using timestamp {} where pk = 1;"
@@ -1204,16 +1211,16 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) {
{
auto result = get_result(
{int32_type, int32_type, m_type, boolean_type, oper_type},
"select v1, v2, m, \"cdc$deleted_m\", \"cdc$operation\""
{int32_type, int32_type, int32_type, m_type, boolean_type, oper_type},
"select \"cdc$batch_seq_no\", v1, v2, m, \"cdc$deleted_m\", \"cdc$operation\""
" from ks.t_scylla_cdc_log where pk = 1 allow filtering");
BOOST_REQUIRE_EQUAL(result.size(), 7);
std::vector<std::vector<data_value>> expected = {
{int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::partition_delete)},
{int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::range_delete_start_inclusive)},
{int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::range_delete_end_exclusive)},
{int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::row_delete)},
{int32_t(0), int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::partition_delete)},
{int32_t(0), int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::range_delete_start_inclusive)},
{int32_t(1), int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::range_delete_end_exclusive)},
{int32_t(0), int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::row_delete)},
// The following sequence of operations:
// insert into ks.t (pk,ck,v1) values (1,0,1) using timestamp T;
@@ -1230,21 +1237,12 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) {
// and a {3:3} cell with timestamp T + 1. Thus we merge the tombstone into the T update,
// and we add a T + 1 update to express the addition of the {3:3} cell.
//
{int32_t(1), int32_t(2), map_null, true, oper_ut(cdc::operation::update)},
{int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::insert)},
{int_null, int_null, vmap({{3,3}}), bool_null, oper_ut(cdc::operation::update)},
{int32_t(0), int32_t(1), int32_t(2), map_null, true, oper_ut(cdc::operation::update)},
{int32_t(0), int_null, int_null, map_null, bool_null, oper_ut(cdc::operation::insert)},
{int32_t(1), int_null, int_null, vmap({{3,3}}), bool_null, oper_ut(cdc::operation::update)},
};
// The first 5 changes have different timestamps, so we can compare the order.
BOOST_REQUIRE(std::equal(expected.begin(), expected.begin() + 5, result.begin()));
// The last 2 changes have a higher timestamp than the other 5, but between the two the timestamp is the same.
// Thus their relative order in the CDC log is arbitrary.
for (auto it = expected.begin() + 5; it != expected.end(); ++it) {
BOOST_REQUIRE(std::find_if(result.begin() + 5, result.end(), [&] (const std::vector<data_value>& r) {
return *it == r;
}) != result.end());
}
BOOST_REQUIRE_EQUAL(expected, result);
}
cquery_nofail(e, "delete from ks.t where pk = 2 and ck < 1 and ck > 2;");