mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-19 16:15:07 +00:00
Merge 'alternator: fix unnecessary cdc log entries' from Radosław Cybulski
Fix cdc writing unnecessary entries to its log, for example when Alternator deletes an item that doesn't actually exist.
Originally @wps0 tackled this issue; this patch is an extension of his work. His work involved adding a `should_skip` function to cdc, which would process a `mutation` object and decide whether the changes in the object should be added to the cdc log or not.
The issue with his approach is that a `mutation` object might contain changes for more than one row. If, for example, the `mutation` object contains two changes (a delete of a non-existing row and a create of a new row), the `should_skip` function will detect changes in the second item and allow the whole `mutation` (BOTH items) to be added. For example (using python's boto3), running this on an empty table:
```
with table.batch_writer() as batch:
    batch.put_item({'p': 'p', 'c': 'c0'})
    batch.delete_item(Key={'p': 'p', 'c': 'c1'})
```
will emit two events (a "put" event and a "delete" event), even though the item with `c` set to `c1` does not exist (and thus can't be deleted). Note that both entries in the batch write must use the same partition key; otherwise the upper layer will split them into separate `mutation` objects and the issue will not occur.
The solution is to do similar processing, but consider each change separately from the others. This is tricky to implement due to the way cdc works. When cdc processes a `mutation` object (containing X changes), it emits cdc entries in phases. Phase 1: emit a `preimage` (old state) for each change (if requested). Phase 2: for each change, emit the actual "diff" (update, delete, and so on). Phase 3: emit a `postimage` (new state).
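The phase ordering described above can be sketched roughly like this (a hypothetical Python model for illustration only; the real cdc code operates on `mutation` objects, and the `changes`, `op`, and `key` names are not real cdc identifiers):

```python
def emit_cdc_entries(changes, want_preimage, want_postimage):
    entries = []
    if want_preimage:
        for ch in changes:                 # phase 1: preimage per change
            entries.append(('preimage', ch['key']))
    for ch in changes:                     # phase 2: the actual "diff"
        entries.append((ch['op'], ch['key']))
    if want_postimage:
        for ch in changes:                 # phase 3: postimage per change
            entries.append(('postimage', ch['key']))
    return entries

emit_cdc_entries([{'key': 'c0', 'op': 'update'}], True, True)
# -> [('preimage', 'c0'), ('update', 'c0'), ('postimage', 'c0')]
```

The point is that the preimage for a change is emitted before we can tell (in phase 2) whether the change is a no-op, which is why the fix below needs a postprocessing step rather than filtering up front.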
We only learn that a change needs to be skipped during phase 2. By that time phase 1 has completed and the preimage for the change has already been emitted. At that moment we set a flag that the change (identified by its clustering key value) needs to be skipped: we add the clustering key to an ignore-rows set (the `_alternator_clustering_keys_to_ignore` variable) and continue normally. Once all phases finish we run a postprocess phase (the `clean_up_noop_rows` function). It goes through the generated cdc mutations and drops all modifications whose clustering key is in the ignore-rows set. After dropping we need a cleanup operation: each generated cdc mutation carries an index (incremented by one), and if we skipped some parts the indices are no longer consecutive, so we reindex the final changes.
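The skip-and-reindex postprocessing can be modeled as follows (a minimal sketch, assuming each generated cdc row is an `(index, clustering_key, payload)` tuple; the real `clean_up_noop_rows` works on `mutation` objects and the `cdc$batch_seq_no` column):

```python
def clean_up_noop_rows(rows, keys_to_ignore):
    # drop rows whose clustering key landed in the ignore-rows set...
    kept = [(ck, payload) for _, ck, payload in rows if ck not in keys_to_ignore]
    # ...then reassign the batch sequence numbers so the surviving
    # indices are consecutive again (the original numbering now has gaps)
    return [(i, ck, payload) for i, (ck, payload) in enumerate(kept)]

rows = [(0, b'c0', 'put'), (1, b'c1', 'delete'), (2, b'c2', 'put')]
clean_up_noop_rows(rows, {b'c1'})
# -> [(0, b'c0', 'put'), (1, b'c2', 'put')]
```

Without the reindexing step, consumers that rely on consecutive per-batch sequence numbers would see a gap where the no-op entry was removed.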
There's a special case worth mentioning: Alternator tables without clustering keys. There the `mutation` object passed to cdc can contain exactly one change (different partition keys are split by upper layers, and Alternator will never emit a `mutation` object containing two or more changes with the same primary key). Here, when we decide the change is to be skipped, we add an empty `bytes` object to the ignore-rows set. When checking the ignore-rows set, we only check whether it is empty or not (we don't check for the presence of the empty `bytes` object).
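This no-clustering-key convention can be sketched like so (a hedged illustration; the empty `bytes` value stands in for the sentinel the patch inserts, and the function names mirror but do not reproduce the C++ code):

```python
def alternator_add_ckey_to_rows_to_ignore(keys_to_ignore, exploded_ckey):
    # a table without a clustering key explodes to an empty list,
    # so we insert an empty bytes object as a sentinel value
    keys_to_ignore.add(exploded_ckey[0] if exploded_ckey else b'')

def should_drop_single_row(keys_to_ignore):
    # for a no-clustering-key table the partition holds at most one row,
    # so we only test emptiness of the set, never membership of b''
    return len(keys_to_ignore) > 0

ignore = set()
alternator_add_ckey_to_rows_to_ignore(ignore, [])
should_drop_single_row(ignore)  # -> True
```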
Note: there might be some confusion between this patch and #28452. Both started from the same error observation and use similar tests for validation, as both are easily triggered by BatchWrite commands (both need the `mutation` object passed to cdc to contain more than a single change). This issue, though, is about wrong data written to the cdc log and is fixed in cdc, whereas #28452 is about incorrectly parsing correct cdc data and is fixed on the Alternator side. Note that we need #28452 to truly verify this patch (otherwise we will emit correct cdc entries, but Alternator will parse them incorrectly).
Note: to benefit from (or notice) this patch you need the `alternator_streams_increased_compatibility` flag turned on.
Note: the rework is quite "broad" and covers a lot of ground: every operation that might result in no change to the database state should be tested. Additional tests were added: trying to remove a column from a non-existing item, as well as trying to remove a non-existing column from an existing item.
Fixes: #28368
Fixes: SCYLLADB-1528
Fixes: SCYLLADB-538
Closes scylladb/scylladb#28544
* github.com:scylladb/scylladb:
alternator: remove unnecessary code
alternator: fix Alternator writing unnecessary cdc entries
alternator: add failing tests for Streams
This commit is contained in:

cdc/log.cc (183 changed lines)
```
@@ -8,7 +8,7 @@
#include <utility>
#include <algorithm>
#include <unordered_set>
#include <boost/range/irange.hpp>
#include <seastar/core/thread.hh>
#include <seastar/core/metrics.hh>
@@ -47,6 +47,7 @@
#include "tracing/trace_state.hh"
#include "stats.hh"
#include "utils/labels.hh"
#include "alternator/executor.hh"

namespace std {

@@ -1068,6 +1069,14 @@ public:
        return create_ck(_batch_no - 1);
    }

    api::timestamp_type get_timestamp() const {
        return _ts;
    }

    ttl_opt get_ttl() const {
        return _ttl;
    }

    // A common pattern is to allocate a row and then immediately set its `cdc$operation` column.
    clustering_key allocate_new_log_row(operation op) {
        auto log_ck = allocate_new_log_row();
@@ -1209,15 +1218,25 @@ struct process_row_visitor {
    row_states_map& _clustering_row_states;

    const bool _generate_delta_values = true;

    // true if we are processing changes that were produced by Alternator
    const bool _alternator;

    // will be set to true, if any kind of change in row will be detected. Used only, when processing Alternator's changes.
    bool _alternator_any_value_changed = false;

    // will be set to true, if Alternator's collection column (:attrs) will be modified only by removing elements
    // Used only, when processing Alternator's changes.
    bool _alternator_only_deletes = false;

    process_row_visitor(
            const clustering_key& log_ck, stats::part_type_set& touched_parts, log_mutation_builder& builder,
            bool enable_updating_state, const clustering_key* base_ck, cell_map* row_state,
            row_states_map& clustering_row_states, bool generate_delta_values)
            row_states_map& clustering_row_states, bool generate_delta_values, bool alternator = false)
        : _log_ck(log_ck), _touched_parts(touched_parts), _builder(builder),
          _enable_updating_state(enable_updating_state), _base_ck(base_ck), _row_state(row_state),
          _clustering_row_states(clustering_row_states),
          _generate_delta_values(generate_delta_values)
          _generate_delta_values(generate_delta_values), _alternator(alternator)
    {}

    void update_row_state(const column_definition& cdef, managed_bytes_opt value) {
@@ -1227,7 +1246,17 @@ struct process_row_visitor {
            auto [it, _] = _clustering_row_states.try_emplace(*_base_ck);
            _row_state = &it->second;
        }
        (*_row_state)[&cdef] = std::move(value);
        auto [ it, inserted ] = _row_state->insert({ &cdef, std::nullopt });

        // we ignore `_alternator_any_value_changed` for non-alternator changes.
        // we don't filter if `_enable_updating_state` is false, as on top of needing pre image
        // we also need cdc to build post image for us
        // we add check for `_alternator` here for performance reasons - no point in byte compare objects
        // if the return value will be ignored
        if (_alternator && _enable_updating_state) {
            _alternator_any_value_changed = _alternator_any_value_changed || it->second != value;
        }
        it->second = std::move(value);
    }

    void live_atomic_cell(const column_definition& cdef, const atomic_cell_view& cell) {
@@ -1377,6 +1406,8 @@ struct process_row_visitor {
        auto&& deleted_keys = std::get<1>(result);
        auto&& added_cells = std::get<2>(result);

        _alternator_only_deletes = cdef.name_as_text() == alternator::executor::ATTRS_COLUMN_NAME && !deleted_keys.empty() && !added_cells.has_value();

        // FIXME: we're doing redundant work: first we serialize the set of deleted keys into a blob,
        // then we deserialize again when merging images below
        managed_bytes_opt deleted_elements = std::nullopt;
@@ -1434,12 +1465,31 @@ struct process_change_visitor {
    const bool _enable_updating_state = false;

    row_states_map& _clustering_row_states;

    // clustering keys' as bytes of rows that should be ignored, when writing cdc log changes
    // filtering will be done in `clean_up_noop_rows` function. Used only, when processing Alternator's changes.
    // Since Alternator clustering key is always at most single column, we store unpacked clustering key.
    // If Alternator table is without clustering key, that means partition has at most one row, any value present
    // in _alternator_clustering_keys_to_ignore will make us ignore that single row -
    // we will use an empty bytes object.
    std::unordered_set<bytes>& _alternator_clustering_keys_to_ignore;

    cell_map& _static_row_state;

    const bool _alternator_schema_has_no_clustering_key = false;

    const bool _is_update = false;

    const bool _generate_delta_values = true;

    // only called, when processing Alternator's change
    void alternator_add_ckey_to_rows_to_ignore(const clustering_key& ckey) {
        throwing_assert(_request_options.alternator);
        auto res = ckey.explode();
        auto ckey_exploded = !res.empty() ? res[0] : bytes{};
        _alternator_clustering_keys_to_ignore.insert(ckey_exploded);
    }

    void static_row_cells(auto&& visit_row_cells) {
        _touched_parts.set<stats::part_type::STATIC_ROW>();

@@ -1471,16 +1521,29 @@ struct process_change_visitor {
            }
        };

        auto row_state = get_row_state(_clustering_row_states, ckey);
        clustering_row_cells_visitor v(
                log_ck, _touched_parts, _builder,
                _enable_updating_state, &ckey, get_row_state(_clustering_row_states, ckey),
                _clustering_row_states, _generate_delta_values);
                _enable_updating_state, &ckey, row_state,
                _clustering_row_states, _generate_delta_values, _request_options.alternator);
        if (_is_update && _request_options.alternator) {
            v._marker_op = operation::update;
            v._marker_op = row_state ? operation::update : operation::insert;
        }
        visit_row_cells(v);

        if (_enable_updating_state) {
            if (_request_options.alternator && !v._alternator_any_value_changed) {
                // we need additional checks here:
                // - without `row_state != nullptr` inserting new key without additional fields (so only partition / clustering key) would be
                //   treated as no-change, because without additional fields given by the user `v` visitor won't visit any cells
                //   and _alternator_any_value_changed will be false (thus item will be skipped),
                // - without `row_state == nullptr && v._alternator_only_deletes` check we won't properly ignore
                //   column deletes for existing items, but without the column we want to delete -
                //   item exists (so row_state != nullptr), but we delete non-existing column, so no-op
                if (row_state != nullptr || (row_state == nullptr && v._alternator_only_deletes)) {
                    alternator_add_ckey_to_rows_to_ignore(ckey);
                }
            }
            // #7716: if there are no regular columns, our visitor would not have visited any cells,
            // hence it would not have created a row_state for this row. In effect, postimage wouldn't be produced.
            // Ensure that the row state exists.
@@ -1497,8 +1560,12 @@ struct process_change_visitor {
        auto log_ck = _builder.allocate_new_log_row(_row_delete_op);
        _builder.set_clustering_columns(log_ck, ckey);

        if (_enable_updating_state && get_row_state(_clustering_row_states, ckey)) {
            _clustering_row_states.erase(ckey);
        if (_enable_updating_state) {
            if (get_row_state(_clustering_row_states, ckey)) {
                _clustering_row_states.erase(ckey);
            } else if (_request_options.alternator) {
                alternator_add_ckey_to_rows_to_ignore(ckey);
            }
        }
    }

@@ -1540,6 +1607,22 @@ struct process_change_visitor {
        _touched_parts.set<stats::part_type::PARTITION_DELETE>();
        auto log_ck = _builder.allocate_new_log_row(_partition_delete_op);
        if (_enable_updating_state) {
            if (_request_options.alternator && _alternator_schema_has_no_clustering_key && _clustering_row_states.empty()) {
                // Alternator's table can be with or without clustering key. If the clustering key exists,
                // delete request will be `clustered_row_delete` and will be hanlded there.
                // If the clustering key doesn't exist, delete request will be `partition_delete` and will be handled here.
                // The no-clustering-key case is slightly tricky, because insert of such item is handled by `clustered_row_cells`
                // and has some value as clustering_key (the value currently seems to be empty bytes object).
                // We don't want to rely on knowing the value exactly, instead we rely on the fact that
                // there will be at most one item in a partition. So if `_clustering_row_states` is empty,
                // we know the delete is for a non-existing item and we should ignore it.
                // If `_clustering_row_states` is not empty, then we know the delete is for an existing item
                // we should log it and clear `_clustering_row_states`.
                // The same logic applies to `alternator_add_ckey_to_rows_to_ignore` call in `clustered_row_delete`
                // we need to insert "anything" for no-clustering-key case, so further logic will check
                // if map is empty or not and will know if it should ignore the single partition item and keep it.
                alternator_add_ckey_to_rows_to_ignore({});
            }
            _clustering_row_states.clear();
        }
    }
@@ -1647,6 +1730,47 @@ private:

    stats::part_type_set _touched_parts;

    std::unordered_set<bytes> _alternator_clustering_keys_to_ignore;
    const column_definition* _alternator_clustering_key_column = nullptr;

    // the function will process mutations and remove rows that are in _alternator_clustering_keys_to_ignore
    // we need to take care and reindex clustering keys (cdc$batch_seq_no)
    // this is used for Alternator's changes only
    // NOTE: `_alternator_clustering_keys_to_ignore` must be not empty.
    mutation clean_up_noop_rows(mutation mut) {
        throwing_assert(!_alternator_clustering_keys_to_ignore.empty());
        auto after_mut = mutation(_log_schema, mut.key());
        if (!_alternator_clustering_key_column) {
            // no clustering key - only single row per partition
            // since _alternator_clustering_keys_to_ignore is not empty we need to drop that single row
            // so we just return empty mutation instead
            return after_mut;
        }
        int batch_seq = 0;
        for (rows_entry &row : mut.partition().mutable_non_dummy_rows()) {
            auto cell = row.row().cells().find_cell(_alternator_clustering_key_column->id);
            if (cell) {
                auto val = cell->as_atomic_cell(*_alternator_clustering_key_column).value().linearize();

                if (_alternator_clustering_keys_to_ignore.contains(val)) {
                    continue;
                }
            }
            auto new_key = _builder->create_ck(batch_seq++);
            after_mut.partition().clustered_row(*_log_schema, std::move(new_key)) = std::move(row.row());
        }

        if (batch_seq > 0) {
            // update end_of_batch marker
            // we don't need to clear previous one, as we only removed rows
            // we need to set it on the last row, because original last row might have been deleted
            // batch_seq == 0 -> no rows, after_mut is empty, all entries were dropped and there's nothing to write to cdc log
            auto last_key = _builder->create_ck(batch_seq - 1);
            after_mut.set_cell(last_key, log_meta_column_name_bytes("end_of_batch"), data_value(true), _builder->get_timestamp(), _builder->get_ttl());
        }

        return after_mut;
    }
public:
    transformer(db_context ctx, schema_ptr s, dht::decorated_key dk, const per_request_options& options)
        : _ctx(ctx)
@@ -1656,7 +1780,20 @@ public:
        , _options(options)
        , _clustering_row_states(0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema))
        , _uses_tablets(ctx._proxy.get_db().local().find_keyspace(_schema->ks_name()).uses_tablets())
        , _alternator_clustering_keys_to_ignore()
    {
        if (_options.alternator) {
            auto cks = _schema->clustering_key_columns();
            const column_definition *ck_def = nullptr;
            if (!cks.empty()) {
                auto it = _log_schema->columns_by_name().find(cks.front().name());
                if (it == _log_schema->columns_by_name().end()) {
                    on_internal_error(cdc_log, fmt::format("failed to find clustering key `{}` in cdc log table `{}`", cks.front().name(), _log_schema->id()));
                }
                ck_def = it->second;
            }
            _alternator_clustering_key_column = ck_def;
        }
    }

    // DON'T move the transformer after this
@@ -1664,7 +1801,10 @@ public:
        const auto stream_id = _uses_tablets ? _ctx._cdc_metadata.get_tablet_stream(_log_schema->id(), ts, _dk.token()) : _ctx._cdc_metadata.get_vnode_stream(ts, _dk.token());
        _result_mutations.emplace_back(_log_schema, stream_id.to_partition_key(*_log_schema));
        _builder.emplace(_result_mutations.back(), ts, _dk.key(), *_schema);
        _enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage());
        // alternator_streams_increased_compatibility set to true reads preimage, but we need to set
        // _enable_updating_state to true to keep track of changes and produce correct pre/post images even
        // if upper layer didn't request them explicitly.
        _enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage()) || (_options.alternator && _options.alternator_streams_increased_compatibility);
    }

    void produce_preimage(const clustering_key* ck, const one_kind_column_set& columns_to_include) override {
@@ -1761,7 +1901,9 @@ public:
            ._builder = *_builder,
            ._enable_updating_state = _enable_updating_state,
            ._clustering_row_states = _clustering_row_states,
            ._alternator_clustering_keys_to_ignore = _alternator_clustering_keys_to_ignore,
            ._static_row_state = _static_row_state,
            ._alternator_schema_has_no_clustering_key = (_alternator_clustering_key_column == nullptr),
            ._is_update = _is_update,
            ._generate_delta_values = generate_delta_values(_builder->base_schema())
        };
@@ -1771,10 +1913,19 @@ public:
    void end_record() override {
        SCYLLA_ASSERT(_builder);
        _builder->end_record();
    }

    const row_states_map& clustering_row_states() const override {
        return _clustering_row_states;
        if (_options.alternator && !_alternator_clustering_keys_to_ignore.empty()) {
            // we filter mutations for Alternator's changes here.
            // We do it per mutation object (user might submit a batch of those in one go
            // and some might be splitted because of different timestamps),
            // ignore key set is cleared afterwards.
            // If single mutation object contains two separate changes to the same row
            // and at least one of them is ignored, all of them will be ignored.
            // This is not possible in Alternator - Alternator spec forbids reusing
            // primary key in single batch.
            _result_mutations.back() = clean_up_noop_rows(std::move(_result_mutations.back()));
            _alternator_clustering_keys_to_ignore.clear();
        }
    }

    // Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
@@ -2013,7 +2164,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
        tracing::trace(tr_state, "CDC: Preimage not enabled for the table, not querying current value of {}", m.decorated_key());
    }

    return f.then([alternator_increased_compatibility, trans = std::move(trans), &mutations, idx, tr_state, &details, &options] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
    return f.then([trans = std::move(trans), &mutations, idx, tr_state, &details, &options] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
        auto& m = mutations[idx];
        auto& s = m.schema();

@@ -2031,10 +2182,10 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
    if (should_split(m, options)) {
        tracing::trace(tr_state, "CDC: Splitting {}", m.decorated_key());
        details.was_split = true;
        process_changes_with_splitting(m, trans, preimage, postimage, alternator_increased_compatibility);
        process_changes_with_splitting(m, trans, preimage, postimage);
    } else {
        tracing::trace(tr_state, "CDC: No need to split {}", m.decorated_key());
        process_changes_without_splitting(m, trans, preimage, postimage, alternator_increased_compatibility);
        process_changes_without_splitting(m, trans, preimage, postimage);
    }
    auto [log_mut, touched_parts] = std::move(trans).finish();
    const int generated_count = log_mut.size();
```
cdc/split.cc (126 changed lines)
```
@@ -6,26 +6,15 @@
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
 */

#include "bytes.hh"
#include "bytes_fwd.hh"
#include "mutation/atomic_cell.hh"
#include "mutation/atomic_cell_or_collection.hh"
#include "mutation/collection_mutation.hh"
#include "mutation/mutation.hh"
#include "mutation/tombstone.hh"
#include "schema/schema.hh"

#include <seastar/core/sstring.hh>
#include "types/concrete_types.hh"
#include "types/types.hh"
#include "types/user.hh"

#include "split.hh"
#include "log.hh"
#include "change_visitor.hh"
#include "utils/managed_bytes.hh"
#include <string_view>
#include <unordered_map>

extern logging::logger cdc_log;

@@ -610,109 +599,8 @@ bool should_split(const mutation& m, const per_request_options& options) {
        || v._ts == api::missing_timestamp;
}

// Returns true if the row state and the atomic and nonatomic entries represent
// an equivalent item.
static bool entries_match_row_state(const schema_ptr& base_schema, const cell_map& row_state, const std::vector<atomic_column_update>& atomic_entries,
        std::vector<nonatomic_column_update>& nonatomic_entries) {
    for (const auto& update : atomic_entries) {
        const column_definition& cdef = base_schema->column_at(column_kind::regular_column, update.id);
        const auto it = row_state.find(&cdef);
        if (it == row_state.end()) {
            return false;
        }
        if (to_managed_bytes_opt(update.cell.value().linearize()) != it->second) {
            return false;
        }
    }
    if (nonatomic_entries.empty()) {
        return true;
    }

    for (const auto& update : nonatomic_entries) {
        const column_definition& cdef = base_schema->column_at(column_kind::regular_column, update.id);
        const auto it = row_state.find(&cdef);
        if (it == row_state.end()) {
            return false;
        }

        // The only collection used by Alternator is a non-frozen map.
        auto current_raw_map = cdef.type->deserialize(*it->second);
        map_type_impl::native_type current_values = value_cast<map_type_impl::native_type>(current_raw_map);

        if (current_values.size() != update.cells.size()) {
            return false;
        }

        std::unordered_map<sstring_view, bytes> current_values_map;
        for (const auto& entry : current_values) {
            const auto attr_name = std::string_view(value_cast<sstring>(entry.first));
            current_values_map[attr_name] = value_cast<bytes>(entry.second);
        }

        for (const auto& [key, value] : update.cells) {
            const auto key_str = to_string_view(key);
            if (!value.is_live()) {
                if (current_values_map.contains(key_str)) {
                    return false;
                }
            } else if (current_values_map[key_str] != value.value().linearize()) {
                return false;
            }
        }
    }
    return true;
}

bool should_skip(batch& changes, const mutation& base_mutation, change_processor& processor) {
    const schema_ptr& base_schema = base_mutation.schema();
    // Alternator doesn't use static updates and clustered range deletions.
    if (!changes.static_updates.empty() || !changes.clustered_range_deletions.empty()) {
        return false;
    }

    for (clustered_row_insert& u : changes.clustered_inserts) {
        const cell_map* row_state = get_row_state(processor.clustering_row_states(), u.key);
        if (!row_state) {
            return false;
        }
        if (!entries_match_row_state(base_schema, *row_state, u.atomic_entries, u.nonatomic_entries)) {
            return false;
        }
    }

    for (clustered_row_update& u : changes.clustered_updates) {
        const cell_map* row_state = get_row_state(processor.clustering_row_states(), u.key);
        if (!row_state) {
            return false;
        }
        if (!entries_match_row_state(base_schema, *row_state, u.atomic_entries, u.nonatomic_entries)) {
            return false;
        }
    }

    // Skip only if the row being deleted does not exist (i.e. the deletion is a no-op).
    for (const auto& row_deletion : changes.clustered_row_deletions) {
        if (processor.clustering_row_states().contains(row_deletion.key)) {
            return false;
        }
    }

    // Don't skip if the item exists.
    //
    // Increased DynamoDB Streams compatibility guarantees that single-item
    // operations will read the item and store it in the clustering row states.
    // If it is not found there, we may skip CDC. This is safe as long as the
    // assumptions of this operation's write isolation are not violated.
    if (changes.partition_deletions && processor.clustering_row_states().contains(clustering_key::make_empty())) {
        return false;
    }

    cdc_log.trace("Skipping CDC log for mutation {}", base_mutation);
    return true;
}

void process_changes_with_splitting(const mutation& base_mutation, change_processor& processor,
        bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility) {
        bool enable_preimage, bool enable_postimage) {
    const auto base_schema = base_mutation.schema();
    auto changes = extract_changes(base_mutation);
    auto pk = base_mutation.key();
@@ -732,10 +620,6 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
        affected_clustered_columns_per_row = btch.get_affected_clustered_columns_per_row(*base_mutation.schema());
    }

    if (alternator_strict_compatibility && should_skip(btch, base_mutation, processor)) {
        continue;
    }

    const bool is_last = change_ts == last_timestamp;
    processor.begin_timestamp(change_ts, is_last);
    if (enable_preimage) {
@@ -825,13 +709,7 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
}

void process_changes_without_splitting(const mutation& base_mutation, change_processor& processor,
        bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility) {
    if (alternator_strict_compatibility) {
        auto changes = extract_changes(base_mutation);
        if (should_skip(changes.begin()->second, base_mutation, processor)) {
            return;
        }
    }
        bool enable_preimage, bool enable_postimage) {
    auto ts = find_timestamp(base_mutation);
    processor.begin_timestamp(ts, true);
```
@@ -66,14 +66,12 @@ public:
|
||||
// Tells processor we have reached end of record - last part
|
||||
// of a given timestamp batch
|
||||
virtual void end_record() = 0;
|
||||
|
||||
virtual const row_states_map& clustering_row_states() const = 0;
|
||||
};
|
||||
|
||||
bool should_split(const mutation& base_mutation, const per_request_options& options);
|
||||
void process_changes_with_splitting(const mutation& base_mutation, change_processor& processor,
|
||||
bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility);
|
||||
bool enable_preimage, bool enable_postimage);
|
||||
void process_changes_without_splitting(const mutation& base_mutation, change_processor& processor,
|
||||
bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility);
|
||||
bool enable_preimage, bool enable_postimage);
|
||||
|
||||
}
|
||||
|
||||
@@ -317,6 +317,13 @@ experimental:
|
||||
in July 2025 to optimize shard discovery, is not yet implemented in
|
||||
Alternator.
|
||||
<https://github.com/scylladb/scylla/issues/25160>
|
||||
* With the ``alternator_streams_increased_compatibility`` configuration
|
||||
option enabled, operations that do not change the database state
|
||||
(e.g., deleting a non-existent item, removing a non-existent
|
||||
attribute, or re-inserting an identical item) will not produce
|
||||
stream events. Without this option, such no-op operations may still
|
||||
generate spurious stream events.
|
||||
<https://github.com/scylladb/scylladb/issues/28368>
|
||||
|
||||
## Unimplemented API features
|
||||
|
||||
|
||||
@@ -1486,6 +1486,10 @@ public:
|
||||
return std::ranges::subrange(_rows.begin(), _rows.end())
|
||||
| std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
|
||||
}
|
||||
auto mutable_non_dummy_rows() {
|
||||
return std::ranges::subrange(_rows.begin(), _rows.end())
|
||||
| std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
|
||||
}
|
||||
void accept(const schema&, mutation_partition_visitor&) const;
|
||||
|
||||
// Returns the number of live CQL rows in this partition.
|
||||
|
||||
@@ -305,6 +305,93 @@ def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams):
|
||||
# local java throws here. real does not.
|
||||
ensure_java_server(dynamodbstreams, error=None)
|
||||
|
||||
# We run a batch that mixes noop operations (in our concrete example - one delete of non-existing item) with real changes (put of two new items).
|
||||
# Expected behaviour - Streams will return only two inserts (no delete). Observed behaviour - we get modify for delete as well.
|
||||
# Test requires `alternator_streams_increased_compatibility` set to be true, otherwise will fail due to how Streams work without the flag.
|
||||
# Test requires write_isolation set to always, otherwise upper layer will split batch write into separate cdc operations, sidestepping the issue.
|
||||
# Reproduces SCYLLADB-1528.
|
||||
def test_streams_spurious_modify_mixing_noop_with_real_changes_in_batch_write_item(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams):
    null = None
    def do_updates(table, p, c):
        events = []

        with table.batch_writer() as batch:
            batch.put_item({'p': p, 'c': c + '0', 'a': 0})
            events.append(['INSERT',{'c':c + '0','p':p},null,{'c':c + '0','a':0,'p':p}])
            batch.delete_item(Key={'p': p, 'c': c + '1'})
            batch.put_item({'p': p, 'c': c + '2', 'a': 2})
            events.append(['INSERT',{'c':c + '2','p':p},null,{'c':c + '2','a':2,'p':p}])
        return events

    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        do_test(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')

||||
# Running update_item with an UpdateExpression that removes a column should not emit a MODIFY event when the item does not exist.
# The test tries all combinations (delete a column from a non-existing item, delete an existing column from an existing item,
# delete a non-existing column from an existing item); only deleting a column from a non-existing item used to incorrectly emit a MODIFY event.
# The test requires `alternator_streams_increased_compatibility` set to true, otherwise it fails.
# Reproduces SCYLLADB-1528.
def test_streams_noop_update_expr_on_missing_item(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    null = None
    def do_updates(table, p, c):
        events = []
        # first we try to remove a column from a non-existing item
        table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g')
        v = table.get_item(Key={'p': p, 'c': c}).get('Item', None)
        assert v is None

        # then we create the item and remove an existing column from it
        table.update_item(Key={'p': p, 'c': c}, UpdateExpression="SET e = :e, g = :g", ExpressionAttributeValues={':e': 166, ':g': 166})
        v = table.get_item(Key={'p': p, 'c': c})['Item']
        assert v == {'p': p, 'c': c, 'e': 166, 'g': 166}
        events.append(['INSERT',{'c':c,'p':p},null,{'c':c,'e':166,'g':166,'p':p}])

        table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g')
        v = table.get_item(Key={'p': p, 'c': c})['Item']
        assert v == {'p': p, 'c': c, 'e': 166}
        events.append(['MODIFY',{'c':c,'p':p},{'c':c,'e':166,'g':166,'p':p},{'c':c,'e':166,'p':p}])

        # finally we try again to remove the same (now non-existing) column from the existing item
        table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g')
        v = table.get_item(Key={'p': p, 'c': c})['Item']
        assert v == {'p': p, 'c': c, 'e': 166}

        return events

    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')

# The same as test_streams_noop_update_expr_on_missing_item, but for a table without a clustering key.
def test_streams_noop_update_expr_on_missing_item_on_no_clustering_key_table(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
    null = None
    def do_updates(table, p, c):
        events = []
        # first we try to remove a column from a non-existing item
        table.update_item(Key={'p': p}, UpdateExpression='REMOVE g')
        v = table.get_item(Key={'p': p}).get('Item', None)
        assert v is None

        # then we create the item and remove an existing column from it
        table.update_item(Key={'p': p}, UpdateExpression='SET e = :e, g = :g', ExpressionAttributeValues={':e': 166, ':g': 166})
        v = table.get_item(Key={'p': p})['Item']
        assert v == {'p': p, 'e': 166, 'g': 166}
        events.append(['INSERT',{'p':p},null,{'e':166,'g':166,'p':p}])

        table.update_item(Key={'p': p}, UpdateExpression='REMOVE g')
        v = table.get_item(Key={'p': p})['Item']
        assert v == {'p': p, 'e': 166}
        events.append(['MODIFY',{'p':p},{'e':166,'g':166,'p':p},{'e':166,'p':p}])

        # finally we try again to remove the same (now non-existing) column from the existing item
        table.update_item(Key={'p': p}, UpdateExpression='REMOVE g')
        v = table.get_item(Key={'p': p})['Item']
        assert v == {'p': p, 'e': 166}

        return events

    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')

def test_get_shard_iterator(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)

@@ -1691,6 +1778,14 @@ def do_updates_1(table, p, c):
    # Test BatchWriteItem as well. This modifies the item, so it will be a MODIFY event.
    table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 5}}}]})
    events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4, 'x': 5}, {'p': p, 'c': c, 'x': 5}])

    # put an item with just a key; it must be an INSERT, as the item doesn't exist
    table.put_item(Item={'p': p, 'c': c + '1'})
    events.append(['INSERT', {'p': p, 'c': c + '1'}, None, {'p': p, 'c': c + '1'}])
    # put an item with just a key, but the item already exists, so no event
    table.put_item(Item={'p': p, 'c': c + '1'})
    # delete a non-existing column from an item with only key fields - no event
    table.update_item(Key={'p': p, 'c': c + '1'}, UpdateExpression='REMOVE g')
    return events

# The tested events are the same as in do_updates_1, but the table doesn't have
@@ -1723,6 +1818,14 @@ def do_updates_1_no_ck(table, p, _):
    # Test BatchWriteItem as well. This modifies the item, so it will be a MODIFY event.
    table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'x': 5}}}]})
    events.append(['MODIFY', {'p': p}, {'p': p, 'b': 4, 'x': 5}, {'p': p, 'x': 5}])
    # put an item with just a key; it must be an INSERT, as the item doesn't exist
    table.put_item(Item={'p': p + '1'})
    events.append(['INSERT', {'p': p + '1'}, None, {'p': p + '1'}])
    # put an item with just a key, but the item already exists, so no event
    table.put_item(Item={'p': p + '1'})
    # delete a non-existing column from an item with only key fields - no event
    table.update_item(Key={'p': p + '1'}, UpdateExpression='REMOVE g')

    return events

def test_streams_1_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
||||