From 76a323a02ddb7ff0d95473c23d0dec39e9ee0910 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Mon, 29 Jun 2020 21:36:19 +0200 Subject: [PATCH] cdc: move preimage result set into a field of transformer Instead of passing the preimage result set in each `transform` call, it is now assigned to a field, and `transform` uses that field. --- cdc/log.cc | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/cdc/log.cc b/cdc/log.cc index c896fb4e08..60bee0b0e0 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -840,6 +840,7 @@ private: api::timestamp_type _ts; // The cdc$time value of changes being currently processed bytes _tuuid; + lw_shared_ptr _preimage_select_result; clustering_key set_pk_columns(const partition_key& pk, int batch_no, mutation& m) { const auto log_ck = clustering_key::from_exploded( @@ -946,9 +947,10 @@ public: // TODO: is pre-image data based on query enough. We only have actual column data. Do we need // more details like tombstones/ttl? Probably not but keep in mind. - mutation transform(const mutation& m, const cql3::untyped_result_set* rs, int& batch_no) { + mutation transform(const mutation& m, int& batch_no) { auto stream_id = _ctx._cdc_metadata.get_stream(_ts, m.token()); mutation res(_log_schema, stream_id.to_partition_key(*_log_schema)); + const auto rs = _preimage_select_result.get(); const auto preimage = _schema->cdc_options().preimage(); const auto postimage = _schema->cdc_options().postimage(); auto& p = m.partition(); @@ -1437,6 +1439,10 @@ public: } } + void set_preimage_select_result(lw_shared_ptr rs) { + _preimage_select_result = std::move(rs); + } + /** For preimage query use the same CL as for base write, except for CLs ANY and ALL. */ static db::consistency_level adjust_cl(db::consistency_level write_cl) { if (write_cl == db::consistency_level::ANY) { @@ -1509,6 +1515,11 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, return f.then([trans = std::move(trans), &mutations, idx, tr_state, &details] (lw_shared_ptr rs) mutable { auto& m = mutations[idx]; auto& s = m.schema(); + + if (rs) { + trans.set_preimage_select_result(std::move(rs)); + } + details.had_preimage |= s->cdc_options().preimage(); details.had_postimage |= s->cdc_options().postimage(); tracing::trace(tr_state, "CDC: Generating log mutations for {}", m.decorated_key()); @@ -1519,7 +1530,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, generated_count = 0; for_each_change(m, s, [&] (mutation mm, api::timestamp_type ts, bytes tuuid, int& batch_no) { trans.begin_timestamp(ts, std::move(tuuid)); - auto mut = trans.transform(std::move(mm), rs.get(), batch_no); + auto mut = trans.transform(std::move(mm), batch_no); mutations.push_back(std::move(mut)); ++generated_count; }); @@ -1529,7 +1540,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, auto ts = find_timestamp(*s, m); auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts)); trans.begin_timestamp(ts, std::move(tuuid)); - auto mut = trans.transform(m, rs.get(), batch_no); + auto mut = trans.transform(m, batch_no); mutations.push_back(std::move(mut)); generated_count = 1; }