/* * Copyright (C) 2019-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include #include #include #include #include #include #include "gms/endpoint_state.hh" #include "gms/versioned_value.hh" #include "keys/keys.hh" #include "replica/database.hh" #include "db/system_keyspace.hh" #include "db/system_distributed_keyspace.hh" #include "dht/token-sharding.hh" #include "locator/token_metadata.hh" #include "types/set.hh" #include "gms/application_state.hh" #include "gms/inet_address.hh" #include "gms/gossiper.hh" #include "gms/feature_service.hh" #include "utils/assert.hh" #include "utils/error_injection.hh" #include "utils/UUID_gen.hh" #include "utils/stall_free.hh" #include "utils/to_string.hh" #include "cdc/generation.hh" #include "cdc/cdc_options.hh" #include "cdc/generation_service.hh" #include "cdc/log.hh" extern logging::logger cdc_log; static int get_shard_count(const locator::host_id& endpoint, const gms::gossiper& g) { auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::SHARD_COUNT); return ep_state ? std::stoi(ep_state->value()) : -1; } static unsigned get_sharding_ignore_msb(const locator::host_id& endpoint, const gms::gossiper& g) { auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::IGNORE_MSB_BITS); return ep_state ? std::stoi(ep_state->value()) : 0; } namespace db { extern thread_local data_type cdc_streams_set_type; } namespace cdc { api::timestamp_clock::duration get_generation_leeway() { static thread_local auto generation_leeway = std::chrono::duration_cast(std::chrono::seconds(5)); utils::get_local_injector().inject("increase_cdc_generation_leeway", [&] { generation_leeway = std::chrono::duration_cast(std::chrono::minutes(5)); }); return generation_leeway; } stream_state read_stream_state(int8_t val) { if (val != std::to_underlying(stream_state::current) && val != std::to_underlying(stream_state::closed) && val != std::to_underlying(stream_state::opened)) { throw std::runtime_error(format("invalid value {} for stream state", val)); } return static_cast(val); } static void copy_int_to_bytes(int64_t i, size_t offset, bytes& b) { i = net::hton(i); std::copy_n(reinterpret_cast(&i), sizeof(int64_t), b.begin() + offset); } static constexpr auto stream_id_version_bits = 4; static constexpr auto stream_id_random_bits = 38; static constexpr auto stream_id_index_bits = sizeof(uint64_t)*8 - stream_id_version_bits - stream_id_random_bits; static constexpr auto stream_id_version_shift = 0; static constexpr auto stream_id_index_shift = stream_id_version_shift + stream_id_version_bits; static constexpr auto stream_id_random_shift = stream_id_index_shift + stream_id_index_bits; /** * Responsibility for encoding stream_id moved from the create_stream_ids * function to this constructor, to keep knowledge of composition in a * single place. Note the make_new_generation_description function * defines the "order" in which we view vnodes etc. */ stream_id::stream_id(dht::token token, size_t vnode_index) : _value(bytes::initialized_later(), 2 * sizeof(int64_t)) { static thread_local std::mt19937_64 rand_gen(std::random_device{}()); static thread_local std::uniform_int_distribution rand_dist; auto rand = rand_dist(rand_gen); auto mask_shift = [](uint64_t val, size_t bits, size_t shift) { return (val & ((1ull << bits) - 1u)) << shift; }; /** * Low qword: * 0-4: version * 5-26: vnode index as when created (see generation below). This excludes shards * 27-64: random value (maybe to be replaced with timestamp) */ auto low_qword = mask_shift(version_1, stream_id_version_bits, stream_id_version_shift) | mask_shift(vnode_index, stream_id_index_bits, stream_id_index_shift) | mask_shift(rand, stream_id_random_bits, stream_id_random_shift) ; copy_int_to_bytes(dht::token::to_int64(token), 0, _value); copy_int_to_bytes(low_qword, sizeof(int64_t), _value); // not a hot code path. make sure we did not mess up the shifts and masks. SCYLLA_ASSERT(version() == version_1); SCYLLA_ASSERT(index() == vnode_index); } stream_id::stream_id(bytes b) : _value(std::move(b)) { // this is not a very solid check. Id:s previous to GA/versioned id:s // have fully random bits in low qword, so this could go either way... if (version() > version_1) { throw std::invalid_argument("Unknown CDC stream id version"); } } bool stream_id::is_set() const { return !_value.empty(); } static int64_t bytes_to_int64(bytes_view b, size_t offset) { SCYLLA_ASSERT(b.size() >= offset + sizeof(int64_t)); int64_t res; std::copy_n(b.begin() + offset, sizeof(int64_t), reinterpret_cast(&res)); return net::ntoh(res); } dht::token stream_id::token() const { return dht::token::from_int64(token_from_bytes(_value)); } int64_t stream_id::token_from_bytes(bytes_view b) { return bytes_to_int64(b, 0); } static uint64_t unpack_value(bytes_view b, size_t off, size_t shift, size_t bits) { return (uint64_t(bytes_to_int64(b, off)) >> shift) & ((1ull << bits) - 1u); } uint8_t stream_id::version() const { return unpack_value(_value, sizeof(int64_t), stream_id_version_shift, stream_id_version_bits); } size_t stream_id::index() const { return unpack_value(_value, sizeof(int64_t), stream_id_index_shift, stream_id_index_bits); } const bytes& stream_id::to_bytes() const { return _value; } bytes stream_id::to_bytes() && { return std::move(_value); } partition_key stream_id::to_partition_key(const schema& log_schema) const { return partition_key::from_single_value(log_schema, _value); } bool token_range_description::operator==(const token_range_description& o) const { return token_range_end == o.token_range_end && streams == o.streams && sharding_ignore_msb == o.sharding_ignore_msb; } topology_description::topology_description(utils::chunked_vector entries) : _entries(std::move(entries)) {} bool topology_description::operator==(const topology_description& o) const { return _entries == o._entries; } const utils::chunked_vector& topology_description::entries() const& { return _entries; } utils::chunked_vector&& topology_description::entries() && { return std::move(_entries); } future topology_description::clone_async() const { utils::chunked_vector vec{}; co_await utils::reserve_gently(vec, _entries.size()); for (const auto& entry : _entries) { vec.push_back(entry); co_await coroutine::maybe_yield(); } co_return topology_description{std::move(vec)}; } static std::vector create_stream_ids( size_t index, dht::token start, dht::token end, size_t shard_count, uint8_t ignore_msb) { std::vector result; result.reserve(shard_count); dht::static_sharder sharder(shard_count, ignore_msb); for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) { auto t = dht::find_first_token_for_shard(sharder, start, end, shard_idx); // compose the id from token and the "index" of the range end owning vnode // as defined by token sort order. Basically grouping within this // shard set. result.emplace_back(stream_id(t, index)); } return result; } bool should_propose_first_generation(const locator::host_id& my_host_id, const gms::gossiper& g) { return g.for_each_endpoint_state_until([&] (const gms::endpoint_state& eps) { return stop_iteration(my_host_id < eps.get_host_id()); }) == stop_iteration::no; } bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata& tm) { if (tm.sorted_tokens().size() != gen.entries().size()) { // We probably have garbage streams from old generations cdc_log.info("Generation size does not match the token ring"); return false; } else { std::unordered_set gen_ends; for (const auto& entry : gen.entries()) { gen_ends.insert(entry.token_range_end); } for (const auto& metadata_token : tm.sorted_tokens()) { if (!gen_ends.contains(metadata_token)) { cdc_log.warn("CDC generation missing token {}", metadata_token); return false; } } return true; } } static future> get_common_cdc_generation_mutations( schema_ptr s, const partition_key& pkey, noncopyable_function&& get_ckey_from_range_end, const cdc::topology_description& desc, size_t mutation_size_threshold, api::timestamp_type ts) { utils::chunked_vector res; res.emplace_back(s, pkey); size_t size_estimate = 0; size_t total_size_estimate = 0; for (auto& e : desc.entries()) { if (size_estimate >= mutation_size_threshold) { total_size_estimate += size_estimate; res.emplace_back(s, pkey); size_estimate = 0; } set_type_impl::native_type streams; streams.reserve(e.streams.size()); for (auto& stream: e.streams) { streams.push_back(data_value(stream.to_bytes())); } size_estimate += e.streams.size() * 20; auto ckey = get_ckey_from_range_end(e.token_range_end); res.back().set_cell(ckey, to_bytes("streams"), make_set_value(db::cdc_streams_set_type, std::move(streams)), ts); res.back().set_cell(ckey, to_bytes("ignore_msb"), int8_t(e.sharding_ignore_msb), ts); co_await coroutine::maybe_yield(); } total_size_estimate += size_estimate; // Copy mutations n times, where n is picked so that the memory size of all mutations together exceeds `max_command_size`. utils::get_local_injector().inject("cdc_generation_mutations_replication", [&res, total_size_estimate, mutation_size_threshold] { utils::chunked_vector new_res; size_t number_of_copies = (mutation_size_threshold / total_size_estimate + 1) * 2; for (size_t i = 0; i < number_of_copies; ++i) { std::copy(res.begin(), res.end(), std::back_inserter(new_res)); } res = std::move(new_res); }); co_return res; } future> get_cdc_generation_mutations_v2( schema_ptr s, utils::UUID id, const cdc::topology_description& desc, size_t mutation_size_threshold, api::timestamp_type ts) { auto pkey = partition_key::from_singular(*s, id); auto get_ckey = [s] (dht::token range_end) { return clustering_key::from_singular(*s, dht::token::to_int64(range_end)); }; auto res = co_await get_common_cdc_generation_mutations(s, pkey, std::move(get_ckey), desc, mutation_size_threshold, ts); res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts); co_return res; } future> get_cdc_generation_mutations_v3( schema_ptr s, utils::UUID id, const cdc::topology_description& desc, size_t mutation_size_threshold, api::timestamp_type ts) { auto pkey = partition_key::from_singular(*s, CDC_GENERATIONS_V3_KEY); auto get_ckey = [&] (dht::token range_end) { return clustering_key::from_exploded(*s, {timeuuid_type->decompose(id), long_type->decompose(dht::token::to_int64(range_end))}) ; }; co_return co_await get_common_cdc_generation_mutations(s, pkey, std::move(get_ckey), desc, mutation_size_threshold, ts); } // non-static for testing size_t limit_of_streams_in_topology_description() { // Each stream takes 16B and we don't want to exceed 4MB so we can have // at most 262144 streams but not less than 1 per vnode. return 4 * 1024 * 1024 / 16; } // non-static for testing topology_description limit_number_of_streams_if_needed(topology_description&& desc) { uint64_t streams_count = 0; for (auto& tr_desc : desc.entries()) { streams_count += tr_desc.streams.size(); } size_t limit = std::max(limit_of_streams_in_topology_description(), desc.entries().size()); if (limit >= streams_count) { return std::move(desc); } size_t streams_per_vnode_limit = limit / desc.entries().size(); auto entries = std::move(desc).entries(); auto start = entries.back().token_range_end; for (size_t idx = 0; idx < entries.size(); ++idx) { auto end = entries[idx].token_range_end; if (entries[idx].streams.size() > streams_per_vnode_limit) { entries[idx].streams = create_stream_ids(idx, start, end, streams_per_vnode_limit, entries[idx].sharding_ignore_msb); } start = end; } return topology_description(std::move(entries)); } // Compute a set of tokens that split the token ring into vnodes. static auto get_tokens(const std::unordered_set& bootstrap_tokens, const locator::token_metadata_ptr tmptr) { auto tokens = tmptr->sorted_tokens(); auto it = tokens.insert(tokens.end(), bootstrap_tokens.begin(), bootstrap_tokens.end()); std::sort(it, tokens.end()); std::inplace_merge(tokens.begin(), it, tokens.end()); tokens.erase(std::unique(tokens.begin(), tokens.end()), tokens.end()); return tokens; } static token_range_description create_token_range_description( size_t index, dht::token start, dht::token end, const noncopyable_function (dht::token)>& get_sharding_info) { token_range_description desc; desc.token_range_end = end; auto [shard_count, ignore_msb] = get_sharding_info(end); desc.streams = create_stream_ids(index, start, end, shard_count, ignore_msb); desc.sharding_ignore_msb = ignore_msb; return desc; } cdc::topology_description make_new_generation_description( const std::unordered_set& bootstrap_tokens, const noncopyable_function(dht::token)>& get_sharding_info, const locator::token_metadata_ptr tmptr) { const auto tokens = get_tokens(bootstrap_tokens, tmptr); if (tokens.empty()) { on_internal_error(cdc_log, "Attempted to create a CDC generation from an empty list of tokens"); } utils::chunked_vector vnode_descriptions; vnode_descriptions.reserve(tokens.size()); vnode_descriptions.push_back(create_token_range_description(0, tokens.back(), tokens.front(), get_sharding_info)); for (size_t idx = 1; idx < tokens.size(); ++idx) { vnode_descriptions.push_back(create_token_range_description(idx, tokens[idx - 1], tokens[idx], get_sharding_info)); } return {std::move(vnode_descriptions)}; } db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milliseconds ring_delay) { using namespace std::chrono; using namespace std::chrono_literals; auto ts = db_clock::now(); if (add_delay && ring_delay != 0ms) { ts += 2 * ring_delay + duration_cast(get_generation_leeway()); } return ts; } future generation_service::legacy_make_new_generation(const std::unordered_set& bootstrap_tokens, bool add_delay) { const locator::token_metadata_ptr tmptr = _token_metadata.get(); // Fetch sharding parameters for a node that owns vnode ending with this token // using gossiped application states. auto get_sharding_info = [&] (dht::token end) -> std::pair { if (bootstrap_tokens.contains(end)) { return {smp::count, _cfg.ignore_msb_bits}; } else { auto endpoint = tmptr->get_endpoint(end); if (!endpoint) { throw std::runtime_error( format("Can't find endpoint for token {}", end)); } auto sc = get_shard_count(*endpoint, _gossiper); return {sc > 0 ? sc : 1, get_sharding_ignore_msb(*endpoint, _gossiper)}; } }; auto uuid = utils::make_random_uuid(); auto gen = make_new_generation_description(bootstrap_tokens, get_sharding_info, tmptr); // Our caller should ensure that there are normal tokens in the token ring. auto normal_token_owners = tmptr->count_normal_token_owners(); SCYLLA_ASSERT(normal_token_owners); if (_feature_service.cdc_generations_v2) { cdc_log.info("Inserting new generation data at UUID {}", uuid); // This may take a while. co_await _sys_dist_ks.local().insert_cdc_generation(uuid, gen, { normal_token_owners }); // Begin the race. cdc::generation_id_v2 gen_id{new_generation_timestamp(add_delay, _cfg.ring_delay), uuid}; cdc_log.info("New CDC generation: {}", gen_id); co_return gen_id; } // The CDC_GENERATIONS_V2 feature is not enabled: some nodes may still not understand the V2 format. // We must create a generation in the old format. // If the cluster is large we may end up with a generation that contains // large number of streams. This is problematic because we store the // generation in a single row (V1 format). For a generation with large number of rows // this will lead to a row that can be as big as 32MB. This is much more // than the limit imposed by commitlog_segment_size_in_mb. If the size of // the row that describes a new generation grows above // commitlog_segment_size_in_mb, the write will fail and the new node won't // be able to join. To avoid such problem we make sure that such row is // always smaller than 4MB. We do that by removing some CDC streams from // each vnode if the total number of streams is too large. gen = limit_number_of_streams_if_needed(std::move(gen)); cdc_log.warn( "Creating a new CDC generation in the old storage format due to a partially upgraded cluster:" " the CDC_GENERATIONS_V2 feature is known by this node, but not enabled in the cluster." " The old storage format forces us to create a suboptimal generation." " It is recommended to finish the upgrade and then create a new generation either by bootstrapping" " a new node or running the checkAndRepairCdcStreams nodetool command."); // Begin the race. cdc::generation_id_v1 gen_id{new_generation_timestamp(add_delay, _cfg.ring_delay)}; co_await _sys_dist_ks.local().insert_cdc_topology_description(gen_id, std::move(gen), { normal_token_owners }); cdc_log.info("New CDC generation: {}", gen_id); co_return gen_id; } /* Retrieves CDC streams generation timestamp from the given endpoint's application state (broadcasted through gossip). * We might be during a rolling upgrade, so the timestamp might not be there (if the other node didn't upgrade yet), * but if the cluster already supports CDC, then every newly joining node will propose a new CDC generation, * which means it will gossip the generation's timestamp. */ static std::optional get_generation_id_for(const locator::host_id& endpoint, const gms::endpoint_state& eps) { const auto* gen_id_ptr = eps.get_application_state_ptr(gms::application_state::CDC_GENERATION_ID); if (!gen_id_ptr) { return std::nullopt; } auto gen_id_string = gen_id_ptr->value(); cdc_log.trace("endpoint={}, gen_id_string={}", endpoint, gen_id_string); return gms::versioned_value::cdc_generation_id_from_string(gen_id_string); } static future> retrieve_generation_data_v2( cdc::generation_id_v2 id, db::system_keyspace& sys_ks, db::system_distributed_keyspace& sys_dist_ks) { auto cdc_gen = co_await sys_dist_ks.read_cdc_generation(id.id); if (!cdc_gen && id.id.is_timestamp()) { // If we entered legacy mode due to recovery, we (or some other node) // might gossip about a generation that was previously propagated // through raft. If that's the case, it will sit in // the system.cdc_generations_v3 table. // // If the provided id is not a timeuuid, we don't want to query // the system.cdc_generations_v3 table. This table stores generation // ids as timeuuids. If the provided id is not a timeuuid, the // generation cannot be in system.cdc_generations_v3. Also, the query // would fail with a marshaling error. cdc_gen = co_await sys_ks.read_cdc_generation_opt(id.id); } co_return cdc_gen; } static future> retrieve_generation_data( cdc::generation_id gen_id, db::system_keyspace& sys_ks, db::system_distributed_keyspace& sys_dist_ks, db::system_distributed_keyspace::context ctx) { return std::visit(make_visitor( [&] (const cdc::generation_id_v1& id) { return sys_dist_ks.read_cdc_topology_description(id, ctx); }, [&] (const cdc::generation_id_v2& id) { return retrieve_generation_data_v2(id, sys_ks, sys_dist_ks); } ), gen_id); } static future<> do_update_streams_description( cdc::generation_id gen_id, db::system_keyspace& sys_ks, db::system_distributed_keyspace& sys_dist_ks, db::system_distributed_keyspace::context ctx) { if (co_await sys_dist_ks.cdc_desc_exists(get_ts(gen_id), ctx)) { cdc_log.info("Generation {}: streams description table already updated.", gen_id); co_return; } // We might race with another node also inserting the description, but that's ok. It's an idempotent operation. auto topo = co_await retrieve_generation_data(gen_id, sys_ks, sys_dist_ks, ctx); if (!topo) { throw no_generation_data_exception(gen_id); } co_await sys_dist_ks.create_cdc_desc(get_ts(gen_id), *topo, ctx); cdc_log.info("CDC description table successfully updated with generation {}.", gen_id); } /* Inform CDC users about a generation of streams (identified by the given timestamp) * by inserting it into the cdc_streams table. * * Assumes that the cdc_generation_descriptions table contains this generation. * * Returning from this function does not mean that the table update was successful: the function * might run an asynchronous task in the background. */ static future<> update_streams_description( cdc::generation_id gen_id, db::system_keyspace& sys_ks, shared_ptr sys_dist_ks, noncopyable_function get_num_token_owners, abort_source& abort_src) { try { co_await do_update_streams_description(gen_id, sys_ks, *sys_dist_ks, { get_num_token_owners() }); } catch (...) { cdc_log.warn( "Could not update CDC description table with generation {}: {}. Will retry in the background.", gen_id, std::current_exception()); // It is safe to discard this future: we keep system distributed keyspace alive. (void)(([] (cdc::generation_id gen_id, db::system_keyspace& sys_ks, shared_ptr sys_dist_ks, noncopyable_function get_num_token_owners, abort_source& abort_src) -> future<> { while (true) { try { co_await sleep_abortable(std::chrono::seconds(60), abort_src); } catch (seastar::sleep_aborted&) { cdc_log.warn( "Aborted update CDC description table with generation {}", gen_id); co_return; } try { co_await do_update_streams_description(gen_id, sys_ks, *sys_dist_ks, { get_num_token_owners() }); co_return; } catch (...) { cdc_log.warn( "Could not update CDC description table with generation {}: {}. Will try again.", gen_id, std::current_exception()); } } })(gen_id, sys_ks, std::move(sys_dist_ks), std::move(get_num_token_owners), abort_src)); } } static db_clock::time_point as_timepoint(const utils::UUID& uuid) { return db_clock::time_point(utils::UUID_gen::unix_timestamp(uuid)); } static future> get_cdc_desc_v1_timestamps( db::system_distributed_keyspace& sys_dist_ks, abort_source& abort_src, const noncopyable_function& get_num_token_owners) { while (true) { try { co_return co_await sys_dist_ks.get_cdc_desc_v1_timestamps({ get_num_token_owners() }); } catch (...) { cdc_log.warn( "Failed to retrieve generation timestamps for rewriting: {}. Retrying in 60s.", std::current_exception()); } co_await sleep_abortable(std::chrono::seconds(60), abort_src); } } // Contains a CDC log table's creation time (extracted from its schema's id) // and its CDC TTL setting. struct time_and_ttl { db_clock::time_point creation_time; int ttl; }; /* * See `maybe_rewrite_streams_descriptions`. * This is the long-running-in-the-background part of that function. * It returns the timestamp of the last rewritten generation (if any). */ static future> rewrite_streams_descriptions( std::vector times_and_ttls, db::system_keyspace& sys_ks, shared_ptr sys_dist_ks, noncopyable_function get_num_token_owners, abort_source& abort_src) { cdc_log.info("Retrieving generation timestamps for rewriting..."); auto tss = co_await get_cdc_desc_v1_timestamps(*sys_dist_ks, abort_src, get_num_token_owners); cdc_log.info("Generation timestamps retrieved."); // Find first generation timestamp such that some CDC log table may contain data before this timestamp. // This predicate is monotonic w.r.t the timestamps. auto now = db_clock::now(); std::sort(tss.begin(), tss.end()); auto first = std::partition_point(tss.begin(), tss.end(), [&] (db_clock::time_point ts) { // partition_point finds first element that does *not* satisfy the predicate. return std::none_of(times_and_ttls.begin(), times_and_ttls.end(), [&] (const time_and_ttl& tat) { // In this CDC log table there are no entries older than the table's creation time // or (now - the table's ttl). We subtract 10s to account for some possible clock drift. // If ttl is set to 0 then entries in this table never expire. In that case we look // only at the table's creation time. auto no_entries_older_than = (tat.ttl == 0 ? tat.creation_time : std::max(tat.creation_time, now - std::chrono::seconds(tat.ttl))) - std::chrono::seconds(10); return no_entries_older_than < ts; }); }); // Find first generation timestamp such that some CDC log table may contain data in this generation. // This and all later generations need to be written to the new streams table. if (first != tss.begin()) { --first; } if (first == tss.end()) { cdc_log.info("No generations to rewrite."); co_return std::nullopt; } cdc_log.info("First generation to rewrite: {}", *first); bool each_success = true; co_await max_concurrent_for_each(first, tss.end(), 10, [&] (db_clock::time_point ts) -> future<> { while (true) { try { co_return co_await do_update_streams_description(cdc::generation_id_v1{ts}, sys_ks, *sys_dist_ks, { get_num_token_owners() }); } catch (const no_generation_data_exception& e) { cdc_log.error("Failed to rewrite streams for generation {}: {}. Giving up.", ts, e); each_success = false; co_return; } catch (...) { cdc_log.warn("Failed to rewrite streams for generation {}: {}. Retrying in 60s.", ts, std::current_exception()); } co_await sleep_abortable(std::chrono::seconds(60), abort_src); } }); if (each_success) { cdc_log.info("Rewriting stream tables finished successfully."); } else { cdc_log.info("Rewriting stream tables finished, but some generations could not be rewritten (check the logs)."); } if (first != tss.end()) { co_return cdc::generation_id_v1{*std::prev(tss.end())}; } co_return std::nullopt; } future<> generation_service::maybe_rewrite_streams_descriptions() { if (!_db.has_schema(_sys_dist_ks.local().NAME, _sys_dist_ks.local().CDC_DESC_V1)) { // This cluster never went through a Scylla version which used this table // or the user deleted the table. Nothing to do. co_return; } if (co_await _sys_ks.local().cdc_is_rewritten()) { co_return; } if (_cfg.dont_rewrite_streams) { cdc_log.warn("Stream rewriting disabled. Manual administrator intervention may be required..."); co_return; } // For each CDC log table get the TTL setting (from CDC options) and the table's creation time std::vector times_and_ttls; _db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr t) { auto& s = *t->schema(); auto base = cdc::get_base_table(_db, s.ks_name(), s.cf_name()); if (!base) { // Not a CDC log table. return; } auto& cdc_opts = base->cdc_options(); if (!cdc_opts.enabled()) { // This table is named like a CDC log table but it's not one. return; } times_and_ttls.push_back(time_and_ttl{as_timepoint(s.id().uuid()), cdc_opts.ttl()}); }); if (times_and_ttls.empty()) { // There's no point in rewriting old generations' streams (they don't contain any data). cdc_log.info("No CDC log tables present, not rewriting stream tables."); co_return co_await _sys_ks.local().cdc_set_rewritten(std::nullopt); } auto get_num_token_owners = [tm = _token_metadata.get()] { return tm->count_normal_token_owners(); }; // This code is racing with node startup. At this point, we're most likely still waiting for gossip to settle // and some nodes that are UP may still be marked as DOWN by us. // Let's sleep a bit to increase the chance that the first attempt at rewriting succeeds (it's still ok if // it doesn't - we'll retry - but it's nice if we succeed without any warnings). co_await sleep_abortable(std::chrono::seconds(10), _abort_src); cdc_log.info("Rewriting stream tables in the background..."); auto last_rewritten = co_await rewrite_streams_descriptions( std::move(times_and_ttls), _sys_ks.local(), _sys_dist_ks.local_shared(), std::move(get_num_token_owners), _abort_src); co_await _sys_ks.local().cdc_set_rewritten(last_rewritten); } static void assert_shard_zero(const sstring& where) { if (this_shard_id() != 0) { on_internal_error(cdc_log, format("`{}`: must be run on shard 0", where)); } } class and_reducer { private: bool _result = true; public: future<> operator()(bool value) { _result = value && _result; return make_ready_future<>(); } bool get() { return _result; } }; class or_reducer { private: bool _result = false; public: future<> operator()(bool value) { _result = value || _result; return make_ready_future<>(); } bool get() { return _result; } }; class generation_handling_nonfatal_exception : public std::runtime_error { using std::runtime_error::runtime_error; }; constexpr char could_not_retrieve_msg_template[] = "Could not retrieve CDC streams with timestamp {} upon gossip event. Reason: \"{}\". Action: {}."; generation_service::generation_service( config cfg, gms::gossiper& g, sharded& sys_dist_ks, sharded& sys_ks, abort_source& abort_src, const locator::shared_token_metadata& stm, gms::feature_service& f, replica::database& db, std::function raft_topology_change_enabled) : _cfg(std::move(cfg)) , _gossiper(g) , _sys_dist_ks(sys_dist_ks) , _sys_ks(sys_ks) , _abort_src(abort_src) , _token_metadata(stm) , _feature_service(f) , _db(db) , _raft_topology_change_enabled(std::move(raft_topology_change_enabled)) { } future<> generation_service::stop() { try { co_await std::move(_cdc_streams_rewrite_complete); } catch (...) { cdc_log.error("CDC stream rewrite failed: ", std::current_exception()); } if (_joined && (this_shard_id() == 0)) { co_await leave_ring(); } _stopped = true; } generation_service::~generation_service() { SCYLLA_ASSERT(_stopped); } future<> generation_service::after_join(std::optional&& startup_gen_id) { assert_shard_zero(__PRETTY_FUNCTION__); _gen_id = std::move(startup_gen_id); _gossiper.register_(shared_from_this()); _joined = true; // Retrieve the latest CDC generation seen in gossip (if any). co_await legacy_scan_cdc_generations(); // Ensure that the new CDC stream description table has all required streams. // See the function's comment for details. // // Since this depends on the entire cluster (and therefore we cannot guarantee // timely completion), run it in the background and wait for it in stop(). _cdc_streams_rewrite_complete = maybe_rewrite_streams_descriptions(); } future<> generation_service::leave_ring() { assert_shard_zero(__PRETTY_FUNCTION__); _joined = false; co_await _gossiper.unregister_(shared_from_this()); } future<> generation_service::on_join(gms::inet_address ep, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id pid) { return on_change(ep, id, ep_state->get_application_state_map(), pid); } future<> generation_service::on_change(gms::inet_address ep, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) { assert_shard_zero(__PRETTY_FUNCTION__); if (_raft_topology_change_enabled()) { return make_ready_future<>(); } return on_application_state_change(ep, id, states, gms::application_state::CDC_GENERATION_ID, pid, [this] (gms::inet_address ep, locator::host_id id, const gms::versioned_value& v, gms::permit_id) { auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value()); cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id); return legacy_handle_cdc_generation(gen_id); }); } future<> generation_service::check_and_repair_cdc_streams() { // FIXME: support Raft group 0-based topology changes if (!_joined) { throw std::runtime_error("check_and_repair_cdc_streams: node not initialized yet"); } std::optional latest = _gen_id; _gossiper.for_each_endpoint_state([&] (const gms::endpoint_state& state) { auto addr = state.get_host_id(); if (_gossiper.is_left(addr)) { cdc_log.info("check_and_repair_cdc_streams ignored node {} because it is in LEFT state", addr); return; } if (!_gossiper.is_normal(addr)) { throw std::runtime_error(fmt::format("All nodes must be in NORMAL or LEFT state while performing check_and_repair_cdc_streams" " ({} is in state {})", addr, _gossiper.get_gossip_status(state))); } const auto gen_id = get_generation_id_for(addr, state); if (!latest || (gen_id && get_ts(*gen_id) > get_ts(*latest))) { latest = gen_id; } }); auto tmptr = _token_metadata.get(); auto sys_dist_ks = get_sys_dist_ks(); bool should_regenerate = false; if (!latest) { cdc_log.warn("check_and_repair_cdc_streams: no generation observed in gossip"); should_regenerate = true; } else if (std::holds_alternative(*latest) && _feature_service.cdc_generations_v2) { cdc_log.info( "Cluster still using CDC generation storage format V1 (id: {}), even though it already understands the V2 format." " Creating a new generation using V2.", *latest); should_regenerate = true; } else { cdc_log.info("check_and_repair_cdc_streams: last generation observed in gossip: {}", *latest); static const auto timeout_msg = "Timeout while fetching CDC topology description"; static const auto topology_read_error_note = "Note: this is likely caused by" " node(s) being down or unreachable. It is recommended to check the network and" " restart/remove the failed node(s), then retry checkAndRepairCdcStreams command"; static const auto exception_translating_msg = "Translating the exception to `request_execution_exception`"; std::optional gen; try { gen = co_await retrieve_generation_data(*latest, _sys_ks.local(), *sys_dist_ks, { tmptr->count_normal_token_owners() }); } catch (exceptions::request_timeout_exception& e) { cdc_log.error("{}: \"{}\". {}.", timeout_msg, e.what(), exception_translating_msg); throw exceptions::request_execution_exception(exceptions::exception_code::READ_TIMEOUT, format("{}. {}.", timeout_msg, topology_read_error_note)); } catch (exceptions::unavailable_exception& e) { static const auto unavailable_msg = "Node(s) unavailable while fetching CDC topology description"; cdc_log.error("{}: \"{}\". {}.", unavailable_msg, e.what(), exception_translating_msg); throw exceptions::request_execution_exception(exceptions::exception_code::UNAVAILABLE, format("{}. {}.", unavailable_msg, topology_read_error_note)); } catch (...) { const auto ep = std::current_exception(); if (is_timeout_exception(ep)) { cdc_log.error("{}: \"{}\". {}.", timeout_msg, ep, exception_translating_msg); throw exceptions::request_execution_exception(exceptions::exception_code::READ_TIMEOUT, format("{}. {}.", timeout_msg, topology_read_error_note)); } // On exotic errors proceed with regeneration cdc_log.error("Exception while reading CDC topology description: \"{}\". Regenerating streams anyway.", ep); should_regenerate = true; } if (!gen) { cdc_log.error( "Could not find CDC generation with timestamp {} in distributed system tables (current time: {})," " even though some node gossiped about it.", latest, db_clock::now()); should_regenerate = true; } else if (!is_cdc_generation_optimal(*gen, *tmptr)) { should_regenerate = true; cdc_log.info("CDC generation {} needs repair, regenerating", latest); } } if (!should_regenerate) { if (latest != _gen_id) { co_await legacy_do_handle_cdc_generation(*latest); } cdc_log.info("CDC generation {} does not need repair", latest); co_return; } const auto new_gen_id = co_await legacy_make_new_generation({}, true); // Need to artificially update our STATUS so other nodes handle the generation ID change // FIXME: after 0e0282cd nodes do not require a STATUS update to react to CDC generation changes. // The artificial STATUS update here should eventually be removed (in a few releases). auto status = _gossiper.get_this_endpoint_state_ptr()->get_application_state_ptr(gms::application_state::STATUS); if (!status) { cdc_log.error("Our STATUS is missing"); cdc_log.error("Aborting CDC generation repair due to missing STATUS"); co_return; } // Update _gen_id first, so that legacy_do_handle_cdc_generation (which will get called due to the status update) // won't try to update the gossiper, which would result in a deadlock inside add_local_application_state _gen_id = new_gen_id; co_await _gossiper.add_local_application_state( std::pair(gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(new_gen_id)), std::pair(gms::application_state::STATUS, *status) ); co_await _sys_ks.local().update_cdc_generation_id(new_gen_id); } future<> generation_service::handle_cdc_generation(cdc::generation_id_v2 gen_id) { auto ts = get_ts(gen_id); if (co_await container().map_reduce(and_reducer(), [ts] (generation_service& svc) { return !svc._cdc_metadata.prepare(ts); })) { co_return; } auto gen_data = co_await _sys_ks.local().read_cdc_generation(gen_id.id); bool using_this_gen = co_await container().map_reduce(or_reducer(), [ts, &gen_data] (generation_service& svc) -> future { // We need to copy it here before awaiting anything to avoid destruction of the captures. const auto timestamp = ts; topology_description gen_copy = co_await gen_data.clone_async(); co_return svc._cdc_metadata.insert(timestamp, std::move(gen_copy)); }); if (using_this_gen) { cdc_log.info("Started using generation {}.", gen_id); } } future<> generation_service::legacy_handle_cdc_generation(std::optional gen_id) { assert_shard_zero(__PRETTY_FUNCTION__); if (!gen_id) { co_return; } if (!_sys_dist_ks.local_is_initialized() || !_sys_dist_ks.local().started()) { on_internal_error(cdc_log, "Legacy handle CDC generation with sys.dist.ks. down"); } // The service should not be listening for generation changes until after the node // is bootstrapped and since the node leaves the ring on decommission if (co_await container().map_reduce(and_reducer(), [ts = get_ts(*gen_id)] (generation_service& svc) { return !svc._cdc_metadata.prepare(ts); })) { co_return; } bool using_this_gen = false; try { using_this_gen = co_await legacy_do_handle_cdc_generation_intercept_nonfatal_errors(*gen_id); } catch (generation_handling_nonfatal_exception& e) { cdc_log.warn(could_not_retrieve_msg_template, gen_id, e.what(), "retrying in the background"); legacy_async_handle_cdc_generation(*gen_id); co_return; } catch (...) { cdc_log.error(could_not_retrieve_msg_template, gen_id, std::current_exception(), "not retrying"); co_return; // Exotic ("fatal") exception => do not retry } if (using_this_gen) { cdc_log.info("Starting to use generation {}", *gen_id); co_await update_streams_description(*gen_id, _sys_ks.local(), get_sys_dist_ks(), [&tm = _token_metadata] { return tm.get()->count_normal_token_owners(); }, _abort_src); } } void generation_service::legacy_async_handle_cdc_generation(cdc::generation_id gen_id) { assert_shard_zero(__PRETTY_FUNCTION__); (void)(([] (cdc::generation_id gen_id, shared_ptr svc) -> future<> { while (true) { co_await sleep_abortable(std::chrono::seconds(5), svc->_abort_src); try { bool using_this_gen = co_await svc->legacy_do_handle_cdc_generation_intercept_nonfatal_errors(gen_id); if (using_this_gen) { cdc_log.info("Starting to use generation {}", gen_id); co_await update_streams_description(gen_id, svc->_sys_ks.local(), svc->get_sys_dist_ks(), [&tm = svc->_token_metadata] { return tm.get()->count_normal_token_owners(); }, svc->_abort_src); } co_return; } catch (generation_handling_nonfatal_exception& e) { cdc_log.warn(could_not_retrieve_msg_template, gen_id, e.what(), "continuing to retry in the background"); } catch (...) { cdc_log.error(could_not_retrieve_msg_template, gen_id, std::current_exception(), "not retrying anymore"); co_return; // Exotic ("fatal") exception => do not retry } if (co_await svc->container().map_reduce(and_reducer(), [ts = get_ts(gen_id)] (generation_service& svc) { return svc._cdc_metadata.known_or_obsolete(ts); })) { co_return; } } })(gen_id, shared_from_this())); } future<> generation_service::legacy_scan_cdc_generations() { assert_shard_zero(__PRETTY_FUNCTION__); std::optional latest; _gossiper.for_each_endpoint_state([&] (const gms::endpoint_state& eps) { auto gen_id = get_generation_id_for(eps.get_host_id(), eps); if (!latest || (gen_id && get_ts(*gen_id) > get_ts(*latest))) { latest = gen_id; } }); if (latest) { cdc_log.info("Latest generation seen during startup: {}", *latest); co_await legacy_handle_cdc_generation(latest); } else { cdc_log.info("No generation seen during startup."); } } future generation_service::legacy_do_handle_cdc_generation_intercept_nonfatal_errors(cdc::generation_id gen_id) { assert_shard_zero(__PRETTY_FUNCTION__); // Use futurize_invoke to catch all exceptions from legacy_do_handle_cdc_generation. return futurize_invoke([this, gen_id] { return legacy_do_handle_cdc_generation(gen_id); }).handle_exception([] (std::exception_ptr ep) -> future { try { std::rethrow_exception(ep); } catch (exceptions::request_timeout_exception& e) { throw generation_handling_nonfatal_exception(e.what()); } catch (exceptions::unavailable_exception& e) { throw generation_handling_nonfatal_exception(e.what()); } catch (exceptions::read_failure_exception& e) { throw generation_handling_nonfatal_exception(e.what()); } catch (...) { const auto ep = std::current_exception(); if (is_timeout_exception(ep)) { throw generation_handling_nonfatal_exception(format("{}", ep)); } throw; } }); } future generation_service::legacy_do_handle_cdc_generation(cdc::generation_id gen_id) { assert_shard_zero(__PRETTY_FUNCTION__); auto sys_dist_ks = get_sys_dist_ks(); auto gen = co_await retrieve_generation_data(gen_id, _sys_ks.local(), *sys_dist_ks, { _token_metadata.get()->count_normal_token_owners() }); if (!gen) { // This may happen during raft upgrade when a node gossips about a generation that // was propagated through raft and we didn't apply it yet. throw generation_handling_nonfatal_exception(fmt::format( "Could not find CDC generation {} in distributed system tables (current time: {})," " even though some node gossiped about it.", gen_id, db_clock::now())); } // We always gossip about the generation with the greatest timestamp. Specific nodes may remember older generations, // but eventually they forget when their clocks move past the latest generation's timestamp. // The cluster as a whole is only interested in the last generation so restarting nodes may learn what it is. // We assume that generation changes don't happen ``too often'' so every node can learn about a generation // before it is superseded by a newer one which causes nodes to start gossiping the about the newer one. // The assumption follows from the requirement of bootstrapping nodes sequentially. if (!_gen_id || get_ts(*_gen_id) < get_ts(gen_id)) { _gen_id = gen_id; co_await _sys_ks.local().update_cdc_generation_id(gen_id); co_await _gossiper.add_local_application_state( gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(gen_id)); } // Return `true` iff the generation was inserted on any of our shards. co_return co_await container().map_reduce(or_reducer(), [ts = get_ts(gen_id), &gen] (generation_service& svc) -> future { // We need to copy it here before awaiting anything to avoid destruction of the captures. const auto timestamp = ts; topology_description gen_copy = co_await gen->clone_async(); co_return svc._cdc_metadata.insert(timestamp, std::move(gen_copy)); }); } shared_ptr generation_service::get_sys_dist_ks() { assert_shard_zero(__PRETTY_FUNCTION__); if (!_sys_dist_ks.local_is_initialized()) { throw std::runtime_error("system distributed keyspace not initialized"); } return _sys_dist_ks.local_shared(); } db_clock::time_point get_ts(const generation_id& gen_id) { return std::visit([] (auto& id) { return id.ts; }, gen_id); } future create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const locator::tablet_map& map, api::timestamp_type ts) { auto s = db::system_keyspace::cdc_streams_state(); mutation m(s, partition_key::from_single_value(*s, data_value(table.uuid()).serialize_nonnull() )); m.set_static_cell("timestamp", stream_ts, ts); for (auto tid : map.tablet_ids()) { auto sid = cdc::stream_id(map.get_last_token(tid), 0); auto ck = clustering_key::from_singular(*s, dht::token::to_int64(sid.token())); m.set_cell(ck, "stream_id", data_value(std::move(sid).to_bytes()), ts); co_await coroutine::maybe_yield(); } co_return std::move(m); } future create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const utils::chunked_vector& stream_ids, api::timestamp_type ts) { auto s = db::system_keyspace::cdc_streams_state(); mutation m(s, partition_key::from_single_value(*s, data_value(table.uuid()).serialize_nonnull() )); m.set_static_cell("timestamp", stream_ts, ts); for (const auto& sid : stream_ids) { auto ck = clustering_key::from_singular(*s, dht::token::to_int64(sid.token())); m.set_cell(ck, "stream_id", data_value(sid.to_bytes()), ts); co_await coroutine::maybe_yield(); } co_return std::move(m); } utils::chunked_vector make_drop_table_streams_mutations(table_id table, api::timestamp_type ts) { utils::chunked_vector mutations; mutations.reserve(2); for (auto s : {db::system_keyspace::cdc_streams_state(), db::system_keyspace::cdc_streams_history()}) { mutation m(s, partition_key::from_single_value(*s, data_value(table.uuid()).serialize_nonnull() )); m.partition().apply(tombstone(ts, gc_clock::now())); mutations.emplace_back(std::move(m)); } return mutations; } future<> generation_service::load_cdc_tablet_streams(std::optional> changed_tables) { // track which tables we expect to get data for, and which we actually get. // when a table is dropped we won't get any streams for it. we will use this to // know which tables are dropped and remove their stream map from the metadata. std::unordered_set tables_to_process; if (changed_tables) { tables_to_process = *changed_tables; } else { tables_to_process = _cdc_metadata.get_tables_with_cdc_tablet_streams() | std::ranges::to>(); } auto read_streams_state = [this] (const std::optional>& tables, noncopyable_function(table_id, db_clock::time_point, utils::chunked_vector)> f) -> future<> { if (tables) { for (auto table : *tables) { co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector base_stream_set) -> future<> { return f(table, base_ts, std::move(base_stream_set)); }); } } else { co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector base_stream_set) -> future<> { return f(table, base_ts, std::move(base_stream_set)); }); } }; co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, utils::chunked_vector base_stream_set) -> future<> { table_streams new_table_map; auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, utils::chunked_vector stream_set) { auto ts = std::chrono::duration_cast(stream_tp.time_since_epoch()).count(); new_table_map[ts] = committed_stream_set {stream_tp, std::move(stream_set)}; }; // if we already have a loaded streams map, and the base timestamp is unchanged, then read // the history entries starting from the latest one we have and append it to the existing map. // we can do it because we only append new rows with higher timestamps to the history table. std::optional> from_streams; std::optional from_ts; const auto& all_streams = _cdc_metadata.get_all_tablet_streams(); if (auto it = all_streams.find(table); it != all_streams.end()) { const auto& current_map = *it->second; if (current_map.cbegin()->second.ts == base_ts) { const auto& latest_entry = current_map.crbegin()->second; from_streams = std::cref(latest_entry); from_ts = latest_entry.ts; } } if (!from_ts) { append_stream(base_ts, std::move(base_stream_set)); } co_await _sys_ks.local().read_cdc_streams_history(table, from_ts, [&] (table_id tid, db_clock::time_point ts, cdc_stream_diff diff) -> future<> { const auto& prev_stream_set = new_table_map.empty() ? from_streams->get().streams : std::crbegin(new_table_map)->second.streams; append_stream(ts, co_await cdc::metadata::construct_next_stream_set( prev_stream_set, std::move(diff.opened_streams), diff.closed_streams)); }); co_await container().invoke_on_all(coroutine::lambda([&] (generation_service& svc) -> future<> { table_streams new_table_map_copy; for (const auto& [ts, entry] : new_table_map) { new_table_map_copy[ts] = entry; co_await coroutine::maybe_yield(); } if (!from_ts) { svc._cdc_metadata.load_tablet_streams_map(table, std::move(new_table_map_copy)); } else { svc._cdc_metadata.append_tablet_streams_map(table, std::move(new_table_map_copy)); } })); tables_to_process.erase(table); }); // the remaining tables have no streams - remove them from the metadata co_await container().invoke_on_all([&] (generation_service& svc) { for (auto table : tables_to_process) { svc._cdc_metadata.remove_tablet_streams_map(table); } }); } future<> generation_service::query_cdc_timestamps(table_id table, bool ascending, noncopyable_function(db_clock::time_point)> f) { const auto& all_tables = _cdc_metadata.get_all_tablet_streams(); auto table_it = all_tables.find(table); if (table_it == all_tables.end()) { co_return; } const auto table_streams_ptr = table_it->second; // keep alive const auto& table_streams = *table_streams_ptr; if (ascending) { for (auto it = table_streams.cbegin(); it != table_streams.cend(); ++it) { co_await f(it->second.ts); } } else { for (auto it = table_streams.crbegin(); it != table_streams.crend(); ++it) { co_await f(it->second.ts); } } } future<> generation_service::query_cdc_streams(table_id table, noncopyable_function(db_clock::time_point, const utils::chunked_vector& current, cdc::cdc_stream_diff)> f) { const auto& all_tables = _cdc_metadata.get_all_tablet_streams(); auto table_it = all_tables.find(table); if (table_it == all_tables.end()) { co_return; } const auto table_streams_ptr = table_it->second; // keep alive const auto& table_streams = *table_streams_ptr; auto it_prev = table_streams.end(); auto it = table_streams.begin(); while (it != table_streams.end()) { const auto& entry = it->second; if (it_prev != table_streams.end()) { const auto& prev_entry = it_prev->second; auto diff = co_await cdc::metadata::generate_stream_diff(prev_entry.streams, entry.streams); co_await f(entry.ts, entry.streams, std::move(diff)); } else { co_await f(entry.ts, entry.streams, cdc::cdc_stream_diff{.closed_streams = {}, .opened_streams = entry.streams}); } it_prev = it; ++it; } } future get_switch_streams_mutation(table_id table, db_clock::time_point stream_ts, cdc_stream_diff diff, api::timestamp_type ts) { auto history_schema = db::system_keyspace::cdc_streams_history(); auto decomposed_ts = timestamp_type->decompose(stream_ts); auto closed_kind = byte_type->decompose(std::to_underlying(stream_state::closed)); auto opened_kind = byte_type->decompose(std::to_underlying(stream_state::opened)); mutation m(history_schema, partition_key::from_single_value(*history_schema, data_value(table.uuid()).serialize_nonnull() )); for (auto&& sid : diff.closed_streams) { co_await coroutine::maybe_yield(); auto ck = clustering_key::from_exploded(*history_schema, { decomposed_ts, closed_kind, long_type->decompose(dht::token::to_int64(sid.token())) }); m.set_cell(ck, "stream_id", data_value(std::move(sid).to_bytes()), ts); } for (auto&& sid : diff.opened_streams) { co_await coroutine::maybe_yield(); auto ck = clustering_key::from_exploded(*history_schema, { decomposed_ts, opened_kind, long_type->decompose(dht::token::to_int64(sid.token())) }); m.set_cell(ck, "stream_id", data_value(std::move(sid).to_bytes()), ts); } co_return std::move(m); } future<> generation_service::generate_tablet_resize_update(utils::chunked_vector& muts, table_id table, const locator::tablet_map& new_tablet_map, api::timestamp_type ts) { if (!_cdc_metadata.get_all_tablet_streams().contains(table)) { // not a CDC table co_return; } utils::chunked_vector new_streams; co_await utils::reserve_gently(new_streams, new_tablet_map.tablet_count()); for (auto tid : new_tablet_map.tablet_ids()) { new_streams.emplace_back(new_tablet_map.get_last_token(tid), 0); co_await coroutine::maybe_yield(); } const auto& table_streams = *_cdc_metadata.get_all_tablet_streams().at(table); auto current_streams_it = std::crbegin(table_streams); if (current_streams_it == std::crend(table_streams)) { // no streams at all - this should not happen on_internal_error(cdc_log, format("generate_tablet_resize_update: no streams for table {}", table)); } const auto& current_streams = current_streams_it->second; auto new_ts = new_generation_timestamp(true, _cfg.ring_delay); new_ts = std::max(new_ts, current_streams.ts + std::chrono::milliseconds(1)); // ensure timestamps are increasing auto diff = co_await _cdc_metadata.generate_stream_diff(current_streams.streams, new_streams); auto mut = co_await get_switch_streams_mutation(table, new_ts, std::move(diff), ts); muts.emplace_back(std::move(mut)); } future> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector& base_stream_set, api::timestamp_type ts) { utils::chunked_vector muts; muts.reserve(2); auto gc_now = gc_clock::now(); auto tombstone_ts = ts - 1; { // write the new base stream set to cdc_streams_state auto s = db::system_keyspace::cdc_streams_state(); mutation m(s, partition_key::from_single_value(*s, data_value(table.uuid()).serialize_nonnull() )); m.partition().apply(tombstone(tombstone_ts, gc_now)); m.set_static_cell("timestamp", data_value(base_ts), ts); for (const auto& sid : base_stream_set) { co_await coroutine::maybe_yield(); auto ck = clustering_key::from_singular(*s, dht::token::to_int64(sid.token())); m.set_cell(ck, "stream_id", data_value(sid.to_bytes()), ts); } muts.emplace_back(std::move(m)); } { // remove all entries from cdc_streams_history up to the new base auto s = db::system_keyspace::cdc_streams_history(); mutation m(s, partition_key::from_single_value(*s, data_value(table.uuid()).serialize_nonnull() )); auto range = query::clustering_range::make_ending_with({ clustering_key_prefix::from_single_value(*s, timestamp_type->decompose(base_ts)), true}); auto bv = bound_view::from_range(range); m.partition().apply_delete(*s, range_tombstone{bv.first, bv.second, tombstone{ts, gc_now}}); muts.emplace_back(std::move(m)); } co_return std::move(muts); } table_streams::const_iterator get_new_base_for_gc(const table_streams& streams_map, std::chrono::seconds ttl) { // find the most recent timestamp that is older than ttl_seconds, which will become the new base. // all streams with older timestamps can be removed because they are closed for more than ttl_seconds // (they are all replaced by streams with the newer timestamp). auto ts_upper_bound = db_clock::now() - ttl; auto it = streams_map.begin(); while (it != streams_map.end()) { auto next_it = std::next(it); if (next_it == streams_map.end()) { break; } auto next_tp = next_it->second.ts; if (next_tp <= ts_upper_bound) { // the next timestamp is older than ttl_seconds, so the current one is obsolete it = next_it; } else { break; } } return it; } future> generation_service::garbage_collect_cdc_streams_for_table(table_id table, std::optional ttl, api::timestamp_type ts) { const auto& table_streams = *_cdc_metadata.get_all_tablet_streams().at(table); // if TTL is not provided by the caller then use the table's CDC TTL auto base_schema = cdc::get_base_table(_db, *_db.find_schema(table)); ttl = ttl.or_else([&] -> std::optional { auto ttl_seconds = base_schema->cdc_options().ttl(); if (ttl_seconds > 0) { return std::chrono::seconds(ttl_seconds); } else { // ttl=0 means no ttl return std::nullopt; } }); if (!ttl) { co_return utils::chunked_vector{}; } auto new_base_it = get_new_base_for_gc(table_streams, *ttl); if (new_base_it == table_streams.begin() || new_base_it == table_streams.end()) { // nothing to gc co_return utils::chunked_vector{}; } for (auto it = table_streams.begin(); it != new_base_it; ++it) { cdc_log.info("Garbage collecting CDC stream metadata for table {}: removing generation {} because it is older than the CDC TTL of {} seconds", table, it->second.ts, *ttl); } co_return co_await get_cdc_stream_gc_mutations(table, new_base_it->second.ts, new_base_it->second.streams, ts); } future<> generation_service::garbage_collect_cdc_streams(utils::chunked_vector& muts, api::timestamp_type ts) { for (auto table : _cdc_metadata.get_tables_with_cdc_tablet_streams()) { co_await coroutine::maybe_yield(); auto table_muts = co_await garbage_collect_cdc_streams_for_table(table, std::nullopt, ts); for (auto&& m : table_muts) { muts.emplace_back(std::move(m)); } } } } // namespace cdc