Files
scylladb/service/migration_manager.cc
Botond Dénes 475220b9c9 Merge 'Remove the rest of pre raft topology code' from Gleb Natapov
Remove the rest of the code that assumes that either group0 does not exist yet or a cluster is still not upgraded to raft topology. Both of those are not supported any more.

No need to backport since we remove functionality here.

Closes scylladb/scylladb#28841

* github.com:scylladb/scylladb:
  service level: remove version 1 service level code
  features: move GROUP0_SCHEMA_VERSIONING to deprecated features list
  migration_manager: remove unused forward definitions
  test: remove unused code
  auth: drop auth_migration_listener since it does nothing now
  schema: drop schema_registry_entry::maybe_sync() function
  schema: drop make_table_deleting_mutations since it should not be needed with raft
  schema: remove calculate_schema_digest function
  schema: drop recalculate_schema_version function and its uses
  migration_manager: drop check for group0_schema_versioning feature
  cdc: drop usage of cdc_local table and v1 generation definition
  storage_service: no need to add yourself to the topology during reboot since raft state loading already did it
  storage_service: remove unused functions
  group0: drop with_raft() function from group0_guard since it always returns true now
  gossiper: do not gossip TOKENS and CDC_GENERATION_ID any more
  gossiper: drop tokens from loaded_endpoint_state
  gossiper: remove unused functions
  storage_service: do not pass loaded_peer_features to join_topology()
  storage_service: remove unused fields from replacement_info
  gossiper: drop is_safe_for_restart() function and its use
  storage_service: remove unused variables from join_topology
  gossiper: remove the code that was only used in gossiper topology
  storage_service: drop the check for raft mode from recovery code
  cdc: remove legacy code
  test: remove unused injection points
  auth: remove legacy auth mode and upgrade code
  treewide: remove schema pull code since we never pull schema any more
  raft topology: drop upgrade_state and its type from the topology state machine since it is not used any longer
  group0: hoist the checks for an illegal upgrade into main.cc
  api: drop get_topology_upgrade_state and always report upgrade status as done
  service_level_controller: drop service level upgrade code
  test: drop run_with_raft_recovery parameter to cql_test_env
  group0: get rid of group0_upgrade_state
  storage_service: drop topology_change_kind as it is no longer needed
  storage_service: drop check_ability_to_perform_topology_operation since no upgrades can happen any more
  service_storage: remove unused functions
  storage_service: remove non raft rebuild code
  storage_service: set topology change kind only once
  group0: drop in_recovery function and its uses
  group0: rename use_raft to maintenance_mode and make it sync
2026-03-11 10:24:20 +02:00

1118 lines
54 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include <algorithm>
#include <ranges>
#include <seastar/core/sleep.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include "auth/resource.hh"
#include "locator/host_id.hh"
#include "schema/schema_registry.hh"
#include "service/migration_manager.hh"
#include <seastar/core/on_internal_error.hh>
#include "service/storage_proxy.hh"
#include "service/raft/group0_state_machine.hh"
#include "service/migration_listener.hh"
#include "message/messaging_service.hh"
#include "gms/feature_service.hh"
#include "db/view/view_building_state.hh"
#include "utils/UUID_gen.hh"
#include "utils/assert.hh"
#include "gms/gossiper.hh"
#include "view_info.hh"
#include "schema/schema_builder.hh"
#include "replica/database.hh"
#include "replica/tablets.hh"
#include "db/schema_applier.hh"
#include "db/schema_tables.hh"
#include "types/user.hh"
#include "db/system_keyspace.hh"
#include "cql3/functions/user_aggregate.hh"
#include "cql3/functions/user_function.hh"
#include "cql3/functions/function_name.hh"
#include "unimplemented.hh"
#include "idl/migration_manager.dist.hh"
namespace service {
static logging::logger mlogger("migration_manager");
static future<schema_ptr> get_schema_definition(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, service::storage_proxy& sp);
// Constructs the migration manager. The _group0_barrier serialized-action is
// shard-dependent: shard 0 performs the actual group0 barrier, while every
// other shard forwards to shard 0 so concurrent requests get batched into a
// single barrier.
migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms,
service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks) :
_notifier(notifier)
, _background_tasks("migration_manager::background_tasks")
, _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _ss("migration_manager::storage_service"), _gossiper(gossiper), _group0_client(group0_client)
, _sys_ks(sysks)
, _group0_barrier(this_shard_id() == 0 ?
// Shard 0: run the barrier directly, under the gossiper's scheduling group.
std::function<future<>()>([this] () -> future<> {
// This will run raft barrier and will sync schema with the leader
co_await with_scheduling_group(_gossiper.get_scheduling_group(), [this] {
return start_group0_operation().discard_result();
});
}) :
// Other shards: delegate to shard 0's serialized action so that
// concurrent barrier requests from all shards coalesce.
std::function<future<>()>([this] () -> future<> {
co_await container().invoke_on(0, [] (migration_manager& mm) -> future<> {
// batch group0 raft barriers
co_await mm._group0_barrier.trigger();
});
})
)
, _schema_push([this] { return passive_announce(); })
, _concurrent_ddl_retries{10}
{
init_messaging_service();
}
// Stop the migration manager: drain (unless a drain was already requested via
// abort) and then wait for any in-flight passive schema announcement.
future<> migration_manager::stop() {
    const bool already_draining = _as.abort_requested();
    if (!already_draining) {
        co_await drain();
    }
    try {
        co_await _schema_push.join();
    } catch (...) {
        // Best-effort: a failed push must not prevent shutdown.
        auto eptr = std::current_exception();
        mlogger.error("schema_push failed: {}", eptr);
    }
}
// Take a shared reference to the storage service; released by
// unplug_storage_service().
void migration_manager::plug_storage_service(service::storage_service& ss) {
    auto ss_ref = ss.shared_from_this();
    _ss.plug(std::move(ss_ref));
}
// Drop the storage-service reference taken by plug_storage_service().
future<> migration_manager::unplug_storage_service() {
    co_await _ss.unplug();
}
// Drain the migration manager. Order matters: abort first so new work is
// rejected, then unregister RPC verbs, then wait for pending group0 barriers
// and finally for background tasks to finish.
future<> migration_manager::drain()
{
mlogger.info("stopping migration service");
_as.request_abort();
co_await uninit_messaging_service();
co_await _group0_barrier.join();
co_await _background_tasks.close();
}
// Register the migration-related RPC verbs:
// - MIGRATION_REQUEST: serve our full schema as canonical mutations (runs on
//   shard 0).
// - SCHEMA_CHECK: report our current schema version.
// - GET_SCHEMA_VERSION: serve a frozen schema for a given version from the
//   local schema registry.
void migration_manager::init_messaging_service()
{
ser::migration_manager_rpc_verbs::register_migration_request(&_messaging, [this] (const rpc::client_info& cinfo, rpc::optional<netw::schema_pull_options> options) {
return container().invoke_on(0, std::bind_front(
[] (locator::host_id src, rpc::optional<netw::schema_pull_options> options, migration_manager& self)
-> future<rpc::tuple<utils::chunked_vector<frozen_mutation>, utils::chunked_vector<canonical_mutation>>> {
const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval;
if (!cm_retval_supported) {
// Canonical mutations support was added way back in scylla-3.2 and we don't support
// skipping versions during upgrades (certainly not a 3.2 -> 5.4 upgrade).
on_internal_error(mlogger, ::format(
"canonical mutations not supported by {}", src));
}
auto features = self._feat.cluster_schema_features();
auto& proxy = self._storage_proxy.container();
auto& db = proxy.local().get_db();
// For a group0 snapshot transfer, hold the read-apply mutex so the
// schema, group0 history and tablet mutations we return form a
// consistent snapshot.
semaphore_units<> guard;
if (options->group0_snapshot_transfer) {
guard = co_await self._group0_client.hold_read_apply_mutex(self._as);
}
auto cm = co_await db::schema_tables::convert_schema_to_mutations(proxy, features);
if (options->group0_snapshot_transfer) {
cm.emplace_back(co_await db::system_keyspace::get_group0_history(db));
co_await replica::read_tablet_mutations(db, [&] (canonical_mutation m) {
cm.emplace_back(std::move(m));
});
}
// If the schema we're returning was last modified in group 0 mode, we also need to return
// the persisted schema version so the pulling node uses it instead of calculating a schema digest.
//
// If it was modified in RECOVERY mode, we still need to return the mutation as it may contain a tombstone
// that will force the pulling node to revert to digest calculation instead of using a version that it
// could've persisted earlier.
auto group0_schema_version = co_await self._sys_ks.local().get_group0_schema_version();
if (group0_schema_version) {
cm.emplace_back(std::move(*group0_schema_version));
}
// The frozen_mutation part of the tuple is legacy and always empty.
co_return rpc::tuple(utils::chunked_vector<frozen_mutation>{}, std::move(cm));
}, cinfo.retrieve_auxiliary<locator::host_id>("host_id"), std::move(options)));
});
ser::migration_manager_rpc_verbs::register_schema_check(&_messaging, [this] {
return make_ready_future<table_schema_version>(_storage_proxy.get_db().local().get_version());
});
ser::migration_manager_rpc_verbs::register_get_schema_version(&_messaging, [this] (unsigned shard, table_schema_version v) {
// FIXME: should this get an smp_service_group? Probably one separate from reads and writes.
return container().invoke_on(shard, [v] (auto&& sp) {
mlogger.debug("Schema version request for {}", v);
return local_schema_registry().get_frozen(v);
});
});
}
// Tear down the RPC verbs registered by init_messaging_service().
future<> migration_manager::uninit_messaging_service()
{
    return ser::migration_manager_rpc_verbs::unregister(&_messaging);
}
// Register feature listeners (shard 0 only) that trigger a background schema
// reload once the relevant cluster feature becomes enabled.
void migration_manager::register_feature_listeners() {
    auto reload_schema_in_bg = [this] {
        (void) with_gate(_background_tasks, [this] {
            return reload_schema().handle_exception([] (std::exception_ptr ep) {
                // Features arrive unordered, so a reload may transiently fail
                // (e.g. a view's base table still carries the old version);
                // log at debug only.
                mlogger.debug("Failed to reload schema: {}", ep);
            });
        });
    };
    if (this_shard_id() != 0) {
        return;
    }
    if (!_feat.table_digest_insensitive_to_expiry) {
        _feature_listeners.push_back(_feat.table_digest_insensitive_to_expiry.when_enabled(reload_schema_in_bg));
    }
    _feature_listeners.push_back(_feat.in_memory_tables.when_enabled(reload_schema_in_bg));
}
// Register a listener to be notified of schema changes. The caller must keep
// the listener alive until unregister_listener() completes.
void migration_notifier::register_listener(migration_listener* listener)
{
_listeners.add(listener);
}
// Remove a previously registered listener; resolves once any in-flight
// notification no longer references it.
future<> migration_notifier::unregister_listener(migration_listener* listener)
{
    co_await _listeners.remove(listener);
}
// Check whether every live peer gossips the same schema version as ours.
// Returns false if any live peer disagrees, has no SCHEMA state yet, or if
// there are no live peers at all (other than us, unless we are alone).
bool migration_manager::have_schema_agreement() {
    if (_gossiper.num_endpoints() == 1) {
        // We are the only node in the cluster; trivially in agreement.
        return true;
    }
    const auto local_version = _storage_proxy.get_db().local().get_version();
    bool agreed = false;
    static thread_local logger::rate_limit rate_limit{std::chrono::seconds{5}};
    _gossiper.for_each_endpoint_state_until([&, self = _gossiper.my_host_id()] (const gms::endpoint_state& eps) {
        const auto peer = eps.get_host_id();
        if (peer == self || !_gossiper.is_alive(peer)) {
            return stop_iteration::no;
        }
        mlogger.debug("Checking schema state for {}.", peer);
        auto schema = eps.get_application_state_ptr(gms::application_state::SCHEMA);
        if (!schema) {
            mlogger.log(log_level::info, rate_limit, "Schema state not yet available for {}.", peer);
            agreed = false;
            return stop_iteration::yes;
        }
        const auto peer_version = table_schema_version(utils::UUID{schema->value()});
        if (local_version != peer_version) {
            mlogger.log(log_level::info, rate_limit, "Schema mismatch for {} ({} != {}).",
                    peer, local_version, peer_version);
            agreed = false;
            return stop_iteration::yes;
        }
        agreed = true;
        return stop_iteration::no;
    });
    if (agreed) {
        mlogger.info("Schema agreement check passed.");
    }
    return agreed;
}
// Poll every 500ms until the local schema is initialized and all live peers
// agree on our schema version. Throws std::runtime_error past the deadline;
// honors the optional abort source between polls.
future<> migration_manager::wait_for_schema_agreement(const replica::database& db, db::timeout_clock::time_point deadline, seastar::abort_source* as) {
    constexpr auto poll_interval = std::chrono::milliseconds(500);
    for (;;) {
        // Short-circuits exactly like the original condition: agreement is
        // only checked once the local schema version is initialized.
        const bool agreed = db.get_version() != replica::database::empty_version && have_schema_agreement();
        if (agreed) {
            co_return;
        }
        if (as) {
            as->check();
        }
        if (db::timeout_clock::now() > deadline) {
            throw std::runtime_error("Unable to reach schema agreement");
        }
        if (as) {
            co_await sleep_abortable(poll_interval, *as);
        } else {
            co_await sleep(poll_interval);
        }
    }
}
// Apply schema mutations received from a remote node. Silently returns if the
// storage service is already unplugged; throws abort_requested_exception if we
// are shutting down. Canonical mutations referencing an unknown schema table
// are reported as a runtime_error.
future<> migration_manager::merge_schema_from(locator::host_id src, const utils::chunked_vector<canonical_mutation>& canonical_mutations) {
canonical_mutation_merge_count++;
mlogger.debug("Applying schema mutations from {}", src);
auto& proxy = _storage_proxy;
const auto& db = proxy.get_db().local();
// The permit keeps the storage service alive for the duration of the merge.
auto ss = _ss.get_permit();
if (!ss) {
co_return;
}
if (_as.abort_requested()) {
throw abort_requested_exception{};
}
utils::chunked_vector<mutation> mutations;
mutations.reserve(canonical_mutations.size());
try {
// Decode each canonical mutation against the schema of its target table.
for (const auto& cm : canonical_mutations) {
auto& tbl = db.find_column_family(cm.column_family_id());
mutations.emplace_back(cm.to_mutation(
tbl.schema()));
}
} catch (replica::no_such_column_family& e) {
mlogger.error("Error while applying schema mutations from {}: {}", src, e);
throw std::runtime_error(fmt::format("Error while applying schema mutations: {}", e));
}
co_await db::schema_tables::merge_schema(_sys_ks, proxy.container(), ss.get()->container(), std::move(mutations));
}
// Force a full schema reload from the schema tables. Passing an empty
// mutation set with the reload flag set makes merge_schema re-read everything.
future<> migration_manager::reload_schema() {
    mlogger.info("Reloading schema");
    auto permit = _ss.get_permit();
    if (!permit) {
        // Storage service already unplugged; nothing to reload against.
        co_return;
    }
    co_await db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), permit.get()->container(), utils::chunked_vector<mutation>{}, true);
}
// Invoke `notify` on every registered listener in a seastar thread (listeners
// may block). Every listener is notified even if some throw; each failure is
// logged via `describe_error`, and only the LAST exception is rethrown to the
// caller after all listeners have run.
future<> migration_notifier::on_schema_change(std::function<void(migration_listener*)> notify, std::function<std::string(std::exception_ptr)> describe_error) {
return seastar::async([this, notify = std::move(notify), describe_error = std::move(describe_error)] {
std::exception_ptr ex;
_listeners.thread_for_each([&] (migration_listener* listener) {
try {
notify(listener);
} catch (...) {
// Remember the (last) failure but keep notifying the rest.
ex = std::current_exception();
mlogger.error("{}", describe_error(ex));
}
});
if (ex) {
std::rethrow_exception(std::move(ex));
}
});
}
// Notify all listeners that a keyspace was created.
future<> migration_notifier::create_keyspace(const sstring& ks_name) {
    // The coroutine frame keeps ks_name alive while the listeners run.
    auto notify = [&] (migration_listener* listener) {
        listener->on_create_keyspace(ks_name);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Create keyspace notification failed {}: {}", ks_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a table was created.
future<> migration_notifier::create_column_family(schema_ptr cfm) {
    // cfm is held by the coroutine frame, keeping the name references valid.
    const auto& ks_name = cfm->ks_name();
    const auto& cf_name = cfm->cf_name();
    auto notify = [&] (migration_listener* listener) {
        listener->on_create_column_family(ks_name, cf_name);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Create column family notification failed {}.{}: {}", ks_name, cf_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a user-defined type was created.
future<> migration_notifier::create_user_type(user_type type) {
    const auto& ks_name = type->_keyspace;
    const auto& type_name = type->get_name_as_string();
    auto notify = [&] (migration_listener* listener) {
        listener->on_create_user_type(ks_name, type_name);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Create user type notification failed {}.{}: {}", ks_name, type_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a materialized view was created.
future<> migration_notifier::create_view(view_ptr view) {
    const auto& ks_name = view->ks_name();
    const auto& view_name = view->cf_name();
    auto notify = [&] (migration_listener* listener) {
        listener->on_create_view(ks_name, view_name);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Create view notification failed {}.{}: {}", ks_name, view_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a keyspace was altered.
future<> migration_notifier::update_keyspace(const sstring& ks_name) {
    auto notify = [&] (migration_listener* listener) {
        listener->on_update_keyspace(ks_name);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Update keyspace notification failed {}: {}", ks_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a table was altered; columns_changed tells the
// listeners whether the column set itself changed.
future<> migration_notifier::update_column_family(schema_ptr cfm, bool columns_changed) {
    const auto& ks_name = cfm->ks_name();
    const auto& cf_name = cfm->cf_name();
    auto notify = [&] (migration_listener* listener) {
        listener->on_update_column_family(ks_name, cf_name, columns_changed);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Update column family notification failed {}.{}: {}", ks_name, cf_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a user-defined type was altered.
future<> migration_notifier::update_user_type(user_type type) {
    const auto& ks_name = type->_keyspace;
    const auto& type_name = type->get_name_as_string();
    auto notify = [&] (migration_listener* listener) {
        listener->on_update_user_type(ks_name, type_name);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Update user type notification failed {}.{}: {}", ks_name, type_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a materialized view was altered.
future<> migration_notifier::update_view(view_ptr view, bool columns_changed) {
    const auto& ks_name = view->ks_name();
    const auto& view_name = view->cf_name();
    auto notify = [&] (migration_listener* listener) {
        listener->on_update_view(ks_name, view_name, columns_changed);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Update view notification failed {}.{}: {}", ks_name, view_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a keyspace was dropped.
future<> migration_notifier::drop_keyspace(const sstring& ks_name) {
    auto notify = [&] (migration_listener* listener) {
        listener->on_drop_keyspace(ks_name);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Drop keyspace notification failed {}: {}", ks_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a table was dropped.
future<> migration_notifier::drop_column_family(schema_ptr cfm) {
    const auto& ks_name = cfm->ks_name();
    const auto& cf_name = cfm->cf_name();
    auto notify = [&] (migration_listener* listener) {
        listener->on_drop_column_family(ks_name, cf_name);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Drop column family notification failed {}.{}: {}", ks_name, cf_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a user-defined type was dropped.
future<> migration_notifier::drop_user_type(user_type type) {
    const auto& ks_name = type->_keyspace;
    const auto& type_name = type->get_name_as_string();
    auto notify = [&] (migration_listener* listener) {
        listener->on_drop_user_type(ks_name, type_name);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Drop user type notification failed {}.{}: {}", ks_name, type_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a materialized view was dropped.
future<> migration_notifier::drop_view(view_ptr view) {
    const auto& ks_name = view->ks_name();
    const auto& view_name = view->cf_name();
    auto notify = [&] (migration_listener* listener) {
        listener->on_drop_view(ks_name, view_name);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Drop view notification failed {}.{}: {}", ks_name, view_name, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a user-defined function was dropped; listeners
// receive the encoded (name, argument types) signature.
future<> migration_notifier::drop_function(const db::functions::function_name& fun_name, const std::vector<data_type>& arg_types) {
    const auto& ks_name = fun_name.keyspace;
    auto sig = auth::encode_signature(fun_name.name, arg_types);
    auto notify = [&] (migration_listener* listener) {
        listener->on_drop_function(ks_name, sig);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Drop function notification failed {}.{}: {}", ks_name, sig, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
// Notify all listeners that a user-defined aggregate was dropped; listeners
// receive the encoded (name, argument types) signature.
future<> migration_notifier::drop_aggregate(const db::functions::function_name& fun_name, const std::vector<data_type>& arg_types) {
    const auto& ks_name = fun_name.keyspace;
    auto sig = auth::encode_signature(fun_name.name, arg_types);
    auto notify = [&] (migration_listener* listener) {
        listener->on_drop_aggregate(ks_name, sig);
    };
    auto describe = [&] (std::exception_ptr ex) {
        return fmt::format("Drop aggregate notification failed {}.{}: {}", ks_name, sig, ex);
    };
    co_await on_schema_change(std::move(notify), std::move(describe));
}
void migration_notifier::before_create_column_family(const keyspace_metadata& ksm,
const schema& schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) {
_listeners.thread_for_each([&ksm, &schema, &mutations, timestamp] (migration_listener* listener) {
// allow exceptions. so a listener can effectively kill a create-table
listener->on_before_create_column_family(ksm, schema, mutations, timestamp);
});
}
void migration_notifier::pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms, api::timestamp_type timestamp) {
_listeners.thread_for_each([&ksm, &cfms, timestamp] (migration_listener* listener) {
// allow exceptions. so a listener can effectively kill a create-table
listener->on_pre_create_column_families(ksm, cfms, timestamp);
});
}
void migration_notifier::before_create_column_families(const keyspace_metadata& ksm,
const std::vector<schema_ptr>& schemas, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) {
_listeners.thread_for_each([&ksm, &schemas, &mutations, timestamp] (migration_listener* listener) {
// allow exceptions. so a listener can effectively kill a create-table
listener->on_before_create_column_families(ksm, schemas, mutations, timestamp);
});
}
void migration_notifier::before_update_column_family(const schema& new_schema,
const schema& old_schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) {
_listeners.thread_for_each([&mutations, &new_schema, &old_schema, ts] (migration_listener* listener) {
// allow exceptions. so a listener can effectively kill an update-column
listener->on_before_update_column_family(new_schema, old_schema, mutations, ts);
});
}
void migration_notifier::before_drop_column_family(const schema& schema,
utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) {
_listeners.thread_for_each([&mutations, &schema, ts] (migration_listener* listener) {
// allow exceptions. so a listener can effectively kill a drop-column
listener->on_before_drop_column_family(schema, mutations, ts);
});
}
void migration_notifier::before_drop_keyspace(const sstring& keyspace_name,
utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) {
_listeners.thread_for_each([&mutations, &keyspace_name, ts] (migration_listener* listener) {
listener->on_before_drop_keyspace(keyspace_name, mutations, ts);
});
}
// Synchronously let listeners contribute mutations before a tablet map is
// allocated for a new table.
void migration_notifier::before_allocate_tablet_map(const locator::tablet_map& map,
        const schema& s, utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) {
    _listeners.thread_for_each([&] (migration_listener* l) {
        l->on_before_allocate_tablet_map(map, s, mutations, ts);
    });
}
// Same as before_allocate_tablet_map(), but usable while a notification is
// already in flight (nested iteration over the listener list).
void migration_notifier::before_allocate_tablet_map_in_notification(const locator::tablet_map& map,
        const schema& s, utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) {
    _listeners.thread_for_each_nested([&] (migration_listener* l) {
        l->on_before_allocate_tablet_map(map, s, mutations, ts);
    });
}
// Validate and build the schema mutations for an ALTER KEYSPACE.
utils::chunked_vector<mutation> prepare_keyspace_update_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type ts) {
    // Throws if the requested update is not legal.
    db.validate_keyspace_update(*ksm);
    mlogger.info("Update Keyspace: {}", ksm);
    const auto features = db.features().cluster_schema_features();
    return db::schema_tables::make_create_keyspace_mutations(features, ksm, ts);
}
// Validate and build the schema mutations for a CREATE KEYSPACE.
utils::chunked_vector<mutation> prepare_new_keyspace_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp) {
    // Throws if a keyspace with this name already exists or options are bad.
    db.validate_new_keyspace(*ksm);
    mlogger.info("Create new Keyspace: {}", ksm);
    const auto features = db.features().cluster_schema_features();
    return db::schema_tables::make_create_keyspace_mutations(features, ksm, timestamp);
}
// Run each schema extension's validation hook against the schema, one at a
// time.
static
future<> validate(schema_ptr schema) {
    return do_for_each(schema->extensions(), [schema] (const auto& ext) {
        return ext.second->validate(*schema);
    });
}
// Append the serialized keyspace to the mutation set, in case the target node
// missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
static future<utils::chunked_vector<mutation>> include_keyspace(
        storage_proxy& sp, const keyspace_metadata& keyspace, utils::chunked_vector<mutation> mutations) {
    auto ks_mutation = co_await db::schema_tables::read_keyspace_mutation(sp.container(), keyspace.name());
    mutations.emplace_back(std::move(ks_mutation));
    co_return std::move(mutations);
}
// Build the CREATE TABLE mutations for a batch of new tables in one keyspace.
// Runs in a seastar thread because the notifier hooks are synchronous and may
// block. Throws already_exists/invalid_request if a name or table ID clashes.
static future<utils::chunked_vector<mutation>> do_prepare_new_column_families_announcement(storage_proxy& sp,
const keyspace_metadata& ksm, std::vector<schema_ptr> cfms, api::timestamp_type timestamp) {
auto& db = sp.local_db();
return seastar::async([&db, &ksm, timestamp, cfms = std::move(cfms)] mutable {
// Validate the whole batch up front so we fail before emitting anything.
for (auto cfm : cfms) {
if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
}
if (db.column_family_exists(cfm->id())) {
throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id())));
}
}
for (auto cfm : cfms) {
mlogger.info("Create new ColumnFamily: {}", cfm);
}
// Listeners may adjust the schemas (pre_*) and the mutations (before_*);
// either hook may throw to veto the whole operation.
db.get_notifier().pre_create_column_families(ksm, cfms, timestamp);
utils::chunked_vector<mutation> mutations;
for (schema_ptr cfm : cfms) {
auto table_muts = db::schema_tables::make_create_table_mutations(cfm, timestamp);
mutations.insert(mutations.end(), std::make_move_iterator(table_muts.begin()), std::make_move_iterator(table_muts.end()));
}
db.get_notifier().before_create_column_families(ksm, cfms, mutations, timestamp);
return mutations;
}).then([&sp, &ksm](utils::chunked_vector<mutation> mutations) {
return include_keyspace(sp, ksm, std::move(mutations));
});
}
// Single-table convenience wrapper around the multi-table variant.
static future<utils::chunked_vector<mutation>> do_prepare_new_column_family_announcement(storage_proxy& sp,
        const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) {
    std::vector<schema_ptr> one;
    one.push_back(std::move(cfm));
    return do_prepare_new_column_families_announcement(sp, ksm, std::move(one), timestamp);
}
// Validate the schema's extensions, then build the CREATE TABLE mutations.
// A missing keyspace is reported as a configuration_exception.
future<utils::chunked_vector<mutation>> prepare_new_column_family_announcement(storage_proxy& sp, schema_ptr cfm, api::timestamp_type timestamp) {
return validate(cfm).then([&sp, cfm, timestamp] {
try {
auto& db = sp.get_db().local();
// The metadata object outlives this call via the keyspace it belongs to,
// so passing *ksm by reference into the coroutine below is safe.
auto ksm = db.find_keyspace(cfm->ks_name()).metadata();
return do_prepare_new_column_family_announcement(sp, *ksm, cfm, timestamp);
} catch (const replica::no_such_keyspace& e) {
throw exceptions::configuration_exception(format("Cannot add table '{}' to non existing keyspace '{}'.", cfm->cf_name(), cfm->ks_name()));
}
});
}
// Append-style single-table variant: forwards to the multi-table overload
// with a one-element list.
future<> prepare_new_column_family_announcement(utils::chunked_vector<mutation>& mutations,
        storage_proxy& sp, const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) {
    std::vector<schema_ptr> one;
    one.push_back(std::move(cfm));
    return prepare_new_column_families_announcement(mutations, sp, ksm, std::move(one), timestamp);
}
// Validate extensions for every table, then append the CREATE TABLE mutations
// for the batch to `mutations`. If the keyspace already exists locally, its
// current metadata is used in preference to the caller-supplied `ksm`.
future<> prepare_new_column_families_announcement(utils::chunked_vector<mutation>& mutations,
storage_proxy& sp, const keyspace_metadata& ksm, std::vector<schema_ptr> cfms, api::timestamp_type timestamp) {
for (auto cfm : cfms) {
try {
co_await validate(cfm);
} catch (...) {
// Wrap so the error identifies which table's extensions failed.
std::throw_with_nested(std::runtime_error(seastar::format("Validation of schema extensions failed for ColumnFamily: {}", cfm)));
}
}
auto& db = sp.local_db();
// If the keyspace exists, ensure that we use the current metadata.
const auto& current_ksm = db.has_keyspace(ksm.name()) ? *db.find_keyspace(ksm.name()).metadata() : ksm;
auto new_mutations = co_await do_prepare_new_column_families_announcement(sp, current_ksm, cfms, timestamp);
std::move(new_mutations.begin(), new_mutations.end(), std::back_inserter(mutations));
}
// Build the mutations for an ALTER TABLE, plus updates for any dependent
// views passed in `view_updates`. A missing table is reported as a
// configuration_exception (via coroutine::exception to avoid throwing across
// the coroutine boundary).
future<utils::chunked_vector<mutation>> prepare_column_family_update_announcement(storage_proxy& sp,
schema_ptr cfm, std::vector<view_ptr> view_updates, api::timestamp_type ts) {
warn(unimplemented::cause::VALIDATION);
co_await validate(cfm);
try {
auto& db = sp.local_db();
auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name()).schema(); // FIXME: Should we lookup by id?
mlogger.info("Update table '{}.{}' From {} To {}", cfm->ks_name(), cfm->cf_name(), *old_schema, *cfm);
auto&& keyspace = db.find_keyspace(cfm->ks_name()).metadata();
auto mutations = co_await seastar::async([&] {
// Can call notifier when it creates new indexes, so needs to run in Seastar thread
return db::schema_tables::make_update_table_mutations(sp, keyspace, old_schema, cfm, ts);
});
// Append mutations for each updated view, yielding between views to keep
// the reactor responsive.
for (auto&& view : view_updates) {
auto& old_view = keyspace->cf_meta_data().at(view->cf_name());
mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view);
auto view_mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), ts, false);
std::move(view_mutations.begin(), view_mutations.end(), std::back_inserter(mutations));
co_await coroutine::maybe_yield();
}
// Notifier hooks are synchronous and may block, hence the seastar thread.
co_await seastar::async([&] {
db.get_notifier().before_update_column_family(*cfm, *old_schema, mutations, ts);
});
co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
} catch (const replica::no_such_column_family& e) {
auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot update non existing table '{}' in keyspace '{}'.",
cfm->cf_name(), cfm->ks_name())));
co_return coroutine::exception(std::move(ex));
}
}
// Serialize a (new or updated) user type into schema mutations and attach the
// owning keyspace. CREATE and ALTER share this path.
static future<utils::chunked_vector<mutation>> do_prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts) {
    auto& ks = sp.local_db().find_keyspace(new_type->_keyspace);
    auto ks_meta = ks.metadata();
    auto mutations = db::schema_tables::make_create_type_mutations(ks_meta, new_type, ts);
    return include_keyspace(sp, *ks_meta, std::move(mutations));
}
// Build the mutations for a CREATE TYPE; shares the build path with ALTER.
future<utils::chunked_vector<mutation>> prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts) {
    const auto type_name = new_type->get_name_as_string();
    mlogger.info("Prepare Create new User Type: {}", type_name);
    return do_prepare_new_type_announcement(sp, std::move(new_type), ts);
}
// Build the mutations for an ALTER TYPE; shares the build path with CREATE.
future<utils::chunked_vector<mutation>> prepare_update_type_announcement(storage_proxy& sp, user_type updated_type, api::timestamp_type ts) {
    const auto type_name = updated_type->get_name_as_string();
    mlogger.info("Prepare Update User Type: {}", type_name);
    return do_prepare_new_type_announcement(sp, updated_type, ts);
}
// Build the CREATE FUNCTION mutations and attach the owning keyspace.
future<utils::chunked_vector<mutation>> prepare_new_function_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
    auto& ks = sp.local_db().find_keyspace(func->name().keyspace);
    auto ks_meta = ks.metadata();
    auto mutations = db::schema_tables::make_create_function_mutations(func, ts);
    return include_keyspace(sp, *ks_meta, std::move(mutations));
}
// Build the DROP FUNCTION mutations and attach the owning keyspace.
future<utils::chunked_vector<mutation>> prepare_function_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
    auto& ks = sp.local_db().find_keyspace(func->name().keyspace);
    auto ks_meta = ks.metadata();
    auto mutations = db::schema_tables::make_drop_function_mutations(func, ts);
    return include_keyspace(sp, *ks_meta, std::move(mutations));
}
// Build the CREATE AGGREGATE mutations and attach the owning keyspace.
future<utils::chunked_vector<mutation>> prepare_new_aggregate_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
    auto& db = sp.local_db();
    auto& ks = db.find_keyspace(aggregate->name().keyspace);
    auto ks_meta = ks.metadata();
    auto mutations = db::schema_tables::make_create_aggregate_mutations(db.features().cluster_schema_features(), aggregate, ts);
    return include_keyspace(sp, *ks_meta, std::move(mutations));
}
// Build the DROP AGGREGATE mutations and attach the owning keyspace.
future<utils::chunked_vector<mutation>> prepare_aggregate_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
    auto& db = sp.local_db();
    auto& ks = db.find_keyspace(aggregate->name().keyspace);
    auto ks_meta = ks.metadata();
    auto mutations = db::schema_tables::make_drop_aggregate_mutations(db.features().cluster_schema_features(), aggregate, ts);
    return include_keyspace(sp, *ks_meta, std::move(mutations));
}
// When a tablets keyspace is dropped, append mutations that (a) remove every
// pending view-building task for the keyspace's tables (which also aborts any
// already-started task) and (b) remove the views' rows from
// system.view_build_status_v2.
static future<> add_cleanup_view_building_state_drop_keyspace_mutations(storage_proxy& sp, lw_shared_ptr<keyspace_metadata> ks_meta, utils::chunked_vector<mutation>& out, api::timestamp_type ts) {
using namespace db::view;
mlogger.info("Cleaning view building state for all views in keyspace {} ", ks_meta->name());
auto& sys_ks = sp.system_keyspace();
auto& vb_state_machine = sp.view_building_state_machine();
// Emit a removal mutation for every task in the given map.
auto drop_all_tasks_in_task_map = [&] (const task_map& task_map) -> future<> {
for (auto& [id, _]: task_map) {
auto mut = co_await sys_ks.make_remove_view_building_task_mutation(ts, id);
out.push_back(std::move(mut));
mlogger.trace("Aborting view building task with ID: {} because the keyspace is being dropped", id);
}
};
// Drop view building tasks - this operation will also automatically abort them if any is already started
for (auto& table: ks_meta->tables()) {
auto tid = table->id();
if (!vb_state_machine.building_state.tasks_state.contains(tid)) {
continue;
}
// NOTE(review): replica_tasks is bound by value, copying the task maps on
// each iteration. This may be deliberate to keep the data stable across
// the co_awaits below — confirm before changing it to a reference.
for (auto [_, replica_tasks]: vb_state_machine.building_state.tasks_state.at(tid)) {
for (auto& [_, views_tasks]: replica_tasks.view_tasks) {
co_await drop_all_tasks_in_task_map(views_tasks);
}
co_await drop_all_tasks_in_task_map(replica_tasks.staging_tasks);
}
}
for (auto& view: ks_meta->views()) {
// Remove entries from `system.view_build_status_v2`
auto build_status_mut = co_await sys_ks.make_remove_view_build_status_mutation(ts, {view->ks_name(), view->cf_name()});
out.push_back(std::move(build_status_mut));
}
}
// Builds the mutations that drop a whole keyspace.
//
// Throws configuration_exception if the keyspace does not exist. The body runs
// in a seastar thread (seastar::async) because both the view-building cleanup
// helper and the before_drop_keyspace notifier are driven with blocking
// .get()-style calls.
future<utils::chunked_vector<mutation>> prepare_keyspace_drop_announcement(storage_proxy& sp, const sstring& ks_name, api::timestamp_type ts) {
    auto& db = sp.local_db();
    if (!db.has_keyspace(ks_name)) {
        throw exceptions::configuration_exception(format("Cannot drop non existing keyspace '{}'.", ks_name));
    }
    auto& keyspace = db.find_keyspace(ks_name);
    mlogger.info("Drop Keyspace '{}'", ks_name);
    return seastar::async([&sp, &keyspace, ts, ks_name] {
        auto& db = sp.local_db();
        auto mutations = db::schema_tables::make_drop_keyspace_mutations(db.features().cluster_schema_features(), keyspace.metadata(), ts);
        // For tablet-based keyspaces, also wipe the view building coordinator
        // state (pending tasks and build-status rows) of all views in the
        // keyspace.
        if (sp.features().view_building_coordinator && keyspace.uses_tablets()) {
            add_cleanup_view_building_state_drop_keyspace_mutations(sp, keyspace.metadata(), mutations, ts).get();
        }
        db.get_notifier().before_drop_keyspace(ks_name, mutations, ts);
        return mutations;
    });
}
// Builds the mutations that drop a table, together with its secondary indexes
// and — when `drop_views` is set — its non-index materialized views.
//
// Throws invalid_request_exception when the target is actually a view, or when
// non-index views still depend on it and drop_views is not set; throws
// configuration_exception when the table does not exist.
future<utils::chunked_vector<mutation>> prepare_column_family_drop_announcement(storage_proxy& sp,
        const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts, drop_views drop_views) {
    try {
        auto& db = sp.local_db();
        auto& old_cfm = db.find_column_family(ks_name, cf_name);
        auto& schema = old_cfm.schema();
        if (schema->is_view()) {
            co_await coroutine::return_exception(exceptions::invalid_request_exception("Cannot use DROP TABLE on Materialized View. (Did you mean DROP MATERIALIZED VIEW)?"));
        }
        auto keyspace = db.find_keyspace(ks_name).metadata();
        // If drop_views is false (the default), we don't allow to delete a
        // table which has views which aren't part of an index. If drop_views
        // is true, we delete those views as well.
        auto&& views = old_cfm.views();
        if (!drop_views && views.size() > schema->all_indices().size()) {
            // Name only the views that are not index-backing, for the error message.
            auto explicit_view_names = views
                | std::views::filter([&old_cfm](const view_ptr& v) { return !old_cfm.get_index_manager().is_index(v); })
                | std::views::transform([](const view_ptr& v) { return v->cf_name(); });
            co_await coroutine::return_exception(exceptions::invalid_request_exception(seastar::format("Cannot drop table when materialized views still depend on it ({}.{{{}}})",
                    schema->ks_name(), fmt::join(explicit_view_names, ", "))));
        }
        mlogger.info("Drop table '{}.{}'", schema->ks_name(), schema->cf_name());
        utils::chunked_vector<mutation> drop_si_mutations;
        if (!schema->all_indices().empty()) {
            // Dropping the secondary indexes is expressed as an update of the
            // table to an index-less schema; built in a seastar thread.
            auto builder = schema_builder(schema).without_indexes();
            drop_si_mutations = co_await seastar::async([&] {
                return db::schema_tables::make_update_table_mutations(sp, keyspace, schema, builder.build(), ts);
            });
        }
        auto mutations = db::schema_tables::make_drop_table_mutations(keyspace, schema, ts);
        mutations.insert(mutations.end(), std::make_move_iterator(drop_si_mutations.begin()), std::make_move_iterator(drop_si_mutations.end()));
        // Drop the table's views too, skipping index-backing views (those are
        // covered by the index-drop mutations above).
        for (auto& v : views) {
            if (!old_cfm.get_index_manager().is_index(v)) {
                mlogger.info("Drop view '{}.{}' of table '{}'", v->ks_name(), v->cf_name(), schema->cf_name());
                auto m = db::schema_tables::make_drop_view_mutations(keyspace, v, ts);
                mutations.insert(mutations.end(), std::make_move_iterator(m.begin()), std::make_move_iterator(m.end()));
            }
        }
        // notifiers must run in seastar thread
        co_await seastar::async([&] {
            for (auto& view : views) {
                db.get_notifier().before_drop_column_family(*view, mutations, ts);
            }
            db.get_notifier().before_drop_column_family(*schema, mutations, ts);
        });
        co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
    } catch (const replica::no_such_column_family& e) {
        auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot drop non existing table '{}' in keyspace '{}'.", cf_name, ks_name)));
        co_return coroutine::exception(std::move(ex));
    }
}
// Builds the schema mutations that drop a user-defined type,
// folding in the owning keyspace definition.
future<utils::chunked_vector<mutation>> prepare_type_drop_announcement(storage_proxy& sp, user_type dropped_type, api::timestamp_type ts) {
    auto& local_db = sp.local_db();
    auto& ks = local_db.find_keyspace(dropped_type->_keyspace);
    mlogger.info("Drop User Type: {}", dropped_type->get_name_as_string());
    auto schema_muts = db::schema_tables::make_drop_type_mutations(ks.metadata(), dropped_type, ts);
    return include_keyspace(sp, *ks.metadata(), std::move(schema_muts));
}
// Appends to `out` one `build_range` view building task mutation per
// (tablet, replica) of the view's base table. Each task gets a freshly
// generated time-UUID id and carries the tablet's last token.
static future<> add_view_building_tasks_mutations(storage_proxy& sp, view_ptr view, utils::chunked_vector<mutation>& out, api::timestamp_type ts) {
    using namespace db::view;
    auto& db = sp.local_db();
    auto& sys_ks = sp.system_keyspace();
    auto base_id = view->view_info()->base_id();
    auto& base_cf = db.find_column_family(base_id);
    // Hold the effective replication map in a local while we walk its tablet map.
    auto erm = base_cf.get_effective_replication_map();
    auto& tablet_map = erm->get_token_metadata().tablets().get_tablet_map(base_id);
    co_await tablet_map.for_each_tablet([&] (auto tid, const auto& tablet_info) -> future<> {
        auto last_token = tablet_map.get_last_token(tid);
        for (auto& replica: tablet_info.replicas) {
            auto id = utils::UUID_gen::get_time_UUID();
            view_building_task task {
                id, view_building_task::task_type::build_range, false,
                base_id, view->id(), replica, last_token
            };
            auto mut = co_await sys_ks.make_view_building_task_mutation(ts, task);
            out.push_back(std::move(mut));
            mlogger.trace("Creating view building task: {} with ID: {} for replica: {}", task, id, replica);
        }
    });
}
// Builds the mutations that create a new materialized view, including the
// initial view building tasks for tablet-based keyspaces.
//
// Throws already_exists_exception if a table/view with that name already
// exists, and configuration_exception if the keyspace does not exist.
future<utils::chunked_vector<mutation>> prepare_new_view_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts) {
    co_await validate(view);
    auto& db = sp.local_db();
    try {
        auto keyspace = db.find_keyspace(view->ks_name()).metadata();
        if (keyspace->cf_meta_data().contains(view->cf_name())) {
            throw exceptions::already_exists_exception(view->ks_name(), view->cf_name());
        }
        mlogger.info("Create new view: {}", view);
        // The helpers below are driven with blocking .get() calls, so the body
        // runs in a seastar thread.
        co_return co_await seastar::async([&db, keyspace = std::move(keyspace), &sp, view = std::move(view), ts] {
            auto mutations = db::schema_tables::make_create_view_mutations(keyspace, view, ts);
            if (sp.features().view_building_coordinator && keyspace->uses_tablets()) {
                add_view_building_tasks_mutations(sp, view, mutations, ts).get();
            }
            // We don't have a separate on_before_create_view() listener to
            // call. But a view is also a column family, and we need to call
            // the on_before_create_column_family listener - notably, to
            // create tablets for the new view table.
            db.get_notifier().before_create_column_family(*keyspace, *view, mutations, ts);
            return include_keyspace(sp, *keyspace, std::move(mutations)).get();
        });
    } catch (const replica::no_such_keyspace& e) {
        // NOTE(review): `view` is moved into the async lambda above, so this
        // handler is only safe as long as no_such_keyspace can only originate
        // from find_keyspace() before that move — confirm.
        auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot add view '{}' to non existing keyspace '{}'.",
                view->cf_name(), view->ks_name())));
        co_return coroutine::exception(std::move(ex));
    }
}
// Builds the mutations that alter an existing materialized view.
//
// Throws invalid_request_exception if the named entity is a regular table,
// and configuration_exception if the view does not exist in the keyspace.
future<utils::chunked_vector<mutation>> prepare_view_update_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts) {
    co_await validate(view);
    auto db = sp.data_dictionary();
    try {
        auto&& keyspace = db.find_keyspace(view->ks_name()).metadata();
        // at() throws std::out_of_range when the view is absent — handled below.
        auto& old_view = keyspace->cf_meta_data().at(view->cf_name());
        if (!old_view->is_view()) {
            co_await coroutine::return_exception(exceptions::invalid_request_exception("Cannot use ALTER MATERIALIZED VIEW on Table. (Did you mean ALTER TABLE)?"));
        }
        mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view);
        auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), ts, true);
        co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
    } catch (const std::out_of_range& e) {
        auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot update non existing materialized view '{}' in keyspace '{}'.",
                view->cf_name(), view->ks_name())));
        co_return coroutine::exception(std::move(ex));
    }
}
// Appends to `out` the mutations that erase the view-building coordinator
// state of a single view being dropped: every building task that targets the
// view is removed (which also aborts any already-started task), and the view's
// row in `system.view_build_status_v2` is deleted.
static future<> add_cleanup_view_building_state_drop_view_mutations(storage_proxy& sp, schema_ptr view, utils::chunked_vector<mutation>& out, api::timestamp_type ts) {
    using namespace db::view;
    mlogger.info("Cleaning view building state for view {} ({}.{})", view->id(), view->ks_name(), view->cf_name());
    auto& sys_ks = sp.system_keyspace();
    auto& vb_state_machine = sp.view_building_state_machine();
    // Drop view building tasks - this operation will also automatically abort them if any is already started
    auto base_id = view->view_info()->base_id();
    if (vb_state_machine.building_state.tasks_state.contains(base_id)) {
        // Walk every replica's task state for the base table, picking out the
        // tasks that belong to this view.
        for (auto& [_, replica_tasks]: vb_state_machine.building_state.tasks_state.at(base_id)) {
            if (!replica_tasks.view_tasks.contains(view->id())) {
                continue;
            }
            // Abort all view building tasks for this view
            for (auto& [id, _]: replica_tasks.view_tasks.at(view->id())) {
                auto mut = co_await sys_ks.make_remove_view_building_task_mutation(ts, id);
                out.push_back(std::move(mut));
                mlogger.trace("Aborting view building task with ID: {} because the view is being dropped", id);
            }
        }
    }
    // Remove entries from `system.view_build_status_v2`
    auto build_status_mut = co_await sys_ks.make_remove_view_build_status_mutation(ts, {view->ks_name(), view->cf_name()});
    out.push_back(std::move(build_status_mut));
}
// Builds the mutations that drop a materialized view, including the cleanup
// of its view-building coordinator state for tablet-based keyspaces.
//
// Throws invalid_request_exception if the named entity is a table or an
// index-backing view, and configuration_exception if it does not exist.
future<utils::chunked_vector<mutation>> prepare_view_drop_announcement(storage_proxy& sp, const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts) {
    auto& db = sp.local_db();
    try {
        auto& view = db.find_column_family(ks_name, cf_name).schema();
        if (!view->is_view()) {
            throw exceptions::invalid_request_exception("Cannot use DROP MATERIALIZED VIEW on Table. (Did you mean DROP TABLE)?");
        }
        if (db.find_column_family(view->view_info()->base_id()).get_index_manager().is_index(view_ptr(view))) {
            throw exceptions::invalid_request_exception("Cannot use DROP MATERIALIZED VIEW on Index. (Did you mean DROP INDEX)?");
        }
        auto keyspace = db.find_keyspace(ks_name).metadata();
        mlogger.info("Drop view '{}.{}'", view->ks_name(), view->cf_name());
        // Do not std::move() out of `view` here: it is used again below by the
        // view-building cleanup and the before-drop notifier, so moving from it
        // would be a use-after-move trap (and the is_index() check above already
        // copies it without a move).
        auto mutations = db::schema_tables::make_drop_view_mutations(keyspace, view_ptr(view), ts);
        if (sp.features().view_building_coordinator && keyspace->uses_tablets()) {
            co_await add_cleanup_view_building_state_drop_view_mutations(sp, view, mutations, ts);
        }
        // notifiers must run in seastar thread
        co_await seastar::async([&] {
            db.get_notifier().before_drop_column_family(*view, mutations, ts);
        });
        co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
    } catch (const replica::no_such_column_family& e) {
        throw exceptions::configuration_exception(format("Cannot drop non existing materialized view '{}' in keyspace '{}'.",
                cf_name, ks_name));
    }
}
// Commits the given schema mutations as a single group 0 command of the given
// mutation_type (schema_change or topology_change; see the explicit
// instantiations further down). Must run on shard 0.
template<typename mutation_type>
future<> migration_manager::announce_with_raft(utils::chunked_vector<mutation> schema, group0_guard guard, std::string_view description, std::optional<raft_timeout> timeout) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    // Adjust the mutations for the schema features currently enabled in the
    // cluster before packing them into the command.
    auto schema_features = _feat.cluster_schema_features();
    auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(std::move(schema), schema_features);
    auto group0_cmd = _group0_client.prepare_command(
        mutation_type {
            .mutations{adjusted_schema.begin(), adjusted_schema.end()},
        },
        guard, std::move(description));
    // add_entry consumes the guard; absent an explicit timeout, the default
    // raft_timeout is used.
    return _group0_client.add_entry(std::move(group0_cmd), std::move(guard), _as, timeout.value_or(raft_timeout{}));
}
// Builds the mutation for the `system.scylla_local` row with key
// "group0_schema_version", storing the guard's new group 0 state id as the
// value at the guard's write timestamp.
static mutation make_group0_schema_version_mutation(const data_dictionary::database db, const group0_guard& guard) {
    auto schema = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto* value_col = schema->get_column_definition("value");
    SCYLLA_ASSERT(value_col);
    auto serialized = value_col->type->decompose(fmt::to_string(guard.new_group0_state_id()));
    mutation mut(schema, partition_key::from_singular(*schema, "group0_schema_version"));
    mut.set_clustered_cell(clustering_key::make_empty(), *value_col,
            atomic_cell::make_live(*value_col->type, guard.write_timestamp(), std::move(serialized)));
    return mut;
}
// Tags every `scylla_tables` row in `schema` as committed through group 0 by
// writing `true` into its `committed_by_group0` column at the guard's write
// timestamp. Mutations of other schema tables are left untouched.
//
// Precondition: GROUP0_SCHEMA_VERSIONING feature is enabled in the cluster.
//
// See the description of this column in db/schema_tables.cc.
static void add_committed_by_group0_flag(utils::chunked_vector<mutation>& schema, const group0_guard& guard) {
    auto timestamp = guard.write_timestamp();
    for (auto& mut: schema) {
        // Only the scylla_tables schema table carries the flag.
        if (mut.schema()->cf_name() != db::schema_tables::v3::SCYLLA_TABLES) {
            continue;
        }
        auto& scylla_tables_schema = *mut.schema();
        auto cdef = scylla_tables_schema.get_column_definition("committed_by_group0");
        SCYLLA_ASSERT(cdef);
        // Apply the flag to every clustered row in the mutation.
        for (auto& cr: mut.partition().clustered_rows()) {
            cr.row().cells().apply(*cdef, atomic_cell::make_live(
                *cdef->type, timestamp, cdef->type->decompose(true)));
        }
    }
}
// Returns a future on the local application of the schema.
//
// Augments the schema mutations with the group 0 schema version row and the
// committed_by_group0 flags, then commits everything through Raft.
template<typename mutation_type>
future<> migration_manager::announce(utils::chunked_vector<mutation> schema, group0_guard guard, std::string_view description, std::optional<raft_timeout> timeout) {
    // Record the new group 0 state id as the schema version in scylla_local...
    schema.push_back(make_group0_schema_version_mutation(_storage_proxy.data_dictionary(), guard));
    // ...and mark the scylla_tables rows as committed through group 0.
    add_committed_by_group0_flag(schema, guard);
    return announce_with_raft<mutation_type>(std::move(schema), std::move(guard), std::move(description), std::move(timeout));
}
// Explicit instantiations for the two group 0 command types.
// Note: default arguments must not be repeated on explicit instantiations
// (they belong on the primary declaration only), so `timeout` is listed
// without `= std::nullopt` here — matching the announce_with_raft
// instantiations above.
template
future<> migration_manager::announce_with_raft<schema_change>(utils::chunked_vector<mutation> schema, group0_guard, std::string_view description, std::optional<raft_timeout> timeout);
template
future<> migration_manager::announce_with_raft<topology_change>(utils::chunked_vector<mutation> schema, group0_guard, std::string_view description, std::optional<raft_timeout> timeout);
template
future<> migration_manager::announce<schema_change>(utils::chunked_vector<mutation> schema, group0_guard, std::string_view description, std::optional<raft_timeout> timeout);
template
future<> migration_manager::announce<topology_change>(utils::chunked_vector<mutation> schema, group0_guard, std::string_view description, std::optional<raft_timeout> timeout);
// Acquires a group 0 guard for a subsequent announce().
// Group 0 operations are only ever started on shard 0.
future<group0_guard> migration_manager::start_group0_operation(std::optional<raft_timeout> timeout) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    auto effective_timeout = timeout.value_or(raft_timeout{});
    return _group0_client.start_operation(_as, effective_timeout);
}
/**
* Announce my version passively over gossip.
* Used to notify nodes as they arrive in the cluster.
*
* @param version The schema version to announce
*/
void migration_manager::passive_announce(table_schema_version version) {
    // Remember the version to gossip; the actual announcement happens in the
    // background via the schema push serializer. Failures are logged and
    // deliberately dropped.
    _schema_version_to_publish = version;
    auto pushed = _schema_push.trigger();
    (void)std::move(pushed).handle_exception([version = std::move(version)] (std::exception_ptr ex) {
        mlogger.warn("Passive announcing of version {} failed: {}. Ignored.", version, ex);
    });
}
// Publishes the pending schema version (_schema_version_to_publish) in the
// local gossip application state. Runs on shard 0 only; driven by the
// _schema_push serializer triggered from passive_announce(version).
future<> migration_manager::passive_announce() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    mlogger.info("Gossiping my schema version {}", _schema_version_to_publish);
    return _gossiper.add_local_application_state(gms::application_state::SCHEMA, gms::versioned_value::schema(_schema_version_to_publish));
}
// Returns schema of given version, either from cache or from remote node identified by 'from'.
// Doesn't affect current node's schema in any way.
//
// On a registry miss this sends a get_schema_version RPC to `dst` (shard
// `shard`), persists the schema's column mapping, and loads the result into
// the local schema registry via get_or_load().
static future<schema_ptr> get_schema_definition(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, service::storage_proxy& storage_proxy) {
    return local_schema_registry().get_or_load(v, [&ms, &storage_proxy, dst, shard] (table_schema_version v) {
        mlogger.debug("Requesting schema {} from {}", v, dst);
        return ser::migration_manager_rpc_verbs::send_get_schema_version(&ms, dst, shard, v).then([&storage_proxy] (frozen_schema s) {
            auto& proxy = storage_proxy.container();
            // Since the latest schema version is always present in the schema registry
            // we only happen to query already outdated schema version, which is
            // referenced by the incoming request.
            // That means the column mapping for the schema should always be inserted
            // with TTL (refresh TTL in case column mapping already existed prior to that).
            // We don't set the CDC schema here because it's not included in the RPC and we're
            // not using raft mode.
            auto us = s.unfreeze(db::schema_ctxt(proxy), nullptr);
            // if this is a view - sanity check that its schema doesn't need fixing.
            schema_ptr base_schema;
            if (us->is_view()) {
                auto& db = proxy.local().local_db();
                base_schema = db.find_schema(us->view_info()->base_id());
                db::schema_tables::check_no_legacy_secondary_index_mv_schema(db, view_ptr(us), base_schema);
            }
            // Persist the column mapping (with TTL, see above) before handing
            // the schema to the registry.
            return db::schema_tables::store_column_mapping(proxy, us, true).then([us, base_schema] -> extended_frozen_schema {
                return extended_frozen_schema(us);
            });
        });
    });
}
// Returns schema of the given version for a read request. Simply delegates to
// get_schema_for_write(): both paths perform the same group 0 barrier /
// registry lookup.
future<schema_ptr> migration_manager::get_schema_for_read(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, abort_source& as) {
    return get_schema_for_write(v, dst, shard, ms, as);
}
// Returns the schema of version `v`, fetching it from node `dst` if it is not
// in the local schema registry, and ensures the returned schema is marked as
// synced. Aborts with abort_requested_exception if the manager is shutting
// down.
future<schema_ptr> migration_manager::get_schema_for_write(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, abort_source& as) {
    if (_as.abort_requested()) {
        co_return coroutine::exception(std::make_exception_ptr(abort_requested_exception()));
    }
    auto s = local_schema_registry().get_or_null(v);
    if (!s || !s->is_synced()) {
        // Schema is synchronized through Raft, so perform a group 0 read barrier.
        // Batch the barriers so we don't invoke them redundantly.
        mlogger.trace("Performing raft read barrier because schema is not synced, version: {}", v);
        co_await _group0_barrier.trigger(as);
    }
    if (!s) {
        // The schema returned by get_schema_definition comes (eventually) from the schema registry,
        // so if it is a view, it already has base info and we don't need to set it later
        s = co_await get_schema_definition(v, dst, shard, ms, _storage_proxy);
    }
    // Schema is synced already (through barrier above), mark it as such.
    mlogger.trace("Mark schema {} as synced", v);
    s->registry_entry()->mark_synced();
    co_return s;
}
// Checks, in parallel, whether each of `nodes` reports the same schema version
// as the local node; if any node differs, performs a group 0 read barrier to
// catch up with the latest committed schema (schema is never pulled directly).
future<> migration_manager::sync_schema(const replica::database& db, const std::vector<locator::host_id>& nodes) {
    using schema_and_hosts = std::unordered_map<table_schema_version, std::vector<locator::host_id>>;
    schema_and_hosts schema_map;
    co_await coroutine::parallel_for_each(nodes, [this, &schema_map, &db] (const locator::host_id& node) -> future<> {
        const auto& my_version = db.get_version();
        // NOTE(review): a fresh abort_source per RPC means the schema check
        // cannot be aborted via _as — confirm this is intentional.
        abort_source as;
        auto remote_version = co_await ser::migration_manager_rpc_verbs::send_schema_check(&_messaging, node, as);
        if (my_version != remote_version) {
            schema_map[remote_version].emplace_back(node);
        }
    });
    // Everyone agrees with our version - nothing to synchronize.
    if (schema_map.empty()) {
        co_return;
    }
    co_await _group0_barrier.trigger(_as);
}
// Returns the column mapping for table `table_id` at schema version `v`:
// taken from the in-memory schema registry when that version is cached,
// otherwise loaded from the system tables.
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version v) {
    if (auto cached = local_schema_registry().get_or_null(v)) {
        return make_ready_future<column_mapping>(cached->get_column_mapping());
    }
    return db::schema_tables::get_column_mapping(sys_ks, table_id, v);
}
// Gossip join events trigger no schema work: schema is propagated via group 0
// and never pulled in reaction to gossip. Intentionally a no-op.
future<> migration_manager::on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id) {
    return make_ready_future();
}
// Gossip state changes trigger no schema work (schema travels via group 0).
// Intentionally a no-op.
future<> migration_manager::on_change(gms::inet_address endpoint, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) {
    return make_ready_future();
}
// A node coming back alive triggers no schema work (schema travels via
// group 0). Intentionally a no-op.
future<> migration_manager::on_alive(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) {
    return make_ready_future();
}
// Sets the number of retries used when a DDL statement races with concurrent
// schema changes (stored in _concurrent_ddl_retries).
void migration_manager::set_concurrent_ddl_retries(size_t n) {
    _concurrent_ddl_retries = n;
}
// Default implementation: creating several column families is simply the
// per-table notification applied to each schema in turn.
void migration_listener::on_before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>& cfms, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) {
    // Iterate by const reference: copying a schema_ptr bumps a reference
    // count for no benefit here.
    for (const auto& cfm : cfms) {
        on_before_create_column_family(ksm, *cfm, mutations, timestamp);
    }
}
}