mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-21 17:10:35 +00:00
DynamoDB Streams API can only convey a single parent per stream shard. Tablet merges produce 2 parents, which is incompatible. When streams are requested on a tablet table, block tablet merges via tablet_merge_blocked (the allocator suppresses new merge decisions and revokes any active merge decision). add_stream_options() sets tablet_merge_blocked=true alongside enabled=true, so CreateTable needs no special handling — the flag is inert on vnode tables and immediately effective on tablet tables. For UpdateTable, CDC enablement is deferred: store the user's intent via enable_requested, and let the topology coordinator finalize enablement once no in-progress merges remain. A new helper, defer_enabling_streams_block_tablet_merges(), amends the CDC options to this deferred state. Disabling streams clears all flags, immediately re-allowing merges. The tablet allocator accesses the merge-blocked flag through a schema::tablet_merges_forbidden() accessor rather than reaching into CDC options directly. Mark test_parent_children_merge as xfail and remove downward (merge) steps from tablet_multipliers in test_parent_filtering and test_get_records_with_alternating_tablets_count.
80 lines
2.7 KiB
C++
80 lines
2.7 KiB
C++
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * Modified by ScyllaDB
 * Copyright (C) 2021-present ScyllaDB
 *
 */
|
|
|
|
#pragma once
|
|
|
|
#include <seastar/core/sharded.hh>
|
|
#include "cdc/metadata.hh"
|
|
#include "cdc/generation_id.hh"
|
|
|
|
// Forward declaration only: the full definition of db::system_keyspace lives
// elsewhere in the project and is not needed to name the type by reference.
namespace db {
class system_keyspace;
}
|
|
|
|
// Forward declarations only: both types are defined elsewhere in the project
// and are not needed in full to name them by reference.
namespace locator {
class tablet_map;
class token_metadata;
}
|
|
|
|
namespace cdc {
|
|
|
|
// Service that owns a cdc::metadata instance and declares the operations for
// handling CDC generations and tablet-based CDC streams (loading, querying,
// tablet-resize updates, deferred stream enablement and garbage collection).
//
// Only declarations are visible in this header; the behaviour of every
// non-inline member is defined in the implementation file.
class generation_service : public peering_sharded_service<generation_service>
                         , public async_sharded_service<generation_service> {
public:
    // Construction-time settings for the service.
    struct config {
        // NOTE(review): presumably the cluster's ring delay; how it is used is
        // not visible in this header -- confirm against the implementation.
        std::chrono::milliseconds ring_delay;
    };

private:
    // Starts out false. NOTE(review): presumably set by stop() -- confirm.
    bool _stopped = false;

    config _cfg;
    // Held by reference: the referenced objects are not owned by this service.
    sharded<db::system_keyspace>& _sys_ks;
    replica::database& _db;

    /* Maintains the set of known CDC generations used to pick streams for log writes (i.e., the partition keys of these log writes). */
    cdc::metadata _cdc_metadata;

public:
    // Takes the configuration by value and the system keyspace and database
    // by reference.
    generation_service(config cfg,
            sharded<db::system_keyspace>& sys_ks,
            replica::database& db);

    // Asynchronous shutdown hook; the returned future signals completion.
    future<> stop();
    ~generation_service();

    // Returns a mutable reference to the CDC metadata owned by this instance.
    cdc::metadata& get_cdc_metadata() {
        return _cdc_metadata;
    }

    /* Retrieve the CDC generation with the given ID from local tables
     * and start using it for CDC log writes if it's not obsolete.
     * Precondition: the generation was committed using group 0 and locally applied.
     */
    future<> handle_cdc_generation(cdc::generation_id);

    // Loads CDC stream information for tablet-based tables.
    // NOTE(review): presumably an empty optional means "all tables" and a set
    // restricts the load to the listed tables -- confirm in the implementation.
    future<> load_cdc_tablet_streams(std::optional<std::unordered_set<table_id>> changed_tables);

    // Query the CDC timestamps / streams of `table`, handing results to the
    // callback `f`, which itself returns a future.
    // NOTE(review): presumably `f` is invoked once per timestamp (respectively
    // per stream-set change) and `ascending` selects the iteration order --
    // confirm in the implementation.
    future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
    future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);

    // Produces the mutations describing a tablet resize of `table` to
    // `new_tablet_map`. `muts` is taken by non-const reference.
    // NOTE(review): presumably the generated mutations are appended to `muts`
    // and written with timestamp `ts` -- confirm in the implementation.
    future<> generate_tablet_resize_update(utils::chunked_vector<canonical_mutation>& muts, table_id table, const locator::tablet_map& new_tablet_map, api::timestamp_type ts);

    // Check for tables with enable_requested CDC option and finalize their
    // stream enablement if no in-progress tablet merges remain.
    // Returns schema mutations that transition enable_requested -> enabled,
    // including CDC log table creation side effects.
    future<utils::chunked_vector<canonical_mutation>> maybe_finalize_pending_stream_enables(const locator::token_metadata& tm, api::timestamp_type ts);

    // Garbage collection of CDC streams. The per-table variant returns the
    // resulting mutations; the second variant takes `muts` by non-const
    // reference instead.
    // NOTE(review): presumably `ttl` bounds how old a stream must be to be
    // collected and the second variant appends to `muts` for all tables --
    // confirm in the implementation.
    future<utils::chunked_vector<mutation>> garbage_collect_cdc_streams_for_table(table_id table, std::optional<std::chrono::seconds> ttl, api::timestamp_type ts);
    future<> garbage_collect_cdc_streams(utils::chunked_vector<canonical_mutation>& muts, api::timestamp_type ts);

};
|
|
|
|
} // namespace cdc
|