cdc: move the extractor of pirow columns into separate method
Because it will be used more than once.
This commit is contained in:
44
cdc/log.cc
44
cdc/log.cc
@@ -711,26 +711,7 @@ public:
|
||||
}
|
||||
|
||||
if (has_pirow) {
|
||||
value = cdef.is_atomic()
|
||||
? pirow->get_blob(cdef.name_as_text())
|
||||
: visit(*cdef.type, make_visitor(
|
||||
// flatten set
|
||||
[&] (const set_type_impl& type) {
|
||||
auto v = pirow->get_view(cdef.name_as_text());
|
||||
auto f = cql_serialization_format::internal();
|
||||
auto n = read_collection_size(v, f);
|
||||
std::vector<bytes_view> tmp;
|
||||
tmp.reserve(n);
|
||||
while (n--) {
|
||||
tmp.emplace_back(read_collection_value(v, f)); // key
|
||||
read_collection_value(v, f); // value. ignore.
|
||||
}
|
||||
return set_type_impl::serialize_partially_deserialized_form(tmp, f);
|
||||
},
|
||||
[&] (const abstract_type& o) -> bytes {
|
||||
return pirow->get_blob(cdef.name_as_text());
|
||||
}
|
||||
));
|
||||
value = get_preimage_col_value(cdef, pirow);
|
||||
|
||||
assert(std::addressof(res.partition().clustered_row(*_log_schema, *pikey)) != std::addressof(res.partition().clustered_row(*_log_schema, log_ck)));
|
||||
assert(pikey->explode() != log_ck.explode());
|
||||
@@ -833,6 +814,29 @@ public:
|
||||
return res;
|
||||
}
|
||||
|
||||
static bytes get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) {
|
||||
return cdef.is_atomic()
|
||||
? pirow->get_blob(cdef.name_as_text())
|
||||
: visit(*cdef.type, make_visitor(
|
||||
// flatten set
|
||||
[&] (const set_type_impl& type) {
|
||||
auto v = pirow->get_view(cdef.name_as_text());
|
||||
auto f = cql_serialization_format::internal();
|
||||
auto n = read_collection_size(v, f);
|
||||
std::vector<bytes_view> tmp;
|
||||
tmp.reserve(n);
|
||||
while (n--) {
|
||||
tmp.emplace_back(read_collection_value(v, f)); // key
|
||||
read_collection_value(v, f); // value. ignore.
|
||||
}
|
||||
return set_type_impl::serialize_partially_deserialized_form(tmp, f);
|
||||
},
|
||||
[&] (const abstract_type& o) -> bytes {
|
||||
return pirow->get_blob(cdef.name_as_text());
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
static db::timeout_clock::time_point default_timeout() {
|
||||
return db::timeout_clock::now() + 10s;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user