alternator: fix Alternator writing unnecessary cdc entries

Work in this patch is the result of two bugs: a spurious MODIFY event when
a remove-column action is used in `update_item` on a non-existing item, and
spurious events when a BatchWriteItem mixed no-op operations with
operations involving actual changes (the no-op operations would still emit
cdc log entries).
The latter issue required a rework of Piotr Wieczorek's algorithm,
which fixed the former issue as well.

Piotr Wieczorek previously wrote checks that should
prevent unnecessary cdc events from being written. His implementation
missed the fact that a single `mutation` object passed to the cdc code
to be analysed for cdc log entries can contain modifications for
multiple rows (with the same timestamp - for example as a result
of a BatchWriteItem call). His code tries to skip the whole `mutation`,
which in such a case is not possible, because a BatchWriteItem might have
one item that does nothing and a second item that makes a modification
(this is the reason for the second bug).

His algorithm was extended and moved. Originally it worked
as follows: the user would send a `mutation` object with some changes to
be "augmented". The cdc code would process those changes and build a set of
cdc log changes based on them, to be added to the cdc log table.
Piotr added a `should_skip` function, which processed the user's changes
and tried to determine whether all of them should be dropped.
The new version, instead of trying to skip adding rows to the
cdc log `mutation` object, builds a set of rows to ignore.
After the whole cdc log `mutation` object is completed, it is processed
row by row. Any row that was previously added to the
`rows_to_ignore` set is removed. The remaining rows are written to a
new cdc log `mutation` with new clustering keys
(`cdc$batch_seq_no` index values should probably be consecutive -
we just want to be safe here), and the new `mutation` object is
returned to be sent to the cdc log table.

The first bug is fixed as a side effect of the new algorithm,
which contains more precise checks detecting whether a given
mutation actually made a difference.

Fixes: #28368
Fixes: SCYLLADB-538
Fixes: SCYLLADB-1528
Refs: #28452
Radosław Cybulski
2026-02-02 13:15:49 +01:00
parent 2894542e57
commit 6e5aaa85b6
4 changed files with 176 additions and 21 deletions


@@ -8,7 +8,7 @@
#include <utility>
#include <algorithm>
#include <unordered_set>
#include <boost/range/irange.hpp>
#include <seastar/core/thread.hh>
#include <seastar/core/metrics.hh>
@@ -47,6 +47,7 @@
#include "tracing/trace_state.hh"
#include "stats.hh"
#include "utils/labels.hh"
#include "alternator/executor.hh"
namespace std {
@@ -1068,6 +1069,14 @@ public:
return create_ck(_batch_no - 1);
}
api::timestamp_type get_timestamp() const {
return _ts;
}
ttl_opt get_ttl() const {
return _ttl;
}
// A common pattern is to allocate a row and then immediately set its `cdc$operation` column.
clustering_key allocate_new_log_row(operation op) {
auto log_ck = allocate_new_log_row();
@@ -1209,15 +1218,25 @@ struct process_row_visitor {
row_states_map& _clustering_row_states;
const bool _generate_delta_values = true;
// true if we are processing changes that were produced by Alternator
const bool _alternator;
// will be set to true if any kind of change in the row is detected. Used only when processing Alternator's changes.
bool _alternator_any_value_changed = false;
// will be set to true if Alternator's collection column (:attrs) is modified only by removing elements.
// Used only when processing Alternator's changes.
bool _alternator_only_deletes = false;
process_row_visitor(
const clustering_key& log_ck, stats::part_type_set& touched_parts, log_mutation_builder& builder,
bool enable_updating_state, const clustering_key* base_ck, cell_map* row_state,
row_states_map& clustering_row_states, bool generate_delta_values)
row_states_map& clustering_row_states, bool generate_delta_values, bool alternator = false)
: _log_ck(log_ck), _touched_parts(touched_parts), _builder(builder),
_enable_updating_state(enable_updating_state), _base_ck(base_ck), _row_state(row_state),
_clustering_row_states(clustering_row_states),
_generate_delta_values(generate_delta_values)
_generate_delta_values(generate_delta_values), _alternator(alternator)
{}
void update_row_state(const column_definition& cdef, managed_bytes_opt value) {
@@ -1227,7 +1246,17 @@ struct process_row_visitor {
auto [it, _] = _clustering_row_states.try_emplace(*_base_ck);
_row_state = &it->second;
}
(*_row_state)[&cdef] = std::move(value);
auto [ it, inserted ] = _row_state->insert({ &cdef, std::nullopt });
// we ignore `_alternator_any_value_changed` for non-alternator changes.
// we don't filter if `_enable_updating_state` is false: on top of needing the pre-image,
// we also need cdc to build the post-image for us.
// we add the `_alternator` check here for performance reasons - there is no point in
// byte-comparing objects if the result will be ignored
if (_alternator && _enable_updating_state) {
_alternator_any_value_changed = _alternator_any_value_changed || it->second != value;
}
it->second = std::move(value);
}
void live_atomic_cell(const column_definition& cdef, const atomic_cell_view& cell) {
@@ -1377,6 +1406,8 @@ struct process_row_visitor {
auto&& deleted_keys = std::get<1>(result);
auto&& added_cells = std::get<2>(result);
_alternator_only_deletes = cdef.name_as_text() == alternator::executor::ATTRS_COLUMN_NAME && !deleted_keys.empty() && !added_cells.has_value();
// FIXME: we're doing redundant work: first we serialize the set of deleted keys into a blob,
// then we deserialize again when merging images below
managed_bytes_opt deleted_elements = std::nullopt;
@@ -1434,12 +1465,31 @@ struct process_change_visitor {
const bool _enable_updating_state = false;
row_states_map& _clustering_row_states;
// clustering keys (as bytes) of rows that should be ignored when writing cdc log changes.
// Filtering is done in the `clean_up_noop_rows` function. Used only when processing Alternator's changes.
// Since an Alternator clustering key always has at most a single column, we store the unpacked clustering key.
// If the Alternator table has no clustering key, the partition has at most one row, and any value present
// in _alternator_clustering_keys_to_ignore will make us ignore that single row -
// we use an empty bytes object.
std::unordered_set<bytes>& _alternator_clustering_keys_to_ignore;
cell_map& _static_row_state;
const bool _alternator_schema_has_no_clustering_key = false;
const bool _is_update = false;
const bool _generate_delta_values = true;
// only called when processing Alternator's changes
void alternator_add_ckey_to_rows_to_ignore(const clustering_key& ckey) {
throwing_assert(_request_options.alternator);
auto res = ckey.explode();
auto ckey_exploded = !res.empty() ? res[0] : bytes{};
_alternator_clustering_keys_to_ignore.insert(ckey_exploded);
}
void static_row_cells(auto&& visit_row_cells) {
_touched_parts.set<stats::part_type::STATIC_ROW>();
@@ -1471,16 +1521,29 @@ struct process_change_visitor {
}
};
auto row_state = get_row_state(_clustering_row_states, ckey);
clustering_row_cells_visitor v(
log_ck, _touched_parts, _builder,
_enable_updating_state, &ckey, get_row_state(_clustering_row_states, ckey),
_clustering_row_states, _generate_delta_values);
_enable_updating_state, &ckey, row_state,
_clustering_row_states, _generate_delta_values, _request_options.alternator);
if (_is_update && _request_options.alternator) {
v._marker_op = operation::update;
v._marker_op = row_state ? operation::update : operation::insert;
}
visit_row_cells(v);
if (_enable_updating_state) {
if (_request_options.alternator && !v._alternator_any_value_changed) {
// we need additional checks here:
// - without the `row_state != nullptr` check, inserting a new key without additional fields (so only a partition / clustering key)
//   would be treated as no-change, because without additional fields given by the user the `v` visitor won't visit any cells
//   and _alternator_any_value_changed will stay false (thus the item would be skipped),
// - without the `row_state == nullptr && v._alternator_only_deletes` check we wouldn't properly ignore
//   column deletes for existing items that lack the column we want to delete -
//   the item exists (so row_state != nullptr), but we delete a non-existing column, so it's a no-op
if (row_state != nullptr || (row_state == nullptr && v._alternator_only_deletes)) {
alternator_add_ckey_to_rows_to_ignore(ckey);
}
}
// #7716: if there are no regular columns, our visitor would not have visited any cells,
// hence it would not have created a row_state for this row. In effect, postimage wouldn't be produced.
// Ensure that the row state exists.
@@ -1497,8 +1560,12 @@ struct process_change_visitor {
auto log_ck = _builder.allocate_new_log_row(_row_delete_op);
_builder.set_clustering_columns(log_ck, ckey);
if (_enable_updating_state && get_row_state(_clustering_row_states, ckey)) {
_clustering_row_states.erase(ckey);
if (_enable_updating_state) {
if (get_row_state(_clustering_row_states, ckey)) {
_clustering_row_states.erase(ckey);
} else if (_request_options.alternator) {
alternator_add_ckey_to_rows_to_ignore(ckey);
}
}
}
@@ -1540,6 +1607,22 @@ struct process_change_visitor {
_touched_parts.set<stats::part_type::PARTITION_DELETE>();
auto log_ck = _builder.allocate_new_log_row(_partition_delete_op);
if (_enable_updating_state) {
if (_request_options.alternator && _alternator_schema_has_no_clustering_key && _clustering_row_states.empty()) {
// Alternator's table can be with or without a clustering key. If the clustering key exists,
// a delete request will be a `clustered_row_delete` and will be handled there.
// If the clustering key doesn't exist, a delete request will be a `partition_delete` and will be handled here.
// The no-clustering-key case is slightly tricky, because an insert of such an item is handled by `clustered_row_cells`
// and has some value as the clustering key (the value currently seems to be an empty bytes object).
// We don't want to rely on knowing the value exactly; instead we rely on the fact that
// there will be at most one item in a partition. So if `_clustering_row_states` is empty,
// we know the delete is for a non-existing item and we should ignore it.
// If `_clustering_row_states` is not empty, then we know the delete is for an existing item -
// we should log it and clear `_clustering_row_states`.
// The same logic applies to the `alternator_add_ckey_to_rows_to_ignore` call in `clustered_row_delete`:
// we need to insert "anything" for the no-clustering-key case, so further logic can check
// whether the map is empty and know if it should ignore or keep the single partition item.
alternator_add_ckey_to_rows_to_ignore({});
}
_clustering_row_states.clear();
}
}
@@ -1647,6 +1730,47 @@ private:
stats::part_type_set _touched_parts;
std::unordered_set<bytes> _alternator_clustering_keys_to_ignore;
const column_definition* _alternator_clustering_key_column = nullptr;
// This function processes the mutation and removes rows that are in _alternator_clustering_keys_to_ignore.
// We need to take care to reindex the clustering keys (cdc$batch_seq_no).
// This is used for Alternator's changes only.
// NOTE: `_alternator_clustering_keys_to_ignore` must not be empty.
mutation clean_up_noop_rows(mutation mut) {
throwing_assert(!_alternator_clustering_keys_to_ignore.empty());
auto after_mut = mutation(_log_schema, mut.key());
if (!_alternator_clustering_key_column) {
// no clustering key - only a single row per partition;
// since _alternator_clustering_keys_to_ignore is not empty we need to drop that single row,
// so we just return an empty mutation instead
return after_mut;
}
int batch_seq = 0;
for (rows_entry &row : mut.partition().mutable_non_dummy_rows()) {
auto cell = row.row().cells().find_cell(_alternator_clustering_key_column->id);
if (cell) {
auto val = cell->as_atomic_cell(*_alternator_clustering_key_column).value().linearize();
if (_alternator_clustering_keys_to_ignore.contains(val)) {
continue;
}
}
auto new_key = _builder->create_ck(batch_seq++);
after_mut.partition().clustered_row(*_log_schema, std::move(new_key)) = std::move(row.row());
}
if (batch_seq > 0) {
// update the end_of_batch marker.
// we don't need to clear the previous one, as we only removed rows.
// we need to set it on the last row, because the original last row might have been deleted.
// batch_seq == 0 -> no rows: after_mut is empty, all entries were dropped and there's nothing to write to the cdc log
auto last_key = _builder->create_ck(batch_seq - 1);
after_mut.set_cell(last_key, log_meta_column_name_bytes("end_of_batch"), data_value(true), _builder->get_timestamp(), _builder->get_ttl());
}
return after_mut;
}
public:
transformer(db_context ctx, schema_ptr s, dht::decorated_key dk, const per_request_options& options)
: _ctx(ctx)
@@ -1656,7 +1780,20 @@ public:
, _options(options)
, _clustering_row_states(0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema))
, _uses_tablets(ctx._proxy.get_db().local().find_keyspace(_schema->ks_name()).uses_tablets())
, _alternator_clustering_keys_to_ignore()
{
if (_options.alternator) {
auto cks = _schema->clustering_key_columns();
const column_definition *ck_def = nullptr;
if (!cks.empty()) {
auto it = _log_schema->columns_by_name().find(cks.front().name());
if (it == _log_schema->columns_by_name().end()) {
on_internal_error(cdc_log, fmt::format("failed to find clustering key `{}` in cdc log table `{}`", cks.front().name(), _log_schema->id()));
}
ck_def = it->second;
}
_alternator_clustering_key_column = ck_def;
}
}
// DON'T move the transformer after this
@@ -1664,7 +1801,10 @@ public:
const auto stream_id = _uses_tablets ? _ctx._cdc_metadata.get_tablet_stream(_log_schema->id(), ts, _dk.token()) : _ctx._cdc_metadata.get_vnode_stream(ts, _dk.token());
_result_mutations.emplace_back(_log_schema, stream_id.to_partition_key(*_log_schema));
_builder.emplace(_result_mutations.back(), ts, _dk.key(), *_schema);
_enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage());
// alternator_streams_increased_compatibility set to true reads the preimage, but we also need
// to set _enable_updating_state to true to keep track of changes and produce correct pre/post
// images even if the upper layer didn't request them explicitly.
_enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage()) || (_options.alternator && _options.alternator_streams_increased_compatibility);
}
void produce_preimage(const clustering_key* ck, const one_kind_column_set& columns_to_include) override {
@@ -1761,7 +1901,9 @@ public:
._builder = *_builder,
._enable_updating_state = _enable_updating_state,
._clustering_row_states = _clustering_row_states,
._alternator_clustering_keys_to_ignore = _alternator_clustering_keys_to_ignore,
._static_row_state = _static_row_state,
._alternator_schema_has_no_clustering_key = (_alternator_clustering_key_column == nullptr),
._is_update = _is_update,
._generate_delta_values = generate_delta_values(_builder->base_schema())
};
@@ -1771,6 +1913,19 @@ public:
void end_record() override {
SCYLLA_ASSERT(_builder);
_builder->end_record();
if (_options.alternator && !_alternator_clustering_keys_to_ignore.empty()) {
// we filter mutations for Alternator's changes here.
// We do it per mutation object (a user might submit a batch of those in one go,
// and some might be split because of different timestamps);
// the ignore key set is cleared afterwards.
// If a single mutation object contains two separate changes to the same row
// and at least one of them is ignored, all of them will be ignored.
// This is not possible in Alternator - the Alternator spec forbids reusing a
// primary key in a single batch.
_result_mutations.back() = clean_up_noop_rows(std::move(_result_mutations.back()));
_alternator_clustering_keys_to_ignore.clear();
}
}
const row_states_map& clustering_row_states() const override {


@@ -325,6 +325,13 @@ experimental:
in July 2025 to optimize shard discovery, is not yet implemented in
Alternator.
<https://github.com/scylladb/scylla/issues/25160>
* With the ``alternator_streams_increased_compatibility`` configuration
option enabled, operations that do not change the database state
(e.g., deleting a non-existent item, removing a non-existent
attribute, or re-inserting an identical item) will not produce
stream events. Without this option, such no-op operations may still
generate spurious stream events.
<https://github.com/scylladb/scylladb/issues/28368>
## Unimplemented API features


@@ -1486,6 +1486,10 @@ public:
return std::ranges::subrange(_rows.begin(), _rows.end())
| std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
}
auto mutable_non_dummy_rows() {
return std::ranges::subrange(_rows.begin(), _rows.end())
| std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
}
void accept(const schema&, mutation_partition_visitor&) const;
// Returns the number of live CQL rows in this partition.


@@ -291,7 +291,6 @@ def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams):
# Test requires `alternator_streams_increased_compatibility` set to true, otherwise it will fail due to how Streams work without the flag.
# Test requires write_isolation set to always, otherwise the upper layer will split the batch write into separate cdc operations, sidestepping the issue.
# Reproduces SCYLLADB-1528.
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_spurious_modify_mixing_noop_with_real_changes_in_batch_write_item(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams):
null = None
def do_updates(table, p, c):
@@ -314,7 +313,6 @@ def test_streams_spurious_modify_mixing_noop_with_real_changes_in_batch_write_it
# an update that only deletes a column from a non-existing item used to incorrectly emit a MODIFY event
# test requires `alternator_streams_increased_compatibility` set to true, otherwise it will fail
# Reproduces SCYLLADB-1528
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_noop_update_expr_on_missing_item(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
null = None
def do_updates(table, p, c):
@@ -346,7 +344,6 @@ def test_streams_noop_update_expr_on_missing_item(test_table_ss_new_and_old_imag
do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# the same as test_streams_noop_update_expr_on_missing_item but for a table without clustering key
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_noop_update_expr_on_missing_item_on_no_clustering_key_table(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
null = None
def do_updates(table, p, c):
@@ -1812,42 +1809,34 @@ def do_updates_1_no_ck(table, p, _):
return events
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates_1, 'KEYS_ONLY')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_new_image(test_table_ss_new_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates_1, 'NEW_IMAGE')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates_1, 'OLD_IMAGE')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1, 'NEW_AND_OLD_IMAGES')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_no_ck_keys_only(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'KEYS_ONLY')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_no_ck_new_image(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_IMAGE')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_no_ck_old_image(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'OLD_IMAGE')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_no_ck_new_and_old_images(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_AND_OLD_IMAGES')