/*
 * Copyright (C) 2019-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

/*
 * This module manages CDC log tables. It contains facilities used to:
 * - perform schema changes to CDC log tables correspondingly when base tables are changed,
 * - perform writes to CDC log tables correspondingly when writes to base tables are made.
 */

#pragma once

// NOTE(review): in this copy the five #include directives below have no target,
// and several template argument lists in this header are empty (the
// lw_shared_ptr / unordered_map / unique_ptr / chunked_vector / future /
// async_sharded_service uses). They look like stripped angle-bracket contents;
// restore them from upstream before building.
#include
#include
#include
#include
#include

#include "cql3/untyped_result_set.hh"
#include "mutation/timestamp.hh"
#include "tracing/trace_state.hh"
#include "utils/UUID.hh"

class schema;
// Handle to a schema (template argument elided in this copy; presumably `const schema`).
using schema_ptr = seastar::lw_shared_ptr;

// Forward declarations, so the full definitions need not be included here.
// NOTE(review): locator::token_metadata, service::query_state and partition_key
// are not referenced by any declaration visible in this header; they may be stale.
namespace locator {

class token_metadata;

} // namespace locator

namespace service {

class migration_notifier;
class storage_proxy;
class query_state;

} // namespace service

class mutation;
class partition_key;

namespace replica {
class database;
}

namespace cdc {

// Per-row column state (key/value types elided in this copy; presumably
// column -> cell value — confirm upstream).
using cell_map = std::unordered_map;
// Presumably maps a clustering key to that row's cell_map, as suggested by
// get_row_state() below (template arguments elided in this copy).
using row_states_map = std::unordered_map;

// cdc log table operation: the kind of change a CDC log row describes.
enum class operation : int8_t {
    // note: these values will eventually be read by a third party, probably not privy to this
    // enum decl, so don't change the constant values (or the datatype).
    pre_image = 0,
    update = 1,
    insert = 2,
    row_delete = 3,
    partition_delete = 4,
    range_delete_start_inclusive = 5,
    range_delete_start_exclusive = 6,
    range_delete_end_inclusive = 7,
    range_delete_end_exclusive = 8,
    post_image = 9,

    // Operations initiated internally by Scylla. Currently used only by Alternator.
    // Their values are the negations of row_delete (3) and partition_delete (4).
    service_row_delete = -3,
    service_partition_delete = -4,
};

// Per-request options accepted by cdc_service::augment_mutation_call().
// All bool members are const with default initializers, so an instance is
// fixed once constructed (and is therefore not copy-assignable).
struct per_request_options {
    // The value of the base row before the current operation, queried by
    // layers above CDC. We assume that CDC could have seen the row in this
    // state, i.e. the value isn't 'stale'/'too recent'.
    lw_shared_ptr preimage;
    // Whether this mutation is the result of an internal operation initiated
    // by Scylla. Currently only the TTL expiration implementation for
    // Alternator uses this.
    const bool is_system_originated = false;
    // True if this mutation was emitted by Alternator.
    const bool alternator = false;
    // Sacrifice performance for the sake of better compatibility with DynamoDB
    // Streams. The alternator_streams_increased_compatibility config flag is
    // live-updateable, so for correctness it must be read only once per
    // request: separate reads could observe different values.
    const bool alternator_streams_increased_compatibility = false;
};

struct operation_result_tracker;
// NOTE(review): forward-declared as `class` here but defined below as
// `struct db_context final`; valid C++, but some compilers warn about the
// mismatched class-key.
class db_context;
class metadata;

// Log-table predicates (definitions not visible here; presumably true iff the
// given name / schema denotes a CDC log table).
bool is_log_name(const std::string_view& table_name);
bool is_log_schema(const schema& s);

/// \brief CDC service, responsible for schema listeners
///
/// CDC service will listen for schema changes and iff CDC is enabled/changed
/// create/modify/delete corresponding log tables etc. as part of the schema change.
///
class cdc_service final : public async_sharded_service {
    // The implementation is hidden behind a forward-declared nested class;
    // _impl presumably owns it (template argument elided in this copy).
    class impl;
    std::unique_ptr _impl;
public:
    future<> stop();
    // Takes the same dependencies as db_context's constructor, in the same order.
    cdc_service(service::storage_proxy&, cdc::metadata&, service::migration_notifier&);
    cdc_service(db_context);
    // Declared here and defined elsewhere, presumably so that `impl` can stay
    // an incomplete type in this header.
    ~cdc_service();

    // If any of the mutations are cdc enabled, optionally selects preimage, and adds the
    // appropriate augments to set the log entries.
    // Iff post-image is enabled for any of these, a non-empty callback is also
    // returned to be invoked post the mutation query.
    // NOTE(review): the second element of the returned value is an lw_shared_ptr
    // (presumably to operation_result_tracker, declared above) rather than a
    // callback — confirm and update the wording above.
    // `options` defaults to a value-initialized per_request_options: all flags
    // false and no preimage.
    future, lw_shared_ptr>> augment_mutation_call(
        lowres_clock::time_point timeout,
        utils::chunked_vector&& mutations,
        tracing::trace_state_ptr tr_state,
        db::consistency_level write_cl,
        per_request_options options = {}
        );
    // Presumably: true iff augment_mutation_call() has CDC work to do for these
    // mutations, letting callers skip it otherwise (definition not visible here).
    bool needs_cdc_augmentation(const utils::chunked_vector&) const;
};

// Non-owning bundle of the services the CDC machinery needs. It holds
// references only, so the referenced objects must outlive every db_context
// that refers to them.
struct db_context final {
    service::storage_proxy& _proxy;
    service::migration_notifier& _migration_notifier;
    cdc::metadata& _cdc_metadata;

    // Parameter order matches cdc_service's 3-argument constructor; the
    // mem-initializers follow member declaration order, not parameter order.
    db_context(service::storage_proxy& proxy, cdc::metadata& cdc_meta, service::migration_notifier& notifier) noexcept
        : _proxy(proxy), _migration_notifier(notifier), _cdc_metadata(cdc_meta) {}
};

// The functions below are defined elsewhere; their descriptions are inferred
// from names and signatures and should be confirmed against the definitions.

// Presumably: true iff `table_name` in keyspace `ks_name` is the CDC log of
// some base table in `db`.
bool is_log_for_some_table(const replica::database& db, const sstring& ks_name, const std::string_view& table_name);

// Base-table lookup for a CDC log table, given either the log's schema or
// (presumably) keyspace and table names.
schema_ptr get_base_table(const replica::database&, const schema&);
schema_ptr get_base_table(const replica::database&, std::string_view, std::string_view);

bool cdc_enabled(const schema& s);

// Presumably inverse name mappings between a base table and its CDC log table.
seastar::sstring base_name(std::string_view log_name);
seastar::sstring log_name(std::string_view table_name);

// Column-name helpers for log tables; the *_bytes variants take and return `bytes`.
seastar::sstring log_data_column_name(std::string_view column_name);
seastar::sstring log_meta_column_name(std::string_view column_name);
bytes log_data_column_name_bytes(const bytes& column_name);
bytes log_meta_column_name_bytes(const bytes& column_name);
seastar::sstring log_data_column_deleted_name(std::string_view column_name);
bytes log_data_column_deleted_name_bytes(const bytes& column_name);
seastar::sstring log_data_column_deleted_elements_name(std::string_view column_name);
bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name);
bool is_cdc_metacolumn_name(const sstring& name);

// Presumably builds a time-based UUID derived from the write timestamp `t`.
utils::UUID generate_timeuuid(api::timestamp_type t);

// Returns a pointer to the cell_map stored for clustering key `ck` (const
// overload for read-only maps); presumably nullptr when `row_states` has no
// entry for `ck`.
cell_map* get_row_state(row_states_map& row_states, const clustering_key& ck);
const cell_map* get_row_state(const row_states_map& row_states, const clustering_key& ck);

} // namespace cdc