Modify the methods which calculate the default gc mode, as well as the one which validates whether repair-mode can be used at all, so that both accept the use of repair-mode on RF=1 tables. This de facto changes the default tombstone-gc mode to repair-mode for all tables. Documentation is updated accordingly. Some tests need adjusting: * cqlpy/test_select_from_mutation_fragments.py: disable GC for some test cases, because this patch makes the tombstones they write subject to GC when using defaults. * test/cluster/test_mv.py::test_mv_tombstone_gc_not_inherited used repair-mode as a non-default for the base table and expected the MV to revert to the default. Another mode (immediate) has to be used as the non-default. * test/cqlpy/test_tools.py::test_scylla_sstable_dump_schema: don't compare the tombstone_gc schema extension when comparing the dumped schema vs. the original. The tool's schema loader doesn't have access to the keyspace definition, so it will come up with different defaults for tombstone-gc. * test/boost/row_cache_test.cc::test_populating_cache_with_expired_and_nonexpired_tombstones sets tombstone expiry assuming the tombstone-gc timeout-mode default. Change the CREATE TABLE statement to set the expected mode.
2070 lines
90 KiB
C++
2070 lines
90 KiB
C++
/*
|
|
* Copyright (C) 2019-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <utility>
|
|
#include <algorithm>
|
|
|
|
#include <boost/range/irange.hpp>
|
|
#include <seastar/core/thread.hh>
|
|
#include <seastar/core/metrics.hh>
|
|
|
|
#include "cdc/log.hh"
|
|
#include "cdc/generation.hh"
|
|
#include "cdc/split.hh"
|
|
#include "cdc/cdc_options.hh"
|
|
#include "cdc/change_visitor.hh"
|
|
#include "cdc/metadata.hh"
|
|
#include "cdc/cdc_partitioner.hh"
|
|
#include "bytes.hh"
|
|
#include "index/vector_index.hh"
|
|
#include "locator/abstract_replication_strategy.hh"
|
|
#include "locator/topology.hh"
|
|
#include "replica/database.hh"
|
|
#include "db/config.hh"
|
|
#include "db/schema_tables.hh"
|
|
#include "gms/feature_service.hh"
|
|
#include "schema/schema.hh"
|
|
#include "schema/schema_builder.hh"
|
|
#include "service/migration_listener.hh"
|
|
#include "service/storage_proxy.hh"
|
|
#include "tombstone_gc_extension.hh"
|
|
#include "types/tuple.hh"
|
|
#include "cql3/statements/select_statement.hh"
|
|
#include "cql3/untyped_result_set.hh"
|
|
#include "log.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/rjson.hh"
|
|
#include "utils/UUID_gen.hh"
|
|
#include "utils/managed_bytes.hh"
|
|
#include "types/types.hh"
|
|
#include "types/concrete_types.hh"
|
|
#include "types/listlike_partial_deserializing_iterator.hh"
|
|
#include "tracing/trace_state.hh"
|
|
#include "stats.hh"
|
|
#include "utils/labels.hh"
|
|
|
|
namespace std {
|
|
|
|
template<> struct hash<std::pair<net::inet_address, unsigned int>> {
|
|
std::size_t operator()(const std::pair<net::inet_address, unsigned int> &p) const {
|
|
return std::hash<net::inet_address>{}(p.first) ^ std::hash<int>{}(p.second);
|
|
}
|
|
};
|
|
|
|
}
|
|
|
|
using namespace std::chrono_literals;
|
|
|
|
// Logger object for this file, constructed with the name "cdc".
logging::logger cdc_log("cdc");
|
|
|
|
namespace {
|
|
|
|
shared_ptr<locator::abstract_replication_strategy> generate_replication_strategy(const keyspace_metadata& ksm, const locator::topology& topo) {
|
|
locator::replication_strategy_params params(ksm.strategy_options(), ksm.initial_tablets(), ksm.consistency_option());
|
|
return locator::abstract_replication_strategy::create_replication_strategy(ksm.strategy_name(), params, topo);
|
|
}
|
|
|
|
// When dropping a column from a CDC log table, we set the drop timestamp
|
|
// `column_drop_leeway` seconds into the future to ensure that for writes concurrent
|
|
// with column drop, the write timestamp is before the column drop timestamp.
|
|
constexpr auto column_drop_leeway = std::chrono::seconds(5);
|
|
|
|
} // anonymous namespace
|
|
|
|
namespace cdc {
|
|
static schema_ptr create_log_schema(const schema&, const replica::database&, const keyspace_metadata&, api::timestamp_type,
|
|
std::optional<table_id> = {}, schema_ptr = nullptr);
|
|
}
|
|
|
|
// Name of the metric group that the CDC metrics in this file are added to.
static constexpr auto cdc_group_name = "cdc";
|
|
|
|
/// Registers one "operations_on_<part>_performed_<suffix>" counter per kind of
/// mutation part in the CDC metric group.
///
/// \param metrics metric groups object that the counters are added to
/// \param suffix  text embedded in every metric name and description
void cdc::stats::parts_touched_stats::register_metrics(seastar::metrics::metric_groups& metrics, std::string_view suffix) {
    namespace sm = seastar::metrics;

    // (part, name fragment) table; counters are registered in this order.
    const std::pair<part_type, std::string_view> parts[] = {
        {part_type::STATIC_ROW, "static_row"},
        {part_type::CLUSTERING_ROW, "clustering_row"},
        {part_type::MAP, "map"},
        {part_type::SET, "set"},
        {part_type::LIST, "list"},
        {part_type::UDT, "udt"},
        {part_type::RANGE_TOMBSTONE, "range_tombstone"},
        {part_type::PARTITION_DELETE, "partition_delete"},
        {part_type::ROW_DELETE, "row_delete"},
    };

    for (const auto& [part, part_name] : parts) {
        metrics.add_group(cdc_group_name, {
            sm::make_total_operations(seastar::format("operations_on_{}_performed_{}", part_name, suffix), count[static_cast<size_t>(part)],
                    sm::description(seastar::format("number of {} CDC operations that processed a {}", suffix, part_name)),
                    {cdc_label}).set_skip_when_empty()
        });
    }
}
|
|
|
|
/// Registers the CDC metrics for both counter sets: "total" and "failed".
cdc::stats::stats() {
    namespace sm = seastar::metrics;

    // Adds all metrics of a single counter set. `kind` is embedded in the
    // metric names and descriptions.
    auto add_counter_metrics = [this] (counters& group, sstring kind) {
        // The split and unsplit operation counters share a metric name and
        // are told apart by the value of the "split" label.
        const auto split_label = sm::label("split");

        _metrics.add_group(cdc_group_name, {
            sm::make_total_operations("operations_" + kind, group.unsplit_count,
                    sm::description(format("number of {} CDC operations", kind)),
                    {split_label(false), basic_level, cdc_label}).set_skip_when_empty(),

            sm::make_total_operations("operations_" + kind, group.split_count,
                    sm::description(format("number of {} CDC operations", kind)),
                    {split_label(true), basic_level, cdc_label}).set_skip_when_empty(),

            sm::make_total_operations("preimage_selects_" + kind, group.preimage_selects,
                    sm::description(format("number of {} preimage queries performed", kind)),
                    {cdc_label}).set_skip_when_empty(),

            sm::make_total_operations("operations_with_preimage_" + kind, group.with_preimage_count,
                    sm::description(format("number of {} operations that included preimage", kind)),
                    {cdc_label}).set_skip_when_empty(),

            sm::make_total_operations("operations_with_postimage_" + kind, group.with_postimage_count,
                    sm::description(format("number of {} operations that included postimage", kind)),
                    {cdc_label}).set_skip_when_empty()
        });

        // Per-part counters are registered by the nested stats object.
        group.touches.register_metrics(_metrics, kind);
    };

    add_counter_metrics(counters_total, "total");
    add_counter_metrics(counters_failed, "failed");
}
|
|
|
|
/// Folds the details collected for this operation into the statistics.
/// The "total" counters are always updated; the "failed" counters only when
/// the operation was marked as failed.
cdc::operation_result_tracker::~operation_result_tracker() {
    auto account_into = [this] (stats::counters& target) {
        if (!_details.was_split) {
            target.unsplit_count++;
        } else {
            target.split_count++;
        }

        target.touches.apply(_details.touched_parts);

        if (_details.had_preimage) {
            target.with_preimage_count++;
        }
        if (_details.had_postimage) {
            target.with_postimage_count++;
        }
    };

    account_into(_stats.counters_total);
    if (!_failed) {
        return;
    }
    account_into(_stats.counters_failed);
}
|
|
|
|
/// Private implementation of cdc_service.
///
/// Registers itself as a migration listener on construction and reacts to
/// schema-change notifications by emitting the mutations that create, update
/// or drop the CDC log table accompanying a base table, as well as the
/// mutations that manage CDC streams of tables in tablet-based keyspaces.
class cdc::cdc_service::impl : service::migration_listener::empty_listener {
    friend cdc_service;
    db_context _ctxt;
    // Set by stop(); the destructor asserts that stop() has completed.
    bool _stopped = false;
public:
    impl(db_context ctxt)
        : _ctxt(std::move(ctxt))
    {
        _ctxt._migration_notifier.register_listener(this);
    }

    ~impl() {
        SCYLLA_ASSERT(_stopped);
    }

    /// Unregisters this listener and, once that completes, marks the object as stopped.
    future<> stop() {
        return _ctxt._migration_notifier.unregister_listener(this).then([this] {
            _stopped = true;
        });
    }

    /// For CDC log tables only: appends the mutation describing the table's
    /// streams for the given tablet map. Other tables are ignored.
    virtual void on_before_allocate_tablet_map(const locator::tablet_map& map, const schema& s, utils::chunked_vector<mutation>& muts, api::timestamp_type ts) override {
        if (is_log_schema(s)) {
            // The stream timestamp is backdated by the generation leeway.
            auto stream_ts = db_clock::now() - duration_cast<std::chrono::milliseconds>(get_generation_leeway());
            muts.emplace_back(create_table_streams_mutation(s.id(), stream_ts, map, ts).get());
        }
    }

    /// For every table in `cfms` that has CDC enabled in its options, validates
    /// that a log table can be created and appends the log table schema to `cfms`.
    void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms, api::timestamp_type ts) override {
        std::vector<schema_ptr> log_schemas;

        for (auto base : cfms) {
            const auto& base_schema = *base;

            if (base_schema.cdc_options().enabled()) {
                auto& db = _ctxt._proxy.get_db().local();
                auto logname = log_name(base_schema.cf_name());
                check_that_cdc_log_table_does_not_exist(db, base_schema, logname);
                ensure_that_table_has_no_counter_columns(base_schema);
                if (!db.features().cdc_with_tablets) {
                    ensure_that_table_uses_vnodes(ksm, base_schema, db.get_token_metadata().get_topology());
                }

                // in seastar thread
                log_schemas.push_back(create_log_schema(base_schema, db, ksm, ts));
            }
        }

        cfms.insert(cfms.end(), std::make_move_iterator(log_schemas.begin()), std::make_move_iterator(log_schemas.end()));
    }

    /// When the updated base table has CDC enabled, appends the mutations that
    /// create the log table or bring an existing one in line with `new_schema`.
    void on_before_update_column_family(const schema& new_schema, const schema& old_schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) override {
        if (secondary_index::vector_index::has_vector_index(new_schema)) {
            // If we have a vector index, we need to ensure that the CDC log is created
            // satisfying the minimal requirements of Vector Search.
            secondary_index::vector_index::check_cdc_options(new_schema);
        }

        const bool is_cdc = cdc_enabled(new_schema);
        const bool was_cdc = cdc_enabled(old_schema);

        // If CDC is being turned off there is nothing to do: even if columns
        // change, any writer should see cdc -> off together with the actual
        // schema changes to the base table, so it should never try to write to
        // a non-existent log column.
        // Note that a user who has set ttl=0 in the cdc options is still
        // responsible for emptying the log.
        if (!is_cdc) {
            return;
        }

        auto& db = _ctxt._proxy.get_db().local();
        auto logname = log_name(old_schema.cf_name());
        auto& keyspace = db.find_keyspace(old_schema.ks_name());
        const bool log_exists = db.has_schema(old_schema.ks_name(), logname);
        auto log_schema = log_exists ? db.find_schema(old_schema.ks_name(), logname) : nullptr;

        // Make sure the apparent log table really is a cdc log (not a user
        // table). Only the partitioner is checked, since user tables should
        // _not_ be able to set/use it.
        if (!was_cdc && log_exists && !is_log_schema(*log_schema)) {
            // will throw
            check_that_cdc_log_table_does_not_exist(db, old_schema, logname);
        }

        check_for_attempt_to_create_nested_cdc_log(db, new_schema);
        ensure_that_table_has_no_counter_columns(new_schema);
        if (!db.features().cdc_with_tablets) {
            ensure_that_table_uses_vnodes(*keyspace.metadata(), new_schema, db.get_token_metadata().get_topology());
        }

        // An existing log table keeps its table id.
        std::optional<table_id> existing_id;
        if (log_schema) {
            existing_id = log_schema->id();
        }
        auto new_log_schema = create_log_schema(new_schema, db, *keyspace.metadata(), timestamp, std::move(existing_id), log_schema);

        auto log_mut = log_schema
            ? db::schema_tables::make_update_table_mutations(_ctxt._proxy, keyspace.metadata(), log_schema, new_log_schema, timestamp)
            : db::schema_tables::make_create_table_mutations(new_log_schema, timestamp);

        mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));

        // A freshly created log table is announced to the other listeners.
        if (!log_schema) {
            db.get_notifier().before_create_column_family(*keyspace.metadata(), *new_log_schema, mutations, timestamp);
        }
    }

    /// Appends the mutations that drop the CDC log table of the dropped table
    /// (if it has one) and, when the dropped table is itself a log table in a
    /// tablet-based keyspace, the mutations that drop its CDC streams.
    void on_before_drop_column_family(const schema& schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) override {
        auto logname = log_name(schema.cf_name());
        auto& db = _ctxt._proxy.get_db().local();

        if (db.has_schema(schema.ks_name(), logname)) {
            auto log_schema = db.find_schema(schema.ks_name(), logname);
            if (is_log_schema(*log_schema)) {
                auto& keyspace = db.find_keyspace(schema.ks_name());
                auto drop_log = db::schema_tables::make_drop_table_mutations(keyspace.metadata(), log_schema, timestamp);
                mutations.insert(mutations.end(), std::make_move_iterator(drop_log.begin()), std::make_move_iterator(drop_log.end()));

                db.get_notifier().before_drop_column_family(*log_schema, mutations, timestamp);
            }
        }

        if (is_log_schema(schema) && db.find_keyspace(schema.ks_name()).uses_tablets()) {
            // drop cdc streams of this table
            auto drop_streams = make_drop_table_streams_mutations(schema.id(), timestamp);
            mutations.insert(mutations.end(), std::make_move_iterator(drop_streams.begin()), std::make_move_iterator(drop_streams.end()));
        }
    }

    /// For tablet-based keyspaces: appends the mutations that drop the CDC
    /// streams of every CDC log table in the keyspace being dropped.
    void on_before_drop_keyspace(const sstring& keyspace_name, utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) override {
        auto& db = _ctxt._proxy.get_db().local();
        auto& ks = db.find_keyspace(keyspace_name);

        if (!ks.uses_tablets()) {
            return;
        }

        // drop cdc streams for all CDC tables in this keyspace
        for (auto&& [name, s] : ks.metadata()->cf_meta_data()) {
            seastar::thread::maybe_yield();

            if (is_log_schema(*s)) {
                auto drop_streams = make_drop_table_streams_mutations(s->id(), ts);
                mutations.insert(mutations.end(), std::make_move_iterator(drop_streams.begin()), std::make_move_iterator(drop_streams.end()));
            }
        }
    }

    future<std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>> augment_mutation_call(
        lowres_clock::time_point timeout,
        utils::chunked_vector<mutation>&& mutations,
        tracing::trace_state_ptr tr_state,
        db::consistency_level write_cl,
        per_request_options options
    );

    template<typename Iter>
    future<> append_mutations(Iter i, Iter e, schema_ptr s, lowres_clock::time_point, utils::chunked_vector<mutation>&);

private:
    /// Throws invalid_request_exception if `schema` is itself the CDC log of
    /// some CDC-enabled table, i.e. if a nested CDC log would be created.
    static void check_for_attempt_to_create_nested_cdc_log(replica::database& db, const schema& schema) {
        const auto& table = schema.cf_name();
        const std::string_view table_view(table.data(), table.size());
        if (!is_log_for_some_table(db, schema.ks_name(), table_view)) {
            return;
        }
        throw exceptions::invalid_request_exception(format("Cannot create a CDC log for a table {}.{}, because creating nested CDC logs is not allowed",
                schema.ks_name(), schema.cf_name()));
    }

    /// Throws invalid_request_exception if a table named `logname` already
    /// exists in the keyspace of `schema`.
    static void check_that_cdc_log_table_does_not_exist(replica::database& db, const schema& schema, const sstring& logname) {
        if (!db.has_schema(schema.ks_name(), logname)) {
            return;
        }
        throw exceptions::invalid_request_exception(format("Cannot create CDC log table for table {}.{} because a table of name {}.{} already exists",
                schema.ks_name(), schema.cf_name(),
                schema.ks_name(), logname));
    }

    /// Throws invalid_request_exception if `schema` is a counter table.
    static void ensure_that_table_has_no_counter_columns(const schema& schema) {
        if (!schema.is_counter()) {
            return;
        }
        throw exceptions::invalid_request_exception(format("Cannot create CDC log for table {}.{}. Counter support not implemented",
                schema.ks_name(), schema.cf_name()));
    }

    // Until we support CDC with tablets (issue #16317), we can't allow this
    // to be attempted - in particular the log table we try to create will not
    // have tablets, and will cause a failure.
    //
    // Throws invalid_request_exception if the keyspace's replication strategy
    // uses tablets. Callers in this class invoke this only when the
    // cdc_with_tablets feature is not enabled.
    static void ensure_that_table_uses_vnodes(const keyspace_metadata& ksm, const schema& schema, const locator::topology& topo) {
        auto strategy = generate_replication_strategy(ksm, topo);
        if (!strategy->uses_tablets()) {
            return;
        }
        throw exceptions::invalid_request_exception(format("Cannot create CDC log for a table {}.{}, because the keyspace uses tablets, and not all nodes support the CDC with tablets feature.",
                schema.ks_name(), schema.cf_name()));
    }
};
|
|
|
|
/// Convenience constructor: wraps the three arguments in a db_context and
/// delegates to the db_context constructor.
cdc::cdc_service::cdc_service(service::storage_proxy& proxy, cdc::metadata& cdc_metadata, service::migration_notifier& notifier)
    : cdc_service(db_context(proxy, cdc_metadata, notifier)) {
}
|
|
|
|
/// Creates the implementation object from `ctxt` and registers this service
/// with the storage proxy referenced by the context.
cdc::cdc_service::cdc_service(db_context ctxt)
    : _impl(std::make_unique<impl>(std::move(ctxt))) {
    auto& proxy = _impl->_ctxt._proxy;
    proxy.set_cdc_service(this);
}
|
|
|
|
/// Clears the storage proxy's pointer to this service, then stops the
/// implementation object and returns its future.
future<> cdc::cdc_service::stop() {
    auto& proxy = _impl->_ctxt._proxy;
    proxy.set_cdc_service(nullptr);
    return _impl->stop();
}
|
|
|
|
// Defaulted out of line, after `impl` has been defined in this file.
// NOTE(review): presumably required so that the unique_ptr<impl> member is
// destroyed where `impl` is a complete type - confirm against the header.
cdc::cdc_service::~cdc_service() = default;
|
|
|
|
namespace {

// Textual spellings of the CDC "delta" option values.
constexpr std::string_view delta_mode_string_keys{"keys"};
constexpr std::string_view delta_mode_string_full{"full"};

// The "full" image mode is spelled the same as the "full" delta mode.
constexpr std::string_view image_mode_string_full{delta_mode_string_full};

} // anon. namespace
|
|
|
|
/// Writes the textual form of a delta mode: "keys" or "full".
/// Throws std::logic_error for a value that is neither of the two enumerators.
auto fmt::formatter<cdc::delta_mode>::format(cdc::delta_mode m, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    if (m == cdc::delta_mode::keys) {
        return fmt::format_to(ctx.out(), delta_mode_string_keys);
    }
    if (m == cdc::delta_mode::full) {
        return fmt::format_to(ctx.out(), delta_mode_string_full);
    }
    throw std::logic_error("Impossible value of cdc::delta_mode");
}
|
|
|
|
/// Writes the textual form of an image mode: "false" for off, "true" for on
/// and "full" for full.
/// Throws std::logic_error for a value that is none of the three enumerators.
auto fmt::formatter<cdc::image_mode>::format(cdc::image_mode m, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    using enum cdc::image_mode;
    switch (m) {
    case off:
        return fmt::format_to(ctx.out(), "false");
    case on:
        // An unreachable `break` used to follow this return; it was dead code.
        return fmt::format_to(ctx.out(), "true");
    case full:
        return fmt::format_to(ctx.out(), image_mode_string_full);
    }
    throw std::logic_error("Impossible value of cdc::image_mode");
}
|
|
|
|
/// Builds CDC options from a string map.
///
/// Keys and values are compared case-insensitively. The recognized keys are
/// "enabled", "preimage", "postimage", "delta" and "ttl":
///  - "enabled" and "postimage" accept "true"/"1" and "false"/"0";
///  - "preimage" additionally accepts "full";
///  - "delta" accepts "keys" and "full";
///  - "ttl" must parse as a non-negative int.
///
/// \throws exceptions::configuration_exception on an unknown key or an
///         invalid value. Messages quote the key/value as originally given.
cdc::options::options(const std::map<sstring, sstring>& map) {
    // The character must be passed to tolower as an unsigned char: calling it
    // with a negative char value (e.g. a non-ASCII byte) is undefined behaviour.
    auto to_lower = [] (sstring s) {
        std::transform(s.begin(), s.end(), s.begin(), [] (unsigned char c) {
            return static_cast<char>(::tolower(c));
        });
        return s;
    };

    for (const auto& p : map) {
        const auto key = to_lower(p.first);
        const auto val = to_lower(p.second);

        const bool is_true = val == "true" || val == "1";
        const bool is_false = val == "false" || val == "0";

        if (key == "enabled") {
            if (is_true || is_false) {
                enabled(is_true);
            } else {
                throw exceptions::configuration_exception("Invalid value for CDC option \"enabled\": " + p.second);
            }
        } else if (key == "preimage") {
            if (is_true) {
                _preimage = image_mode::on;
            } else if (val == image_mode_string_full) {
                _preimage = image_mode::full;
            } else if (is_false) {
                _preimage = image_mode::off;
            } else {
                throw exceptions::configuration_exception("Invalid value for CDC option \"preimage\": " + p.second);
            }
        } else if (key == "postimage") {
            if (is_true || is_false) {
                _postimage = is_true;
            } else {
                throw exceptions::configuration_exception("Invalid value for CDC option \"postimage\": " + p.second);
            }
        } else if (key == "delta") {
            if (val == delta_mode_string_keys) {
                _delta_mode = delta_mode::keys;
            } else if (val == delta_mode_string_full) {
                // Assign explicitly, like every other option does. Keys are
                // lower-cased only while iterating, so the map may hold the
                // same option under several spellings; relying on the member's
                // initial value would let an earlier "keys" entry survive.
                _delta_mode = delta_mode::full;
            } else {
                throw exceptions::configuration_exception("Invalid value for CDC option \"delta\": " + p.second);
            }
        } else if (key == "ttl") {
            // NOTE(review): std::stoi stops at the first non-numeric character,
            // so a value such as "10abc" is accepted as 10.
            try {
                _ttl = std::stoi(p.second);
            } catch (const std::invalid_argument&) {
                throw exceptions::configuration_exception("Invalid value for CDC option \"ttl\": " + p.second);
            } catch (const std::out_of_range&) {
                throw exceptions::configuration_exception("Invalid CDC option: ttl too large");
            }
            if (_ttl < 0) {
                throw exceptions::configuration_exception("Invalid CDC option: ttl must be >= 0");
            }
        } else {
            throw exceptions::configuration_exception("Invalid CDC option: " + p.first);
        }
    }
}
|
|
|
|
std::map<sstring, sstring> cdc::options::to_map() const {
|
|
if (!is_enabled_set()) {
|
|
return {};
|
|
}
|
|
|
|
return {
|
|
{ "enabled", enabled() ? "true" : "false" },
|
|
{ "preimage", fmt::format("{}", _preimage) },
|
|
{ "postimage", _postimage ? "true" : "false" },
|
|
{ "delta", fmt::format("{}", _delta_mode) },
|
|
{ "ttl", std::to_string(_ttl) },
|
|
};
|
|
}
|
|
|
|
/// Returns the options map (see to_map()) printed as JSON text.
sstring cdc::options::to_sstring() const {
    auto as_json = rjson::from_string_map(to_map());
    return rjson::print(as_json);
}
|
|
|
|
/// Two option sets are equal when the enabled flag, preimage mode, postimage
/// flag, ttl and delta mode all match.
bool cdc::options::operator==(const options& o) const {
    if (enabled() != o.enabled()) {
        return false;
    }
    if (_preimage != o._preimage) {
        return false;
    }
    if (_postimage != o._postimage) {
        return false;
    }
    if (_ttl != o._ttl) {
        return false;
    }
    return _delta_mode == o._delta_mode;
}
|
|
|
|
namespace cdc {
|
|
|
|
using operation_native_type = std::underlying_type_t<operation>;
|
|
|
|
static const sstring cdc_log_suffix = "_scylla_cdc_log";
|
|
static const sstring cdc_meta_column_prefix = "cdc$";
|
|
static const sstring cdc_deleted_column_prefix = cdc_meta_column_prefix + "deleted_";
|
|
static const sstring cdc_deleted_elements_column_prefix = cdc_meta_column_prefix + "deleted_elements_";
|
|
|
|
bool cdc_enabled(const schema& s) {
|
|
return s.cdc_options().enabled() || secondary_index::vector_index::has_vector_index(s);
|
|
}
|
|
|
|
bool is_log_name(const std::string_view& table_name) {
|
|
return table_name.ends_with(cdc_log_suffix);
|
|
}
|
|
|
|
bool is_log_schema(const schema& s) {
|
|
return s.get_partitioner().name() == cdc::cdc_partitioner::classname;
|
|
}
|
|
|
|
bool is_cdc_metacolumn_name(const sstring& name) {
|
|
return name.compare(0, cdc_meta_column_prefix.size(), cdc_meta_column_prefix) == 0;
|
|
}
|
|
|
|
/// Tells whether `table_name` in keyspace `ks_name` is the CDC log of an
/// existing table that has CDC in effect.
bool is_log_for_some_table(const replica::database& db, const sstring& ks_name, const std::string_view& table_name) {
    const auto base = get_base_table(db, ks_name, table_name);
    return base && cdc_enabled(*base);
}
|
|
|
|
/// Looks up the base table of log table `s` using its keyspace and table name.
/// See the name-based overload for the result.
schema_ptr get_base_table(const replica::database& db, const schema& s) {
    const auto& keyspace = s.ks_name();
    const auto& table = s.cf_name();
    return get_base_table(db, keyspace, table);
}
|
|
|
|
/// Returns the schema of the table whose CDC log would be named `table_name`,
/// or nullptr when `table_name` is not a log name or no such base table exists
/// in keyspace `ks_name`.
schema_ptr get_base_table(const replica::database& db, std::string_view ks_name, std::string_view table_name) {
    if (is_log_name(table_name)) {
        // Note: It is legal for a user to directly create a table with name
        // `X_scylla_cdc_log`. A table with name `X` might be present (but with
        // cdc log disabled), or not present at all when creating `X_scylla_cdc_log`.
        // Therefore, existence of `X_scylla_cdc_log` does not imply existence of `X`
        // and, in order not to throw, we explicitly need to check for existence of 'X'.
        const auto base = base_name(table_name);
        if (db.has_schema(ks_name, base)) {
            return db.find_schema(sstring(ks_name), base);
        }
    }
    return nullptr;
}
|
|
|
|
/// Strips the CDC log suffix from `log_name`.
/// Asserts that `log_name` is a log table name.
seastar::sstring base_name(std::string_view log_name) {
    SCYLLA_ASSERT(is_log_name(log_name));
    const auto base_length = log_name.size() - cdc_log_suffix.size();
    return sstring(log_name.data(), base_length);
}
|
|
|
|
/// Returns the name of the CDC log table for base table `table_name`:
/// the table name followed by the CDC log suffix.
sstring log_name(std::string_view table_name) {
    const sstring base(table_name);
    return base + cdc_log_suffix;
}
|
|
|
|
/// Name of the log column that mirrors base column `column_name`;
/// it is the base column name itself.
sstring log_data_column_name(std::string_view column_name) {
    sstring name(column_name);
    return name;
}
|
|
|
|
/// Name of a CDC metadata column: the metadata prefix followed by `column_name`.
seastar::sstring log_meta_column_name(std::string_view column_name) {
    const sstring suffix(column_name);
    return cdc_meta_column_prefix + suffix;
}
|
|
|
|
/// Bytes variant of log_data_column_name(): returns a copy of `column_name`.
bytes log_data_column_name_bytes(const bytes& column_name) {
    bytes name = column_name;
    return name;
}
|
|
|
|
/// Bytes variant of log_meta_column_name(): the metadata prefix followed by
/// `column_name`.
bytes log_meta_column_name_bytes(const bytes& column_name) {
    const auto prefix = to_bytes(cdc_meta_column_prefix);
    return prefix + column_name;
}
|
|
|
|
/// Name of the "deleted" companion column of data column `column_name`.
seastar::sstring log_data_column_deleted_name(std::string_view column_name) {
    const sstring suffix(column_name);
    return cdc_deleted_column_prefix + suffix;
}
|
|
|
|
/// Bytes variant of log_data_column_deleted_name().
bytes log_data_column_deleted_name_bytes(const bytes& column_name) {
    const auto prefix = to_bytes(cdc_deleted_column_prefix);
    return prefix + column_name;
}
|
|
|
|
/// Name of the "deleted elements" companion column of data column `column_name`.
seastar::sstring log_data_column_deleted_elements_name(std::string_view column_name) {
    const sstring suffix(column_name);
    return cdc_deleted_elements_column_prefix + suffix;
}
|
|
|
|
/// Bytes variant of log_data_column_deleted_elements_name().
bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
    const auto prefix = to_bytes(cdc_deleted_elements_column_prefix);
    return prefix + column_name;
}
|
|
|
|
/// Applies the default properties of a newly created CDC log table to builder `b`.
///
/// Sets time-window compaction, a descriptive comment, disabled caching and a
/// tombstone-gc extension derived from the keyspace's replication strategy.
/// When the base table's CDC TTL is positive, it additionally sets
/// gc_grace_seconds to 0 and sizes the compaction window from the TTL.
///
/// \param b   builder of the log table schema
/// \param s   schema of the base table
/// \param db  database, used for its topology
/// \param ksm metadata of the keyspace holding the table
static void set_default_properties_log_table(schema_builder& b, const schema& s,
        const replica::database& db, const keyspace_metadata& ksm)
{
    b.set_compaction_strategy(compaction::compaction_strategy_type::time_window);
    b.set_comment(fmt::format("CDC log for {}.{}", s.ks_name(), s.cf_name()));

    auto ttl_seconds = s.cdc_options().ttl();
    if (ttl_seconds > 0) {
        b.set_gc_grace_seconds(0);

        // Integer division rounded up.
        auto div_round_up = [] (int dividend, int divisor) {
            const int quotient = dividend / divisor;
            return dividend % divisor == 0 ? quotient : quotient + 1;
        };

        // What's the minimum window that won't create more than 24 sstables.
        auto window_seconds = div_round_up(ttl_seconds, 24);
        // The window is expressed in whole minutes, rounded up.
        auto window_minutes = std::chrono::ceil<std::chrono::minutes>(std::chrono::seconds(window_seconds)).count();

        b.set_compaction_strategy_options({
            {"compaction_window_unit", "MINUTES"},
            {"compaction_window_size", std::to_string(window_minutes)},
            // A new SSTable will become fully expired every
            // `window_seconds` seconds so we shouldn't check for expired
            // sstables too often.
            {"expired_sstable_check_frequency_seconds",
                    std::to_string(std::max(1, window_seconds / 2))},
        });
    }
    b.set_caching_options(caching_options::get_disabled_caching_options());

    auto strategy = generate_replication_strategy(ksm, db.get_token_metadata().get_topology());
    auto gc_extension = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*strategy, false));
    b.add_extension(tombstone_gc_extension::NAME, std::move(gc_extension));
}
|
|
|
|
/// Adds the columns of a CDC log table to builder `b`.
///
/// First come the fixed CDC metadata columns, then one log column per base
/// table column. Static and regular ("data") columns additionally get a
/// boolean "deleted" column, and multi-cell ones a "deleted elements" column.
///
/// \param b         builder of the log table schema
/// \param s         schema of the base table
/// \param timestamp timestamp of the schema change, used to validate re-added columns
/// \param old       previous log table schema, or null if there is none
/// \throws exceptions::invalid_request_exception if a column would be re-added
///         before its recorded drop timestamp has passed
static void add_columns_to_cdc_log(schema_builder& b, const schema& s,
        const api::timestamp_type timestamp, const schema_ptr old)
{
    b.with_column(log_meta_column_name_bytes("stream_id"), bytes_type, column_kind::partition_key);
    b.with_column(log_meta_column_name_bytes("time"), timeuuid_type, column_kind::clustering_key);
    b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key);
    b.with_column(log_meta_column_name_bytes("operation"), data_type_for<operation_native_type>());
    b.with_column(log_meta_column_name_bytes("ttl"), long_type);
    b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type);

    // When dropping a column from a CDC log table, we set the drop timestamp to be
    // `column_drop_leeway` seconds into the future (see `create_log_schema`).
    // Therefore, when recreating a column with the same name, we need to validate
    // that it's not recreated too soon and that the drop timestamp has passed.
    auto ensure_not_recently_dropped = [&] (const sstring& name) {
        if (!old || !old->dropped_columns().contains(name)) {
            return;
        }
        const auto& drop_info = old->dropped_columns().at(name);
        auto create_time = api::timestamp_clock::time_point(api::timestamp_clock::duration(timestamp));
        auto drop_time = api::timestamp_clock::time_point(api::timestamp_clock::duration(drop_info.timestamp));
        if (drop_time > create_time) {
            throw exceptions::invalid_request_exception(format("Cannot add column {} because a column with the same name was dropped too recently. Please retry after {} seconds",
                    name, std::chrono::duration_cast<std::chrono::seconds>(drop_time - create_time).count() + 1));
        }
    };

    // Validates, then adds, a single regular log column.
    auto add_log_column = [&] (sstring name, data_type type) {
        ensure_not_recently_dropped(name);
        b.with_column(to_bytes(name), type);
    };

    // Adds the log columns for one group of base table columns.
    auto add_group = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) {
        for (const auto& column : columns) {
            auto log_type = column.type;

            if (log_type->get_kind() == abstract_type::kind::empty) {
                // A column of EMPTY type is only expected in a dense table
                // with exactly one regular column; it gets no log column.
                if (!s.is_dense() || s.regular_columns_count() != 1) {
                    on_internal_error(cdc_log, "Unexpected column with EMPTY type");
                }
                continue;
            }

            if (is_data_col && log_type->is_multi_cell()) {
                log_type = visit(*log_type, make_visitor(
                    // non-frozen lists are represented as map<timeuuid, value_type>. Otherwise we cannot express delta
                    [] (const list_type_impl& list) -> data_type {
                        return map_type_impl::get_instance(list.name_comparator(), list.value_comparator(), false);
                    },
                    // everything else is just frozen self
                    [] (const abstract_type& other) {
                        return other.freeze();
                    }
                ));
            }

            add_log_column(log_data_column_name(column.name_as_text()), log_type);

            if (is_data_col) {
                add_log_column(log_data_column_deleted_name(column.name_as_text()), boolean_type);
            }

            // The check uses the base column's type, while the visitor below
            // is applied to the (possibly converted) log column type.
            if (column.type->is_multi_cell()) {
                auto deleted_elements_type = visit(*log_type, make_visitor(
                    // all collection deletes are set<key_type> (i.e. timeuuid for lists)
                    [] (const collection_type_impl& collection) -> data_type {
                        return set_type_impl::get_instance(collection.name_comparator(), false);
                    },
                    // user types deletes are a set of the indices removed
                    [] (const user_type_impl&) -> data_type {
                        return set_type_impl::get_instance(short_type, false);
                    },
                    [] (const abstract_type&) -> data_type {
                        throw std::invalid_argument("Should not reach");
                    }
                ));
                add_log_column(log_data_column_deleted_elements_name(column.name_as_text()), deleted_elements_type);
            }
        }
    };

    add_group(s.partition_key_columns(), false);
    add_group(s.clustering_key_columns(), false);
    add_group(s.static_columns(), true);
    add_group(s.regular_columns(), true);
}
|
|
|
|
/// Builds the schema of the CDC log table for base table `s`.
///
/// \param s         schema of the base table
/// \param db        database, forwarded when computing default properties
/// \param ksm       metadata of the keyspace holding the table
/// \param timestamp timestamp of the schema change
/// \param uuid      table id to assign to the log table, if any
/// \param old       previous log table schema, or null when creating a new log
/// \return the built log table schema
static schema_ptr create_log_schema(const schema& s, const replica::database& db,
        const keyspace_metadata& ksm, api::timestamp_type timestamp, std::optional<table_id> uuid, schema_ptr old)
{
    schema_builder b(s.ks_name(), log_name(s.cf_name()));

    b.with_partitioner(cdc::cdc_partitioner::classname);

    if (!old) {
        set_default_properties_log_table(b, s, db, ksm);
    } else {
        // If the user reattaches the log table, do not change its properties.
        b.set_properties(old->get_properties());
    }

    add_columns_to_cdc_log(b, s, timestamp, old);

    if (uuid.has_value()) {
        b.set_uuid(uuid.value());
    }

    /**
     * #10473 - if we are redefining the log table, we need to ensure any dropped
     * columns are registered in "dropped_columns" table, otherwise clients will not
     * be able to read data older than now.
     */
    if (old) {
        // not super efficient, but we don't do this often.
        for (auto& col : old->all_columns()) {
            if (b.has_column({col.name(), col.name_as_text() })) {
                continue;
            }
            // The drop timestamp lies `column_drop_leeway` in the future.
            auto drop_ts = api::timestamp_clock::now() + column_drop_leeway;
            b.without_column(col.name_as_text(), col.type, drop_ts.time_since_epoch().count());
        }
    }

    return b.build();
}
|
|
|
|
// iterators for collection merge
|
|
template<typename T>
|
|
class collection_iterator {
|
|
public:
|
|
using iterator_category = std::input_iterator_tag;
|
|
using value_type = const T;
|
|
using difference_type = std::ptrdiff_t;
|
|
using pointer = const T*;
|
|
using reference = const T&;
|
|
private:
|
|
managed_bytes_view _v, _next;
|
|
size_t _rem = 0;
|
|
T _current;
|
|
public:
|
|
collection_iterator(managed_bytes_view_opt v = {})
|
|
: _v(v.value_or(managed_bytes_view{}))
|
|
, _rem(_v.empty() ? 0 : read_collection_size(_v))
|
|
{
|
|
if (_rem != 0) {
|
|
parse();
|
|
}
|
|
}
|
|
collection_iterator(const collection_iterator&) = default;
|
|
const T& operator*() const {
|
|
return _current;
|
|
}
|
|
const T* operator->() const {
|
|
return &_current;
|
|
}
|
|
collection_iterator& operator++() {
|
|
next();
|
|
if (_rem != 0) {
|
|
parse();
|
|
} else {
|
|
_current = {};
|
|
}
|
|
return *this;
|
|
}
|
|
collection_iterator operator++(int) {
|
|
auto v = *this;
|
|
++(*this);
|
|
return v;
|
|
}
|
|
bool operator==(const collection_iterator& x) const {
|
|
return _v == x._v;
|
|
}
|
|
private:
|
|
void next() {
|
|
--_rem;
|
|
_v = _next;
|
|
}
|
|
void parse();
|
|
};
|
|
|
|
template<>
|
|
void collection_iterator<std::pair<managed_bytes_view, managed_bytes_view>>::parse() {
|
|
SCYLLA_ASSERT(_rem > 0);
|
|
_next = _v;
|
|
auto k = read_collection_key(_next);
|
|
auto v = read_collection_value_nonnull(_next);
|
|
_current = std::make_pair(k, v);
|
|
}
|
|
|
|
template<>
|
|
void collection_iterator<managed_bytes_view>::parse() {
|
|
SCYLLA_ASSERT(_rem > 0);
|
|
_next = _v;
|
|
auto k = read_collection_key(_next);
|
|
_current = k;
|
|
}
|
|
|
|
template<>
|
|
void collection_iterator<managed_bytes_view_opt>::parse() {
|
|
SCYLLA_ASSERT(_rem > 0);
|
|
_next = _v;
|
|
auto k = read_collection_value_nonnull(_next);
|
|
_current = k;
|
|
}
|
|
|
|
template<typename Container, typename T>
|
|
class maybe_back_insert_iterator : public std::back_insert_iterator<Container> {
|
|
const abstract_type& _type;
|
|
collection_iterator<T> _s, _e;
|
|
public:
|
|
using value_type = typename Container::value_type;
|
|
maybe_back_insert_iterator(Container& c, const abstract_type& type, collection_iterator<T> s)
|
|
: std::back_insert_iterator<Container>(c)
|
|
, _type(type)
|
|
, _s(s)
|
|
{}
|
|
maybe_back_insert_iterator& operator*() {
|
|
return *this;
|
|
}
|
|
maybe_back_insert_iterator& operator=(const value_type& v) {
|
|
if (!find(v)) {
|
|
std::back_insert_iterator<Container>::operator=(v);
|
|
}
|
|
return *this;
|
|
}
|
|
maybe_back_insert_iterator& operator=(value_type&& v) {
|
|
if (!find(v)) {
|
|
std::back_insert_iterator<Container>::operator=(std::move(v));
|
|
}
|
|
return *this;
|
|
}
|
|
private:
|
|
bool find(const value_type& v) {
|
|
// cheating - reducing search span, because we know we only append unique values (see below).
|
|
while (_s != _e) {
|
|
auto n = compare(*_s, v);
|
|
if (n <= 0) {
|
|
++_s;
|
|
}
|
|
if (n == 0) {
|
|
return true;
|
|
}
|
|
if (n > 0) {
|
|
break;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
std::strong_ordering compare(const T&, const value_type& v);
|
|
};
|
|
|
|
template<>
|
|
std::strong_ordering maybe_back_insert_iterator<std::vector<std::pair<managed_bytes_view, managed_bytes_view>>, managed_bytes_view>::compare(
|
|
const managed_bytes_view& t, const value_type& v) {
|
|
return _type.compare(t, v.first);
|
|
}
|
|
|
|
template<>
|
|
std::strong_ordering maybe_back_insert_iterator<std::vector<managed_bytes_view>, managed_bytes_view>::compare(const managed_bytes_view& t, const value_type& v) {
|
|
return _type.compare(t, v);
|
|
}
|
|
|
|
template<>
|
|
std::strong_ordering maybe_back_insert_iterator<std::vector<managed_bytes_view_opt>, managed_bytes_view_opt>::compare(const managed_bytes_view_opt& t, const value_type& v) {
|
|
if (!t.has_value() || !v.has_value()) {
|
|
return unsigned(t.has_value()) <=> unsigned(v.has_value());
|
|
}
|
|
return _type.compare(*t, *v);
|
|
}
|
|
|
|
template<typename Container, typename T>
|
|
auto make_maybe_back_inserter(Container& c, const abstract_type& type, collection_iterator<T> s) {
|
|
return maybe_back_insert_iterator<Container, T>(c, type, s);
|
|
}
|
|
|
|
static size_t collection_size(const managed_bytes_opt& bo) {
|
|
if (bo) {
|
|
managed_bytes_view mbv(*bo);
|
|
return read_collection_size(mbv);
|
|
}
|
|
return 0;
|
|
}
|
|
template<typename Func>
|
|
static void udt_for_each(const managed_bytes_opt& bo, Func&& f) {
|
|
if (bo) {
|
|
managed_bytes_view mbv(*bo);
|
|
std::for_each(tuple_deserializing_iterator::start(mbv), tuple_deserializing_iterator::finish(mbv), std::forward<Func>(f));
|
|
}
|
|
}
|
|
// Merges two serialized key/value collections into one, leaving out entries whose key is
// listed in `deleted`.
//
// The result is the union (by key) of `next` and `prev`; on a key present in both, the entry
// from `next` is kept. Keys are ordered with the collection's name comparator.
static managed_bytes merge(const collection_type_impl& ctype, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
    using entry = std::pair<managed_bytes_view, managed_bytes_view>;
    using entry_iterator = collection_iterator<entry>;

    std::vector<entry> merged;
    merged.reserve(collection_size(prev) + collection_size(next));

    auto key_type = ctype.name_comparator();
    auto key_less = [&key_type = *key_type](const entry& lhs, const entry& rhs) {
        return key_type.compare(lhs.first, rhs.first) < 0;
    };

    entry_iterator end_it, prev_it(prev), next_it(next);
    collection_iterator<managed_bytes_view> deleted_it(deleted);

    // Argument order matters: for equivalent elements std::set_union copies from the first
    // range, so passing `next` first gives its entries priority over those of `prev`.
    std::set_union(next_it, end_it, prev_it, end_it, make_maybe_back_inserter(merged, *key_type, deleted_it), key_less);
    return map_type_impl::serialize_partially_deserialized_form_fragmented(merged);
}
|
|
// Merges two serialized sets into one, leaving out elements listed in `deleted`.
//
// Elements are ordered with the set's name comparator; an absent element is ordered before
// a present one.
static managed_bytes merge(const set_type_impl& ctype, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
    using element_iterator = collection_iterator<managed_bytes_view_opt>;

    std::vector<managed_bytes_view_opt> merged;
    merged.reserve(collection_size(prev) + collection_size(next));

    auto elem_type = ctype.name_comparator();
    auto elem_less = [&elem_type = *elem_type](managed_bytes_view_opt lhs, managed_bytes_view_opt rhs) {
        if (lhs.has_value() && rhs.has_value()) {
            return elem_type.compare(*lhs, *rhs) < 0;
        }
        return unsigned(lhs.has_value()) < unsigned(rhs.has_value());
    };

    element_iterator end_it, prev_it(prev), next_it(next), deleted_it(deleted);
    std::set_union(next_it, end_it, prev_it, end_it, make_maybe_back_inserter(merged, *elem_type, deleted_it), elem_less);
    return set_type_impl::serialize_partially_deserialized_form_fragmented(merged);
}
|
|
// Merges two serialized user-defined-type values field by field.
//
// Starts from the fields of `prev`, overrides each field for which `next` holds a value,
// and finally clears every field whose index is listed in `deleted`.
static managed_bytes merge(const user_type_impl& type, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
    std::vector<managed_bytes_view_opt> fields(type.size());

    // Copy all fields of the previous value, in order.
    udt_for_each(prev, [out = fields.begin()](managed_bytes_view_opt field) mutable {
        *out = field;
        ++out;
    });

    // Overlay the fields that are set in the next value; unset ones keep the previous content.
    udt_for_each(next, [out = fields.begin()](managed_bytes_view_opt field) mutable {
        if (field) {
            *out = field;
        }
        ++out;
    });

    // Each deleted entry encodes a field index.
    collection_iterator<managed_bytes_view> deleted_end;
    for (collection_iterator<managed_bytes_view> deleted_it(deleted); deleted_it != deleted_end; ++deleted_it) {
        managed_bytes_view encoded_index = *deleted_it;
        auto index = deserialize_field_index(encoded_index);
        fields[index] = std::nullopt;
    }

    return type.build_value_fragmented(fields);
}
|
|
// Fallback overload for types without a dedicated merge implementation: always throws
// std::runtime_error naming the offending type. The value arguments are ignored.
static managed_bytes merge(const abstract_type& type, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
    auto message = format("cdc merge: unknown type {}", type.name());
    throw std::runtime_error(message);
}
|
|
|
|
// Looks up the value stored for `cdef` in a row state.
// Returns std::nullopt when there is no row state or the column has no entry in it.
static managed_bytes_opt get_col_from_row_state(const cell_map* state, const column_definition& cdef) {
    if (state == nullptr) {
        return std::nullopt;
    }
    const auto entry = state->find(&cdef);
    if (entry == state->end()) {
        return std::nullopt;
    }
    return entry->second;
}
|
|
|
|
// Returns a pointer to the state stored for the given clustering key, or nullptr when the
// map has no entry for it.
cell_map* get_row_state(row_states_map& row_states, const clustering_key& ck) {
    if (auto entry = row_states.find(ck); entry != row_states.end()) {
        return &entry->second;
    }
    return nullptr;
}
|
|
|
|
// Read-only variant: returns a pointer to the state stored for the given clustering key,
// or nullptr when the map has no entry for it.
const cell_map* get_row_state(const row_states_map& row_states, const clustering_key& ck) {
    if (auto entry = row_states.find(ck); entry != row_states.end()) {
        return &entry->second;
    }
    return nullptr;
}
|
|
|
|
// Extracts the value of column `cdef` from a preimage query result row.
//
// Returns std::nullopt when there is no row or the row does not contain the column.
// Atomic columns and all non-set types are returned as the raw fragmented blob. A set is
// re-serialized from the keys of the key/value pairs read from the row's view, discarding
// the values.
static managed_bytes_opt get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) {
    if (!pirow) {
        return std::nullopt;
    }
    const auto& col_name = cdef.name_as_text();
    if (!pirow->has(col_name)) {
        return std::nullopt;
    }
    if (cdef.is_atomic()) {
        return pirow->get_blob_fragmented(col_name);
    }
    return visit(*cdef.type, make_visitor(
        // flatten set
        [&] (const set_type_impl&) {
            auto view = pirow->get_view(col_name);
            auto remaining = read_collection_size(view);
            std::vector<managed_bytes> keys;
            keys.reserve(remaining);
            while (remaining--) {
                keys.emplace_back(read_collection_key(view));
                // The paired value still has to be consumed from the view; its content is not needed.
                read_collection_value_nonnull(view);
            }
            return set_type_impl::serialize_partially_deserialized_form_fragmented({keys.begin(), keys.end()});
        },
        [&] (const abstract_type&) -> managed_bytes {
            return pirow->get_blob_fragmented(col_name);
        }
    ));
}
|
|
|
|
/* Given a timestamp, generates a timeuuid with the following properties:
|
|
* 1. `t1` < `t2` implies timeuuid_type->less(timeuuid_type->decompose(generate_timeuuid(`t1`)),
|
|
* timeuuid_type->decompose(generate_timeuuid(`t2`))),
|
|
* 2. utils::UUID_gen::micros_timestamp(generate_timeuuid(`t`)) == `t`.
|
|
*
|
|
* If `t1` == `t2`, then generate_timeuuid(`t1`) != generate_timeuuid(`t2`),
|
|
* with unspecified nondeterministic ordering.
|
|
*/
|
|
utils::UUID generate_timeuuid(api::timestamp_type t) {
|
|
return utils::UUID_gen::get_random_time_UUID_from_micros(std::chrono::microseconds{t});
|
|
}
|
|
|
|
class log_mutation_builder {
|
|
const schema& _base_schema;
|
|
const schema& _log_schema;
|
|
const column_definition& _op_col;
|
|
const column_definition& _ttl_col;
|
|
|
|
// The base mutation's partition key
|
|
std::vector<managed_bytes> _base_pk;
|
|
|
|
// The cdc$time value of created rows
|
|
const bytes _tuuid;
|
|
// The timestamp of the created log mutation cells
|
|
const api::timestamp_type _ts;
|
|
// The ttl of the created log mutation cells
|
|
const ttl_opt _ttl;
|
|
|
|
// Keeps the next cdc$batch_seq_no value
|
|
int _batch_no = 0;
|
|
|
|
// The mutation under construction
|
|
mutation& _log_mut;
|
|
|
|
public:
|
|
log_mutation_builder(mutation& log_mut, api::timestamp_type ts,
|
|
const partition_key& base_pk, const schema& base_schema)
|
|
: _base_schema(base_schema), _log_schema(*log_mut.schema()),
|
|
_op_col(*_log_schema.get_column_definition(log_meta_column_name_bytes("operation"))),
|
|
_ttl_col(*_log_schema.get_column_definition(log_meta_column_name_bytes("ttl"))),
|
|
_base_pk(base_pk.explode_fragmented()),
|
|
_tuuid(timeuuid_type->decompose(generate_timeuuid(ts))),
|
|
_ts(ts),
|
|
_ttl(_base_schema.cdc_options().ttl()
|
|
? std::optional{std::chrono::seconds(_base_schema.cdc_options().ttl())} : std::nullopt),
|
|
_log_mut(log_mut)
|
|
{}
|
|
|
|
const schema& base_schema() const {
|
|
return _base_schema;
|
|
}
|
|
|
|
clustering_key create_ck(int batch) const {
|
|
return clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(batch) });
|
|
}
|
|
|
|
// Creates a new clustering row in the mutation, assigning it the next `cdc$batch_seq_no`.
|
|
// The numbering of batch sequence numbers starts from 0.
|
|
clustering_key allocate_new_log_row() {
|
|
auto log_ck = create_ck(_batch_no++);
|
|
set_key_columns(log_ck, _base_schema.partition_key_columns(), _base_pk);
|
|
return log_ck;
|
|
}
|
|
|
|
bool has_rows() const {
|
|
return _batch_no != 0;
|
|
}
|
|
|
|
clustering_key last_row_key() const {
|
|
return create_ck(_batch_no - 1);
|
|
}
|
|
|
|
// A common pattern is to allocate a row and then immediately set its `cdc$operation` column.
|
|
clustering_key allocate_new_log_row(operation op) {
|
|
auto log_ck = allocate_new_log_row();
|
|
set_operation(log_ck, op);
|
|
return log_ck;
|
|
}
|
|
|
|
// Each clustering key column in the base schema has a corresponding column in the log schema with the same name.
|
|
// This takes a base schema clustering key prefix and sets these columns
|
|
// according to the prefix' values for the given log row.
|
|
void set_clustering_columns(const clustering_key& log_ck, const clustering_key_prefix& base_ckey) {
|
|
set_key_columns(log_ck, _base_schema.clustering_key_columns(), base_ckey.explode_fragmented());
|
|
}
|
|
|
|
// Sets the `cdc$operation` column for the given row.
|
|
void set_operation(const clustering_key& log_ck, operation op) {
|
|
_log_mut.set_cell(log_ck, _op_col, atomic_cell::make_live(
|
|
*_op_col.type, _ts, _op_col.type->decompose(operation_native_type(op)), _ttl));
|
|
}
|
|
|
|
// Sets the `cdc$ttl` column for the given row.
|
|
// Warning: if the caller wants `cdc$ttl` to be null, they shouldn't call `set_ttl` with a non-null value.
|
|
// Calling it with a non-null value and then with a null value will keep the non-null value.
|
|
void set_ttl(const clustering_key& log_ck, ttl_opt ttl) {
|
|
if (ttl) {
|
|
_log_mut.set_cell(log_ck, _ttl_col, atomic_cell::make_live(
|
|
*_ttl_col.type, _ts, _ttl_col.type->decompose(ttl->count()), _ttl));
|
|
}
|
|
}
|
|
|
|
// Each regular and static column in the base schema has a corresponding column in the log schema with the same name.
|
|
// Given a reference to such a column from the base schema, this function sets the corresponding column
|
|
// in the log to the given value for the given row.
|
|
void set_value(const clustering_key& log_ck, const column_definition& base_cdef, const managed_bytes_view& value) {
|
|
auto log_cdef_ptr = _log_schema.get_column_definition(log_data_column_name_bytes(base_cdef.name()));
|
|
if (!log_cdef_ptr) {
|
|
throw exceptions::invalid_request_exception(format("CDC log schema for {}.{} does not have base column {}",
|
|
_log_schema.ks_name(), _log_schema.cf_name(), base_cdef.name_as_text()));
|
|
}
|
|
_log_mut.set_cell(log_ck, *log_cdef_ptr, atomic_cell::make_live(*base_cdef.type, _ts, value, _ttl));
|
|
}
|
|
|
|
// Each regular and static column in the base schema has a corresponding column in the log schema
|
|
// with boolean type and the name constructed by prefixing the original name with ``cdc$deleted_''
|
|
// Given a reference to such a column from the base schema, this function sets the corresponding column
|
|
// in the log to `true` for the given row. If not called, the column will be `null`.
|
|
void set_deleted(const clustering_key& log_ck, const column_definition& base_cdef) {
|
|
auto log_cdef_ptr = _log_schema.get_column_definition(log_data_column_deleted_name_bytes(base_cdef.name()));
|
|
if (!log_cdef_ptr) {
|
|
throw exceptions::invalid_request_exception(format("CDC log schema for {}.{} does not have base column {}",
|
|
_log_schema.ks_name(), _log_schema.cf_name(), base_cdef.name_as_text()));
|
|
}
|
|
auto& log_cdef = *log_cdef_ptr;
|
|
_log_mut.set_cell(log_ck, *log_cdef_ptr, atomic_cell::make_live(*log_cdef.type, _ts, log_cdef.type->decompose(true), _ttl));
|
|
}
|
|
|
|
// Each regular and static non-atomic column in the base schema has a corresponding column in the log schema
|
|
// whose type is a frozen `set` of keys (the types of which depend on the base type) and whose name is constructed
|
|
// by prefixing the original name with ``cdc$deleted_elements_''.
|
|
// Given a reference to such a column from the base schema, this function sets the corresponding column
|
|
// in the log to the given set of keys for the given row.
|
|
void set_deleted_elements(const clustering_key& log_ck, const column_definition& base_cdef, const managed_bytes& deleted_elements) {
|
|
auto log_cdef_ptr = _log_schema.get_column_definition(log_data_column_deleted_elements_name_bytes(base_cdef.name()));
|
|
if (!log_cdef_ptr) {
|
|
throw exceptions::invalid_request_exception(format("CDC log schema for {}.{} does not have base column {}",
|
|
_log_schema.ks_name(), _log_schema.cf_name(), base_cdef.name_as_text()));
|
|
}
|
|
auto& log_cdef = *log_cdef_ptr;
|
|
_log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*log_cdef.type, _ts, deleted_elements, _ttl));
|
|
}
|
|
|
|
void end_record() {
|
|
if (has_rows()) {
|
|
_log_mut.set_cell(last_row_key(), log_meta_column_name_bytes("end_of_batch"), data_value(true), _ts, _ttl);
|
|
}
|
|
}
|
|
private:
|
|
void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector<managed_bytes>& key) {
|
|
size_t pos = 0;
|
|
for (auto& column : columns) {
|
|
if (pos >= key.size()) {
|
|
break;
|
|
}
|
|
auto& cdef = *_log_schema.get_column_definition(log_data_column_name_bytes(column.name()));
|
|
_log_mut.set_cell(log_ck, cdef, atomic_cell::make_live(*column.type, _ts, managed_bytes_view(key[pos]), _ttl));
|
|
++pos;
|
|
}
|
|
}
|
|
};
|
|
|
|
// Builds an owning managed_bytes from the value of the given cell view.
static managed_bytes get_managed_bytes(const atomic_cell_view& acv) {
    managed_bytes owned(acv.value());
    return owned;
}
|
|
|
|
// Returns the cell's TTL when the cell is live and has one, std::nullopt otherwise.
static ttl_opt get_ttl(const atomic_cell_view& acv) {
    if (!acv.is_live_and_has_ttl()) {
        return std::nullopt;
    }
    return std::optional{acv.ttl()};
}
|
|
|
|
// Returns the row marker's TTL when the marker is expiring, std::nullopt otherwise.
static ttl_opt get_ttl(const row_marker& rm) {
    if (!rm.is_expiring()) {
        return std::nullopt;
    }
    return std::optional{rm.ttl()};
}
|
|
|
|
/**
|
|
* Returns whether we should generate cdc delta values (beyond keys)
|
|
*/
|
|
static bool generate_delta_values(const schema& s) {
|
|
return s.cdc_options().get_delta_mode() == cdc::delta_mode::full;
|
|
}
|
|
|
|
/* Visits the cells and tombstones of a single base mutation row and constructs corresponding delta-row cells
|
|
* for the corresponding log mutation.
|
|
*
|
|
* Additionally updates state required to produce pre/post-image if configured to do so (`enable_updating_state`).
|
|
*/
|
|
struct process_row_visitor {
|
|
const clustering_key& _log_ck;
|
|
|
|
stats::part_type_set& _touched_parts;
|
|
|
|
// The base row being visited gets a single corresponding log delta row.
|
|
// This is the value of the "cdc$ttl" column for that delta row.
|
|
ttl_opt _ttl_column = std::nullopt;
|
|
|
|
// Used to create cells in the corresponding delta row in the log mutation.
|
|
log_mutation_builder& _builder;
|
|
|
|
/* Images-related state */
|
|
const bool _enable_updating_state = false;
|
|
|
|
// Null for the static row, non-null for clustered rows.
|
|
const clustering_key* const _base_ck;
|
|
|
|
// The state required to produce pre/post-image for the row being visited.
|
|
// Might be null if the preimage query didn't return a result for this row.
|
|
cell_map* _row_state;
|
|
|
|
// The state required to produce pre/post-image for all rows.
|
|
// We need to keep a reference to it since we might insert new row_states during the visitation.
|
|
row_states_map& _clustering_row_states;
|
|
|
|
const bool _generate_delta_values = true;
|
|
|
|
process_row_visitor(
|
|
const clustering_key& log_ck, stats::part_type_set& touched_parts, log_mutation_builder& builder,
|
|
bool enable_updating_state, const clustering_key* base_ck, cell_map* row_state,
|
|
row_states_map& clustering_row_states, bool generate_delta_values)
|
|
: _log_ck(log_ck), _touched_parts(touched_parts), _builder(builder),
|
|
_enable_updating_state(enable_updating_state), _base_ck(base_ck), _row_state(row_state),
|
|
_clustering_row_states(clustering_row_states),
|
|
_generate_delta_values(generate_delta_values)
|
|
{}
|
|
|
|
void update_row_state(const column_definition& cdef, managed_bytes_opt value) {
|
|
if (!_row_state) {
|
|
// static row always has a valid state, so this must be a clustering row missing
|
|
SCYLLA_ASSERT(_base_ck);
|
|
auto [it, _] = _clustering_row_states.try_emplace(*_base_ck);
|
|
_row_state = &it->second;
|
|
}
|
|
(*_row_state)[&cdef] = std::move(value);
|
|
}
|
|
|
|
void live_atomic_cell(const column_definition& cdef, const atomic_cell_view& cell) {
|
|
_ttl_column = get_ttl(cell);
|
|
|
|
if (cdef.type->get_kind() == abstract_type::kind::empty) {
|
|
return;
|
|
}
|
|
|
|
managed_bytes value = get_managed_bytes(cell);
|
|
|
|
// delta
|
|
if (_generate_delta_values) {
|
|
_builder.set_value(_log_ck, cdef, value);
|
|
}
|
|
|
|
// images
|
|
if (_enable_updating_state) {
|
|
update_row_state(cdef, std::move(value));
|
|
}
|
|
}
|
|
|
|
void dead_atomic_cell(const column_definition& cdef, const atomic_cell_view&) {
|
|
// delta
|
|
if (_generate_delta_values) {
|
|
_builder.set_deleted(_log_ck, cdef);
|
|
}
|
|
// images
|
|
if (_enable_updating_state) {
|
|
update_row_state(cdef, std::nullopt);
|
|
}
|
|
}
|
|
|
|
void collection_column(const column_definition& cdef, auto&& visit_collection) {
|
|
// The handling of dead cells and the tombstone is common for all collection types,
|
|
// but we need separate visitors for different collection types to handle the live cells.
|
|
// See `set_visitor`, `udt_visitior`, and `map_or_list_visitor` below.
|
|
struct collection_visitor {
|
|
bool _is_column_delete = false;
|
|
std::vector<managed_bytes_view_opt> _deleted_keys;
|
|
|
|
ttl_opt& _ttl_column;
|
|
|
|
collection_visitor(ttl_opt& ttl_column) : _ttl_column(ttl_column) {}
|
|
|
|
void collection_tombstone(const tombstone&) {
|
|
_is_column_delete = true;
|
|
}
|
|
|
|
void dead_collection_cell(bytes_view key, const atomic_cell_view&) {
|
|
_deleted_keys.push_back(key);
|
|
}
|
|
|
|
constexpr bool finished() const { return false; }
|
|
};
|
|
|
|
|
|
// cdc$deleted_col, cdc$deleted_elements_col, col
|
|
using result_t = std::tuple<bool, std::vector<managed_bytes_view_opt>, managed_bytes_opt>;
|
|
auto result = visit(*cdef.type, make_visitor(
|
|
[&] (const set_type_impl&) -> result_t {
|
|
_touched_parts.set<stats::part_type::SET>();
|
|
|
|
struct set_visitor : public collection_visitor {
|
|
std::vector<managed_bytes_view_opt> _added_keys;
|
|
|
|
set_visitor(ttl_opt& ttl_column) : collection_visitor(ttl_column) {}
|
|
|
|
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
|
|
this->_ttl_column = get_ttl(cell);
|
|
_added_keys.emplace_back(key);
|
|
}
|
|
} v(_ttl_column);
|
|
|
|
visit_collection(v);
|
|
|
|
managed_bytes_opt added_keys = v._added_keys.empty() ? std::nullopt :
|
|
std::optional{set_type_impl::serialize_partially_deserialized_form_fragmented(v._added_keys)};
|
|
|
|
return {
|
|
v._is_column_delete,
|
|
std::move(v._deleted_keys),
|
|
std::move(added_keys)
|
|
};
|
|
},
|
|
[&] (const user_type_impl& type) -> result_t {
|
|
_touched_parts.set<stats::part_type::UDT>();
|
|
|
|
struct udt_visitor : public collection_visitor {
|
|
std::vector<managed_bytes_view_opt> _added_cells;
|
|
|
|
udt_visitor(ttl_opt& ttl_column, size_t num_keys)
|
|
: collection_visitor(ttl_column), _added_cells(num_keys) {}
|
|
|
|
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
|
|
this->_ttl_column = get_ttl(cell);
|
|
_added_cells[deserialize_field_index(key)].emplace(cell.value());
|
|
}
|
|
};
|
|
|
|
udt_visitor v(_ttl_column, type.size());
|
|
|
|
visit_collection(v);
|
|
|
|
managed_bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
|
|
std::optional{type.build_value_fragmented(v._added_cells)};
|
|
|
|
return {
|
|
v._is_column_delete,
|
|
std::move(v._deleted_keys),
|
|
std::move(added_cells)
|
|
};
|
|
},
|
|
[&] (const collection_type_impl& type) -> result_t {
|
|
_touched_parts.set(type.is_list() ? stats::part_type::LIST : stats::part_type::MAP);
|
|
|
|
struct map_or_list_visitor : public collection_visitor {
|
|
std::vector<std::pair<managed_bytes_view, managed_bytes_view>> _added_cells;
|
|
|
|
map_or_list_visitor(ttl_opt& ttl_column)
|
|
: collection_visitor(ttl_column) {}
|
|
|
|
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
|
|
this->_ttl_column = get_ttl(cell);
|
|
_added_cells.emplace_back(key, cell.value());
|
|
}
|
|
};
|
|
|
|
map_or_list_visitor v(_ttl_column);
|
|
|
|
visit_collection(v);
|
|
|
|
managed_bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
|
|
std::optional{map_type_impl::serialize_partially_deserialized_form_fragmented(v._added_cells)};
|
|
|
|
return {
|
|
v._is_column_delete,
|
|
std::move(v._deleted_keys),
|
|
std::move(added_cells)
|
|
};
|
|
},
|
|
[&] (const abstract_type& o) -> result_t {
|
|
throw std::runtime_error(format("cdc process_change: unknown type {}", o.name()));
|
|
}
|
|
));
|
|
auto&& is_column_delete = std::get<0>(result);
|
|
auto&& deleted_keys = std::get<1>(result);
|
|
auto&& added_cells = std::get<2>(result);
|
|
|
|
// FIXME: we're doing redundant work: first we serialize the set of deleted keys into a blob,
|
|
// then we deserialize again when merging images below
|
|
managed_bytes_opt deleted_elements = std::nullopt;
|
|
if (!deleted_keys.empty()) {
|
|
deleted_elements = set_type_impl::serialize_partially_deserialized_form_fragmented(deleted_keys);
|
|
}
|
|
|
|
// delta
|
|
if (_generate_delta_values) {
|
|
if (is_column_delete) {
|
|
_builder.set_deleted(_log_ck, cdef);
|
|
}
|
|
|
|
if (deleted_elements) {
|
|
_builder.set_deleted_elements(_log_ck, cdef, *deleted_elements);
|
|
}
|
|
|
|
if (added_cells) {
|
|
_builder.set_value(_log_ck, cdef, *added_cells);
|
|
}
|
|
}
|
|
|
|
// images
|
|
if (_enable_updating_state) {
|
|
// A column delete overwrites any data we gathered until now.
|
|
managed_bytes_opt prev = is_column_delete ? std::nullopt : get_col_from_row_state(_row_state, cdef);
|
|
|
|
managed_bytes_opt next;
|
|
if (added_cells || (deleted_elements && prev)) {
|
|
next = visit(*cdef.type, [&] (const auto& type) -> managed_bytes {
|
|
return merge(type, prev, added_cells, deleted_elements);
|
|
});
|
|
}
|
|
|
|
update_row_state(cdef, std::move(next));
|
|
}
|
|
}
|
|
|
|
constexpr bool finished() const { return false; }
|
|
};
|
|
|
|
struct process_change_visitor {
|
|
const per_request_options& _request_options;
|
|
// The types of the operations used for row / partition deletes. Introduced
|
|
// to differentiate service operations (e.g. operation::service_row_delete
|
|
// vs operation::row_delete).
|
|
const operation _row_delete_op = operation::row_delete;
|
|
const operation _partition_delete_op = operation::partition_delete;
|
|
|
|
stats::part_type_set& _touched_parts;
|
|
|
|
log_mutation_builder& _builder;
|
|
|
|
/* Images-related state */
|
|
const bool _enable_updating_state = false;
|
|
|
|
row_states_map& _clustering_row_states;
|
|
cell_map& _static_row_state;
|
|
|
|
const bool _is_update = false;
|
|
|
|
const bool _generate_delta_values = true;
|
|
|
|
void static_row_cells(auto&& visit_row_cells) {
|
|
_touched_parts.set<stats::part_type::STATIC_ROW>();
|
|
|
|
auto log_ck = _builder.allocate_new_log_row(operation::update);
|
|
|
|
process_row_visitor v(
|
|
log_ck, _touched_parts, _builder,
|
|
_enable_updating_state, nullptr, &_static_row_state, _clustering_row_states, _generate_delta_values);
|
|
visit_row_cells(v);
|
|
|
|
_builder.set_ttl(log_ck, v._ttl_column);
|
|
}
|
|
|
|
void clustered_row_cells(const clustering_key& ckey, auto&& visit_row_cells) {
|
|
_touched_parts.set<stats::part_type::CLUSTERING_ROW>();
|
|
|
|
auto log_ck = _builder.allocate_new_log_row();
|
|
_builder.set_clustering_columns(log_ck, ckey);
|
|
|
|
struct clustering_row_cells_visitor : public process_row_visitor {
|
|
operation _cdc_op = operation::update;
|
|
operation _marker_op = operation::insert;
|
|
|
|
using process_row_visitor::process_row_visitor;
|
|
|
|
void marker(const row_marker& rm) {
|
|
_ttl_column = get_ttl(rm);
|
|
_cdc_op = _marker_op;
|
|
}
|
|
};
|
|
|
|
clustering_row_cells_visitor v(
|
|
log_ck, _touched_parts, _builder,
|
|
_enable_updating_state, &ckey, get_row_state(_clustering_row_states, ckey),
|
|
_clustering_row_states, _generate_delta_values);
|
|
if (_is_update && _request_options.alternator) {
|
|
v._marker_op = operation::update;
|
|
}
|
|
visit_row_cells(v);
|
|
|
|
if (_enable_updating_state) {
|
|
// #7716: if there are no regular columns, our visitor would not have visited any cells,
|
|
// hence it would not have created a row_state for this row. In effect, postimage wouldn't be produced.
|
|
// Ensure that the row state exists.
|
|
_clustering_row_states.try_emplace(ckey);
|
|
}
|
|
|
|
_builder.set_operation(log_ck, v._cdc_op);
|
|
_builder.set_ttl(log_ck, v._ttl_column);
|
|
}
|
|
|
|
void clustered_row_delete(const clustering_key& ckey, const tombstone&) {
|
|
_touched_parts.set<stats::part_type::ROW_DELETE>();
|
|
|
|
auto log_ck = _builder.allocate_new_log_row(_row_delete_op);
|
|
_builder.set_clustering_columns(log_ck, ckey);
|
|
|
|
if (_enable_updating_state && get_row_state(_clustering_row_states, ckey)) {
|
|
_clustering_row_states.erase(ckey);
|
|
}
|
|
}
|
|
|
|
void range_delete(const range_tombstone& rt) {
|
|
_touched_parts.set<stats::part_type::RANGE_TOMBSTONE>();
|
|
{
|
|
const auto start_operation = rt.start_kind == bound_kind::incl_start
|
|
? operation::range_delete_start_inclusive
|
|
: operation::range_delete_start_exclusive;
|
|
auto log_ck = _builder.allocate_new_log_row(start_operation);
|
|
_builder.set_clustering_columns(log_ck, rt.start);
|
|
}
|
|
{
|
|
const auto end_operation = rt.end_kind == bound_kind::incl_end
|
|
? operation::range_delete_end_inclusive
|
|
: operation::range_delete_end_exclusive;
|
|
auto log_ck = _builder.allocate_new_log_row(end_operation);
|
|
_builder.set_clustering_columns(log_ck, rt.end);
|
|
}
|
|
|
|
// #6900 - remove stored row data (for postimage)
|
|
// if it falls inside the clustering range of the
|
|
// tombstone.
|
|
if (!_enable_updating_state) {
|
|
return;
|
|
}
|
|
|
|
bound_view::compare cmp(_builder.base_schema());
|
|
auto sb = rt.start_bound();
|
|
auto eb = rt.end_bound();
|
|
|
|
std::erase_if(_clustering_row_states, [&](auto& p) {
|
|
auto& ck = p.first;
|
|
return cmp(sb, ck) && !cmp(eb, ck);
|
|
});
|
|
}
|
|
|
|
void partition_delete(const tombstone&) {
|
|
_touched_parts.set<stats::part_type::PARTITION_DELETE>();
|
|
auto log_ck = _builder.allocate_new_log_row(_partition_delete_op);
|
|
if (_enable_updating_state) {
|
|
_clustering_row_states.clear();
|
|
}
|
|
}
|
|
|
|
constexpr bool finished() const { return false; }
|
|
};
|
|
|
|
class transformer final : public change_processor {
|
|
private:
|
|
db_context _ctx;
|
|
schema_ptr _schema;
|
|
dht::decorated_key _dk;
|
|
schema_ptr _log_schema;
|
|
const per_request_options& _options;
|
|
|
|
/**
|
|
* #6070, #6084
|
|
* Non-atomic column assignments which use a TTL are broken into two invocations
|
|
* of `transform`, such as in the following example:
|
|
* CREATE TABLE t (a int PRIMARY KEY, b map<int, int>) WITH cdc = {'enabled':true};
|
|
* UPDATE t USING TTL 5 SET b = {0:0} WHERE a = 0;
|
|
*
|
|
* The above UPDATE creates a tombstone and a (0, 0) cell; because tombstones don't have the notion
|
|
* of a TTL, we split the UPDATE into two separate changes (represented as two separate delta rows in the log,
|
|
* resulting in two invocations of `transform`): one change for the deletion with no TTL,
|
|
* and one change for adding cells with TTL = 5.
|
|
*
|
|
* In other words, we use the fact that
|
|
* UPDATE t USING TTL 5 SET b = {0:0} WHERE a = 0;
|
|
* is equivalent to
|
|
* BEGIN UNLOGGED BATCH
|
|
* UPDATE t SET b = null WHERE a = 0;
|
|
* UPDATE t USING TTL 5 SET b = b + {0:0} WHERE a = 0;
|
|
* APPLY BATCH;
|
|
* (the mutations are the same in both cases),
|
|
* and perform a separate `transform` call for each statement in the batch.
|
|
*
|
|
* An assignment also happens when an INSERT statement is used as follows:
|
|
* INSERT INTO t (a, b) VALUES (0, {0:0}) USING TTL 5;
|
|
*
|
|
* This will be split into three separate changes (three invocations of `transform`):
|
|
* 1. One with TTL = 5 for the row marker (introduced by the INSERT), indicating that a row was inserted.
|
|
* 2. One without a TTL for the tombstone, indicating that the collection was cleared.
|
|
* 3. One with TTL = 5 for the addition of cell (0, 0), indicating that the collection
|
|
* was extended by a new key/value.
|
|
*
|
|
* Why do we need three changes and not two, like in the UPDATE case?
|
|
* The tombstone needs to be a separate change because it doesn't have a TTL,
|
|
* so only the row marker change could potentially be merged with the cell change (1 and 3 above).
|
|
* However, we cannot do that: the row marker change is of INSERT type (cdc$operation == cdc::operation::insert),
|
|
* but there is no way to create a statement that
|
|
* - has a row marker,
|
|
* - adds cells to a collection,
|
|
* - but *doesn't* add a tombstone for this collection.
|
|
* INSERT statements that modify collections *always* add tombstones.
|
|
*
|
|
* Merging the row marker with the cell addition would result in such an impossible statement.
|
|
*
|
|
* Instead, we observe that
|
|
* INSERT INTO t (a, b) VALUES (0, {0:0}) USING TTL 5;
|
|
* is equivalent to
|
|
* BEGIN UNLOGGED BATCH
|
|
* INSERT INTO t (a) VALUES (0) USING TTL 5;
|
|
* UPDATE t SET b = null WHERE a = 0;
|
|
* UPDATE t USING TTL 5 SET b = b + {0:0} WHERE a = 0;
|
|
* APPLY BATCH;
|
|
* and perform a separate `transform` call for each statement in the batch.
|
|
*
|
|
* Unfortunately, due to splitting, the cell addition call (b = b + {0:0}) does not know about the tombstone.
|
|
* If it was performed independently from the tombstone call, it would create a wrong post-image:
|
|
* the post-image would look as if the previous cells still existed.
|
|
* For example, suppose that b was equal to {1:1} before the above statement was performed.
|
|
* Then the final post-image for b for above statement/batch would be {0:0, 1:1}, when instead it should be {0:0}.
|
|
*
|
|
* To handle this we use the fact that
|
|
* 1. changes without a TTL are treated as if TTL = 0,
|
|
* 2. `transform` is invoked in order of increasing TTLs,
|
|
* and we maintain state between `transform` invocations (`_clustering_row_states`, `_static_row_state`).
|
|
*
|
|
* Thus, the tombstone call will happen *before* the cell addition call,
|
|
* so the cell addition call will know that there previously was a tombstone
|
|
* and create a correct post-image.
|
|
*
|
|
* Furthermore, `transform` calls for INSERT changes (i.e. with a row marker)
|
|
* happen before `transform` calls for UPDATE changes, so in the case of an INSERT
|
|
* which modifies a collection column as above, the row marker call will happen first;
|
|
* its post-image will still show {1:1} for the collection column. Good.
|
|
*/
|
|
|
|
// Cell state per clustering row, keyed by clustering key. Filled by
// load_preimage_results_into_state, read by generate_image and handed to the
// change visitor in process_change.
row_states_map _clustering_row_states;
|
|
// Cell state of the static row; used the same way as _clustering_row_states.
cell_map _static_row_state;
|
|
// True if the mutated row existed before applying the mutation. In other
|
|
// words, if the preimage is enabled and it isn't empty (otherwise, we
|
|
// assume that the row is non-existent). Used for Alternator Streams (see
|
|
// #6918).
|
|
bool _is_update = false;
|
|
|
|
// Whether the base table's keyspace uses tablets (set in the constructor).
// begin_timestamp uses it to pick the tablet or the vnode stream lookup.
const bool _uses_tablets;
|
|
|
|
// Log mutations generated so far: begin_timestamp appends one per call and
// finish() moves the whole vector out.
utils::chunked_vector<mutation> _result_mutations;
|
|
// Builder for the log mutation started by the latest begin_timestamp call;
// empty until begin_timestamp has been called.
std::optional<log_mutation_builder> _builder;
|
|
|
|
// When enabled, process_change will update _clustering_row_states and _static_row_state
|
|
bool _enable_updating_state = false;
|
|
|
|
// Handed to the change visitor in process_change, consulted by generate_image
// for the static row, and returned to the caller by finish().
stats::part_type_set _touched_parts;
|
|
|
|
public:
|
|
// Creates a transformer for the base-table partition `dk` of schema `s`.
//
// The CDC log schema is taken from the base schema's cdc_schema() when that is
// set; otherwise it is looked up in the database under
// log_name(<base table name>) in the same keyspace.
//
// NOTE(review): several initializers below read `_schema` (not the moved-from
// `s`), so they rely on `_schema` being declared before those members; the
// declarations are partly outside this view - confirm when reordering members.
transformer(db_context ctx, schema_ptr s, dht::decorated_key dk, const per_request_options& options)
|
|
: _ctx(ctx)
|
|
, _schema(std::move(s))
|
|
, _dk(std::move(dk))
|
|
, _log_schema(_schema->cdc_schema() ? _schema->cdc_schema() : ctx._proxy.get_db().local().find_schema(_schema->ks_name(), log_name(_schema->cf_name())))
|
|
, _options(options)
|
|
// Starts with 0 buckets; hashing and equality are derived from the base schema.
, _clustering_row_states(0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema))
|
|
// Looked up once from the keyspace of the base table.
, _uses_tablets(ctx._proxy.get_db().local().find_keyspace(_schema->ks_name()).uses_tablets())
|
|
{
|
|
}
|
|
|
|
// DON'T move the transformer after this
|
|
void begin_timestamp(api::timestamp_type ts, bool is_last) override {
|
|
const auto stream_id = _uses_tablets ? _ctx._cdc_metadata.get_tablet_stream(_log_schema->id(), ts, _dk.token()) : _ctx._cdc_metadata.get_vnode_stream(ts, _dk.token());
|
|
_result_mutations.emplace_back(_log_schema, stream_id.to_partition_key(*_log_schema));
|
|
_builder.emplace(_result_mutations.back(), ts, _dk.key(), *_schema);
|
|
_enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage());
|
|
}
|
|
|
|
void produce_preimage(const clustering_key* ck, const one_kind_column_set& columns_to_include) override {
|
|
// if we want full preimage, just ignore the affected columns and include everything.
|
|
generate_image(operation::pre_image, ck, _schema->cdc_options().full_preimage() ? nullptr : &columns_to_include);
|
|
};
|
|
|
|
void produce_postimage(const clustering_key* ck) override {
|
|
generate_image(operation::post_image, ck, nullptr);
|
|
}
|
|
|
|
// Writes one pre-image or post-image row into the log mutation currently
// being built, based on the cell state tracked for the given row.
//
// op               - operation::pre_image or operation::post_image (asserted).
// ck               - clustering key of the row; nullptr selects the static row.
// affected_columns - when non-null, only these columns are written; must be
//                    null for a post-image (asserted).
//
// Nothing is written when there is no tracked state for the clustering row, or
// when the static row state is empty and the static row was not touched.
void generate_image(operation op, const clustering_key* ck, const one_kind_column_set* affected_columns) {
    SCYLLA_ASSERT(op == operation::pre_image || op == operation::post_image);

    // SCYLLA_ASSERT that post_image is always full
    SCYLLA_ASSERT(!(op == operation::post_image && affected_columns));

    SCYLLA_ASSERT(_builder);

    const bool is_clustering_row = ck != nullptr;

    cell_map* row_state = nullptr;
    if (is_clustering_row) {
        row_state = get_row_state(_clustering_row_states, *ck);
        if (!row_state) {
            // No data at all for this row: nothing to report.
            // (An empty - but present - state means the row had data which
            // column deletes removed; that case is reported as pk/ck only.)
            return;
        }
    } else {
        row_state = &_static_row_state;
        // An empty static row state that was never touched during processing
        // means the static row either did not exist or the partition was
        // deleted: no image in that case. If it was touched, an empty image
        // is still emitted to signal that.
        if (row_state->empty() && !_touched_parts.contains(stats::part_type::STATIC_ROW)) {
            return;
        }
    }

    auto image_ck = _builder->allocate_new_log_row(op);
    if (is_clustering_row) {
        _builder->set_clustering_columns(image_ck, *ck);
    }

    auto process_cell = [&, this] (const column_definition& cdef) {
        // If table uses compact storage it may contain a column of type empty
        // and we need to ignore such a field because it is not present in CDC log.
        if (cdef.type->get_kind() == abstract_type::kind::empty) {
            return;
        }

        auto current = get_col_from_row_state(row_state, cdef);
        if (current) {
            _builder->set_value(image_ck, cdef, *current);
            return;
        }

        // Cell is NULL.
        // In a preimage the deleted column must be filled in as well;
        // otherwise the user cannot tell a NULL value apart from a column that
        // was simply not included ('full' preimage disabled).
        // In a postimage this is unnecessary, as all postimages are full.
        if (op == operation::pre_image) {
            _builder->set_deleted(image_ck, cdef);
        }
    };

    const column_kind kind = is_clustering_row ? column_kind::regular_column : column_kind::static_column;

    if (affected_columns) {
        // Preimage case - include only data from requested columns
        auto pos = affected_columns->find_first();
        while (pos != one_kind_column_set::npos) {
            process_cell(_schema->column_at(kind, column_id(pos)));
            pos = affected_columns->find_next(pos);
        }
    } else {
        // Postimage case - include data from all columns
        for (const column_definition& cdef : _schema->columns(kind)) {
            process_cell(cdef);
        }
    }
}
|
|
|
|
// TODO: is pre-image data based on query enough. We only have actual column data. Do we need
|
|
// more details like tombstones/ttl? Probably not but keep in mind.
|
|
void process_change(const mutation& m) override {
|
|
SCYLLA_ASSERT(_builder);
|
|
process_change_visitor v {
|
|
._request_options = _options,
|
|
._row_delete_op = _options.is_system_originated ? operation::service_row_delete : operation::row_delete,
|
|
._partition_delete_op = _options.is_system_originated ? operation::service_partition_delete : operation::partition_delete,
|
|
._touched_parts = _touched_parts,
|
|
._builder = *_builder,
|
|
._enable_updating_state = _enable_updating_state,
|
|
._clustering_row_states = _clustering_row_states,
|
|
._static_row_state = _static_row_state,
|
|
._is_update = _is_update,
|
|
._generate_delta_values = generate_delta_values(_builder->base_schema())
|
|
};
|
|
cdc::inspect_mutation(m, v);
|
|
}
|
|
|
|
void end_record() override {
|
|
SCYLLA_ASSERT(_builder);
|
|
_builder->end_record();
|
|
}
|
|
|
|
// Read-only access to the per-clustering-row cell state tracked by this transformer.
const row_states_map& clustering_row_states() const override {
|
|
return _clustering_row_states;
|
|
}
|
|
|
|
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
|
|
// The `transformer` object on which this method was called on should not be used anymore.
|
|
std::tuple<utils::chunked_vector<mutation>, stats::part_type_set> finish() && {
|
|
return std::make_pair<utils::chunked_vector<mutation>, stats::part_type_set>(std::move(_result_mutations), std::move(_touched_parts));
|
|
}
|
|
|
|
// Deadline used for the preimage query in pre_image_select: 10 seconds from now.
static db::timeout_clock::time_point default_timeout() {
    constexpr auto preimage_query_budget = std::chrono::seconds(10);
    return db::timeout_clock::now() + preimage_query_budget;
}
|
|
|
|
// Queries the base table for the current contents of the rows touched by `m`,
// to be used as the source of pre-image / post-image data.
//
// client_state - forwarded to the coordinator query options.
// write_cl     - consistency level of the base write; the read is issued at
//                adjust_cl(write_cl).
// m            - the base-table mutation being augmented.
//
// Returns a future with the query result, or a ready future holding a
// default-constructed pointer when `m` has no clustered rows, no static row
// and is not a partition deletion on a schema without clustering key.
//
// NOTE(review): the try/catch below only sees exceptions thrown while the
// query call expression itself is evaluated; a failure delivered through the
// returned future is not translated here - confirm this is intended.
future<lw_shared_ptr<cql3::untyped_result_set>> pre_image_select(
        service::client_state& client_state,
        db::consistency_level write_cl,
        const mutation& m)
{
    auto& p = m.partition();
    const bool has_static_row = !p.static_row().empty();
    const bool has_clustered_rows = !p.clustered_rows().empty();
    const bool no_ck_schema_partition_deletion = m.schema()->clustering_key_size() == 0 && bool(p.partition_tombstone());

    if (!has_clustered_rows && !has_static_row && !no_ck_schema_partition_deletion) {
        return make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>();
    }

    dht::partition_range_vector partition_ranges{dht::partition_range(m.decorated_key())};

    auto&& pc = _schema->partition_key_columns();
    auto&& cc = _schema->clustering_key_columns();

    // Clustering restrictions: the whole partition when only the static row
    // is touched or there is no clustering key, otherwise one singular range
    // per touched clustering row.
    std::vector<query::clustering_range> bounds;
    uint64_t row_limit = query::max_rows;

    const bool has_only_static_row = has_static_row && !has_clustered_rows;
    if (has_only_static_row) {
        bounds.push_back(query::clustering_range::make_open_ended_both_sides());
        row_limit = 1;
    } else if (cc.empty()) {
        bounds.push_back(query::clustering_range::make_open_ended_both_sides());
    } else {
        for (const rows_entry& entry : p.clustered_rows()) {
            bounds.push_back(query::clustering_range::make_singular(entry.key()));
        }
    }

    // Selected columns: always the full primary key, plus the static/regular
    // columns decided below.
    std::vector<const column_definition*> columns;
    columns.reserve(_schema->all_columns().size());
    for (const auto& c : pc) {
        columns.push_back(&c);
    }
    for (const auto& c : cc) {
        columns.push_back(&c);
    }

    query::column_id_vector static_columns, regular_columns;

    // Records a column both in the slice's id list and in the selection.
    auto select_column = [&columns] (query::column_id_vector& ids, column_id id, const column_definition& cdef) {
        ids.emplace_back(id);
        columns.emplace_back(&cdef);
    };

    // for postimage we need everything...
    const bool need_all_columns = _schema->cdc_options().postimage() || _schema->cdc_options().full_preimage();

    // TODO: this assumes all mutations touch the same set of columns. This might not be true, and we may need to do more horrible set operation here.
    if (has_static_row) {
        if (need_all_columns) {
            for (const column_definition& c : _schema->static_columns()) {
                select_column(static_columns, c.id, c);
            }
        } else {
            p.static_row().get().for_each_cell([&] (column_id id, const atomic_cell_or_collection&) {
                select_column(static_columns, id, _schema->column_at(column_kind::static_column, id));
            });
        }
    }

    if (has_clustered_rows || no_ck_schema_partition_deletion) {
        bool has_row_delete = false;
        for (const rows_entry& re : p.clustered_rows()) {
            if (re.row().deleted_at()) {
                has_row_delete = true;
                break;
            }
        }

        if (has_row_delete || need_all_columns || no_ck_schema_partition_deletion) {
            for (const column_definition& c : _schema->regular_columns()) {
                select_column(regular_columns, c.id, c);
            }
        } else {
            // Only the columns present in the first touched row (see the TODO above).
            p.clustered_rows().begin()->row().cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection&) {
                select_column(regular_columns, id, _schema->column_at(column_kind::regular_column, id));
            });
        }
    }

    auto selection = cql3::selection::selection::for_columns(_schema, std::move(columns));

    auto opts = selection->get_query_options();
    opts.set(query::partition_slice::option::collections_as_maps);
    opts.set_if<query::partition_slice::option::always_return_static_content>(has_static_row);

    auto partition_slice = query::partition_slice(std::move(bounds), std::move(static_columns), std::move(regular_columns), std::move(opts));
    const auto max_result_size = _ctx._proxy.get_max_result_size(partition_slice);
    const auto tombstone_limit = query::tombstone_limit(_ctx._proxy.get_tombstone_limit());
    auto command = ::make_lw_shared<query::read_command>(
            _schema->id(),
            _schema->version(),
            partition_slice,
            query::max_result_size(max_result_size),
            tombstone_limit,
            query::row_limit(row_limit));

    const auto select_cl = adjust_cl(write_cl);

    try {
        return _ctx._proxy.query(
                _schema,
                std::move(command),
                std::move(partition_ranges),
                select_cl,
                service::storage_proxy::coordinator_query_options(default_timeout(), empty_service_permit(), client_state)
        ).then([s = _schema, partition_slice = std::move(partition_slice), selection = std::move(selection)]
                (service::storage_proxy::coordinator_query_result qr) -> lw_shared_ptr<cql3::untyped_result_set> {
            return make_lw_shared<cql3::untyped_result_set>(*s, std::move(qr.query_result), *selection, partition_slice);
        });
    } catch (exceptions::unavailable_exception& e) {
        // `query` can throw `unavailable_exception`, which is seen by clients as ~ "NoHostAvailable".
        // So, we'll translate it to a `read_failure_exception` with custom message.
        cdc_log.debug("Preimage: translating a (read) `unavailable_exception` to `request_execution_exception` - {}", e);
        throw exceptions::read_failure_exception("CDC preimage query could not achieve the CL.",
                e.consistency, e.alive, 0, e.required, false);
    }
}
|
|
|
|
// Note: this assumes that the results are from one partition only
|
|
void load_preimage_results_into_state(lw_shared_ptr<cql3::untyped_result_set> preimage_set, bool static_only) {
|
|
// static row
|
|
if (!preimage_set->empty()) {
|
|
// There may be some static row data
|
|
const auto& row = preimage_set->front();
|
|
for (auto& c : _schema->static_columns()) {
|
|
if (auto maybe_cell_view = get_preimage_col_value(c, &row)) {
|
|
_static_row_state[&c] = std::move(*maybe_cell_view);
|
|
}
|
|
}
|
|
_is_update = true;
|
|
}
|
|
|
|
if (static_only) {
|
|
return;
|
|
}
|
|
|
|
// clustering rows
|
|
for (const auto& row : *preimage_set) {
|
|
// Construct the clustering key for this row
|
|
std::vector<managed_bytes> ck_parts;
|
|
ck_parts.reserve(_schema->clustering_key_size());
|
|
for (auto& c : _schema->clustering_key_columns()) {
|
|
auto v = row.get_view_opt(c.name_as_text());
|
|
if (!v) {
|
|
// We might get here if both of the following conditions are true:
|
|
// - In preimage query, we requested the static row and some clustering rows,
|
|
// - The partition had some static row data, but did not have any requested clustering rows.
|
|
// In such case, the result set will have an artificial row that only contains static columns,
|
|
// but no clustering columns. In such case, we can safely return from the function,
|
|
// as there will be no clustering row data to load into the state.
|
|
return;
|
|
}
|
|
ck_parts.emplace_back(managed_bytes(*v));
|
|
}
|
|
auto ck = clustering_key::from_exploded(std::move(ck_parts));
|
|
|
|
// Collect regular rows
|
|
cell_map cells;
|
|
for (auto& c : _schema->regular_columns()) {
|
|
if (auto maybe_cell_view = get_preimage_col_value(c, &row)) {
|
|
cells[&c] = std::move(*maybe_cell_view);
|
|
}
|
|
}
|
|
|
|
_clustering_row_states.insert_or_assign(std::move(ck), std::move(cells));
|
|
}
|
|
}
|
|
|
|
/** For preimage query use the same CL as for base write, except for CLs ANY and ALL. */
|
|
static db::consistency_level adjust_cl(db::consistency_level write_cl) {
|
|
if (write_cl == db::consistency_level::ANY) {
|
|
return db::consistency_level::ONE;
|
|
} else if (write_cl == db::consistency_level::ALL || write_cl == db::consistency_level::SERIAL) {
|
|
return db::consistency_level::QUORUM;
|
|
} else if (write_cl == db::consistency_level::LOCAL_SERIAL) {
|
|
return db::consistency_level::LOCAL_QUORUM;
|
|
}
|
|
return write_cl;
|
|
}
|
|
};
|
|
|
|
template <typename Func>
|
|
future<utils::chunked_vector<mutation>>
|
|
transform_mutations(utils::chunked_vector<mutation>& muts, decltype(muts.size()) batch_size, Func&& f) {
|
|
return parallel_for_each(
|
|
boost::irange(static_cast<decltype(muts.size())>(0), muts.size(), batch_size),
|
|
std::forward<Func>(f))
|
|
.then([&muts] () mutable { return std::move(muts); });
|
|
}
|
|
|
|
} // namespace cdc
|
|
|
|
// Appends CDC log mutations for every mutation whose schema has CDC enabled.
//
// timeout    - not referenced by this function body.
// mutations  - base-table mutations; may mix CDC and non-CDC schemas. The
//              generated log mutations are appended to this same vector.
// tr_state   - tracing state used for the "CDC: ..." trace messages.
// write_cl   - consistency level of the base write, passed on to the preimage query.
// options    - per-request options (prefetched preimage, alternator flags, ...).
//
// Returns the (possibly extended) mutation vector together with an
// operation_result_tracker, or a null tracker when no mutation has CDC enabled.
future<std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>
cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, utils::chunked_vector<mutation>&& mutations, tracing::trace_state_ptr tr_state, db::consistency_level write_cl, per_request_options options) {
    // we do all this because in the case of batches, we can have mixed schemas.
    auto e = mutations.end();
    auto i = std::find_if(mutations.begin(), e, [](const mutation& m) {
        return cdc_enabled(*m.schema());
    });

    if (i == e) {
        // Nothing to augment: hand the mutations back untouched, without a tracker.
        return make_ready_future<std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>(std::make_tuple(std::move(mutations), lw_shared_ptr<cdc::operation_result_tracker>()));
    }

    tracing::trace(tr_state, "CDC: Started generating mutations for log rows");
    mutations.reserve(2 * mutations.size());

    // do_with keeps the mutations, query state, details and options alive
    // until the whole continuation chain below has finished.
    return do_with(std::move(mutations), service::query_state(service::client_state::for_internal_calls(), empty_service_permit()), operation_details{}, std::move(options),
            [this, tr_state = std::move(tr_state), write_cl] (utils::chunked_vector<mutation>& mutations, service::query_state& qs, operation_details& details, per_request_options& options) {
        // The index type matches what transform_mutations iterates over
        // (the vector's size type), so no signed/unsigned narrowing happens.
        return transform_mutations(mutations, 1, [this, &mutations, &qs, tr_state = tr_state, &details, write_cl, &options] (std::size_t idx) mutable {
            auto& m = mutations[idx];
            auto s = m.schema();

            if (!cdc_enabled(*s)) {
                return make_ready_future<>();
            }

            const bool alternator_increased_compatibility = options.alternator && options.alternator_streams_increased_compatibility;
            transformer trans(_ctxt, s, m.decorated_key(), options);

            auto f = make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>(nullptr);
            if (options.preimage && !options.preimage->empty()) {
                // Preimage has been fetched by upper layers.
                tracing::trace(tr_state, "CDC: Using a prefetched preimage");
                f = make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>(options.preimage);
            } else if (s->cdc_options().preimage() || s->cdc_options().postimage() || alternator_increased_compatibility) {
                // Note: further improvement here would be to coalesce the pre-image selects into one
                // if a batch contains several modifications to the same table. Otoh, batch is rare(?)
                // so this is premature.
                tracing::trace(tr_state, "CDC: Selecting preimage for {}", m.decorated_key());
                f = trans.pre_image_select(qs.get_client_state(), write_cl, m).then_wrapped([this] (future<lw_shared_ptr<cql3::untyped_result_set>> f) {
                    // Count every preimage select, and separately the failed ones,
                    // then pass the future on unchanged.
                    auto& cdc_stats = _ctxt._proxy.get_cdc_stats();
                    cdc_stats.counters_total.preimage_selects++;
                    if (f.failed()) {
                        cdc_stats.counters_failed.preimage_selects++;
                    }
                    return f;
                });
            } else {
                tracing::trace(tr_state, "CDC: Preimage not enabled for the table, not querying current value of {}", m.decorated_key());
            }

            return f.then([alternator_increased_compatibility, trans = std::move(trans), &mutations, idx, tr_state, &details, &options] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
                auto& m = mutations[idx];
                auto& s = m.schema();

                if (rs) {
                    const auto& p = m.partition();
                    const bool static_only = !p.static_row().empty() && p.clustered_rows().empty();
                    trans.load_preimage_results_into_state(std::move(rs), static_only);
                }

                const bool preimage = s->cdc_options().preimage();
                const bool postimage = s->cdc_options().postimage();
                details.had_preimage |= preimage;
                details.had_postimage |= postimage;
                tracing::trace(tr_state, "CDC: Generating log mutations for {}", m.decorated_key());
                if (should_split(m, options)) {
                    tracing::trace(tr_state, "CDC: Splitting {}", m.decorated_key());
                    details.was_split = true;
                    process_changes_with_splitting(m, trans, preimage, postimage, alternator_increased_compatibility);
                } else {
                    tracing::trace(tr_state, "CDC: No need to split {}", m.decorated_key());
                    process_changes_without_splitting(m, trans, preimage, postimage, alternator_increased_compatibility);
                }
                auto [log_mut, touched_parts] = std::move(trans).finish();
                // Keep the container's own size type instead of narrowing to int.
                const auto generated_count = log_mut.size();
                mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));

                // `m` might be invalidated at this point because of the push_back to the vector
                tracing::trace(tr_state, "CDC: Generated {} log mutations from {}", generated_count, mutations[idx].decorated_key());
                details.touched_parts.add(touched_parts);
            });
        }).then([this, tr_state, &details](utils::chunked_vector<mutation> mutations) {
            tracing::trace(tr_state, "CDC: Finished generating all log mutations");
            auto tracker = make_lw_shared<cdc::operation_result_tracker>(_ctxt._proxy.get_cdc_stats(), details);
            return make_ready_future<std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>(std::make_tuple(std::move(mutations), std::move(tracker)));
        });
    });
}
|
|
|
|
// Returns true when at least one of the mutations belongs to a schema with
// CDC enabled, i.e. when augment_mutation_call would have work to do.
bool cdc::cdc_service::needs_cdc_augmentation(const utils::chunked_vector<mutation>& mutations) const {
    for (const mutation& m : mutations) {
        if (cdc_enabled(*m.schema())) {
            return true;
        }
    }
    return false;
}
|
|
|
|
// Public entry point: forwards to the implementation object. When the
// "sleep_before_cdc_augmentation" error injection is entered, the forwarding
// is delayed by 100 milliseconds.
future<std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>>
cdc::cdc_service::augment_mutation_call(lowres_clock::time_point timeout, utils::chunked_vector<mutation>&& mutations, tracing::trace_state_ptr tr_state, db::consistency_level write_cl, per_request_options options) {
    if (!utils::get_local_injector().enter("sleep_before_cdc_augmentation")) {
        // Regular path: no injected delay.
        return _impl->augment_mutation_call(timeout, std::move(mutations), std::move(tr_state), write_cl, std::move(options));
    }

    // The arguments are moved into the continuation so they outlive the sleep.
    auto delayed_call = [this, timeout, mutations = std::move(mutations), tr_state = std::move(tr_state), write_cl, options = std::move(options)] () mutable {
        return _impl->augment_mutation_call(timeout, std::move(mutations), std::move(tr_state), write_cl, std::move(options));
    };
    return seastar::sleep(std::chrono::milliseconds(100)).then(std::move(delayed_call));
}
|