From a3a764fd10d883e4b0f3dd7825538e3ecdf39f9e Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 5 Feb 2020 16:19:00 +0000 Subject: [PATCH] cdc: Handle non-atomic columns Fixes #5669 This implements non-atomic collection and UDT handling for both cdc preimage + delta. To be able to express deltas in a meaningful way (and reconstruct using it), non-atomic values are represented somewhat differently from regular values: * maps - stored as is (frozen) * sets - stored as is (frozen) * lists - stored as map (frozen) this allows reconstructing the list, as otherwise things like list[0] = value cannot be represented in a meaningful way * udt - stored as tuple, tuple...> (frozen) UDTs are normally just tuples + metadata, but we need to distinguish the case of outer tuple element == null, meaning "no info/does not partake in mutation" from tuple element being a tuple(null) (i.e. empty tuple), meaning "set field to null" --- cdc/log.cc | 191 ++++++++++++++++++++++++++++++++----- test/boost/cdc_test.cc | 211 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 380 insertions(+), 22 deletions(-) diff --git a/cdc/log.cc b/cdc/log.cc index 31dc62173b..7af0b805a9 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -44,6 +44,9 @@ #include "cql3/untyped_result_set.hh" #include "log.hh" #include "json.hh" +#include "types.hh" +#include "concrete_types.hh" +#include "types/listlike_partial_deserializing_iterator.hh" namespace std { @@ -265,7 +268,29 @@ static schema_ptr create_log_schema(const schema& s, std::optional for (const auto& column : columns) { auto type = column.type; if (is_data_col) { - type = tuple_type_impl::get_instance({ /* op */ data_type_for(), /* value */ type}); + type = visit(*type, make_visitor( + // lists are represented as map. Otherwise we cannot express delta + [] (const list_type_impl& type) { + return map_type_impl::get_instance(type.name_comparator(), type.value_comparator(), false); + }, + // user types are expressed as tuple, tuple...> + // the extra tuple allows us to distinguish null == + // from = set field to null + [] (const user_type_impl& type) -> data_type { + std::vector types; + types.reserve(type.size()); + for (auto& ft : type.field_types()) { + types.emplace_back(tuple_type_impl::get_instance({ ft })); + } + return tuple_type_impl::get_instance(std::move(types)); + }, + // everything else is just frozen self + [] (const abstract_type& type) { + return type.freeze(); + } + )); + + type = tuple_type_impl::get_instance({ /* op */ data_type_for(), /* value */ type->freeze()}); } b.with_column("_" + column.name(), type); } @@ -569,10 +594,10 @@ public: r.for_each_cell([&](column_id id, const atomic_cell_or_collection& cell) { auto& cdef = _schema->column_at(ckind, id); auto* dst = _log_schema->get_column_definition(to_bytes("_" + cdef.name())); - // TODO: collections. - if (cdef.is_atomic()) { - column_op op; + auto has_pirow = pirow && pirow->has(cdef.name_as_text()); + auto op = column_op::del; + if (cdef.is_atomic()) { values[1] = std::nullopt; auto view = cell.as_atomic_cell(cdef); if (view.is_live()) { @@ -581,23 +606,141 @@ public: if (view.is_live_and_has_ttl()) { ttl = view.ttl(); } - } else { - op = column_op::del; - } - - values[0] = data_type_for()->decompose(data_value(static_cast(op))); - res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, ts, tuple_type_impl::build_value(values), _cdc_ttl_opt)); - - if (pirow && pirow->has(cdef.name_as_text())) { - values[0] = data_type_for()->decompose(data_value(static_cast(column_op::set))); - values[1] = pirow->get_blob(cdef.name_as_text()); - - assert(std::addressof(res.partition().clustered_row(*_log_schema, *pikey)) != std::addressof(res.partition().clustered_row(*_log_schema, log_ck))); - assert(pikey->explode() != log_ck.explode()); - res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, ts, tuple_type_impl::build_value(values), _cdc_ttl_opt)); - } + } } else { - cdc_log.warn("Non-atomic cell ignored {}.{}:{}", _schema->ks_name(), _schema->cf_name(), cdef.name_as_text()); + auto mv = cell.as_collection_mutation(); + std::optional oop; + op = column_op::add; + + std::vector buf; + values[1] = mv.with_deserialized(*cdef.type, [&](collection_mutation_view_description view) -> bytes_opt { + if (view.tomb) { + // there is a tombstone with timestamp before this mutation. + // this is how a assign collection = is represented. + oop = column_op::set; + } + auto process_cells = [&](auto value_callback) { + for (auto& [key, value] : view.cells) { + // note: we are assuming that all mutations coming here adhere to + // / are created by the cql machinery or similar, i.e. if we have + // the tombstone above, it preceeds the actual cells, and is in + // fact an "assign" marker. So we only check for explicitly + // dead cells, i.e. null markers. + auto live = value.is_live(); + if (!live) { + // technically mutations can express both add/set/delete + // even created by cql statements. + // But for now, assume we have homogeneous op, OR + // we're doing a full assign + // TODO: split into two log rows + if (oop == column_op::set) { + // dead cell in full assign? just don't include + continue; + } + if (oop.value_or(column_op::del) != column_op::del) { + cdc_log.warn("Unhandled case: mismatched set/add operations in multi cell type {}", cdef.type->name()); + continue; + } + oop = oop.value_or(column_op::del); + value_callback(key, bytes_view{}, live); + continue; + } + auto val = value.value().is_fragmented() + ? bytes_view{buf.emplace_back(value.value().linearize())} + : value.value().first_fragment() + ; + value_callback(key, val, live); + } + }; + + return visit(*cdef.type, make_visitor( + // maps and lists are just flattened + [&] (const collection_type_impl& type) { + std::vector> result; + process_cells([&](const bytes_view& key, const bytes_view& value, bool live) { + result.emplace_back(key, value); + }); + return map_type_impl::serialize_partially_deserialized_form(result, cql_serialization_format::internal()); + }, + // set need to transform from mutation view + [&] (const set_type_impl& type) { + std::vector result; + process_cells([&](const bytes_view& key, const bytes_view& value, bool live) { + result.emplace_back(key); + }); + return type.serialize_partially_deserialized_form(result, cql_serialization_format::internal()); + }, + // for user type we collect the fields in the mutation and set to + // tuple of value or tuple of null in case of delete. + // fields not in the mutation are null in the enclosing tuple, signifying "no info" + [&](const user_type_impl& type) { + std::vector res(type.size()); + std::array tmp; + // type of the actual value in log column tuple. + auto res_type = static_pointer_cast(dst->type)->type(1); + auto tt = static_pointer_cast(res_type); + process_cells([&](const bytes_view& key, const bytes_view& value, bool live) { + auto idx = deserialize_field_index(key); + tmp[0] = live ? bytes_view_opt{value} : std::nullopt; + res[idx].emplace(static_pointer_cast(tt->all_types()[idx])->build_value(tmp)); + }); + return tt->build_value(res); + }, + [&] (const abstract_type& o) -> bytes { + throw std::runtime_error(format("cdc transform: unknown type {}", o.name())); + } + )); + }); + + op = oop.value_or(op); + } + + values[0] = data_type_for()->decompose(data_value(static_cast(op))); + res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, ts, tuple_type_impl::build_value(values), _cdc_ttl_opt)); + + if (has_pirow) { + values[0] = data_type_for()->decompose(data_value(static_cast(column_op::set))); + values[1] = cdef.is_atomic() + ? pirow->get_blob(cdef.name_as_text()) + : values[1] = visit(*cdef.type, make_visitor( + // flatten set + [&] (const set_type_impl& type) { + auto v = pirow->get_view(cdef.name_as_text()); + auto f = cql_serialization_format::internal(); + auto n = read_collection_size(v, f); + std::vector tmp; + tmp.reserve(n); + while (n--) { + tmp.emplace_back(read_collection_value(v, f)); // key + read_collection_value(v, f); // value. ignore. + } + return type.serialize_partially_deserialized_form(tmp, f); + }, + // transform udt similarly to above + [&](const user_type_impl& type) { + std::vector res(type.size()); + std::array tmp; + // type of the actual value in log column tuple. + auto res_type = static_pointer_cast(dst->type)->type(1); + auto tt = static_pointer_cast(res_type); + size_t i = 0; + for (auto& v : type.split(pirow->get_view(cdef.name_as_text()))) { + if (v) { + tmp[0] = v; + res[i] = static_pointer_cast(tt->all_types()[i])->build_value(tmp); + } + ++i; + } + return tt->build_value(res); + }, + [&] (const abstract_type& o) -> bytes { + return pirow->get_blob(cdef.name_as_text()); + } + )); + + assert(std::addressof(res.partition().clustered_row(*_log_schema, *pikey)) != std::addressof(res.partition().clustered_row(*_log_schema, log_ck))); + assert(pikey->explode() != log_ck.explode()); + res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, ts, tuple_type_impl::build_value(values), _cdc_ttl_opt)); } }); }; @@ -667,9 +810,13 @@ public: columns.emplace_back(&cdef); }); } - + auto selection = cql3::selection::selection::for_columns(_schema, std::move(columns)); - auto partition_slice = query::partition_slice(std::move(bounds), std::move(static_columns), std::move(regular_columns), selection->get_query_options()); + + auto opts = selection->get_query_options(); + opts.set(query::partition_slice::option::collections_as_maps); + + auto partition_slice = query::partition_slice(std::move(bounds), std::move(static_columns), std::move(regular_columns), std::move(opts)); auto command = ::make_lw_shared(_schema->id(), _schema->version(), partition_slice, query::max_partitions); return _ctx._proxy.query(_schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), empty_service_permit(), client_state)).then( diff --git a/test/boost/cdc_test.cc b/test/boost/cdc_test.cc index 815cce12d7..32e49e70dc 100644 --- a/test/boost/cdc_test.cc +++ b/test/boost/cdc_test.cc @@ -21,6 +21,7 @@ #include #include +#include #include "cdc/log.hh" #include "db/config.hh" @@ -33,6 +34,8 @@ #include "types.hh" #include "types/tuple.hh" #include "types/map.hh" +#include "types/list.hh" +#include "types/set.hh" #include "types/user.hh" using namespace std::string_literals; @@ -610,3 +613,211 @@ SEASTAR_THREAD_TEST_CASE(test_ttls) { test_ttl(10); }, mk_cdc_test_config()).get(); } + +// helper funcs + structs for collection testing +using translate_func = std::function; +struct col_test { + sstring update; + data_value prev; + data_value next; + cdc::column_op op = cdc::column_op::add; +}; + +// iterate a set of updates and verify pre and delta values. +static void test_collection(cql_test_env& e, data_type val_type, std::vector tests, translate_func f = [](data_value v) { return v; }) { + using op_ut = std::underlying_type_t; + auto col_type = tuple_type_impl::get_instance({ data_type_for(), val_type, long_type}); + + for (auto& t : tests) { + cquery_nofail(e, t.update); + + auto rows = select_log(e, "tbl"); + auto pre_image = to_bytes_filtered(*rows, cdc::operation::pre_image); + auto updates = to_bytes_filtered(*rows, cdc::operation::update); + + sort_by_time(*rows, updates); + sort_by_time(*rows, pre_image); + + auto val_index = column_index(*rows, "_val"); + + if (t.prev.is_null()) { + BOOST_REQUIRE(pre_image.empty()); + } else { + BOOST_REQUIRE_GT(pre_image.size(), 0); + auto val = *pre_image.back()[val_index]; + BOOST_REQUIRE_EQUAL(t.prev, f(value_cast(col_type->deserialize(bytes_view(val))).at(1))); + } + + auto val = *updates.back()[val_index]; + auto tup = value_cast(col_type->deserialize(bytes_view(val))); + + if (!t.next.is_null()) { + BOOST_REQUIRE_EQUAL(t.next, f(tup.at(1))); + } + + BOOST_REQUIRE_EQUAL(data_value(op_ut(t.op)), tup.at(0)); + } +} + +SEASTAR_THREAD_TEST_CASE(test_map_logging) { + do_with_cql_env_thread([](cql_test_env& e) { + cquery_nofail(e, "CREATE TABLE ks.tbl (pk int, pk2 int, ck int, val map, PRIMARY KEY((pk, pk2), ck)) WITH cdc = {'enabled':'true', 'preimage':'true' }"s); + auto cleanup = defer([&] { + e.execute_cql("DROP TABLE ks.tbl").get(); + }); + + + auto map_type = map_type_impl::get_instance(utf8_type, utf8_type, false); + + test_collection(e, map_type, { + { + "UPDATE ks.tbl set val = { 'apa':'ko' } where pk=1 and pk2=11 and ck=111", + data_value::make_null(map_type), // no prev value + ::make_map_value(map_type, { { "apa", "ko" } }), // delta + cdc::column_op::set + }, + { + "UPDATE ks.tbl set val = val + { 'ninja':'mission' } where pk=1 and pk2=11 and ck=111", + ::make_map_value(map_type, { { "apa", "ko" } }), + ::make_map_value(map_type, { { "ninja", "mission" } }) + }, + { + "UPDATE ks.tbl set val['ninja'] = 'shuriken' where pk=1 and pk2=11 and ck=111", + ::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "mission" } }), + ::make_map_value(map_type, { { "ninja", "shuriken" } }) + }, + { + "UPDATE ks.tbl set val['apa'] = null where pk=1 and pk2=11 and ck=111", + ::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "shuriken" } }), + ::make_map_value(map_type, { { "apa", data_value::make_null(utf8_type) } }), + cdc::column_op::del + } + }); + }, mk_cdc_test_config()).get(); +} + +SEASTAR_THREAD_TEST_CASE(test_set_logging) { + do_with_cql_env_thread([](cql_test_env& e) { + cquery_nofail(e, "CREATE TABLE ks.tbl (pk int, pk2 int, ck int, val set, PRIMARY KEY((pk, pk2), ck)) WITH cdc = {'enabled':'true', 'preimage':'true' }"s); + auto cleanup = defer([&] { + e.execute_cql("DROP TABLE ks.tbl").get(); + }); + + auto set_type = set_type_impl::get_instance(utf8_type, false); + + test_collection(e, set_type, { + { + "UPDATE ks.tbl set val = { 'apa', 'ko' } where pk=1 and pk2=11 and ck=111", + data_value::make_null(set_type), ::make_set_value(set_type, { "apa", "ko" }), + cdc::column_op::set + }, + { + "UPDATE ks.tbl set val = val + { 'ninja', 'mission' } where pk=1 and pk2=11 and ck=111", + ::make_set_value(set_type, { "apa", "ko" }), + ::make_set_value(set_type, { "mission", "ninja" }) // note the sorting of sets + }, + { + "UPDATE ks.tbl set val = val - { 'apa' } where pk=1 and pk2=11 and ck=111", + ::make_set_value(set_type, { "apa", "ko", "mission", "ninja" }), + ::make_set_value(set_type, { "apa" }), + cdc::column_op::del + } + }); + }, mk_cdc_test_config()).get(); +} + +SEASTAR_THREAD_TEST_CASE(test_list_logging) { + do_with_cql_env_thread([](cql_test_env& e) { + cquery_nofail(e, "CREATE TABLE ks.tbl (pk int, pk2 int, ck int, val list, PRIMARY KEY((pk, pk2), ck)) WITH cdc = {'enabled':'true', 'preimage':'true' }"s); + auto cleanup = defer([&] { + e.execute_cql("DROP TABLE ks.tbl").get(); + }); + + auto list_type = list_type_impl::get_instance(utf8_type, false); + auto val_type = map_type_impl::get_instance(list_type->name_comparator(), list_type->value_comparator(), false); + + test_collection(e, val_type, { + { + "UPDATE ks.tbl set val = [ 'apa', 'ko' ] where pk=1 and pk2=11 and ck=111", + data_value::make_null(list_type), ::make_list_value(list_type, { "apa", "ko" }), + cdc::column_op::set + }, + { + "UPDATE ks.tbl set val = val + [ 'ninja', 'mission' ] where pk=1 and pk2=11 and ck=111", + ::make_list_value(list_type, { "apa", "ko" }), + ::make_list_value(list_type, { "ninja", "mission" }) + }, + { + "UPDATE ks.tbl set val = [ 'bosse' ] + val where pk=1 and pk2=11 and ck=111", + ::make_list_value(list_type, { "apa", "ko", "ninja", "mission" }), + ::make_list_value(list_type, { "bosse" }) + }, + { + "DELETE val[0] from ks.tbl where pk=1 and pk2=11 and ck=111", + ::make_list_value(list_type, { "bosse", "apa", "ko", "ninja", "mission" }), + data_value::make_null(list_type), // the record is the timeuuid, should maybe check, but... + cdc::column_op::del + }, + { + "UPDATE ks.tbl set val[0] = 'babar' where pk=1 and pk2=11 and ck=111", + ::make_list_value(list_type, { "apa", "ko", "ninja", "mission" }), + ::make_list_value(list_type, { "babar" }), + } + }, [&](data_value v) { + auto map = value_cast(std::move(v)); + auto cpy = boost::copy_range>(map | boost::adaptors::map_values); + return ::make_list_value(list_type, std::move(cpy)); + }); + }, mk_cdc_test_config()).get(); +} + +SEASTAR_THREAD_TEST_CASE(test_udt_logging) { + do_with_cql_env_thread([](cql_test_env& e) { + cquery_nofail(e, "CREATE TYPE ks.mytype (field0 int, field1 text)"s); + cquery_nofail(e, "CREATE TABLE ks.tbl (pk int, pk2 int, ck int, val mytype, PRIMARY KEY((pk, pk2), ck)) WITH cdc = {'enabled':'true', 'preimage':'true' }"s); + auto cleanup = defer([&] { + e.execute_cql("DROP TABLE ks.tbl").get(); + e.execute_cql("DROP TYPE ks.mytype").get(); + }); + + auto udt_type = user_type_impl::get_instance("ks", to_bytes("mytype"), + { to_bytes("field0"), to_bytes("field1") }, + { int32_type, utf8_type }, + true + ); + auto f0_type = tuple_type_impl::get_instance({ int32_type }); + auto f1_type = tuple_type_impl::get_instance({ utf8_type }); + auto cdc_tuple_type = tuple_type_impl::get_instance({ f0_type, f1_type }); + + auto make_tuple = [&](std::optional> i, std::optional> s) { + return ::make_tuple_value(cdc_tuple_type, { + i ? ::make_tuple_value(f0_type, { *i }) : data_value::make_null(f0_type), + s ? ::make_tuple_value(f1_type, { *s }) : data_value::make_null(f1_type), + }); + }; + + test_collection(e, cdc_tuple_type, { + { + "UPDATE ks.tbl set val = { field0: 12, field1: 'ko' } where pk=1 and pk2=11 and ck=111", + data_value::make_null(cdc_tuple_type), make_tuple(12, "ko"), + cdc::column_op::set + }, + { + "UPDATE ks.tbl set val.field0 = 13 where pk=1 and pk2=11 and ck=111", + make_tuple(12, "ko"), + make_tuple(13, std::nullopt) + }, + { + "UPDATE ks.tbl set val.field1 = 'nils' where pk=1 and pk2=11 and ck=111", + make_tuple(13, "ko"), + make_tuple(std::nullopt, "nils") + }, + { + "UPDATE ks.tbl set val.field1 = null where pk=1 and pk2=11 and ck=111", + make_tuple(13, "nils"), + make_tuple(std::nullopt, std::optional>{ std::in_place, std::nullopt }), + cdc::column_op::del + }, + }); + }, mk_cdc_test_config()).get(); +}