Files
scylladb/cdc/log.cc
Botond Dénes 5998a859f7 tombstone_gc: allow use of repair-mode for RF=1 tables
Modify the methods which calculate the default gc mode as well as that
which validates whether repair-mode can be used at all, so both accepts
use of repair-mode on RF=1 tables.

This de-facto changes the default tombstone-gc to repair-mode for all
tables. Documentation is updated accordingly.

Some tests need adjusting:
* cqlpy/test_select_from_mutation_fragments.py: disable GC for some test
  cases because this patch makes tombstones they write subject to GC
  when using defaults.
* test/cluster/test_mv.py::test_mv_tombstone_gc_not_inherited used
  repair-mode as a non-default for the base table and expected the MV to
  revert to default. Another mode has to be used as the non-default
  (immediate).
* test/cqlpy/test_tools.py::test_scylla_sstable_dump_schema: don't
  compare tombstone_gc schema extension when comparing dumped schema vs.
  original. The tool's schema loader doesn't have access to the keyspace
  definition so it will come up with different defaults for
  tombstone-gc.
* test/boost/row_cache_test.cc::test_populating_cache_with_expired_and_nonexpired_tombstones
  sets tombstone expiry assuming the tombstone-gc timeout-mode default.
  Change the CREATE TABLE statement to set the expected mode.
2026-03-04 09:44:24 +02:00

2070 lines
90 KiB
C++

/*
* Copyright (C) 2019-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <utility>
#include <algorithm>
#include <boost/range/irange.hpp>
#include <seastar/core/thread.hh>
#include <seastar/core/metrics.hh>
#include "cdc/log.hh"
#include "cdc/generation.hh"
#include "cdc/split.hh"
#include "cdc/cdc_options.hh"
#include "cdc/change_visitor.hh"
#include "cdc/metadata.hh"
#include "cdc/cdc_partitioner.hh"
#include "bytes.hh"
#include "index/vector_index.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/topology.hh"
#include "replica/database.hh"
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "gms/feature_service.hh"
#include "schema/schema.hh"
#include "schema/schema_builder.hh"
#include "service/migration_listener.hh"
#include "service/storage_proxy.hh"
#include "tombstone_gc_extension.hh"
#include "types/tuple.hh"
#include "cql3/statements/select_statement.hh"
#include "cql3/untyped_result_set.hh"
#include "log.hh"
#include "utils/assert.hh"
#include "utils/rjson.hh"
#include "utils/UUID_gen.hh"
#include "utils/managed_bytes.hh"
#include "types/types.hh"
#include "types/concrete_types.hh"
#include "types/listlike_partial_deserializing_iterator.hh"
#include "tracing/trace_state.hh"
#include "stats.hh"
#include "utils/labels.hh"
namespace std {
template<> struct hash<std::pair<net::inet_address, unsigned int>> {
std::size_t operator()(const std::pair<net::inet_address, unsigned int> &p) const {
return std::hash<net::inet_address>{}(p.first) ^ std::hash<int>{}(p.second);
}
};
}
using namespace std::chrono_literals;
logging::logger cdc_log("cdc");
namespace {
shared_ptr<locator::abstract_replication_strategy> generate_replication_strategy(const keyspace_metadata& ksm, const locator::topology& topo) {
locator::replication_strategy_params params(ksm.strategy_options(), ksm.initial_tablets(), ksm.consistency_option());
return locator::abstract_replication_strategy::create_replication_strategy(ksm.strategy_name(), params, topo);
}
// When dropping a column from a CDC log table, we set the drop timestamp
// `column_drop_leeway` seconds into the future to ensure that for writes concurrent
// with column drop, the write timestamp is before the column drop timestamp.
constexpr auto column_drop_leeway = std::chrono::seconds(5);
} // anonymous namespace
namespace cdc {
static schema_ptr create_log_schema(const schema&, const replica::database&, const keyspace_metadata&, api::timestamp_type,
std::optional<table_id> = {}, schema_ptr = nullptr);
}
static constexpr auto cdc_group_name = "cdc";
void cdc::stats::parts_touched_stats::register_metrics(seastar::metrics::metric_groups& metrics, std::string_view suffix) {
namespace sm = seastar::metrics;
auto register_part = [&] (part_type part, sstring part_name) {
metrics.add_group(cdc_group_name, {
sm::make_total_operations(seastar::format("operations_on_{}_performed_{}", part_name, suffix), count[(size_t)part],
sm::description(seastar::format("number of {} CDC operations that processed a {}", suffix, part_name)),
{cdc_label}).set_skip_when_empty()
});
};
register_part(part_type::STATIC_ROW, "static_row");
register_part(part_type::CLUSTERING_ROW, "clustering_row");
register_part(part_type::MAP, "map");
register_part(part_type::SET, "set");
register_part(part_type::LIST, "list");
register_part(part_type::UDT, "udt");
register_part(part_type::RANGE_TOMBSTONE, "range_tombstone");
register_part(part_type::PARTITION_DELETE, "partition_delete");
register_part(part_type::ROW_DELETE, "row_delete");
}
cdc::stats::stats() {
namespace sm = seastar::metrics;
auto register_counters = [this] (counters& counters, sstring kind) {
const auto split_label = sm::label("split");
_metrics.add_group(cdc_group_name, {
sm::make_total_operations("operations_" + kind, counters.unsplit_count,
sm::description(format("number of {} CDC operations", kind)),
{split_label(false), basic_level, cdc_label}).set_skip_when_empty(),
sm::make_total_operations("operations_" + kind, counters.split_count,
sm::description(format("number of {} CDC operations", kind)),
{split_label(true), basic_level, cdc_label}).set_skip_when_empty(),
sm::make_total_operations("preimage_selects_" + kind, counters.preimage_selects,
sm::description(format("number of {} preimage queries performed", kind)),
{cdc_label}).set_skip_when_empty(),
sm::make_total_operations("operations_with_preimage_" + kind, counters.with_preimage_count,
sm::description(format("number of {} operations that included preimage", kind)),
{cdc_label}).set_skip_when_empty(),
sm::make_total_operations("operations_with_postimage_" + kind, counters.with_postimage_count,
sm::description(format("number of {} operations that included postimage", kind)),
{cdc_label}).set_skip_when_empty()
});
counters.touches.register_metrics(_metrics, kind);
};
register_counters(counters_total, "total");
register_counters(counters_failed, "failed");
}
cdc::operation_result_tracker::~operation_result_tracker() {
auto update_stats = [this] (stats::counters& counters) {
if (_details.was_split) {
counters.split_count++;
} else {
counters.unsplit_count++;
}
counters.touches.apply(_details.touched_parts);
if (_details.had_preimage) {
counters.with_preimage_count++;
}
if (_details.had_postimage) {
counters.with_postimage_count++;
}
};
update_stats(_stats.counters_total);
if (_failed) {
update_stats(_stats.counters_failed);
}
}
class cdc::cdc_service::impl : service::migration_listener::empty_listener {
friend cdc_service;
db_context _ctxt;
bool _stopped = false;
public:
impl(db_context ctxt)
: _ctxt(std::move(ctxt))
{
_ctxt._migration_notifier.register_listener(this);
}
~impl() {
SCYLLA_ASSERT(_stopped);
}
future<> stop() {
return _ctxt._migration_notifier.unregister_listener(this).then([this] {
_stopped = true;
});
}
virtual void on_before_allocate_tablet_map(const locator::tablet_map& map, const schema& s, utils::chunked_vector<mutation>& muts, api::timestamp_type ts) override {
if (!is_log_schema(s)) {
return;
}
auto stream_ts = db_clock::now() - duration_cast<std::chrono::milliseconds>(get_generation_leeway());
auto mut = create_table_streams_mutation(s.id(), stream_ts, map, ts).get();
muts.emplace_back(std::move(mut));
}
void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms, api::timestamp_type ts) override {
std::vector<schema_ptr> new_cfms;
for (auto sp : cfms) {
const auto& schema = *sp;
if (!schema.cdc_options().enabled()) {
continue;
}
auto& db = _ctxt._proxy.get_db().local();
auto logname = log_name(schema.cf_name());
check_that_cdc_log_table_does_not_exist(db, schema, logname);
ensure_that_table_has_no_counter_columns(schema);
if (!db.features().cdc_with_tablets) {
ensure_that_table_uses_vnodes(ksm, schema, db.get_token_metadata().get_topology());
}
// in seastar thread
auto log_schema = create_log_schema(schema, db, ksm, ts);
new_cfms.push_back(std::move(log_schema));
}
cfms.insert(cfms.end(), std::make_move_iterator(new_cfms.begin()), std::make_move_iterator(new_cfms.end()));
}
void on_before_update_column_family(const schema& new_schema, const schema& old_schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) override {
bool has_vector_index = secondary_index::vector_index::has_vector_index(new_schema);
if (has_vector_index) {
// If we have a vector index, we need to ensure that the CDC log is created
// satisfying the minimal requirements of Vector Search.
secondary_index::vector_index::check_cdc_options(new_schema);
}
bool is_cdc = cdc_enabled(new_schema);
bool was_cdc = cdc_enabled(old_schema);
// if we are turning off cdc we can skip this, since even if columns change etc,
// any writer should see cdc -> off together with any actual schema changes to
// base table, so should never try to write to non-existent log column etc.
// note that if user has set ttl=0 in cdc options, he is still responsible
// for emptying the log.
if (is_cdc) {
auto& db = _ctxt._proxy.get_db().local();
auto logname = log_name(old_schema.cf_name());
auto& keyspace = db.find_keyspace(old_schema.ks_name());
auto has_cdc_log = db.has_schema(old_schema.ks_name(), logname);
auto log_schema = has_cdc_log ? db.find_schema(old_schema.ks_name(), logname) : nullptr;
if (!was_cdc && has_cdc_log) {
// make sure the apparent log table really is a cdc log (not user table)
// we just check the partitioner - since user tables should _not_ be able
// set/use this.
if (!is_log_schema(*log_schema)) {
// will throw
check_that_cdc_log_table_does_not_exist(db, old_schema, logname);
}
}
check_for_attempt_to_create_nested_cdc_log(db, new_schema);
ensure_that_table_has_no_counter_columns(new_schema);
if (!db.features().cdc_with_tablets) {
ensure_that_table_uses_vnodes(*keyspace.metadata(), new_schema, db.get_token_metadata().get_topology());
}
std::optional<table_id> maybe_id = log_schema ? std::make_optional(log_schema->id()) : std::nullopt;
auto new_log_schema = create_log_schema(new_schema, db, *keyspace.metadata(), timestamp, std::move(maybe_id), log_schema);
auto log_mut = log_schema
? db::schema_tables::make_update_table_mutations(_ctxt._proxy, keyspace.metadata(), log_schema, new_log_schema, timestamp)
: db::schema_tables::make_create_table_mutations(new_log_schema, timestamp)
;
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
if (!log_schema) {
db.get_notifier().before_create_column_family(*keyspace.metadata(), *new_log_schema, mutations, timestamp);
}
}
}
void on_before_drop_column_family(const schema& schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) override {
auto logname = log_name(schema.cf_name());
auto& db = _ctxt._proxy.get_db().local();
auto has_cdc_log = db.has_schema(schema.ks_name(), logname);
if (has_cdc_log) {
auto log_schema = db.find_schema(schema.ks_name(), logname);
if (is_log_schema(*log_schema)) {
auto& keyspace = db.find_keyspace(schema.ks_name());
auto log_mut = db::schema_tables::make_drop_table_mutations(keyspace.metadata(), log_schema, timestamp);
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
db.get_notifier().before_drop_column_family(*log_schema, mutations, timestamp);
}
}
if (is_log_schema(schema)) {
auto& keyspace = db.find_keyspace(schema.ks_name());
if (keyspace.uses_tablets()) {
// drop cdc streams of this table
auto drop_stream_mut = make_drop_table_streams_mutations(schema.id(), timestamp);
mutations.insert(mutations.end(), std::make_move_iterator(drop_stream_mut.begin()), std::make_move_iterator(drop_stream_mut.end()));
}
}
}
void on_before_drop_keyspace(const sstring& keyspace_name, utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) override {
auto& db = _ctxt._proxy.get_db().local();
auto& ks = db.find_keyspace(keyspace_name);
if (ks.uses_tablets()) {
// drop cdc streams for all CDC tables in this keyspace
for (auto&& [name, s] : ks.metadata()->cf_meta_data()) {
seastar::thread::maybe_yield();
if (!is_log_schema(*s)) {
continue;
}
auto drop_stream_mut = make_drop_table_streams_mutations(s->id(), ts);
mutations.insert(mutations.end(), std::make_move_iterator(drop_stream_mut.begin()), std::make_move_iterator(drop_stream_mut.end()));
}
}
}
future<std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>> augment_mutation_call(
lowres_clock::time_point timeout,
utils::chunked_vector<mutation>&& mutations,
tracing::trace_state_ptr tr_state,
db::consistency_level write_cl,
per_request_options options
);
template<typename Iter>
future<> append_mutations(Iter i, Iter e, schema_ptr s, lowres_clock::time_point, utils::chunked_vector<mutation>&);
private:
static void check_for_attempt_to_create_nested_cdc_log(replica::database& db, const schema& schema) {
const auto& cf_name = schema.cf_name();
const auto cf_name_view = std::string_view(cf_name.data(), cf_name.size());
if (is_log_for_some_table(db, schema.ks_name(), cf_name_view)) {
throw exceptions::invalid_request_exception(format("Cannot create a CDC log for a table {}.{}, because creating nested CDC logs is not allowed",
schema.ks_name(), schema.cf_name()));
}
}
static void check_that_cdc_log_table_does_not_exist(replica::database& db, const schema& schema, const sstring& logname) {
if (db.has_schema(schema.ks_name(), logname)) {
throw exceptions::invalid_request_exception(format("Cannot create CDC log table for table {}.{} because a table of name {}.{} already exists",
schema.ks_name(), schema.cf_name(),
schema.ks_name(), logname));
}
}
static void ensure_that_table_has_no_counter_columns(const schema& schema) {
if (schema.is_counter()) {
throw exceptions::invalid_request_exception(format("Cannot create CDC log for table {}.{}. Counter support not implemented",
schema.ks_name(), schema.cf_name()));
}
}
// Until we support CDC with tablets (issue #16317), we can't allow this
// to be attempted - in particular the log table we try to create will not
// have tablets, and will cause a failure.
static void ensure_that_table_uses_vnodes(const keyspace_metadata& ksm, const schema& schema, const locator::topology& topo) {
auto rs = generate_replication_strategy(ksm, topo);
if (rs->uses_tablets()) {
throw exceptions::invalid_request_exception(format("Cannot create CDC log for a table {}.{}, because the keyspace uses tablets, and not all nodes support the CDC with tablets feature.",
schema.ks_name(), schema.cf_name()));
}
}
};
cdc::cdc_service::cdc_service(service::storage_proxy& proxy, cdc::metadata& cdc_metadata, service::migration_notifier& notifier)
: cdc_service(db_context(proxy, cdc_metadata, notifier))
{}
cdc::cdc_service::cdc_service(db_context ctxt)
: _impl(std::make_unique<impl>(std::move(ctxt)))
{
_impl->_ctxt._proxy.set_cdc_service(this);
}
future<> cdc::cdc_service::stop() {
_impl->_ctxt._proxy.set_cdc_service(nullptr);
return _impl->stop();
}
cdc::cdc_service::~cdc_service() = default;
namespace {
static constexpr std::string_view delta_mode_string_keys = "keys";
static constexpr std::string_view delta_mode_string_full = "full";
static constexpr std::string_view image_mode_string_full = delta_mode_string_full;
} // anon. namespace
auto fmt::formatter<cdc::delta_mode>::format(cdc::delta_mode m, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
using enum cdc::delta_mode;
switch (m) {
case keys:
return fmt::format_to(ctx.out(), delta_mode_string_keys);
case full:
return fmt::format_to(ctx.out(), delta_mode_string_full);
}
throw std::logic_error("Impossible value of cdc::delta_mode");
}
auto fmt::formatter<cdc::image_mode>::format(cdc::image_mode m, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
using enum cdc::image_mode;
switch (m) {
case off:
return fmt::format_to(ctx.out(), "false");
case on:
return fmt::format_to(ctx.out(), "true");
break;
case full:
return fmt::format_to(ctx.out(), image_mode_string_full);
}
throw std::logic_error("Impossible value of cdc::image_mode");
}
cdc::options::options(const std::map<sstring, sstring>& map) {
for (auto& p : map) {
auto key = p.first;
auto val = p.second;
std::transform(key.begin(), key.end(), key.begin(), ::tolower);
std::transform(val.begin(), val.end(), val.begin(), ::tolower);
auto is_true = val == "true" || val == "1";
auto is_false = val == "false" || val == "0";
if (key == "enabled") {
if (is_true || is_false) {
enabled(is_true);
} else {
throw exceptions::configuration_exception("Invalid value for CDC option \"enabled\": " + p.second);
}
} else if (key == "preimage") {
if (is_true) {
_preimage = image_mode::on;
} else if (val == image_mode_string_full) {
_preimage = image_mode::full;
} else if (is_false) {
_preimage = image_mode::off;
} else {
throw exceptions::configuration_exception("Invalid value for CDC option \"preimage\": " + p.second);
}
} else if (key == "postimage") {
if (is_true || is_false) {
_postimage = is_true;
} else {
throw exceptions::configuration_exception("Invalid value for CDC option \"postimage\": " + p.second);
}
} else if (key == "delta") {
if (val == delta_mode_string_keys) {
_delta_mode = delta_mode::keys;
} else if (val != delta_mode_string_full) {
throw exceptions::configuration_exception("Invalid value for CDC option \"delta\": " + p.second);
}
} else if (key == "ttl") {
try {
_ttl = std::stoi(p.second);
} catch (std::invalid_argument& e) {
throw exceptions::configuration_exception("Invalid value for CDC option \"ttl\": " + p.second);
} catch (std::out_of_range& e) {
throw exceptions::configuration_exception("Invalid CDC option: ttl too large");
}
if (_ttl < 0) {
throw exceptions::configuration_exception("Invalid CDC option: ttl must be >= 0");
}
} else {
throw exceptions::configuration_exception("Invalid CDC option: " + p.first);
}
}
}
std::map<sstring, sstring> cdc::options::to_map() const {
if (!is_enabled_set()) {
return {};
}
return {
{ "enabled", enabled() ? "true" : "false" },
{ "preimage", fmt::format("{}", _preimage) },
{ "postimage", _postimage ? "true" : "false" },
{ "delta", fmt::format("{}", _delta_mode) },
{ "ttl", std::to_string(_ttl) },
};
}
sstring cdc::options::to_sstring() const {
return rjson::print(rjson::from_string_map(to_map()));
}
bool cdc::options::operator==(const options& o) const {
return enabled() == o.enabled() && _preimage == o._preimage && _postimage == o._postimage && _ttl == o._ttl
&& _delta_mode == o._delta_mode;
}
namespace cdc {
using operation_native_type = std::underlying_type_t<operation>;
static const sstring cdc_log_suffix = "_scylla_cdc_log";
static const sstring cdc_meta_column_prefix = "cdc$";
static const sstring cdc_deleted_column_prefix = cdc_meta_column_prefix + "deleted_";
static const sstring cdc_deleted_elements_column_prefix = cdc_meta_column_prefix + "deleted_elements_";
bool cdc_enabled(const schema& s) {
return s.cdc_options().enabled() || secondary_index::vector_index::has_vector_index(s);
}
bool is_log_name(const std::string_view& table_name) {
return table_name.ends_with(cdc_log_suffix);
}
bool is_log_schema(const schema& s) {
return s.get_partitioner().name() == cdc::cdc_partitioner::classname;
}
bool is_cdc_metacolumn_name(const sstring& name) {
return name.compare(0, cdc_meta_column_prefix.size(), cdc_meta_column_prefix) == 0;
}
bool is_log_for_some_table(const replica::database& db, const sstring& ks_name, const std::string_view& table_name) {
auto base_schema = get_base_table(db, ks_name, table_name);
if (!base_schema) {
return false;
}
return cdc_enabled(*base_schema);
}
schema_ptr get_base_table(const replica::database& db, const schema& s) {
return get_base_table(db, s.ks_name(), s.cf_name());
}
schema_ptr get_base_table(const replica::database& db, std::string_view ks_name, std::string_view table_name) {
if (!is_log_name(table_name)) {
return nullptr;
}
// Note: It is legal for a user to directly create a table with name
// `X_scylla_cdc_log`. A table with name `X` might be present (but with
// cdc log disabled), or not present at all when creating `X_scylla_cdc_log`.
// Therefore, existence of `X_scylla_cdc_log` does not imply existence of `X`
// and, in order not to throw, we explicitly need to check for existence of 'X'.
const auto table_base_name = base_name(table_name);
if (!db.has_schema(ks_name, table_base_name)) {
return nullptr;
}
return db.find_schema(sstring(ks_name), table_base_name);
}
seastar::sstring base_name(std::string_view log_name) {
SCYLLA_ASSERT(is_log_name(log_name));
return sstring(log_name.data(), log_name.size() - cdc_log_suffix.size());
}
sstring log_name(std::string_view table_name) {
return sstring(table_name) + cdc_log_suffix;
}
sstring log_data_column_name(std::string_view column_name) {
return sstring(column_name);
}
seastar::sstring log_meta_column_name(std::string_view column_name) {
return cdc_meta_column_prefix + sstring(column_name);
}
bytes log_data_column_name_bytes(const bytes& column_name) {
return column_name;
}
bytes log_meta_column_name_bytes(const bytes& column_name) {
return to_bytes(cdc_meta_column_prefix) + column_name;
}
seastar::sstring log_data_column_deleted_name(std::string_view column_name) {
return cdc_deleted_column_prefix + sstring(column_name);
}
bytes log_data_column_deleted_name_bytes(const bytes& column_name) {
return to_bytes(cdc_deleted_column_prefix) + column_name;
}
seastar::sstring log_data_column_deleted_elements_name(std::string_view column_name) {
return cdc_deleted_elements_column_prefix + sstring(column_name);
}
bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
return to_bytes(cdc_deleted_elements_column_prefix) + column_name;
}
static void set_default_properties_log_table(schema_builder& b, const schema& s,
const replica::database& db, const keyspace_metadata& ksm)
{
b.set_compaction_strategy(compaction::compaction_strategy_type::time_window);
b.set_comment(fmt::format("CDC log for {}.{}", s.ks_name(), s.cf_name()));
auto ttl_seconds = s.cdc_options().ttl();
if (ttl_seconds > 0) {
b.set_gc_grace_seconds(0);
auto ceil = [] (int dividend, int divisor) {
return dividend / divisor + (dividend % divisor == 0 ? 0 : 1);
};
auto seconds_to_minutes = [] (int seconds_value) {
using namespace std::chrono;
return std::chrono::ceil<minutes>(seconds(seconds_value)).count();
};
// What's the minimum window that won't create more than 24 sstables.
auto window_seconds = ceil(ttl_seconds, 24);
auto window_minutes = seconds_to_minutes(window_seconds);
b.set_compaction_strategy_options({
{"compaction_window_unit", "MINUTES"},
{"compaction_window_size", std::to_string(window_minutes)},
// A new SSTable will become fully expired every
// `window_seconds` seconds so we shouldn't check for expired
// sstables too often.
{"expired_sstable_check_frequency_seconds",
std::to_string(std::max(1, window_seconds / 2))},
});
}
b.set_caching_options(caching_options::get_disabled_caching_options());
auto rs = generate_replication_strategy(ksm, db.get_token_metadata().get_topology());
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*rs, false));
b.add_extension(tombstone_gc_extension::NAME, std::move(tombstone_gc_ext));
}
static void add_columns_to_cdc_log(schema_builder& b, const schema& s,
const api::timestamp_type timestamp, const schema_ptr old)
{
b.with_column(log_meta_column_name_bytes("stream_id"), bytes_type, column_kind::partition_key);
b.with_column(log_meta_column_name_bytes("time"), timeuuid_type, column_kind::clustering_key);
b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key);
b.with_column(log_meta_column_name_bytes("operation"), data_type_for<operation_native_type>());
b.with_column(log_meta_column_name_bytes("ttl"), long_type);
b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type);
auto validate_new_column = [&] (const sstring& name) {
// When dropping a column from a CDC log table, we set the drop timestamp to be
// `column_drop_leeway` seconds into the future (see `create_log_schema`).
// Therefore, when recreating a column with the same name, we need to validate
// that it's not recreated too soon and that the drop timestamp has passed.
if (old && old->dropped_columns().contains(name)) {
const auto& drop_info = old->dropped_columns().at(name);
auto create_time = api::timestamp_clock::time_point(api::timestamp_clock::duration(timestamp));
auto drop_time = api::timestamp_clock::time_point(api::timestamp_clock::duration(drop_info.timestamp));
if (drop_time > create_time) {
throw exceptions::invalid_request_exception(format("Cannot add column {} because a column with the same name was dropped too recently. Please retry after {} seconds",
name, std::chrono::duration_cast<std::chrono::seconds>(drop_time - create_time).count() + 1));
}
}
};
auto add_column = [&] (sstring name, data_type type) {
validate_new_column(name);
b.with_column(to_bytes(name), type);
};
auto add_columns = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) {
for (const auto& column : columns) {
auto type = column.type;
if (type->get_kind() == abstract_type::kind::empty) {
if (!(s.is_dense() && s.regular_columns_count() == 1)) {
on_internal_error(cdc_log, "Unexpected column with EMPTY type");
}
continue;
}
if (is_data_col && type->is_multi_cell()) {
type = visit(*type, make_visitor(
// non-frozen lists are represented as map<timeuuid, value_type>. Otherwise we cannot express delta
[] (const list_type_impl& type) -> data_type {
return map_type_impl::get_instance(type.name_comparator(), type.value_comparator(), false);
},
// everything else is just frozen self
[] (const abstract_type& type) {
return type.freeze();
}
));
}
add_column(log_data_column_name(column.name_as_text()), type);
if (is_data_col) {
add_column(log_data_column_deleted_name(column.name_as_text()), boolean_type);
}
if (column.type->is_multi_cell()) {
auto dtype = visit(*type, make_visitor(
// all collection deletes are set<key_type> (i.e. timeuuid for lists)
[] (const collection_type_impl& type) -> data_type {
return set_type_impl::get_instance(type.name_comparator(), false);
},
// user types deletes are a set of the indices removed
[] (const user_type_impl& type) -> data_type {
return set_type_impl::get_instance(short_type, false);
},
[] (const abstract_type& type) -> data_type {
throw std::invalid_argument("Should not reach");
}
));
add_column(log_data_column_deleted_elements_name(column.name_as_text()), dtype);
}
}
};
add_columns(s.partition_key_columns());
add_columns(s.clustering_key_columns());
add_columns(s.static_columns(), true);
add_columns(s.regular_columns(), true);
}
static schema_ptr create_log_schema(const schema& s, const replica::database& db,
const keyspace_metadata& ksm, api::timestamp_type timestamp, std::optional<table_id> uuid, schema_ptr old)
{
schema_builder b(s.ks_name(), log_name(s.cf_name()));
b.with_partitioner(cdc::cdc_partitioner::classname);
if (old) {
// If the user reattaches the log table, do not change its properties.
b.set_properties(old->get_properties());
} else {
set_default_properties_log_table(b, s, db, ksm);
}
add_columns_to_cdc_log(b, s, timestamp, old);
if (uuid) {
b.set_uuid(*uuid);
}
/**
* #10473 - if we are redefining the log table, we need to ensure any dropped
* columns are registered in "dropped_columns" table, otherwise clients will not
* be able to read data older than now.
*/
if (old) {
// not super efficient, but we don't do this often.
for (auto& col : old->all_columns()) {
if (!b.has_column({col.name(), col.name_as_text() })) {
auto drop_ts = api::timestamp_clock::now() + column_drop_leeway;
b.without_column(col.name_as_text(), col.type, drop_ts.time_since_epoch().count());
}
}
}
return b.build();
}
// iterators for collection merge
template<typename T>
class collection_iterator {
public:
using iterator_category = std::input_iterator_tag;
using value_type = const T;
using difference_type = std::ptrdiff_t;
using pointer = const T*;
using reference = const T&;
private:
managed_bytes_view _v, _next;
size_t _rem = 0;
T _current;
public:
collection_iterator(managed_bytes_view_opt v = {})
: _v(v.value_or(managed_bytes_view{}))
, _rem(_v.empty() ? 0 : read_collection_size(_v))
{
if (_rem != 0) {
parse();
}
}
collection_iterator(const collection_iterator&) = default;
const T& operator*() const {
return _current;
}
const T* operator->() const {
return &_current;
}
collection_iterator& operator++() {
next();
if (_rem != 0) {
parse();
} else {
_current = {};
}
return *this;
}
collection_iterator operator++(int) {
auto v = *this;
++(*this);
return v;
}
bool operator==(const collection_iterator& x) const {
return _v == x._v;
}
private:
void next() {
--_rem;
_v = _next;
}
void parse();
};
template<>
void collection_iterator<std::pair<managed_bytes_view, managed_bytes_view>>::parse() {
SCYLLA_ASSERT(_rem > 0);
_next = _v;
auto k = read_collection_key(_next);
auto v = read_collection_value_nonnull(_next);
_current = std::make_pair(k, v);
}
template<>
void collection_iterator<managed_bytes_view>::parse() {
SCYLLA_ASSERT(_rem > 0);
_next = _v;
auto k = read_collection_key(_next);
_current = k;
}
template<>
void collection_iterator<managed_bytes_view_opt>::parse() {
SCYLLA_ASSERT(_rem > 0);
_next = _v;
auto k = read_collection_value_nonnull(_next);
_current = k;
}
template<typename Container, typename T>
class maybe_back_insert_iterator : public std::back_insert_iterator<Container> {
const abstract_type& _type;
collection_iterator<T> _s, _e;
public:
using value_type = typename Container::value_type;
maybe_back_insert_iterator(Container& c, const abstract_type& type, collection_iterator<T> s)
: std::back_insert_iterator<Container>(c)
, _type(type)
, _s(s)
{}
maybe_back_insert_iterator& operator*() {
return *this;
}
maybe_back_insert_iterator& operator=(const value_type& v) {
if (!find(v)) {
std::back_insert_iterator<Container>::operator=(v);
}
return *this;
}
maybe_back_insert_iterator& operator=(value_type&& v) {
if (!find(v)) {
std::back_insert_iterator<Container>::operator=(std::move(v));
}
return *this;
}
private:
bool find(const value_type& v) {
// cheating - reducing search span, because we know we only append unique values (see below).
while (_s != _e) {
auto n = compare(*_s, v);
if (n <= 0) {
++_s;
}
if (n == 0) {
return true;
}
if (n > 0) {
break;
}
}
return false;
}
std::strong_ordering compare(const T&, const value_type& v);
};
template<>
std::strong_ordering maybe_back_insert_iterator<std::vector<std::pair<managed_bytes_view, managed_bytes_view>>, managed_bytes_view>::compare(
const managed_bytes_view& t, const value_type& v) {
return _type.compare(t, v.first);
}
template<>
std::strong_ordering maybe_back_insert_iterator<std::vector<managed_bytes_view>, managed_bytes_view>::compare(const managed_bytes_view& t, const value_type& v) {
return _type.compare(t, v);
}
template<>
std::strong_ordering maybe_back_insert_iterator<std::vector<managed_bytes_view_opt>, managed_bytes_view_opt>::compare(const managed_bytes_view_opt& t, const value_type& v) {
if (!t.has_value() || !v.has_value()) {
return unsigned(t.has_value()) <=> unsigned(v.has_value());
}
return _type.compare(*t, *v);
}
template<typename Container, typename T>
auto make_maybe_back_inserter(Container& c, const abstract_type& type, collection_iterator<T> s) {
return maybe_back_insert_iterator<Container, T>(c, type, s);
}
static size_t collection_size(const managed_bytes_opt& bo) {
if (bo) {
managed_bytes_view mbv(*bo);
return read_collection_size(mbv);
}
return 0;
}
template<typename Func>
static void udt_for_each(const managed_bytes_opt& bo, Func&& f) {
if (bo) {
managed_bytes_view mbv(*bo);
std::for_each(tuple_deserializing_iterator::start(mbv), tuple_deserializing_iterator::finish(mbv), std::forward<Func>(f));
}
}
static managed_bytes merge(const collection_type_impl& ctype, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
std::vector<std::pair<managed_bytes_view, managed_bytes_view>> res;
res.reserve(collection_size(prev) + collection_size(next));
auto type = ctype.name_comparator();
auto cmp = [&type = *type](const std::pair<managed_bytes_view, managed_bytes_view>& p1, const std::pair<managed_bytes_view, managed_bytes_view>& p2) {
return type.compare(p1.first, p2.first) < 0;
};
collection_iterator<std::pair<managed_bytes_view, managed_bytes_view>> e, i(prev), j(next);
// note order: set_union, when finding doubles, use value from first1 (j here). So
// since this is next, it has prio
std::set_union(j, e, i, e, make_maybe_back_inserter(res, *type, collection_iterator<managed_bytes_view>(deleted)), cmp);
return map_type_impl::serialize_partially_deserialized_form_fragmented(res);
}
static managed_bytes merge(const set_type_impl& ctype, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
std::vector<managed_bytes_view_opt> res;
res.reserve(collection_size(prev) + collection_size(next));
auto type = ctype.name_comparator();
auto cmp = [&type = *type](managed_bytes_view_opt k1, managed_bytes_view_opt k2) {
if (!k1.has_value() || !k2.has_value()) {
return unsigned(k1.has_value()) < unsigned(k2.has_value());
}
return type.compare(*k1, *k2) < 0;
};
collection_iterator<managed_bytes_view_opt> e, i(prev), j(next), d(deleted);
std::set_union(j, e, i, e, make_maybe_back_inserter(res, *type, d), cmp);
return set_type_impl::serialize_partially_deserialized_form_fragmented(res);
}
static managed_bytes merge(const user_type_impl& type, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
std::vector<managed_bytes_view_opt> res(type.size());
udt_for_each(prev, [i = res.begin()](managed_bytes_view_opt k) mutable {
*i++ = k;
});
udt_for_each(next, [i = res.begin()](managed_bytes_view_opt k) mutable {
if (k) {
*i = k;
}
++i;
});
collection_iterator<managed_bytes_view> e, d(deleted);
std::for_each(d, e, [&res](managed_bytes_view k) {
auto index = deserialize_field_index(k);
res[index] = std::nullopt;
});
return type.build_value_fragmented(res);
}
static managed_bytes merge(const abstract_type& type, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
throw std::runtime_error(format("cdc merge: unknown type {}", type.name()));
}
static managed_bytes_opt get_col_from_row_state(const cell_map* state, const column_definition& cdef) {
if (state) {
if (auto it = state->find(&cdef); it != state->end()) {
return it->second;
}
}
return std::nullopt;
}
cell_map* get_row_state(row_states_map& row_states, const clustering_key& ck) {
auto it = row_states.find(ck);
return it == row_states.end() ? nullptr : &it->second;
}
const cell_map* get_row_state(const row_states_map& row_states, const clustering_key& ck) {
auto it = row_states.find(ck);
return it == row_states.end() ? nullptr : &it->second;
}
static managed_bytes_opt get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) {
if (!pirow || !pirow->has(cdef.name_as_text())) {
return std::nullopt;
}
return cdef.is_atomic()
? pirow->get_blob_fragmented(cdef.name_as_text())
: visit(*cdef.type, make_visitor(
// flatten set
[&] (const set_type_impl& type) {
auto v = pirow->get_view(cdef.name_as_text());
auto n = read_collection_size(v);
std::vector<managed_bytes> tmp;
tmp.reserve(n);
while (n--) {
tmp.emplace_back(read_collection_key(v)); // key
read_collection_value_nonnull(v); // value. ignore.
}
return set_type_impl::serialize_partially_deserialized_form_fragmented({tmp.begin(), tmp.end()});
},
[&] (const abstract_type& o) -> managed_bytes {
return pirow->get_blob_fragmented(cdef.name_as_text());
}
));
}
/* Given a timestamp, generates a timeuuid with the following properties:
* 1. `t1` < `t2` implies timeuuid_type->less(timeuuid_type->decompose(generate_timeuuid(`t1`)),
* timeuuid_type->decompose(generate_timeuuid(`t2`))),
* 2. utils::UUID_gen::micros_timestamp(generate_timeuuid(`t`)) == `t`.
*
* If `t1` == `t2`, then generate_timeuuid(`t1`) != generate_timeuuid(`t2`),
* with unspecified nondeterministic ordering.
*/
utils::UUID generate_timeuuid(api::timestamp_type t) {
return utils::UUID_gen::get_random_time_UUID_from_micros(std::chrono::microseconds{t});
}
class log_mutation_builder {
const schema& _base_schema;
const schema& _log_schema;
const column_definition& _op_col;
const column_definition& _ttl_col;
// The base mutation's partition key
std::vector<managed_bytes> _base_pk;
// The cdc$time value of created rows
const bytes _tuuid;
// The timestamp of the created log mutation cells
const api::timestamp_type _ts;
// The ttl of the created log mutation cells
const ttl_opt _ttl;
// Keeps the next cdc$batch_seq_no value
int _batch_no = 0;
// The mutation under construction
mutation& _log_mut;
public:
log_mutation_builder(mutation& log_mut, api::timestamp_type ts,
const partition_key& base_pk, const schema& base_schema)
: _base_schema(base_schema), _log_schema(*log_mut.schema()),
_op_col(*_log_schema.get_column_definition(log_meta_column_name_bytes("operation"))),
_ttl_col(*_log_schema.get_column_definition(log_meta_column_name_bytes("ttl"))),
_base_pk(base_pk.explode_fragmented()),
_tuuid(timeuuid_type->decompose(generate_timeuuid(ts))),
_ts(ts),
_ttl(_base_schema.cdc_options().ttl()
? std::optional{std::chrono::seconds(_base_schema.cdc_options().ttl())} : std::nullopt),
_log_mut(log_mut)
{}
const schema& base_schema() const {
return _base_schema;
}
clustering_key create_ck(int batch) const {
return clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(batch) });
}
// Creates a new clustering row in the mutation, assigning it the next `cdc$batch_seq_no`.
// The numbering of batch sequence numbers starts from 0.
clustering_key allocate_new_log_row() {
auto log_ck = create_ck(_batch_no++);
set_key_columns(log_ck, _base_schema.partition_key_columns(), _base_pk);
return log_ck;
}
bool has_rows() const {
return _batch_no != 0;
}
clustering_key last_row_key() const {
return create_ck(_batch_no - 1);
}
// A common pattern is to allocate a row and then immediately set its `cdc$operation` column.
clustering_key allocate_new_log_row(operation op) {
auto log_ck = allocate_new_log_row();
set_operation(log_ck, op);
return log_ck;
}
// Each clustering key column in the base schema has a corresponding column in the log schema with the same name.
// This takes a base schema clustering key prefix and sets these columns
// according to the prefix' values for the given log row.
void set_clustering_columns(const clustering_key& log_ck, const clustering_key_prefix& base_ckey) {
set_key_columns(log_ck, _base_schema.clustering_key_columns(), base_ckey.explode_fragmented());
}
// Sets the `cdc$operation` column for the given row.
void set_operation(const clustering_key& log_ck, operation op) {
_log_mut.set_cell(log_ck, _op_col, atomic_cell::make_live(
*_op_col.type, _ts, _op_col.type->decompose(operation_native_type(op)), _ttl));
}
// Sets the `cdc$ttl` column for the given row.
// Warning: if the caller wants `cdc$ttl` to be null, they shouldn't call `set_ttl` with a non-null value.
// Calling it with a non-null value and then with a null value will keep the non-null value.
void set_ttl(const clustering_key& log_ck, ttl_opt ttl) {
if (ttl) {
_log_mut.set_cell(log_ck, _ttl_col, atomic_cell::make_live(
*_ttl_col.type, _ts, _ttl_col.type->decompose(ttl->count()), _ttl));
}
}
// Each regular and static column in the base schema has a corresponding column in the log schema with the same name.
// Given a reference to such a column from the base schema, this function sets the corresponding column
// in the log to the given value for the given row.
void set_value(const clustering_key& log_ck, const column_definition& base_cdef, const managed_bytes_view& value) {
auto log_cdef_ptr = _log_schema.get_column_definition(log_data_column_name_bytes(base_cdef.name()));
if (!log_cdef_ptr) {
throw exceptions::invalid_request_exception(format("CDC log schema for {}.{} does not have base column {}",
_log_schema.ks_name(), _log_schema.cf_name(), base_cdef.name_as_text()));
}
_log_mut.set_cell(log_ck, *log_cdef_ptr, atomic_cell::make_live(*base_cdef.type, _ts, value, _ttl));
}
// Each regular and static column in the base schema has a corresponding column in the log schema
// with boolean type and the name constructed by prefixing the original name with ``cdc$deleted_''
// Given a reference to such a column from the base schema, this function sets the corresponding column
// in the log to `true` for the given row. If not called, the column will be `null`.
void set_deleted(const clustering_key& log_ck, const column_definition& base_cdef) {
auto log_cdef_ptr = _log_schema.get_column_definition(log_data_column_deleted_name_bytes(base_cdef.name()));
if (!log_cdef_ptr) {
throw exceptions::invalid_request_exception(format("CDC log schema for {}.{} does not have base column {}",
_log_schema.ks_name(), _log_schema.cf_name(), base_cdef.name_as_text()));
}
auto& log_cdef = *log_cdef_ptr;
_log_mut.set_cell(log_ck, *log_cdef_ptr, atomic_cell::make_live(*log_cdef.type, _ts, log_cdef.type->decompose(true), _ttl));
}
// Each regular and static non-atomic column in the base schema has a corresponding column in the log schema
// whose type is a frozen `set` of keys (the types of which depend on the base type) and whose name is constructed
// by prefixing the original name with ``cdc$deleted_elements_''.
// Given a reference to such a column from the base schema, this function sets the corresponding column
// in the log to the given set of keys for the given row.
void set_deleted_elements(const clustering_key& log_ck, const column_definition& base_cdef, const managed_bytes& deleted_elements) {
auto log_cdef_ptr = _log_schema.get_column_definition(log_data_column_deleted_elements_name_bytes(base_cdef.name()));
if (!log_cdef_ptr) {
throw exceptions::invalid_request_exception(format("CDC log schema for {}.{} does not have base column {}",
_log_schema.ks_name(), _log_schema.cf_name(), base_cdef.name_as_text()));
}
auto& log_cdef = *log_cdef_ptr;
_log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*log_cdef.type, _ts, deleted_elements, _ttl));
}
void end_record() {
if (has_rows()) {
_log_mut.set_cell(last_row_key(), log_meta_column_name_bytes("end_of_batch"), data_value(true), _ts, _ttl);
}
}
private:
void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector<managed_bytes>& key) {
size_t pos = 0;
for (auto& column : columns) {
if (pos >= key.size()) {
break;
}
auto& cdef = *_log_schema.get_column_definition(log_data_column_name_bytes(column.name()));
_log_mut.set_cell(log_ck, cdef, atomic_cell::make_live(*column.type, _ts, managed_bytes_view(key[pos]), _ttl));
++pos;
}
}
};
static managed_bytes get_managed_bytes(const atomic_cell_view& acv) {
return managed_bytes(acv.value());
}
static ttl_opt get_ttl(const atomic_cell_view& acv) {
return acv.is_live_and_has_ttl() ? std::optional{acv.ttl()} : std::nullopt;
}
static ttl_opt get_ttl(const row_marker& rm) {
return rm.is_expiring() ? std::optional{rm.ttl()} : std::nullopt;
}
/**
* Returns whether we should generate cdc delta values (beyond keys)
*/
static bool generate_delta_values(const schema& s) {
return s.cdc_options().get_delta_mode() == cdc::delta_mode::full;
}
/* Visits the cells and tombstones of a single base mutation row and constructs corresponding delta-row cells
* for the corresponding log mutation.
*
* Additionally updates state required to produce pre/post-image if configured to do so (`enable_updating_state`).
*/
struct process_row_visitor {
const clustering_key& _log_ck;
stats::part_type_set& _touched_parts;
// The base row being visited gets a single corresponding log delta row.
// This is the value of the "cdc$ttl" column for that delta row.
ttl_opt _ttl_column = std::nullopt;
// Used to create cells in the corresponding delta row in the log mutation.
log_mutation_builder& _builder;
/* Images-related state */
const bool _enable_updating_state = false;
// Null for the static row, non-null for clustered rows.
const clustering_key* const _base_ck;
// The state required to produce pre/post-image for the row being visited.
// Might be null if the preimage query didn't return a result for this row.
cell_map* _row_state;
// The state required to produce pre/post-image for all rows.
// We need to keep a reference to it since we might insert new row_states during the visitation.
row_states_map& _clustering_row_states;
const bool _generate_delta_values = true;
process_row_visitor(
const clustering_key& log_ck, stats::part_type_set& touched_parts, log_mutation_builder& builder,
bool enable_updating_state, const clustering_key* base_ck, cell_map* row_state,
row_states_map& clustering_row_states, bool generate_delta_values)
: _log_ck(log_ck), _touched_parts(touched_parts), _builder(builder),
_enable_updating_state(enable_updating_state), _base_ck(base_ck), _row_state(row_state),
_clustering_row_states(clustering_row_states),
_generate_delta_values(generate_delta_values)
{}
void update_row_state(const column_definition& cdef, managed_bytes_opt value) {
if (!_row_state) {
// static row always has a valid state, so this must be a clustering row missing
SCYLLA_ASSERT(_base_ck);
auto [it, _] = _clustering_row_states.try_emplace(*_base_ck);
_row_state = &it->second;
}
(*_row_state)[&cdef] = std::move(value);
}
void live_atomic_cell(const column_definition& cdef, const atomic_cell_view& cell) {
_ttl_column = get_ttl(cell);
if (cdef.type->get_kind() == abstract_type::kind::empty) {
return;
}
managed_bytes value = get_managed_bytes(cell);
// delta
if (_generate_delta_values) {
_builder.set_value(_log_ck, cdef, value);
}
// images
if (_enable_updating_state) {
update_row_state(cdef, std::move(value));
}
}
void dead_atomic_cell(const column_definition& cdef, const atomic_cell_view&) {
// delta
if (_generate_delta_values) {
_builder.set_deleted(_log_ck, cdef);
}
// images
if (_enable_updating_state) {
update_row_state(cdef, std::nullopt);
}
}
void collection_column(const column_definition& cdef, auto&& visit_collection) {
// The handling of dead cells and the tombstone is common for all collection types,
// but we need separate visitors for different collection types to handle the live cells.
// See `set_visitor`, `udt_visitior`, and `map_or_list_visitor` below.
struct collection_visitor {
bool _is_column_delete = false;
std::vector<managed_bytes_view_opt> _deleted_keys;
ttl_opt& _ttl_column;
collection_visitor(ttl_opt& ttl_column) : _ttl_column(ttl_column) {}
void collection_tombstone(const tombstone&) {
_is_column_delete = true;
}
void dead_collection_cell(bytes_view key, const atomic_cell_view&) {
_deleted_keys.push_back(key);
}
constexpr bool finished() const { return false; }
};
// cdc$deleted_col, cdc$deleted_elements_col, col
using result_t = std::tuple<bool, std::vector<managed_bytes_view_opt>, managed_bytes_opt>;
auto result = visit(*cdef.type, make_visitor(
[&] (const set_type_impl&) -> result_t {
_touched_parts.set<stats::part_type::SET>();
struct set_visitor : public collection_visitor {
std::vector<managed_bytes_view_opt> _added_keys;
set_visitor(ttl_opt& ttl_column) : collection_visitor(ttl_column) {}
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
this->_ttl_column = get_ttl(cell);
_added_keys.emplace_back(key);
}
} v(_ttl_column);
visit_collection(v);
managed_bytes_opt added_keys = v._added_keys.empty() ? std::nullopt :
std::optional{set_type_impl::serialize_partially_deserialized_form_fragmented(v._added_keys)};
return {
v._is_column_delete,
std::move(v._deleted_keys),
std::move(added_keys)
};
},
[&] (const user_type_impl& type) -> result_t {
_touched_parts.set<stats::part_type::UDT>();
struct udt_visitor : public collection_visitor {
std::vector<managed_bytes_view_opt> _added_cells;
udt_visitor(ttl_opt& ttl_column, size_t num_keys)
: collection_visitor(ttl_column), _added_cells(num_keys) {}
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
this->_ttl_column = get_ttl(cell);
_added_cells[deserialize_field_index(key)].emplace(cell.value());
}
};
udt_visitor v(_ttl_column, type.size());
visit_collection(v);
managed_bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
std::optional{type.build_value_fragmented(v._added_cells)};
return {
v._is_column_delete,
std::move(v._deleted_keys),
std::move(added_cells)
};
},
[&] (const collection_type_impl& type) -> result_t {
_touched_parts.set(type.is_list() ? stats::part_type::LIST : stats::part_type::MAP);
struct map_or_list_visitor : public collection_visitor {
std::vector<std::pair<managed_bytes_view, managed_bytes_view>> _added_cells;
map_or_list_visitor(ttl_opt& ttl_column)
: collection_visitor(ttl_column) {}
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
this->_ttl_column = get_ttl(cell);
_added_cells.emplace_back(key, cell.value());
}
};
map_or_list_visitor v(_ttl_column);
visit_collection(v);
managed_bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
std::optional{map_type_impl::serialize_partially_deserialized_form_fragmented(v._added_cells)};
return {
v._is_column_delete,
std::move(v._deleted_keys),
std::move(added_cells)
};
},
[&] (const abstract_type& o) -> result_t {
throw std::runtime_error(format("cdc process_change: unknown type {}", o.name()));
}
));
auto&& is_column_delete = std::get<0>(result);
auto&& deleted_keys = std::get<1>(result);
auto&& added_cells = std::get<2>(result);
// FIXME: we're doing redundant work: first we serialize the set of deleted keys into a blob,
// then we deserialize again when merging images below
managed_bytes_opt deleted_elements = std::nullopt;
if (!deleted_keys.empty()) {
deleted_elements = set_type_impl::serialize_partially_deserialized_form_fragmented(deleted_keys);
}
// delta
if (_generate_delta_values) {
if (is_column_delete) {
_builder.set_deleted(_log_ck, cdef);
}
if (deleted_elements) {
_builder.set_deleted_elements(_log_ck, cdef, *deleted_elements);
}
if (added_cells) {
_builder.set_value(_log_ck, cdef, *added_cells);
}
}
// images
if (_enable_updating_state) {
// A column delete overwrites any data we gathered until now.
managed_bytes_opt prev = is_column_delete ? std::nullopt : get_col_from_row_state(_row_state, cdef);
managed_bytes_opt next;
if (added_cells || (deleted_elements && prev)) {
next = visit(*cdef.type, [&] (const auto& type) -> managed_bytes {
return merge(type, prev, added_cells, deleted_elements);
});
}
update_row_state(cdef, std::move(next));
}
}
constexpr bool finished() const { return false; }
};
struct process_change_visitor {
const per_request_options& _request_options;
// The types of the operations used for row / partition deletes. Introduced
// to differentiate service operations (e.g. operation::service_row_delete
// vs operation::row_delete).
const operation _row_delete_op = operation::row_delete;
const operation _partition_delete_op = operation::partition_delete;
stats::part_type_set& _touched_parts;
log_mutation_builder& _builder;
/* Images-related state */
const bool _enable_updating_state = false;
row_states_map& _clustering_row_states;
cell_map& _static_row_state;
const bool _is_update = false;
const bool _generate_delta_values = true;
void static_row_cells(auto&& visit_row_cells) {
_touched_parts.set<stats::part_type::STATIC_ROW>();
auto log_ck = _builder.allocate_new_log_row(operation::update);
process_row_visitor v(
log_ck, _touched_parts, _builder,
_enable_updating_state, nullptr, &_static_row_state, _clustering_row_states, _generate_delta_values);
visit_row_cells(v);
_builder.set_ttl(log_ck, v._ttl_column);
}
void clustered_row_cells(const clustering_key& ckey, auto&& visit_row_cells) {
_touched_parts.set<stats::part_type::CLUSTERING_ROW>();
auto log_ck = _builder.allocate_new_log_row();
_builder.set_clustering_columns(log_ck, ckey);
struct clustering_row_cells_visitor : public process_row_visitor {
operation _cdc_op = operation::update;
operation _marker_op = operation::insert;
using process_row_visitor::process_row_visitor;
void marker(const row_marker& rm) {
_ttl_column = get_ttl(rm);
_cdc_op = _marker_op;
}
};
clustering_row_cells_visitor v(
log_ck, _touched_parts, _builder,
_enable_updating_state, &ckey, get_row_state(_clustering_row_states, ckey),
_clustering_row_states, _generate_delta_values);
if (_is_update && _request_options.alternator) {
v._marker_op = operation::update;
}
visit_row_cells(v);
if (_enable_updating_state) {
// #7716: if there are no regular columns, our visitor would not have visited any cells,
// hence it would not have created a row_state for this row. In effect, postimage wouldn't be produced.
// Ensure that the row state exists.
_clustering_row_states.try_emplace(ckey);
}
_builder.set_operation(log_ck, v._cdc_op);
_builder.set_ttl(log_ck, v._ttl_column);
}
void clustered_row_delete(const clustering_key& ckey, const tombstone&) {
_touched_parts.set<stats::part_type::ROW_DELETE>();
auto log_ck = _builder.allocate_new_log_row(_row_delete_op);
_builder.set_clustering_columns(log_ck, ckey);
if (_enable_updating_state && get_row_state(_clustering_row_states, ckey)) {
_clustering_row_states.erase(ckey);
}
}
void range_delete(const range_tombstone& rt) {
_touched_parts.set<stats::part_type::RANGE_TOMBSTONE>();
{
const auto start_operation = rt.start_kind == bound_kind::incl_start
? operation::range_delete_start_inclusive
: operation::range_delete_start_exclusive;
auto log_ck = _builder.allocate_new_log_row(start_operation);
_builder.set_clustering_columns(log_ck, rt.start);
}
{
const auto end_operation = rt.end_kind == bound_kind::incl_end
? operation::range_delete_end_inclusive
: operation::range_delete_end_exclusive;
auto log_ck = _builder.allocate_new_log_row(end_operation);
_builder.set_clustering_columns(log_ck, rt.end);
}
// #6900 - remove stored row data (for postimage)
// if it falls inside the clustering range of the
// tombstone.
if (!_enable_updating_state) {
return;
}
bound_view::compare cmp(_builder.base_schema());
auto sb = rt.start_bound();
auto eb = rt.end_bound();
std::erase_if(_clustering_row_states, [&](auto& p) {
auto& ck = p.first;
return cmp(sb, ck) && !cmp(eb, ck);
});
}
void partition_delete(const tombstone&) {
_touched_parts.set<stats::part_type::PARTITION_DELETE>();
auto log_ck = _builder.allocate_new_log_row(_partition_delete_op);
if (_enable_updating_state) {
_clustering_row_states.clear();
}
}
constexpr bool finished() const { return false; }
};
class transformer final : public change_processor {
private:
db_context _ctx;
schema_ptr _schema;
dht::decorated_key _dk;
schema_ptr _log_schema;
const per_request_options& _options;
/**
* #6070, #6084
* Non-atomic column assignments which use a TTL are broken into two invocations
* of `transform`, such as in the following example:
* CREATE TABLE t (a int PRIMARY KEY, b map<int, int>) WITH cdc = {'enabled':true};
* UPDATE t USING TTL 5 SET b = {0:0} WHERE a = 0;
*
* The above UPDATE creates a tombstone and a (0, 0) cell; because tombstones don't have the notion
* of a TTL, we split the UPDATE into two separate changes (represented as two separate delta rows in the log,
* resulting in two invocations of `transform`): one change for the deletion with no TTL,
* and one change for adding cells with TTL = 5.
*
* In other words, we use the fact that
* UPDATE t USING TTL 5 SET b = {0:0} WHERE a = 0;
* is equivalent to
* BEGIN UNLOGGED BATCH
* UPDATE t SET b = null WHERE a = 0;
* UPDATE t USING TTL 5 SET b = b + {0:0} WHERE a = 0;
* APPLY BATCH;
* (the mutations are the same in both cases),
* and perform a separate `transform` call for each statement in the batch.
*
* An assignment also happens when an INSERT statement is used as follows:
* INSERT INTO t (a, b) VALUES (0, {0:0}) USING TTL 5;
*
* This will be split into three separate changes (three invocations of `transform`):
* 1. One with TTL = 5 for the row marker (introduces by the INSERT), indicating that a row was inserted.
* 2. One without a TTL for the tombstone, indicating that the collection was cleared.
* 3. One with TTL = 5 for the addition of cell (0, 0), indicating that the collection
* was extended by a new key/value.
*
* Why do we need three changes and not two, like in the UPDATE case?
* The tombstone needs to be a separate change because it doesn't have a TTL,
* so only the row marker change could potentially be merged with the cell change (1 and 3 above).
* However, we cannot do that: the row marker change is of INSERT type (cdc$operation == cdc::operation::insert),
* but there is no way to create a statement that
* - has a row marker,
* - adds cells to a collection,
* - but *doesn't* add a tombstone for this collection.
* INSERT statements that modify collections *always* add tombstones.
*
* Merging the row marker with the cell addition would result in such an impossible statement.
*
* Instead, we observe that
* INSERT INTO t (a, b) VALUES (0, {0:0}) USING TTL 5;
* is equivalent to
* BEGIN UNLOGGED BATCH
* INSERT INTO t (a) VALUES (0) USING TTL 5;
* UPDATE t SET b = null WHERE a = 0;
* UPDATE t USING TTL 5 SET b = b + {0:0} WHERE a = 0;
* APPLY BATCH;
* and perform a separate `transform` call for each statement in the batch.
*
* Unfortunately, due to splitting, the cell addition call (b + b {0:0}) does not know about the tombstone.
* If it was performed independently from the tombstone call, it would create a wrong post-image:
* the post-image would look as if the previous cells still existed.
* For example, suppose that b was equal to {1:1} before the above statement was performed.
* Then the final post-image for b for above statement/batch would be {0:0, 1:1}, when instead it should be {0:0}.
*
* To handle this we use the fact that
* 1. changes without a TTL are treated as if TTL = 0,
* 2. `transform` is invoked in order of increasing TTLs,
* and we maintain state between `transform` invocations (`_clustering_row_states`, `_static_row_state`).
*
* Thus, the tombstone call will happen *before* the cell addition call,
* so the cell addition call will know that there previously was a tombstone
* and create a correct post-image.
*
* Furthermore, `transform` calls for INSERT changes (i.e. with a row marker)
* happen before `transform` calls for UPDATE changes, so in the case of an INSERT
* which modifies a collection column as above, the row marker call will happen first;
* its post-image will still show {1:1} for the collection column. Good.
*/
row_states_map _clustering_row_states;
cell_map _static_row_state;
// True if the mutated row existed before applying the mutation. In other
// words, if the preimage is enabled and it isn't empty (otherwise, we
// assume that the row is non-existent). Used for Alternator Streams (see
// #6918).
bool _is_update = false;
const bool _uses_tablets;
utils::chunked_vector<mutation> _result_mutations;
std::optional<log_mutation_builder> _builder;
// When enabled, process_change will update _clustering_row_states and _static_row_state
bool _enable_updating_state = false;
stats::part_type_set _touched_parts;
public:
transformer(db_context ctx, schema_ptr s, dht::decorated_key dk, const per_request_options& options)
: _ctx(ctx)
, _schema(std::move(s))
, _dk(std::move(dk))
, _log_schema(_schema->cdc_schema() ? _schema->cdc_schema() : ctx._proxy.get_db().local().find_schema(_schema->ks_name(), log_name(_schema->cf_name())))
, _options(options)
, _clustering_row_states(0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema))
, _uses_tablets(ctx._proxy.get_db().local().find_keyspace(_schema->ks_name()).uses_tablets())
{
}
// DON'T move the transformer after this
void begin_timestamp(api::timestamp_type ts, bool is_last) override {
const auto stream_id = _uses_tablets ? _ctx._cdc_metadata.get_tablet_stream(_log_schema->id(), ts, _dk.token()) : _ctx._cdc_metadata.get_vnode_stream(ts, _dk.token());
_result_mutations.emplace_back(_log_schema, stream_id.to_partition_key(*_log_schema));
_builder.emplace(_result_mutations.back(), ts, _dk.key(), *_schema);
_enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage());
}
void produce_preimage(const clustering_key* ck, const one_kind_column_set& columns_to_include) override {
// if we want full preimage, just ignore the affected columns and include everything.
generate_image(operation::pre_image, ck, _schema->cdc_options().full_preimage() ? nullptr : &columns_to_include);
};
void produce_postimage(const clustering_key* ck) override {
generate_image(operation::post_image, ck, nullptr);
}
void generate_image(operation op, const clustering_key* ck, const one_kind_column_set* affected_columns) {
SCYLLA_ASSERT(op == operation::pre_image || op == operation::post_image);
// SCYLLA_ASSERT that post_image is always full
SCYLLA_ASSERT(!(op == operation::post_image && affected_columns));
SCYLLA_ASSERT(_builder);
const auto kind = ck ? column_kind::regular_column : column_kind::static_column;
cell_map* row_state;
if (ck) {
row_state = get_row_state(_clustering_row_states, *ck);
if (!row_state) {
// We have no data for this row, we can stop here
// Empty row here means we had data, but column deletes
// removed it. This we should report as pk/ck only
return;
}
} else {
row_state = &_static_row_state;
// if the static row state is empty and we did not touch
// during processing, it either did not exist, or we did pk delete.
// In those cases, no postimage for you.
// If we touched it, we should indicate it with an empty
// postimage.
if (row_state->empty() && !_touched_parts.contains(stats::part_type::STATIC_ROW)) {
return;
}
}
auto image_ck = _builder->allocate_new_log_row(op);
if (ck) {
_builder->set_clustering_columns(image_ck, *ck);
}
auto process_cell = [&, this] (const column_definition& cdef) {
// If table uses compact storage it may contain a column of type empty
// and we need to ignore such a field because it is not present in CDC log.
if (cdef.type->get_kind() == abstract_type::kind::empty) {
return;
}
if (auto current = get_col_from_row_state(row_state, cdef)) {
_builder->set_value(image_ck, cdef, *current);
} else if (op == operation::pre_image) {
// Cell is NULL.
// If we generate the preimage,
// we should also fill in the deleted column.
// Otherwise the user will not be able
// to discern whether a value in the preimage
// is NULL or it was not included in the
// preimage ('full' preimage disabled).
// If we generate the postimage,
// we don't have to fill in the deleted column,
// as all postimages are full.
_builder->set_deleted(image_ck, cdef);
}
};
if (affected_columns) {
// Preimage case - include only data from requested columns
for (auto it = affected_columns->find_first(); it != one_kind_column_set::npos; it = affected_columns->find_next(it)) {
const auto id = column_id(it);
const auto& cdef = _schema->column_at(kind, id);
process_cell(cdef);
}
} else {
// Postimage case - include data from all columns
const auto column_range = _schema->columns(kind);
std::for_each(column_range.begin(), column_range.end(), process_cell);
}
}
// TODO: is pre-image data based on query enough. We only have actual column data. Do we need
// more details like tombstones/ttl? Probably not but keep in mind.
void process_change(const mutation& m) override {
SCYLLA_ASSERT(_builder);
process_change_visitor v {
._request_options = _options,
._row_delete_op = _options.is_system_originated ? operation::service_row_delete : operation::row_delete,
._partition_delete_op = _options.is_system_originated ? operation::service_partition_delete : operation::partition_delete,
._touched_parts = _touched_parts,
._builder = *_builder,
._enable_updating_state = _enable_updating_state,
._clustering_row_states = _clustering_row_states,
._static_row_state = _static_row_state,
._is_update = _is_update,
._generate_delta_values = generate_delta_values(_builder->base_schema())
};
cdc::inspect_mutation(m, v);
}
void end_record() override {
SCYLLA_ASSERT(_builder);
_builder->end_record();
}
const row_states_map& clustering_row_states() const override {
return _clustering_row_states;
}
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
// The `transformer` object on which this method was called on should not be used anymore.
std::tuple<utils::chunked_vector<mutation>, stats::part_type_set> finish() && {
return std::make_pair<utils::chunked_vector<mutation>, stats::part_type_set>(std::move(_result_mutations), std::move(_touched_parts));
}
static db::timeout_clock::time_point default_timeout() {
return db::timeout_clock::now() + 10s;
}
future<lw_shared_ptr<cql3::untyped_result_set>> pre_image_select(
service::client_state& client_state,
db::consistency_level write_cl,
const mutation& m)
{
auto& p = m.partition();
const bool no_ck_schema_partition_deletion = m.schema()->clustering_key_size() == 0 && bool(p.partition_tombstone());
if (p.clustered_rows().empty() && p.static_row().empty() && !no_ck_schema_partition_deletion) {
return make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>();
}
dht::partition_range_vector partition_ranges{dht::partition_range(m.decorated_key())};
auto&& pc = _schema->partition_key_columns();
auto&& cc = _schema->clustering_key_columns();
std::vector<query::clustering_range> bounds;
uint64_t row_limit = query::max_rows;
const bool has_only_static_row = !p.static_row().empty() && p.clustered_rows().empty();
if (cc.empty() || has_only_static_row) {
bounds.push_back(query::clustering_range::make_open_ended_both_sides());
if (has_only_static_row) {
row_limit = 1;
}
} else {
for (const rows_entry& r : p.clustered_rows()) {
auto& ck = r.key();
bounds.push_back(query::clustering_range::make_singular(ck));
}
}
std::vector<const column_definition*> columns;
columns.reserve(_schema->all_columns().size());
std::transform(pc.begin(), pc.end(), std::back_inserter(columns), [](auto& c) { return &c; });
std::transform(cc.begin(), cc.end(), std::back_inserter(columns), [](auto& c) { return &c; });
query::column_id_vector static_columns, regular_columns;
// TODO: this assumes all mutations touch the same set of columns. This might not be true, and we may need to do more horrible set operation here.
if (!p.static_row().empty()) {
// for postimage we need everything...
if (_schema->cdc_options().postimage() || _schema->cdc_options().full_preimage()) {
for (const column_definition& c: _schema->static_columns()) {
static_columns.emplace_back(c.id);
columns.emplace_back(&c);
}
} else {
p.static_row().get().for_each_cell([&] (column_id id, const atomic_cell_or_collection&) {
auto& cdef =_schema->column_at(column_kind::static_column, id);
static_columns.emplace_back(id);
columns.emplace_back(&cdef);
});
}
}
if (!p.clustered_rows().empty() || no_ck_schema_partition_deletion) {
const bool has_row_delete = std::any_of(p.clustered_rows().begin(), p.clustered_rows().end(), [] (const rows_entry& re) {
return re.row().deleted_at();
});
// for postimage we need everything...
if (has_row_delete || _schema->cdc_options().postimage() || _schema->cdc_options().full_preimage() || no_ck_schema_partition_deletion) {
for (const column_definition& c: _schema->regular_columns()) {
regular_columns.emplace_back(c.id);
columns.emplace_back(&c);
}
} else {
p.clustered_rows().begin()->row().cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection&) {
const auto& cdef =_schema->column_at(column_kind::regular_column, id);
regular_columns.emplace_back(id);
columns.emplace_back(&cdef);
});
}
}
auto selection = cql3::selection::selection::for_columns(_schema, std::move(columns));
auto opts = selection->get_query_options();
opts.set(query::partition_slice::option::collections_as_maps);
opts.set_if<query::partition_slice::option::always_return_static_content>(!p.static_row().empty());
auto partition_slice = query::partition_slice(std::move(bounds), std::move(static_columns), std::move(regular_columns), std::move(opts));
const auto max_result_size = _ctx._proxy.get_max_result_size(partition_slice);
const auto tombstone_limit = query::tombstone_limit(_ctx._proxy.get_tombstone_limit());
auto command = ::make_lw_shared<query::read_command>(_schema->id(), _schema->version(), partition_slice, query::max_result_size(max_result_size), tombstone_limit, query::row_limit(row_limit));
const auto select_cl = adjust_cl(write_cl);
try {
return _ctx._proxy.query(_schema, std::move(command), std::move(partition_ranges), select_cl, service::storage_proxy::coordinator_query_options(default_timeout(), empty_service_permit(), client_state)).then(
[s = _schema, partition_slice = std::move(partition_slice), selection = std::move(selection)] (service::storage_proxy::coordinator_query_result qr) -> lw_shared_ptr<cql3::untyped_result_set> {
return make_lw_shared<cql3::untyped_result_set>(*s, std::move(qr.query_result), *selection, partition_slice);
});
} catch (exceptions::unavailable_exception& e) {
// `query` can throw `unavailable_exception`, which is seen by clients as ~ "NoHostAvailable".
// So, we'll translate it to a `read_failure_exception` with custom message.
cdc_log.debug("Preimage: translating a (read) `unavailable_exception` to `request_execution_exception` - {}", e);
throw exceptions::read_failure_exception("CDC preimage query could not achieve the CL.",
e.consistency, e.alive, 0, e.required, false);
}
}
// Note: this assumes that the results are from one partition only
void load_preimage_results_into_state(lw_shared_ptr<cql3::untyped_result_set> preimage_set, bool static_only) {
// static row
if (!preimage_set->empty()) {
// There may be some static row data
const auto& row = preimage_set->front();
for (auto& c : _schema->static_columns()) {
if (auto maybe_cell_view = get_preimage_col_value(c, &row)) {
_static_row_state[&c] = std::move(*maybe_cell_view);
}
}
_is_update = true;
}
if (static_only) {
return;
}
// clustering rows
for (const auto& row : *preimage_set) {
// Construct the clustering key for this row
std::vector<managed_bytes> ck_parts;
ck_parts.reserve(_schema->clustering_key_size());
for (auto& c : _schema->clustering_key_columns()) {
auto v = row.get_view_opt(c.name_as_text());
if (!v) {
// We might get here if both of the following conditions are true:
// - In preimage query, we requested the static row and some clustering rows,
// - The partition had some static row data, but did not have any requested clustering rows.
// In such case, the result set will have an artificial row that only contains static columns,
// but no clustering columns. In such case, we can safely return from the function,
// as there will be no clustering row data to load into the state.
return;
}
ck_parts.emplace_back(managed_bytes(*v));
}
auto ck = clustering_key::from_exploded(std::move(ck_parts));
// Collect regular rows
cell_map cells;
for (auto& c : _schema->regular_columns()) {
if (auto maybe_cell_view = get_preimage_col_value(c, &row)) {
cells[&c] = std::move(*maybe_cell_view);
}
}
_clustering_row_states.insert_or_assign(std::move(ck), std::move(cells));
}
}
/** For preimage query use the same CL as for base write, except for CLs ANY and ALL. */
static db::consistency_level adjust_cl(db::consistency_level write_cl) {
if (write_cl == db::consistency_level::ANY) {
return db::consistency_level::ONE;
} else if (write_cl == db::consistency_level::ALL || write_cl == db::consistency_level::SERIAL) {
return db::consistency_level::QUORUM;
} else if (write_cl == db::consistency_level::LOCAL_SERIAL) {
return db::consistency_level::LOCAL_QUORUM;
}
return write_cl;
}
};
template <typename Func>
future<utils::chunked_vector<mutation>>
transform_mutations(utils::chunked_vector<mutation>& muts, decltype(muts.size()) batch_size, Func&& f) {
return parallel_for_each(
boost::irange(static_cast<decltype(muts.size())>(0), muts.size(), batch_size),
std::forward<Func>(f))
.then([&muts] () mutable { return std::move(muts); });
}
} // namespace cdc
future<std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>
cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, utils::chunked_vector<mutation>&& mutations, tracing::trace_state_ptr tr_state, db::consistency_level write_cl, per_request_options options) {
// we do all this because in the case of batches, we can have mixed schemas.
auto e = mutations.end();
auto i = std::find_if(mutations.begin(), e, [](const mutation& m) {
return cdc_enabled(*m.schema());
});
if (i == e) {
return make_ready_future<std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>(std::make_tuple(std::move(mutations), lw_shared_ptr<cdc::operation_result_tracker>()));
}
tracing::trace(tr_state, "CDC: Started generating mutations for log rows");
mutations.reserve(2 * mutations.size());
return do_with(std::move(mutations), service::query_state(service::client_state::for_internal_calls(), empty_service_permit()), operation_details{}, std::move(options),
[this, tr_state = std::move(tr_state), write_cl] (utils::chunked_vector<mutation>& mutations, service::query_state& qs, operation_details& details, per_request_options& options) {
return transform_mutations(mutations, 1, [this, &mutations, &qs, tr_state = tr_state, &details, write_cl, &options] (int idx) mutable {
auto& m = mutations[idx];
auto s = m.schema();
if (!cdc_enabled(*s)) {
return make_ready_future<>();
}
const bool alternator_increased_compatibility = options.alternator && options.alternator_streams_increased_compatibility;
transformer trans(_ctxt, s, m.decorated_key(), options);
auto f = make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>(nullptr);
if (options.preimage && !options.preimage->empty()) {
// Preimage has been fetched by upper layers.
tracing::trace(tr_state, "CDC: Using a prefetched preimage");
f = make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>(options.preimage);
} else if (s->cdc_options().preimage() || s->cdc_options().postimage() || alternator_increased_compatibility) {
// Note: further improvement here would be to coalesce the pre-image selects into one
// if a batch contains several modifications to the same table. Otoh, batch is rare(?)
// so this is premature.
tracing::trace(tr_state, "CDC: Selecting preimage for {}", m.decorated_key());
f = trans.pre_image_select(qs.get_client_state(), write_cl, m).then_wrapped([this] (future<lw_shared_ptr<cql3::untyped_result_set>> f) {
auto& cdc_stats = _ctxt._proxy.get_cdc_stats();
cdc_stats.counters_total.preimage_selects++;
if (f.failed()) {
cdc_stats.counters_failed.preimage_selects++;
}
return f;
});
} else {
tracing::trace(tr_state, "CDC: Preimage not enabled for the table, not querying current value of {}", m.decorated_key());
}
return f.then([alternator_increased_compatibility, trans = std::move(trans), &mutations, idx, tr_state, &details, &options] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
auto& m = mutations[idx];
auto& s = m.schema();
if (rs) {
const auto& p = m.partition();
const bool static_only = !p.static_row().empty() && p.clustered_rows().empty();
trans.load_preimage_results_into_state(std::move(rs), static_only);
}
const bool preimage = s->cdc_options().preimage();
const bool postimage = s->cdc_options().postimage();
details.had_preimage |= preimage;
details.had_postimage |= postimage;
tracing::trace(tr_state, "CDC: Generating log mutations for {}", m.decorated_key());
if (should_split(m, options)) {
tracing::trace(tr_state, "CDC: Splitting {}", m.decorated_key());
details.was_split = true;
process_changes_with_splitting(m, trans, preimage, postimage, alternator_increased_compatibility);
} else {
tracing::trace(tr_state, "CDC: No need to split {}", m.decorated_key());
process_changes_without_splitting(m, trans, preimage, postimage, alternator_increased_compatibility);
}
auto [log_mut, touched_parts] = std::move(trans).finish();
const int generated_count = log_mut.size();
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
// `m` might be invalidated at this point because of the push_back to the vector
tracing::trace(tr_state, "CDC: Generated {} log mutations from {}", generated_count, mutations[idx].decorated_key());
details.touched_parts.add(touched_parts);
});
}).then([this, tr_state, &details](utils::chunked_vector<mutation> mutations) {
tracing::trace(tr_state, "CDC: Finished generating all log mutations");
auto tracker = make_lw_shared<cdc::operation_result_tracker>(_ctxt._proxy.get_cdc_stats(), details);
return make_ready_future<std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>(std::make_tuple(std::move(mutations), std::move(tracker)));
});
});
}
bool cdc::cdc_service::needs_cdc_augmentation(const utils::chunked_vector<mutation>& mutations) const {
return std::any_of(mutations.begin(), mutations.end(), [](const mutation& m) {
return cdc_enabled(*m.schema());
});
}
future<std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>
cdc::cdc_service::augment_mutation_call(lowres_clock::time_point timeout, utils::chunked_vector<mutation>&& mutations, tracing::trace_state_ptr tr_state, db::consistency_level write_cl, per_request_options options) {
if (utils::get_local_injector().enter("sleep_before_cdc_augmentation")) {
return seastar::sleep(std::chrono::milliseconds(100)).then([this, timeout, mutations = std::move(mutations), tr_state = std::move(tr_state), write_cl, options = std::move(options)] () mutable {
return _impl->augment_mutation_call(timeout, std::move(mutations), std::move(tr_state), write_cl, std::move(options));
});
}
return _impl->augment_mutation_call(timeout, std::move(mutations), std::move(tr_state), write_cl, std::move(options));
}