diff --git a/alternator/controller.cc b/alternator/controller.cc index e55c3d1d6d..40c8388f78 100644 --- a/alternator/controller.cc +++ b/alternator/controller.cc @@ -32,6 +32,7 @@ controller::controller( sharded& ss, sharded& mm, sharded& sys_dist_ks, + sharded& sys_ks, sharded& cdc_gen_svc, sharded& memory_limiter, sharded& auth_service, @@ -45,6 +46,7 @@ controller::controller( , _ss(ss) , _mm(mm) , _sys_dist_ks(sys_dist_ks) + , _sys_ks(sys_ks) , _cdc_gen_svc(cdc_gen_svc) , _memory_limiter(memory_limiter) , _auth_service(auth_service) @@ -94,7 +96,7 @@ future<> controller::start_server() { auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value { return cfg.alternator_timeout_in_ms; }; - _executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks), + _executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks), std::ref(_sys_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), std::ref(_vsc), _ssg.value(), sharded_parameter(get_timeout_in_ms, std::ref(_config))).get(); _server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get(); diff --git a/alternator/controller.hh b/alternator/controller.hh index d7a73b8f69..cd8bdf7f55 100644 --- a/alternator/controller.hh +++ b/alternator/controller.hh @@ -22,6 +22,7 @@ class memory_limiter; namespace db { class system_distributed_keyspace; +class system_keyspace; class config; } @@ -65,6 +66,7 @@ class controller : public protocol_server { sharded& _ss; sharded& _mm; sharded& _sys_dist_ks; + sharded& _sys_ks; sharded& _cdc_gen_svc; sharded& _memory_limiter; sharded& _auth_service; @@ -84,6 +86,7 @@ public: sharded& ss, sharded& mm, sharded& sys_dist_ks, + sharded& sys_ks, sharded& cdc_gen_svc, sharded& memory_limiter, sharded& auth_service, diff --git a/alternator/executor.cc b/alternator/executor.cc index fa7e58c5ea..2db1c3700d 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -271,6 +271,7 @@ executor::executor(gms::gossiper& gossiper, service::storage_service& ss, service::migration_manager& mm, db::system_distributed_keyspace& sdks, + db::system_keyspace& system_keyspace, cdc::metadata& cdc_metadata, vector_search::vector_store_client& vsc, smp_service_group ssg, @@ -280,6 +281,7 @@ executor::executor(gms::gossiper& gossiper, _proxy(proxy), _mm(mm), _sdks(sdks), + _system_keyspace(system_keyspace), _cdc_metadata(cdc_metadata), _vsc(vsc), _enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization), @@ -1645,16 +1647,7 @@ future executor::create_table_on_shard0(service:: locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option()); const auto& topo = _proxy.local_db().get_token_metadata().get_topology(); auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo); - // Alternator Streams doesn't yet work when the table uses tablets (#23838) - if (stream_specification && stream_specification->IsObject()) { - auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled"); - if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) { - if (rs->uses_tablets()) { - co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). 
" - "If you want to use streams, create a table with vnodes by setting the tag 'system:initial_tablets' set to 'none'."); - } - } - } + // Vector indexes is a new feature that we decided to only support // on tablets. if (vector_indexes && vector_indexes->Size() > 0) { @@ -1846,14 +1839,9 @@ future executor::update_table(client_state& clien if (add_stream_options(*stream_specification, builder, p.local())) { validate_cdc_log_name_length(builder.cf_name()); } - // Alternator Streams doesn't yet work when the table uses tablets (#23838) auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled"); if (stream_enabled && stream_enabled->IsBool()) { if (stream_enabled->GetBool()) { - if (p.local().local_db().find_keyspace(tab->ks_name()).get_replication_strategy().uses_tablets()) { - co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). " - "If you want to enable streams, re-create this table with vnodes (with the tag 'system:initial_tablets' set to 'none')."); - } if (tab->cdc_options().enabled()) { co_return api_error::validation("Table already has an enabled stream: TableName: " + tab->cf_name()); } diff --git a/alternator/executor.hh b/alternator/executor.hh index 3de07fd7b0..1f38a5f734 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -34,6 +34,7 @@ namespace db { class system_distributed_keyspace; + class system_keyspace; } namespace audit { @@ -88,6 +89,7 @@ class executor : public peering_sharded_service { service::storage_proxy& _proxy; service::migration_manager& _mm; db::system_distributed_keyspace& _sdks; + db::system_keyspace& _system_keyspace; cdc::metadata& _cdc_metadata; vector_search::vector_store_client& _vsc; utils::updateable_value _enforce_authorization; @@ -131,6 +133,7 @@ public: service::storage_service& ss, service::migration_manager& mm, db::system_distributed_keyspace& sdks, + db::system_keyspace& system_keyspace, cdc::metadata& cdc_metadata, vector_search::vector_store_client& vsc, smp_service_group ssg, diff --git a/alternator/streams.cc b/alternator/streams.cc index c04c8ca6ce..bb150856a1 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -899,10 +899,22 @@ future executor::describe_stream(client_state& cl // TODO: label // TODO: creation time - // filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h) - auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl); + std::map topologies; + + // filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h) + if (schema->table().uses_tablets()) { + // We can't use table creation time here, as tablets might report a + // generation timestamp just before table creation. This is safe + // because CDC generations are per-table and cannot pre-date the + // table, so expanding the window won't pull in unrelated data. 
+            auto low_ts = db_clock::now() - ttl;
+            topologies = co_await _system_keyspace.read_cdc_for_tablets_versioned_streams(bs->ks_name(), bs->cf_name(), low_ts);
+        } else {
+            auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
+            auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
+            topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
+        }
-    std::map topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
     const auto e = topologies.end();
 
     std::optional shard_filter;
@@ -1488,9 +1500,15 @@ future executor::get_records(client_state& client
     }
 
     // ugh. figure out if we are and end-of-shard
-    auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
-    db_clock::time_point ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
+    db_clock::time_point ts;
+    if (schema->table().uses_tablets()) {
+        ts = co_await _system_keyspace.read_cdc_for_tablets_current_generation_timestamp(base->ks_name(), base->cf_name());
+    } else {
+        auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
+        ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
+    }
+
     auto& shard = iter.shard;
     if (shard.time < ts && ts < high_ts) {
diff --git a/db/system_distributed_keyspace.hh b/db/system_distributed_keyspace.hh
index 1e3a3ab805..9b94985ca9 100644
--- a/db/system_distributed_keyspace.hh
+++ b/db/system_distributed_keyspace.hh
@@ -93,8 +93,16 @@ public:
     future<> create_cdc_desc(db_clock::time_point, const cdc::topology_description&, context);
     future<bool> cdc_desc_exists(db_clock::time_point, context);
 
+    // Reads and builds a generation map - a map from each generation timestamp to the vector of all stream IDs for that generation.
+    // Generations with timestamp >= `not_older_than` are returned, plus the one just before it (the straddling generation).
+    // Returns an empty map if there are no generations with timestamp >= `not_older_than`.
+    // NOTE: there's a sibling `read_cdc_for_tablets_versioned_streams` in `system_keyspace` that returns the same data for tables backed by tablets.
+    // NOTE: currently used only by alternator
     future<std::map<db_clock::time_point, cdc::streams_version>> cdc_get_versioned_streams(db_clock::time_point not_older_than, context);
 
+    // Reads the current generation timestamp for the given table. Throws runtime_error (see `cql3::untyped_result_set::one()`) if the table is not found.
+    // NOTE: there's a sibling `read_cdc_for_tablets_current_generation_timestamp` in `system_keyspace` that does the same for tables backed by tablets.
+    // NOTE: currently used only by alternator
     future<db_clock::time_point> cdc_current_generation_timestamp(context);
 
     future<qos::service_levels_info> get_service_levels(qos::query_context ctx) const;
diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
index 66497a645b..5f9544b66e 100644
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -14,6 +14,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -2057,6 +2058,67 @@ future> system_keyspace::get_local_tokens() {
     });
 }
 
+// Tablet-based counterpart of system_distributed_keyspace::cdc_current_generation_timestamp.
+// Unlike the vnode version which reads from a replicated system_distributed table (and thus
+// requires QUORUM), this reads from a virtual table backed by local Raft-managed tablet
+// metadata, so consistency_level::ONE is the only meaningful level.
+future<db_clock::time_point> system_keyspace::read_cdc_for_tablets_current_generation_timestamp(std::string_view ks_name, std::string_view table_name) {
+    // note: we only care about the newest ("current") timestamp, hence limit 1.
+    // this depends on the table's native clustering order being DESC on timestamp, which is the case for CDC_TIMESTAMPS.
+    static const sstring query = format("SELECT timestamp FROM {}.{} WHERE keyspace_name = ? and table_name = ? limit 1", NAME, CDC_TIMESTAMPS);
+    auto timestamp_cql = co_await _qp.execute_internal(
+            query,
+            db::consistency_level::ONE,
+            { ks_name, table_name },
+            cql3::query_processor::cache_internal::no);
+
+    co_return timestamp_cql->one().get_as<db_clock::time_point>("timestamp");
+}
+
+// Tablet-based counterpart of system_distributed_keyspace::cdc_get_versioned_streams.
+// Unlike the vnode version which reads from a replicated system_distributed table (and thus
+// requires QUORUM), this reads from a virtual table backed by local Raft-managed tablet
+// metadata, so consistency_level::ONE is the only meaningful level.
+future<std::map<db_clock::time_point, cdc::streams_version>> system_keyspace::read_cdc_for_tablets_versioned_streams(std::string_view ks_name, std::string_view table_name, db_clock::time_point not_older_than) {
+    // We read all streams and filter in memory, because we need to include
+    // the generation that straddles the `not_older_than` boundary (i.e. the
+    // one just before it), and the virtual table does not support the
+    // required range query directly.
+    static const sstring stream_id_query = format("SELECT stream_id, stream_state, timestamp FROM {}.{} WHERE keyspace_name = ? and table_name = ?", NAME, CDC_STREAMS);
+
+    std::map<db_clock::time_point, utils::chunked_vector<cdc::stream_id>> temp_result;
+
+    co_await _qp.query_internal(stream_id_query,
+            db::consistency_level::ONE,
+            data_value_list{ ks_name, table_name },
+            1000, // page size
+            [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
+        auto stream_state = cdc::read_stream_state(row.get_as<int8_t>("stream_state"));
+        if (stream_state != cdc::stream_state::current) {
+            co_return stop_iteration::no;
+        }
+        auto ts = row.get_as<db_clock::time_point>("timestamp");
+
+        temp_result[ts].push_back(cdc::stream_id{ row.get_as<bytes>("stream_id") });
+        co_await coroutine::maybe_yield();
+        co_return stop_iteration::no;
+    });
+
+    std::map<db_clock::time_point, cdc::streams_version> result;
+    // Include the generation that straddles the boundary, matching the vnode
+    // counterpart (cdc_get_versioned_streams) which does the same adjustment.
+    auto it = temp_result.lower_bound(not_older_than);
+    if (it != temp_result.begin()) {
+        --it;
+    }
+    for (; it != temp_result.end(); ++it) {
+        auto& ts = it->first;
+        auto& streams = it->second;
+        result.insert_or_assign(ts, cdc::streams_version{ std::move(streams), ts });
+    }
+    co_return std::move(result);
+}
+
 future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) {
     static const sstring all_tables_query = format("SELECT table_id, timestamp, stream_id FROM {}.{}", NAME, CDC_STREAMS_STATE);
diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh
index dd0de6c296..eb9a631d83 100644
--- a/db/system_keyspace.hh
+++ b/db/system_keyspace.hh
@@ -456,7 +456,7 @@ public:
     // For this purpose, records of cleanup operations (the affected token ranges
     // and commitlog ranges) are kept in a system table.
     //
-    // The below functions manipulate these records.
+    // The below functions manipulate these records.
 
     // Saves a record of a token range affected by cleanup.
     // After reboot, tokens from this range will be replayed only if they are on replay positions
@@ -587,6 +587,19 @@ public:
     future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f);
     future<> read_cdc_streams_history(table_id table, std::optional<db_clock::time_point> from, noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f);
 
+    // Reads the current generation timestamp for the given table. Throws runtime_error (see `cql3::untyped_result_set::one()`) if the table is not found.
+    // NOTE: there's a sibling `cdc_current_generation_timestamp` in `system_distributed_keyspace` that does the same for tables backed by vnodes.
+    // NOTE: currently used only by alternator
+    future<db_clock::time_point> read_cdc_for_tablets_current_generation_timestamp(std::string_view ks_name, std::string_view table_name);
+
+    // Reads and builds a generation map for a given table - a map from each generation timestamp to the vector of all stream IDs for that generation.
+    // Generations with timestamp >= `not_older_than` are returned, plus the one just before it (the straddling generation).
+    // If `not_older_than` is not provided (it defaults to min), all generations are returned.
+    // Returns an empty map if the table is not found or if there are no generations.
+    // NOTE: there's a sibling `cdc_get_versioned_streams` that returns the same data for tables backed by legacy vnodes.
+    // NOTE: currently used only by alternator
+    future<std::map<db_clock::time_point, cdc::streams_version>> read_cdc_for_tablets_versioned_streams(std::string_view ks_name, std::string_view table_name, db_clock::time_point not_older_than = db_clock::time_point::min());
+
     // Load Raft Group 0 id from scylla.local
     future<utils::UUID> get_raft_group0_id();
diff --git a/docs/alternator/compatibility.md b/docs/alternator/compatibility.md
index 5b2198f3e0..311245e0c0 100644
--- a/docs/alternator/compatibility.md
+++ b/docs/alternator/compatibility.md
@@ -296,14 +296,6 @@ experimental:
 considered experimental so needs to be enabled explicitly with the
 `--experimental-features=alternator-streams` configuration option.
 
-In this version, Alternator Streams is only supported if the base table
-uses vnodes instead of tablets. However, by default new tables use tablets
-so to create a table that can be used with Streams, you must set the tag
-`system:initial_tablets` set to `none` during CreateTable - so that the
-new table will use vnodes. Streams cannot be enabled on an already-existing
-table that uses tablets.
-See .
-
 Alternator streams also differ in some respects from DynamoDB Streams:
 
 * The number of separate "shards" in Alternator's streams is significantly
   larger than is typical on DynamoDB.
diff --git a/docs/alternator/new-apis.md b/docs/alternator/new-apis.md
index d61d5657f9..d37e26116e 100644
--- a/docs/alternator/new-apis.md
+++ b/docs/alternator/new-apis.md
@@ -184,7 +184,7 @@ entire data center, or other data centers, in that case.
 It replaces the older approach which was named "vnodes". See
 [Data Distribution with Tablets](../architecture/tablets.rst) for details.
 
-In this version, tablet support is almost complete, so new
+In this version, tablet support is complete, so new
 Alternator tables default to following what the global configuration flag
 [tablets_mode_for_new_keyspaces](../reference/configuration-parameters.rst#confval-tablets_mode_for_new_keyspaces)
 tells them to.
@@ -207,9 +207,3 @@ in the CreateTable operation.
The value of this tag can be: The `system:initial_tablets` tag only has any effect while creating a new table with CreateTable - changing it later has no effect. -Because the tablets support is incomplete, when tablets are enabled for an -Alternator table, the following features will not work for this table: - -* Enabling Streams with CreateTable or UpdateTable doesn't work - (results in an error). - See . diff --git a/main.cc b/main.cc index 9e5bb50c7b..6455be59ad 100644 --- a/main.cc +++ b/main.cc @@ -2609,7 +2609,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl api::set_server_service_levels(ctx, cql_server_ctl, qp).get(); - alternator::controller alternator_ctl(gossiper, proxy, ss, mm, sys_dist_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, vector_store_client, *cfg, dbcfg.statement_scheduling_group); + alternator::controller alternator_ctl(gossiper, proxy, ss, mm, sys_dist_ks, sys_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, vector_store_client, *cfg, dbcfg.statement_scheduling_group); // Register at_exit last, so that storage_service::drain_on_shutdown will be called first auto do_drain = defer_verbose_shutdown("local storage", [&ss] { diff --git a/test/alternator/run b/test/alternator/run index 8c96fd0b82..7edb451026 100755 --- a/test/alternator/run +++ b/test/alternator/run @@ -75,6 +75,11 @@ def run_alternator_cmd(pid, dir): # We only list here Alternator-specific experimental features - CQL # ones are listed in test/cqlpy/run.py. '--experimental-features=alternator-streams', + # this is required by test_streams.py test_parent_filtering and test_get_records_with_alternating_tablets_count + # setting the value using scylla_config_temporary won't work, because the value is read + # at the start and then periodically with `tablet-load-stats-refresh-interval-in-seconds` + # interval, which by default is 60 seconds. + '--tablet-load-stats-refresh-interval-in-seconds=1', ] if '--https' in sys.argv: run.setup_ssl_certificate(dir) diff --git a/test/alternator/test_config.yaml b/test/alternator/test_config.yaml index 72c077f94a..73b44f8283 100644 --- a/test/alternator/test_config.yaml +++ b/test/alternator/test_config.yaml @@ -9,6 +9,8 @@ extra_scylla_cmdline_options: - '--logger-log-level=alternator_controller=trace' - '--logger-log-level=alternator_ttl=trace' - '--logger-log-level=paxos=trace' + - '--tablet-load-stats-refresh-interval-in-seconds=1' + - '--tablets-initial-scale-factor=1' extra_scylla_config_options: { experimental_features: [ diff --git a/test/alternator/test_cql_schema.py b/test/alternator/test_cql_schema.py index 6b10ce2405..998360029f 100644 --- a/test/alternator/test_cql_schema.py +++ b/test/alternator/test_cql_schema.py @@ -112,12 +112,6 @@ def test_alternator_vs_cql(dynamodb, test_table, cql_keyspace, cql_table, option @pytest.fixture(scope='module') def table1(dynamodb): with new_test_table(dynamodb, - # Alternator Streams is expected to fail with tablets due to #23838. - # To ensure that this test still runs, instead of xfailing it, we - # temporarily coerce Alternator to avoid using default tablets - # setting, even if it's available. Remove this "Tags=" line when - # issue #23838 is solved. 
- Tags=[{'Key': 'system:initial_tablets', 'Value': 'none'}], KeySchema=[ # Must have both hash key and range key to allow LSI creation { 'AttributeName': 'p', 'KeyType': 'HASH' }, diff --git a/test/alternator/test_metrics.py b/test/alternator/test_metrics.py index 2ab1e5fb6a..ef3ef17b7a 100644 --- a/test/alternator/test_metrics.py +++ b/test/alternator/test_metrics.py @@ -542,12 +542,6 @@ def test_streams_latency(dynamodb, dynamodbstreams, metrics): # latency metrics are only updated for *successful* operations so we # need to use a real Alternator Stream in this test. with new_test_table(dynamodb, - # Alternator Streams is expected to fail with tablets due to #23838. - # To ensure that this test still runs, instead of xfailing it, we - # temporarily coerce Alternator to avoid using default tablets - # setting, even if it's available. We do this by using the following - # tags when creating the table: - Tags=[{'Key': 'system:initial_tablets', 'Value': 'none'}], KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }], AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }], StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'} diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index 8cb27b1aeb..60522c2752 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -16,12 +16,29 @@ from botocore.exceptions import ClientError from test.alternator.util import is_aws, scylla_config_temporary, unique_table_name, create_test_table, new_test_table, random_string, full_scan, freeze, list_tables, get_region, manual_request -# All tests in this file are expected to fail with tablets due to #23838. -# To ensure that Alternator Streams is still being tested, instead of -# xfailing these tests, we temporarily coerce the tests below to avoid -# using default tablets setting, even if it's available. We do this by -# using the following tags when creating each table below: -TAGS = [{'Key': 'system:initial_tablets', 'Value': 'none'}] +TAGS = [] +# The following fixture is to ensure that tests in this module will be tested with both vnodes and tablets. +# This fixture runs automatically for every test in this module. +# To avoid relying on semantics of a similar fixture used in TTL tests, we define it locally here instead of reusing +# that fixture, and we do not import or reuse fixtures across modules. +# It sets the TAGS variable in the module’s global namespace to the current parameter value before each test. +# All tests will be run with both values. On AWS the parameterization is meaningless, so we skip the second variant. +@pytest.fixture(params=[ + [{'Key': 'system:initial_tablets', 'Value': 'none'}], + [{'Key': 'system:initial_tablets', 'Value': '0'}], +], ids=["using vnodes", "using tablets"], autouse=True) +def tags_param(request, dynamodb): + if is_aws(dynamodb) and request.param[0].get('Value') != 'none': + pytest.skip('vnodes/tablets parameterization not applicable on AWS') + # Set TAGS in the global namespace of this module + global TAGS + TAGS = request.param + +def running_using_vnodes(): + for tag in TAGS: + if tag['Key'] == 'system:initial_tablets': + v = tag['Value'] + return not v or not v.isdigit() stream_types = [ 'OLD_IMAGE', 'NEW_IMAGE', 'KEYS_ONLY', 'NEW_AND_OLD_IMAGES'] @@ -54,12 +71,14 @@ def disable_stream(dynamodbstreams, table): # So we have to create and delete a table per test. And not run this # test to often against aws. 
 @contextmanager
-def create_stream_test_table(dynamodb, StreamViewType=None):
+def create_stream_test_table(dynamodb, StreamViewType=None, Tags=None):
+    if Tags is None:
+        Tags = TAGS
     spec = { 'StreamEnabled': False }
     if StreamViewType != None:
         spec = {'StreamEnabled': True, 'StreamViewType': StreamViewType}
     table = create_test_table(dynamodb,
         StreamSpecification=spec,
-        Tags=TAGS,
+        Tags=Tags,
         KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
                     { 'AttributeName': 'c', 'KeyType': 'RANGE' } ],
@@ -415,7 +434,7 @@ def test_get_records(dynamodb, dynamodbstreams):
         if 'NextShardIterator' in response:
             next_iterators.append(response['NextShardIterator'])
 
-        records = response.get('Records')
+        records = response.get('Records', [])
         # print("Query {} -> {}".format(iter, records))
         if records:
             for record in records:
diff --git a/test/alternator/test_streams_tablets.py b/test/alternator/test_streams_tablets.py
new file mode 100644
index 0000000000..88e6d54ca3
--- /dev/null
+++ b/test/alternator/test_streams_tablets.py
@@ -0,0 +1,543 @@
+# Copyright 2026-present ScyllaDB
+#
+# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
+
+# Tests for stream operation nuances that are intrinsic to ScyllaDB (the parent-child relationship between stream shards).
+
+import time, random, collections
+
+import pytest
+from test.alternator.test_streams import create_stream_test_table, wait_for_active_stream
+
+TABLET_TAGS = [{'Key': 'system:initial_tablets', 'Value': '0'}]
+
+# get table_id from keyspace and table name
+def get_table_or_view_id(cql, keyspace: str, table: str):
+    rows = cql.execute(f"select id from system_schema.tables where keyspace_name = '{keyspace}' and table_name = '{table}'")
+    try:
+        row = rows.one()
+    except Exception:
+        row = None
+    if row is not None:
+        return row.id
+    rows = cql.execute(f"select id from system_schema.views where keyspace_name = '{keyspace}' and view_name = '{table}'")
+    return rows.one().id
+
+# get the user table id from a cdc_log table id
+def get_base_table(cql, table_id):
+    # copied from tablets.py:get_base_table
+    # this might return more than one row, but we don't care as all rows
+    # will have the same base_table value
+    rows = cql.execute(f"SELECT base_table FROM system.tablets where table_id = {table_id} limit 1")
+    return rows.one().base_table
+
+# validate that the streams tables are synchronized with the tablet count - all new cdc shards have been created
+def assert_number_of_streams_is_equal_to_number_of_tablets(rest_api, cql, ks, table_name, cdc_log_table_name):
+    # We'll try twice here: in my tests I occasionally got into a situation where this function
+    # failed - it seems the streams were not yet fully updated (debug build).
+    for x in range(0, 2):
+        tablet_count = get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name)
+        ts = cql.execute(f"SELECT toUnixTimestamp(timestamp) AS ts FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='{table_name}' ORDER BY timestamp DESC LIMIT 1").one().ts
+        CdcStreamState_CURRENT = 0  # from test.cluster.test_cdc_with_tablets.CdcStreamState.CURRENT
+        count = cql.execute(f"SELECT count(*) FROM system.cdc_streams WHERE keyspace_name='{ks}' AND table_name='{table_name}' AND timestamp = {ts} AND stream_state = {CdcStreamState_CURRENT} limit 1").one().count
+        if count == tablet_count:
+            break
+        # on debug builds we occasionally need more time
+        time.sleep(0.1)
+    assert count == tablet_count
+
+# return the tablet count for a given cdc_log table (the table that holds the cdc data for a given user table)
+def get_tablet_count_for_base_table_of_table(rest_api, cql, keyspace_name: str, table_name: str):
+    table_id = get_table_or_view_id(cql, keyspace_name, table_name)
+    table_id = get_base_table(cql, table_id)
+    # this might return more than one row, but we don't care as all rows
+    # will have the same tablet_count value
+    rows = cql.execute(f"SELECT tablet_count FROM system.tablets where table_id = {table_id} limit 1")
+    return rows.one().tablet_count
+
+# modify the tablet count for a given cdc_log table and wait until the change is applied
+# this calls ALTER TABLE with a new `min_tablet_count`, which requires additional scylla options to work reliably
+# (--tablet-load-stats-refresh-interval-in-seconds=1 and --tablets-initial-scale-factor=1)
+def set_tablet_count_and_wait(rest_api, cql, ks, table_name, cdc_log_table_name, expected_tablet_count):
+    assert_number_of_streams_is_equal_to_number_of_tablets(rest_api, cql, ks, table_name, cdc_log_table_name)
+    cql.execute(f"ALTER TABLE \"{ks}\".\"{cdc_log_table_name}\" WITH tablets = {{'min_tablet_count': {expected_tablet_count}}};")
+    start = time.time()
+    while time.time() < start + 10:
+        if get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name) == expected_tablet_count:
+            break
+        time.sleep(0.1)
+    else:
+        pytest.fail(f'Tablet count did not reach expected value {expected_tablet_count} within timeout')
+
+def iterate_over_describe_stream(dynamodbstreams, arn, end_ts, filter_shard_id=None):
+    params = {
+        'StreamArn': arn
+    }
+    if filter_shard_id is not None:
+        params['ShardFilter'] = {
+            'Type': 'CHILD_SHARDS',
+            'ShardId': filter_shard_id
+        }
+    desc = dynamodbstreams.describe_stream(**params)
+
+    while True:
+        assert time.time() <= end_ts, "Time ran out"
+        shards = desc['StreamDescription']['Shards']
+
+        for shard in shards:
+            yield shard
+            assert time.time() <= end_ts, "Time ran out"
+
+        last_shard = desc["StreamDescription"].get("LastEvaluatedShardId")
+        if not last_shard:
+            break
+
+        desc = dynamodbstreams.describe_stream(ExclusiveStartShardId=last_shard, **params)
+
+# helper function for the parent-child relationship tests - it will:
+# - drive changes in the tablet count
+# - wait for the expected number of stream shards to be created
+# - build a parent map (shard -> parent) and a children map (shard -> list of children) between stream shards
+# - call the callback with the data
+def run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, tablet_multipliers, callback):
+    with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES', Tags=TABLET_TAGS) as table:
+        (arn, label) = wait_for_active_stream(dynamodbstreams, table)
+
+        ks = f'alternator_{table.name}'
+        table_name = table.name
+        cdc_log_table_name = f'{table_name}_scylla_cdc_log'
+        # Record the initial tablet count, which determines the expected
+        # number of root stream shards (shards without a parent).
+        init_table_count = get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name)
+
+        # Drive tablet count changes according to tablet_multipliers.
+        # Each change triggers a new "generation" of stream shards to be created.
+        # a tablet multiplier might be less than 1
+        for tablet_mult in tablet_multipliers:
+            tablet_count = int(init_table_count * tablet_mult)
+            assert tablet_count >= 1
+            set_tablet_count_and_wait(rest_api, cql, ks, table_name, cdc_log_table_name, tablet_count)
+
+        # Strip multipliers that don't change the tablet count from the
+        # previous state. The table starts with a multiplier of 1 (i.e.
+        # init_table_count tablets), so a leading 1 is a no-op. Subsequent
+        # duplicates are also no-ops since ALTER TABLE with the current
+        # tablet count doesn't trigger a new CDC stream generation.
+        effective_multipliers = []
+        for m in tablet_multipliers:
+            if effective_multipliers:
+                if m != effective_multipliers[-1]:
+                    effective_multipliers.append(m)
+            elif m != 1:
+                effective_multipliers.append(m)
+
+        # The total number of stream shards across all generations:
+        # 1) The root generation created when the table is first created.
+        total_shard_count = init_table_count
+        # 2) One generation per effective multiplier.
+        total_shard_count += int(sum(effective_multipliers) * init_table_count)
+
+        # Poll DescribeStream until all expected stream shards have appeared,
+        # building a shard_id -> parent_shard_id map (shard_parents_map) and
+        # collecting root shard IDs (shards with no parent).
+        end_ts = time.time() + 30
+        root_shard_ids = []
+        shard_parents_map = {}
+        while time.time() < end_ts:
+            root_shard_ids = []
+            shard_parents_map = {}
+
+            for shard in iterate_over_describe_stream(dynamodbstreams, arn, end_ts):
+                shard_id = shard['ShardId']
+                parent_shard_id = shard.get('ParentShardId', None)
+                assert shard_id not in shard_parents_map
+                if parent_shard_id is None:
+                    root_shard_ids.append(shard_id)
+                shard_parents_map[shard_id] = parent_shard_id
+
+            if len(shard_parents_map) >= total_shard_count:
+                break
+
+            time.sleep(0.1)
+
+        # For each shard, use DescribeStream's CHILD_SHARDS filter to
+        # build a shard_id -> [child_shard_ids] map (shard_children_map).
+
+        shard_children_map = {}
+        for shard_id in shard_parents_map:
+            filter_children = []
+            for child_shard in iterate_over_describe_stream(dynamodbstreams, arn, end_ts, filter_shard_id=shard_id):
+                child_shard_id = child_shard['ShardId']
+                filter_children.append(child_shard_id)
+            shard_children_map[shard_id] = filter_children
+
+        callback(root_shard_ids, shard_parents_map, shard_children_map)
+
+# Run a test where we create two cdc log generations (parents and children),
+# with the children count half the parent count (so parents are merged into children),
+# and validate the merge assumptions:
+# - every two parents have the same child
+# - only half of the parents are pointed to as a parent by any child (a child can declare
+#   only a single parent, and in a merge it has two parents)
+# - various generation sizes
+# NOTE: this test assumes the starting tablet count is bigger than one
+# NOTE: this test fails if `system:initial_tablets` is set to anything but 0 - maybe because 0 means start with some default
+# while a non-zero value means start with exactly this value, but never go any lower?
+def test_parent_children_merge(dynamodb, dynamodbstreams, rest_api, cql):
+    def verify_parent_children_relationship_merge(root_shard_ids, shard_parents_map, shard_children_map):
+        # shard_parents_map will contain both generations, so we strip the original one
+        shard_parents_map = { shard_id: parent_id for shard_id, parent_id in shard_parents_map.items() if parent_id is not None }
+
+        # shard_children_map will contain both generations, but only the original generation has children, so we strip the childless entries
+        shard_children_map = { shard_id: children for shard_id, children in shard_children_map.items() if children }
+
+        # shard_parents_map is a child -> parent map, so its size equals the total number of children, which is half the number of parents
+        assert len(shard_parents_map) * 2 == len(root_shard_ids)
+
+        # shard_children_map is a parent -> list-of-children map, so its size equals the total number of parents,
+        # which must equal the number of root_shard_ids
+        assert len(root_shard_ids) == len(shard_children_map)
+
+        # all parents must be from the first (original) generation (`root_shard_ids`)
+        assert sorted(shard_children_map.keys()) == sorted(root_shard_ids)
+
+        # only half of the parents show up as parents in shard_parents_map, but all of them must be from root_shard_ids
+        assert len(shard_parents_map.values()) * 2 == len(root_shard_ids)
+        assert set(root_shard_ids).issuperset(set(shard_parents_map.values()))
+
+        # every parent has exactly one child - we're merging
+        assert all(len(children) == 1 for children in shard_children_map.values())
+
+        # every child must occur exactly twice in the children lists - we're merging (2 -> 1), so
+        # for every two parents there is one child; thus two parents share the same child
+        existence_count = collections.defaultdict(int)
+        for children in shard_children_map.values():
+            for child in children:
+                existence_count[child] += 1
+        assert all(count == 2 for count in existence_count.values())
+
+    run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, [0.5], verify_parent_children_relationship_merge)
+
+# Run a test where we create two cdc log generations (parents and children),
+# with the children count double the parent count (so parents are split into children),
+# and validate the split assumptions:
+# - every parent has two distinct children
+# - every two children have the same parent
+# - various generation sizes
+def test_parent_children_split(dynamodb, dynamodbstreams, rest_api, cql):
+    def verify_parent_children_relationship_split(root_shard_ids, shard_parents_map, shard_children_map):
+        # shard_parents_map will contain both generations, so we strip the original one
+        shard_parents_map = { shard_id: parent_id for shard_id, parent_id in shard_parents_map.items() if parent_id is not None }
+
+        # shard_children_map will contain both generations, but only the original generation has children, so we strip the childless entries
+        shard_children_map = { shard_id: children for shard_id, children in shard_children_map.items() if children }
+
+        # shard_parents_map is a child -> parent map, so its size equals the total number of children, which is double the number of parents
+        assert len(shard_parents_map) == len(root_shard_ids) * 2
+
+        # shard_children_map is a parent -> list-of-children map, so its size equals the total number of parents,
+        # which must equal the number of root_shard_ids
+        assert len(root_shard_ids) == len(shard_children_map)
+
+        # all parents must be from the first (original) generation (`root_shard_ids`)
+        assert sorted(shard_children_map.keys()) == sorted(root_shard_ids)
+
+        # every parent must show up in shard_parents_map's values twice;
+        # the set of parents must equal `root_shard_ids`
+        assert len(shard_parents_map.values()) == len(root_shard_ids) * 2
+        assert set(root_shard_ids) == set(shard_parents_map.values())
+        existence_count = collections.defaultdict(int)
+        for child in shard_parents_map.values():
+            existence_count[child] += 1
+        assert all(count == 2 for count in existence_count.values())
+
+        # every parent has exactly two children - we're splitting
+        assert all(len(children) == 2 for children in shard_children_map.values())
+
+        # every child must occur exactly once in the children lists - we're splitting (1 -> 2), so
+        # for every parent there are two distinct children
+        all_children = []
+        for children in shard_children_map.values():
+            for child in children:
+                all_children.append(child)
+        assert len(set(all_children)) == len(all_children)
+
+    run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, [2], verify_parent_children_relationship_split)
+
+# the test will:
+# - create a table with streams enabled
+# - get the initial tablet count
+# - for each multiplier in tablet_multipliers:
+#   - modify the tablet count to initial * multiplier - this will trigger
+#     creation of new stream shards in the stream as currently we have a 1 to 1
+#     mapping between tablets and stream shards
+# - after all modifications, use DescribeStream to get all stream shards
+# - build a map of stream shard -> parent stream shard (shard_parents_map) and
+#   a map of stream shard -> child stream shards (shard_children_map)
+# - verify that:
+#   - the number of root stream shards (stream shards without a parent) is correct -
+#     it should be equal to the initial tablet count
+#   - starting from the root stream shards and following children, each path
+#     down the tree has length equal to len(tablet_multipliers) -
+#     this is because every change in tablet count causes a new generation
+#     of stream shards to be created. Each stream shard from the previous generation is linked
+#     to at least one stream shard in the next generation, and never to one from some other generation.
+def test_parent_filtering(dynamodb, dynamodbstreams, rest_api, cql):
+    tablet_multipliers = [1, 2, 4, 8, 16, 8, 4, 2, 1]
+
+    def verify_parent_children_relationship(root_shard_ids, shard_parents_map, shard_children_map):
+        # Verify the split/merge invariant: in a split all children point
+        # back to the same parent; in a merge the child may point to only one
+        # of its two parents, but both parents will have the child in their CHILD_SHARDS result.
+        #
+        # First, build declared_children from the ParentShardId field recorded
+        # in shard_parents_map, then compare against the CHILD_SHARDS filter
+        # results to confirm consistency.
+        declared_children = {}  # parent_shard_id -> [child_shard_ids derived from ParentShardId]
+        for shard_id, parent_shard_id in shard_parents_map.items():
+            if parent_shard_id is not None:
+                declared_children.setdefault(parent_shard_id, []).append(shard_id)
+
+        all_shards = set()
+        in_children = set()
+        for shard_id in shard_parents_map:
+            all_shards.add(shard_id)
+            filter_children = shard_children_map[shard_id]
+            for child_shard_id in filter_children:
+                in_children.add(child_shard_id)
+            # Verify that every child declared via ParentShardId is also returned
+            # by the CHILD_SHARDS filter for this shard.
+            declared = set(declared_children.get(shard_id, []))
+            assert declared.issubset(set(filter_children)), \
+                f"Declared children {declared} not subset of filter children {set(filter_children)} for shard {shard_id}"
+            # Between generations we can (currently) have either splits of one into two
+            # or merges of two into one.
+            # In the case of a split, the children count will be > 1 and all children point
+            # back to the parent (so declared == filter_children).
+            # In the case of a merge, the CHILD_SHARDS filter returns the merged child for both parents,
+            # but the child's ParentShardId can only point to one of them. So the non-designated
+            # parent will have filter_children = [child] but declared_children = [].
+            # This assert checks that either all filter children were declared (split case) or
+            # we have at most one child (merge case where this parent isn't the designated one).
+            assert set(filter_children) == declared or len(filter_children) <= 1, \
+                f"Unexpected children mismatch for shard {shard_id}: filter={filter_children}, declared={list(declared)}"
+
+        # Verify split/merge invariants across generation boundaries.
+        # Group shards by generation (extracted from the hex timestamp in the shard ID).
+        def get_gen(t):
+            return t[2:13]
+        gen_shards = {}
+        for shard_id in shard_parents_map:
+            gen = get_gen(shard_id)
+            gen_shards.setdefault(gen, []).append(shard_id)
+        # Sort generations chronologically and verify parent/child ratios.
+        sorted_gens = sorted(gen_shards.keys())
+        for i in range(1, len(sorted_gens)):
+            prev_gen = sorted_gens[i - 1]
+            curr_gen = sorted_gens[i]
+            parent_count = len(gen_shards[prev_gen])
+            child_count = len(gen_shards[curr_gen])
+            if child_count > parent_count:
+                # Split: 2x children, every parent has 2 children pointing to it
+                assert child_count == 2 * parent_count, \
+                    f"Split ratio wrong: {parent_count} parents -> {child_count} children"
+                for p in gen_shards[prev_gen]:
+                    kids = declared_children.get(p, [])
+                    assert len(kids) == 2, f"Split parent {p} has {len(kids)} declared children, expected 2"
+            elif child_count < parent_count:
+                # Merge: 2x parents, every child has 2 parents (but only 1 declared via ParentShardId)
+                assert parent_count == 2 * child_count, \
+                    f"Merge ratio wrong: {parent_count} parents -> {child_count} children"
+                # Each child in the current generation should appear as a filter_child
+                # of exactly 2 parents from the previous generation.
+                for c in gen_shards[curr_gen]:
+                    parents_of_c = [p for p in gen_shards[prev_gen] if c in shard_children_map.get(p, [])]
+                    assert len(parents_of_c) == 2, \
+                        f"Merge child {c} has {len(parents_of_c)} parents, expected 2"
+
+        # Verify that the set of shards with no parent exactly matches
+        # root_shard_ids collected earlier (i.e. no orphaned shards).
+        shards_without_parents = all_shards - in_children
+        assert shards_without_parents == set(root_shard_ids)
+
+        # Walk the shard tree recursively from each root and verify
+        # that every root-to-leaf path has length exactly len(tablet_multipliers),
+        # confirming one new generation per tablet count change.
+        def run_and_verify(shard_id, depth):
+            children = shard_children_map.get(shard_id, None)
+            if not children:
+                assert depth == len(tablet_multipliers)
+            else:
+                for ch in children:
+                    run_and_verify(ch, depth + 1)
+
+        for r in root_shard_ids:
+            run_and_verify(r, 1)
+
+    run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, tablet_multipliers, verify_parent_children_relationship)
+
+# this test will:
+# - create a table with streams enabled
+# - get the initial tablet count
+# - for each multiplier in tablet_multipliers:
+#   - modify the tablet count to initial * multiplier - this will trigger
+#     creation of new stream shards in the stream as currently we have a 1 to 1
+#     mapping between tablets and stream shards
+#   - perform writes_per_tablet_multiplier writes to the table
+# - after all modifications, use DescribeStream to build the shard_parents_map (child -> parent) stream shard map
+# - use GetRecords to read all stream records
+# - verify that:
+#   - all written items are present in the stream (check the count and then the sorted content)
+#   - within each partition key, the records are in write order (check that `e` is monotonically increasing for each record)
+#   - the stream shard parent-child relationships are correct - stream shards form "generations" (all stream shards are split or merged at the same moment,
+#     when the tablet count is changed). Every stream shard except the first ones has a parent, but not every stream shard is pointed to as a parent - when stream shards merge
+#     (let's say A & B merge into C), the next-generation stream shard (C) has two parents (A & B), but the DynamoDB API allows declaring only one - let's say A
+#     (the other one - B - will never be pointed to as a parent by anything else).
+#     NOTE: this is not the same as having no children - the stream shard (B) still has a child (C), but that child (C) declares B's sibling (A) as its parent.
+#     Then we try to walk the tree starting from every stream shard that is not pointed to as a parent. Depending on which generation that stream shard is in,
+#     the path will have a different length.
+def test_get_records_with_alternating_tablets_count(dynamodb, dynamodbstreams, rest_api, cql):
+    tablet_multipliers = [1, 2, 4, 8, 16, 8, 4, 2, 1]
+    writes_per_tablet_multiplier = 100
+    partition_count = 32
+
+    with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES', Tags=TABLET_TAGS) as table:
+        (arn, label) = wait_for_active_stream(dynamodbstreams, table)
+
+        ks = f'alternator_{table.name}'
+        table_name = table.name
+        cdc_log_table_name = f'{table_name}_scylla_cdc_log'
+        init_table_count = get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name)
+
+        # --- Phase 1: Drive tablet count changes and write data ---
+        # For each multiplier, alter the tablet count and write writes_per_tablet_multiplier
+        # items to the table. Each write uses a small, bounded set of partition keys
+        # (0..partition_count-1) to force per-partition ordering collisions, which
+        # lets us later verify that events within a partition appear in write order.
+        # `e` is a monotonically increasing counter that encodes the write order.
+        index = 0
+        expected_items = []
+        retrieved_items = []
+        for tablet_mult in tablet_multipliers:
+            set_tablet_count_and_wait(rest_api, cql, ks, table_name, cdc_log_table_name, init_table_count * tablet_mult)
+            for _ in range(0, writes_per_tablet_multiplier):
+                p = str(random.randint(0, partition_count - 1))
+                index += 1
+                # we deliberately spread keys over a small set of partitions to force key collisions,
+                # so that any ordering issue within a partition would be detected
+                c = '1'
+                e = str(index)
+                table.put_item(Item={'p': p, 'c': c, 'e': e})
+                expected_items.append((p, c, e))
+
+        # --- Phase 2: Read all stream records via GetRecords, building shard topology ---
+        # Iterate DescribeStream in a retry loop until all expected items have been
+        # retrieved from the stream. For each shard discovered:
+        #   - Record its parent in shard_parents_map (child -> parent).
+        #   - Track root shards (those without a parent).
+        #   - Create a GetShardIterator starting at the shard's StartingSequenceNumber
+        #     and later call GetRecords to drain it, collecting all the previously written (p, c, e) triples.
+        iterators = {}
+        shard_parents_map = {}
+        root_shard_ids = []
+        end_ts = time.time() + 30
+        while len(retrieved_items) < len(expected_items):
+            for shard in iterate_over_describe_stream(dynamodbstreams, arn, end_ts):
+                shard_id = shard['ShardId']
+                parent_shard_id = shard.get('ParentShardId', None)
+                if parent_shard_id is None:
+                    if shard_id not in root_shard_ids:
+                        root_shard_ids.append(shard_id)
+                elif shard_id in shard_parents_map:
+                    assert shard_parents_map[shard_id] == parent_shard_id
+                else:
+                    shard_parents_map[shard_id] = parent_shard_id
+                if shard_id in iterators:
+                    continue
+                start = shard['SequenceNumberRange']['StartingSequenceNumber']
+                iter = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER', SequenceNumber=start)['ShardIterator']
+                assert iter is not None
+                iterators[shard_id] = iter
+
+            for shard_id, iter in iterators.items():
+                response = dynamodbstreams.get_records(ShardIterator=iter, Limit=1000)
+                if 'NextShardIterator' in response:
+                    iterators[shard_id] = response['NextShardIterator']
+
+                records = response.get('Records', [])
+                for record in records:
+                    dynamodb = record['dynamodb']
+                    keys = dynamodb['NewImage']
+
+                    assert set(keys) == set(['p', 'c', 'e'])
+                    p = keys['p'].get('S')
+                    c = keys['c'].get('S')
+                    e = keys['e'].get('S')
+                    retrieved_items.append((p, c, e))
+
+        # --- Phase 3: Verify completeness and per-partition ordering of stream records ---
+        # Check that the set of records retrieved from the stream exactly matches the
+        # set of items written (same count, same content when sorted).
+        # Then verify that for every partition key `p`, the events appear in the same
+        # order they were written: the `e` value (write index) must be strictly
+        # increasing across successive reads for the same `p`.
+        assert len(retrieved_items) == len(expected_items)
+        assert sorted(retrieved_items) == sorted(expected_items)
+        previous_values = {}
+        # We iterate over retrieved items in the order they were read from the stream.
+        # The implementation may (and will) reorder items with different partition keys as it pleases,
+        # but for the same partition key the order of events must be preserved.
+        # `previous_values` keeps track of the last `e` value seen for each partition key `p`;
+        # for the same `p`, each successive write must have a greater `e` than the previous one.
+        for p, c, e in retrieved_items:
+            e = int(e)
+            pv = previous_values.get(p, -1)
+            assert pv < e
+            previous_values[p] = e
+
+        # --- Phase 4: Verify shard topology - root shard count ---
+        # The number of root shards (those without a parent) must equal the initial
+        # tablet count.
+        assert len(root_shard_ids) == init_table_count
+
+        # --- Phase 5: Verify shard topology - generational structure and path lengths ---
+        # Build `streams_superseded_by_next_gen`: the set of shard IDs that appear as a parent of some
+        # other shard. Shards NOT in this set are never pointed to as a parent: the most
+        # recent generation, plus the non-designated parents of merges.
+        #
+        # Group all shards (except root ones) by their generation, inferred from the
+        # timestamp embedded in the shard ID (characters [2:13]).
+        #
+        # For each shard that nothing points to as a parent, walk the parent chain up to
+        # the root (get_path), then assert that the number of siblings in that shard's
+        # generation equals `init_table_count * tablet_multipliers[len(path) - 1]`.
+        # This verifies the expected doubling/halving pattern produced by the
+        # alternating tablet_multipliers sequence.
+        streams_superseded_by_next_gen = set()
+        for (shard_id, parent_shard_id) in shard_parents_map.items():
+            streams_superseded_by_next_gen.add(parent_shard_id)
+
+        def get_generation_from_shard(t):
+            # Shard IDs have the format "H <hex timestamp>:<token>", e.g.
+            # "H 19b22b8563a:7fffffffffffffffe8547ce46400000"
+            # Characters [2:13] extract the hex generation timestamp, which groups
+            # shards that were created together during the same tablet count change.
+            return t[2:13]
+        count_map = {}
+        for r in shard_parents_map:
+            gen = get_generation_from_shard(r)
+            count = count_map.get(gen, 0)
+            count_map[gen] = count + 1
+        # Returns the ancestor chain from shard_id up to the root (a shard with no parent).
+        # The returned list is ordered [shard_id, parent, grandparent, ..., root].
+        def get_path(shard_id):
+            path = [ shard_id ]
+            current_shard_id = shard_id
+            while True:
+                parent_shard_id = shard_parents_map.get(current_shard_id, None)
+                if parent_shard_id is None:
+                    return path
+                path.append(parent_shard_id)
+                current_shard_id = parent_shard_id
+        for r in shard_parents_map:
+            if r not in streams_superseded_by_next_gen:
+                path = get_path(r)
+                siblings_count = count_map[get_generation_from_shard(r)]
+                assert siblings_count == init_table_count * tablet_multipliers[len(path) - 1]
diff --git a/test/alternator/test_tablets.py b/test/alternator/test_tablets.py
index 8e0b796277..4072474baa 100644
--- a/test/alternator/test_tablets.py
+++ b/test/alternator/test_tablets.py
@@ -131,26 +131,6 @@ def test_tablets_tag_vs_config(dynamodb):
     with new_test_table(dynamodb, **schema_vnodes) as table:
         pass
 
-# Before Alternator Streams is supported with tablets (#23838), let's verify
-# that enabling Streams results in an orderly error. This test should be
-# deleted when #23838 is fixed.
-def test_streams_enable_error_with_tablets(dynamodb): - # Test attempting to create a table already with streams - with pytest.raises(ClientError, match='ValidationException.*tablets'): - with new_test_table(dynamodb, - Tags=[{'Key': initial_tablets_tag, 'Value': '4'}], - StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'}, - KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ], - AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ]) as table: - pass - # Test attempting to add a stream to an existing table - with new_test_table(dynamodb, - Tags=[{'Key': initial_tablets_tag, 'Value': '4'}], - KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ], - AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ]) as table: - with pytest.raises(ClientError, match='ValidationException.*tablets'): - table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'}); - # For a while (see #18068) it was possible to create an Alternator table with # tablets enabled and choose LWT for write isolation (always_use_lwt) # but the writes themselves failed. This test verifies that this is no longer diff --git a/test/alternator/test_ttl.py b/test/alternator/test_ttl.py index 2e4eb20699..8ac9a8517c 100644 --- a/test/alternator/test_ttl.py +++ b/test/alternator/test_ttl.py @@ -655,12 +655,6 @@ def test_ttl_expiration_lsi_key(dynamodb, waits_for_expiration): # content), and a special userIdentity flag saying that this is not a regular # REMOVE but an expiration. Reproduces issue #11523. def test_ttl_expiration_streams(dynamodb, dynamodbstreams, waits_for_expiration): - # Alternator Streams currently doesn't work with tablets, so until - # #23838 is solved, skip this test on tablets. - for tag in TAGS: - if tag['Key'] == 'system:initial_tablets' and tag['Value'].isdigit(): - pytest.skip("Streams test skipped on tablets due to #23838") - # In my experiments, a 30-minute (1800 seconds) is the typical # expiration delay in this test. If the test doesn't finish within # max_duration, we report a failure.
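
A few illustrative sketches of the logic this patch relies on, in Python. None of them are Scylla or test code - the names and data shapes are invented for illustration.

First, the generation-window rule shared by `cdc_get_versioned_streams` and `read_cdc_for_tablets_versioned_streams`: return the generations with timestamp >= `not_older_than`, plus the one generation just before that boundary, because a generation that started earlier may still own events written after it. A minimal sketch of the lower_bound/step-back adjustment, assuming generations are kept as a sorted list of (timestamp, stream_ids) pairs:

    import bisect

    def select_generations(generations, not_older_than):
        # generations: list of (timestamp, stream_ids) pairs sorted by timestamp
        timestamps = [ts for ts, _ in generations]
        i = bisect.bisect_left(timestamps, not_older_than)  # ~ std::map::lower_bound
        if i > 0:
            i -= 1  # keep the straddling generation: it started before the
                    # boundary but may still cover events written after it
        return generations[i:]

    gens = [(10, ['s1']), (20, ['s2']), (30, ['s3'])]
    assert select_generations(gens, 25) == [(20, ['s2']), (30, ['s3'])]  # 20 straddles the boundary
    assert select_generations(gens, 5) == gens  # nothing older than the boundary to trim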
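
The new tests repeatedly turn DescribeStream output into two lookup structures: a child -> parent map built from each shard's ParentShardId, and the list of root shards. A self-contained sketch of that bookkeeping, fed with hypothetical DescribeStream-shaped dicts rather than a live boto3 call:

    def build_shard_maps(shards):
        # shards: iterable of dicts with 'ShardId' and an optional 'ParentShardId'
        roots, parents, children = [], {}, {}
        for shard in shards:
            parent = shard.get('ParentShardId')
            if parent is None:
                roots.append(shard['ShardId'])
            else:
                parents[shard['ShardId']] = parent
                children.setdefault(parent, []).append(shard['ShardId'])
        return roots, parents, children

    # a root shard A that split into B and C:
    roots, parents, children = build_shard_maps([
        {'ShardId': 'A'},
        {'ShardId': 'B', 'ParentShardId': 'A'},
        {'ShardId': 'C', 'ParentShardId': 'A'},
    ])
    assert roots == ['A'] and parents == {'B': 'A', 'C': 'A'} and children == {'A': ['B', 'C']}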
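
test_parent_children_split and test_parent_children_merge assert complementary invariants about consecutive generations. The checker below condenses both; it assumes, like the tests do, that consecutive generations differ by exactly a split (1 -> 2) or a merge (2 -> 1):

    def check_transition(parent_gen, child_gen, declared_children):
        # parent_gen, child_gen: shard ids of two consecutive generations;
        # declared_children: parent id -> [child ids], built from ParentShardId
        p, c = len(parent_gen), len(child_gen)
        if c == 2 * p:
            # split (1 -> 2): every parent declares exactly two children
            assert all(len(declared_children.get(par, [])) == 2 for par in parent_gen)
            return 'split'
        if p == 2 * c:
            # merge (2 -> 1): each child declares exactly one of its two parents,
            # so only half of the parents appear in the declared map at all
            designated = [par for par in parent_gen if declared_children.get(par)]
            assert len(designated) * 2 == p
            return 'merge'
        raise AssertionError('expected an exact split (1 -> 2) or merge (2 -> 1)')

    assert check_transition(['A'], ['A1', 'A2'], {'A': ['A1', 'A2']}) == 'split'
    assert check_transition(['B1', 'B2'], ['B'], {'B1': ['B']}) == 'merge'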
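
Phase 3 of test_get_records_with_alternating_tablets_count reduces to one ordering rule: the stream may interleave partitions however it likes, but within one partition key the global write index `e` must be strictly increasing. Stated standalone:

    def check_per_partition_order(events):
        # events: (partition_key, write_index) pairs in stream-read order
        last = {}
        for p, e in events:
            assert last.get(p, -1) < e, f'out-of-order event for partition {p}'
            last[p] = e

    # 'a' and 'b' interleave freely, but each key's own indexes keep increasing:
    check_per_partition_order([('a', 1), ('b', 3), ('a', 2), ('b', 4)])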
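
Phase 5 combines two small ideas: grouping shards by the generation timestamp embedded in the shard ID, and measuring the parent-chain length from every shard that nothing points to as a parent (the newest generation plus the non-designated halves of merges). A sketch under the same shard-id layout assumption the test makes ("H <11 hex digits>:<token>"):

    def generation_of(shard_id):
        return shard_id[2:13]  # the hex generation timestamp

    def chain_depths(parents):
        # parents: child shard id -> parent shard id (roots are absent as keys)
        referenced = set(parents.values())
        depths = {}
        for shard in parents:
            if shard in referenced:
                continue  # some later shard already declared this one as a parent
            depth, cur = 1, shard
            while cur in parents:
                cur = parents[cur]
                depth += 1
            depths[shard] = depth
        return depths

    # one root R (generation ...001) split into two children (generation ...002):
    parents = {'H 00000000002:a': 'H 00000000001:r', 'H 00000000002:b': 'H 00000000001:r'}
    assert generation_of('H 00000000001:r') == '00000000001'
    assert chain_depths(parents) == {'H 00000000002:a': 2, 'H 00000000002:b': 2}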
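
Finally, the multiplier-stripping step in run_parent_children_relationship_test can be stated compactly: a multiplier only yields a new CDC generation when it changes the tablet count relative to the previous state, which starts at an implicit 1. An equivalent standalone version of that loop:

    def effective_multipliers(multipliers):
        # an ALTER TABLE that keeps the tablet count unchanged creates no new generation
        out = []
        for m in multipliers:
            prev = out[-1] if out else 1
            if m != prev:
                out.append(m)
        return out

    assert effective_multipliers([1, 2, 2, 4, 1]) == [2, 4, 1]
    assert effective_multipliers([1, 2, 4, 8, 16, 8, 4, 2, 1]) == [2, 4, 8, 16, 8, 4, 2, 1]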