Files
scylladb/cdc/log.cc
Nadav Har'El 2dcb6294da merge: cdc: New delta modes: off, keys, full
Merged pull request https://github.com/scylladb/scylla/pull/6914
by Juliusz Stasiewicz:

The goal is to have finer control over CDC "delta" rows, i.e.:

    disable them totally (mode off);
    record only base PK+CK columns (mode keys);
    make them behave as usual (mode full, default).

The editing of log rows is performed at the stage of finishing CDC mutation.

Fixes #6838

  tests: Added CQL test for `delta mode`
  cdc: Implementations of `delta_mode::off/keys`
  cdc: Infrastructure for controlling `delta_mode`
2020-08-03 14:10:15 +03:00

1632 lines
74 KiB
C++

/*
* Copyright (C) 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <algorithm>
#include <cstddef>
#include <stdexcept>
#include <utility>
#include <boost/range/irange.hpp>
#include <seastar/util/defer.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/metrics.hh>
#include "cdc/log.hh"
#include "cdc/generation.hh"
#include "cdc/split.hh"
#include "bytes.hh"
#include "database.hh"
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "partition_slice_builder.hh"
#include "schema.hh"
#include "schema_builder.hh"
#include "service/migration_listener.hh"
#include "service/storage_service.hh"
#include "types/tuple.hh"
#include "cql3/statements/select_statement.hh"
#include "cql3/multi_column_relation.hh"
#include "cql3/tuples.hh"
#include "cql3/untyped_result_set.hh"
#include "log.hh"
#include "utils/rjson.hh"
#include "types.hh"
#include "concrete_types.hh"
#include "types/listlike_partial_deserializing_iterator.hh"
#include "tracing/trace_state.hh"
#include "stats.hh"
#include "compaction_strategy.hh"
namespace std {
// Hash support for (address, port/shard) pairs used as unordered-container keys.
template<> struct hash<std::pair<net::inet_address, unsigned int>> {
    std::size_t operator()(const std::pair<net::inet_address, unsigned int>& p) const {
        const auto h1 = std::hash<net::inet_address>{}(p.first);
        // Hash p.second with its actual type; the original std::hash<int>
        // forced an unsigned -> int conversion.
        const auto h2 = std::hash<unsigned int>{}(p.second);
        // boost::hash_combine-style mixing instead of a plain XOR, which was
        // symmetric (a ^ b == b ^ a) and cancelled equal halves to zero.
        return h1 ^ (h2 + 0x9e3779b97f4a7c15ULL + (h1 << 6) + (h1 >> 2));
    }
};
}
using namespace std::chrono_literals;
// Logger for all CDC-related messages.
logging::logger cdc_log("cdc");
namespace cdc {
// Builds the schema of the "<base>_scylla_cdc_log" table for a base table.
// Passing a UUID reuses an existing log table's id (for ALTER-type updates).
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {});
}
// Metric group name shared by all CDC metrics below.
static constexpr auto cdc_group_name = "cdc";
// Registers one "operations_on_<part>_performed_<suffix>" counter per mutation
// part type, backed by the corresponding slot of the `count` array.
void cdc::stats::parts_touched_stats::register_metrics(seastar::metrics::metric_groups& metrics, std::string_view suffix) {
    namespace sm = seastar::metrics;
    // (part enumerator, metric-name fragment) pairs, registered in order.
    const std::pair<part_type, sstring> parts[] = {
        { part_type::STATIC_ROW, "static_row" },
        { part_type::CLUSTERING_ROW, "clustering_row" },
        { part_type::MAP, "map" },
        { part_type::SET, "set" },
        { part_type::LIST, "list" },
        { part_type::UDT, "udt" },
        { part_type::RANGE_TOMBSTONE, "range_tombstone" },
        { part_type::PARTITION_DELETE, "partition_delete" },
        { part_type::ROW_DELETE, "row_delete" },
    };
    for (const auto& [part, part_name] : parts) {
        metrics.add_group(cdc_group_name, {
            sm::make_total_operations(format("operations_on_{}_performed_{}", part_name, suffix), count[(size_t)part],
                    sm::description(format("number of {} CDC operations that processed a {}", suffix, part_name)),
                    {})
        });
    }
}
// Registers the per-kind ("total"/"failed") CDC operation counters.
cdc::stats::stats() {
    namespace sm = seastar::metrics;
    const auto split_label = sm::label("split");
    auto register_counters = [&] (counters& c, sstring kind) {
        _metrics.add_group(cdc_group_name, {
            // The same metric name is registered twice, distinguished by the
            // split=true/false label.
            sm::make_total_operations("operations_" + kind, c.unsplit_count,
                    sm::description(format("number of {} CDC operations", kind)),
                    {split_label(false)}),
            sm::make_total_operations("operations_" + kind, c.split_count,
                    sm::description(format("number of {} CDC operations", kind)),
                    {split_label(true)}),
            sm::make_total_operations("preimage_selects_" + kind, c.preimage_selects,
                    sm::description(format("number of {} preimage queries performed", kind)),
                    {}),
            sm::make_total_operations("operations_with_preimage_" + kind, c.with_preimage_count,
                    sm::description(format("number of {} operations that included preimage", kind)),
                    {}),
            sm::make_total_operations("operations_with_postimage_" + kind, c.with_postimage_count,
                    sm::description(format("number of {} operations that included postimage", kind)),
                    {})
        });
        c.touches.register_metrics(_metrics, kind);
    };
    register_counters(counters_total, "total");
    register_counters(counters_failed, "failed");
}
// On destruction, folds the tracked operation's details into the "total"
// counters, and additionally into the "failed" counters if it failed.
cdc::operation_result_tracker::~operation_result_tracker() {
    stats::counters* buckets[] = {
        &_stats.counters_total,
        _failed ? &_stats.counters_failed : nullptr,
    };
    for (auto* c : buckets) {
        if (c == nullptr) {
            continue;
        }
        // Exactly one of the split/unsplit counters is bumped per operation.
        if (_details.was_split) {
            c->split_count++;
        } else {
            c->unsplit_count++;
        }
        c->touches.apply(_details.touched_parts);
        if (_details.had_preimage) {
            c->with_preimage_count++;
        }
        if (_details.had_postimage) {
            c->with_postimage_count++;
        }
    }
}
// Schema-change listener that keeps each CDC-enabled base table's companion
// "<name>_scylla_cdc_log" table in lock-step with it: the log table's
// create/alter/drop mutations are appended to the very same schema-change
// batch as the base table's, so the pair changes atomically.
class cdc::cdc_service::impl : service::migration_listener::empty_listener {
friend cdc_service;
db_context _ctxt;
// Set by stop(); the destructor asserts it so we never destroy a
// still-registered listener.
bool _stopped = false;
public:
impl(db_context ctxt)
: _ctxt(std::move(ctxt))
{
// Start observing schema changes for the lifetime of the service.
_ctxt._migration_notifier.register_listener(this);
}
~impl() {
assert(_stopped);
}
// Unregisters the listener; must complete before destruction.
future<> stop() {
return _ctxt._migration_notifier.unregister_listener(this).then([this] {
_stopped = true;
});
}
// Base table is being created: if CDC is enabled, validate and append the
// log-table creation mutations to the same batch.
void on_before_create_column_family(const schema& schema, std::vector<mutation>& mutations, api::timestamp_type timestamp) override {
if (schema.cdc_options().enabled()) {
auto& db = _ctxt._proxy.get_db().local();
auto logname = log_name(schema.cf_name());
check_that_cdc_log_table_does_not_exist(db, schema, logname);
ensure_that_table_has_no_counter_columns(schema);
// in seastar thread
auto log_schema = create_log_schema(schema);
auto& keyspace = db.find_keyspace(schema.ks_name());
auto log_mut = db::schema_tables::make_create_table_mutations(keyspace.metadata(), log_schema, timestamp);
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
}
}
// Base table is being altered: create, update or drop the log table
// depending on whether CDC was/is enabled.
void on_before_update_column_family(const schema& new_schema, const schema& old_schema, std::vector<mutation>& mutations, api::timestamp_type timestamp) override {
bool is_cdc = new_schema.cdc_options().enabled();
bool was_cdc = old_schema.cdc_options().enabled();
// we need to create or modify the log & stream schemas iff either we changed cdc status (was != is)
// or if cdc is on now unconditionally, since then any actual base schema changes will affect the column
// etc.
if (was_cdc || is_cdc) {
auto& db = _ctxt._proxy.get_db().local();
if (!was_cdc) {
check_that_cdc_log_table_does_not_exist(db, new_schema, log_name(new_schema.cf_name()));
}
if (is_cdc) {
check_for_attempt_to_create_nested_cdc_log(new_schema);
ensure_that_table_has_no_counter_columns(new_schema);
}
auto logname = log_name(old_schema.cf_name());
auto& keyspace = db.find_keyspace(old_schema.ks_name());
// Only look up the existing log schema if CDC was previously on;
// otherwise there is nothing to update/drop.
auto log_schema = was_cdc ? db.find_column_family(old_schema.ks_name(), logname).schema() : nullptr;
if (!is_cdc) {
// CDC turned off: drop the log table.
auto log_mut = db::schema_tables::make_drop_table_mutations(keyspace.metadata(), log_schema, timestamp);
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
return;
}
// Keep the existing log table's UUID so this is an in-place update.
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt);
auto log_mut = log_schema
? db::schema_tables::make_update_table_mutations(keyspace.metadata(), log_schema, new_log_schema, timestamp, false)
: db::schema_tables::make_create_table_mutations(keyspace.metadata(), new_log_schema, timestamp)
;
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
}
}
// Base table is being dropped: drop its log table in the same batch.
void on_before_drop_column_family(const schema& schema, std::vector<mutation>& mutations, api::timestamp_type timestamp) override {
if (schema.cdc_options().enabled()) {
auto logname = log_name(schema.cf_name());
auto& db = _ctxt._proxy.get_db().local();
auto& keyspace = db.find_keyspace(schema.ks_name());
auto log_schema = db.find_column_family(schema.ks_name(), logname).schema();
auto log_mut = db::schema_tables::make_drop_table_mutations(keyspace.metadata(), log_schema, timestamp);
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
}
}
// Appends CDC log mutations for the given base-table mutations; returns the
// augmented mutation vector plus a tracker for result accounting.
future<std::tuple<std::vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>> augment_mutation_call(
lowres_clock::time_point timeout,
std::vector<mutation>&& mutations,
tracing::trace_state_ptr tr_state,
db::consistency_level write_cl
);
template<typename Iter>
future<> append_mutations(Iter i, Iter e, schema_ptr s, lowres_clock::time_point, std::vector<mutation>&);
private:
// Rejects enabling CDC on a table that is itself a CDC log.
static void check_for_attempt_to_create_nested_cdc_log(const schema& schema) {
const auto& cf_name = schema.cf_name();
const auto cf_name_view = std::string_view(cf_name.data(), cf_name.size());
if (is_log_for_some_table(schema.ks_name(), cf_name_view)) {
throw exceptions::invalid_request_exception(format("Cannot create a CDC log for a table {}.{}, because creating nested CDC logs is not allowed",
schema.ks_name(), schema.cf_name()));
}
}
// Rejects creating a log table whose name is already taken.
static void check_that_cdc_log_table_does_not_exist(database& db, const schema& schema, const sstring& logname) {
if (db.has_schema(schema.ks_name(), logname)) {
throw exceptions::invalid_request_exception(format("Cannot create CDC log table for table {}.{} because a table of name {}.{} already exists",
schema.ks_name(), schema.cf_name(),
schema.ks_name(), logname));
}
}
// CDC on counter tables is not supported.
static void ensure_that_table_has_no_counter_columns(const schema& schema) {
if (schema.is_counter()) {
throw exceptions::invalid_request_exception(format("Cannot create CDC log for table {}.{}. Counter support not implemented",
schema.ks_name(), schema.cf_name()));
}
}
};
// Convenience constructor: builds a default db_context from the proxy.
cdc::cdc_service::cdc_service(service::storage_proxy& proxy)
: cdc_service(db_context::builder(proxy).build())
{}
cdc::cdc_service::cdc_service(db_context ctxt)
: _impl(std::make_unique<impl>(std::move(ctxt)))
{
// Let the proxy route writes through this CDC service.
_impl->_ctxt._proxy.set_cdc_service(this);
}
future<> cdc::cdc_service::stop() {
// Detach from the proxy first so no new operations reach a stopping impl.
_impl->_ctxt._proxy.set_cdc_service(nullptr);
return _impl->stop();
}
cdc::cdc_service::~cdc_service() = default;
namespace {
// CQL option values accepted/emitted for cdc = {'delta': ...}.
const sstring delta_mode_string_off = "off";
const sstring delta_mode_string_keys = "keys";
const sstring delta_mode_string_full = "full";
// Maps a delta_mode enumerator to its CQL option string.
sstring to_string(cdc::delta_mode dm) {
    if (dm == cdc::delta_mode::off) {
        return delta_mode_string_off;
    }
    if (dm == cdc::delta_mode::keys) {
        return delta_mode_string_keys;
    }
    if (dm == cdc::delta_mode::full) {
        return delta_mode_string_full;
    }
    throw std::logic_error("Impossible value of cdc::delta_mode");
}
} // anon. namespace
// Parses the CQL cdc = {...} option map. An absent "enabled" key means CDC
// was never configured and all members keep their defaults. Throws
// exceptions::configuration_exception on any invalid key, value or
// combination of values.
cdc::options::options(const std::map<sstring, sstring>& map) {
    if (map.find("enabled") == std::end(map)) {
        return;
    }
    for (auto& p : map) {
        if (p.first == "enabled") {
            _enabled = p.second == "true";
        } else if (p.first == "preimage") {
            _preimage = p.second == "true";
        } else if (p.first == "postimage") {
            _postimage = p.second == "true";
        } else if (p.first == "delta") {
            if (p.second == delta_mode_string_keys) {
                _delta_mode = delta_mode::keys;
            } else if (p.second == delta_mode_string_off) {
                _delta_mode = delta_mode::off;
            } else if (p.second != delta_mode_string_full) {
                throw exceptions::configuration_exception("Invalid value for CDC option \"delta\": " + p.second);
            }
        } else if (p.first == "ttl") {
            // BUG FIX: std::stoi throws std::invalid_argument/std::out_of_range
            // on malformed input (e.g. ttl = 'abc'); translate that into the
            // configuration_exception callers expect instead of leaking it.
            try {
                _ttl = std::stoi(p.second);
            } catch (const std::exception&) {
                throw exceptions::configuration_exception("Invalid CDC option: ttl must be an integer: " + p.second);
            }
            if (_ttl < 0) {
                throw exceptions::configuration_exception("Invalid CDC option: ttl must be >= 0");
            }
        } else {
            throw exceptions::configuration_exception("Invalid CDC option: " + p.first);
        }
    }
    // With everything that could produce log rows turned off, enabling CDC
    // would create a log that never receives data - reject that.
    if (_enabled && !_preimage && !_postimage && _delta_mode == delta_mode::off) {
        throw exceptions::configuration_exception("Invalid combination of CDC options: neither of"
                " {preimage, postimage, delta} is enabled");
    }
}
// Serializes the options back into a CQL option map; a disabled config
// round-trips as an empty map.
std::map<sstring, sstring> cdc::options::to_map() const {
    if (!_enabled) {
        return {};
    }
    std::map<sstring, sstring> res;
    res["enabled"] = _enabled ? "true" : "false";
    res["preimage"] = _preimage ? "true" : "false";
    res["postimage"] = _postimage ? "true" : "false";
    res["delta"] = to_string(_delta_mode);
    res["ttl"] = std::to_string(_ttl);
    return res;
}
// Serializes the options as a JSON string (used for schema persistence).
sstring cdc::options::to_sstring() const {
return rjson::print(rjson::from_string_map(to_map()));
}
// Member-wise equality over all option fields.
bool cdc::options::operator==(const options& o) const {
    if (_enabled != o._enabled) {
        return false;
    }
    if (_preimage != o._preimage) {
        return false;
    }
    if (_postimage != o._postimage) {
        return false;
    }
    if (_ttl != o._ttl) {
        return false;
    }
    return _delta_mode == o._delta_mode;
}
// Defined in terms of operator==.
bool cdc::options::operator!=(const options& o) const {
return !(*this == o);
}
namespace cdc {
// Integral type underlying cdc::operation, used for the cdc$operation column.
using operation_native_type = std::underlying_type_t<operation>;
// Naming scheme of the log table and its metadata/shadow columns.
static const sstring cdc_log_suffix = "_scylla_cdc_log";
static const sstring cdc_meta_column_prefix = "cdc$";
static const sstring cdc_deleted_column_prefix = cdc_meta_column_prefix + "deleted_";
static const sstring cdc_deleted_elements_column_prefix = cdc_meta_column_prefix + "deleted_elements_";
// A CDC log table is recognized purely by its "_scylla_cdc_log" name suffix.
static bool is_log_name(const std::string_view& table_name) {
    const auto suffix_len = cdc_log_suffix.size();
    return table_name.size() >= suffix_len
        && table_name.substr(table_name.size() - suffix_len) == std::string_view(cdc_log_suffix.c_str(), suffix_len);
}
bool is_cdc_metacolumn_name(const sstring& name) {
return name.compare(0, cdc_meta_column_prefix.size(), cdc_meta_column_prefix) == 0;
}
// True iff `table_name` is the CDC log of an existing, CDC-enabled base table
// in `ks_name`. Uses the local storage proxy to look up the base schema.
bool is_log_for_some_table(const sstring& ks_name, const std::string_view& table_name) {
auto base_schema = get_base_table(service::get_local_storage_proxy().get_db().local(), ks_name, table_name);
if (!base_schema) {
return false;
}
return base_schema->cdc_options().enabled();
}
schema_ptr get_base_table(const database& db, const schema& s) {
return get_base_table(db, s.ks_name(), s.cf_name());
}
// Returns the schema of the base table corresponding to a log-table name, or
// nullptr if `table_name` is not a log name or no base table exists.
schema_ptr get_base_table(const database& db, sstring_view ks_name, std::string_view table_name) {
    if (!is_log_name(table_name)) {
        return nullptr;
    }
    // Note: It is legal for a user to directly create a table with name
    // `X_scylla_cdc_log`. A table with name `X` might be present (but with
    // cdc log disabled), or not present at all when creating `X_scylla_cdc_log`.
    // Therefore, existence of `X_scylla_cdc_log` does not imply existence of `X`
    // and, in order not to throw, we explicitly need to check for existence of 'X'.
    const auto base = base_name(table_name);
    if (!db.has_schema(ks_name, base)) {
        return nullptr;
    }
    return db.find_schema(sstring(ks_name), base);
}
// Strips the "_scylla_cdc_log" suffix; caller must pass a valid log name.
seastar::sstring base_name(std::string_view log_name) {
assert(is_log_name(log_name));
return sstring(log_name.data(), log_name.size() - cdc_log_suffix.size());
}
// Appends the "_scylla_cdc_log" suffix to a base-table name.
sstring log_name(std::string_view table_name) {
return sstring(table_name) + cdc_log_suffix;
}
// Base data columns keep their name unchanged in the log table.
sstring log_data_column_name(std::string_view column_name) {
return sstring(column_name);
}
// Metadata columns get the "cdc$" prefix (e.g. cdc$operation).
seastar::sstring log_meta_column_name(std::string_view column_name) {
return cdc_meta_column_prefix + sstring(column_name);
}
// bytes variants of the above, for schema-builder use.
bytes log_data_column_name_bytes(const bytes& column_name) {
return column_name;
}
bytes log_meta_column_name_bytes(const bytes& column_name) {
return to_bytes(cdc_meta_column_prefix) + column_name;
}
// "cdc$deleted_<col>" marks a column as deleted in a delta row.
seastar::sstring log_data_column_deleted_name(std::string_view column_name) {
return cdc_deleted_column_prefix + sstring(column_name);
}
bytes log_data_column_deleted_name_bytes(const bytes& column_name) {
return to_bytes(cdc_deleted_column_prefix) + column_name;
}
// "cdc$deleted_elements_<col>" holds the removed keys/indices of a
// non-frozen collection or UDT column.
seastar::sstring log_data_column_deleted_elements_name(std::string_view column_name) {
return cdc_deleted_elements_column_prefix + sstring(column_name);
}
bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
return to_bytes(cdc_deleted_elements_column_prefix) + column_name;
}
// Builds the log table's schema for base table `s`:
// PK = (cdc$stream_id), CK = (cdc$time, cdc$batch_seq_no), plus cdc$operation,
// cdc$ttl, a mirror of every base column, and cdc$deleted_* shadow columns for
// data columns. Passing `uuid` reuses an existing log table's id on updates.
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid) {
schema_builder b(s.ks_name(), log_name(s.cf_name()));
// Log partitions are placed by stream id, not by the regular partitioner.
b.with_partitioner("com.scylladb.dht.CDCPartitioner");
b.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
b.set_comment(sprint("CDC log for %s.%s", s.ks_name(), s.cf_name()));
auto ttl_seconds = s.cdc_options().ttl();
if (ttl_seconds > 0) {
// All log data expires via TTL, so tombstone GC can be immediate.
b.set_gc_grace_seconds(0);
auto ceil = [] (int dividend, int divisor) {
return dividend / divisor + (dividend % divisor == 0 ? 0 : 1);
};
auto seconds_to_minutes = [] (int seconds_value) {
using namespace std::chrono;
return std::chrono::ceil<minutes>(seconds(seconds_value)).count();
};
// What's the minimum window that won't create more than 24 sstables.
auto window_seconds = ceil(ttl_seconds, 24);
auto window_minutes = seconds_to_minutes(window_seconds);
b.set_compaction_strategy_options({
{"compaction_window_unit", "MINUTES"},
{"compaction_window_size", std::to_string(window_minutes)},
// A new SSTable will become fully expired every
// `window_seconds` seconds so we shouldn't check for expired
// sstables too often.
{"expired_sstable_check_frequency_seconds",
std::to_string(std::max(1, window_seconds / 2))},
});
}
b.with_column(log_meta_column_name_bytes("stream_id"), bytes_type, column_kind::partition_key);
b.with_column(log_meta_column_name_bytes("time"), timeuuid_type, column_kind::clustering_key);
b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key);
b.with_column(log_meta_column_name_bytes("operation"), data_type_for<operation_native_type>());
b.with_column(log_meta_column_name_bytes("ttl"), long_type);
b.set_caching_options(caching_options::get_disabled_caching_options());
// Mirrors a range of base columns into the log; data columns (static and
// regular, is_data_col == true) also get cdc$deleted_* shadow columns.
auto add_columns = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) {
for (const auto& column : columns) {
auto type = column.type;
if (is_data_col && type->is_multi_cell()) {
type = visit(*type, make_visitor(
// non-frozen lists are represented as map<timeuuid, value_type>. Otherwise we cannot express delta
[] (const list_type_impl& type) -> data_type {
return map_type_impl::get_instance(type.name_comparator(), type.value_comparator(), false);
},
// everything else is just frozen self
[] (const abstract_type& type) {
return type.freeze();
}
));
}
b.with_column(log_data_column_name_bytes(column.name()), type);
if (is_data_col) {
b.with_column(log_data_column_deleted_name_bytes(column.name()), boolean_type);
}
if (column.type->is_multi_cell()) {
auto dtype = visit(*type, make_visitor(
// all collection deletes are set<key_type> (i.e. timeuuid for lists)
[] (const collection_type_impl& type) -> data_type {
return set_type_impl::get_instance(type.name_comparator(), false);
},
// user types deletes are a set of the indices removed
[] (const user_type_impl& type) -> data_type {
return set_type_impl::get_instance(short_type, false);
},
[] (const abstract_type& type) -> data_type {
throw std::invalid_argument("Should not reach");
}
));
b.with_column(log_data_column_deleted_elements_name_bytes(column.name()), dtype);
}
}
};
add_columns(s.partition_key_columns());
add_columns(s.clustering_key_columns());
add_columns(s.static_columns(), true);
add_columns(s.regular_columns(), true);
if (uuid) {
b.set_uuid(*uuid);
}
return b.build();
}
// Builder for db_context: each dependency can be injected explicitly (tests)
// or defaults to the local storage service's instance in build().
db_context::builder::builder(service::storage_proxy& proxy)
: _proxy(proxy)
{}
db_context::builder& db_context::builder::with_migration_notifier(service::migration_notifier& migration_notifier) {
_migration_notifier = migration_notifier;
return *this;
}
db_context::builder& db_context::builder::with_token_metadata(locator::token_metadata& token_metadata) {
_token_metadata = token_metadata;
return *this;
}
db_context::builder& db_context::builder::with_cdc_metadata(cdc::metadata& cdc_metadata) {
_cdc_metadata = cdc_metadata;
return *this;
}
// Materializes the context, filling unset dependencies from the local
// storage service.
db_context db_context::builder::build() {
return db_context{
_proxy,
_migration_notifier ? _migration_notifier->get() : service::get_local_storage_service().get_migration_notifier(),
_token_metadata ? _token_metadata->get() : service::get_local_storage_service().get_token_metadata(),
_cdc_metadata ? _cdc_metadata->get() : service::get_local_storage_service().get_cdc_metadata(),
};
}
// iterators for collection merge
//
// Input iterator over the elements of a serialized CQL collection value.
// A default-constructed instance is the end/sentinel iterator.
template<typename T>
class collection_iterator {
public:
    // Iterator traits, spelled out as member aliases: deriving from
    // std::iterator (as the original did) is deprecated since C++17.
    using iterator_category = std::input_iterator_tag;
    using value_type = const T;
    using difference_type = std::ptrdiff_t;
    using pointer = const T*;
    using reference = const T&;
private:
    bytes_view _v, _next;   // remaining input / input past the current element
    size_t _rem = 0;        // number of elements not yet consumed
    T _current;             // element this iterator currently points at
public:
    collection_iterator(bytes_view_opt v = {})
        : _v(v.value_or(bytes_view{}))
        , _rem(_v.empty() ? 0 : read_collection_size(_v, cql_serialization_format::internal()))
    {
        if (_rem != 0) {
            parse();
        }
    }
    collection_iterator(const collection_iterator&) = default;
    const T& operator*() const {
        return _current;
    }
    const T* operator->() const {
        return &_current;
    }
    collection_iterator& operator++() {
        next();
        if (_rem != 0) {
            parse();
        } else {
            _current = {};
        }
        return *this;
    }
    collection_iterator operator++(int) {
        auto v = *this;
        ++(*this);
        return v;
    }
    // Two iterators are equal when they view the same remaining input; in
    // particular, an exhausted iterator compares equal to the default one.
    bool operator==(const collection_iterator& x) const {
        return _v == x._v;
    }
    bool operator!=(const collection_iterator& x) const {
        return !(*this == x);
    }
private:
    void next() {
        --_rem;
        _v = _next;
    }
    void parse(); // specialized per element type below
};
// Map/set entries: decode a (key, value) pair from the serialized form.
template<>
void collection_iterator<std::pair<bytes_view, bytes_view>>::parse() {
assert(_rem > 0);
_next = _v;
auto k = read_collection_value(_next, cql_serialization_format::internal());
auto v = read_collection_value(_next, cql_serialization_format::internal());
_current = std::make_pair(k, v);
}
// Plain element sequences: decode a single value.
template<>
void collection_iterator<bytes_view>::parse() {
assert(_rem > 0);
_next = _v;
auto k = read_collection_value(_next, cql_serialization_format::internal());
_current = k;
}
// Back-insert iterator that drops values present in a sorted "deleted"
// sequence (`_s`..`_e`). Both the inserted values and the deleted sequence
// arrive in ascending `_type` order, so find() is a linear merge.
template<typename Container, typename T>
class maybe_back_insert_iterator : public std::back_insert_iterator<Container> {
    const abstract_type& _type;
    collection_iterator<T> _s, _e; // _e is the default-constructed end
public:
    using value_type = typename Container::value_type;
    maybe_back_insert_iterator(Container& c, const abstract_type& type, collection_iterator<T> s)
        : std::back_insert_iterator<Container>(c)
        , _type(type)
        , _s(s)
    {}
    maybe_back_insert_iterator& operator*() {
        return *this;
    }
    maybe_back_insert_iterator& operator=(const value_type& v) {
        if (!find(v)) {
            std::back_insert_iterator<Container>::operator=(v);
        }
        return *this;
    }
    maybe_back_insert_iterator& operator=(value_type&& v) {
        if (!find(v)) {
            std::back_insert_iterator<Container>::operator=(std::move(v));
        }
        return *this;
    }
private:
    // Returns true iff v is in the deleted sequence, advancing past any
    // deleted elements <= v.
    bool find(const value_type& v) {
        // cheating - reducing search span, because we know we only append unique values (see below).
        while (_s != _e) {
            auto n = compare(*_s, v);
            if (n <= 0) {
                ++_s; // deleted element <= v: consumed either way
            }
            if (n == 0) {
                return true; // exact match: v is deleted
            }
            if (n > 0) {
                break; // deleted element > v: v cannot be in the rest
            }
        }
        return false;
    }
    // BUG FIX: this was declared returning `bool`, which truncated the
    // three-way result of abstract_type::compare() - every non-zero result
    // (including "less") became `true`/1, so find() broke out of the loop
    // instead of skipping deleted elements smaller than v, and later matches
    // were missed (deleted values could leak into the merged collection).
    // The three-way tests above require the full signed result.
    int compare(const T&, const value_type& v);
};
template<>
int maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
    return _type.compare(t, v.first);
}
template<>
int maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
    return _type.compare(t, v);
}
// Deduction helper for the iterator above.
template<typename Container, typename T>
auto make_maybe_back_inserter(Container& c, const abstract_type& type, collection_iterator<T> s) {
    return maybe_back_insert_iterator<Container, T>(c, type, s);
}
/* Given a timestamp, generates a timeuuid with the following properties:
* 1. `t1` < `t2` implies timeuuid_type->less(timeuuid_type->decompose(generate_timeuuid(`t1`)),
* timeuuid_type->decompose(generate_timeuuid(`t2`))),
* 2. utils::UUID_gen::micros_timestamp(generate_timeuuid(`t`)) == `t`.
*
* If `t1` == `t2`, then generate_timeuuid(`t1`) != generate_timeuuid(`t2`),
* with unspecified nondeterministic ordering.
*/
utils::UUID generate_timeuuid(api::timestamp_type t) {
// The random (non-time) bits make concurrent writers at the same
// microsecond produce distinct cdc$time values.
return utils::UUID_gen::get_random_time_UUID_from_micros(t);
}
// Turns base-table changes into CDC log mutations (one log row per change).
class transformer final : public change_processor {
private:
db_context _ctx;
// Base table schema and the partition key of the changes being processed.
schema_ptr _schema;
dht::decorated_key _dk;
// Schema of the corresponding "<base>_scylla_cdc_log" table.
schema_ptr _log_schema;
// Cached cdc$operation / cdc$ttl column definitions of the log schema.
const column_definition& _op_col;
const column_definition& _ttl_col;
// TTL applied to every log cell, when the cdc options configure one.
ttl_opt _cdc_ttl_opt;
/**
* #6070, #6084
* Non-atomic column assignments which use a TTL are broken into two invocations
* of `transform`, such as in the following example:
* CREATE TABLE t (a int PRIMARY KEY, b map<int, int>) WITH cdc = {'enabled':true};
* UPDATE t USING TTL 5 SET b = {0:0} WHERE a = 0;
*
* The above UPDATE creates a tombstone and a (0, 0) cell; because tombstones don't have the notion
* of a TTL, we split the UPDATE into two separate changes (represented as two separate delta rows in the log,
* resulting in two invocations of `transform`): one change for the deletion with no TTL,
* and one change for adding cells with TTL = 5.
*
* In other words, we use the fact that
* UPDATE t USING TTL 5 SET b = {0:0} WHERE a = 0;
* is equivalent to
* BEGIN UNLOGGED BATCH
* UPDATE t SET b = null WHERE a = 0;
* UPDATE t USING TTL 5 SET b = b + {0:0} WHERE a = 0;
* APPLY BATCH;
* (the mutations are the same in both cases),
* and perform a separate `transform` call for each statement in the batch.
*
* An assignment also happens when an INSERT statement is used as follows:
* INSERT INTO t (a, b) VALUES (0, {0:0}) USING TTL 5;
*
* This will be split into three separate changes (three invocations of `transform`):
* 1. One with TTL = 5 for the row marker (introduces by the INSERT), indicating that a row was inserted.
* 2. One without a TTL for the tombstone, indicating that the collection was cleared.
* 3. One with TTL = 5 for the addition of cell (0, 0), indicating that the collection
* was extended by a new key/value.
*
* Why do we need three changes and not two, like in the UPDATE case?
* The tombstone needs to be a separate change because it doesn't have a TTL,
* so only the row marker change could potentially be merged with the cell change (1 and 3 above).
* However, we cannot do that: the row marker change is of INSERT type (cdc$operation == cdc::operation::insert),
* but there is no way to create a statement that
* - has a row marker,
* - adds cells to a collection,
* - but *doesn't* add a tombstone for this collection.
* INSERT statements that modify collections *always* add tombstones.
*
* Merging the row marker with the cell addition would result in such an impossible statement.
*
* Instead, we observe that
* INSERT INTO t (a, b) VALUES (0, {0:0}) USING TTL 5;
* is equivalent to
* BEGIN UNLOGGED BATCH
* INSERT INTO t (a) VALUES (0) USING TTL 5;
* UPDATE t SET b = null WHERE a = 0;
* UPDATE t USING TTL 5 SET b = b + {0:0} WHERE a = 0;
* APPLY BATCH;
* and perform a separate `transform` call for each statement in the batch.
*
* Unfortunately, due to splitting, the cell addition call (b + b {0:0}) does not know about the tombstone.
* If it was performed independently from the tombstone call, it would create a wrong post-image:
* the post-image would look as if the previous cells still existed.
* For example, suppose that b was equal to {1:1} before the above statement was performed.
* Then the final post-image for b for above statement/batch would be {0:0, 1:1}, when instead it should be {0:0}.
*
* To handle this we use the fact that
* 1. changes without a TTL are treated as if TTL = 0,
* 2. `transform` is invoked in order of increasing TTLs,
* and we maintain state between `transform` invocations (`_clustering_row_states`, `_static_row_state`).
*
* Thus, the tombstone call will happen *before* the cell addition call,
* so the cell addition call will know that there previously was a tombstone
* and create a correct post-image.
*
* Furthermore, `transform` calls for INSERT changes (i.e. with a row marker)
* happen before `transform` calls for UPDATE changes, so in the case of an INSERT
* which modifies a collection column as above, the row marker call will happen first;
* its post-image will still show {1:1} for the collection column. Good.
*/
// Which mutation part kinds this batch touched (for metrics).
stats::part_type_set _touched_parts;
// Latest known cell values per base column, used to build post-images
// across the multiple `transform` invocations described above.
using cell_map = std::unordered_map<const column_definition*, bytes_opt>;
std::unordered_map<clustering_key, cell_map, clustering_key::hashing, clustering_key::equality> _clustering_row_states;
cell_map _static_row_state;
std::vector<mutation> _result_mutations;
// Keeps the next cdc$batch_seq_no value
int _batch_no = -1;
// The timestamp of changes being currently processed
api::timestamp_type _ts;
// The cdc$time value of changes being currently processed
bytes _tuuid;
// When enabled, process_change will update _clustering_row_states and _static_row_state
bool _enable_updating_state = false;
// The log mutation currently being filled; must exist before use.
mutation& current_mutation() {
assert(!_result_mutations.empty());
return _result_mutations.back();
}
// Starts a new log row: returns its clustering key (cdc$time, next
// cdc$batch_seq_no) and pre-populates the base partition-key columns.
clustering_key allocate_new_log_row() {
    assert(_batch_no != -1);
    auto& m = current_mutation();
    const auto log_ck = clustering_key::from_exploded(
        *m.schema(), { _tuuid, int32_type->decompose(_batch_no++) });
    // Copy each base partition-key component into its mirror column.
    const auto pk_components = _dk.key().explode(*_schema);
    size_t idx = 0;
    for (const auto& base_col : _schema->partition_key_columns()) {
        assert (idx < pk_components.size());
        const auto* log_cdef = m.schema()->get_column_definition(log_data_column_name_bytes(base_col.name()));
        auto cell = atomic_cell::make_live(*base_col.type,
                                           _ts,
                                           bytes_view(pk_components[idx]),
                                           _cdc_ttl_opt);
        m.set_cell(log_ck, *log_cdef, std::move(cell));
        ++idx;
    }
    return log_ck;
}
// Writes the cdc$operation cell of log row `ck`.
void set_operation(const clustering_key& ck, operation op) {
auto& m = current_mutation();
m.set_cell(ck, _op_col, atomic_cell::make_live(*_op_col.type, _ts, _op_col.type->decompose(operation_native_type(op)), _cdc_ttl_opt));
}
// Writes the cdc$ttl cell (the TTL of the base-table change) of log row `ck`.
void set_ttl(const clustering_key& ck, gc_clock::duration ttl) {
auto& m = current_mutation();
m.set_cell(ck, _ttl_col, atomic_cell::make_live(*_ttl_col.type, _ts, _ttl_col.type->decompose(ttl.count()), _cdc_ttl_opt));
}
// Remove non-key columns or entire delta rows, according to `delta` setting in `cdc` options.
// In mode `off`, delta rows (anything that is not a pre/post-image row) are
// erased entirely and the surviving rows' cdc$batch_seq_no keys are shifted
// down to close the gaps; in mode `keys` only the base PK/CK mirror columns
// are kept in delta rows. Mode `full` leaves everything untouched.
void adjust_or_delete_deltas() {
if (_schema->cdc_options().get_delta_mode() == cdc::delta_mode::full) {
return;
}
// NOTE(review): these are `static` locals in a member function, so they are
// computed once from the first transformer instance's _op_col; this looks
// safe only because cdc$operation has the same type in every log schema -
// confirm.
static const auto preimg_op_bytes = _op_col.type->decompose(operation_native_type(operation::pre_image));
static const auto postimg_op_bytes = _op_col.type->decompose(operation_native_type(operation::post_image));
for (auto& m : _result_mutations) {
auto& clustered_rows = m.partition().clustered_rows();
int deleted_rows_cnt = 0;
for (auto it = clustered_rows.begin(); it != clustered_rows.end(); /* no increment */) {
// Identify the row kind by its cdc$operation value.
const auto& op_cell = it->row().cells().cell_at(_op_col.id).as_atomic_cell(_op_col);
const auto op_val = op_cell.value().linearize();
if (op_val != preimg_op_bytes && op_val != postimg_op_bytes) {
if (_schema->cdc_options().get_delta_mode() == cdc::delta_mode::off) {
it = m.partition().clustered_rows().erase_and_dispose(it, current_deleter<rows_entry>());
++deleted_rows_cnt;
continue;
}
// The case of `get_delta_mode() == delta_mode::keys`:
it->row().cells().remove_if([this, log_s = m.schema()] (column_id id, atomic_cell_or_collection& acoc) {
const auto& log_cdef = log_s->column_at(column_kind::regular_column, id);
// We can surely remove "cdc$*" columns.
if (is_cdc_metacolumn_name(log_cdef.name_as_text())) {
return true;
}
const auto* base_cdef = _schema->get_column_definition(log_cdef.name());
// Remove columns from delta that correspond to non-PK/CK columns in the base table.
return base_cdef != nullptr
&& (base_cdef->kind != column_kind::partition_key && base_cdef->kind != column_kind::clustering_key);
});
}
// Deletion of deltas might leave gaps in `batch_seq_no` - let's fix them.
// Shifting by the number of rows deleted so far preserves the relative
// order of the remaining keys.
if (deleted_rows_cnt > 0) {
const auto* batch_seq_no_cdef = m.schema()->get_column_definition(log_meta_column_name_bytes("batch_seq_no"));
assert(batch_seq_no_cdef != nullptr);
auto exploded_ck = it->key().explode();
const size_t batch_seq_no_idx = batch_seq_no_cdef->component_index();
const auto old_batch_seq_no = value_cast<int32_t>(int32_type->deserialize(exploded_ck[batch_seq_no_idx]));
exploded_ck[batch_seq_no_idx] = int32_type->decompose(old_batch_seq_no - deleted_rows_cnt);
it->key() = clustering_key::from_exploded(std::move(exploded_ck));
}
++it;
}
}
}
public:
    // `s`/`dk` identify the base-table partition being changed; `ctx` supplies
    // the storage proxy (for schema lookup) and CDC stream metadata.
    // Note: `_schema` is initialized (by move from `s`) before `_log_schema`,
    // which reads it - the member initializer order here is load-bearing.
    transformer(db_context ctx, schema_ptr s, dht::decorated_key dk)
        : _ctx(ctx)
        , _schema(std::move(s))
        , _dk(std::move(dk))
        // The CDC log table shadowing the base table.
        , _log_schema(ctx._proxy.get_db().local().find_schema(_schema->ks_name(), log_name(_schema->cf_name())))
        , _op_col(*_log_schema->get_column_definition(log_meta_column_name_bytes("operation")))
        , _ttl_col(*_log_schema->get_column_definition(log_meta_column_name_bytes("ttl")))
        , _clustering_row_states(0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema))
    {
        // TTL of 0 in the CDC options means "no TTL": _cdc_ttl_opt stays unset.
        if (_schema->cdc_options().ttl()) {
            _cdc_ttl_opt = std::chrono::seconds(_schema->cdc_options().ttl());
        }
    }
static size_t collection_size(const bytes_opt& bo) {
if (bo) {
bytes_view bv(*bo);
return read_collection_size(bv, cql_serialization_format::internal());
}
return 0;
}
template<typename Func>
static void udt_for_each(const bytes_opt& bo, Func&& f) {
if (bo) {
bytes_view bv(*bo);
std::for_each(tuple_deserializing_iterator::start(bv), tuple_deserializing_iterator::finish(bv), std::forward<Func>(f));
}
}
// Merges previous and new serialized map/list values into a single value:
// the union of both keyed by the collection's key comparator, where entries
// from `next` win on collisions and keys listed in `deleted` are dropped
// (via make_maybe_back_inserter filtering on insert).
static bytes merge(const collection_type_impl& ctype, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) {
    std::vector<std::pair<bytes_view, bytes_view>> res;
    res.reserve(collection_size(prev) + collection_size(next));
    auto type = ctype.name_comparator();
    // Compare key/value pairs by key only.
    auto cmp = [&type = *type](const std::pair<bytes_view, bytes_view>& p1, const std::pair<bytes_view, bytes_view>& p2) {
        return type.compare(p1.first, p2.first) < 0;
    };
    collection_iterator<std::pair<bytes_view, bytes_view>> e, i(prev), j(next);
    // note order: set_union, when finding doubles, use value from first1 (j here). So
    // since this is next, it has prio
    std::set_union(j, e, i, e, make_maybe_back_inserter(res, *type, collection_iterator<bytes_view>(deleted)), cmp);
    return map_type_impl::serialize_partially_deserialized_form(res, cql_serialization_format::internal());
}
// Merges previous and new serialized set values: the union of both, with
// `next` elements taking priority on duplicates, minus the keys listed in
// `deleted` (filtered out by make_maybe_back_inserter).
static bytes merge(const set_type_impl& ctype, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) {
    std::vector<bytes_view> res;
    res.reserve(collection_size(prev) + collection_size(next));
    auto type = ctype.name_comparator();
    auto cmp = [&type = *type](bytes_view k1, bytes_view k2) {
        return type.compare(k1, k2) < 0;
    };
    collection_iterator<bytes_view> e, i(prev), j(next), d(deleted);
    // As in the map overload: `next` (j) comes first so its elements win.
    std::set_union(j, e, i, e, make_maybe_back_inserter(res, *type, d), cmp);
    return set_type_impl::serialize_partially_deserialized_form(res, cql_serialization_format::internal());
}
// Merges previous and new serialized UDT values field-by-field: start from
// `prev`, overlay every field that `next` actually sets (null fields in `next`
// mean "no information"), then null out the field indices listed in `deleted`.
static bytes merge(const user_type_impl& type, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) {
    std::vector<bytes_view_opt> res(type.size());
    // Seed with the previous value of every field (mutable init-captured
    // iterator walks `res` in lockstep with the serialized fields).
    udt_for_each(prev, [&res, i = res.begin()](bytes_view_opt k) mutable {
        *i++ = k;
    });
    // Overlay only the fields the new value sets.
    udt_for_each(next, [&res, i = res.begin()](bytes_view_opt k) mutable {
        if (k) {
            *i = k;
        }
        ++i;
    });
    // Explicitly deleted fields become null.
    collection_iterator<bytes_view> e, d(deleted);
    std::for_each(d, e, [&res](bytes_view k) {
        auto index = deserialize_field_index(k);
        res[index] = std::nullopt;
    });
    return type.build_value(res);
}
// Fallback overload: merging is only defined for maps/lists, sets and UDTs;
// any other type reaching here is a programming error.
static bytes merge(const abstract_type& type, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) {
    auto msg = format("cdc merge: unknown type {}", type.name());
    throw std::runtime_error(std::move(msg));
}
// Starts a new log mutation for the group of changes sharing timestamp `ts`.
// `is_last` tells whether this is the final timestamp group of the write.
void begin_timestamp(api::timestamp_type ts, bool is_last) override {
    // The CDC stream chosen for (ts, token) becomes the log partition key.
    const auto stream_id = _ctx._cdc_metadata.get_stream(ts, _dk.token());
    _result_mutations.emplace_back(_log_schema, stream_id.to_partition_key(*_log_schema));
    _batch_no = 0;
    _ts = ts;
    _tuuid = timeuuid_type->decompose(generate_timeuuid(ts));
    // Track row state only when a later consumer needs it: postimage always
    // does; preimage does only if more timestamp groups will follow this one.
    _enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage());
}
// Emits a pre-image log row for the clustering row identified by `ck`
// (or the static row when `ck` is null), restricted to `columns_to_include`.
// Fix: dropped the stray ';' that followed the function body.
void produce_preimage(const clustering_key* ck, const one_kind_column_set& columns_to_include) override {
    generate_image(operation::pre_image, ck, &columns_to_include);
}
// Emits a post-image log row for the clustering row identified by `ck`
// (or the static row when `ck` is null), covering all columns.
void produce_postimage(const clustering_key* ck) override {
    generate_image(operation::post_image, ck, /* affected_columns = */ nullptr);
}
// Emits one image row (pre or post) into the current log mutation.
// `ck` is the base-table clustering key; nullptr means the static row.
// `affected_columns`, when non-null (preimage), restricts which columns of
// the relevant kind are copied; nullptr (postimage) copies all of them.
void generate_image(operation op, const clustering_key* ck, const one_kind_column_set* affected_columns) {
    assert(op == operation::pre_image || op == operation::post_image);
    auto& res = current_mutation();
    auto image_ck = allocate_new_log_row();
    set_operation(image_ck, op);
    if (ck) {
        // set clustering columns
        // TODO: Move this to a separate function
        auto ck_value = ck->explode(*_schema);
        size_t pos = 0;
        for (const auto& column : _schema->clustering_key_columns()) {
            assert (pos < ck_value.size());
            auto cdef = _log_schema->get_column_definition(log_data_column_name_bytes(column.name()));
            res.set_cell(image_ck, *cdef, atomic_cell::make_live(*column.type, _ts, bytes_view(ck_value[pos]), _cdc_ttl_opt));
            ++pos;
        }
    }
    // Image data comes from the tracked row state: regular columns for a
    // clustering row, static columns for the static row.
    const auto kind = ck ? column_kind::regular_column : column_kind::static_column;
    cell_map* row_state;
    if (ck) {
        row_state = get_current_clustering_row_state(*ck);
        if (!row_state) {
            // We have no data for this row, we can stop here
            return;
        }
    } else {
        row_state = &_static_row_state;
    }
    // Copies the tracked value of one base column (if any) into the image row.
    auto process_cell = [&, this] (const column_definition& cdef) {
        if (auto current = get_col_from_row_state(row_state, cdef)) {
            auto log_cdef = _log_schema->get_column_definition(log_data_column_name_bytes(cdef.name()));
            res.set_cell(image_ck, *log_cdef, atomic_cell::make_live(*cdef.type, _ts, *current, _cdc_ttl_opt));
        }
    };
    if (affected_columns) {
        // Preimage case - include only data from requested columns
        for (auto it = affected_columns->find_first(); it != one_kind_column_set::npos; it = affected_columns->find_next(it)) {
            const auto id = column_id(it);
            const auto& cdef = _schema->column_at(kind, id);
            process_cell(cdef);
        }
    } else {
        // Postimage case - include data from all columns
        const auto column_range = _schema->columns(kind);
        std::for_each(column_range.begin(), column_range.end(), process_cell);
    }
}
// TODO: is pre-image data based on query enough. We only have actual column data. Do we need
// more details like tombstones/ttl? Probably not but keep in mind.
//
// Translates one base-table mutation (one timestamp group) into delta rows of
// the current log mutation, and - when _enable_updating_state is set - folds
// the change into the tracked row state used later for pre/post-images.
// Handles, in order: partition delete, range tombstones, then static-row or
// clustering-row changes (insert/update/row delete).
void process_change(const mutation& m) override {
    mutation& res = current_mutation();
    auto& p = m.partition();
    if (p.partition_tombstone()) {
        // Partition deletion
        _touched_parts.set<stats::part_type::PARTITION_DELETE>();
        auto log_ck = allocate_new_log_row();
        set_operation(log_ck, operation::partition_delete);
    } else if (!p.row_tombstones().empty()) {
        // range deletion
        _touched_parts.set<stats::part_type::RANGE_TOMBSTONE>();
        for (auto& rt : p.row_tombstones()) {
            // Copies the (possibly partial) clustering prefix `ckp` into the
            // clustering data columns of log row `log_ck`.
            auto set_bound = [&] (const clustering_key& log_ck, const clustering_key_prefix& ckp) {
                auto exploded = ckp.explode(*_schema);
                size_t pos = 0;
                for (const auto& column : _schema->clustering_key_columns()) {
                    if (pos >= exploded.size()) {
                        break;
                    }
                    auto cdef = _log_schema->get_column_definition(log_data_column_name_bytes(column.name()));
                    auto value = atomic_cell::make_live(*column.type,
                                                        _ts,
                                                        bytes_view(exploded[pos]),
                                                        _cdc_ttl_opt);
                    res.set_cell(log_ck, *cdef, std::move(value));
                    ++pos;
                }
            };
            // A range tombstone becomes two log rows: one for each bound.
            {
                auto log_ck = allocate_new_log_row();
                set_bound(log_ck, rt.start);
                const auto start_operation = rt.start_kind == bound_kind::incl_start
                        ? operation::range_delete_start_inclusive
                        : operation::range_delete_start_exclusive;
                set_operation(log_ck, start_operation);
            }
            {
                auto log_ck = allocate_new_log_row();
                set_bound(log_ck, rt.end);
                const auto end_operation = rt.end_kind == bound_kind::incl_end
                        ? operation::range_delete_end_inclusive
                        : operation::range_delete_end_exclusive;
                set_operation(log_ck, end_operation);
            }
        }
    } else {
        // should be insert, update or deletion
        //
        // Writes the cells of row `r` into log row `log_ck` and (optionally)
        // into the tracked row state. Returns the TTL found on any live cell,
        // if one was set. `base_ck` is null for the static row.
        auto process_cells = [&](const row& r, column_kind ckind, const clustering_key& log_ck, const clustering_key* base_ck, cell_map* row_state) -> std::optional<gc_clock::duration> {
            std::optional<gc_clock::duration> ttl;
            std::unordered_set<column_id> columns_assigned;
            r.for_each_cell([&](column_id id, const atomic_cell_or_collection& cell) {
                auto& cdef = _schema->column_at(ckind, id);
                auto* dst = _log_schema->get_column_definition(log_data_column_name_bytes(cdef.name()));
                bool is_column_delete = true;
                bytes_opt value;
                bytes_opt deleted_elements = std::nullopt;
                if (cdef.is_atomic()) {
                    // Atomic cell: live -> value; dead -> column delete.
                    value = std::nullopt;
                    auto view = cell.as_atomic_cell(cdef);
                    if (view.is_live()) {
                        is_column_delete = false;
                        value = view.value().linearize();
                        if (view.is_live_and_has_ttl()) {
                            ttl = view.ttl();
                        }
                    }
                } else {
                    // Collection/UDT: deserialize the mutation view and build
                    // the serialized "added" value plus a set of deleted keys.
                    auto mv = cell.as_collection_mutation();
                    is_column_delete = false;
                    std::vector<bytes> buf;
                    value = mv.with_deserialized(*cdef.type, [&](collection_mutation_view_description view) -> bytes_opt {
                        if (view.tomb) {
                            // there is a tombstone with timestamp before this mutation.
                            // this is how a assign collection = <value> is represented.
                            // for non-atomics, a column delete + values in data column
                            // simply means "replace values"
                            is_column_delete = true;
                        }
                        auto process_collection = [&](auto value_callback) {
                            for (auto& [key, value] : view.cells) {
                                // note: we are assuming that all mutations coming here adhere to
                                // / are created by the cql machinery or similar, i.e. if we have
                                // the tombstone above, it preceeds the actual cells, and is in
                                // fact an "assign" marker. So we only check for explicitly
                                // dead cells, i.e. null markers.
                                auto live = value.is_live();
                                if (!live) {
                                    value_callback(key, bytes_view{}, live);
                                    continue;
                                }
                                auto val = value.value().is_fragmented()
                                        ? bytes_view{buf.emplace_back(value.value().linearize())}
                                        : value.value().first_fragment()
                                        ;
                                value_callback(key, val, live);
                                if (value.is_live_and_has_ttl()) {
                                    ttl = value.ttl();
                                }
                            }
                        };
                        std::vector<bytes_view> deleted;
                        return visit(*cdef.type, make_visitor(
                            // maps and lists are just flattened
                            [&] (const collection_type_impl& type) -> bytes_opt {
                                _touched_parts.set(type.is_list() ? stats::part_type::LIST : stats::part_type::MAP);
                                std::vector<std::pair<bytes_view, bytes_view>> result;
                                process_collection([&](const bytes_view& key, const bytes_view& value, bool live) {
                                    if (live) {
                                        result.emplace_back(key, value);
                                    } else {
                                        deleted.emplace_back(key);
                                    }
                                });
                                if (!deleted.empty()) {
                                    deleted_elements = set_type_impl::serialize_partially_deserialized_form(deleted, cql_serialization_format::internal());
                                }
                                if (result.empty()) {
                                    return std::nullopt;
                                }
                                return map_type_impl::serialize_partially_deserialized_form(result, cql_serialization_format::internal());
                            },
                            // set need to transform from mutation view
                            [&] (const set_type_impl& type) -> bytes_opt {
                                _touched_parts.set<stats::part_type::SET>();
                                std::vector<bytes_view> result;
                                process_collection([&](const bytes_view& key, const bytes_view& value, bool live) {
                                    if (live) {
                                        result.emplace_back(key);
                                    } else {
                                        deleted.emplace_back(key);
                                    }
                                });
                                if (!deleted.empty()) {
                                    deleted_elements = set_type_impl::serialize_partially_deserialized_form(deleted, cql_serialization_format::internal());
                                }
                                if (result.empty()) {
                                    return std::nullopt;
                                }
                                return set_type_impl::serialize_partially_deserialized_form(result, cql_serialization_format::internal());
                            },
                            // for user type we collect the fields in the mutation and set to
                            // tuple of value or tuple of null in case of delete.
                            // fields not in the mutation are null in the enclosing tuple, signifying "no info"
                            [&](const user_type_impl& type) -> bytes_opt {
                                _touched_parts.set<stats::part_type::UDT>();
                                std::vector<bytes_opt> result(type.size());
                                process_collection([&](const bytes_view& key, const bytes_view& value, bool live) {
                                    if (live) {
                                        auto idx = deserialize_field_index(key);
                                        result[idx].emplace(value);
                                    } else {
                                        deleted.emplace_back(key);
                                    }
                                });
                                if (!deleted.empty()) {
                                    deleted_elements = set_type_impl::serialize_partially_deserialized_form(deleted, cql_serialization_format::internal());
                                }
                                if (result.empty()) {
                                    return std::nullopt;
                                }
                                return type.build_value(result);
                            },
                            [&] (const abstract_type& o) -> bytes_opt {
                                throw std::runtime_error(format("cdc transform: unknown type {}", o.name()));
                            }
                        ));
                    });
                    // Deleted collection keys go into the "cdc$deleted_elements_<col>" column.
                    if (deleted_elements) {
                        auto* dc = _log_schema->get_column_definition(log_data_column_deleted_elements_name_bytes(cdef.name()));
                        res.set_cell(log_ck, *dc, atomic_cell::make_live(*dc->type, _ts, *deleted_elements, _cdc_ttl_opt));
                    }
                }
                // Column deletion marker ("cdc$deleted_<col>") and the value itself.
                if (is_column_delete) {
                    res.set_cell(log_ck, log_data_column_deleted_name_bytes(cdef.name()), data_value(true), _ts, _cdc_ttl_opt);
                }
                if (value) {
                    res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, _ts, *value, _cdc_ttl_opt));
                }
                if (_enable_updating_state) {
                    // don't merge with pre-image iff column delete
                    bytes_opt prev = is_column_delete ? std::nullopt : get_col_from_row_state(row_state, cdef);
                    bytes_opt next;
                    if (cdef.is_atomic() && !is_column_delete && value) {
                        next = std::move(value);
                    } else if (!cdef.is_atomic() && (value || (deleted_elements && prev))) {
                        // Non-atomic columns are merged element-wise with the
                        // previous state (see the merge() overloads above).
                        next = visit(*cdef.type, [&] (const auto& type) -> bytes {
                            return merge(type, prev, value, deleted_elements);
                        });
                    }
                    if (!row_state) {
                        // static row always has a valid state, so this must be
                        // a clustering row missing
                        assert(base_ck);
                        auto [it, inserted] = _clustering_row_states.try_emplace(*base_ck);
                        row_state = &it->second;
                    }
                    (*row_state)[&cdef] = std::move(next);
                }
            });
            return ttl;
        };
        // NOTE(review): static-row and clustering-row changes are handled as
        // alternatives here - presumably a single timestamp group never carries
        // both (ensured by the splitting logic upstream); confirm.
        if (!p.static_row().empty()) {
            _touched_parts.set<stats::part_type::STATIC_ROW>();
            auto log_ck = allocate_new_log_row();
            auto ttl = process_cells(p.static_row().get(), column_kind::static_column, log_ck, nullptr, &_static_row_state);
            set_operation(log_ck, operation::update);
            if (ttl) {
                set_ttl(log_ck, *ttl);
            }
        } else {
            _touched_parts.set_if<stats::part_type::CLUSTERING_ROW>(!p.clustered_rows().empty());
            for (const rows_entry& r : p.clustered_rows()) {
                auto* row_state = get_current_clustering_row_state(r.key());
                // Copy the base clustering key into the log's data columns.
                auto ck_value = r.key().explode(*_schema);
                auto log_ck = allocate_new_log_row();
                size_t pos = 0;
                for (const auto& column : _schema->clustering_key_columns()) {
                    assert (pos < ck_value.size());
                    auto cdef = _log_schema->get_column_definition(log_data_column_name_bytes(column.name()));
                    res.set_cell(log_ck, *cdef, atomic_cell::make_live(*column.type, _ts, bytes_view(ck_value[pos]), _cdc_ttl_opt));
                    ++pos;
                }
                operation cdc_op;
                if (r.row().deleted_at()) {
                    _touched_parts.set<stats::part_type::ROW_DELETE>();
                    cdc_op = operation::row_delete;
                    if (_enable_updating_state && row_state) {
                        // Erase so that postimage will be empty
                        _clustering_row_states.erase(r.key());
                    }
                } else {
                    auto ttl = process_cells(r.row().cells(), column_kind::regular_column, log_ck, &r.key(), row_state);
                    // A live row marker distinguishes INSERT from UPDATE, and
                    // its TTL (if expiring) overrides any cell TTL.
                    const auto& marker = r.row().marker();
                    if (marker.is_live() && marker.is_expiring()) {
                        ttl = marker.ttl();
                    }
                    cdc_op = marker.is_live() ? operation::insert : operation::update;
                    if (ttl) {
                        set_ttl(log_ck, *ttl);
                    }
                }
                set_operation(log_ck, cdc_op);
            }
        }
    }
}
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
// The `transformer` object on which this method was called on should not be used anymore.
std::tuple<std::vector<mutation>, stats::part_type_set> finish() && {
    adjust_or_delete_deltas();
    // Build the declared std::tuple directly. The previous code went through
    // std::make_pair with explicit template arguments and relied on the
    // pair -> tuple converting constructor; make_tuple states the intent.
    return std::make_tuple(std::move(_result_mutations), std::move(_touched_parts));
}
// Looks up the remembered serialized value of column `cdef` in a row state
// map. Returns a disengaged optional when there is no state at all or the
// column has no recorded value.
bytes_opt get_col_from_row_state(const cell_map* state, const column_definition& cdef) const {
    if (!state) {
        return std::nullopt;
    }
    const auto it = state->find(&cdef);
    if (it == state->end()) {
        return std::nullopt;
    }
    return it->second;
}
// Returns a pointer to the tracked cell state of clustering row `ck`,
// or nullptr if no state has been recorded for that row.
cell_map* get_current_clustering_row_state(const clustering_key& ck) {
    if (auto it = _clustering_row_states.find(ck); it != _clustering_row_states.end()) {
        return &it->second;
    }
    return nullptr;
}
// Extracts the serialized value of base column `cdef` from a preimage query
// result row. Returns nullopt when the row is missing or lacks the column.
bytes_opt get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) {
    if (!pirow || !pirow->has(cdef.name_as_text())) {
        return std::nullopt;
    }
    return cdef.is_atomic()
        ? pirow->get_blob(cdef.name_as_text())
        : visit(*cdef.type, make_visitor(
            // flatten set
            // (the preimage query uses the `collections_as_maps` option, so a
            // set arrives serialized as a map: keep the keys, drop the values)
            [&] (const set_type_impl& type) {
                auto v = pirow->get_view(cdef.name_as_text());
                auto f = cql_serialization_format::internal();
                auto n = read_collection_size(v, f);
                std::vector<bytes_view> tmp;
                tmp.reserve(n);
                while (n--) {
                    tmp.emplace_back(read_collection_value(v, f)); // key
                    read_collection_value(v, f); // value. ignore.
                }
                return set_type_impl::serialize_partially_deserialized_form(tmp, f);
            },
            // Maps, lists and UDTs are taken verbatim.
            [&] (const abstract_type& o) -> bytes {
                return pirow->get_blob(cdef.name_as_text());
            }
        ));
}
// Timeout applied to the internal preimage read.
static db::timeout_clock::time_point default_timeout() {
    return db::timeout_clock::now() + std::chrono::seconds(10);
}
// Reads the current values of the rows touched by mutation `m`, to seed the
// pre/post-image state. Returns a null result set pointer when there is
// nothing to read. `write_cl` is the consistency level of the base write;
// the read CL is derived from it via adjust_cl().
future<lw_shared_ptr<cql3::untyped_result_set>> pre_image_select(
        service::client_state& client_state,
        db::consistency_level write_cl,
        const mutation& m)
{
    auto& p = m.partition();
    if (p.clustered_rows().empty() && p.static_row().empty()) {
        return make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>();
    }
    // Query exactly the partition being written.
    dht::partition_range_vector partition_ranges{dht::partition_range(m.decorated_key())};
    auto&& pc = _schema->partition_key_columns();
    auto&& cc = _schema->clustering_key_columns();
    std::vector<query::clustering_range> bounds;
    uint32_t row_limit = query::max_rows;
    // Static-row-only change: any one row carries the static columns.
    const bool has_only_static_row = !p.static_row().empty() && p.clustered_rows().empty();
    if (cc.empty() || has_only_static_row) {
        bounds.push_back(query::clustering_range::make_open_ended_both_sides());
        if (has_only_static_row) {
            row_limit = 1;
        }
    } else {
        // One singular clustering range per modified row.
        for (const rows_entry& r : p.clustered_rows()) {
            auto& ck = r.key();
            bounds.push_back(query::clustering_range::make_singular(ck));
        }
    }
    // Select the primary key columns plus whichever data columns are needed.
    std::vector<const column_definition*> columns;
    columns.reserve(_schema->all_columns().size());
    std::transform(pc.begin(), pc.end(), std::back_inserter(columns), [](auto& c) { return &c; });
    std::transform(cc.begin(), cc.end(), std::back_inserter(columns), [](auto& c) { return &c; });
    query::column_id_vector static_columns, regular_columns;
    // TODO: this assumes all mutations touch the same set of columns. This might not be true, and we may need to do more horrible set operation here.
    if (!p.static_row().empty()) {
        // for postimage we need everything...
        if (_schema->cdc_options().postimage()) {
            for (const column_definition& c: _schema->static_columns()) {
                static_columns.emplace_back(c.id);
                columns.emplace_back(&c);
            }
        } else {
            // Preimage only: the modified static columns suffice.
            p.static_row().get().for_each_cell([&] (column_id id, const atomic_cell_or_collection&) {
                auto& cdef =_schema->column_at(column_kind::static_column, id);
                static_columns.emplace_back(id);
                columns.emplace_back(&cdef);
            });
        }
    }
    if (!p.clustered_rows().empty()) {
        const bool has_row_delete = std::any_of(p.clustered_rows().begin(), p.clustered_rows().end(), [] (const rows_entry& re) {
            return re.row().deleted_at();
        });
        // for postimage we need everything...
        // (a row delete also needs all regular columns, to emit a full preimage)
        if (has_row_delete || _schema->cdc_options().postimage()) {
            for (const column_definition& c: _schema->regular_columns()) {
                regular_columns.emplace_back(c.id);
                columns.emplace_back(&c);
            }
        } else {
            // Preimage only: columns touched by the first modified row.
            p.clustered_rows().begin()->row().cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection&) {
                const auto& cdef =_schema->column_at(column_kind::regular_column, id);
                regular_columns.emplace_back(id);
                columns.emplace_back(&cdef);
            });
        }
    }
    auto selection = cql3::selection::selection::for_columns(_schema, std::move(columns));
    auto opts = selection->get_query_options();
    // Sets come back serialized as maps - get_preimage_col_value undoes this.
    opts.set(query::partition_slice::option::collections_as_maps);
    opts.set_if<query::partition_slice::option::always_return_static_content>(!p.static_row().empty());
    auto partition_slice = query::partition_slice(std::move(bounds), std::move(static_columns), std::move(regular_columns), std::move(opts));
    const auto max_result_size = _ctx._proxy.get_max_result_size(partition_slice);
    auto command = ::make_lw_shared<query::read_command>(_schema->id(), _schema->version(), partition_slice, query::max_result_size(max_result_size), query::row_limit(row_limit));
    const auto select_cl = adjust_cl(write_cl);
    try {
        return _ctx._proxy.query(_schema, std::move(command), std::move(partition_ranges), select_cl, service::storage_proxy::coordinator_query_options(default_timeout(), empty_service_permit(), client_state)).then(
                [s = _schema, partition_slice = std::move(partition_slice), selection = std::move(selection)] (service::storage_proxy::coordinator_query_result qr) -> lw_shared_ptr<cql3::untyped_result_set> {
            cql3::selection::result_set_builder builder(*selection, gc_clock::now(), cql_serialization_format::latest());
            query::result_view::consume(*qr.query_result, partition_slice, cql3::selection::result_set_builder::visitor(builder, *s, *selection));
            auto result_set = builder.build();
            if (!result_set || result_set->empty()) {
                return {};
            }
            return make_lw_shared<cql3::untyped_result_set>(*result_set);
        });
    } catch (exceptions::unavailable_exception& e) {
        // `query` can throw `unavailable_exception`, which is seen by clients as ~ "NoHostAvailable".
        // So, we'll translate it to a `read_failure_exception` with custom message.
        cdc_log.debug("Preimage: translating a (read) `unavailable_exception` to `request_execution_exception` - {}", e);
        throw exceptions::read_failure_exception("CDC preimage query could not achieve the CL.",
                e.consistency, e.alive, 0, e.required, false);
    }
}
// Note: this assumes that the results are from one partition only
//
// Copies the preimage query results into the transformer's tracked state:
// static columns into _static_row_state and, unless `static_only`, each
// result row's regular columns into _clustering_row_states keyed by its
// clustering key.
void load_preimage_results_into_state(lw_shared_ptr<cql3::untyped_result_set> preimage_set, bool static_only) {
    // static row
    if (!preimage_set->empty()) {
        // There may be some static row data
        // (every row repeats the static columns; the first row suffices).
        const auto& row = preimage_set->front();
        for (auto& c : _schema->static_columns()) {
            if (auto maybe_cell_view = get_preimage_col_value(c, &row)) {
                _static_row_state[&c] = *maybe_cell_view;
            }
        }
    }
    if (static_only) {
        return;
    }
    // clustering rows
    for (const auto& row : *preimage_set) {
        // Construct the clustering key for this row
        std::vector<bytes> ck_parts;
        ck_parts.reserve(_schema->clustering_key_size());
        for (auto& c : _schema->clustering_key_columns()) {
            auto v = row.get_view_opt(c.name_as_text());
            if (!v) {
                // We might get here if both of the following conditions are true:
                // - In preimage query, we requested the static row and some clustering rows,
                // - The partition had some static row data, but did not have any requested clustering rows.
                // In such case, the result set will have an artificial row that only contains static columns,
                // but no clustering columns. In such case, we can safely return from the function,
                // as there will be no clustering row data to load into the state.
                return;
            }
            ck_parts.emplace_back(*v);
        }
        auto ck = clustering_key::from_exploded(std::move(ck_parts));
        // Collect regular rows
        cell_map cells;
        for (auto& c : _schema->regular_columns()) {
            if (auto maybe_cell_view = get_preimage_col_value(c, &row)) {
                cells[&c] = *maybe_cell_view;
            }
        }
        _clustering_row_states.insert_or_assign(std::move(ck), std::move(cells));
    }
}
/** For preimage query use the same CL as for base write, except for CLs ANY and ALL. */
static db::consistency_level adjust_cl(db::consistency_level write_cl) {
    switch (write_cl) {
    case db::consistency_level::ANY:
        // ANY is write-only; downgrade the read to ONE.
        return db::consistency_level::ONE;
    case db::consistency_level::ALL:
    case db::consistency_level::SERIAL:
        return db::consistency_level::QUORUM;
    case db::consistency_level::LOCAL_SERIAL:
        return db::consistency_level::LOCAL_QUORUM;
    default:
        return write_cl;
    }
}
};
// Invokes `f(idx)` in parallel for the start index of every `batch_size`-sized
// batch of `muts`, then resolves with the (possibly grown) mutation vector.
// NOTE: `muts` is captured by reference - the caller must keep the vector
// alive until the returned future resolves (callers below use do_with).
template <typename Func>
future<std::vector<mutation>>
transform_mutations(std::vector<mutation>& muts, decltype(muts.size()) batch_size, Func&& f) {
    return parallel_for_each(
            boost::irange(static_cast<decltype(muts.size())>(0), muts.size(), batch_size),
            std::forward<Func>(f))
        .then([&muts] () mutable { return std::move(muts); });
}
} // namespace cdc
// For every CDC-enabled mutation in `mutations`: optionally runs a preimage
// query, transforms the change into CDC log mutations (splitting by timestamp
// group when needed), and appends the log mutations to the vector. Resolves
// with the augmented vector and a tracker that records per-operation stats.
future<std::tuple<std::vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>
cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, std::vector<mutation>&& mutations, tracing::trace_state_ptr tr_state, db::consistency_level write_cl) {
    // we do all this because in the case of batches, we can have mixed schemas.
    auto e = mutations.end();
    auto i = std::find_if(mutations.begin(), e, [](const mutation& m) {
        return m.schema()->cdc_options().enabled();
    });
    if (i == e) {
        // No CDC-enabled table involved - nothing to augment.
        return make_ready_future<std::tuple<std::vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>(std::make_tuple(std::move(mutations), lw_shared_ptr<cdc::operation_result_tracker>()));
    }
    tracing::trace(tr_state, "CDC: Started generating mutations for log rows");
    // At most one log mutation is appended per base mutation.
    mutations.reserve(2 * mutations.size());
    // do_with keeps the vector/query state/details alive across the async chain;
    // transform_mutations captures `mutations` by reference.
    return do_with(std::move(mutations), service::query_state(service::client_state::for_internal_calls(), empty_service_permit()), operation_details{},
            [this, timeout, i, tr_state = std::move(tr_state), write_cl] (std::vector<mutation>& mutations, service::query_state& qs, operation_details& details) {
        return transform_mutations(mutations, 1, [this, &mutations, timeout, &qs, tr_state = tr_state, &details, write_cl] (int idx) mutable {
            auto& m = mutations[idx];
            auto s = m.schema();
            if (!s->cdc_options().enabled()) {
                return make_ready_future<>();
            }
            transformer trans(_ctxt, s, m.decorated_key());
            auto f = make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>(nullptr);
            if (s->cdc_options().preimage() || s->cdc_options().postimage()) {
                // Note: further improvement here would be to coalesce the pre-image selects into one
                // iff a batch contains several modifications to the same table. Otoh, batch is rare(?)
                // so this is premature.
                tracing::trace(tr_state, "CDC: Selecting preimage for {}", m.decorated_key());
                f = trans.pre_image_select(qs.get_client_state(), write_cl, m).then_wrapped([this] (future<lw_shared_ptr<cql3::untyped_result_set>> f) {
                    // Count every preimage select, and failures separately.
                    auto& cdc_stats = _ctxt._proxy.get_cdc_stats();
                    cdc_stats.counters_total.preimage_selects++;
                    if (f.failed()) {
                        cdc_stats.counters_failed.preimage_selects++;
                    }
                    return f;
                });
            } else {
                tracing::trace(tr_state, "CDC: Preimage not enabled for the table, not querying current value of {}", m.decorated_key());
            }
            return f.then([trans = std::move(trans), &mutations, idx, tr_state, &details] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
                auto& m = mutations[idx];
                auto& s = m.schema();
                if (rs) {
                    const auto& p = m.partition();
                    const bool static_only = !p.static_row().empty() && p.clustered_rows().empty();
                    trans.load_preimage_results_into_state(std::move(rs), static_only);
                }
                const bool preimage = s->cdc_options().preimage();
                const bool postimage = s->cdc_options().postimage();
                details.had_preimage |= preimage;
                details.had_postimage |= postimage;
                tracing::trace(tr_state, "CDC: Generating log mutations for {}", m.decorated_key());
                if (should_split(m)) {
                    tracing::trace(tr_state, "CDC: Splitting {}", m.decorated_key());
                    details.was_split = true;
                    process_changes_with_splitting(m, trans, preimage, postimage);
                } else {
                    tracing::trace(tr_state, "CDC: No need to split {}", m.decorated_key());
                    process_changes_without_splitting(m, trans, preimage, postimage);
                }
                auto [log_mut, touched_parts] = std::move(trans).finish();
                const int generated_count = log_mut.size();
                mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
                // `m` might be invalidated at this point because of the push_back to the vector
                tracing::trace(tr_state, "CDC: Generated {} log mutations from {}", generated_count, mutations[idx].decorated_key());
                details.touched_parts.add(touched_parts);
            });
        }).then([this, tr_state, &details](std::vector<mutation> mutations) {
            tracing::trace(tr_state, "CDC: Finished generating all log mutations");
            auto tracker = make_lw_shared<cdc::operation_result_tracker>(_ctxt._proxy.get_cdc_stats(), details);
            return make_ready_future<std::tuple<std::vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>(std::make_tuple(std::move(mutations), std::move(tracker)));
        });
    });
}
// True when at least one of the given mutations targets a CDC-enabled table,
// i.e. augment_mutation_call would have work to do.
bool cdc::cdc_service::needs_cdc_augmentation(const std::vector<mutation>& mutations) const {
    const auto is_cdc_enabled = [] (const mutation& m) {
        return m.schema()->cdc_options().enabled();
    };
    return std::any_of(mutations.begin(), mutations.end(), is_cdc_enabled);
}
// Public entry point: forwards to the pimpl implementation above.
future<std::tuple<std::vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>
cdc::cdc_service::augment_mutation_call(lowres_clock::time_point timeout, std::vector<mutation>&& mutations, tracing::trace_state_ptr tr_state, db::consistency_level write_cl) {
    return _impl->augment_mutation_call(timeout, std::move(mutations), std::move(tr_state), write_cl);
}