From d93299b605e0e0413b5c81d64407b68ed9ec9a31 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rados=C5=82aw=20Cybulski?=
Date: Mon, 5 Jan 2026 19:14:48 +0100
Subject: [PATCH 1/5] alternator: add system_keyspace reference

Add a reference to the `system_keyspace` object to the `executor` object
in alternator. The reference is needed because a future commit will add
(and use) helper functions there that read `cdc_log` tables for
tablet-based tables, similar to the already existing siblings for
vnode-based tables living in `system_distributed_keyspace`.
---
 alternator/controller.cc | 4 +++-
 alternator/controller.hh | 3 +++
 alternator/executor.cc   | 2 ++
 alternator/executor.hh   | 3 +++
 main.cc                  | 2 +-
 5 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/alternator/controller.cc b/alternator/controller.cc
index e55c3d1d6d..40c8388f78 100644
--- a/alternator/controller.cc
+++ b/alternator/controller.cc
@@ -32,6 +32,7 @@ controller::controller(
         sharded<service::storage_service>& ss,
         sharded<service::migration_manager>& mm,
         sharded<db::system_distributed_keyspace>& sys_dist_ks,
+        sharded<db::system_keyspace>& sys_ks,
         sharded<cdc::generation_service>& cdc_gen_svc,
         sharded<memory_limiter>& memory_limiter,
         sharded<auth::service>& auth_service,
@@ -45,6 +46,7 @@ controller::controller(
     , _ss(ss)
     , _mm(mm)
     , _sys_dist_ks(sys_dist_ks)
+    , _sys_ks(sys_ks)
     , _cdc_gen_svc(cdc_gen_svc)
     , _memory_limiter(memory_limiter)
     , _auth_service(auth_service)
@@ -94,7 +96,7 @@ future<> controller::start_server() {
        auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value<uint32_t> {
            return cfg.alternator_timeout_in_ms;
        };
-        _executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks),
+        _executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks), std::ref(_sys_ks),
                sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), std::ref(_vsc), _ssg.value(),
                sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
        _server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
diff --git a/alternator/controller.hh b/alternator/controller.hh
index d7a73b8f69..cd8bdf7f55 100644
--- a/alternator/controller.hh
+++ b/alternator/controller.hh
@@ -22,6 +22,7 @@ class memory_limiter;
 namespace db {
 class system_distributed_keyspace;
+class system_keyspace;
 class config;
 }
@@ -65,6 +66,7 @@ class controller : public protocol_server {
    sharded<service::storage_service>& _ss;
    sharded<service::migration_manager>& _mm;
    sharded<db::system_distributed_keyspace>& _sys_dist_ks;
+    sharded<db::system_keyspace>& _sys_ks;
    sharded<cdc::generation_service>& _cdc_gen_svc;
    sharded<memory_limiter>& _memory_limiter;
    sharded<auth::service>& _auth_service;
@@ -84,6 +86,7 @@ public:
            sharded<service::storage_service>& ss,
            sharded<service::migration_manager>& mm,
            sharded<db::system_distributed_keyspace>& sys_dist_ks,
+            sharded<db::system_keyspace>& sys_ks,
            sharded<cdc::generation_service>& cdc_gen_svc,
            sharded<memory_limiter>& memory_limiter,
            sharded<auth::service>& auth_service,
diff --git a/alternator/executor.cc b/alternator/executor.cc
index fa7e58c5ea..4509578777 100644
--- a/alternator/executor.cc
+++ b/alternator/executor.cc
@@ -271,6 +271,7 @@ executor::executor(gms::gossiper& gossiper,
        service::storage_service& ss,
        service::migration_manager& mm,
        db::system_distributed_keyspace& sdks,
+        db::system_keyspace& system_keyspace,
        cdc::metadata& cdc_metadata,
        vector_search::vector_store_client& vsc,
        smp_service_group ssg,
@@ -280,6 +281,7 @@ executor::executor(gms::gossiper& gossiper,
    _proxy(proxy),
    _mm(mm),
    _sdks(sdks),
+    _system_keyspace(system_keyspace),
    _cdc_metadata(cdc_metadata),
    _vsc(vsc),
    _enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization),
diff --git a/alternator/executor.hh b/alternator/executor.hh
index 3de07fd7b0..1f38a5f734 100644
--- a/alternator/executor.hh
+++ b/alternator/executor.hh
@@ -34,6 +34,7 @@ namespace db {
 class system_distributed_keyspace;
+class system_keyspace;
 }

 namespace audit {
@@ -88,6 +89,7 @@ class executor : public peering_sharded_service<executor> {
    service::storage_proxy& _proxy;
    service::migration_manager& _mm;
    db::system_distributed_keyspace& _sdks;
+    db::system_keyspace& _system_keyspace;
    cdc::metadata& _cdc_metadata;
    vector_search::vector_store_client& _vsc;
    utils::updateable_value<bool> _enforce_authorization;
@@ -131,6 +133,7 @@ public:
            service::storage_service& ss,
            service::migration_manager& mm,
            db::system_distributed_keyspace& sdks,
+            db::system_keyspace& system_keyspace,
            cdc::metadata& cdc_metadata,
            vector_search::vector_store_client& vsc,
            smp_service_group ssg,
diff --git a/main.cc b/main.cc
index 9e5bb50c7b..6455be59ad 100644
--- a/main.cc
+++ b/main.cc
@@ -2609,7 +2609,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
            api::set_server_service_levels(ctx, cql_server_ctl, qp).get();

-            alternator::controller alternator_ctl(gossiper, proxy, ss, mm, sys_dist_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, vector_store_client, *cfg, dbcfg.statement_scheduling_group);
+            alternator::controller alternator_ctl(gossiper, proxy, ss, mm, sys_dist_ks, sys_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, vector_store_client, *cfg, dbcfg.statement_scheduling_group);

            // Register at_exit last, so that storage_service::drain_on_shutdown will be called first
            auto do_drain = defer_verbose_shutdown("local storage", [&ss] {

From eb35a7b6ceae117e2364440fdb8350c7dbc57a04 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rados=C5=82aw=20Cybulski?=
Date: Mon, 5 Jan 2026 19:14:52 +0100
Subject: [PATCH 2/5] treewide: add cdc helper functions to system_keyspace

Add helper functions to the `system_keyspace` object that deal with
reading CDC content for tablet-based tables.
`read_cdc_for_tablets_current_generation_timestamp` reads the current
generation's timestamp. `read_cdc_for_tablets_versioned_streams` builds
a timestamp -> `cdc::streams_version` map, similar to how
`system_distributed_keyspace::cdc_get_versioned_streams` works.

We're adding these helper functions because their siblings in
`system_distributed_keyspace` work only when the base table is backed
by vnodes. The new additions work only when the base table is backed by
tablets.
---
 db/system_distributed_keyspace.hh |  8 ++++
 db/system_keyspace.cc             | 62 +++++++++++++++++++++++++++++++
 db/system_keyspace.hh             | 15 +++++++-
 3 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/db/system_distributed_keyspace.hh b/db/system_distributed_keyspace.hh
index 1e3a3ab805..9b94985ca9 100644
--- a/db/system_distributed_keyspace.hh
+++ b/db/system_distributed_keyspace.hh
@@ -93,8 +93,16 @@ public:
    future<> create_cdc_desc(db_clock::time_point, const cdc::topology_description&, context);
    future<bool> cdc_desc_exists(db_clock::time_point, context);

+    // Reads and builds the generation map - a map from generation timestamps to a vector of all stream ids of that generation.
+    // Generations with timestamp >= `not_older_than` are returned, plus the one just before it (the straddling generation).
+    // Returns an empty map if there are no generations with timestamp >= `not_older_than`.
+    // NOTE: there's a sibling `read_cdc_for_tablets_versioned_streams` that reads the same data for tables backed by tablets. The data returned is the same.
+    // NOTE: currently used only by alternator
    future<std::map<db_clock::time_point, cdc::streams_version>> cdc_get_versioned_streams(db_clock::time_point not_older_than, context);

+    // Reads the current generation timestamp for the given table. Throws runtime_error (see `cql3::untyped_result_set::one()`) if the table is not found.
+    // NOTE: there's a sibling `read_cdc_for_tablets_current_generation_timestamp` in `system_keyspace` that does the same for tables backed by tablets.
+    // NOTE: currently used only by alternator
    future<db_clock::time_point> cdc_current_generation_timestamp(context);

    future<qos::service_levels_info> get_service_levels(qos::query_context ctx) const;

diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
index 66497a645b..5f9544b66e 100644
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -14,6 +14,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -2057,6 +2058,67 @@ future<std::unordered_set<dht::token>> system_keyspace::get_local_tokens() {
    });
 }

+// Tablet-based counterpart of system_distributed_keyspace::cdc_current_generation_timestamp.
+// Unlike the vnode version, which reads from a replicated system_distributed table (and thus
+// requires QUORUM), this reads from a virtual table backed by local Raft-managed tablet
+// metadata, so consistency_level::ONE is the only meaningful level.
+future<db_clock::time_point> system_keyspace::read_cdc_for_tablets_current_generation_timestamp(std::string_view ks_name, std::string_view table_name) {
+    // note: we only care about the newest ("current") timestamp, hence LIMIT 1.
+    // this depends on the table's native clustering order being DESC on timestamp, which is the case for CDC_TIMESTAMPS.
+    static const sstring query = format("SELECT timestamp FROM {}.{} WHERE keyspace_name = ? and table_name = ? limit 1", NAME, CDC_TIMESTAMPS);
+    auto timestamp_cql = co_await _qp.execute_internal(
+        query,
+        db::consistency_level::ONE,
+        { ks_name, table_name },
+        cql3::query_processor::cache_internal::no);
+
+    co_return timestamp_cql->one().get_as<db_clock::time_point>("timestamp");
+}
+
+// Tablet-based counterpart of system_distributed_keyspace::cdc_get_versioned_streams.
+// Unlike the vnode version, which reads from a replicated system_distributed table (and thus
+// requires QUORUM), this reads from a virtual table backed by local Raft-managed tablet
+// metadata, so consistency_level::ONE is the only meaningful level.
+future<std::map<db_clock::time_point, cdc::streams_version>> system_keyspace::read_cdc_for_tablets_versioned_streams(std::string_view ks_name, std::string_view table_name, db_clock::time_point not_older_than) {
+    // We read all streams and filter in memory, because we need to include
+    // the generation that straddles the `not_older_than` boundary (i.e. the
+    // one just before it), and the virtual table does not support the
+    // required range query directly.
+    static const sstring stream_id_query = format("SELECT stream_id, stream_state, timestamp FROM {}.{} WHERE keyspace_name = ? and table_name = ?", NAME, CDC_STREAMS);
+
+    std::map<db_clock::time_point, std::vector<cdc::stream_id>> temp_result;
+
+    co_await _qp.query_internal(stream_id_query,
+        db::consistency_level::ONE,
+        data_value_list{ ks_name, table_name },
+        1000, // page size
+        [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
+            auto stream_state = cdc::read_stream_state(row.get_as<int8_t>("stream_state"));
+            if (stream_state != cdc::stream_state::current) {
+                co_return stop_iteration::no;
+            }
+            auto ts = row.get_as<db_clock::time_point>("timestamp");
+
+            temp_result[ts].push_back(cdc::stream_id{ row.get_as<bytes>("stream_id") });
+            co_await coroutine::maybe_yield();
+            co_return stop_iteration::no;
+        });
+
+    std::map<db_clock::time_point, cdc::streams_version> result;
+    // Include the generation that straddles the boundary, matching the vnode
+    // counterpart (cdc_get_versioned_streams), which does the same adjustment.
+    auto it = temp_result.lower_bound(not_older_than);
+    if (it != temp_result.begin()) {
+        --it;
+    }
+    for (; it != temp_result.end(); ++it) {
+        auto& ts = it->first;
+        auto& streams = it->second;
+        result.insert_or_assign(ts, cdc::streams_version{ std::move(streams), ts });
+    }
+    co_return std::move(result);
+}
+
 future<> system_keyspace::read_cdc_streams_state(std::optional table, noncopyable_function(table_id, db_clock::time_point, utils::chunked_vector)> f) {
    static const sstring all_tables_query = format("SELECT table_id, timestamp, stream_id FROM {}.{}", NAME, CDC_STREAMS_STATE);

diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh
index dd0de6c296..eb9a631d83 100644
--- a/db/system_keyspace.hh
+++ b/db/system_keyspace.hh
@@ -456,7 +456,7 @@ public:
    // For this purpose, records of cleanup operations (the affected token ranges
    // and commitlog ranges) are kept in a system table.
    //
-    // The below functions manipulate these records. 
+    // The below functions manipulate these records.

    // Saves a record of a token range affected by cleanup.
    // After reboot, tokens from this range will be replayed only if they are on replay positions
@@ -587,6 +587,19 @@ public:
    future<> read_cdc_streams_state(std::optional table, noncopyable_function(table_id, db_clock::time_point, utils::chunked_vector)> f);
    future<> read_cdc_streams_history(table_id table, std::optional from, noncopyable_function(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f);

+    // Reads the current generation timestamp for the given table. Throws runtime_error (see `cql3::untyped_result_set::one()`) if the table is not found.
+    // NOTE: there's a sibling `cdc_current_generation_timestamp` in `system_distributed_keyspace` that does the same for tables backed by vnodes.
+    // NOTE: currently used only by alternator
+    future<db_clock::time_point> read_cdc_for_tablets_current_generation_timestamp(std::string_view ks_name, std::string_view table_name);
+
+    // Reads and builds the generation map for a given table - a map from generation timestamps to a vector of all stream ids of that generation.
+    // Generations with timestamp >= `not_older_than` are returned, plus the one just before it (the straddling generation).
+    // If `not_older_than` is not provided (it defaults to min), all generations are returned.
+    // Returns an empty map if the table is not found or if there are no generations.
+    // NOTE: there's a sibling `cdc_get_versioned_streams` that reads the same data for tables backed by legacy vnodes. The data returned is the same.
+    // NOTE: currently used only by alternator
+    future<std::map<db_clock::time_point, cdc::streams_version>> read_cdc_for_tablets_versioned_streams(std::string_view ks_name, std::string_view table_name, db_clock::time_point not_older_than = db_clock::time_point::min());
+
    // Load Raft Group 0 id from scylla.local
    future<utils::UUID> get_raft_group0_id();

From d5df3ec07c197e16d4b37705c41cbd82fae277af Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rados=C5=82aw=20Cybulski?=
Date: Mon, 5 Jan 2026 19:14:53 +0100
Subject: [PATCH 3/5] alternator: implement streams for tablets

Add code that handles Streams reads when the table uses tablets
underneath.

Fixes #23838
---
 alternator/streams.cc | 28 +++++++++++++++++++++++-----
 1 file changed, 23 insertions(+), 5 deletions(-)

diff --git a/alternator/streams.cc b/alternator/streams.cc
index 1f77229e6d..a2e0eb701c 100644
--- a/alternator/streams.cc
+++ b/alternator/streams.cc
@@ -896,10 +896,22 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
    // TODO: label
    // TODO: creation time

-    // filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
-    auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
+    std::map<db_clock::time_point, cdc::streams_version> topologies;
+
+    // filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
+    if (schema->table().uses_tablets()) {
+        // We can't use the table creation time here, as tablets might report a
+        // generation timestamp just before table creation. This is safe
+        // because CDC generations are per-table and cannot pre-date the
+        // table, so expanding the window won't pull in unrelated data.
+        auto low_ts = db_clock::now() - ttl;
+        topologies = co_await _system_keyspace.read_cdc_for_tablets_versioned_streams(bs->ks_name(), bs->cf_name(), low_ts);
+    } else {
+        auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
+        auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
+        topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
+    }

-    std::map<db_clock::time_point, cdc::streams_version> topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
    const auto e = topologies.end();

    std::optional<shard_id> shard_filter;
@@ -1485,9 +1497,15 @@ future<executor::request_return_type> executor::get_records(client_state& client
    }

    // ugh. figure out if we are and end-of-shard
-    auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
-    db_clock::time_point ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
+    db_clock::time_point ts;
+    if (schema->table().uses_tablets()) {
+        ts = co_await _system_keyspace.read_cdc_for_tablets_current_generation_timestamp(base->ks_name(), base->cf_name());
+    } else {
+        auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
+        ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
+    }
+
    auto& shard = iter.shard;

    if (shard.time < ts && ts < high_ts) {

From 6be16cf2242960ad309a78bad1afa161043d1216 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rados=C5=82aw=20Cybulski?=
Date: Mon, 5 Jan 2026 19:14:53 +0100
Subject: [PATCH 4/5] alternator: remove antitablet guards when using Streams

Remove the `if` condition that prevented tables with tablets from
working with Streams. Remove a test that verified that Alternator
rejects enabling the Streams feature on tables that use tablets
underneath. Update a few tests that were expected to fail on tablets so
that they run normally.
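
For illustration, a request of this shape - a sketch assuming boto3
against a local Alternator endpoint, with a hypothetical table name - is
now accepted on a tablets-backed table instead of returning a
ValidationException:

    import boto3
    dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000',
                              region_name='us-east-1',
                              aws_access_key_id='alternator',
                              aws_secret_access_key='secret')
    # CreateTable with Streams enabled, tablets left at their (enabled) default:
    dynamodb.create_table(
        TableName='example_tbl',
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}],
        BillingMode='PAY_PER_REQUEST',
        StreamSpecification={'StreamEnabled': True,
                             'StreamViewType': 'NEW_AND_OLD_IMAGES'})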
---
 alternator/executor.cc             | 16 +---------------
 docs/alternator/compatibility.md   |  8 --------
 docs/alternator/new-apis.md        |  8 +-------
 test/alternator/test_cql_schema.py |  6 ------
 test/alternator/test_metrics.py    |  6 ------
 test/alternator/test_tablets.py    | 20 --------------------
 test/alternator/test_ttl.py        |  6 ------
 7 files changed, 2 insertions(+), 68 deletions(-)

diff --git a/alternator/executor.cc b/alternator/executor.cc
index 4509578777..2db1c3700d 100644
--- a/alternator/executor.cc
+++ b/alternator/executor.cc
@@ -1647,16 +1647,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
    locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
    const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
    auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
-    // Alternator Streams doesn't yet work when the table uses tablets (#23838)
-    if (stream_specification && stream_specification->IsObject()) {
-        auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
-        if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) {
-            if (rs->uses_tablets()) {
-                co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
-                        "If you want to use streams, create a table with vnodes by setting the tag 'system:initial_tablets' set to 'none'.");
-            }
-        }
-    }
+
    // Vector indexes is a new feature that we decided to only support
    // on tablets.
    if (vector_indexes && vector_indexes->Size() > 0) {
@@ -1848,14 +1839,9 @@ future<executor::request_return_type> executor::update_table(client_state& clien
        if (add_stream_options(*stream_specification, builder, p.local())) {
            validate_cdc_log_name_length(builder.cf_name());
        }
-        // Alternator Streams doesn't yet work when the table uses tablets (#23838)
        auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
        if (stream_enabled && stream_enabled->IsBool()) {
            if (stream_enabled->GetBool()) {
-                if (p.local().local_db().find_keyspace(tab->ks_name()).get_replication_strategy().uses_tablets()) {
-                    co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
-                            "If you want to enable streams, re-create this table with vnodes (with the tag 'system:initial_tablets' set to 'none').");
-                }
                if (tab->cdc_options().enabled()) {
                    co_return api_error::validation("Table already has an enabled stream: TableName: " + tab->cf_name());
                }
diff --git a/docs/alternator/compatibility.md b/docs/alternator/compatibility.md
index 5b2198f3e0..311245e0c0 100644
--- a/docs/alternator/compatibility.md
+++ b/docs/alternator/compatibility.md
@@ -296,14 +296,6 @@ experimental:
  considered experimental so needs to be enabled explicitly with the
  `--experimental-features=alternator-streams` configuration option.

-  In this version, Alternator Streams is only supported if the base table
-  uses vnodes instead of tablets. However, by default new tables use tablets
-  so to create a table that can be used with Streams, you must set the tag
-  `system:initial_tablets` set to `none` during CreateTable - so that the
-  new table will use vnodes. Streams cannot be enabled on an already-existing
-  table that uses tablets.
-  See <https://github.com/scylladb/scylladb/issues/23838>.
-
  Alternator streams also differ in some respects from DynamoDB Streams:
  * The number of separate "shards" in Alternator's streams is significantly
    larger than is typical on DynamoDB.
diff --git a/docs/alternator/new-apis.md b/docs/alternator/new-apis.md
index d61d5657f9..d37e26116e 100644
--- a/docs/alternator/new-apis.md
+++ b/docs/alternator/new-apis.md
@@ -184,7 +184,7 @@ entire data center, or other data centers, in that case.
 It replaces the older approach which was named "vnodes". See
 [Data Distribution with Tablets](../architecture/tablets.rst) for details.

-In this version, tablet support is almost complete, so new
+In this version, tablet support is complete, so new
 Alternator tables default to following what the global configuration flag
 [tablets_mode_for_new_keyspaces](../reference/configuration-parameters.rst#confval-tablets_mode_for_new_keyspaces)
 tells them to.
@@ -207,9 +207,3 @@ in the CreateTable operation. The value of this tag can be:
 The `system:initial_tablets` tag only has any effect while creating a new
 table with CreateTable - changing it later has no effect.

-Because the tablets support is incomplete, when tablets are enabled for an
-Alternator table, the following features will not work for this table:
-
-* Enabling Streams with CreateTable or UpdateTable doesn't work
-  (results in an error).
-  See <https://github.com/scylladb/scylladb/issues/23838>.
diff --git a/test/alternator/test_cql_schema.py b/test/alternator/test_cql_schema.py
index 6b10ce2405..998360029f 100644
--- a/test/alternator/test_cql_schema.py
+++ b/test/alternator/test_cql_schema.py
@@ -112,12 +112,6 @@ def test_alternator_vs_cql(dynamodb, test_table, cql_keyspace, cql_table, option
 @pytest.fixture(scope='module')
 def table1(dynamodb):
    with new_test_table(dynamodb,
-        # Alternator Streams is expected to fail with tablets due to #23838.
-        # To ensure that this test still runs, instead of xfailing it, we
-        # temporarily coerce Alternator to avoid using default tablets
-        # setting, even if it's available. Remove this "Tags=" line when
-        # issue #23838 is solved.
-        Tags=[{'Key': 'system:initial_tablets', 'Value': 'none'}],
        KeySchema=[
            # Must have both hash key and range key to allow LSI creation
            { 'AttributeName': 'p', 'KeyType': 'HASH' },
diff --git a/test/alternator/test_metrics.py b/test/alternator/test_metrics.py
index 2ab1e5fb6a..ef3ef17b7a 100644
--- a/test/alternator/test_metrics.py
+++ b/test/alternator/test_metrics.py
@@ -542,12 +542,6 @@ def test_streams_latency(dynamodb, dynamodbstreams, metrics):
    # latency metrics are only updated for *successful* operations so we
    # need to use a real Alternator Stream in this test.
    with new_test_table(dynamodb,
-        # Alternator Streams is expected to fail with tablets due to #23838.
-        # To ensure that this test still runs, instead of xfailing it, we
-        # temporarily coerce Alternator to avoid using default tablets
-        # setting, even if it's available. We do this by using the following
-        # tags when creating the table:
-        Tags=[{'Key': 'system:initial_tablets', 'Value': 'none'}],
        KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
        AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}
diff --git a/test/alternator/test_tablets.py b/test/alternator/test_tablets.py
index 8e0b796277..4072474baa 100644
--- a/test/alternator/test_tablets.py
+++ b/test/alternator/test_tablets.py
@@ -131,26 +131,6 @@ def test_tablets_tag_vs_config(dynamodb):
        with new_test_table(dynamodb, **schema_vnodes) as table:
            pass

-# Before Alternator Streams is supported with tablets (#23838), let's verify
-# that enabling Streams results in an orderly error. This test should be
-# deleted when #23838 is fixed.
-def test_streams_enable_error_with_tablets(dynamodb):
-    # Test attempting to create a table already with streams
-    with pytest.raises(ClientError, match='ValidationException.*tablets'):
-        with new_test_table(dynamodb,
-            Tags=[{'Key': initial_tablets_tag, 'Value': '4'}],
-            StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
-            KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ],
-            AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ]) as table:
-            pass
-    # Test attempting to add a stream to an existing table
-    with new_test_table(dynamodb,
-        Tags=[{'Key': initial_tablets_tag, 'Value': '4'}],
-        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ],
-        AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ]) as table:
-        with pytest.raises(ClientError, match='ValidationException.*tablets'):
-            table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'});
-
 # For a while (see #18068) it was possible to create an Alternator table with
 # tablets enabled and choose LWT for write isolation (always_use_lwt)
 # but the writes themselves failed. This test verifies that this is no longer
diff --git a/test/alternator/test_ttl.py b/test/alternator/test_ttl.py
index 2e4eb20699..8ac9a8517c 100644
--- a/test/alternator/test_ttl.py
+++ b/test/alternator/test_ttl.py
@@ -655,12 +655,6 @@ def test_ttl_expiration_lsi_key(dynamodb, waits_for_expiration):
 # content), and a special userIdentity flag saying that this is not a regular
 # REMOVE but an expiration. Reproduces issue #11523.
 def test_ttl_expiration_streams(dynamodb, dynamodbstreams, waits_for_expiration):
-    # Alternator Streams currently doesn't work with tablets, so until
-    # #23838 is solved, skip this test on tablets.
-    for tag in TAGS:
-        if tag['Key'] == 'system:initial_tablets' and tag['Value'].isdigit():
-            pytest.skip("Streams test skipped on tablets due to #23838")
-
    # In my experiments, a 30-minute (1800 seconds) is the typical
    # expiration delay in this test. If the test doesn't finish within
    # max_duration, we report a failure.

From 9a6aed721b258a006636813349061ffb2afbebaf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rados=C5=82aw=20Cybulski?=
Date: Mon, 5 Jan 2026 19:14:56 +0100
Subject: [PATCH 5/5] alternator: add streams with tablets tests

Add tests for Streams when the table uses tablets underneath. One test
verifies filtering using the CHILD_SHARDS feature. The other makes sure
we read all the data while the table undergoes tablet count changes.

Add `--tablet-load-stats-refresh-interval-in-seconds=1` to the
`alternator/run` script, as otherwise the newly added tests would fail.
The setting changes how often scylla refreshes tablet metadata. This
can't be done using `scylla_config_temporary`, because 1) the default is
60 seconds, and 2) scylla waits the full timeout (60s) before reading
the configuration variable again.
---
 test/alternator/run                     |   5 +
 test/alternator/test_config.yaml        |   2 +
 test/alternator/test_streams.py         |  37 +-
 test/alternator/test_streams_tablets.py | 543 ++++++++++++++++++++++++
 4 files changed, 578 insertions(+), 9 deletions(-)
 create mode 100644 test/alternator/test_streams_tablets.py

diff --git a/test/alternator/run b/test/alternator/run
index 8c96fd0b82..7edb451026 100755
--- a/test/alternator/run
+++ b/test/alternator/run
@@ -75,6 +75,11 @@ def run_alternator_cmd(pid, dir):
        # We only list here Alternator-specific experimental features - CQL
        # ones are listed in test/cqlpy/run.py.
'--experimental-features=alternator-streams', + # this is required by test_streams.py test_parent_filtering and test_get_records_with_alternating_tablets_count + # setting the value using scylla_config_temporary won't work, because the value is read + # at the start and then periodically with `tablet-load-stats-refresh-interval-in-seconds` + # interval, which by default is 60 seconds. + '--tablet-load-stats-refresh-interval-in-seconds=1', ] if '--https' in sys.argv: run.setup_ssl_certificate(dir) diff --git a/test/alternator/test_config.yaml b/test/alternator/test_config.yaml index 72c077f94a..73b44f8283 100644 --- a/test/alternator/test_config.yaml +++ b/test/alternator/test_config.yaml @@ -9,6 +9,8 @@ extra_scylla_cmdline_options: - '--logger-log-level=alternator_controller=trace' - '--logger-log-level=alternator_ttl=trace' - '--logger-log-level=paxos=trace' + - '--tablet-load-stats-refresh-interval-in-seconds=1' + - '--tablets-initial-scale-factor=1' extra_scylla_config_options: { experimental_features: [ diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index 8cb27b1aeb..60522c2752 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -16,12 +16,29 @@ from botocore.exceptions import ClientError from test.alternator.util import is_aws, scylla_config_temporary, unique_table_name, create_test_table, new_test_table, random_string, full_scan, freeze, list_tables, get_region, manual_request -# All tests in this file are expected to fail with tablets due to #23838. -# To ensure that Alternator Streams is still being tested, instead of -# xfailing these tests, we temporarily coerce the tests below to avoid -# using default tablets setting, even if it's available. We do this by -# using the following tags when creating each table below: -TAGS = [{'Key': 'system:initial_tablets', 'Value': 'none'}] +TAGS = [] +# The following fixture is to ensure that tests in this module will be tested with both vnodes and tablets. +# This fixture runs automatically for every test in this module. +# To avoid relying on semantics of a similar fixture used in TTL tests, we define it locally here instead of reusing +# that fixture, and we do not import or reuse fixtures across modules. +# It sets the TAGS variable in the module’s global namespace to the current parameter value before each test. +# All tests will be run with both values. On AWS the parameterization is meaningless, so we skip the second variant. +@pytest.fixture(params=[ + [{'Key': 'system:initial_tablets', 'Value': 'none'}], + [{'Key': 'system:initial_tablets', 'Value': '0'}], +], ids=["using vnodes", "using tablets"], autouse=True) +def tags_param(request, dynamodb): + if is_aws(dynamodb) and request.param[0].get('Value') != 'none': + pytest.skip('vnodes/tablets parameterization not applicable on AWS') + # Set TAGS in the global namespace of this module + global TAGS + TAGS = request.param + +def running_using_vnodes(): + for tag in TAGS: + if tag['Key'] == 'system:initial_tablets': + v = tag['Value'] + return not v or not v.isdigit() stream_types = [ 'OLD_IMAGE', 'NEW_IMAGE', 'KEYS_ONLY', 'NEW_AND_OLD_IMAGES'] @@ -54,12 +71,14 @@ def disable_stream(dynamodbstreams, table): # So we have to create and delete a table per test. And not run this # test to often against aws. 
 @contextmanager
-def create_stream_test_table(dynamodb, StreamViewType=None):
+def create_stream_test_table(dynamodb, StreamViewType=None, Tags=None):
+    if Tags is None:
+        Tags = TAGS
    spec = { 'StreamEnabled': False }
    if StreamViewType != None:
        spec = {'StreamEnabled': True, 'StreamViewType': StreamViewType}
    table = create_test_table(dynamodb, StreamSpecification=spec,
-        Tags=TAGS,
+        Tags=Tags,
        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
                    { 'AttributeName': 'c', 'KeyType': 'RANGE' } ],
@@ -415,7 +434,7 @@ def test_get_records(dynamodb, dynamodbstreams):
            if 'NextShardIterator' in response:
                next_iterators.append(response['NextShardIterator'])

-            records = response.get('Records')
+            records = response.get('Records', [])
            # print("Query {} -> {}".format(iter, records))
            if records:
                for record in records:
diff --git a/test/alternator/test_streams_tablets.py b/test/alternator/test_streams_tablets.py
new file mode 100644
index 0000000000..88e6d54ca3
--- /dev/null
+++ b/test/alternator/test_streams_tablets.py
@@ -0,0 +1,543 @@
+# Copyright 2026-present ScyllaDB
+#
+# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1

+# Tests for stream operations' nuances that are intrinsic to ScyllaDB (parent-children relationships on stream shards).

+import time, random, collections

+import pytest
+from test.alternator.test_streams import create_stream_test_table, wait_for_active_stream

+TABLET_TAGS = [{'Key': 'system:initial_tablets', 'Value': '0'}]

+# get table_id from keyspace and table name
+def get_table_or_view_id(cql, keyspace: str, table: str):
+    rows = cql.execute(f"select id from system_schema.tables where keyspace_name = '{keyspace}' and table_name = '{table}'")
+    try:
+        row = rows.one()
+    except Exception:
+        row = None
+    if row is not None:
+        return row.id
+    rows = cql.execute(f"select id from system_schema.views where keyspace_name = '{keyspace}' and view_name = '{table}'")
+    return rows.one().id

+# get the user table id from a cdc_log table id
+def get_base_table(cql, table_id):
+    # copied from tablets.py:get_base_table
+    # this might return more than one row, but we don't care as all rows
+    # will have the same base_table value
+    rows = cql.execute(f"SELECT base_table FROM system.tablets where table_id = {table_id} limit 1")
+    return rows.one().base_table

+# validate that streams tables are synchronized with the tablet count - all new cdc shards have been created
+def assert_number_of_streams_is_equal_to_number_of_tablets(rest_api, cql, ks, table_name, cdc_log_table_name):
+    # we'll try twice here: in my tests this function occasionally failed on the
+    # first attempt, seemingly because the streams were not yet fully updated (debug build).
+    for x in range(0, 2):
+        tablet_count = get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name)
+        ts = cql.execute(f"SELECT toUnixTimestamp(timestamp) AS ts FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='{table_name}' ORDER BY timestamp DESC LIMIT 1").one().ts
+        CdcStreamState_CURRENT = 0 # from test.cluster.test_cdc_with_tablets.CdcStreamState.CURRENT
+        count = cql.execute(f"SELECT count(*) FROM system.cdc_streams WHERE keyspace_name='{ks}' AND table_name='{table_name}' AND timestamp = {ts} AND stream_state = {CdcStreamState_CURRENT} limit 1").one().count
+        if count == tablet_count:
+            break
+        # on debug builds we occasionally need more time
+        time.sleep(0.1)
+    assert count == tablet_count

+# return the tablet count for the given cdc_log table (the table that holds cdc data for the given user table)
+def get_tablet_count_for_base_table_of_table(rest_api, cql, keyspace_name: str, table_name: str):
+    table_id = get_table_or_view_id(cql, keyspace_name, table_name)
+    table_id = get_base_table(cql, table_id)
+    # this might return more than one row, but we don't care as all rows
+    # will have the same tablet_count value
+    rows = cql.execute(f"SELECT tablet_count FROM system.tablets where table_id = {table_id} limit 1")
+    return rows.one().tablet_count

+# modify the tablet count for the given cdc_log table and wait until the change is applied.
+# this calls alter table with a new `min_tablet_count`, which requires additional scylla options to work reliably
+# (--tablet-load-stats-refresh-interval-in-seconds=1 and --tablets-initial-scale-factor=1)
+def set_tablet_count_and_wait(rest_api, cql, ks, table_name, cdc_log_table_name, expected_tablet_count):
+    assert_number_of_streams_is_equal_to_number_of_tablets(rest_api, cql, ks, table_name, cdc_log_table_name)
+    cql.execute(f"ALTER TABLE \"{ks}\".\"{cdc_log_table_name}\" WITH tablets = {{'min_tablet_count': {expected_tablet_count}}};")
+    start = time.time()
+    while time.time() < start + 10:
+        if get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name) == expected_tablet_count:
+            break
+        time.sleep(0.1)
+    else:
+        pytest.fail(f'Tablet count did not reach expected value {expected_tablet_count} within timeout')

+def iterate_over_describe_stream(dynamodbstreams, arn, end_ts, filter_shard_id=None):
+    params = {
+        'StreamArn': arn
+    }
+    if filter_shard_id is not None:
+        params['ShardFilter'] = {
+            'Type': 'CHILD_SHARDS',
+            'ShardId': filter_shard_id
+        }
+    desc = dynamodbstreams.describe_stream(**params)

+    while True:
+        assert time.time() <= end_ts, "Time ran out"
+        shards = desc['StreamDescription']['Shards']

+        for shard in shards:
+            yield shard
+            assert time.time() <= end_ts, "Time ran out"

+        last_shard = desc["StreamDescription"].get("LastEvaluatedShardId")
+        if not last_shard:
+            break

+        desc = dynamodbstreams.describe_stream(ExclusiveStartShardId=last_shard, **params)

+# helper function for the parent-child relationship tests - it will:
+# - drive changes in the tablet count
+# - wait for the expected number of stream shards to be created
+# - build a parent map (shard -> parent) and a children map (shard -> list of children) between stream shards
+# - call the callback with the data
+def run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, tablet_multipliers, callback):
+    with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES', Tags=TABLET_TAGS) as table:
+        (arn, label) = wait_for_active_stream(dynamodbstreams, table)

+        ks = f'alternator_{table.name}'
+        table_name = table.name
+        cdc_log_table_name = f'{table_name}_scylla_cdc_log'
+        # Record the initial tablet count, which determines the expected
+        # number of root stream shards (shards without a parent).
+        init_table_count = get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name)

+        # Drive tablet count changes according to tablet_multipliers.
+        # Each change triggers a new "generation" of stream shards to be created.
+        # a tablet multiplier might be less than 1
+        for tablet_mult in tablet_multipliers:
+            tablet_count = int(init_table_count * tablet_mult)
+            assert tablet_count >= 1
+            set_tablet_count_and_wait(rest_api, cql, ks, table_name, cdc_log_table_name, tablet_count)

+        # Strip multipliers that don't change the tablet count from the
+        # previous state. The table starts with a multiplier of 1 (i.e.
+        # init_table_count tablets), so a leading 1 is a no-op. Subsequent
+        # duplicates are also no-ops, since ALTER TABLE with the current
+        # tablet count doesn't trigger a new CDC stream generation.
+        effective_multipliers = []
+        for m in tablet_multipliers:
+            if effective_multipliers:
+                if m != effective_multipliers[-1]:
+                    effective_multipliers.append(m)
+            elif m != 1:
+                effective_multipliers.append(m)

+        # The total number of stream shards across all generations:
+        # 1) The root generation created when the table is first created.
+        total_shard_count = init_table_count
+        # 2) One generation per effective multiplier.
+        total_shard_count += int(sum(effective_multipliers) * init_table_count)

+        # Poll DescribeStream until all expected stream shards have appeared,
+        # building a shard_id -> parent_shard_id map (shard_parents_map) and
+        # collecting root shard IDs (shards with no parent).
+        end_ts = time.time() + 30
+        root_shard_ids = []
+        shard_parents_map = {}
+        while time.time() < end_ts:
+            root_shard_ids = []
+            shard_parents_map = {}

+            for shard in iterate_over_describe_stream(dynamodbstreams, arn, end_ts):
+                shard_id = shard['ShardId']
+                parent_shard_id = shard.get('ParentShardId', None)
+                assert shard_id not in shard_parents_map
+                if parent_shard_id is None:
+                    root_shard_ids.append(shard_id)
+                shard_parents_map[shard_id] = parent_shard_id

+            if len(shard_parents_map) >= total_shard_count:
+                break

+            time.sleep(0.1)

+        # For each shard, use DescribeStream's CHILD_SHARDS filter to
+        # build a shard_id -> [child_shard_ids] map (shard_children_map).
+        shard_children_map = {}
+        for shard_id in shard_parents_map:
+            filter_children = []
+            for child_shard in iterate_over_describe_stream(dynamodbstreams, arn, end_ts, filter_shard_id=shard_id):
+                child_shard_id = child_shard['ShardId']
+                filter_children.append(child_shard_id)
+            shard_children_map[shard_id] = filter_children

+        callback(root_shard_ids, shard_parents_map, shard_children_map)

+# run a test where we create two cdc log generations (parents and children)
+# and the children count is half the parents' (thus parents are merged into children).
+# validate the merge assumptions:
+# - every two parents have the same child
+# - only half of the parents are marked as a parent by any child (a child can report only a single parent, and in a merge it has two parents)
+# - various sizes
+# NOTE: this test assumes the starting tablet count is bigger than one
+# NOTE: this test fails if `system:initial_tablets` is set to anything but 0 - maybe because 0 means start with some default,
+# while non-zero means start with this value but never go any lower?
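+# Illustrative example (hypothetical shard names) of the merge invariants checked below,
+# for a 4 -> 2 merge: parents A, B, C, D; children X (from A+B) and Y (from C+D):
+# - CHILD_SHARDS(A) == CHILD_SHARDS(B) == [X] and CHILD_SHARDS(C) == CHILD_SHARDS(D) == [Y]
+# - X declares exactly one of A/B as its ParentShardId and Y exactly one of C/D,
+#   so only half of the parents ever appear as a ParentShardId value.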
+def test_parent_children_merge(dynamodb, dynamodbstreams, rest_api, cql):
+    def verify_parent_children_relationship_merge(root_shard_ids, shard_parents_map, shard_children_map):
+        # shard_parents_map contains both generations, so we strip the original one
+        shard_parents_map = { shard_id: parent_id for shard_id, parent_id in shard_parents_map.items() if parent_id is not None }

+        # shard_children_map contains both generations, but only the original generation has children, so we strip the childless entries
+        shard_children_map = { shard_id: children for shard_id, children in shard_children_map.items() if children }

+        # shard_parents_map is a child -> parent map, thus its size equals the total number of children, which is half the number of parents
+        assert len(shard_parents_map) * 2 == len(root_shard_ids)

+        # shard_children_map is a parent -> list of children map, thus its size equals the total number of parents,
+        # which must be equal to root_shard_ids
+        assert len(root_shard_ids) == len(shard_children_map)

+        # all parents must be from the first (original) generation (`root_shard_ids`)
+        assert sorted(shard_children_map.keys()) == sorted(root_shard_ids)

+        # only half of the parents show up as parents in shard_parents_map, but all of them must be from root_shard_ids
+        assert len(shard_parents_map.values()) * 2 == len(root_shard_ids)
+        assert set(root_shard_ids).issuperset(set(shard_parents_map.values()))

+        # every parent has exactly one child - we're merging
+        assert all(len(children) == 1 for children in shard_children_map.values())

+        # every child must occur exactly twice in the children lists - we're merging (2 -> 1), so
+        # for every two parents there is one child, i.e. two parents share the same child
+        existence_count = collections.defaultdict(int)
+        for children in shard_children_map.values():
+            for child in children:
+                existence_count[child] += 1
+        assert all(count == 2 for count in existence_count.values())

+    run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, [0.5], verify_parent_children_relationship_merge)

+# run a test where we create two cdc log generations (parents and children)
+# and the children count is double the parents' (thus parents are split into children).
+# validate the split assumptions:
+# - every parent has two distinct children
+# - every two children have the same parent
+# - various sizes
+def test_parent_children_split(dynamodb, dynamodbstreams, rest_api, cql):
+    def verify_parent_children_relationship_split(root_shard_ids, shard_parents_map, shard_children_map):
+        # shard_parents_map contains both generations, so we strip the original one
+        shard_parents_map = { shard_id: parent_id for shard_id, parent_id in shard_parents_map.items() if parent_id is not None }

+        # shard_children_map contains both generations, but only the original generation has children, so we strip the childless entries
+        shard_children_map = { shard_id: children for shard_id, children in shard_children_map.items() if children }

+        # shard_parents_map is a child -> parent map, thus its size equals the total number of children, which is double the number of parents
+        assert len(shard_parents_map) == len(root_shard_ids) * 2

+        # shard_children_map is a parent -> list of children map, thus its size equals the total number of parents,
+        # which must be equal to root_shard_ids
+        assert len(root_shard_ids) == len(shard_children_map)

+        # all parents must be from the first (original) generation (`root_shard_ids`)
+        assert sorted(shard_children_map.keys()) == sorted(root_shard_ids)

+        # every parent must show up in shard_parents_map twice
+        # the set of parents must equal `root_shard_ids`
+        assert len(shard_parents_map.values()) == len(root_shard_ids) * 2
+        assert set(root_shard_ids) == set(shard_parents_map.values())
+        existence_count = collections.defaultdict(int)
+        for child in shard_parents_map.values():
+            existence_count[child] += 1
+        assert all(count == 2 for count in existence_count.values())

+        # every parent has exactly two children - we're splitting
+        assert all(len(children) == 2 for children in shard_children_map.values())

+        # every child must occur exactly once in the children lists - we're splitting (1 -> 2), so
+        # for every parent there are two distinct children
+        all_children = []
+        for children in shard_children_map.values():
+            for child in children:
+                all_children.append(child)
+        assert len(set(all_children)) == len(all_children)

+    run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, [2], verify_parent_children_relationship_split)

+# the test will:
+# - create a table with streams enabled
+# - get the initial tablet count
+# - for each multiplier in tablet_multipliers:
+#   - modify the tablet count to initial * multiplier - this triggers
+#     creation of new stream shards in the stream, as we currently have a 1 to 1
+#     mapping between tablets and stream shards
+# - after all modifications, use DescribeStream to get all stream shards
+# - build a map of stream shard -> parent stream shard (shard_parents_map) and
+#   a map of stream shard -> child stream shards (shard_children_map)
+# - verify that:
+#   - the number of root stream shards (stream shards without a parent) is correct -
+#     it should be equal to the initial tablet count
+#   - starting from the root stream shards and following children, each path
+#     down the tree has length equal to len(tablet_multipliers) -
+#     this is because every change in the tablet count causes a new generation
+#     of stream shards to be created. Each stream shard from the previous generation points
+#     to at least one stream shard in the next generation and never to some other generation.
+def test_parent_filtering(dynamodb, dynamodbstreams, rest_api, cql):
+    tablet_multipliers = [1, 2, 4, 8, 16, 8, 4, 2, 1]

+    def verify_parent_children_relationship(root_shard_ids, shard_parents_map, shard_children_map):
+        # Verify the split/merge invariant: in a split all children point
+        # back to the same parent; in a merge the child may point to only one
+        # of its two parents, but both parents will have the child in their CHILD_SHARDS result.
+        #
+        # First, build declared_children from the ParentShardId field recorded
+        # in shard_parents_map, then compare against the CHILD_SHARDS filter
+        # results to confirm consistency.
+        declared_children = {} # parent_shard_id -> [child_shard_ids derived from ParentShardId]
+        for shard_id, parent_shard_id in shard_parents_map.items():
+            if parent_shard_id is not None:
+                declared_children.setdefault(parent_shard_id, []).append(shard_id)

+        all_shards = set()
+        in_children = set()
+        for shard_id in shard_parents_map:
+            all_shards.add(shard_id)
+            filter_children = shard_children_map[shard_id]
+            for child_shard_id in filter_children:
+                in_children.add(child_shard_id)
+            # Verify that every child declared via ParentShardId is also returned
+            # by the CHILD_SHARDS filter for this shard.
+            declared = set(declared_children.get(shard_id, []))
+            assert declared.issubset(set(filter_children)), \
+                f"Declared children {declared} not subset of filter children {set(filter_children)} for shard {shard_id}"
+            # Between generations we can (currently) have either splits into two or merges of two into one.
+            # In case of a split, the children count is > 1 and all children point exactly to this parent
+            # (so declared == filter_children).
+            # In case of a merge, the CHILD_SHARDS filter returns the merged child for both parents,
+            # but the child's ParentShardId can only point to one of them. So the non-designated
+            # parent has filter_children = [child] but declared_children = [].
+            # This assert checks that either all filter children were declared (split case) or
+            # we have at most one child (merge case where this parent isn't the designated one).
+            assert set(filter_children) == declared or len(filter_children) <= 1, \
+                f"Unexpected children mismatch for shard {shard_id}: filter={filter_children}, declared={list(declared)}"

+        # Verify split/merge invariants across generation boundaries.
+        # Group shards by generation (extracted from the hex timestamp in the shard ID).
+        def get_gen(t):
+            return t[2:13]
+        gen_shards = {}
+        for shard_id in shard_parents_map:
+            gen = get_gen(shard_id)
+            gen_shards.setdefault(gen, []).append(shard_id)
+        # Sort generations chronologically and verify parent/child ratios.
+        sorted_gens = sorted(gen_shards.keys())
+        for i in range(1, len(sorted_gens)):
+            prev_gen = sorted_gens[i - 1]
+            curr_gen = sorted_gens[i]
+            parent_count = len(gen_shards[prev_gen])
+            child_count = len(gen_shards[curr_gen])
+            if child_count > parent_count:
+                # Split: 2x children, every parent has 2 children pointing to it
+                assert child_count == 2 * parent_count, \
+                    f"Split ratio wrong: {parent_count} parents -> {child_count} children"
+                for p in gen_shards[prev_gen]:
+                    kids = declared_children.get(p, [])
+                    assert len(kids) == 2, f"Split parent {p} has {len(kids)} declared children, expected 2"
+            elif child_count < parent_count:
+                # Merge: 2x parents, every child has 2 parents (but only 1 declared via ParentShardId)
+                assert parent_count == 2 * child_count, \
+                    f"Merge ratio wrong: {parent_count} parents -> {child_count} children"
+                # Each child in the current generation should appear as a filter_child
+                # of exactly 2 parents from the previous generation.
+                for c in gen_shards[curr_gen]:
+                    parents_of_c = [p for p in gen_shards[prev_gen] if c in shard_children_map.get(p, [])]
+                    assert len(parents_of_c) == 2, \
+                        f"Merge child {c} has {len(parents_of_c)} parents, expected 2"

+        # Verify that the set of shards with no parent exactly matches
+        # root_shard_ids collected earlier (i.e. no orphaned shards).
+        shards_without_parents = all_shards - in_children
+        assert shards_without_parents == set(root_shard_ids)

+        # Walk the shard tree recursively from each root and verify
+        # that every root-to-leaf path has length exactly len(tablet_multipliers),
+        # confirming one new generation per tablet count change.
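+        # For example, with tablet_multipliers = [1, 2, 4, 8, 16, 8, 4, 2, 1] the
+        # leading 1 is a no-op (no new generation), leaving 8 effective changes:
+        # 1 root generation + 8 new generations = 9 generations, so every
+        # root-to-leaf path visits exactly len(tablet_multipliers) == 9 shards.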
+        def run_and_verify(shard_id, depth):
+            children = shard_children_map.get(shard_id, None)
+            if not children:
+                assert depth == len(tablet_multipliers)
+            else:
+                for ch in children:
+                    run_and_verify(ch, depth + 1)

+        for r in root_shard_ids:
+            run_and_verify(r, 1)
+    run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, tablet_multipliers, verify_parent_children_relationship)

+# this test will:
+# - create a table with streams enabled
+# - get the initial tablet count
+# - for each multiplier in tablet_multipliers:
+#   - modify the tablet count to initial * multiplier - this triggers
+#     creation of new stream shards in the stream, as we currently have a 1 to 1
+#     mapping between tablets and stream shards
+#   - perform writes_per_tablet_multiplier writes to the table
+# - after all modifications, use DescribeStream to build the shard_parents_map (child -> parent) stream shards map
+# - use GetRecords to read all stream records
+# - verify that:
+#   - all written items are present in the stream (check the count and then the sorted content)
+#   - within each partition key, the records are in the order of the writes (check that `e` is monotonically increasing for each record)
+#   - the stream shard parent-child relationships are correct - stream shards form "generations" (all stream shards are split or merged at the same moment,
+#     when the tablet count is changed). Every stream shard except the first ones has a parent, but not every stream shard is pointed to as a parent - when stream shards merge
+#     (let's say A & B merge into C), the next-generation stream shard (C) has two parents (A & B), but the DynamoDB API allows appointing only one - let's say A
+#     (the other one - B - will never be pointed to as a parent by anything else).
+#     NOTE: that is not the same as having no children - the stream shard (B) still has a child (C), but that child (C) has B's sibling (A) as its declared parent.
+#     Then we try to walk the tree starting from every stream shard that is not pointed to as a parent. Depending on which generation that stream shard is in,
+#     the path will have a different length.
+def test_get_records_with_alternating_tablets_count(dynamodb, dynamodbstreams, rest_api, cql):
+    tablet_multipliers = [1, 2, 4, 8, 16, 8, 4, 2, 1]
+    writes_per_tablet_multiplier = 100
+    partition_count = 32

+    with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES', Tags=TABLET_TAGS) as table:
+        (arn, label) = wait_for_active_stream(dynamodbstreams, table)

+        ks = f'alternator_{table.name}'
+        table_name = table.name
+        cdc_log_table_name = f'{table_name}_scylla_cdc_log'
+        init_table_count = get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name)

+        # --- Phase 1: Drive tablet count changes and write data ---
+        # For each multiplier, alter the tablet count and write writes_per_tablet_multiplier
+        # items to the table. Each write uses a small, bounded set of partition keys
+        # (0..partition_count-1) to force per-partition ordering collisions, which
+        # lets us later verify that events within a partition appear in write order.
+        # `e` is a monotonically increasing counter that encodes the write order.
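+        # For example, the first three writes might be (p='7', c='1', e='1'),
+        # (p='3', c='1', e='2'), (p='7', c='1', e='3') - partition '7' then has
+        # two events whose stream order must be e='1' before e='3'.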
+        index = 0
+        expected_items = []
+        retrieved_items = []
+        for tablet_mult in tablet_multipliers:
+            set_tablet_count_and_wait(rest_api, cql, ks, table_name, cdc_log_table_name, init_table_count * tablet_mult)
+            for _ in range(0, writes_per_tablet_multiplier):
+                p = str(random.randint(0, partition_count - 1))
+                index += 1
+                # we spread the writes over a small set of partition keys to force key collisions,
+                # to detect any ordering issues within a partition
+                c = '1'
+                e = str(index)
+                table.put_item(Item={'p': p, 'c': c, 'e': e})
+                expected_items.append((p, c, e))

+        # --- Phase 2: Read all stream records via GetRecords, building shard topology ---
+        # Iterate DescribeStream in a retry loop until all expected items have been
+        # retrieved from the stream. For each shard discovered:
+        # - Record its parent in shard_parents_map (child -> parent).
+        # - Track root shards (those without a parent).
+        # - Create a GetShardIterator starting at the shard's StartingSequenceNumber
+        #   and later call GetRecords to drain it, collecting all the previously written (p, c, e) triples.
+        iterators = {}
+        shard_parents_map = {}
+        root_shard_ids = []
+        end_ts = time.time() + 30
+        while len(retrieved_items) < len(expected_items):
+            for shard in iterate_over_describe_stream(dynamodbstreams, arn, end_ts):
+                shard_id = shard['ShardId']
+                parent_shard_id = shard.get('ParentShardId', None)
+                if parent_shard_id is None:
+                    if shard_id not in root_shard_ids:
+                        root_shard_ids.append(shard_id)
+                elif shard_id in shard_parents_map:
+                    assert shard_parents_map[shard_id] == parent_shard_id
+                else:
+                    shard_parents_map[shard_id] = parent_shard_id
+                if shard_id in iterators:
+                    continue
+                start = shard['SequenceNumberRange']['StartingSequenceNumber']
+                iter = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER', SequenceNumber=start)['ShardIterator']
+                assert iter is not None
+                iterators[shard_id] = iter


+            for shard_id, iter in iterators.items():
+                response = dynamodbstreams.get_records(ShardIterator=iter, Limit=1000)
+                if 'NextShardIterator' in response:
+                    iterators[shard_id] = response['NextShardIterator']

+                records = response.get('Records', [])
+                for record in records:
+                    dynamodb = record['dynamodb']
+                    keys = dynamodb['NewImage']

+                    assert set(keys) == set(['p', 'c', 'e'])
+                    p = keys['p'].get('S')
+                    c = keys['c'].get('S')
+                    e = keys['e'].get('S')
+                    retrieved_items.append((p, c, e))

+        # --- Phase 3: Verify completeness and per-partition ordering of stream records ---
+        # Check that the set of records retrieved from the stream exactly matches the
+        # set of items written (same count, same content when sorted).
+        # Then verify that for every partition key `p`, the events appear in the same
+        # order they were written: the `e` value (write index) must be strictly
+        # increasing across successive reads for the same `p`.
+        assert len(retrieved_items) == len(expected_items)
+        assert sorted(retrieved_items) == sorted(expected_items)
+        previous_values = {}
+        # we iterate over the retrieved items here in the order they were read from the stream.
+        # the implementation might (and will) reorder items with different partition keys as it pleases,
+        # but for the same partition key the order of events must be preserved.
+        # `previous_values` keeps track of the last `e` value seen for each partition key `p`. For writes to
+        # the same `p`, the next value will always have `e` greater than the previous one.
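+        # For example, after reading (p='3', e='5') we set previous_values['3'] = 5;
+        # a later (p='3', e='12') passes (5 < 12), while (p='3', e='4') would fail.
+        # Items of other partitions (p != '3') are unaffected by this entry.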
+        for p, c, e in retrieved_items:
+            e = int(e)
+            pv = previous_values.get(p, -1)
+            assert pv < e
+            previous_values[p] = e

+        # --- Phase 4: Verify shard topology - root shard count ---
+        # The number of root shards (those without a parent) must equal the initial
+        # tablet count.
+        assert len(root_shard_ids) == init_table_count

+        # --- Phase 5: Verify shard topology - generational structure and path lengths ---
+        # Build `streams_superseded_by_next_gen`: the set of shard IDs that appear as a parent of some
+        # other shard. Shards NOT in this set are "leaves" (the most recent generation).
+        #
+        # Group all shards (except the root ones) by their generation, inferred from the
+        # timestamp embedded in the shard ID (characters [2:13]).
+        #
+        # For each leaf shard, walk the parent chain up to the root (get_path), then
+        # assert that the number of siblings in the leaf's generation equals
+        # `init_table_count * tablet_multipliers[len(path) - 1]`.
+        # This verifies the expected doubling/halving pattern produced by the
+        # alternating tablet_multipliers sequence.
+        streams_superseded_by_next_gen = set()
+        for (shard_id, parent_shard_id) in shard_parents_map.items():
+            streams_superseded_by_next_gen.add(parent_shard_id)

+        def get_generation_from_shard(t):
+            # Shard IDs embed the generation's hex timestamp, e.g. in
+            # "H 19b22b8563a:7fffffffffffffffe8547ce46400000".
+            # Characters [2:13] extract that hex timestamp, which groups
+            # shards that were created together during the same tablet count change.
+            return t[2:13]
+        count_map = {}
+        for r in shard_parents_map:
+            gen = get_generation_from_shard(r)
+            count = count_map.get(gen, 0)
+            count_map[gen] = count + 1
+        # Returns the ancestor chain from shard_id up to the root (a shard with no parent).
+        # The returned list is ordered [shard_id, parent, grandparent, ..., root].
+        def get_path(shard_id):
+            path = [ shard_id ]
+            current_shard_id = shard_id
+            while True:
+                parent_shard_id = shard_parents_map.get(current_shard_id, None)
+                if parent_shard_id is None:
+                    return path
+                path.append(parent_shard_id)
+                current_shard_id = parent_shard_id
+        for r in shard_parents_map:
+            if r not in streams_superseded_by_next_gen:
+                path = get_path(r)
+                siblings_count = count_map[get_generation_from_shard(r)]
+                assert siblings_count == init_table_count * tablet_multipliers[len(path) - 1]
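
+# NOTE (illustrative summary of the walk above): with tablet_multipliers
+# = [1, 2, 4, 8, 16, 8, 4, 2, 1] and init_table_count = N, the generation sizes
+# are N, 2N, 4N, 8N, 16N, 8N, 4N, 2N, N. A shard that nothing points to as a
+# parent is either a true leaf (last generation) or the non-designated parent of
+# a merge; in both cases the length of its ancestor chain identifies its
+# generation, which is what the final assertion checks.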