From fa00ea996a12ad7fff8c219062e31bd883dadbe2 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Mon, 29 Jun 2020 22:38:19 +0200 Subject: [PATCH] cdc: move management of generated mutations inside transformer CDC log mutations are now stored inside `transformer`, and only moved to the final set of mutations at the end of `transformer`'s lifetime. --- cdc/log.cc | 95 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 53 insertions(+), 42 deletions(-) diff --git a/cdc/log.cc b/cdc/log.cc index 60bee0b0e0..0119aca45d 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -754,6 +754,7 @@ class transformer final { private: db_context _ctx; schema_ptr _schema; + dht::decorated_key _dk; schema_ptr _log_schema; const column_definition& _op_col; const column_definition& _ttl_col; @@ -836,16 +837,24 @@ private: stats::part_type_set _touched_parts; + std::vector _result_mutations; + // The timestamp of changes being currently processed api::timestamp_type _ts; // The cdc$time value of changes being currently processed bytes _tuuid; lw_shared_ptr _preimage_select_result; - clustering_key set_pk_columns(const partition_key& pk, int batch_no, mutation& m) { + mutation& current_mutation() { + assert(!_result_mutations.empty()); + return _result_mutations.back(); + } + + clustering_key set_pk_columns(int batch_no) { + auto& m = current_mutation(); const auto log_ck = clustering_key::from_exploded( *m.schema(), { _tuuid, int32_type->decompose(batch_no) }); - auto pk_value = pk.explode(*_schema); + auto pk_value = _dk.key().explode(*_schema); size_t pos = 0; for (const auto& column : _schema->partition_key_columns()) { assert (pos < pk_value.size()); @@ -860,18 +869,21 @@ private: return log_ck; } - void set_operation(const clustering_key& ck, operation op, mutation& m) const { + void set_operation(const clustering_key& ck, operation op) { + auto& m = current_mutation(); m.set_cell(ck, _op_col, atomic_cell::make_live(*_op_col.type, _ts, _op_col.type->decompose(operation_native_type(op)), _cdc_ttl_opt)); } - void set_ttl(const clustering_key& ck, gc_clock::duration ttl, mutation& m) const { + void set_ttl(const clustering_key& ck, gc_clock::duration ttl) { + auto& m = current_mutation(); m.set_cell(ck, _ttl_col, atomic_cell::make_live(*_ttl_col.type, _ts, _ttl_col.type->decompose(ttl.count()), _cdc_ttl_opt)); } public: - transformer(db_context ctx, schema_ptr s) + transformer(db_context ctx, schema_ptr s, dht::decorated_key dk) : _ctx(ctx) , _schema(std::move(s)) + , _dk(std::move(dk)) , _log_schema(ctx._proxy.get_db().local().find_schema(_schema->ks_name(), log_name(_schema->cf_name()))) , _op_col(*_log_schema->get_column_definition(log_meta_column_name_bytes("operation"))) , _ttl_col(*_log_schema->get_column_definition(log_meta_column_name_bytes("ttl"))) @@ -941,15 +953,16 @@ public: } void begin_timestamp(api::timestamp_type ts, bytes tuuid) { + const auto stream_id = _ctx._cdc_metadata.get_stream(ts, _dk.token()); + _result_mutations.emplace_back(_log_schema, stream_id.to_partition_key(*_log_schema)); _ts = ts; _tuuid = std::move(tuuid); } // TODO: is pre-image data based on query enough. We only have actual column data. Do we need // more details like tombstones/ttl? Probably not but keep in mind. - mutation transform(const mutation& m, int& batch_no) { - auto stream_id = _ctx._cdc_metadata.get_stream(_ts, m.token()); - mutation res(_log_schema, stream_id.to_partition_key(*_log_schema)); + void transform(const mutation& m, int& batch_no) { + mutation& res = current_mutation(); const auto rs = _preimage_select_result.get(); const auto preimage = _schema->cdc_options().preimage(); const auto postimage = _schema->cdc_options().postimage(); @@ -957,8 +970,8 @@ public: if (p.partition_tombstone()) { // Partition deletion _touched_parts.set(); - auto log_ck = set_pk_columns(m.key(), batch_no++, res); - set_operation(log_ck, operation::partition_delete, res); + auto log_ck = set_pk_columns(batch_no++); + set_operation(log_ck, operation::partition_delete); } else if (!p.row_tombstones().empty()) { // range deletion _touched_parts.set(); @@ -980,20 +993,20 @@ public: } }; { - auto log_ck = set_pk_columns(m.key(), batch_no++, res); + auto log_ck = set_pk_columns(batch_no++); set_bound(log_ck, rt.start); const auto start_operation = rt.start_kind == bound_kind::incl_start ? operation::range_delete_start_inclusive : operation::range_delete_start_exclusive; - set_operation(log_ck, start_operation, res); + set_operation(log_ck, start_operation); } { - auto log_ck = set_pk_columns(m.key(), batch_no++, res); + auto log_ck = set_pk_columns(batch_no++); set_bound(log_ck, rt.end); const auto end_operation = rt.end_kind == bound_kind::incl_end ? operation::range_delete_end_inclusive : operation::range_delete_end_exclusive; - set_operation(log_ck, end_operation, res); + set_operation(log_ck, end_operation); } } } else { @@ -1189,23 +1202,23 @@ public: } if (preimage) { - pikey = set_pk_columns(m.key(), batch_no++, res); - set_operation(*pikey, operation::pre_image, res); + pikey = set_pk_columns(batch_no++); + set_operation(*pikey, operation::pre_image); } - auto log_ck = set_pk_columns(m.key(), batch_no++, res); + auto log_ck = set_pk_columns(batch_no++); if (postimage) { - poikey = set_pk_columns(m.key(), batch_no++, res); - set_operation(*poikey, operation::post_image, res); + poikey = set_pk_columns(batch_no++); + set_operation(*poikey, operation::post_image); } auto ttl = process_cells(p.static_row().get(), column_kind::static_column, log_ck, pikey, pirow, poikey); - set_operation(log_ck, operation::update, res); + set_operation(log_ck, operation::update); if (ttl) { - set_ttl(log_ck, *ttl, res); + set_ttl(log_ck, *ttl); } } else { _touched_parts.set_if(!p.clustered_rows().empty()); @@ -1234,15 +1247,15 @@ public: } if (preimage) { - pikey = set_pk_columns(m.key(), batch_no++, res); - set_operation(*pikey, operation::pre_image, res); + pikey = set_pk_columns(batch_no++); + set_operation(*pikey, operation::pre_image); } - auto log_ck = set_pk_columns(m.key(), batch_no++, res); + auto log_ck = set_pk_columns(batch_no++); if (postimage) { - poikey = set_pk_columns(m.key(), batch_no++, res); - set_operation(*poikey, operation::post_image, res); + poikey = set_pk_columns(batch_no++); + set_operation(*poikey, operation::post_image); } size_t pos = 0; @@ -1286,19 +1299,19 @@ public: cdc_op = marker.is_live() ? operation::insert : operation::update; if (ttl) { - set_ttl(log_ck, *ttl, res); + set_ttl(log_ck, *ttl); } } - set_operation(log_ck, cdc_op, res); + set_operation(log_ck, cdc_op); } } } - - return res; } - stats::part_type_set get_touched_parts() const { - return _touched_parts; + // Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime. + // The `transformer` object on which this method was called on should not be used anymore. + std::tuple, stats::part_type_set> finish() && { + return std::make_pair, stats::part_type_set>(std::move(_result_mutations), std::move(_touched_parts)); } bytes_opt get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) { @@ -1492,7 +1505,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, return make_ready_future<>(); } - transformer trans(_ctxt, s); + transformer trans(_ctxt, s, m.decorated_key()); auto f = make_ready_future>(nullptr); if (s->cdc_options().preimage() || s->cdc_options().postimage()) { @@ -1523,16 +1536,12 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, details.had_preimage |= s->cdc_options().preimage(); details.had_postimage |= s->cdc_options().postimage(); tracing::trace(tr_state, "CDC: Generating log mutations for {}", m.decorated_key()); - int generated_count; if (should_split(m, *s)) { tracing::trace(tr_state, "CDC: Splitting {}", m.decorated_key()); details.was_split = true; - generated_count = 0; for_each_change(m, s, [&] (mutation mm, api::timestamp_type ts, bytes tuuid, int& batch_no) { trans.begin_timestamp(ts, std::move(tuuid)); - auto mut = trans.transform(std::move(mm), batch_no); - mutations.push_back(std::move(mut)); - ++generated_count; + trans.transform(std::move(mm), batch_no); }); } else { tracing::trace(tr_state, "CDC: No need to split {}", m.decorated_key()); @@ -1540,13 +1549,15 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, auto ts = find_timestamp(*s, m); auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts)); trans.begin_timestamp(ts, std::move(tuuid)); - auto mut = trans.transform(m, batch_no); - mutations.push_back(std::move(mut)); - generated_count = 1; + trans.transform(m, batch_no); } + auto [log_mut, touched_parts] = std::move(trans).finish(); + const int generated_count = log_mut.size(); + mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end())); + // `m` might be invalidated at this point because of the push_back to the vector tracing::trace(tr_state, "CDC: Generated {} log mutations from {}", generated_count, mutations[idx].decorated_key()); - details.touched_parts.add(trans.get_touched_parts()); + details.touched_parts.add(touched_parts); }); }).then([this, tr_state, &details](std::vector mutations) { tracing::trace(tr_state, "CDC: Finished generating all log mutations");