diff --git a/cdc/change_visitor.hh b/cdc/change_visitor.hh index eb30cbf865..f6e893aa34 100644 --- a/cdc/change_visitor.hh +++ b/cdc/change_visitor.hh @@ -88,7 +88,7 @@ namespace cdc { template concept CollectionVisitor = requires(V v, const tombstone& t, - bytes_view key, + managed_bytes_view key, const atomic_cell_view& cell) { { v.collection_tombstone(t) } -> std::same_as; @@ -99,8 +99,8 @@ concept CollectionVisitor = requires(V v, struct dummy_collection_visitor { void collection_tombstone(const tombstone&) {} - void live_collection_cell(bytes_view, const atomic_cell_view&) {} - void dead_collection_cell(bytes_view, const atomic_cell_view&) {} + void live_collection_cell(managed_bytes_view, const atomic_cell_view&) {} + void dead_collection_cell(managed_bytes_view, const atomic_cell_view&) {} bool finished() { return false; } }; @@ -170,31 +170,31 @@ void inspect_row_cells(const schema& s, column_kind ckind, const row& r, V& v) { return stop_iteration(v.finished()); } - acoc.as_collection_mutation().with_deserialized(*cdef.type, [&v, &cdef] (collection_mutation_view_description view) { - v.collection_column(cdef, [&view] (CollectionVisitor auto& cv) { + auto cmv = acoc.as_collection_mutation(); + auto tomb = cmv.tomb(); + v.collection_column(cdef, [&cmv, &tomb] (CollectionVisitor auto& cv) { + if (cv.finished()) { + return; + } + + if (tomb) { + cv.collection_tombstone(tomb); if (cv.finished()) { return; } + } - if (view.tomb) { - cv.collection_tombstone(view.tomb); - if (cv.finished()) { - return; - } + for (auto& [key, cell] : cmv) { + if (cell.is_live()) { + cv.live_collection_cell(key, cell); + } else { + cv.dead_collection_cell(key, cell); } - for (auto& [key, cell]: view.cells) { - if (cell.is_live()) { - cv.live_collection_cell(key, cell); - } else { - cv.dead_collection_cell(key, cell); - } - - if (cv.finished()) { - return; - } + if (cv.finished()) { + return; } - }); + } }); return stop_iteration(v.finished()); diff --git a/cdc/log.cc b/cdc/log.cc index f6dac85690..97f3e716d9 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -1319,7 +1319,7 @@ struct process_row_visitor { _is_column_delete = true; } - void dead_collection_cell(bytes_view key, const atomic_cell_view&) { + void dead_collection_cell(managed_bytes_view key, const atomic_cell_view&) { _deleted_keys.push_back(key); } @@ -1338,7 +1338,7 @@ struct process_row_visitor { set_visitor(ttl_opt& ttl_column) : collection_visitor(ttl_column) {} - void live_collection_cell(bytes_view key, const atomic_cell_view& cell) { + void live_collection_cell(managed_bytes_view key, const atomic_cell_view& cell) { this->_ttl_column = get_ttl(cell); _added_keys.emplace_back(key); } @@ -1364,7 +1364,7 @@ struct process_row_visitor { udt_visitor(ttl_opt& ttl_column, size_t num_keys) : collection_visitor(ttl_column), _added_cells(num_keys) {} - void live_collection_cell(bytes_view key, const atomic_cell_view& cell) { + void live_collection_cell(managed_bytes_view key, const atomic_cell_view& cell) { this->_ttl_column = get_ttl(cell); _added_cells[deserialize_field_index(key)].emplace(cell.value()); } @@ -1392,7 +1392,7 @@ struct process_row_visitor { map_or_list_visitor(ttl_opt& ttl_column) : collection_visitor(ttl_column) {} - void live_collection_cell(bytes_view key, const atomic_cell_view& cell) { + void live_collection_cell(managed_bytes_view key, const atomic_cell_view& cell) { this->_ttl_column = get_ttl(cell); _added_cells.emplace_back(key, cell.value()); } diff --git a/cdc/split.cc b/cdc/split.cc index ee6ad17f31..c69ce6ede2 100644 --- a/cdc/split.cc +++ b/cdc/split.cc @@ -193,7 +193,7 @@ private: data_type get_value_type(bytes_view); */ - void cell(bytes_view key, const atomic_cell_view& c) { + void cell(managed_bytes_view key, const atomic_cell_view& c) { auto& entry = get_or_append_entry(c.timestamp(), get_ttl(c)); entry.cells.emplace_back(to_bytes(key), atomic_cell(*static_cast(*this).get_value_type(key), c)); } @@ -207,11 +207,11 @@ public: entry.t = t; } - void live_collection_cell(bytes_view key, const atomic_cell_view& c) { + void live_collection_cell(managed_bytes_view key, const atomic_cell_view& c) { cell(key, c); } - void dead_collection_cell(bytes_view key, const atomic_cell_view& c) { + void dead_collection_cell(managed_bytes_view key, const atomic_cell_view& c) { cell(key, c); } @@ -245,7 +245,7 @@ struct extract_row_visitor { collection_visitor(column_id id, std::map& updates, const collection_type_impl& ctype) : extract_collection_visitor(id, updates), _value_type(ctype.value_comparator()) {} - data_type get_value_type(bytes_view) { + data_type get_value_type(managed_bytes_view) { return _value_type; } } v(cdef.id, _updates, ctype); @@ -259,7 +259,7 @@ struct extract_row_visitor { udt_visitor(column_id id, std::map& updates, const user_type_impl& utype) : extract_collection_visitor(id, updates), _utype(utype) {} - data_type get_value_type(bytes_view key) { + data_type get_value_type(managed_bytes_view key) { return _utype.type(deserialize_field_index(key)); } } v(cdef.id, _updates, utype); @@ -430,8 +430,8 @@ struct find_timestamp_visitor { // with cdc$time using timestamp T + 1 instead of T. visit(t.timestamp + 1); } - void live_collection_cell(bytes_view, const atomic_cell_view& cell) { visit(cell); } - void dead_collection_cell(bytes_view, const atomic_cell_view& cell) { visit(cell); } + void live_collection_cell(managed_bytes_view, const atomic_cell_view& cell) { visit(cell); } + void dead_collection_cell(managed_bytes_view, const atomic_cell_view& cell) { visit(cell); } void collection_column(const column_definition&, auto&& visit_collection) { visit_collection(*this); } void marker(const row_marker& rm) { visit(rm.timestamp()); } void static_row_cells(auto&& visit_row_cells) { visit_row_cells(*this); } @@ -516,14 +516,14 @@ struct should_split_visitor { void collection_tombstone(const tombstone& t) { visit(t.timestamp + 1); } - virtual void live_collection_cell(bytes_view, const atomic_cell_view& cell) { + virtual void live_collection_cell(managed_bytes_view, const atomic_cell_view& cell) { if (_had_row_marker) { // nonatomic updates cannot be expressed with an INSERT. return stop(); } visit(cell); } - void dead_collection_cell(bytes_view, const atomic_cell_view& cell) { visit(cell); } + void dead_collection_cell(managed_bytes_view, const atomic_cell_view& cell) { visit(cell); } void collection_column(const column_definition&, auto&& visit_collection) { visit_collection(*this); } virtual void marker(const row_marker& rm) { @@ -574,7 +574,7 @@ class alternator_should_split_visitor : public should_split_visitor { public: ~alternator_should_split_visitor() override = default; - void live_collection_cell(bytes_view, const atomic_cell_view& cell) override { + void live_collection_cell(managed_bytes_view, const atomic_cell_view& cell) override { visit(cell.timestamp()); }