mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-04 22:13:19 +00:00
cdc: move management of generated mutations inside transformer
CDC log mutations are now stored inside `transformer`, and only moved to the final set of mutations at the end of `transformer`'s lifetime.
This commit is contained in:
95
cdc/log.cc
95
cdc/log.cc
@@ -754,6 +754,7 @@ class transformer final {
|
||||
private:
|
||||
db_context _ctx;
|
||||
schema_ptr _schema;
|
||||
dht::decorated_key _dk;
|
||||
schema_ptr _log_schema;
|
||||
const column_definition& _op_col;
|
||||
const column_definition& _ttl_col;
|
||||
@@ -836,16 +837,24 @@ private:
|
||||
|
||||
stats::part_type_set _touched_parts;
|
||||
|
||||
std::vector<mutation> _result_mutations;
|
||||
|
||||
// The timestamp of changes being currently processed
|
||||
api::timestamp_type _ts;
|
||||
// The cdc$time value of changes being currently processed
|
||||
bytes _tuuid;
|
||||
lw_shared_ptr<cql3::untyped_result_set> _preimage_select_result;
|
||||
|
||||
clustering_key set_pk_columns(const partition_key& pk, int batch_no, mutation& m) {
|
||||
mutation& current_mutation() {
|
||||
assert(!_result_mutations.empty());
|
||||
return _result_mutations.back();
|
||||
}
|
||||
|
||||
clustering_key set_pk_columns(int batch_no) {
|
||||
auto& m = current_mutation();
|
||||
const auto log_ck = clustering_key::from_exploded(
|
||||
*m.schema(), { _tuuid, int32_type->decompose(batch_no) });
|
||||
auto pk_value = pk.explode(*_schema);
|
||||
auto pk_value = _dk.key().explode(*_schema);
|
||||
size_t pos = 0;
|
||||
for (const auto& column : _schema->partition_key_columns()) {
|
||||
assert (pos < pk_value.size());
|
||||
@@ -860,18 +869,21 @@ private:
|
||||
return log_ck;
|
||||
}
|
||||
|
||||
void set_operation(const clustering_key& ck, operation op, mutation& m) const {
|
||||
void set_operation(const clustering_key& ck, operation op) {
|
||||
auto& m = current_mutation();
|
||||
m.set_cell(ck, _op_col, atomic_cell::make_live(*_op_col.type, _ts, _op_col.type->decompose(operation_native_type(op)), _cdc_ttl_opt));
|
||||
}
|
||||
|
||||
void set_ttl(const clustering_key& ck, gc_clock::duration ttl, mutation& m) const {
|
||||
void set_ttl(const clustering_key& ck, gc_clock::duration ttl) {
|
||||
auto& m = current_mutation();
|
||||
m.set_cell(ck, _ttl_col, atomic_cell::make_live(*_ttl_col.type, _ts, _ttl_col.type->decompose(ttl.count()), _cdc_ttl_opt));
|
||||
}
|
||||
|
||||
public:
|
||||
transformer(db_context ctx, schema_ptr s)
|
||||
transformer(db_context ctx, schema_ptr s, dht::decorated_key dk)
|
||||
: _ctx(ctx)
|
||||
, _schema(std::move(s))
|
||||
, _dk(std::move(dk))
|
||||
, _log_schema(ctx._proxy.get_db().local().find_schema(_schema->ks_name(), log_name(_schema->cf_name())))
|
||||
, _op_col(*_log_schema->get_column_definition(log_meta_column_name_bytes("operation")))
|
||||
, _ttl_col(*_log_schema->get_column_definition(log_meta_column_name_bytes("ttl")))
|
||||
@@ -941,15 +953,16 @@ public:
|
||||
}
|
||||
|
||||
void begin_timestamp(api::timestamp_type ts, bytes tuuid) {
|
||||
const auto stream_id = _ctx._cdc_metadata.get_stream(ts, _dk.token());
|
||||
_result_mutations.emplace_back(_log_schema, stream_id.to_partition_key(*_log_schema));
|
||||
_ts = ts;
|
||||
_tuuid = std::move(tuuid);
|
||||
}
|
||||
|
||||
// TODO: is pre-image data based on query enough. We only have actual column data. Do we need
|
||||
// more details like tombstones/ttl? Probably not but keep in mind.
|
||||
mutation transform(const mutation& m, int& batch_no) {
|
||||
auto stream_id = _ctx._cdc_metadata.get_stream(_ts, m.token());
|
||||
mutation res(_log_schema, stream_id.to_partition_key(*_log_schema));
|
||||
void transform(const mutation& m, int& batch_no) {
|
||||
mutation& res = current_mutation();
|
||||
const auto rs = _preimage_select_result.get();
|
||||
const auto preimage = _schema->cdc_options().preimage();
|
||||
const auto postimage = _schema->cdc_options().postimage();
|
||||
@@ -957,8 +970,8 @@ public:
|
||||
if (p.partition_tombstone()) {
|
||||
// Partition deletion
|
||||
_touched_parts.set<stats::part_type::PARTITION_DELETE>();
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_operation(log_ck, operation::partition_delete, res);
|
||||
auto log_ck = set_pk_columns(batch_no++);
|
||||
set_operation(log_ck, operation::partition_delete);
|
||||
} else if (!p.row_tombstones().empty()) {
|
||||
// range deletion
|
||||
_touched_parts.set<stats::part_type::RANGE_TOMBSTONE>();
|
||||
@@ -980,20 +993,20 @@ public:
|
||||
}
|
||||
};
|
||||
{
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no++, res);
|
||||
auto log_ck = set_pk_columns(batch_no++);
|
||||
set_bound(log_ck, rt.start);
|
||||
const auto start_operation = rt.start_kind == bound_kind::incl_start
|
||||
? operation::range_delete_start_inclusive
|
||||
: operation::range_delete_start_exclusive;
|
||||
set_operation(log_ck, start_operation, res);
|
||||
set_operation(log_ck, start_operation);
|
||||
}
|
||||
{
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no++, res);
|
||||
auto log_ck = set_pk_columns(batch_no++);
|
||||
set_bound(log_ck, rt.end);
|
||||
const auto end_operation = rt.end_kind == bound_kind::incl_end
|
||||
? operation::range_delete_end_inclusive
|
||||
: operation::range_delete_end_exclusive;
|
||||
set_operation(log_ck, end_operation, res);
|
||||
set_operation(log_ck, end_operation);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -1189,23 +1202,23 @@ public:
|
||||
}
|
||||
|
||||
if (preimage) {
|
||||
pikey = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_operation(*pikey, operation::pre_image, res);
|
||||
pikey = set_pk_columns(batch_no++);
|
||||
set_operation(*pikey, operation::pre_image);
|
||||
}
|
||||
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no++, res);
|
||||
auto log_ck = set_pk_columns(batch_no++);
|
||||
|
||||
if (postimage) {
|
||||
poikey = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_operation(*poikey, operation::post_image, res);
|
||||
poikey = set_pk_columns(batch_no++);
|
||||
set_operation(*poikey, operation::post_image);
|
||||
}
|
||||
|
||||
auto ttl = process_cells(p.static_row().get(), column_kind::static_column, log_ck, pikey, pirow, poikey);
|
||||
|
||||
set_operation(log_ck, operation::update, res);
|
||||
set_operation(log_ck, operation::update);
|
||||
|
||||
if (ttl) {
|
||||
set_ttl(log_ck, *ttl, res);
|
||||
set_ttl(log_ck, *ttl);
|
||||
}
|
||||
} else {
|
||||
_touched_parts.set_if<stats::part_type::CLUSTERING_ROW>(!p.clustered_rows().empty());
|
||||
@@ -1234,15 +1247,15 @@ public:
|
||||
}
|
||||
|
||||
if (preimage) {
|
||||
pikey = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_operation(*pikey, operation::pre_image, res);
|
||||
pikey = set_pk_columns(batch_no++);
|
||||
set_operation(*pikey, operation::pre_image);
|
||||
}
|
||||
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no++, res);
|
||||
auto log_ck = set_pk_columns(batch_no++);
|
||||
|
||||
if (postimage) {
|
||||
poikey = set_pk_columns(m.key(), batch_no++, res);
|
||||
set_operation(*poikey, operation::post_image, res);
|
||||
poikey = set_pk_columns(batch_no++);
|
||||
set_operation(*poikey, operation::post_image);
|
||||
}
|
||||
|
||||
size_t pos = 0;
|
||||
@@ -1286,19 +1299,19 @@ public:
|
||||
cdc_op = marker.is_live() ? operation::insert : operation::update;
|
||||
|
||||
if (ttl) {
|
||||
set_ttl(log_ck, *ttl, res);
|
||||
set_ttl(log_ck, *ttl);
|
||||
}
|
||||
}
|
||||
set_operation(log_ck, cdc_op, res);
|
||||
set_operation(log_ck, cdc_op);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
stats::part_type_set get_touched_parts() const {
|
||||
return _touched_parts;
|
||||
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
|
||||
// The `transformer` object on which this method was called on should not be used anymore.
|
||||
std::tuple<std::vector<mutation>, stats::part_type_set> finish() && {
|
||||
return std::make_pair<std::vector<mutation>, stats::part_type_set>(std::move(_result_mutations), std::move(_touched_parts));
|
||||
}
|
||||
|
||||
bytes_opt get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) {
|
||||
@@ -1492,7 +1505,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
transformer trans(_ctxt, s);
|
||||
transformer trans(_ctxt, s, m.decorated_key());
|
||||
|
||||
auto f = make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>(nullptr);
|
||||
if (s->cdc_options().preimage() || s->cdc_options().postimage()) {
|
||||
@@ -1523,16 +1536,12 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
|
||||
details.had_preimage |= s->cdc_options().preimage();
|
||||
details.had_postimage |= s->cdc_options().postimage();
|
||||
tracing::trace(tr_state, "CDC: Generating log mutations for {}", m.decorated_key());
|
||||
int generated_count;
|
||||
if (should_split(m, *s)) {
|
||||
tracing::trace(tr_state, "CDC: Splitting {}", m.decorated_key());
|
||||
details.was_split = true;
|
||||
generated_count = 0;
|
||||
for_each_change(m, s, [&] (mutation mm, api::timestamp_type ts, bytes tuuid, int& batch_no) {
|
||||
trans.begin_timestamp(ts, std::move(tuuid));
|
||||
auto mut = trans.transform(std::move(mm), batch_no);
|
||||
mutations.push_back(std::move(mut));
|
||||
++generated_count;
|
||||
trans.transform(std::move(mm), batch_no);
|
||||
});
|
||||
} else {
|
||||
tracing::trace(tr_state, "CDC: No need to split {}", m.decorated_key());
|
||||
@@ -1540,13 +1549,15 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
|
||||
auto ts = find_timestamp(*s, m);
|
||||
auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts));
|
||||
trans.begin_timestamp(ts, std::move(tuuid));
|
||||
auto mut = trans.transform(m, batch_no);
|
||||
mutations.push_back(std::move(mut));
|
||||
generated_count = 1;
|
||||
trans.transform(m, batch_no);
|
||||
}
|
||||
auto [log_mut, touched_parts] = std::move(trans).finish();
|
||||
const int generated_count = log_mut.size();
|
||||
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
|
||||
|
||||
// `m` might be invalidated at this point because of the push_back to the vector
|
||||
tracing::trace(tr_state, "CDC: Generated {} log mutations from {}", generated_count, mutations[idx].decorated_key());
|
||||
details.touched_parts.add(trans.get_touched_parts());
|
||||
details.touched_parts.add(touched_parts);
|
||||
});
|
||||
}).then([this, tr_state, &details](std::vector<mutation> mutations) {
|
||||
tracing::trace(tr_state, "CDC: Finished generating all log mutations");
|
||||
|
||||
Reference in New Issue
Block a user