cdc: log: switch the remaining usages of bytes to managed_bytes in collection_visitor

This commit is contained in:
Michał Chojnowski
2021-04-07 16:20:09 +02:00
parent 2760382a68
commit 061f72166c

View File

@@ -981,12 +981,6 @@ static managed_bytes get_managed_bytes(const atomic_cell_view& acv) {
return managed_bytes(acv.value());
}
static bytes_view get_bytes_view(const atomic_cell_view& acv, std::forward_list<bytes>& buf) {
return acv.value().is_fragmented()
? bytes_view{buf.emplace_front(to_bytes(acv.value()))}
: acv.value().current_fragment();
}
static ttl_opt get_ttl(const atomic_cell_view& acv) {
return acv.is_live_and_has_ttl() ? std::optional{acv.ttl()} : std::nullopt;
}
@@ -1106,26 +1100,26 @@ struct process_row_visitor {
// cdc$deleted_col, cdc$deleted_elements_col, col
using result_t = std::tuple<bool, std::vector<managed_bytes_view>, bytes_opt>;
using result_t = std::tuple<bool, std::vector<managed_bytes_view>, managed_bytes_opt>;
auto result = visit(*cdef.type, make_visitor(
[&] (const set_type_impl&) -> result_t {
_touched_parts.set<stats::part_type::SET>();
struct set_visitor : public collection_visitor {
std::vector<bytes_view> _added_keys;
std::vector<managed_bytes_view> _added_keys;
set_visitor(ttl_opt& ttl_column) : collection_visitor(ttl_column) {}
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
this->_ttl_column = get_ttl(cell);
_added_keys.push_back(key);
_added_keys.emplace_back(key);
}
} v(_ttl_column);
visit_collection(v);
bytes_opt added_keys = v._added_keys.empty() ? std::nullopt :
std::optional{set_type_impl::serialize_partially_deserialized_form(v._added_keys, cql_serialization_format::internal())};
managed_bytes_opt added_keys = v._added_keys.empty() ? std::nullopt :
std::optional{set_type_impl::serialize_partially_deserialized_form_fragmented(v._added_keys, cql_serialization_format::internal())};
return {
v._is_column_delete,
@@ -1137,25 +1131,23 @@ struct process_row_visitor {
_touched_parts.set<stats::part_type::UDT>();
struct udt_visitor : public collection_visitor {
std::vector<bytes_view_opt> _added_cells;
std::forward_list<bytes>& _buf;
std::vector<managed_bytes_view_opt> _added_cells;
udt_visitor(ttl_opt& ttl_column, size_t num_keys, std::forward_list<bytes>& buf)
: collection_visitor(ttl_column), _added_cells(num_keys), _buf(buf) {}
udt_visitor(ttl_opt& ttl_column, size_t num_keys)
: collection_visitor(ttl_column), _added_cells(num_keys) {}
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
this->_ttl_column = get_ttl(cell);
_added_cells[deserialize_field_index(key)].emplace(get_bytes_view(cell, _buf));
_added_cells[deserialize_field_index(key)].emplace(cell.value());
}
};
std::forward_list<bytes> buf;
udt_visitor v(_ttl_column, type.size(), buf);
udt_visitor v(_ttl_column, type.size());
visit_collection(v);
bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
std::optional{type.build_value(v._added_cells)};
managed_bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
std::optional{type.build_value_fragmented(v._added_cells)};
return {
v._is_column_delete,
@@ -1167,25 +1159,23 @@ struct process_row_visitor {
_touched_parts.set(type.is_list() ? stats::part_type::LIST : stats::part_type::MAP);
struct map_or_list_visitor : public collection_visitor {
std::vector<std::pair<bytes_view, bytes_view>> _added_cells;
std::forward_list<bytes>& _buf;
std::vector<std::pair<managed_bytes_view, managed_bytes_view>> _added_cells;
map_or_list_visitor(ttl_opt& ttl_column, std::forward_list<bytes>& buf)
: collection_visitor(ttl_column), _buf(buf) {}
map_or_list_visitor(ttl_opt& ttl_column)
: collection_visitor(ttl_column) {}
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
this->_ttl_column = get_ttl(cell);
_added_cells.emplace_back(key, get_bytes_view(cell, _buf));
_added_cells.emplace_back(key, cell.value());
}
};
std::forward_list<bytes> buf;
map_or_list_visitor v(_ttl_column, buf);
map_or_list_visitor v(_ttl_column);
visit_collection(v);
bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
std::optional{map_type_impl::serialize_partially_deserialized_form(v._added_cells, cql_serialization_format::internal())};
managed_bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
std::optional{map_type_impl::serialize_partially_deserialized_form_fragmented(v._added_cells, cql_serialization_format::internal())};
return {
v._is_column_delete,
@@ -1219,21 +1209,19 @@ struct process_row_visitor {
}
if (added_cells) {
_builder.set_value(_log_ck, cdef, bytes_view(*added_cells));
_builder.set_value(_log_ck, cdef, *added_cells);
}
}
// images
if (_enable_updating_state) {
// FIXME: get rid of these conversions in the next commits.
auto to_managed_bytes_opt = [] (const bytes_opt& b) -> managed_bytes_opt { return b ? std::optional(managed_bytes(*b)) : std::nullopt; };
// A column delete overwrites any data we gathered until now.
managed_bytes_opt prev = is_column_delete ? std::nullopt : get_col_from_row_state(_row_state, cdef);
managed_bytes_opt next;
if (added_cells || (deleted_elements && prev)) {
next = visit(*cdef.type, [&] (const auto& type) -> managed_bytes {
return merge(type, prev, to_managed_bytes_opt(added_cells), deleted_elements);
return merge(type, prev, added_cells, deleted_elements);
});
}