cdc: move cdc table creation to pre_create
When creating a new table with CDC enabled, we also create a CDC log table by adding the CDC table's mutations in the same operation. Previously, this worked by having the CDC log service subscribe to on_before_create_column_family and add the CDC table's mutations there when notified about a newly created table. The problem is that when we create the tables we also create their tablet maps in the tablet allocator, and we want to create the two tables as co-located tables: we allocate a tablet map for the base table, and the CDC table is co-located with the base table. This doesn't work well with the previous approach because the notification that creates the CDC table is the same notification in which the tablet allocator creates the base tablet map, so the two operations are independent, but really we want the tablet allocator to work on both tables together, so that we have the base table's schema and tablet map when we create the CDC table's co-located tablet map. In order to achieve this, we want to create and add the CDC table's schema, and only after that notify using before_create_column_families with a vector that contains both the base table and the CDC table. The tablet allocator will then have all the information it needs to create the co-located tablet map. We move the creation of the CDC log table: instead of adding the table's mutations in on_before_create_column_family, we create the table schema and add it to the new tables vector in on_pre_create_column_families, which is called by the migration manager in do_prepare_new_column_families_announcement. The migration manager will then create and add all mutations for creating the tables, and notify about the tables being created together.
This commit is contained in:
19
cdc/log.cc
19
cdc/log.cc
@@ -171,8 +171,16 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
void on_before_create_column_family(const keyspace_metadata& ksm, const schema& schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) override {
|
||||
if (schema.cdc_options().enabled()) {
|
||||
void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms) override {
|
||||
std::vector<schema_ptr> new_cfms;
|
||||
|
||||
for (auto sp : cfms) {
|
||||
const auto& schema = *sp;
|
||||
|
||||
if (!schema.cdc_options().enabled()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto& db = _ctxt._proxy.get_db().local();
|
||||
auto logname = log_name(schema.cf_name());
|
||||
check_that_cdc_log_table_does_not_exist(db, schema, logname);
|
||||
@@ -181,11 +189,10 @@ public:
|
||||
|
||||
// in seastar thread
|
||||
auto log_schema = create_log_schema(schema, db, ksm);
|
||||
|
||||
auto log_mut = db::schema_tables::make_create_table_mutations(log_schema, timestamp);
|
||||
|
||||
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
|
||||
new_cfms.push_back(std::move(log_schema));
|
||||
}
|
||||
|
||||
cfms.insert(cfms.end(), std::make_move_iterator(new_cfms.begin()), std::make_move_iterator(new_cfms.end()));
|
||||
}
|
||||
|
||||
void on_before_update_column_family(const schema& new_schema, const schema& old_schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) override {
|
||||
|
||||
@@ -80,6 +80,7 @@ public:
|
||||
// of the column family's keyspace. The reason for this is that we sometimes create a keyspace
|
||||
// and its column families together. Therefore, listeners can't load the keyspace from the
|
||||
// database. Instead, they should use the `ksm` parameter if needed.
|
||||
virtual void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>&) {}
|
||||
virtual void on_before_create_column_family(const keyspace_metadata& ksm, const schema&, utils::chunked_vector<mutation>&, api::timestamp_type) {}
|
||||
virtual void on_before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>& cfms, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp);
|
||||
virtual void on_before_update_column_family(const schema& new_schema, const schema& old_schema, utils::chunked_vector<mutation>&, api::timestamp_type) {}
|
||||
@@ -145,6 +146,14 @@ public:
|
||||
future<> drop_function(const db::functions::function_name& fun_name, const std::vector<data_type>& arg_types);
|
||||
future<> drop_aggregate(const db::functions::function_name& fun_name, const std::vector<data_type>& arg_types);
|
||||
|
||||
// This notification allows the subscriber to modify the cfms vector before
|
||||
// we create the tables' mutations and notify about them. For example, we
|
||||
// can add a new table here (e.g. CDC).
|
||||
// We want to do this before calling `before_create_column_families`,
|
||||
// because in `before_create_column_families` we want the subscriber to get
|
||||
// the final list of tables.
|
||||
void pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>&);
|
||||
|
||||
void before_create_column_family(const keyspace_metadata& ksm, const schema&, utils::chunked_vector<mutation>&, api::timestamp_type);
|
||||
void before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>&, utils::chunked_vector<mutation>&, api::timestamp_type);
|
||||
void before_update_column_family(const schema& new_schema, const schema& old_schema, utils::chunked_vector<mutation>&, api::timestamp_type);
|
||||
|
||||
@@ -587,6 +587,13 @@ void migration_notifier::before_create_column_family(const keyspace_metadata& ks
|
||||
});
|
||||
}
|
||||
|
||||
void migration_notifier::pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms) {
|
||||
_listeners.thread_for_each([&ksm, &cfms] (migration_listener* listener) {
|
||||
// allow exceptions. so a listener can effectively kill a create-table
|
||||
listener->on_pre_create_column_families(ksm, cfms);
|
||||
});
|
||||
}
|
||||
|
||||
void migration_notifier::before_create_column_families(const keyspace_metadata& ksm,
|
||||
const std::vector<schema_ptr>& schemas, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) {
|
||||
_listeners.thread_for_each([&ksm, &schemas, &mutations, timestamp] (migration_listener* listener) {
|
||||
@@ -648,20 +655,23 @@ static future<utils::chunked_vector<mutation>> include_keyspace(
|
||||
static future<utils::chunked_vector<mutation>> do_prepare_new_column_families_announcement(storage_proxy& sp,
|
||||
const keyspace_metadata& ksm, std::vector<schema_ptr> cfms, api::timestamp_type timestamp) {
|
||||
auto& db = sp.local_db();
|
||||
for (auto cfm : cfms) {
|
||||
if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
|
||||
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
|
||||
}
|
||||
if (db.column_family_exists(cfm->id())) {
|
||||
throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id())));
|
||||
}
|
||||
}
|
||||
|
||||
for (auto cfm : cfms) {
|
||||
mlogger.info("Create new ColumnFamily: {}", cfm);
|
||||
}
|
||||
return seastar::async([&db, &ksm, timestamp, cfms = std::move(cfms)] mutable {
|
||||
db.get_notifier().pre_create_column_families(ksm, cfms);
|
||||
|
||||
for (auto cfm : cfms) {
|
||||
if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
|
||||
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
|
||||
}
|
||||
if (db.column_family_exists(cfm->id())) {
|
||||
throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id())));
|
||||
}
|
||||
}
|
||||
|
||||
for (auto cfm : cfms) {
|
||||
mlogger.info("Create new ColumnFamily: {}", cfm);
|
||||
}
|
||||
|
||||
return seastar::async([&db, &ksm, timestamp, cfms = std::move(cfms)] {
|
||||
utils::chunked_vector<mutation> mutations;
|
||||
for (schema_ptr cfm : cfms) {
|
||||
auto table_muts = db::schema_tables::make_create_table_mutations(cfm, timestamp);
|
||||
|
||||
Reference in New Issue
Block a user