diff --git a/alternator/streams.cc b/alternator/streams.cc index 1f77229e6d..a2e0eb701c 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -896,10 +896,22 @@ future executor::describe_stream(client_state& cl // TODO: label // TODO: creation time - // filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h) - auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl); + std::map topologies; + + // filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h) + if (schema->table().uses_tablets()) { + // We can't use table creation time here, as tablets might report a + // generation timestamp just before table creation. This is safe + // because CDC generations are per-table and cannot pre-date the + // table, so expanding the window won't pull in unrelated data. + auto low_ts = db_clock::now() - ttl; + topologies = co_await _system_keyspace.read_cdc_for_tablets_versioned_streams(bs->ks_name(), bs->cf_name(), low_ts); + } else { + auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners(); + auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl); + topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners }); + } - std::map topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners }); const auto e = topologies.end(); std::optional shard_filter; @@ -1485,9 +1497,15 @@ future executor::get_records(client_state& client } // ugh. figure out if we are and end-of-shard - auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners(); - db_clock::time_point ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners }); + db_clock::time_point ts; + if (schema->table().uses_tablets()) { + ts = co_await _system_keyspace.read_cdc_for_tablets_current_generation_timestamp(base->ks_name(), base->cf_name()); + } else { + auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners(); + ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners }); + } + auto& shard = iter.shard; if (shard.time < ts && ts < high_ts) {