cdc: move preimage result set into a field of transformer

Instead of passing the preimage result set in each `transform` call, it
is now assigned to a field, and `transform` uses that field.
This commit is contained in:
Piotr Dulikowski
2020-06-29 21:36:19 +02:00
parent 79eabc04a8
commit 76a323a02d

View File

@@ -840,6 +840,7 @@ private:
api::timestamp_type _ts;
// The cdc$time value of changes being currently processed
bytes _tuuid;
lw_shared_ptr<cql3::untyped_result_set> _preimage_select_result;
clustering_key set_pk_columns(const partition_key& pk, int batch_no, mutation& m) {
const auto log_ck = clustering_key::from_exploded(
@@ -946,9 +947,10 @@ public:
// TODO: is pre-image data based on query enough. We only have actual column data. Do we need
// more details like tombstones/ttl? Probably not but keep in mind.
mutation transform(const mutation& m, const cql3::untyped_result_set* rs, int& batch_no) {
mutation transform(const mutation& m, int& batch_no) {
auto stream_id = _ctx._cdc_metadata.get_stream(_ts, m.token());
mutation res(_log_schema, stream_id.to_partition_key(*_log_schema));
const auto rs = _preimage_select_result.get();
const auto preimage = _schema->cdc_options().preimage();
const auto postimage = _schema->cdc_options().postimage();
auto& p = m.partition();
@@ -1437,6 +1439,10 @@ public:
}
}
void set_preimage_select_result(lw_shared_ptr<cql3::untyped_result_set> rs) {
_preimage_select_result = std::move(rs);
}
/** For preimage query use the same CL as for base write, except for CLs ANY and ALL. */
static db::consistency_level adjust_cl(db::consistency_level write_cl) {
if (write_cl == db::consistency_level::ANY) {
@@ -1509,6 +1515,11 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
return f.then([trans = std::move(trans), &mutations, idx, tr_state, &details] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
auto& m = mutations[idx];
auto& s = m.schema();
if (rs) {
trans.set_preimage_select_result(std::move(rs));
}
details.had_preimage |= s->cdc_options().preimage();
details.had_postimage |= s->cdc_options().postimage();
tracing::trace(tr_state, "CDC: Generating log mutations for {}", m.decorated_key());
@@ -1519,7 +1530,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
generated_count = 0;
for_each_change(m, s, [&] (mutation mm, api::timestamp_type ts, bytes tuuid, int& batch_no) {
trans.begin_timestamp(ts, std::move(tuuid));
auto mut = trans.transform(std::move(mm), rs.get(), batch_no);
auto mut = trans.transform(std::move(mm), batch_no);
mutations.push_back(std::move(mut));
++generated_count;
});
@@ -1529,7 +1540,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
auto ts = find_timestamp(*s, m);
auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts));
trans.begin_timestamp(ts, std::move(tuuid));
auto mut = trans.transform(m, rs.get(), batch_no);
auto mut = trans.transform(m, batch_no);
mutations.push_back(std::move(mut));
generated_count = 1;
}