cdc: move cdc table creation to pre_create

When creating a new table with CDC enabled, we also create a CDC log
table by adding the CDC table's mutations in the same operation.

Previously, this worked by the CDC log service subscribing to
on_before_create_column_family and adding the CDC table's mutations
there when notified about a newly created table.

The problem is that when we create the tables we also create their
tablet maps in the tablet allocator, and we want to create the two
tables as co-located tables: we allocate a tablet map for the base
table, and the CDC table is co-located with the base table.

This doesn't work well with the previous approach because the
notification that creates the CDC table is the same notification in
which the tablet allocator creates the base tablet map, so the two
operations are independent. What we really want is for the tablet
allocator to work on both tables together, so that we have the base
table's schema and tablet map when we create the CDC table's co-located
tablet map.

In order to achieve this, we want to create and add the CDC table's
schema, and only after that notify using before_create_column_families
with a vector that contains both the base table and CDC table. The
tablet allocator will then have all the information it needs to create
the co-located tablet map.

We move the creation of the CDC log table - instead of adding the
table's mutations in on_before_create_column_family, we create the table
schema and add it to the new tables vector in
on_pre_create_column_families, which is called by the migration manager
in do_prepare_new_column_families_announcement. The migration manager
will then create and add all mutations for creating the tables, and
notify about the tables being created together.
This commit is contained in:
Michael Litvak
2025-07-02 15:47:35 +03:00
parent b9ee28eaab
commit fed1048059
3 changed files with 44 additions and 18 deletions

View File

@@ -171,8 +171,16 @@ public:
});
}
void on_before_create_column_family(const keyspace_metadata& ksm, const schema& schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) override {
if (schema.cdc_options().enabled()) {
void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms) override {
std::vector<schema_ptr> new_cfms;
for (auto sp : cfms) {
const auto& schema = *sp;
if (!schema.cdc_options().enabled()) {
continue;
}
auto& db = _ctxt._proxy.get_db().local();
auto logname = log_name(schema.cf_name());
check_that_cdc_log_table_does_not_exist(db, schema, logname);
@@ -181,11 +189,10 @@ public:
// in seastar thread
auto log_schema = create_log_schema(schema, db, ksm);
auto log_mut = db::schema_tables::make_create_table_mutations(log_schema, timestamp);
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
new_cfms.push_back(std::move(log_schema));
}
cfms.insert(cfms.end(), std::make_move_iterator(new_cfms.begin()), std::make_move_iterator(new_cfms.end()));
}
void on_before_update_column_family(const schema& new_schema, const schema& old_schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) override {

View File

@@ -80,6 +80,7 @@ public:
// of the column family's keyspace. The reason for this is that we sometimes create a keyspace
// and its column families together. Therefore, listeners can't load the keyspace from the
// database. Instead, they should use the `ksm` parameter if needed.
virtual void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>&) {}
virtual void on_before_create_column_family(const keyspace_metadata& ksm, const schema&, utils::chunked_vector<mutation>&, api::timestamp_type) {}
virtual void on_before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>& cfms, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp);
virtual void on_before_update_column_family(const schema& new_schema, const schema& old_schema, utils::chunked_vector<mutation>&, api::timestamp_type) {}
@@ -145,6 +146,14 @@ public:
future<> drop_function(const db::functions::function_name& fun_name, const std::vector<data_type>& arg_types);
future<> drop_aggregate(const db::functions::function_name& fun_name, const std::vector<data_type>& arg_types);
// This notification allows the subscriber to modify the cfms vector before
// we create the tables' mutations and notify about them. For example, we
// can add a new table here (e.g. CDC).
// We want to do this before calling `before_create_column_families`,
// because in `before_create_column_families` we want the subscriber to get
// the final list of tables.
void pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>&);
void before_create_column_family(const keyspace_metadata& ksm, const schema&, utils::chunked_vector<mutation>&, api::timestamp_type);
void before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>&, utils::chunked_vector<mutation>&, api::timestamp_type);
void before_update_column_family(const schema& new_schema, const schema& old_schema, utils::chunked_vector<mutation>&, api::timestamp_type);

View File

@@ -587,6 +587,13 @@ void migration_notifier::before_create_column_family(const keyspace_metadata& ks
});
}
// Calls on_pre_create_column_families on each listener in `_listeners`,
// passing the keyspace metadata and the mutable `cfms` vector, so that a
// listener may modify the list of tables (e.g. append another schema)
// before the caller continues with it.
void migration_notifier::pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms) {
_listeners.thread_for_each([&ksm, &cfms] (migration_listener* listener) {
// Exceptions are deliberately allowed to propagate, so a listener can
// effectively kill a create-table.
listener->on_pre_create_column_families(ksm, cfms);
});
}
void migration_notifier::before_create_column_families(const keyspace_metadata& ksm,
const std::vector<schema_ptr>& schemas, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) {
_listeners.thread_for_each([&ksm, &schemas, &mutations, timestamp] (migration_listener* listener) {
@@ -648,20 +655,23 @@ static future<utils::chunked_vector<mutation>> include_keyspace(
static future<utils::chunked_vector<mutation>> do_prepare_new_column_families_announcement(storage_proxy& sp,
const keyspace_metadata& ksm, std::vector<schema_ptr> cfms, api::timestamp_type timestamp) {
auto& db = sp.local_db();
for (auto cfm : cfms) {
if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
}
if (db.column_family_exists(cfm->id())) {
throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id())));
}
}
for (auto cfm : cfms) {
mlogger.info("Create new ColumnFamily: {}", cfm);
}
return seastar::async([&db, &ksm, timestamp, cfms = std::move(cfms)] mutable {
db.get_notifier().pre_create_column_families(ksm, cfms);
for (auto cfm : cfms) {
if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
}
if (db.column_family_exists(cfm->id())) {
throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id())));
}
}
for (auto cfm : cfms) {
mlogger.info("Create new ColumnFamily: {}", cfm);
}
return seastar::async([&db, &ksm, timestamp, cfms = std::move(cfms)] {
utils::chunked_vector<mutation> mutations;
for (schema_ptr cfm : cfms) {
auto table_muts = db::schema_tables::make_create_table_mutations(cfm, timestamp);