alternator: implement streams for tablets

Add a code, that will handle Streams reading, when table is
using tablets underneath.

Fixes #23838
This commit is contained in:
Radosław Cybulski
2026-01-05 19:14:53 +01:00
parent eb35a7b6ce
commit d5df3ec07c

View File

@@ -896,10 +896,22 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
// TODO: label
// TODO: creation time
// filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
std::map<db_clock::time_point, cdc::streams_version> topologies;
// filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
if (schema->table().uses_tablets()) {
// We can't use table creation time here, as tablets might report a
// generation timestamp just before table creation. This is safe
// because CDC generations are per-table and cannot pre-date the
// table, so expanding the window won't pull in unrelated data.
auto low_ts = db_clock::now() - ttl;
topologies = co_await _system_keyspace.read_cdc_for_tablets_versioned_streams(bs->ks_name(), bs->cf_name(), low_ts);
} else {
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
}
std::map<db_clock::time_point, cdc::streams_version> topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
const auto e = topologies.end();
std::optional<shard_id> shard_filter;
@@ -1485,9 +1497,15 @@ future<executor::request_return_type> executor::get_records(client_state& client
}
// ugh. figure out if we are and end-of-shard
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
db_clock::time_point ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
db_clock::time_point ts;
if (schema->table().uses_tablets()) {
ts = co_await _system_keyspace.read_cdc_for_tablets_current_generation_timestamp(base->ks_name(), base->cf_name());
} else {
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
}
auto& shard = iter.shard;
if (shard.time < ts && ts < high_ts) {