Merge 'alternator: Add stream support for tablets' from Radosław Cybulski

Implements the changes necessary for Streams to work with tablet-based tables:

- add utility functions to `system_keyspace` that help read cdc content from cdc log tables for tablet-based base tables (similar API to the ones for vnodes)
- remove anti-tablet `if` checks, and update tests that fail / skip if tablets are selected
- add two tests to extensively exercise the tablet-based version, especially while manipulating the stream count

Fixes #23838
Fixes SCYLLADB-463

Closes scylladb/scylladb#28500

* github.com:scylladb/scylladb:
  alternator: add streams with tablets tests
  alternator: remove antitablet guards when using Streams
  alternator: implement streams for tablets
  treewide: add cdc helper functions to system_keyspace
  alternator: add system_keyspace reference
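To see the user-visible effect of this merge, consider a minimal boto3 sketch (the endpoint URL, credentials, and table schema are illustrative assumptions, not part of the change): CreateTable can now enable a stream on a default, tablets-backed table, where it previously returned a ValidationException pointing at #23838.

import boto3

# Hypothetical local Alternator endpoint; adjust for your deployment.
dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000',
                          region_name='us-east-1', aws_access_key_id='alternator',
                          aws_secret_access_key='secret')

table = dynamodb.create_table(
    TableName='streamed_table',
    KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
    AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}],
    BillingMode='PAY_PER_REQUEST',
    StreamSpecification={'StreamEnabled': True,
                         'StreamViewType': 'NEW_AND_OLD_IMAGES'})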
@@ -32,6 +32,7 @@ controller::controller(
     sharded<service::storage_service>& ss,
     sharded<service::migration_manager>& mm,
     sharded<db::system_distributed_keyspace>& sys_dist_ks,
+    sharded<db::system_keyspace>& sys_ks,
     sharded<cdc::generation_service>& cdc_gen_svc,
     sharded<service::memory_limiter>& memory_limiter,
     sharded<auth::service>& auth_service,
@@ -45,6 +46,7 @@ controller::controller(
     , _ss(ss)
     , _mm(mm)
     , _sys_dist_ks(sys_dist_ks)
+    , _sys_ks(sys_ks)
     , _cdc_gen_svc(cdc_gen_svc)
     , _memory_limiter(memory_limiter)
     , _auth_service(auth_service)
@@ -94,7 +96,7 @@ future<> controller::start_server() {
     auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value<uint32_t> {
         return cfg.alternator_timeout_in_ms;
     };
-    _executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks),
+    _executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks), std::ref(_sys_ks),
         sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), std::ref(_vsc), _ssg.value(),
         sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
     _server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
@@ -22,6 +22,7 @@ class memory_limiter;

namespace db {
class system_distributed_keyspace;
+class system_keyspace;
class config;
}

@@ -65,6 +66,7 @@ class controller : public protocol_server {
     sharded<service::storage_service>& _ss;
     sharded<service::migration_manager>& _mm;
     sharded<db::system_distributed_keyspace>& _sys_dist_ks;
+    sharded<db::system_keyspace>& _sys_ks;
     sharded<cdc::generation_service>& _cdc_gen_svc;
     sharded<service::memory_limiter>& _memory_limiter;
     sharded<auth::service>& _auth_service;
@@ -84,6 +86,7 @@ public:
     sharded<service::storage_service>& ss,
     sharded<service::migration_manager>& mm,
     sharded<db::system_distributed_keyspace>& sys_dist_ks,
+    sharded<db::system_keyspace>& sys_ks,
     sharded<cdc::generation_service>& cdc_gen_svc,
     sharded<service::memory_limiter>& memory_limiter,
     sharded<auth::service>& auth_service,
@@ -271,6 +271,7 @@ executor::executor(gms::gossiper& gossiper,
     service::storage_service& ss,
     service::migration_manager& mm,
     db::system_distributed_keyspace& sdks,
+    db::system_keyspace& system_keyspace,
     cdc::metadata& cdc_metadata,
     vector_search::vector_store_client& vsc,
     smp_service_group ssg,
@@ -280,6 +281,7 @@ executor::executor(gms::gossiper& gossiper,
     _proxy(proxy),
     _mm(mm),
     _sdks(sdks),
+    _system_keyspace(system_keyspace),
     _cdc_metadata(cdc_metadata),
     _vsc(vsc),
     _enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization),
@@ -1645,16 +1647,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
     locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
     const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
     auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
-    // Alternator Streams doesn't yet work when the table uses tablets (#23838)
-    if (stream_specification && stream_specification->IsObject()) {
-        auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
-        if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) {
-            if (rs->uses_tablets()) {
-                co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
-                    "If you want to use streams, create a table with vnodes by setting the tag 'system:initial_tablets' set to 'none'.");
-            }
-        }
-    }

     // Vector indexes is a new feature that we decided to only support
     // on tablets.
     if (vector_indexes && vector_indexes->Size() > 0) {
@@ -1846,14 +1839,9 @@ future<executor::request_return_type> executor::update_table(client_state& clien
     if (add_stream_options(*stream_specification, builder, p.local())) {
         validate_cdc_log_name_length(builder.cf_name());
     }
-    // Alternator Streams doesn't yet work when the table uses tablets (#23838)
     auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
     if (stream_enabled && stream_enabled->IsBool()) {
         if (stream_enabled->GetBool()) {
-            if (p.local().local_db().find_keyspace(tab->ks_name()).get_replication_strategy().uses_tablets()) {
-                co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
-                    "If you want to enable streams, re-create this table with vnodes (with the tag 'system:initial_tablets' set to 'none').");
-            }
             if (tab->cdc_options().enabled()) {
                 co_return api_error::validation("Table already has an enabled stream: TableName: " + tab->cf_name());
             }
@@ -34,6 +34,7 @@

namespace db {
class system_distributed_keyspace;
+class system_keyspace;
}

namespace audit {
@@ -88,6 +89,7 @@ class executor : public peering_sharded_service<executor> {
     service::storage_proxy& _proxy;
     service::migration_manager& _mm;
     db::system_distributed_keyspace& _sdks;
+    db::system_keyspace& _system_keyspace;
     cdc::metadata& _cdc_metadata;
     vector_search::vector_store_client& _vsc;
     utils::updateable_value<bool> _enforce_authorization;
@@ -131,6 +133,7 @@ public:
     service::storage_service& ss,
     service::migration_manager& mm,
     db::system_distributed_keyspace& sdks,
+    db::system_keyspace& system_keyspace,
     cdc::metadata& cdc_metadata,
     vector_search::vector_store_client& vsc,
     smp_service_group ssg,
@@ -899,10 +899,22 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
     // TODO: label
     // TODO: creation time

-    // filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
-    auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
-    auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
-    std::map<db_clock::time_point, cdc::streams_version> topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
+    std::map<db_clock::time_point, cdc::streams_version> topologies;
+
+    // filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
+    if (schema->table().uses_tablets()) {
+        // We can't use table creation time here, as tablets might report a
+        // generation timestamp just before table creation. This is safe
+        // because CDC generations are per-table and cannot pre-date the
+        // table, so expanding the window won't pull in unrelated data.
+        auto low_ts = db_clock::now() - ttl;
+        topologies = co_await _system_keyspace.read_cdc_for_tablets_versioned_streams(bs->ks_name(), bs->cf_name(), low_ts);
+    } else {
+        auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
+        auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
+        topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
+    }
     const auto e = topologies.end();
     std::optional<shard_id> shard_filter;
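From the client side, the branch added above is invisible: DescribeStream pages through shards the same way for vnode and tablet tables. A hedged boto3 sketch of that pagination (the endpoint, credentials, and `arn` value are assumptions; on a tablets-backed table the shards now derive from the per-table CDC generations read by read_cdc_for_tablets_versioned_streams):

import boto3

streams = boto3.client('dynamodbstreams', endpoint_url='http://localhost:8000',
                       region_name='us-east-1', aws_access_key_id='alternator',
                       aws_secret_access_key='secret')

def all_shards(arn):
    # Page through DescribeStream using LastEvaluatedShardId, as the
    # tests later in this merge do.
    kwargs = {'StreamArn': arn}
    while True:
        desc = streams.describe_stream(**kwargs)['StreamDescription']
        yield from desc['Shards']
        last = desc.get('LastEvaluatedShardId')
        if not last:
            return
        kwargs['ExclusiveStartShardId'] = last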
@@ -1488,9 +1500,15 @@ future<executor::request_return_type> executor::get_records(client_state& client
     }

     // ugh. figure out if we are and end-of-shard
-    auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
-
-    db_clock::time_point ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
+    db_clock::time_point ts;
+    if (schema->table().uses_tablets()) {
+        ts = co_await _system_keyspace.read_cdc_for_tablets_current_generation_timestamp(base->ks_name(), base->cf_name());
+    } else {
+        auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
+        ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
+    }

     auto& shard = iter.shard;

     if (shard.time < ts && ts < high_ts) {
@@ -93,8 +93,16 @@ public:
     future<> create_cdc_desc(db_clock::time_point, const cdc::topology_description&, context);
     future<bool> cdc_desc_exists(db_clock::time_point, context);

+    // Reads and builds the generation map - a map from generation timestamps to a vector of all stream ids for that generation.
+    // Generations with timestamp >= `not_older_than` are returned, plus the one just before it (the straddling generation).
+    // Returns an empty map if there are no generations with timestamp >= `not_older_than`.
+    // NOTE: there's a sibling `read_cdc_for_tablets_versioned_streams` that reads the same data for tables backed by tablets. The data returned is the same.
+    // NOTE: currently used only by alternator
     future<std::map<db_clock::time_point, cdc::streams_version>> cdc_get_versioned_streams(db_clock::time_point not_older_than, context);

+    // Reads the current generation timestamp for the given table. Throws runtime_error (see `cql3::untyped_result_set::one()`) if the table is not found.
+    // NOTE: there's a sibling `read_cdc_for_tablets_current_generation_timestamp` in `system_keyspace` that does the same for tables backed by tablets.
+    // NOTE: currently used only by alternator
     future<db_clock::time_point> cdc_current_generation_timestamp(context);

     future<qos::service_levels_info> get_service_levels(qos::query_context ctx) const;
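The "straddling generation" rule above is easiest to see with a worked example. A minimal Python model of the selection (illustrative only, not ScyllaDB code): with generations at timestamps 10, 20 and 30 and not_older_than = 25, the result keeps 20 and 30, because generation 20 was still the active one at time 25.

import bisect

def select_generations(generation_timestamps, not_older_than):
    # generation_timestamps: sorted list of generation timestamps.
    # Keep every generation >= not_older_than, plus the one just before it
    # (the straddling generation), mirroring the lower_bound-and-step-back
    # logic in the tablet-based C++ implementation below.
    i = bisect.bisect_left(generation_timestamps, not_older_than)
    if i > 0:
        i -= 1
    return generation_timestamps[i:]

assert select_generations([10, 20, 30], 25) == [20, 30]
assert select_generations([10, 20, 30], 20) == [10, 20, 30]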
@@ -14,6 +14,7 @@
#include <ranges>

#include <seastar/core/coroutine.hh>
+#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/on_internal_error.hh>
@@ -2057,6 +2058,67 @@ future<std::unordered_set<dht::token>> system_keyspace::get_local_tokens() {
     });
}

+// Tablet-based counterpart of system_distributed_keyspace::cdc_current_generation_timestamp.
+// Unlike the vnode version, which reads from a replicated system_distributed table (and thus
+// requires QUORUM), this reads from a virtual table backed by local Raft-managed tablet
+// metadata, so consistency_level::ONE is the only meaningful level.
+future<db_clock::time_point> system_keyspace::read_cdc_for_tablets_current_generation_timestamp(std::string_view ks_name, std::string_view table_name) {
+    // note: we only care about the newest ("current") timestamp, hence limit 1.
+    // this depends on the table's native clustering order being DESC on timestamp, which is the case for CDC_TIMESTAMPS.
+    static const sstring query = format("SELECT timestamp FROM {}.{} WHERE keyspace_name = ? and table_name = ? limit 1", NAME, CDC_TIMESTAMPS);
+    auto timestamp_cql = co_await _qp.execute_internal(
+            query,
+            db::consistency_level::ONE,
+            { ks_name, table_name },
+            cql3::query_processor::cache_internal::no);
+
+    co_return timestamp_cql->one().get_as<db_clock::time_point>("timestamp");
+}
+
+// Tablet-based counterpart of system_distributed_keyspace::cdc_get_versioned_streams.
+// Unlike the vnode version, which reads from a replicated system_distributed table (and thus
+// requires QUORUM), this reads from a virtual table backed by local Raft-managed tablet
+// metadata, so consistency_level::ONE is the only meaningful level.
+future<std::map<db_clock::time_point, cdc::streams_version>> system_keyspace::read_cdc_for_tablets_versioned_streams(std::string_view ks_name, std::string_view table_name, db_clock::time_point not_older_than) {
+    // We read all streams and filter in memory, because we need to include
+    // the generation that straddles the `not_older_than` boundary (i.e. the
+    // one just before it), and the virtual table does not support the
+    // required range query directly.
+    static const sstring stream_id_query = format("SELECT stream_id, stream_state, timestamp FROM {}.{} WHERE keyspace_name = ? and table_name = ?", NAME, CDC_STREAMS);
+
+    std::map<db_clock::time_point, utils::chunked_vector<cdc::stream_id>> temp_result;
+
+    co_await _qp.query_internal(stream_id_query,
+            db::consistency_level::ONE,
+            data_value_list{ ks_name, table_name },
+            1000, // page size
+            [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
+        auto stream_state = cdc::read_stream_state(row.get_as<int8_t>("stream_state"));
+        if (stream_state != cdc::stream_state::current) {
+            co_return stop_iteration::no;
+        }
+        auto ts = row.get_as<db_clock::time_point>("timestamp");
+
+        temp_result[ts].push_back(cdc::stream_id{ row.get_as<bytes>("stream_id") });
+        co_await coroutine::maybe_yield();
+        co_return stop_iteration::no;
+    });
+
+    std::map<db_clock::time_point, cdc::streams_version> result;
+    // Include the generation that straddles the boundary, matching the vnode
+    // counterpart (cdc_get_versioned_streams) which does the same adjustment.
+    auto it = temp_result.lower_bound(not_older_than);
+    if (it != temp_result.begin()) {
+        --it;
+    }
+    for (; it != temp_result.end(); ++it) {
+        auto& ts = it->first;
+        auto& streams = it->second;
+        result.insert_or_assign(ts, cdc::streams_version{ std::move(streams), ts });
+    }
+    co_return std::move(result);
+}
+
future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
        noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) {
    static const sstring all_tables_query = format("SELECT table_id, timestamp, stream_id FROM {}.{}", NAME, CDC_STREAMS_STATE);
@@ -456,7 +456,7 @@ public:
     // For this purpose, records of cleanup operations (the affected token ranges
     // and commitlog ranges) are kept in a system table.
     //
     // The below functions manipulate these records.

     // Saves a record of a token range affected by cleanup.
     // After reboot, tokens from this range will be replayed only if they are on replay positions
@@ -587,6 +587,19 @@ public:
     future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f);
     future<> read_cdc_streams_history(table_id table, std::optional<db_clock::time_point> from, noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f);

+    // Reads the current generation timestamp for the given table. Throws runtime_error (see `cql3::untyped_result_set::one()`) if the table is not found.
+    // NOTE: there's a sibling `cdc_current_generation_timestamp` in `system_distributed_keyspace` that does the same for tables backed by vnodes.
+    // NOTE: currently used only by alternator
+    future<db_clock::time_point> read_cdc_for_tablets_current_generation_timestamp(std::string_view ks_name, std::string_view table_name);
+
+    // Reads and builds the generation map for a given table - a map from generation timestamps to a vector of all stream ids for that generation.
+    // Generations with timestamp >= `not_older_than` are returned, plus the one just before it (the straddling generation).
+    // If `not_older_than` is not provided (defaults to min), all generations will be returned.
+    // Returns an empty map if the table is not found or if there are no generations.
+    // NOTE: there's a sibling `cdc_get_versioned_streams` that reads the same data for tables backed by legacy vnodes. The data returned is the same.
+    // NOTE: currently used only by alternator
+    future<std::map<db_clock::time_point, cdc::streams_version>> read_cdc_for_tablets_versioned_streams(std::string_view ks_name, std::string_view table_name, db_clock::time_point not_older_than = db_clock::time_point::min());
+
     // Load Raft Group 0 id from scylla.local
     future<utils::UUID> get_raft_group0_id();
@@ -296,14 +296,6 @@ experimental:
considered experimental so needs to be enabled explicitly with the
`--experimental-features=alternator-streams` configuration option.

-In this version, Alternator Streams is only supported if the base table
-uses vnodes instead of tablets. However, by default new tables use tablets
-so to create a table that can be used with Streams, you must set the tag
-`system:initial_tablets` set to `none` during CreateTable - so that the
-new table will use vnodes. Streams cannot be enabled on an already-existing
-table that uses tablets.
-See <https://github.com/scylladb/scylla/issues/23838>.
-
Alternator streams also differ in some respects from DynamoDB Streams:
* The number of separate "shards" in Alternator's streams is significantly
  larger than is typical on DynamoDB.
@@ -184,7 +184,7 @@ entire data center, or other data centers, in that case.
It replaces the older approach which was named "vnodes". See
[Data Distribution with Tablets](../architecture/tablets.rst) for details.

-In this version, tablet support is almost complete, so new
+In this version, tablet support is complete, so new
Alternator tables default to following what the global configuration flag
[tablets_mode_for_new_keyspaces](../reference/configuration-parameters.rst#confval-tablets_mode_for_new_keyspaces)
tells them to.
@@ -207,9 +207,3 @@ in the CreateTable operation. The value of this tag can be:
The `system:initial_tablets` tag only has any effect while creating
a new table with CreateTable - changing it later has no effect.

-Because the tablets support is incomplete, when tablets are enabled for an
-Alternator table, the following features will not work for this table:
-
-* Enabling Streams with CreateTable or UpdateTable doesn't work
-  (results in an error).
-  See <https://github.com/scylladb/scylla/issues/23838>.
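To make the tag mechanics above concrete, here is a hedged boto3 sketch (endpoint, credentials and schema are illustrative): the tag is passed in Tags at CreateTable time and, per the paragraph above, changing it later has no effect.

import boto3

dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000',
                          region_name='us-east-1', aws_access_key_id='alternator',
                          aws_secret_access_key='secret')

# 'none' forces vnodes; per the tests in this merge, '0' selects tablets
# with a default initial count.
table = dynamodb.create_table(
    TableName='vnodes_table',
    Tags=[{'Key': 'system:initial_tablets', 'Value': 'none'}],
    KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
    AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}],
    BillingMode='PAY_PER_REQUEST')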
2
main.cc
@@ -2609,7 +2609,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

     api::set_server_service_levels(ctx, cql_server_ctl, qp).get();

-    alternator::controller alternator_ctl(gossiper, proxy, ss, mm, sys_dist_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, vector_store_client, *cfg, dbcfg.statement_scheduling_group);
+    alternator::controller alternator_ctl(gossiper, proxy, ss, mm, sys_dist_ks, sys_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, vector_store_client, *cfg, dbcfg.statement_scheduling_group);

     // Register at_exit last, so that storage_service::drain_on_shutdown will be called first
     auto do_drain = defer_verbose_shutdown("local storage", [&ss] {
@@ -75,6 +75,11 @@ def run_alternator_cmd(pid, dir):
    # We only list here Alternator-specific experimental features - CQL
    # ones are listed in test/cqlpy/run.py.
    '--experimental-features=alternator-streams',
+   # this is required by test_streams.py test_parent_filtering and test_get_records_with_alternating_tablets_count.
+   # setting the value using scylla_config_temporary won't work, because the value is read
+   # at the start and then periodically, at the `tablet-load-stats-refresh-interval-in-seconds`
+   # interval, which by default is 60 seconds.
+   '--tablet-load-stats-refresh-interval-in-seconds=1',
]
if '--https' in sys.argv:
    run.setup_ssl_certificate(dir)
@@ -9,6 +9,8 @@ extra_scylla_cmdline_options:
- '--logger-log-level=alternator_controller=trace'
- '--logger-log-level=alternator_ttl=trace'
- '--logger-log-level=paxos=trace'
+- '--tablet-load-stats-refresh-interval-in-seconds=1'
+- '--tablets-initial-scale-factor=1'
extra_scylla_config_options:
{
    experimental_features: [
@@ -112,12 +112,6 @@ def test_alternator_vs_cql(dynamodb, test_table, cql_keyspace, cql_table, option
@pytest.fixture(scope='module')
def table1(dynamodb):
    with new_test_table(dynamodb,
-       # Alternator Streams is expected to fail with tablets due to #23838.
-       # To ensure that this test still runs, instead of xfailing it, we
-       # temporarily coerce Alternator to avoid using default tablets
-       # setting, even if it's available. Remove this "Tags=" line when
-       # issue #23838 is solved.
-       Tags=[{'Key': 'system:initial_tablets', 'Value': 'none'}],
        KeySchema=[
            # Must have both hash key and range key to allow LSI creation
            { 'AttributeName': 'p', 'KeyType': 'HASH' },
@@ -542,12 +542,6 @@ def test_streams_latency(dynamodb, dynamodbstreams, metrics):
    # latency metrics are only updated for *successful* operations so we
    # need to use a real Alternator Stream in this test.
    with new_test_table(dynamodb,
-       # Alternator Streams is expected to fail with tablets due to #23838.
-       # To ensure that this test still runs, instead of xfailing it, we
-       # temporarily coerce Alternator to avoid using default tablets
-       # setting, even if it's available. We do this by using the following
-       # tags when creating the table:
-       Tags=[{'Key': 'system:initial_tablets', 'Value': 'none'}],
        KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
        AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}
@@ -16,12 +16,29 @@ from botocore.exceptions import ClientError

from test.alternator.util import is_aws, scylla_config_temporary, unique_table_name, create_test_table, new_test_table, random_string, full_scan, freeze, list_tables, get_region, manual_request

-# All tests in this file are expected to fail with tablets due to #23838.
-# To ensure that Alternator Streams is still being tested, instead of
-# xfailing these tests, we temporarily coerce the tests below to avoid
-# using default tablets setting, even if it's available. We do this by
-# using the following tags when creating each table below:
-TAGS = [{'Key': 'system:initial_tablets', 'Value': 'none'}]
+TAGS = []
+# The following fixture ensures that tests in this module are run with both vnodes and tablets.
+# This fixture runs automatically for every test in this module.
+# To avoid relying on the semantics of a similar fixture used in the TTL tests, we define it locally here instead of reusing
+# that fixture, and we do not import or reuse fixtures across modules.
+# It sets the TAGS variable in the module's global namespace to the current parameter value before each test.
+# All tests will be run with both values. On AWS the parameterization is meaningless, so we skip the second variant.
+@pytest.fixture(params=[
+    [{'Key': 'system:initial_tablets', 'Value': 'none'}],
+    [{'Key': 'system:initial_tablets', 'Value': '0'}],
+], ids=["using vnodes", "using tablets"], autouse=True)
+def tags_param(request, dynamodb):
+    if is_aws(dynamodb) and request.param[0].get('Value') != 'none':
+        pytest.skip('vnodes/tablets parameterization not applicable on AWS')
+    # Set TAGS in the global namespace of this module
+    global TAGS
+    TAGS = request.param
+
+def running_using_vnodes():
+    for tag in TAGS:
+        if tag['Key'] == 'system:initial_tablets':
+            v = tag['Value']
+            return not v or not v.isdigit()

stream_types = [ 'OLD_IMAGE', 'NEW_IMAGE', 'KEYS_ONLY', 'NEW_AND_OLD_IMAGES']
@@ -54,12 +71,14 @@ def disable_stream(dynamodbstreams, table):
# So we have to create and delete a table per test. And not run this
# test to often against aws.
@contextmanager
-def create_stream_test_table(dynamodb, StreamViewType=None):
+def create_stream_test_table(dynamodb, StreamViewType=None, Tags=None):
+    if Tags is None:
+        Tags = TAGS
    spec = { 'StreamEnabled': False }
    if StreamViewType != None:
        spec = {'StreamEnabled': True, 'StreamViewType': StreamViewType}
    table = create_test_table(dynamodb, StreamSpecification=spec,
-       Tags=TAGS,
+       Tags=Tags,
        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
                    { 'AttributeName': 'c', 'KeyType': 'RANGE' }
        ],
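With the Tags parameter added above, a caller can override the module-level TAGS default; test_streams_tablets.py (added later in this merge) pins its tables to tablets roughly like this sketch:

TABLET_TAGS = [{'Key': 'system:initial_tablets', 'Value': '0'}]

def example(dynamodb, dynamodbstreams):
    # Tags=TABLET_TAGS overrides the module-level TAGS default,
    # so this table always uses tablets regardless of the fixture.
    with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES',
                                  Tags=TABLET_TAGS) as table:
        (arn, label) = wait_for_active_stream(dynamodbstreams, table)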
@@ -415,7 +434,7 @@ def test_get_records(dynamodb, dynamodbstreams):
    if 'NextShardIterator' in response:
        next_iterators.append(response['NextShardIterator'])

-   records = response.get('Records')
+   records = response.get('Records', [])
    # print("Query {} -> {}".format(iter, records))
    if records:
        for record in records:
543
test/alternator/test_streams_tablets.py
Normal file
@@ -0,0 +1,543 @@
# Copyright 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1

# Tests for stream operation nuances that are intrinsic to ScyllaDB (the parent-children relationship of stream shards).
import time, random, collections

import pytest
from test.alternator.test_streams import create_stream_test_table, wait_for_active_stream

TABLET_TAGS = [{'Key': 'system:initial_tablets', 'Value': '0'}]

# get table_id from keyspace and table name
def get_table_or_view_id(cql, keyspace: str, table: str):
    rows = cql.execute(f"select id from system_schema.tables where keyspace_name = '{keyspace}' and table_name = '{table}'")
    try:
        row = rows.one()
    except Exception:
        row = None
    if row is not None:
        return row.id
    rows = cql.execute(f"select id from system_schema.views where keyspace_name = '{keyspace}' and view_name = '{table}'")
    return rows.one().id

# get the user table id from a cdc_log table id
def get_base_table(cql, table_id):
    # copied from tablets.py:get_base_table
    # this might return more than one row, but we don't care as all rows
    # will have the same base_table value
    rows = cql.execute(f"SELECT base_table FROM system.tablets where table_id = {table_id} limit 1")
    return rows.one().base_table

# validate that the streams tables are synchronized with the tablet count - all new cdc shards have been created
def assert_number_of_streams_is_equal_to_number_of_tablets(rest_api, cql, ks, table_name, cdc_log_table_name):
    # we'll try twice here, as in my tests I occasionally got into a situation where this function
    # failed; it seems streams were not yet fully updated (debug build).
    for x in range(0, 2):
        tablet_count = get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name)
        ts = cql.execute(f"SELECT toUnixTimestamp(timestamp) AS ts FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='{table_name}' ORDER BY timestamp DESC LIMIT 1").one().ts
        CdcStreamState_CURRENT = 0 # from test.cluster.test_cdc_with_tablets.CdcStreamState.CURRENT
        count = cql.execute(f"SELECT count(*) FROM system.cdc_streams WHERE keyspace_name='{ks}' AND table_name='{table_name}' AND timestamp = {ts} AND stream_state = {CdcStreamState_CURRENT} limit 1").one().count
        if count == tablet_count:
            break
        # on debug builds occasionally we need more time
        time.sleep(0.1)
    assert count == tablet_count
# return the tablet count for a given cdc_log table (the table that holds cdc data for a given user table)
def get_tablet_count_for_base_table_of_table(rest_api, cql, keyspace_name: str, table_name: str):
    table_id = get_table_or_view_id(cql, keyspace_name, table_name)
    table_id = get_base_table(cql, table_id)
    # this might return more than one row, but we don't care as all rows
    # will have the same tablet_count value
    rows = cql.execute(f"SELECT tablet_count FROM system.tablets where table_id = {table_id} limit 1")
    return rows.one().tablet_count

# modify the tablet count for a given cdc_log table and wait until the change is applied.
# this calls alter table with a new `min_tablet_count`, which requires additional scylla options to work reliably
# (--tablet-load-stats-refresh-interval-in-seconds=1 and --tablets-initial-scale-factor=1)
def set_tablet_count_and_wait(rest_api, cql, ks, table_name, cdc_log_table_name, expected_tablet_count):
    assert_number_of_streams_is_equal_to_number_of_tablets(rest_api, cql, ks, table_name, cdc_log_table_name)
    cql.execute(f"ALTER TABLE \"{ks}\".\"{cdc_log_table_name}\" WITH tablets = {{'min_tablet_count': {expected_tablet_count}}};")
    start = time.time()
    while time.time() < start + 10:
        if get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name) == expected_tablet_count:
            break
        time.sleep(0.1)
    else:
        pytest.fail(f'Tablet count did not reach expected value {expected_tablet_count} within timeout')
def iterate_over_describe_stream(dynamodbstreams, arn, end_ts, filter_shard_id=None):
    params = {
        'StreamArn': arn
    }
    if filter_shard_id is not None:
        params['ShardFilter'] = {
            'Type': 'CHILD_SHARDS',
            'ShardId': filter_shard_id
        }
    desc = dynamodbstreams.describe_stream(**params)

    while True:
        assert time.time() <= end_ts, "Time ran out"
        shards = desc['StreamDescription']['Shards']

        for shard in shards:
            yield shard
            assert time.time() <= end_ts, "Time ran out"

        last_shard = desc["StreamDescription"].get("LastEvaluatedShardId")
        if not last_shard:
            break

        desc = dynamodbstreams.describe_stream(ExclusiveStartShardId=last_shard, **params)
# helper function for the parent-child relationship tests - it will:
# - drive changes in tablet count
# - wait for the expected number of stream shards to be created
# - build a parent map (shard -> parent) and a children map (shard -> list of children) between stream shards
# - call the callback with the data
def run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, tablet_multipliers, callback):
    with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES', Tags=TABLET_TAGS) as table:
        (arn, label) = wait_for_active_stream(dynamodbstreams, table)

        ks = f'alternator_{table.name}'
        table_name = table.name
        cdc_log_table_name = f'{table_name}_scylla_cdc_log'
        # Record the initial tablet count, which determines the expected
        # number of root stream shards (shards without a parent).
        init_table_count = get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name)

        # Drive tablet count changes according to tablet_multipliers.
        # Each change triggers a new "generation" of stream shards to be created.
        # a tablet multiplier might be less than 1
        for tablet_mult in tablet_multipliers:
            tablet_count = int(init_table_count * tablet_mult)
            assert tablet_count >= 1
            set_tablet_count_and_wait(rest_api, cql, ks, table_name, cdc_log_table_name, tablet_count)

        # Strip multipliers that don't change the tablet count from the
        # previous state. The table starts with a multiplier of 1 (i.e.
        # init_table_count tablets), so a leading 1 is a no-op. Subsequent
        # duplicates are also no-ops since ALTER TABLE with the current
        # tablet count doesn't trigger a new CDC stream generation.
        effective_multipliers = []
        for m in tablet_multipliers:
            if effective_multipliers:
                if m != effective_multipliers[-1]:
                    effective_multipliers.append(m)
            elif m != 1:
                effective_multipliers.append(m)

        # The total number of stream shards across all generations:
        # 1) The root generation created when the table is first created.
        total_shard_count = init_table_count
        # 2) One generation per effective multiplier.
        total_shard_count += int(sum(effective_multipliers) * init_table_count)

        # Poll DescribeStream until all expected stream shards have appeared,
        # building a shard_id -> parent_shard_id map (shard_parents_map) and
        # collecting root shard IDs (shards with no parent).
        end_ts = time.time() + 30
        root_shard_ids = []
        shard_parents_map = {}
        while time.time() < end_ts:
            root_shard_ids = []
            shard_parents_map = {}

            for shard in iterate_over_describe_stream(dynamodbstreams, arn, end_ts):
                shard_id = shard['ShardId']
                parent_shard_id = shard.get('ParentShardId', None)
                assert shard_id not in shard_parents_map
                if parent_shard_id is None:
                    root_shard_ids.append(shard_id)
                shard_parents_map[shard_id] = parent_shard_id

            if len(shard_parents_map) >= total_shard_count:
                break

            time.sleep(0.1)

        # For each shard, use DescribeStream's CHILD_SHARDS filter to
        # build a shard_id -> [child_shard_ids] map (shard_children_map).
        shard_children_map = {}
        for shard_id in shard_parents_map:
            filter_children = []
            for child_shard in iterate_over_describe_stream(dynamodbstreams, arn, end_ts, filter_shard_id=shard_id):
                child_shard_id = child_shard['ShardId']
                filter_children.append(child_shard_id)
            shard_children_map[shard_id] = filter_children

        callback(root_shard_ids, shard_parents_map, shard_children_map)
# run a test where we create two cdc log generations (parents and children)
# and the children count is half of the parents' (thus parents are merged into children).
# validate the merge assumptions:
# - every two parents have the same child
# - only half of the parents are marked as a parent by any child (because a child can declare only a single parent, and in case of a merge it has two parents)
# - various sizes
# NOTE: this test assumes the starting tablet count is bigger than one
# NOTE: this test fails if `system:initial_tablets` is set to anything but 0 - maybe because 0 means start with some default
# while non-zero means start with this value, but never go any lower?
def test_parent_children_merge(dynamodb, dynamodbstreams, rest_api, cql):
    def verify_parent_children_relationship_merge(root_shard_ids, shard_parents_map, shard_children_map):
        # shard_parents_map will contain both generations, so we strip the original one
        shard_parents_map = { shard_id: parent_id for shard_id, parent_id in shard_parents_map.items() if parent_id is not None }

        # shard_children_map will contain both generations, but only the original generation will have children, so we strip the childless ones
        shard_children_map = { shard_id: children for shard_id, children in shard_children_map.items() if children }

        # shard_parents_map contains a child -> parent map, thus its size is equal to the total number of children, which is half of the parents
        assert len(shard_parents_map) * 2 == len(root_shard_ids)

        # shard_children_map contains a parent -> list of children map, thus its size is equal to the total number of parents,
        # which must be equal to root_shard_ids
        assert len(root_shard_ids) == len(shard_children_map)

        # all parents must be from the first (original) generation (`root_shard_ids`)
        assert sorted(shard_children_map.keys()) == sorted(root_shard_ids)

        # only half of the parents will show up as parents in shard_parents_map, but all of them must be from root_shard_ids
        assert len(shard_parents_map.values()) * 2 == len(root_shard_ids)
        assert set(root_shard_ids).issuperset(set(shard_parents_map.values()))

        # every parent has exactly one child - we're merging
        assert all(len(children) == 1 for children in shard_children_map.values())

        # every child must occur exactly twice in the children lists - we're merging (2 -> 1) so
        # for every two parents there will be one child, thus every two parents share the same child
        existence_count = collections.defaultdict(int)
        for children in shard_children_map.values():
            for child in children:
                existence_count[child] += 1
        assert all(count == 2 for count in existence_count.values())

    run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, [0.5], verify_parent_children_relationship_merge)
# run a test where we create two cdc log generations (parents and children)
# and the children count is double the parents' (thus parents are split into children).
# validate the split assumptions:
# - every parent has two distinct children
# - every two children have the same parent
# - various sizes
def test_parent_children_split(dynamodb, dynamodbstreams, rest_api, cql):
    def verify_parent_children_relationship_split(root_shard_ids, shard_parents_map, shard_children_map):
        # shard_parents_map will contain both generations, so we strip the original one
        shard_parents_map = { shard_id: parent_id for shard_id, parent_id in shard_parents_map.items() if parent_id is not None }

        # shard_children_map will contain both generations, but only the original generation will have children, so we strip the childless ones
        shard_children_map = { shard_id: children for shard_id, children in shard_children_map.items() if children }

        # shard_parents_map contains a child -> parent map, thus its size is equal to the total number of children, which is double the parents
        assert len(shard_parents_map) == len(root_shard_ids) * 2

        # shard_children_map contains a parent -> list of children map, thus its size is equal to the total number of parents,
        # which must be equal to root_shard_ids
        assert len(root_shard_ids) == len(shard_children_map)

        # all parents must be from the first (original) generation (`root_shard_ids`)
        assert sorted(shard_children_map.keys()) == sorted(root_shard_ids)

        # every parent must show up in shard_parents_map twice;
        # the set of parents must equal `root_shard_ids`
        assert len(shard_parents_map.values()) == len(root_shard_ids) * 2
        assert set(root_shard_ids) == set(shard_parents_map.values())
        existence_count = collections.defaultdict(int)
        for parent in shard_parents_map.values():
            existence_count[parent] += 1
        assert all(count == 2 for count in existence_count.values())

        # every parent has exactly two children - we're splitting
        assert all(len(children) == 2 for children in shard_children_map.values())

        # every child must occur exactly once in the children lists - we're splitting (1 -> 2) so
        # for every parent there will be two distinct children
        all_children = []
        for children in shard_children_map.values():
            for child in children:
                all_children.append(child)
        assert len(set(all_children)) == len(all_children)

    run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, [2], verify_parent_children_relationship_split)
# the test will:
# - create a table with streams enabled
# - get the initial tablet count
# - for each multiplier in tablet_multipliers:
#   - modify the tablet count to initial * multiplier - this will trigger
#     creation of new stream shards in the stream, as currently we have a 1 to 1
#     mapping between tablets and stream shards
# - after all modifications, use DescribeStream to get all stream shards
# - build a map of stream shard -> parent stream shard (shard_parents_map) and
#   a map of stream shard -> child stream shards (shard_children_map)
# - verify that:
#   - the number of root stream shards (stream shards without a parent) is correct -
#     it should be equal to the initial tablet count
#   - starting from the root stream shards and following children, each path
#     down the tree has length equal to len(tablet_multipliers) -
#     this is because every change in tablet count causes a new generation
#     of stream shards to be created. Each stream shard from the previous generation will point
#     to at least one stream shard in the next generation and never to some other generation.
def test_parent_filtering(dynamodb, dynamodbstreams, rest_api, cql):
    tablet_multipliers = [1, 2, 4, 8, 16, 8, 4, 2, 1]

    def verify_parent_children_relationship(root_shard_ids, shard_parents_map, shard_children_map):
        # Verify the split/merge invariant: in a split all children point
        # back to the same parent; in a merge the child may point to only one
        # of its two parents, but both parents will have the child in their CHILD_SHARDS result.
        #
        # First, build declared_children from the ParentShardId field recorded
        # in shard_parents_map, then compare against the CHILD_SHARDS filter
        # results to confirm consistency.
        declared_children = {} # parent_shard_id -> [child_shard_ids derived from ParentShardId]
        for shard_id, parent_shard_id in shard_parents_map.items():
            if parent_shard_id is not None:
                declared_children.setdefault(parent_shard_id, []).append(shard_id)

        all_shards = set()
        in_children = set()
        for shard_id in shard_parents_map:
            all_shards.add(shard_id)
            filter_children = shard_children_map[shard_id]
            for child_shard_id in filter_children:
                in_children.add(child_shard_id)
            # Verify that every child declared via ParentShardId is also returned
            # by the CHILD_SHARDS filter for this shard.
            declared = set(declared_children.get(shard_id, []))
            assert declared.issubset(set(filter_children)), \
                f"Declared children {declared} not subset of filter children {set(filter_children)} for shard {shard_id}"
            # Between generations (currently) we can have either splits in half or merges of two into one.
            # In case of a split - the children count will be > 1 and all children will point exactly to the parent
            # (so declared == filter_children).
            # In case of a merge - the CHILD_SHARDS filter returns the merged child for both parents,
            # but the child's ParentShardId can only point to one of them. So the non-designated
            # parent will have filter_children = [child] but declared_children = [].
            # This assert checks that either all filter children were declared (split case) or
            # we have at most one child (merge case where this parent isn't the designated one).
            assert set(filter_children) == declared or len(filter_children) <= 1, \
                f"Unexpected children mismatch for shard {shard_id}: filter={filter_children}, declared={list(declared)}"

        # Verify split/merge invariants across generation boundaries.
        # Group shards by generation (extracted from the hex timestamp in the shard ID).
        def get_gen(t):
            return t[2:13]
        gen_shards = {}
        for shard_id in shard_parents_map:
            gen = get_gen(shard_id)
            gen_shards.setdefault(gen, []).append(shard_id)
        # Sort generations chronologically and verify parent/child ratios.
        sorted_gens = sorted(gen_shards.keys())
        for i in range(1, len(sorted_gens)):
            prev_gen = sorted_gens[i - 1]
            curr_gen = sorted_gens[i]
            parent_count = len(gen_shards[prev_gen])
            child_count = len(gen_shards[curr_gen])
            if child_count > parent_count:
                # Split: 2x children, every parent has 2 children pointing to it
                assert child_count == 2 * parent_count, \
                    f"Split ratio wrong: {parent_count} parents -> {child_count} children"
                for p in gen_shards[prev_gen]:
                    kids = declared_children.get(p, [])
                    assert len(kids) == 2, f"Split parent {p} has {len(kids)} declared children, expected 2"
            elif child_count < parent_count:
                # Merge: 2x parents, every child has 2 parents (but only 1 declared via ParentShardId)
                assert parent_count == 2 * child_count, \
                    f"Merge ratio wrong: {parent_count} parents -> {child_count} children"
                # Each child in the current generation should appear as a filter_child
                # of exactly 2 parents from the previous generation.
                for c in gen_shards[curr_gen]:
                    parents_of_c = [p for p in gen_shards[prev_gen] if c in shard_children_map.get(p, [])]
                    assert len(parents_of_c) == 2, \
                        f"Merge child {c} has {len(parents_of_c)} parents, expected 2"

        # Verify that the set of shards with no parent exactly matches
        # root_shard_ids collected earlier (i.e. no orphaned shards).
        shards_without_parents = all_shards - in_children
        assert shards_without_parents == set(root_shard_ids)

        # Walk the shard tree recursively from each root and verify
        # that every root-to-leaf path has length exactly len(tablet_multipliers),
        # confirming one new generation per tablet count change.
        def run_and_verify(shard_id, depth):
            children = shard_children_map.get(shard_id, None)
            if not children:
                assert depth == len(tablet_multipliers)
            else:
                for ch in children:
                    run_and_verify(ch, depth + 1)

        for r in root_shard_ids:
            run_and_verify(r, 1)

    run_parent_children_relationship_test(dynamodb, dynamodbstreams, rest_api, cql, tablet_multipliers, verify_parent_children_relationship)
# this test will:
# - create a table with streams enabled
# - get the initial tablet count
# - for each multiplier in tablet_multipliers:
#   - modify the tablet count to initial * multiplier - this will trigger
#     creation of new stream shards in the stream, as currently we have a 1 to 1
#     mapping between tablets and stream shards
#   - perform writes_per_tablet_multiplier writes to the table
# - after all modifications, use DescribeStream to build the shard_parents_map (child -> parent) stream shards map
# - use GetRecords to read all stream records
# - verify that:
#   - all written items are present in the stream (check the count and then the sorted content)
#   - within each partition key, the records are in the order of the writes (check that `e` is monotonically increasing for each record)
#   - the stream shard parent-child relationships are correct - stream shards form "generations" (all stream shards are split or merged at the same moment,
#     when the tablet count is changed). Every stream shard except the first ones has a parent, but not every stream shard is pointed to as a parent - when stream shards merge
#     (let's say A & B merge into C), then the next generation's stream shard (C) has two parents (A & B), but the DynamoDB API allows appointing only one - let's say A
#     (the other one - B - will never be pointed to as a parent by anything else).
#     NOTE: it's not the same as having no children - the stream shard (B) will have a child (C), but that child (C) will have B's sibling (A) as its recorded parent.
#     Then we try to walk the tree starting from every stream shard that is not pointed to as a parent. Depending on which generation that stream shard is in,
#     the path will have a different length.
def test_get_records_with_alternating_tablets_count(dynamodb, dynamodbstreams, rest_api, cql):
    tablet_multipliers = [1, 2, 4, 8, 16, 8, 4, 2, 1]
    writes_per_tablet_multiplier = 100
    partition_count = 32

    with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES', Tags=TABLET_TAGS) as table:
        (arn, label) = wait_for_active_stream(dynamodbstreams, table)

        ks = f'alternator_{table.name}'
        table_name = table.name
        cdc_log_table_name = f'{table_name}_scylla_cdc_log'
        init_table_count = get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name)

        # --- Phase 1: Drive tablet count changes and write data ---
        # For each multiplier, alter the tablet count and write writes_per_tablet_multiplier
        # items to the table. Each write uses a small, bounded set of partition keys
        # (0..partition_count-1) to force per-partition ordering collisions, which
        # lets us later verify that events within a partition appear in write order.
        # `e` is a monotonically increasing counter that encodes the write order.
        index = 0
        expected_items = []
        retrieved_items = []
        for tablet_mult in tablet_multipliers:
            set_tablet_count_and_wait(rest_api, cql, ks, table_name, cdc_log_table_name, init_table_count * tablet_mult)
            for _ in range(0, writes_per_tablet_multiplier):
                # we spread the keys over a small set of partitions to force key collisions,
                # to detect any ordering issues within a partition
                p = str(random.randint(0, partition_count - 1))
                index += 1
                c = '1'
                e = str(index)
                table.put_item(Item={'p': p, 'c': c, 'e': e})
                expected_items.append((p, c, e))

        # --- Phase 2: Read all stream records via GetRecords, building the shard topology ---
        # Iterate DescribeStream in a retry loop until all expected items have been
        # retrieved from the stream. For each shard discovered:
        # - Record its parent in shard_parents_map (child -> parent).
        # - Track root shards (those without a parent).
        # - Create a GetShardIterator starting at the shard's StartingSequenceNumber
        #   and later call GetRecords to drain it, collecting all the previously put (p, c, e) triples.
        iterators = {}
        shard_parents_map = {}
        root_shard_ids = []
        end_ts = time.time() + 30
        while len(retrieved_items) < len(expected_items):
            for shard in iterate_over_describe_stream(dynamodbstreams, arn, end_ts):
                shard_id = shard['ShardId']
                parent_shard_id = shard.get('ParentShardId', None)
                if parent_shard_id is None:
                    if shard_id not in root_shard_ids:
                        root_shard_ids.append(shard_id)
                elif shard_id in shard_parents_map:
                    assert shard_parents_map[shard_id] == parent_shard_id
                else:
                    shard_parents_map[shard_id] = parent_shard_id
                if shard_id in iterators:
                    continue
                start = shard['SequenceNumberRange']['StartingSequenceNumber']
                iter = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER', SequenceNumber=start)['ShardIterator']
                assert iter is not None
                iterators[shard_id] = iter

            for shard_id, iter in iterators.items():
                response = dynamodbstreams.get_records(ShardIterator=iter, Limit=1000)
                if 'NextShardIterator' in response:
                    iterators[shard_id] = response['NextShardIterator']

                records = response.get('Records', [])
                for record in records:
                    dynamodb = record['dynamodb']
                    keys = dynamodb['NewImage']

                    assert set(keys) == set(['p', 'c', 'e'])
                    p = keys['p'].get('S')
                    c = keys['c'].get('S')
                    e = keys['e'].get('S')
                    retrieved_items.append((p, c, e))

        # --- Phase 3: Verify completeness and per-partition ordering of stream records ---
        # Check that the set of records retrieved from the stream exactly matches the
        # set of items written (same count, same content when sorted).
        # Then verify that for every partition key `p`, the events appear in the same
        # order they were written: the `e` value (write index) must be strictly
        # increasing across successive reads for the same `p`.
        assert len(retrieved_items) == len(expected_items)
        assert sorted(retrieved_items) == sorted(expected_items)
        previous_values = {}
        # we iterate over the retrieved items here in the order in which they were read from the stream.
        # the implementation might (and will) reorder items with different partition keys as it pleases,
        # but for the same partition key the order of events must be preserved.
        # `previous_values` keeps track of the last `e` value seen for each partition key `p`. For writes to
        # the same `p`, the next value will always have an `e` greater than the previous one.
        for p, c, e in retrieved_items:
            e = int(e)
            pv = previous_values.get(p, -1)
            assert pv < e
            previous_values[p] = e

        # --- Phase 4: Verify shard topology - root shard count ---
        # The number of root shards (those without a parent) must equal the initial
        # tablet count.
        assert len(root_shard_ids) == init_table_count

        # --- Phase 5: Verify shard topology - generational structure and path lengths ---
        # Build `streams_superseded_by_next_gen`: the set of shard IDs that appear as a parent of some
        # other shard. Shards NOT in this set are "leaves" (the most recent generation).
        #
        # Group all shards (except root ones) by their generation, inferred from the
        # timestamp embedded in the shard ID (characters [2:13]).
        #
        # For each leaf shard, walk the parent chain up to the root (get_path), then
        # assert that the number of siblings in the leaf's generation equals
        # `init_table_count * tablet_multipliers[len(path) - 1]`.
        # This verifies the expected doubling/halving pattern produced by the
        # alternating tablet_multipliers sequence.
        streams_superseded_by_next_gen = set()
        for (shard_id, parent_shard_id) in shard_parents_map.items():
            streams_superseded_by_next_gen.add(parent_shard_id)

        def get_generation_from_shard(t):
            # Shard IDs have the format "H<hex_generation_ts>:<hex_token>", e.g.
            # "H 19b22b8563a:7fffffffffffffffe8547ce46400000"
            # Characters [2:13] extract the hex generation timestamp, which groups
            # shards that were created together during the same tablet count change.
            return t[2:13]
        count_map = {}
        for r in shard_parents_map:
            gen = get_generation_from_shard(r)
            count = count_map.get(gen, 0)
            count_map[gen] = count + 1
        # Returns the ancestor chain from shard_id up to the root (a shard with no parent).
        # The returned list is ordered [shard_id, parent, grandparent, ..., root].
        def get_path(shard_id):
            path = [ shard_id ]
            current_shard_id = shard_id
            while True:
                parent_shard_id = shard_parents_map.get(current_shard_id, None)
                if parent_shard_id is None:
                    return path
                path.append(parent_shard_id)
                current_shard_id = parent_shard_id
        for r in shard_parents_map:
            if r not in streams_superseded_by_next_gen:
                path = get_path(r)
                siblings_count = count_map[get_generation_from_shard(r)]
                assert siblings_count == init_table_count * tablet_multipliers[len(path) - 1]
@@ -131,26 +131,6 @@ def test_tablets_tag_vs_config(dynamodb):
    with new_test_table(dynamodb, **schema_vnodes) as table:
        pass

-# Before Alternator Streams is supported with tablets (#23838), let's verify
-# that enabling Streams results in an orderly error. This test should be
-# deleted when #23838 is fixed.
-def test_streams_enable_error_with_tablets(dynamodb):
-    # Test attempting to create a table already with streams
-    with pytest.raises(ClientError, match='ValidationException.*tablets'):
-        with new_test_table(dynamodb,
-            Tags=[{'Key': initial_tablets_tag, 'Value': '4'}],
-            StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
-            KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ],
-            AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ]) as table:
-            pass
-    # Test attempting to add a stream to an existing table
-    with new_test_table(dynamodb,
-        Tags=[{'Key': initial_tablets_tag, 'Value': '4'}],
-        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ],
-        AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ]) as table:
-        with pytest.raises(ClientError, match='ValidationException.*tablets'):
-            table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'});
-
# For a while (see #18068) it was possible to create an Alternator table with
# tablets enabled and choose LWT for write isolation (always_use_lwt)
# but the writes themselves failed. This test verifies that this is no longer
@@ -655,12 +655,6 @@ def test_ttl_expiration_lsi_key(dynamodb, waits_for_expiration):
# content), and a special userIdentity flag saying that this is not a regular
# REMOVE but an expiration. Reproduces issue #11523.
def test_ttl_expiration_streams(dynamodb, dynamodbstreams, waits_for_expiration):
-   # Alternator Streams currently doesn't work with tablets, so until
-   # #23838 is solved, skip this test on tablets.
-   for tag in TAGS:
-       if tag['Key'] == 'system:initial_tablets' and tag['Value'].isdigit():
-           pytest.skip("Streams test skipped on tablets due to #23838")
-
    # In my experiments, a 30-minute (1800 seconds) is the typical
    # expiration delay in this test. If the test doesn't finish within
    # max_duration, we report a failure.