mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-04 22:13:19 +00:00
cdc: keep ts and tuuid inside transformer
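Adds a `begin_timestamp` method, which tells the `transformer` to use the given timestamp and timeuuid when generating new log row mutations. The values are stored inside the `transformer`, so they no longer need to be passed to `transform` and its helper methods on every call.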
This commit is contained in:
98
cdc/log.cc
98
cdc/log.cc
@@ -836,16 +836,21 @@ private:
|
||||
|
||||
stats::part_type_set _touched_parts;
|
||||
|
||||
clustering_key set_pk_columns(const partition_key& pk, api::timestamp_type ts, bytes decomposed_tuuid, int batch_no, mutation& m) const {
|
||||
// The timestamp of changes being currently processed
|
||||
api::timestamp_type _ts;
|
||||
// The cdc$time value of changes being currently processed
|
||||
bytes _tuuid;
|
||||
|
||||
clustering_key set_pk_columns(const partition_key& pk, int batch_no, mutation& m) {
|
||||
const auto log_ck = clustering_key::from_exploded(
|
||||
*m.schema(), { decomposed_tuuid, int32_type->decompose(batch_no) });
|
||||
*m.schema(), { _tuuid, int32_type->decompose(batch_no) });
|
||||
auto pk_value = pk.explode(*_schema);
|
||||
size_t pos = 0;
|
||||
for (const auto& column : _schema->partition_key_columns()) {
|
||||
assert (pos < pk_value.size());
|
||||
auto cdef = m.schema()->get_column_definition(log_data_column_name_bytes(column.name()));
|
||||
auto value = atomic_cell::make_live(*column.type,
|
||||
ts,
|
||||
_ts,
|
||||
bytes_view(pk_value[pos]),
|
||||
_cdc_ttl_opt);
|
||||
m.set_cell(log_ck, *cdef, std::move(value));
|
||||
@@ -854,12 +859,12 @@ private:
|
||||
return log_ck;
|
||||
}
|
||||
|
||||
void set_operation(const clustering_key& ck, api::timestamp_type ts, operation op, mutation& m) const {
|
||||
m.set_cell(ck, _op_col, atomic_cell::make_live(*_op_col.type, ts, _op_col.type->decompose(operation_native_type(op)), _cdc_ttl_opt));
|
||||
void set_operation(const clustering_key& ck, operation op, mutation& m) const {
|
||||
m.set_cell(ck, _op_col, atomic_cell::make_live(*_op_col.type, _ts, _op_col.type->decompose(operation_native_type(op)), _cdc_ttl_opt));
|
||||
}
|
||||
|
||||
void set_ttl(const clustering_key& ck, api::timestamp_type ts, gc_clock::duration ttl, mutation& m) const {
|
||||
m.set_cell(ck, _ttl_col, atomic_cell::make_live(*_ttl_col.type, ts, _ttl_col.type->decompose(ttl.count()), _cdc_ttl_opt));
|
||||
void set_ttl(const clustering_key& ck, gc_clock::duration ttl, mutation& m) const {
|
||||
m.set_cell(ck, _ttl_col, atomic_cell::make_live(*_ttl_col.type, _ts, _ttl_col.type->decompose(ttl.count()), _cdc_ttl_opt));
|
||||
}
|
||||
|
||||
public:
|
||||
@@ -934,10 +939,15 @@ public:
|
||||
throw std::runtime_error(format("cdc merge: unknown type {}", type.name()));
|
||||
}
|
||||
|
||||
void begin_timestamp(api::timestamp_type ts, bytes tuuid) {
|
||||
_ts = ts;
|
||||
_tuuid = std::move(tuuid);
|
||||
}
|
||||
|
||||
// TODO: is pre-image data based on query enough. We only have actual column data. Do we need
|
||||
// more details like tombstones/ttl? Probably not but keep in mind.
|
||||
mutation transform(const mutation& m, const cql3::untyped_result_set* rs, api::timestamp_type ts, bytes tuuid, int& batch_no) {
|
||||
auto stream_id = _ctx._cdc_metadata.get_stream(ts, m.token());
|
||||
mutation transform(const mutation& m, const cql3::untyped_result_set* rs, int& batch_no) {
|
||||
auto stream_id = _ctx._cdc_metadata.get_stream(_ts, m.token());
|
||||
mutation res(_log_schema, stream_id.to_partition_key(*_log_schema));
|
||||
const auto preimage = _schema->cdc_options().preimage();
|
||||
const auto postimage = _schema->cdc_options().postimage();
|
||||
@@ -945,8 +955,8 @@ public:
|
||||
if (p.partition_tombstone()) {
|
||||
// Partition deletion
|
||||
_touched_parts.set<stats::part_type::PARTITION_DELETE>();
|
||||
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
|
||||
set_operation(log_ck, ts, operation::partition_delete, res);
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_operation(log_ck, operation::partition_delete, res);
|
||||
} else if (!p.row_tombstones().empty()) {
|
||||
// range deletion
|
||||
_touched_parts.set<stats::part_type::RANGE_TOMBSTONE>();
|
||||
@@ -960,7 +970,7 @@ public:
|
||||
}
|
||||
auto cdef = _log_schema->get_column_definition(log_data_column_name_bytes(column.name()));
|
||||
auto value = atomic_cell::make_live(*column.type,
|
||||
ts,
|
||||
_ts,
|
||||
bytes_view(exploded[pos]),
|
||||
_cdc_ttl_opt);
|
||||
res.set_cell(log_ck, *cdef, std::move(value));
|
||||
@@ -968,20 +978,20 @@ public:
|
||||
}
|
||||
};
|
||||
{
|
||||
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_bound(log_ck, rt.start);
|
||||
const auto start_operation = rt.start_kind == bound_kind::incl_start
|
||||
? operation::range_delete_start_inclusive
|
||||
: operation::range_delete_start_exclusive;
|
||||
set_operation(log_ck, ts, start_operation, res);
|
||||
set_operation(log_ck, start_operation, res);
|
||||
}
|
||||
{
|
||||
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_bound(log_ck, rt.end);
|
||||
const auto end_operation = rt.end_kind == bound_kind::incl_end
|
||||
? operation::range_delete_end_inclusive
|
||||
: operation::range_delete_end_exclusive;
|
||||
set_operation(log_ck, ts, end_operation, res);
|
||||
set_operation(log_ck, end_operation, res);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -1111,7 +1121,7 @@ public:
|
||||
|
||||
if (deleted_elements) {
|
||||
auto* dc = _log_schema->get_column_definition(log_data_column_deleted_elements_name_bytes(cdef.name()));
|
||||
res.set_cell(log_ck, *dc, atomic_cell::make_live(*dc->type, ts, *deleted_elements, _cdc_ttl_opt));
|
||||
res.set_cell(log_ck, *dc, atomic_cell::make_live(*dc->type, _ts, *deleted_elements, _cdc_ttl_opt));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1120,11 +1130,11 @@ public:
|
||||
if (prev && pikey) {
|
||||
assert(std::addressof(res.partition().clustered_row(*_log_schema, *pikey)) != std::addressof(res.partition().clustered_row(*_log_schema, log_ck)));
|
||||
assert(pikey->explode() != log_ck.explode());
|
||||
res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, ts, *prev, _cdc_ttl_opt));
|
||||
res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, _ts, *prev, _cdc_ttl_opt));
|
||||
}
|
||||
|
||||
if (is_column_delete) {
|
||||
res.set_cell(log_ck, log_data_column_deleted_name_bytes(cdef.name()), data_value(true), ts, _cdc_ttl_opt);
|
||||
res.set_cell(log_ck, log_data_column_deleted_name_bytes(cdef.name()), data_value(true), _ts, _cdc_ttl_opt);
|
||||
if (!cdef.is_atomic()) {
|
||||
_non_atomic_column_deletes.insert(&cdef);
|
||||
}
|
||||
@@ -1133,19 +1143,19 @@ public:
|
||||
}
|
||||
|
||||
if (value) {
|
||||
res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, ts, *value, _cdc_ttl_opt));
|
||||
res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, _ts, *value, _cdc_ttl_opt));
|
||||
}
|
||||
|
||||
if (poikey) {
|
||||
// keep track of actually assigning this already
|
||||
columns_assigned.emplace(id);
|
||||
if (cdef.is_atomic() && !is_column_delete && value) {
|
||||
res.set_cell(*poikey, *dst, atomic_cell::make_live(*dst->type, ts, *value, _cdc_ttl_opt));
|
||||
res.set_cell(*poikey, *dst, atomic_cell::make_live(*dst->type, _ts, *value, _cdc_ttl_opt));
|
||||
} else if (!cdef.is_atomic() && (value || (deleted_elements && prev))) {
|
||||
auto v = visit(*cdef.type, [&] (const auto& type) -> bytes {
|
||||
return merge(type, prev, value, deleted_elements);
|
||||
});
|
||||
res.set_cell(*poikey, *dst, atomic_cell::make_live(*dst->type, ts, v, _cdc_ttl_opt));
|
||||
res.set_cell(*poikey, *dst, atomic_cell::make_live(*dst->type, _ts, v, _cdc_ttl_opt));
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -1157,7 +1167,7 @@ public:
|
||||
auto v = get_preimage_col_value(cdef, pirow);
|
||||
if (v) {
|
||||
auto dst = _log_schema->get_column_definition(log_data_column_name_bytes(cdef.name()));
|
||||
res.set_cell(*poikey, *dst, atomic_cell::make_live(*dst->type, ts, *v, _cdc_ttl_opt));
|
||||
res.set_cell(*poikey, *dst, atomic_cell::make_live(*dst->type, _ts, *v, _cdc_ttl_opt));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1177,23 +1187,23 @@ public:
|
||||
}
|
||||
|
||||
if (preimage) {
|
||||
pikey = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
|
||||
set_operation(*pikey, ts, operation::pre_image, res);
|
||||
pikey = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_operation(*pikey, operation::pre_image, res);
|
||||
}
|
||||
|
||||
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no++, res);
|
||||
|
||||
if (postimage) {
|
||||
poikey = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
|
||||
set_operation(*poikey, ts, operation::post_image, res);
|
||||
poikey = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_operation(*poikey, operation::post_image, res);
|
||||
}
|
||||
|
||||
auto ttl = process_cells(p.static_row().get(), column_kind::static_column, log_ck, pikey, pirow, poikey);
|
||||
|
||||
set_operation(log_ck, ts, operation::update, res);
|
||||
set_operation(log_ck, operation::update, res);
|
||||
|
||||
if (ttl) {
|
||||
set_ttl(log_ck, ts, *ttl, res);
|
||||
set_ttl(log_ck, *ttl, res);
|
||||
}
|
||||
} else {
|
||||
_touched_parts.set_if<stats::part_type::CLUSTERING_ROW>(!p.clustered_rows().empty());
|
||||
@@ -1222,28 +1232,28 @@ public:
|
||||
}
|
||||
|
||||
if (preimage) {
|
||||
pikey = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
|
||||
set_operation(*pikey, ts, operation::pre_image, res);
|
||||
pikey = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_operation(*pikey, operation::pre_image, res);
|
||||
}
|
||||
|
||||
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no++, res);
|
||||
|
||||
if (postimage) {
|
||||
poikey = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
|
||||
set_operation(*poikey, ts, operation::post_image, res);
|
||||
poikey = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_operation(*poikey, operation::post_image, res);
|
||||
}
|
||||
|
||||
size_t pos = 0;
|
||||
for (const auto& column : _schema->clustering_key_columns()) {
|
||||
assert (pos < ck_value.size());
|
||||
auto cdef = _log_schema->get_column_definition(log_data_column_name_bytes(column.name()));
|
||||
res.set_cell(log_ck, *cdef, atomic_cell::make_live(*column.type, ts, bytes_view(ck_value[pos]), _cdc_ttl_opt));
|
||||
res.set_cell(log_ck, *cdef, atomic_cell::make_live(*column.type, _ts, bytes_view(ck_value[pos]), _cdc_ttl_opt));
|
||||
|
||||
if (pikey) {
|
||||
res.set_cell(*pikey, *cdef, atomic_cell::make_live(*column.type, ts, bytes_view(ck_value[pos]), _cdc_ttl_opt));
|
||||
res.set_cell(*pikey, *cdef, atomic_cell::make_live(*column.type, _ts, bytes_view(ck_value[pos]), _cdc_ttl_opt));
|
||||
}
|
||||
if (poikey) {
|
||||
res.set_cell(*poikey, *cdef, atomic_cell::make_live(*column.type, ts, bytes_view(ck_value[pos]), _cdc_ttl_opt));
|
||||
res.set_cell(*poikey, *cdef, atomic_cell::make_live(*column.type, _ts, bytes_view(ck_value[pos]), _cdc_ttl_opt));
|
||||
}
|
||||
|
||||
++pos;
|
||||
@@ -1261,7 +1271,7 @@ public:
|
||||
|
||||
auto& cdef = *_log_schema->get_column_definition(log_data_column_name_bytes(column.name()));
|
||||
auto value = get_preimage_col_value(column, pirow);
|
||||
res.set_cell(*pikey, cdef, atomic_cell::make_live(*column.type, ts, bytes_view(*value), _cdc_ttl_opt));
|
||||
res.set_cell(*pikey, cdef, atomic_cell::make_live(*column.type, _ts, bytes_view(*value), _cdc_ttl_opt));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -1274,10 +1284,10 @@ public:
|
||||
cdc_op = marker.is_live() ? operation::insert : operation::update;
|
||||
|
||||
if (ttl) {
|
||||
set_ttl(log_ck, ts, *ttl, res);
|
||||
set_ttl(log_ck, *ttl, res);
|
||||
}
|
||||
}
|
||||
set_operation(log_ck, ts, cdc_op, res);
|
||||
set_operation(log_ck, cdc_op, res);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1508,7 +1518,8 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
|
||||
details.was_split = true;
|
||||
generated_count = 0;
|
||||
for_each_change(m, s, [&] (mutation mm, api::timestamp_type ts, bytes tuuid, int& batch_no) {
|
||||
auto mut = trans.transform(std::move(mm), rs.get(), ts, tuuid, batch_no);
|
||||
trans.begin_timestamp(ts, std::move(tuuid));
|
||||
auto mut = trans.transform(std::move(mm), rs.get(), batch_no);
|
||||
mutations.push_back(std::move(mut));
|
||||
++generated_count;
|
||||
});
|
||||
@@ -1517,7 +1528,8 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
|
||||
int batch_no = 0;
|
||||
auto ts = find_timestamp(*s, m);
|
||||
auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts));
|
||||
auto mut = trans.transform(m, rs.get(), ts, tuuid, batch_no);
|
||||
trans.begin_timestamp(ts, std::move(tuuid));
|
||||
auto mut = trans.transform(m, rs.get(), batch_no);
|
||||
mutations.push_back(std::move(mut));
|
||||
generated_count = 1;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user