diff --git a/cdc/cdc_options.hh b/cdc/cdc_options.hh index b447b5edb4..9b18a08944 100644 --- a/cdc/cdc_options.hh +++ b/cdc/cdc_options.hh @@ -28,7 +28,6 @@ namespace cdc { enum class delta_mode : uint8_t { - off, keys, full, }; diff --git a/cdc/log.cc b/cdc/log.cc index f636e17a29..d50cbc341f 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -295,17 +295,15 @@ future<> cdc::cdc_service::stop() { cdc::cdc_service::~cdc_service() = default; namespace { -static const sstring delta_mode_string_off = "off"; static const sstring delta_mode_string_keys = "keys"; static const sstring delta_mode_string_full = "full"; static const std::string_view image_mode_string_on = "on"; -static const std::string_view image_mode_string_off = delta_mode_string_off; +static const std::string_view image_mode_string_off = "off"; static const std::string_view image_mode_string_full = delta_mode_string_full; sstring to_string(cdc::delta_mode dm) { switch (dm) { - case cdc::delta_mode::off : return delta_mode_string_off; case cdc::delta_mode::keys : return delta_mode_string_keys; case cdc::delta_mode::full : return delta_mode_string_full; } @@ -363,8 +361,6 @@ cdc::options::options(const std::map& map) { } else if (key == "delta") { if (val == delta_mode_string_keys) { _delta_mode = delta_mode::keys; - } else if (val == delta_mode_string_off) { - _delta_mode = delta_mode::off; } else if (val != delta_mode_string_full) { throw exceptions::configuration_exception("Invalid value for CDC option \"delta\": " + p.second); } @@ -377,11 +373,6 @@ cdc::options::options(const std::map& map) { throw exceptions::configuration_exception("Invalid CDC option: " + p.first); } } - - if (_enabled && !preimage() && !postimage() && get_delta_mode() == delta_mode::off) { - throw exceptions::configuration_exception("Invalid combination of CDC options: neither of" - " {preimage, postimage, delta} is enabled"); - } } std::map cdc::options::to_map() const { @@ -985,6 +976,13 @@ static ttl_opt get_ttl(const row_marker& rm) { return rm.is_expiring() ? std::optional{rm.ttl()} : std::nullopt; } +/** + * Returns whether we should generate cdc delta values (beyond keys) + */ +static bool generate_delta_values(const schema& s) { + return s.cdc_options().get_delta_mode() == cdc::delta_mode::full; +} + /* Visits the cells and tombstones of a single base mutation row and constructs corresponding delta-row cells * for the corresponding log mutation. * @@ -1016,13 +1014,16 @@ struct process_row_visitor { // We need to keep a reference to it since we might insert new row_states during the visitation. row_states_map& _clustering_row_states; + const bool _generate_delta_values = true; + process_row_visitor( const clustering_key& log_ck, stats::part_type_set& touched_parts, log_mutation_builder& builder, bool enable_updating_state, const clustering_key* base_ck, cell_map* row_state, - row_states_map& clustering_row_states) + row_states_map& clustering_row_states, bool generate_delta_values) : _log_ck(log_ck), _touched_parts(touched_parts), _builder(builder), _enable_updating_state(enable_updating_state), _base_ck(base_ck), _row_state(row_state), - _clustering_row_states(clustering_row_states) + _clustering_row_states(clustering_row_states), + _generate_delta_values(generate_delta_values) {} void update_row_state(const column_definition& cdef, bytes_opt value) { @@ -1040,7 +1041,9 @@ struct process_row_visitor { bytes value = get_bytes(cell); // delta - _builder.set_value(_log_ck, cdef, value); + if (_generate_delta_values) { + _builder.set_value(_log_ck, cdef, value); + } // images if (_enable_updating_state) { @@ -1050,8 +1053,9 @@ struct process_row_visitor { void dead_atomic_cell(const column_definition& cdef, const atomic_cell_view&) { // delta - _builder.set_deleted(_log_ck, cdef); - + if (_generate_delta_values) { + _builder.set_deleted(_log_ck, cdef); + } // images if (_enable_updating_state) { update_row_state(cdef, std::nullopt); @@ -1183,16 +1187,18 @@ struct process_row_visitor { } // delta - if (is_column_delete) { - _builder.set_deleted(_log_ck, cdef); - } + if (_generate_delta_values) { + if (is_column_delete) { + _builder.set_deleted(_log_ck, cdef); + } - if (deleted_elements) { - _builder.set_deleted_elements(_log_ck, cdef, *deleted_elements); - } + if (deleted_elements) { + _builder.set_deleted_elements(_log_ck, cdef, *deleted_elements); + } - if (added_cells) { - _builder.set_value(_log_ck, cdef, *added_cells); + if (added_cells) { + _builder.set_value(_log_ck, cdef, *added_cells); + } } // images @@ -1225,6 +1231,8 @@ struct process_change_visitor { row_states_map& _clustering_row_states; cell_map& _static_row_state; + const bool _generate_delta_values = true; + void static_row_cells(auto&& visit_row_cells) { _touched_parts.set(); @@ -1232,7 +1240,7 @@ struct process_change_visitor { process_row_visitor v( log_ck, _touched_parts, _builder, - _enable_updating_state, nullptr, &_static_row_state, _clustering_row_states); + _enable_updating_state, nullptr, &_static_row_state, _clustering_row_states, _generate_delta_values); visit_row_cells(v); _builder.set_ttl(log_ck, v._ttl_column); @@ -1242,20 +1250,12 @@ struct process_change_visitor { _touched_parts.set(); auto log_ck = _builder.allocate_new_log_row(); - _builder.set_clustering_columns(log_ck, ckey); struct clustering_row_cells_visitor : public process_row_visitor { operation _cdc_op = operation::update; - clustering_row_cells_visitor( - const clustering_key& log_ck, stats::part_type_set& touched_parts, log_mutation_builder& builder, - bool enable_updating_state, const clustering_key* base_ck, cell_map* row_state, - row_states_map& clustering_row_states) - : process_row_visitor( - log_ck, touched_parts, builder, - enable_updating_state, base_ck, row_state, clustering_row_states) - {} + using process_row_visitor::process_row_visitor; void marker(const row_marker& rm) { _ttl_column = get_ttl(rm); @@ -1266,7 +1266,7 @@ struct process_change_visitor { clustering_row_cells_visitor v( log_ck, _touched_parts, _builder, _enable_updating_state, &ckey, get_row_state(_clustering_row_states, ckey), - _clustering_row_states); + _clustering_row_states, _generate_delta_values); visit_row_cells(v); _builder.set_operation(log_ck, v._cdc_op); @@ -1320,7 +1320,6 @@ struct process_change_visitor { void partition_delete(const tombstone&) { _touched_parts.set(); - auto log_ck = _builder.allocate_new_log_row(operation::partition_delete); if (_enable_updating_state) { _clustering_row_states.clear(); @@ -1422,56 +1421,6 @@ private: stats::part_type_set _touched_parts; - // Remove non-key columns or entire delta rows, according to `delta` setting in `cdc` options. - void adjust_or_delete_deltas() { - if (_schema->cdc_options().get_delta_mode() == cdc::delta_mode::full) { - return; - } - - static const auto& op_col = *_log_schema->get_column_definition(log_meta_column_name_bytes("operation")); - static const auto preimg_op_bytes = op_col.type->decompose(operation_native_type(operation::pre_image)); - static const auto postimg_op_bytes = op_col.type->decompose(operation_native_type(operation::post_image)); - for (auto& m : _result_mutations) { - auto& clustered_rows = m.partition().clustered_rows(); - int deleted_rows_cnt = 0; - for (auto it = clustered_rows.begin(); it != clustered_rows.end(); /* no increment */) { - const auto& op_cell = it->row().cells().cell_at(op_col.id).as_atomic_cell(op_col); - const auto op_val = op_cell.value().linearize(); - if (op_val != preimg_op_bytes && op_val != postimg_op_bytes) { - if (_schema->cdc_options().get_delta_mode() == cdc::delta_mode::off) { - it = m.partition().clustered_rows().erase_and_dispose(it, current_deleter()); - ++deleted_rows_cnt; - continue; - } - - // The case of `get_delta_mode() == delta_mode::keys`: - it->row().cells().remove_if([this, log_s = m.schema()] (column_id id, atomic_cell_or_collection& acoc) { - const auto& log_cdef = log_s->column_at(column_kind::regular_column, id); - if (is_cdc_metacolumn_name(log_cdef.name_as_text())) { - return false; - } - const auto* base_cdef = _schema->get_column_definition(log_cdef.name()); - // Remove columns from delta that correspond to non-PK/CK columns in the base table. - return base_cdef != nullptr - && (base_cdef->kind != column_kind::partition_key && base_cdef->kind != column_kind::clustering_key); - }); - } - - // Deletion of deltas might leave gaps in `batch_seq_no` - let's fix them. - if (deleted_rows_cnt > 0) { - const auto* batch_seq_no_cdef = m.schema()->get_column_definition(log_meta_column_name_bytes("batch_seq_no")); - assert(batch_seq_no_cdef != nullptr); - auto exploded_ck = it->key().explode(); - const size_t batch_seq_no_idx = batch_seq_no_cdef->component_index(); - const auto old_batch_seq_no = value_cast(int32_type->deserialize(exploded_ck[batch_seq_no_idx])); - exploded_ck[batch_seq_no_idx] = int32_type->decompose(old_batch_seq_no - deleted_rows_cnt); - it->key() = clustering_key::from_exploded(std::move(exploded_ck)); - } - ++it; - } - } - } - public: transformer(db_context ctx, schema_ptr s, dht::decorated_key dk) : _ctx(ctx) @@ -1561,7 +1510,8 @@ public: ._builder = *_builder, ._enable_updating_state = _enable_updating_state, ._clustering_row_states = _clustering_row_states, - ._static_row_state = _static_row_state + ._static_row_state = _static_row_state, + ._generate_delta_values = generate_delta_values(_builder->base_schema()) }; cdc::inspect_mutation(m, v); } @@ -1569,7 +1519,6 @@ public: // Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime. // The `transformer` object on which this method was called on should not be used anymore. std::tuple, stats::part_type_set> finish() && { - adjust_or_delete_deltas(); return std::make_pair, stats::part_type_set>(std::move(_result_mutations), std::move(_touched_parts)); } diff --git a/test/cql/cdc_delta_modes_test.cql b/test/cql/cdc_delta_modes_test.cql index 35376ea690..ba396e53a7 100644 --- a/test/cql/cdc_delta_modes_test.cql +++ b/test/cql/cdc_delta_modes_test.cql @@ -1,11 +1,4 @@ -create table tb1 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': false, 'postimage': false, 'delta': 'off'}; - -create table tb2 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'off'}; --- Should add 1 row (postimage) -insert into tb2 (pk, ck, i1) VALUES (1, 11, 111) USING TTL 1111; -select "cdc$batch_seq_no", "cdc$operation", pk, ck, i1 from tb2_scylla_cdc_log where pk = 1 and ck = 11 allow filtering; - -alter table tb2 with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'keys'}; +create table tb2 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'keys'}; -- Should add 3 rows (preimage + postimage + delta). Delta has only key columns and "pk" + "ck" insert into tb2 (pk, ck, i1) VALUES (2, 22, 222) USING TTL 2222; select "cdc$batch_seq_no", "cdc$operation", "cdc$ttl", pk, ck, i1 from tb2_scylla_cdc_log where pk = 2 and ck = 22 allow filtering; diff --git a/test/cql/cdc_delta_modes_test.result b/test/cql/cdc_delta_modes_test.result index f92e4a17fa..6f436b04f3 100644 --- a/test/cql/cdc_delta_modes_test.result +++ b/test/cql/cdc_delta_modes_test.result @@ -1,33 +1,4 @@ -create table tb1 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': false, 'postimage': false, 'delta': 'off'}; -{ - "message" : "exceptions::configuration_exception (Invalid combination of CDC options: neither of {preimage, postimage, delta} is enabled)", - "status" : "error" -} - -create table tb2 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'off'}; -{ - "status" : "ok" -} --- Should add 1 row (postimage) -insert into tb2 (pk, ck, i1) VALUES (1, 11, 111) USING TTL 1111; -{ - "status" : "ok" -} -select "cdc$batch_seq_no", "cdc$operation", pk, ck, i1 from tb2_scylla_cdc_log where pk = 1 and ck = 11 allow filtering; -{ - "rows" : - [ - { - "cdc$batch_seq_no" : "0", - "cdc$operation" : "9", - "ck" : "11", - "i1" : "111", - "pk" : "1" - } - ] -} - -alter table tb2 with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'keys'}; +create table tb2 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'keys'}; { "status" : "ok" }