cdc: Handle non-atomic columns

Fixes #5669

This implements non-atomic collection and UDT handling for
both cdc preimage + delta.

To be able to express deltas in a meaningful way (and reconstruct
using it), non-atomic values are represented somewhat
differently from regular values:

* maps - stored as is (frozen)
* sets - stored as is (frozen)
* lists - stored as map<timeuuid, value> (frozen)
  this allows reconstructing the list, as otherwise
  things like list[0] = value cannot be represented
  in a meaningful way
* udt - stored as tuple<tuple<field0>, tuple<field1>...> (frozen)
  UDTs are normally just tuples + metadata, but we need to
  distinguish the case of outer tuple element == null, meaning
  "no info/does not partake in mutation" from tuple element
  being a tuple(null) (i.e. empty tuple), meaning "set field to
  null"
This commit is contained in:
Calle Wilund
2020-02-05 16:19:00 +00:00
committed by Nadav Har'El
parent d17ebde46b
commit a3a764fd10
2 changed files with 380 additions and 22 deletions

View File

@@ -44,6 +44,9 @@
#include "cql3/untyped_result_set.hh"
#include "log.hh"
#include "json.hh"
#include "types.hh"
#include "concrete_types.hh"
#include "types/listlike_partial_deserializing_iterator.hh"
namespace std {
@@ -265,7 +268,29 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
for (const auto& column : columns) {
auto type = column.type;
if (is_data_col) {
type = tuple_type_impl::get_instance({ /* op */ data_type_for<column_op_native_type>(), /* value */ type});
type = visit(*type, make_visitor(
// lists are represented as map<timeuuid, value_type>. Otherwise we cannot express delta
[] (const list_type_impl& type) {
return map_type_impl::get_instance(type.name_comparator(), type.value_comparator(), false);
},
// user types are expressed as tuple<tuple<field0>, tuple<field1>...>
// the extra tuple allows us to distinguish null == <no info>
// from <null> = set field to null
[] (const user_type_impl& type) -> data_type {
std::vector<data_type> types;
types.reserve(type.size());
for (auto& ft : type.field_types()) {
types.emplace_back(tuple_type_impl::get_instance({ ft }));
}
return tuple_type_impl::get_instance(std::move(types));
},
// everything else is just frozen self
[] (const abstract_type& type) {
return type.freeze();
}
));
type = tuple_type_impl::get_instance({ /* op */ data_type_for<column_op_native_type>(), /* value */ type->freeze()});
}
b.with_column("_" + column.name(), type);
}
@@ -569,10 +594,10 @@ public:
r.for_each_cell([&](column_id id, const atomic_cell_or_collection& cell) {
auto& cdef = _schema->column_at(ckind, id);
auto* dst = _log_schema->get_column_definition(to_bytes("_" + cdef.name()));
// TODO: collections.
if (cdef.is_atomic()) {
column_op op;
auto has_pirow = pirow && pirow->has(cdef.name_as_text());
auto op = column_op::del;
if (cdef.is_atomic()) {
values[1] = std::nullopt;
auto view = cell.as_atomic_cell(cdef);
if (view.is_live()) {
@@ -581,23 +606,141 @@ public:
if (view.is_live_and_has_ttl()) {
ttl = view.ttl();
}
} else {
op = column_op::del;
}
values[0] = data_type_for<column_op_native_type>()->decompose(data_value(static_cast<column_op_native_type>(op)));
res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, ts, tuple_type_impl::build_value(values), _cdc_ttl_opt));
if (pirow && pirow->has(cdef.name_as_text())) {
values[0] = data_type_for<column_op_native_type>()->decompose(data_value(static_cast<column_op_native_type>(column_op::set)));
values[1] = pirow->get_blob(cdef.name_as_text());
assert(std::addressof(res.partition().clustered_row(*_log_schema, *pikey)) != std::addressof(res.partition().clustered_row(*_log_schema, log_ck)));
assert(pikey->explode() != log_ck.explode());
res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, ts, tuple_type_impl::build_value(values), _cdc_ttl_opt));
}
}
} else {
cdc_log.warn("Non-atomic cell ignored {}.{}:{}", _schema->ks_name(), _schema->cf_name(), cdef.name_as_text());
auto mv = cell.as_collection_mutation();
std::optional<column_op> oop;
op = column_op::add;
std::vector<bytes> buf;
values[1] = mv.with_deserialized(*cdef.type, [&](collection_mutation_view_description view) -> bytes_opt {
if (view.tomb) {
// there is a tombstone with timestamp before this mutation.
// this is how a assign collection = <value> is represented.
oop = column_op::set;
}
auto process_cells = [&](auto value_callback) {
for (auto& [key, value] : view.cells) {
// note: we are assuming that all mutations coming here adhere to
// / are created by the cql machinery or similar, i.e. if we have
// the tombstone above, it preceeds the actual cells, and is in
// fact an "assign" marker. So we only check for explicitly
// dead cells, i.e. null markers.
auto live = value.is_live();
if (!live) {
// technically mutations can express both add/set/delete
// even created by cql statements.
// But for now, assume we have homogeneous op, OR
// we're doing a full assign
// TODO: split into two log rows
if (oop == column_op::set) {
// dead cell in full assign? just don't include
continue;
}
if (oop.value_or(column_op::del) != column_op::del) {
cdc_log.warn("Unhandled case: mismatched set/add operations in multi cell type {}", cdef.type->name());
continue;
}
oop = oop.value_or(column_op::del);
value_callback(key, bytes_view{}, live);
continue;
}
auto val = value.value().is_fragmented()
? bytes_view{buf.emplace_back(value.value().linearize())}
: value.value().first_fragment()
;
value_callback(key, val, live);
}
};
return visit(*cdef.type, make_visitor(
// maps and lists are just flattened
[&] (const collection_type_impl& type) {
std::vector<std::pair<bytes_view, bytes_view>> result;
process_cells([&](const bytes_view& key, const bytes_view& value, bool live) {
result.emplace_back(key, value);
});
return map_type_impl::serialize_partially_deserialized_form(result, cql_serialization_format::internal());
},
// set need to transform from mutation view
[&] (const set_type_impl& type) {
std::vector<bytes_view> result;
process_cells([&](const bytes_view& key, const bytes_view& value, bool live) {
result.emplace_back(key);
});
return type.serialize_partially_deserialized_form(result, cql_serialization_format::internal());
},
// for user type we collect the fields in the mutation and set to
// tuple of value or tuple of null in case of delete.
// fields not in the mutation are null in the enclosing tuple, signifying "no info"
[&](const user_type_impl& type) {
std::vector<bytes_opt> res(type.size());
std::array<bytes_view_opt, 1> tmp;
// type of the actual value in log column tuple.
auto res_type = static_pointer_cast<const tuple_type_impl>(dst->type)->type(1);
auto tt = static_pointer_cast<const tuple_type_impl>(res_type);
process_cells([&](const bytes_view& key, const bytes_view& value, bool live) {
auto idx = deserialize_field_index(key);
tmp[0] = live ? bytes_view_opt{value} : std::nullopt;
res[idx].emplace(static_pointer_cast<const tuple_type_impl>(tt->all_types()[idx])->build_value(tmp));
});
return tt->build_value(res);
},
[&] (const abstract_type& o) -> bytes {
throw std::runtime_error(format("cdc transform: unknown type {}", o.name()));
}
));
});
op = oop.value_or(op);
}
values[0] = data_type_for<column_op_native_type>()->decompose(data_value(static_cast<column_op_native_type>(op)));
res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, ts, tuple_type_impl::build_value(values), _cdc_ttl_opt));
if (has_pirow) {
values[0] = data_type_for<column_op_native_type>()->decompose(data_value(static_cast<column_op_native_type>(column_op::set)));
values[1] = cdef.is_atomic()
? pirow->get_blob(cdef.name_as_text())
: values[1] = visit(*cdef.type, make_visitor(
// flatten set
[&] (const set_type_impl& type) {
auto v = pirow->get_view(cdef.name_as_text());
auto f = cql_serialization_format::internal();
auto n = read_collection_size(v, f);
std::vector<bytes_view> tmp;
tmp.reserve(n);
while (n--) {
tmp.emplace_back(read_collection_value(v, f)); // key
read_collection_value(v, f); // value. ignore.
}
return type.serialize_partially_deserialized_form(tmp, f);
},
// transform udt similarly to above
[&](const user_type_impl& type) {
std::vector<bytes_opt> res(type.size());
std::array<bytes_view_opt, 1> tmp;
// type of the actual value in log column tuple.
auto res_type = static_pointer_cast<const tuple_type_impl>(dst->type)->type(1);
auto tt = static_pointer_cast<const tuple_type_impl>(res_type);
size_t i = 0;
for (auto& v : type.split(pirow->get_view(cdef.name_as_text()))) {
if (v) {
tmp[0] = v;
res[i] = static_pointer_cast<const tuple_type_impl>(tt->all_types()[i])->build_value(tmp);
}
++i;
}
return tt->build_value(res);
},
[&] (const abstract_type& o) -> bytes {
return pirow->get_blob(cdef.name_as_text());
}
));
assert(std::addressof(res.partition().clustered_row(*_log_schema, *pikey)) != std::addressof(res.partition().clustered_row(*_log_schema, log_ck)));
assert(pikey->explode() != log_ck.explode());
res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, ts, tuple_type_impl::build_value(values), _cdc_ttl_opt));
}
});
};
@@ -667,9 +810,13 @@ public:
columns.emplace_back(&cdef);
});
}
auto selection = cql3::selection::selection::for_columns(_schema, std::move(columns));
auto partition_slice = query::partition_slice(std::move(bounds), std::move(static_columns), std::move(regular_columns), selection->get_query_options());
auto opts = selection->get_query_options();
opts.set(query::partition_slice::option::collections_as_maps);
auto partition_slice = query::partition_slice(std::move(bounds), std::move(static_columns), std::move(regular_columns), std::move(opts));
auto command = ::make_lw_shared<query::read_command>(_schema->id(), _schema->version(), partition_slice, query::max_partitions);
return _ctx._proxy.query(_schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), empty_service_permit(), client_state)).then(

View File

@@ -21,6 +21,7 @@
#include <seastar/testing/thread_test_case.hh>
#include <string>
#include <boost/range/adaptor/map.hpp>
#include "cdc/log.hh"
#include "db/config.hh"
@@ -33,6 +34,8 @@
#include "types.hh"
#include "types/tuple.hh"
#include "types/map.hh"
#include "types/list.hh"
#include "types/set.hh"
#include "types/user.hh"
using namespace std::string_literals;
@@ -610,3 +613,211 @@ SEASTAR_THREAD_TEST_CASE(test_ttls) {
test_ttl(10);
}, mk_cdc_test_config()).get();
}
// helper funcs + structs for collection testing
using translate_func = std::function<data_value(data_value)>;
struct col_test {
sstring update;
data_value prev;
data_value next;
cdc::column_op op = cdc::column_op::add;
};
// iterate a set of updates and verify pre and delta values.
static void test_collection(cql_test_env& e, data_type val_type, std::vector<col_test> tests, translate_func f = [](data_value v) { return v; }) {
using op_ut = std::underlying_type_t<cdc::column_op>;
auto col_type = tuple_type_impl::get_instance({ data_type_for<op_ut>(), val_type, long_type});
for (auto& t : tests) {
cquery_nofail(e, t.update);
auto rows = select_log(e, "tbl");
auto pre_image = to_bytes_filtered(*rows, cdc::operation::pre_image);
auto updates = to_bytes_filtered(*rows, cdc::operation::update);
sort_by_time(*rows, updates);
sort_by_time(*rows, pre_image);
auto val_index = column_index(*rows, "_val");
if (t.prev.is_null()) {
BOOST_REQUIRE(pre_image.empty());
} else {
BOOST_REQUIRE_GT(pre_image.size(), 0);
auto val = *pre_image.back()[val_index];
BOOST_REQUIRE_EQUAL(t.prev, f(value_cast<tuple_type_impl::native_type>(col_type->deserialize(bytes_view(val))).at(1)));
}
auto val = *updates.back()[val_index];
auto tup = value_cast<tuple_type_impl::native_type>(col_type->deserialize(bytes_view(val)));
if (!t.next.is_null()) {
BOOST_REQUIRE_EQUAL(t.next, f(tup.at(1)));
}
BOOST_REQUIRE_EQUAL(data_value(op_ut(t.op)), tup.at(0));
}
}
SEASTAR_THREAD_TEST_CASE(test_map_logging) {
do_with_cql_env_thread([](cql_test_env& e) {
cquery_nofail(e, "CREATE TABLE ks.tbl (pk int, pk2 int, ck int, val map<text, text>, PRIMARY KEY((pk, pk2), ck)) WITH cdc = {'enabled':'true', 'preimage':'true' }"s);
auto cleanup = defer([&] {
e.execute_cql("DROP TABLE ks.tbl").get();
});
auto map_type = map_type_impl::get_instance(utf8_type, utf8_type, false);
test_collection(e, map_type, {
{
"UPDATE ks.tbl set val = { 'apa':'ko' } where pk=1 and pk2=11 and ck=111",
data_value::make_null(map_type), // no prev value
::make_map_value(map_type, { { "apa", "ko" } }), // delta
cdc::column_op::set
},
{
"UPDATE ks.tbl set val = val + { 'ninja':'mission' } where pk=1 and pk2=11 and ck=111",
::make_map_value(map_type, { { "apa", "ko" } }),
::make_map_value(map_type, { { "ninja", "mission" } })
},
{
"UPDATE ks.tbl set val['ninja'] = 'shuriken' where pk=1 and pk2=11 and ck=111",
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "mission" } }),
::make_map_value(map_type, { { "ninja", "shuriken" } })
},
{
"UPDATE ks.tbl set val['apa'] = null where pk=1 and pk2=11 and ck=111",
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "shuriken" } }),
::make_map_value(map_type, { { "apa", data_value::make_null(utf8_type) } }),
cdc::column_op::del
}
});
}, mk_cdc_test_config()).get();
}
SEASTAR_THREAD_TEST_CASE(test_set_logging) {
do_with_cql_env_thread([](cql_test_env& e) {
cquery_nofail(e, "CREATE TABLE ks.tbl (pk int, pk2 int, ck int, val set<text>, PRIMARY KEY((pk, pk2), ck)) WITH cdc = {'enabled':'true', 'preimage':'true' }"s);
auto cleanup = defer([&] {
e.execute_cql("DROP TABLE ks.tbl").get();
});
auto set_type = set_type_impl::get_instance(utf8_type, false);
test_collection(e, set_type, {
{
"UPDATE ks.tbl set val = { 'apa', 'ko' } where pk=1 and pk2=11 and ck=111",
data_value::make_null(set_type), ::make_set_value(set_type, { "apa", "ko" }),
cdc::column_op::set
},
{
"UPDATE ks.tbl set val = val + { 'ninja', 'mission' } where pk=1 and pk2=11 and ck=111",
::make_set_value(set_type, { "apa", "ko" }),
::make_set_value(set_type, { "mission", "ninja" }) // note the sorting of sets
},
{
"UPDATE ks.tbl set val = val - { 'apa' } where pk=1 and pk2=11 and ck=111",
::make_set_value(set_type, { "apa", "ko", "mission", "ninja" }),
::make_set_value(set_type, { "apa" }),
cdc::column_op::del
}
});
}, mk_cdc_test_config()).get();
}
SEASTAR_THREAD_TEST_CASE(test_list_logging) {
do_with_cql_env_thread([](cql_test_env& e) {
cquery_nofail(e, "CREATE TABLE ks.tbl (pk int, pk2 int, ck int, val list<text>, PRIMARY KEY((pk, pk2), ck)) WITH cdc = {'enabled':'true', 'preimage':'true' }"s);
auto cleanup = defer([&] {
e.execute_cql("DROP TABLE ks.tbl").get();
});
auto list_type = list_type_impl::get_instance(utf8_type, false);
auto val_type = map_type_impl::get_instance(list_type->name_comparator(), list_type->value_comparator(), false);
test_collection(e, val_type, {
{
"UPDATE ks.tbl set val = [ 'apa', 'ko' ] where pk=1 and pk2=11 and ck=111",
data_value::make_null(list_type), ::make_list_value(list_type, { "apa", "ko" }),
cdc::column_op::set
},
{
"UPDATE ks.tbl set val = val + [ 'ninja', 'mission' ] where pk=1 and pk2=11 and ck=111",
::make_list_value(list_type, { "apa", "ko" }),
::make_list_value(list_type, { "ninja", "mission" })
},
{
"UPDATE ks.tbl set val = [ 'bosse' ] + val where pk=1 and pk2=11 and ck=111",
::make_list_value(list_type, { "apa", "ko", "ninja", "mission" }),
::make_list_value(list_type, { "bosse" })
},
{
"DELETE val[0] from ks.tbl where pk=1 and pk2=11 and ck=111",
::make_list_value(list_type, { "bosse", "apa", "ko", "ninja", "mission" }),
data_value::make_null(list_type), // the record is the timeuuid, should maybe check, but...
cdc::column_op::del
},
{
"UPDATE ks.tbl set val[0] = 'babar' where pk=1 and pk2=11 and ck=111",
::make_list_value(list_type, { "apa", "ko", "ninja", "mission" }),
::make_list_value(list_type, { "babar" }),
}
}, [&](data_value v) {
auto map = value_cast<map_type_impl::native_type>(std::move(v));
auto cpy = boost::copy_range<std::vector<data_value>>(map | boost::adaptors::map_values);
return ::make_list_value(list_type, std::move(cpy));
});
}, mk_cdc_test_config()).get();
}
SEASTAR_THREAD_TEST_CASE(test_udt_logging) {
do_with_cql_env_thread([](cql_test_env& e) {
cquery_nofail(e, "CREATE TYPE ks.mytype (field0 int, field1 text)"s);
cquery_nofail(e, "CREATE TABLE ks.tbl (pk int, pk2 int, ck int, val mytype, PRIMARY KEY((pk, pk2), ck)) WITH cdc = {'enabled':'true', 'preimage':'true' }"s);
auto cleanup = defer([&] {
e.execute_cql("DROP TABLE ks.tbl").get();
e.execute_cql("DROP TYPE ks.mytype").get();
});
auto udt_type = user_type_impl::get_instance("ks", to_bytes("mytype"),
{ to_bytes("field0"), to_bytes("field1") },
{ int32_type, utf8_type },
true
);
auto f0_type = tuple_type_impl::get_instance({ int32_type });
auto f1_type = tuple_type_impl::get_instance({ utf8_type });
auto cdc_tuple_type = tuple_type_impl::get_instance({ f0_type, f1_type });
auto make_tuple = [&](std::optional<std::optional<int32_t>> i, std::optional<std::optional<sstring>> s) {
return ::make_tuple_value(cdc_tuple_type, {
i ? ::make_tuple_value(f0_type, { *i }) : data_value::make_null(f0_type),
s ? ::make_tuple_value(f1_type, { *s }) : data_value::make_null(f1_type),
});
};
test_collection(e, cdc_tuple_type, {
{
"UPDATE ks.tbl set val = { field0: 12, field1: 'ko' } where pk=1 and pk2=11 and ck=111",
data_value::make_null(cdc_tuple_type), make_tuple(12, "ko"),
cdc::column_op::set
},
{
"UPDATE ks.tbl set val.field0 = 13 where pk=1 and pk2=11 and ck=111",
make_tuple(12, "ko"),
make_tuple(13, std::nullopt)
},
{
"UPDATE ks.tbl set val.field1 = 'nils' where pk=1 and pk2=11 and ck=111",
make_tuple(13, "ko"),
make_tuple(std::nullopt, "nils")
},
{
"UPDATE ks.tbl set val.field1 = null where pk=1 and pk2=11 and ck=111",
make_tuple(13, "nils"),
make_tuple(std::nullopt, std::optional<std::optional<sstring>>{ std::in_place, std::nullopt }),
cdc::column_op::del
},
});
}, mk_cdc_test_config()).get();
}