cdc: move away from collection_mutation_view::with_deserialized()

Use the collection_mutation_view directly.
Requires changing key type from bytes_view to managed_bytes_view, to
match collection_mutation_view::iterator::value_type and more
importantly to avoid unnecessary key linearization.
This commit is contained in:
Botond Dénes
2026-03-18 10:51:47 +02:00
parent e8a23362c7
commit 727cdeb951
3 changed files with 35 additions and 35 deletions

View File

@@ -88,7 +88,7 @@ namespace cdc {
template <typename V>
concept CollectionVisitor = requires(V v,
const tombstone& t,
bytes_view key,
managed_bytes_view key,
const atomic_cell_view& cell) {
{ v.collection_tombstone(t) } -> std::same_as<void>;
@@ -99,8 +99,8 @@ concept CollectionVisitor = requires(V v,
struct dummy_collection_visitor {
void collection_tombstone(const tombstone&) {}
void live_collection_cell(bytes_view, const atomic_cell_view&) {}
void dead_collection_cell(bytes_view, const atomic_cell_view&) {}
void live_collection_cell(managed_bytes_view, const atomic_cell_view&) {}
void dead_collection_cell(managed_bytes_view, const atomic_cell_view&) {}
bool finished() { return false; }
};
@@ -170,31 +170,31 @@ void inspect_row_cells(const schema& s, column_kind ckind, const row& r, V& v) {
return stop_iteration(v.finished());
}
acoc.as_collection_mutation().with_deserialized(*cdef.type, [&v, &cdef] (collection_mutation_view_description view) {
v.collection_column(cdef, [&view] (CollectionVisitor auto& cv) {
auto cmv = acoc.as_collection_mutation();
auto tomb = cmv.tomb();
v.collection_column(cdef, [&cmv, &tomb] (CollectionVisitor auto& cv) {
if (cv.finished()) {
return;
}
if (tomb) {
cv.collection_tombstone(tomb);
if (cv.finished()) {
return;
}
}
if (view.tomb) {
cv.collection_tombstone(view.tomb);
if (cv.finished()) {
return;
}
for (auto& [key, cell] : cmv) {
if (cell.is_live()) {
cv.live_collection_cell(key, cell);
} else {
cv.dead_collection_cell(key, cell);
}
for (auto& [key, cell]: view.cells) {
if (cell.is_live()) {
cv.live_collection_cell(key, cell);
} else {
cv.dead_collection_cell(key, cell);
}
if (cv.finished()) {
return;
}
if (cv.finished()) {
return;
}
});
}
});
return stop_iteration(v.finished());

View File

@@ -1319,7 +1319,7 @@ struct process_row_visitor {
_is_column_delete = true;
}
void dead_collection_cell(bytes_view key, const atomic_cell_view&) {
void dead_collection_cell(managed_bytes_view key, const atomic_cell_view&) {
_deleted_keys.push_back(key);
}
@@ -1338,7 +1338,7 @@ struct process_row_visitor {
set_visitor(ttl_opt& ttl_column) : collection_visitor(ttl_column) {}
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
void live_collection_cell(managed_bytes_view key, const atomic_cell_view& cell) {
this->_ttl_column = get_ttl(cell);
_added_keys.emplace_back(key);
}
@@ -1364,7 +1364,7 @@ struct process_row_visitor {
udt_visitor(ttl_opt& ttl_column, size_t num_keys)
: collection_visitor(ttl_column), _added_cells(num_keys) {}
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
void live_collection_cell(managed_bytes_view key, const atomic_cell_view& cell) {
this->_ttl_column = get_ttl(cell);
_added_cells[deserialize_field_index(key)].emplace(cell.value());
}
@@ -1392,7 +1392,7 @@ struct process_row_visitor {
map_or_list_visitor(ttl_opt& ttl_column)
: collection_visitor(ttl_column) {}
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
void live_collection_cell(managed_bytes_view key, const atomic_cell_view& cell) {
this->_ttl_column = get_ttl(cell);
_added_cells.emplace_back(key, cell.value());
}

View File

@@ -193,7 +193,7 @@ private:
data_type get_value_type(bytes_view);
*/
void cell(bytes_view key, const atomic_cell_view& c) {
void cell(managed_bytes_view key, const atomic_cell_view& c) {
auto& entry = get_or_append_entry(c.timestamp(), get_ttl(c));
entry.cells.emplace_back(to_bytes(key), atomic_cell(*static_cast<V&>(*this).get_value_type(key), c));
}
@@ -207,11 +207,11 @@ public:
entry.t = t;
}
void live_collection_cell(bytes_view key, const atomic_cell_view& c) {
void live_collection_cell(managed_bytes_view key, const atomic_cell_view& c) {
cell(key, c);
}
void dead_collection_cell(bytes_view key, const atomic_cell_view& c) {
void dead_collection_cell(managed_bytes_view key, const atomic_cell_view& c) {
cell(key, c);
}
@@ -245,7 +245,7 @@ struct extract_row_visitor {
collection_visitor(column_id id, std::map<change_key_t, row_update>& updates, const collection_type_impl& ctype)
: extract_collection_visitor<collection_visitor>(id, updates), _value_type(ctype.value_comparator()) {}
data_type get_value_type(bytes_view) {
data_type get_value_type(managed_bytes_view) {
return _value_type;
}
} v(cdef.id, _updates, ctype);
@@ -259,7 +259,7 @@ struct extract_row_visitor {
udt_visitor(column_id id, std::map<change_key_t, row_update>& updates, const user_type_impl& utype)
: extract_collection_visitor<udt_visitor>(id, updates), _utype(utype) {}
data_type get_value_type(bytes_view key) {
data_type get_value_type(managed_bytes_view key) {
return _utype.type(deserialize_field_index(key));
}
} v(cdef.id, _updates, utype);
@@ -430,8 +430,8 @@ struct find_timestamp_visitor {
// with cdc$time using timestamp T + 1 instead of T.
visit(t.timestamp + 1);
}
void live_collection_cell(bytes_view, const atomic_cell_view& cell) { visit(cell); }
void dead_collection_cell(bytes_view, const atomic_cell_view& cell) { visit(cell); }
void live_collection_cell(managed_bytes_view, const atomic_cell_view& cell) { visit(cell); }
void dead_collection_cell(managed_bytes_view, const atomic_cell_view& cell) { visit(cell); }
void collection_column(const column_definition&, auto&& visit_collection) { visit_collection(*this); }
void marker(const row_marker& rm) { visit(rm.timestamp()); }
void static_row_cells(auto&& visit_row_cells) { visit_row_cells(*this); }
@@ -516,14 +516,14 @@ struct should_split_visitor {
void collection_tombstone(const tombstone& t) { visit(t.timestamp + 1); }
virtual void live_collection_cell(bytes_view, const atomic_cell_view& cell) {
virtual void live_collection_cell(managed_bytes_view, const atomic_cell_view& cell) {
if (_had_row_marker) {
// nonatomic updates cannot be expressed with an INSERT.
return stop();
}
visit(cell);
}
void dead_collection_cell(bytes_view, const atomic_cell_view& cell) { visit(cell); }
void dead_collection_cell(managed_bytes_view, const atomic_cell_view& cell) { visit(cell); }
void collection_column(const column_definition&, auto&& visit_collection) { visit_collection(*this); }
virtual void marker(const row_marker& rm) {
@@ -574,7 +574,7 @@ class alternator_should_split_visitor : public should_split_visitor {
public:
~alternator_should_split_visitor() override = default;
void live_collection_cell(bytes_view, const atomic_cell_view& cell) override {
void live_collection_cell(managed_bytes_view, const atomic_cell_view& cell) override {
visit(cell.timestamp());
}