Remove the rest of the code that assumes that either group0 does not exist yet or that a cluster is still not upgraded to raft topology. Neither case is supported any more. No need to backport since we remove functionality here.

Closes scylladb/scylladb#28841

* github.com:scylladb/scylladb:
  service level: remove version 1 service level code
  features: move GROUP0_SCHEMA_VERSIONING to deprecated features list
  migration_manager: remove unused forward definitions
  test: remove unused code
  auth: drop auth_migration_listener since it does nothing now
  schema: drop schema_registry_entry::maybe_sync() function
  schema: drop make_table_deleting_mutations since it should not be needed with raft
  schema: remove calculate_schema_digest function
  schema: drop recalculate_schema_version function and its uses
  migration_manager: drop check for group0_schema_versioning feature
  cdc: drop usage of cdc_local table and v1 generation definition
  storage_service: no need to add yourself to the topology during reboot since raft state loading already did it
  storage_service: remove unused functions
  group0: drop with_raft() function from group0_guard since it always returns true now
  gossiper: do not gossip TOKENS and CDC_GENERATION_ID any more
  gossiper: drop tokens from loaded_endpoint_state
  gossiper: remove unused functions
  storage_service: do not pass loaded_peer_features to join_topology()
  storage_service: remove unused fields from replacement_info
  gossiper: drop is_safe_for_restart() function and its use
  storage_service: remove unused variables from join_topology
  gossiper: remove the code that was only used in gossiper topology
  storage_service: drop the check for raft mode from recovery code
  cdc: remove legacy code
  test: remove unused injection points
  auth: remove legacy auth mode and upgrade code
  treewide: remove schema pull code since we never pull schema any more
  raft topology: drop upgrade_state and its type from the topology state machine since it is not used any longer
  group0: hoist the checks for an illegal upgrade into main.cc
  api: drop get_topology_upgrade_state and always report upgrade status as done
  service_level_controller: drop service level upgrade code
  test: drop run_with_raft_recovery parameter to cql_test_env
  group0: get rid of group0_upgrade_state
  storage_service: drop topology_change_kind as it is no longer needed
  storage_service: drop check_ability_to_perform_topology_operation since no upgrades can happen any more
  service_storage: remove unused functions
  storage_service: remove non raft rebuild code
  storage_service: set topology change kind only once
  group0: drop in_recovery function and its uses
  group0: rename use_raft to maintenance_mode and make it sync
/*
 * Copyright (C) 2015-present ScyllaDB
 *
 * Modified by ScyllaDB
 */

/*
 * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
 */

#include <algorithm>
#include <ranges>
#include <seastar/core/sleep.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include "auth/resource.hh"
#include "locator/host_id.hh"
#include "schema/schema_registry.hh"
#include "service/migration_manager.hh"
#include <seastar/core/on_internal_error.hh>
#include "service/storage_proxy.hh"
#include "service/raft/group0_state_machine.hh"

#include "service/migration_listener.hh"
#include "message/messaging_service.hh"
#include "gms/feature_service.hh"
#include "db/view/view_building_state.hh"
#include "utils/UUID_gen.hh"
#include "utils/assert.hh"
#include "gms/gossiper.hh"
#include "view_info.hh"
#include "schema/schema_builder.hh"
#include "replica/database.hh"
#include "replica/tablets.hh"
#include "db/schema_applier.hh"
#include "db/schema_tables.hh"
#include "types/user.hh"
#include "db/system_keyspace.hh"
#include "cql3/functions/user_aggregate.hh"
#include "cql3/functions/user_function.hh"
#include "cql3/functions/function_name.hh"
#include "unimplemented.hh"
#include "idl/migration_manager.dist.hh"
namespace service {

static logging::logger mlogger("migration_manager");

static future<schema_ptr> get_schema_definition(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, service::storage_proxy& sp);

migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms,
        service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks) :
    _notifier(notifier)
    , _background_tasks("migration_manager::background_tasks")
    , _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _ss("migration_manager::storage_service"), _gossiper(gossiper), _group0_client(group0_client)
    , _sys_ks(sysks)
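    // Cross-shard coalescing of group 0 read barriers, as implemented below:
    // on shard 0 the barrier starts a group 0 operation, which runs a raft
    // read barrier and syncs schema with the leader; on every other shard it
    // hops to shard 0 and triggers that shard's barrier action, so concurrent
    // requests from all shards share a single raft round trip.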
    , _group0_barrier(this_shard_id() == 0 ?
        std::function<future<>()>([this] () -> future<> {
            // This will run raft barrier and will sync schema with the leader
            co_await with_scheduling_group(_gossiper.get_scheduling_group(), [this] {
                return start_group0_operation().discard_result();
            });
        }) :
        std::function<future<>()>([this] () -> future<> {
            co_await container().invoke_on(0, [] (migration_manager& mm) -> future<> {
                // batch group0 raft barriers
                co_await mm._group0_barrier.trigger();
            });
        })
    )
    , _schema_push([this] { return passive_announce(); })
    , _concurrent_ddl_retries{10}
{
    init_messaging_service();
}

future<> migration_manager::stop() {
    if (!_as.abort_requested()) {
        co_await drain();
    }
    try {
        co_await _schema_push.join();
    } catch (...) {
        mlogger.error("schema_push failed: {}", std::current_exception());
    }
}

void migration_manager::plug_storage_service(service::storage_service& ss) {
    _ss.plug(ss.shared_from_this());
}

future<> migration_manager::unplug_storage_service() {
    return _ss.unplug();
}

future<> migration_manager::drain()
{
    mlogger.info("stopping migration service");
    _as.request_abort();

    co_await uninit_messaging_service();
    co_await _group0_barrier.join();
    co_await _background_tasks.close();
}
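
// With gossip-triggered schema pulls gone (see the commit log above), the
// MIGRATION_REQUEST verb below remains registered mainly to serve group 0
// snapshot transfers; a peer that cannot handle canonical-mutation results
// is treated as an internal error.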
void migration_manager::init_messaging_service()
{
    ser::migration_manager_rpc_verbs::register_migration_request(&_messaging, [this] (const rpc::client_info& cinfo, rpc::optional<netw::schema_pull_options> options) {
        return container().invoke_on(0, std::bind_front(
            [] (locator::host_id src, rpc::optional<netw::schema_pull_options> options, migration_manager& self)
                    -> future<rpc::tuple<utils::chunked_vector<frozen_mutation>, utils::chunked_vector<canonical_mutation>>> {
                const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval;
                if (!cm_retval_supported) {
                    // Canonical mutations support was added way back in scylla-3.2 and we don't support
                    // skipping versions during upgrades (certainly not a 3.2 -> 5.4 upgrade).
                    on_internal_error(mlogger, ::format(
                            "canonical mutations not supported by {}", src));
                }

                auto features = self._feat.cluster_schema_features();
                auto& proxy = self._storage_proxy.container();
                auto& db = proxy.local().get_db();
                semaphore_units<> guard;
                if (options->group0_snapshot_transfer) {
                    guard = co_await self._group0_client.hold_read_apply_mutex(self._as);
                }
                auto cm = co_await db::schema_tables::convert_schema_to_mutations(proxy, features);
                if (options->group0_snapshot_transfer) {
                    cm.emplace_back(co_await db::system_keyspace::get_group0_history(db));
                    co_await replica::read_tablet_mutations(db, [&] (canonical_mutation m) {
                        cm.emplace_back(std::move(m));
                    });
                }

                // If the schema we're returning was last modified in group 0 mode, we also need to return
                // the persisted schema version so the pulling node uses it instead of calculating a schema digest.
                //
                // If it was modified in RECOVERY mode, we still need to return the mutation as it may contain a tombstone
                // that will force the pulling node to revert to digest calculation instead of using a version that it
                // could've persisted earlier.
                auto group0_schema_version = co_await self._sys_ks.local().get_group0_schema_version();
                if (group0_schema_version) {
                    cm.emplace_back(std::move(*group0_schema_version));
                }

                co_return rpc::tuple(utils::chunked_vector<frozen_mutation>{}, std::move(cm));
            }, cinfo.retrieve_auxiliary<locator::host_id>("host_id"), std::move(options)));
    });
    ser::migration_manager_rpc_verbs::register_schema_check(&_messaging, [this] {
        return make_ready_future<table_schema_version>(_storage_proxy.get_db().local().get_version());
    });
    ser::migration_manager_rpc_verbs::register_get_schema_version(&_messaging, [this] (unsigned shard, table_schema_version v) {
        // FIXME: should this get an smp_service_group? Probably one separate from reads and writes.
        return container().invoke_on(shard, [v] (auto&& sp) {
            mlogger.debug("Schema version request for {}", v);
            return local_schema_registry().get_frozen(v);
        });
    });
}

future<> migration_manager::uninit_messaging_service()
{
    co_await ser::migration_manager_rpc_verbs::unregister(&_messaging);
}

void migration_manager::register_feature_listeners() {
    auto reload_schema_in_bg = [this] {
        (void) with_gate(_background_tasks, [this] {
            return reload_schema().handle_exception([] (std::exception_ptr ep) {
                // Due to features being unordered, reload might fail because
                // some tables still have the wrong version and looking up e.g.
                // the base-table of a view will fail.
                mlogger.debug("Failed to reload schema: {}", ep);
            });
        });
    };
    if (this_shard_id() == 0) {
        for (const gms::feature& feature : {
                std::cref(_feat.table_digest_insensitive_to_expiry)}) {
            if (!feature) {
                _feature_listeners.push_back(feature.when_enabled(reload_schema_in_bg));
            }
        }
        _feature_listeners.push_back(_feat.in_memory_tables.when_enabled(reload_schema_in_bg));
    }
}

void migration_notifier::register_listener(migration_listener* listener)
{
    _listeners.add(listener);
}

future<> migration_notifier::unregister_listener(migration_listener* listener)
{
    return _listeners.remove(listener);
}
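
// Returns true when this node is the only endpoint in gossip, or when at
// least one other live endpoint was checked and every live endpoint reports
// the same schema version as ours.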
bool migration_manager::have_schema_agreement() {
    if (_gossiper.num_endpoints() == 1) {
        // Us.
        return true;
    }
    auto our_version = _storage_proxy.get_db().local().get_version();
    bool match = false;
    static thread_local logger::rate_limit rate_limit{std::chrono::seconds{5}};
    _gossiper.for_each_endpoint_state_until([&, my_address = _gossiper.my_host_id()] (const gms::endpoint_state& eps) {
        auto endpoint = eps.get_host_id();
        if (endpoint == my_address || !_gossiper.is_alive(eps.get_host_id())) {
            return stop_iteration::no;
        }
        mlogger.debug("Checking schema state for {}.", endpoint);
        auto schema = eps.get_application_state_ptr(gms::application_state::SCHEMA);
        if (!schema) {
            mlogger.log(log_level::info, rate_limit, "Schema state not yet available for {}.", endpoint);
            match = false;
            return stop_iteration::yes;
        }
        auto remote_version = table_schema_version(utils::UUID{schema->value()});
        if (our_version != remote_version) {
            mlogger.log(log_level::info, rate_limit, "Schema mismatch for {} ({} != {}).",
                    endpoint, our_version, remote_version);
            match = false;
            return stop_iteration::yes;
        } else {
            match = true;
        }
        return stop_iteration::no;
    });
    if (match) {
        mlogger.info("Schema agreement check passed.");
    }
    return match;
}

future<> migration_manager::wait_for_schema_agreement(const replica::database& db, db::timeout_clock::time_point deadline, seastar::abort_source* as) {
    while (db.get_version() == replica::database::empty_version || !have_schema_agreement()) {
        if (as) {
            as->check();
        }
        if (db::timeout_clock::now() > deadline) {
            throw std::runtime_error("Unable to reach schema agreement");
        }
        co_await (as ? sleep_abortable(std::chrono::milliseconds(500), *as) : sleep(std::chrono::milliseconds(500)));
    }
}
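
// Applies schema mutations received from another node (e.g. carried by a
// group 0 command or snapshot). Each canonical mutation is converted using
// the local schema of its target system table before being merged.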
future<> migration_manager::merge_schema_from(locator::host_id src, const utils::chunked_vector<canonical_mutation>& canonical_mutations) {
    canonical_mutation_merge_count++;
    mlogger.debug("Applying schema mutations from {}", src);
    auto& proxy = _storage_proxy;
    const auto& db = proxy.get_db().local();
    auto ss = _ss.get_permit();
    if (!ss) {
        co_return;
    }

    if (_as.abort_requested()) {
        throw abort_requested_exception{};
    }

    utils::chunked_vector<mutation> mutations;
    mutations.reserve(canonical_mutations.size());
    try {
        for (const auto& cm : canonical_mutations) {
            auto& tbl = db.find_column_family(cm.column_family_id());
            mutations.emplace_back(cm.to_mutation(tbl.schema()));
        }
    } catch (replica::no_such_column_family& e) {
        mlogger.error("Error while applying schema mutations from {}: {}", src, e);
        throw std::runtime_error(fmt::format("Error while applying schema mutations: {}", e));
    }
    co_await db::schema_tables::merge_schema(_sys_ks, proxy.container(), ss.get()->container(), std::move(mutations));
}

future<> migration_manager::reload_schema() {
    mlogger.info("Reloading schema");
    auto ss = _ss.get_permit();
    if (!ss) {
        co_return;
    }
    utils::chunked_vector<mutation> mutations;
    co_await db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), ss.get()->container(), std::move(mutations), true);
}

future<> migration_notifier::on_schema_change(std::function<void(migration_listener*)> notify, std::function<std::string(std::exception_ptr)> describe_error) {
    return seastar::async([this, notify = std::move(notify), describe_error = std::move(describe_error)] {
        std::exception_ptr ex;
        _listeners.thread_for_each([&] (migration_listener* listener) {
            try {
                notify(listener);
            } catch (...) {
                ex = std::current_exception();
                mlogger.error("{}", describe_error(ex));
            }
        });
        if (ex) {
            std::rethrow_exception(std::move(ex));
        }
    });
}
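
// A sketch of how a component would subscribe to these notifications
// (illustrative only; `my_listener` is hypothetical, and we assume the
// empty_listener convenience base from migration_listener.hh):
//
//   class my_listener : public migration_listener::empty_listener {
//       void on_create_keyspace(const sstring& ks_name) override {
//           // react to the new keyspace here
//       }
//   };
//
//   my_listener l;
//   notifier.register_listener(&l);
//   ...
//   co_await notifier.unregister_listener(&l);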

future<> migration_notifier::create_keyspace(const sstring& ks_name) {
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_create_keyspace(ks_name);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Create keyspace notification failed {}: {}", ks_name, ex);
    });
}

future<> migration_notifier::create_column_family(schema_ptr cfm) {
    const auto& ks_name = cfm->ks_name();
    const auto& cf_name = cfm->cf_name();
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_create_column_family(ks_name, cf_name);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Create column family notification failed {}.{}: {}", ks_name, cf_name, ex);
    });
}

future<> migration_notifier::create_user_type(user_type type) {
    const auto& ks_name = type->_keyspace;
    const auto& type_name = type->get_name_as_string();
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_create_user_type(ks_name, type_name);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Create user type notification failed {}.{}: {}", ks_name, type_name, ex);
    });
}

future<> migration_notifier::create_view(view_ptr view) {
    const auto& ks_name = view->ks_name();
    const auto& view_name = view->cf_name();
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_create_view(ks_name, view_name);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Create view notification failed {}.{}: {}", ks_name, view_name, ex);
    });
}

future<> migration_notifier::update_keyspace(const sstring& ks_name) {
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_update_keyspace(ks_name);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Update keyspace notification failed {}: {}", ks_name, ex);
    });
}

future<> migration_notifier::update_column_family(schema_ptr cfm, bool columns_changed) {
    const auto& ks_name = cfm->ks_name();
    const auto& cf_name = cfm->cf_name();
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_update_column_family(ks_name, cf_name, columns_changed);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Update column family notification failed {}.{}: {}", ks_name, cf_name, ex);
    });
}

future<> migration_notifier::update_user_type(user_type type) {
    const auto& ks_name = type->_keyspace;
    const auto& type_name = type->get_name_as_string();
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_update_user_type(ks_name, type_name);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Update user type notification failed {}.{}: {}", ks_name, type_name, ex);
    });
}

future<> migration_notifier::update_view(view_ptr view, bool columns_changed) {
    const auto& ks_name = view->ks_name();
    const auto& view_name = view->cf_name();
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_update_view(ks_name, view_name, columns_changed);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Update view notification failed {}.{}: {}", ks_name, view_name, ex);
    });
}

future<> migration_notifier::drop_keyspace(const sstring& ks_name) {
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_drop_keyspace(ks_name);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Drop keyspace notification failed {}: {}", ks_name, ex);
    });
}

future<> migration_notifier::drop_column_family(schema_ptr cfm) {
    const auto& ks_name = cfm->ks_name();
    const auto& cf_name = cfm->cf_name();
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_drop_column_family(ks_name, cf_name);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Drop column family notification failed {}.{}: {}", ks_name, cf_name, ex);
    });
}

future<> migration_notifier::drop_user_type(user_type type) {
    const auto& ks_name = type->_keyspace;
    const auto& type_name = type->get_name_as_string();
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_drop_user_type(ks_name, type_name);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Drop user type notification failed {}.{}: {}", ks_name, type_name, ex);
    });
}

future<> migration_notifier::drop_view(view_ptr view) {
    const auto& ks_name = view->ks_name();
    const auto& view_name = view->cf_name();
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_drop_view(ks_name, view_name);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Drop view notification failed {}.{}: {}", ks_name, view_name, ex);
    });
}

future<> migration_notifier::drop_function(const db::functions::function_name& fun_name, const std::vector<data_type>& arg_types) {
    auto&& ks_name = fun_name.keyspace;
    auto&& sig = auth::encode_signature(fun_name.name, arg_types);
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_drop_function(ks_name, sig);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Drop function notification failed {}.{}: {}", ks_name, sig, ex);
    });
}

future<> migration_notifier::drop_aggregate(const db::functions::function_name& fun_name, const std::vector<data_type>& arg_types) {
    auto&& ks_name = fun_name.keyspace;
    auto&& sig = auth::encode_signature(fun_name.name, arg_types);
    co_await on_schema_change([&] (migration_listener* listener) {
        listener->on_drop_aggregate(ks_name, sig);
    }, [&] (std::exception_ptr ex) {
        return fmt::format("Drop aggregate notification failed {}.{}: {}", ks_name, sig, ex);
    });
}

void migration_notifier::before_create_column_family(const keyspace_metadata& ksm,
        const schema& schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) {
    _listeners.thread_for_each([&ksm, &schema, &mutations, timestamp] (migration_listener* listener) {
        // Exceptions are allowed to propagate, so a listener can effectively kill a create-table.
        listener->on_before_create_column_family(ksm, schema, mutations, timestamp);
    });
}

void migration_notifier::pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms, api::timestamp_type timestamp) {
    _listeners.thread_for_each([&ksm, &cfms, timestamp] (migration_listener* listener) {
        // Exceptions are allowed to propagate, so a listener can effectively kill a create-table.
        listener->on_pre_create_column_families(ksm, cfms, timestamp);
    });
}

void migration_notifier::before_create_column_families(const keyspace_metadata& ksm,
        const std::vector<schema_ptr>& schemas, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) {
    _listeners.thread_for_each([&ksm, &schemas, &mutations, timestamp] (migration_listener* listener) {
        // Exceptions are allowed to propagate, so a listener can effectively kill a create-table.
        listener->on_before_create_column_families(ksm, schemas, mutations, timestamp);
    });
}

void migration_notifier::before_update_column_family(const schema& new_schema,
        const schema& old_schema, utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) {
    _listeners.thread_for_each([&mutations, &new_schema, &old_schema, ts] (migration_listener* listener) {
        // Exceptions are allowed to propagate, so a listener can effectively kill an update-column.
        listener->on_before_update_column_family(new_schema, old_schema, mutations, ts);
    });
}

void migration_notifier::before_drop_column_family(const schema& schema,
        utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) {
    _listeners.thread_for_each([&mutations, &schema, ts] (migration_listener* listener) {
        // Exceptions are allowed to propagate, so a listener can effectively kill a drop-column.
        listener->on_before_drop_column_family(schema, mutations, ts);
    });
}

void migration_notifier::before_drop_keyspace(const sstring& keyspace_name,
        utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) {
    _listeners.thread_for_each([&mutations, &keyspace_name, ts] (migration_listener* listener) {
        listener->on_before_drop_keyspace(keyspace_name, mutations, ts);
    });
}

void migration_notifier::before_allocate_tablet_map(const locator::tablet_map& map,
        const schema& s, utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) {
    _listeners.thread_for_each([&map, &s, &mutations, ts] (migration_listener* listener) {
        listener->on_before_allocate_tablet_map(map, s, mutations, ts);
    });
}

void migration_notifier::before_allocate_tablet_map_in_notification(const locator::tablet_map& map,
        const schema& s, utils::chunked_vector<mutation>& mutations, api::timestamp_type ts) {
    _listeners.thread_for_each_nested([&map, &s, &mutations, ts] (migration_listener* listener) {
        listener->on_before_allocate_tablet_map(map, s, mutations, ts);
    });
}

utils::chunked_vector<mutation> prepare_keyspace_update_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type ts) {
    db.validate_keyspace_update(*ksm);
    mlogger.info("Update Keyspace: {}", ksm);
    return db::schema_tables::make_create_keyspace_mutations(db.features().cluster_schema_features(), ksm, ts);
}

utils::chunked_vector<mutation> prepare_new_keyspace_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp) {
    db.validate_new_keyspace(*ksm);
    mlogger.info("Create new Keyspace: {}", ksm);
    return db::schema_tables::make_create_keyspace_mutations(db.features().cluster_schema_features(), ksm, timestamp);
}

static
future<> validate(schema_ptr schema) {
    return do_for_each(schema->extensions(), [schema] (auto& p) {
        return p.second->validate(*schema);
    });
}

static future<utils::chunked_vector<mutation>> include_keyspace(
        storage_proxy& sp, const keyspace_metadata& keyspace, utils::chunked_vector<mutation> mutations) {
    // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
    mutation m = co_await db::schema_tables::read_keyspace_mutation(sp.container(), keyspace.name());
    mutations.push_back(std::move(m));
    co_return std::move(mutations);
}

static future<utils::chunked_vector<mutation>> do_prepare_new_column_families_announcement(storage_proxy& sp,
        const keyspace_metadata& ksm, std::vector<schema_ptr> cfms, api::timestamp_type timestamp) {
    auto& db = sp.local_db();

    return seastar::async([&db, &ksm, timestamp, cfms = std::move(cfms)] mutable {
        for (auto cfm : cfms) {
            if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
                throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
            }
            if (db.column_family_exists(cfm->id())) {
                throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id())));
            }
        }

        for (auto cfm : cfms) {
            mlogger.info("Create new ColumnFamily: {}", cfm);
        }

        db.get_notifier().pre_create_column_families(ksm, cfms, timestamp);

        utils::chunked_vector<mutation> mutations;
        for (schema_ptr cfm : cfms) {
            auto table_muts = db::schema_tables::make_create_table_mutations(cfm, timestamp);
            mutations.insert(mutations.end(), std::make_move_iterator(table_muts.begin()), std::make_move_iterator(table_muts.end()));
        }
        db.get_notifier().before_create_column_families(ksm, cfms, mutations, timestamp);
        return mutations;
    }).then([&sp, &ksm] (utils::chunked_vector<mutation> mutations) {
        return include_keyspace(sp, ksm, std::move(mutations));
    });
}

static future<utils::chunked_vector<mutation>> do_prepare_new_column_family_announcement(storage_proxy& sp,
        const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) {
    return do_prepare_new_column_families_announcement(sp, ksm, std::vector<schema_ptr>{std::move(cfm)}, timestamp);
}

future<utils::chunked_vector<mutation>> prepare_new_column_family_announcement(storage_proxy& sp, schema_ptr cfm, api::timestamp_type timestamp) {
    return validate(cfm).then([&sp, cfm, timestamp] {
        try {
            auto& db = sp.get_db().local();
            auto ksm = db.find_keyspace(cfm->ks_name()).metadata();
            return do_prepare_new_column_family_announcement(sp, *ksm, cfm, timestamp);
        } catch (const replica::no_such_keyspace& e) {
            throw exceptions::configuration_exception(format("Cannot add table '{}' to non existing keyspace '{}'.", cfm->cf_name(), cfm->ks_name()));
        }
    });
}

future<> prepare_new_column_family_announcement(utils::chunked_vector<mutation>& mutations,
        storage_proxy& sp, const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) {
    return prepare_new_column_families_announcement(mutations, sp, ksm, std::vector<schema_ptr>{std::move(cfm)}, timestamp);
}

future<> prepare_new_column_families_announcement(utils::chunked_vector<mutation>& mutations,
        storage_proxy& sp, const keyspace_metadata& ksm, std::vector<schema_ptr> cfms, api::timestamp_type timestamp) {
    for (auto cfm : cfms) {
        try {
            co_await validate(cfm);
        } catch (...) {
            std::throw_with_nested(std::runtime_error(seastar::format("Validation of schema extensions failed for ColumnFamily: {}", cfm)));
        }
    }
    auto& db = sp.local_db();
    // If the keyspace exists, ensure that we use the current metadata.
    const auto& current_ksm = db.has_keyspace(ksm.name()) ? *db.find_keyspace(ksm.name()).metadata() : ksm;
    auto new_mutations = co_await do_prepare_new_column_families_announcement(sp, current_ksm, cfms, timestamp);
    std::move(new_mutations.begin(), new_mutations.end(), std::back_inserter(mutations));
}

future<utils::chunked_vector<mutation>> prepare_column_family_update_announcement(storage_proxy& sp,
        schema_ptr cfm, std::vector<view_ptr> view_updates, api::timestamp_type ts) {
    warn(unimplemented::cause::VALIDATION);
    co_await validate(cfm);
    try {
        auto& db = sp.local_db();
        auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name()).schema(); // FIXME: Should we lookup by id?
        mlogger.info("Update table '{}.{}' From {} To {}", cfm->ks_name(), cfm->cf_name(), *old_schema, *cfm);
        auto&& keyspace = db.find_keyspace(cfm->ks_name()).metadata();

        auto mutations = co_await seastar::async([&] {
            // Can call the notifier when it creates new indexes, so needs to run in a Seastar thread.
            return db::schema_tables::make_update_table_mutations(sp, keyspace, old_schema, cfm, ts);
        });
        for (auto&& view : view_updates) {
            auto& old_view = keyspace->cf_meta_data().at(view->cf_name());
            mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view);
            auto view_mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), ts, false);
            std::move(view_mutations.begin(), view_mutations.end(), std::back_inserter(mutations));
            co_await coroutine::maybe_yield();
        }
        co_await seastar::async([&] {
            db.get_notifier().before_update_column_family(*cfm, *old_schema, mutations, ts);
        });
        co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
    } catch (const replica::no_such_column_family& e) {
        auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot update non existing table '{}' in keyspace '{}'.",
                cfm->cf_name(), cfm->ks_name())));
        co_return coroutine::exception(std::move(ex));
    }
}

static future<utils::chunked_vector<mutation>> do_prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts) {
    auto& db = sp.local_db();
    auto&& keyspace = db.find_keyspace(new_type->_keyspace);
    auto mutations = db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, ts);
    return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}

future<utils::chunked_vector<mutation>> prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts) {
    mlogger.info("Prepare Create new User Type: {}", new_type->get_name_as_string());
    return do_prepare_new_type_announcement(sp, std::move(new_type), ts);
}

future<utils::chunked_vector<mutation>> prepare_update_type_announcement(storage_proxy& sp, user_type updated_type, api::timestamp_type ts) {
    mlogger.info("Prepare Update User Type: {}", updated_type->get_name_as_string());
    return do_prepare_new_type_announcement(sp, updated_type, ts);
}

future<utils::chunked_vector<mutation>> prepare_new_function_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
    auto& db = sp.local_db();
    auto&& keyspace = db.find_keyspace(func->name().keyspace);
    auto mutations = db::schema_tables::make_create_function_mutations(func, ts);
    return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}

future<utils::chunked_vector<mutation>> prepare_function_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
    auto& db = sp.local_db();
    auto&& keyspace = db.find_keyspace(func->name().keyspace);
    auto mutations = db::schema_tables::make_drop_function_mutations(func, ts);
    return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}

future<utils::chunked_vector<mutation>> prepare_new_aggregate_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
    auto& db = sp.local_db();
    auto&& keyspace = db.find_keyspace(aggregate->name().keyspace);
    auto mutations = db::schema_tables::make_create_aggregate_mutations(db.features().cluster_schema_features(), aggregate, ts);
    return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}

future<utils::chunked_vector<mutation>> prepare_aggregate_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
    auto& db = sp.local_db();
    auto&& keyspace = db.find_keyspace(aggregate->name().keyspace);
    auto mutations = db::schema_tables::make_drop_aggregate_mutations(db.features().cluster_schema_features(), aggregate, ts);
    return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}

static future<> add_cleanup_view_building_state_drop_keyspace_mutations(storage_proxy& sp, lw_shared_ptr<keyspace_metadata> ks_meta, utils::chunked_vector<mutation>& out, api::timestamp_type ts) {
    using namespace db::view;
    mlogger.info("Cleaning view building state for all views in keyspace {}", ks_meta->name());

    auto& sys_ks = sp.system_keyspace();
    auto& vb_state_machine = sp.view_building_state_machine();

    auto drop_all_tasks_in_task_map = [&] (const task_map& task_map) -> future<> {
        for (auto& [id, _]: task_map) {
            auto mut = co_await sys_ks.make_remove_view_building_task_mutation(ts, id);
            out.push_back(std::move(mut));
            mlogger.trace("Aborting view building task with ID: {} because the keyspace is being dropped", id);
        }
    };

    // Drop view building tasks - this also automatically aborts any task that has already started.
    for (auto& table: ks_meta->tables()) {
        auto tid = table->id();
        if (!vb_state_machine.building_state.tasks_state.contains(tid)) {
            continue;
        }

        for (auto& [_, replica_tasks]: vb_state_machine.building_state.tasks_state.at(tid)) {
            for (auto& [_, views_tasks]: replica_tasks.view_tasks) {
                co_await drop_all_tasks_in_task_map(views_tasks);
            }
            co_await drop_all_tasks_in_task_map(replica_tasks.staging_tasks);
        }
    }

    for (auto& view: ks_meta->views()) {
        // Remove entries from `system.view_build_status_v2`
        auto build_status_mut = co_await sys_ks.make_remove_view_build_status_mutation(ts, {view->ks_name(), view->cf_name()});
        out.push_back(std::move(build_status_mut));
    }
}

future<utils::chunked_vector<mutation>> prepare_keyspace_drop_announcement(storage_proxy& sp, const sstring& ks_name, api::timestamp_type ts) {
    auto& db = sp.local_db();
    if (!db.has_keyspace(ks_name)) {
        throw exceptions::configuration_exception(format("Cannot drop non existing keyspace '{}'.", ks_name));
    }
    auto& keyspace = db.find_keyspace(ks_name);
    mlogger.info("Drop Keyspace '{}'", ks_name);
    return seastar::async([&sp, &keyspace, ts, ks_name] {
        auto& db = sp.local_db();
        auto mutations = db::schema_tables::make_drop_keyspace_mutations(db.features().cluster_schema_features(), keyspace.metadata(), ts);
        if (sp.features().view_building_coordinator && keyspace.uses_tablets()) {
            add_cleanup_view_building_state_drop_keyspace_mutations(sp, keyspace.metadata(), mutations, ts).get();
        }
        db.get_notifier().before_drop_keyspace(ks_name, mutations, ts);
        return mutations;
    });
}

future<utils::chunked_vector<mutation>> prepare_column_family_drop_announcement(storage_proxy& sp,
        const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts, drop_views drop_views) {
    try {
        auto& db = sp.local_db();
        auto& old_cfm = db.find_column_family(ks_name, cf_name);
        auto& schema = old_cfm.schema();
        if (schema->is_view()) {
            co_await coroutine::return_exception(exceptions::invalid_request_exception("Cannot use DROP TABLE on Materialized View. (Did you mean DROP MATERIALIZED VIEW?)"));
        }
        auto keyspace = db.find_keyspace(ks_name).metadata();

        // If drop_views is false (the default), we don't allow dropping a
        // table which has views that aren't part of an index. If drop_views
        // is true, we drop those views as well.
        auto&& views = old_cfm.views();
        if (!drop_views && views.size() > schema->all_indices().size()) {
            auto explicit_view_names = views
                    | std::views::filter([&old_cfm] (const view_ptr& v) { return !old_cfm.get_index_manager().is_index(v); })
                    | std::views::transform([] (const view_ptr& v) { return v->cf_name(); });
            co_await coroutine::return_exception(exceptions::invalid_request_exception(seastar::format("Cannot drop table when materialized views still depend on it ({}.{{{}}})",
                    schema->ks_name(), fmt::join(explicit_view_names, ", "))));
        }
        mlogger.info("Drop table '{}.{}'", schema->ks_name(), schema->cf_name());

        utils::chunked_vector<mutation> drop_si_mutations;
        if (!schema->all_indices().empty()) {
            auto builder = schema_builder(schema).without_indexes();
            drop_si_mutations = co_await seastar::async([&] {
                return db::schema_tables::make_update_table_mutations(sp, keyspace, schema, builder.build(), ts);
            });
        }
        auto mutations = db::schema_tables::make_drop_table_mutations(keyspace, schema, ts);
        mutations.insert(mutations.end(), std::make_move_iterator(drop_si_mutations.begin()), std::make_move_iterator(drop_si_mutations.end()));
        for (auto& v : views) {
            if (!old_cfm.get_index_manager().is_index(v)) {
                mlogger.info("Drop view '{}.{}' of table '{}'", v->ks_name(), v->cf_name(), schema->cf_name());
                auto m = db::schema_tables::make_drop_view_mutations(keyspace, v, ts);
                mutations.insert(mutations.end(), std::make_move_iterator(m.begin()), std::make_move_iterator(m.end()));
            }
        }

        // Notifiers must run in a Seastar thread.
        co_await seastar::async([&] {
            for (auto& view : views) {
                db.get_notifier().before_drop_column_family(*view, mutations, ts);
            }
            db.get_notifier().before_drop_column_family(*schema, mutations, ts);
        });
        co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
    } catch (const replica::no_such_column_family& e) {
        auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot drop non existing table '{}' in keyspace '{}'.", cf_name, ks_name)));
        co_return coroutine::exception(std::move(ex));
    }
}

future<utils::chunked_vector<mutation>> prepare_type_drop_announcement(storage_proxy& sp, user_type dropped_type, api::timestamp_type ts) {
    auto& db = sp.local_db();
    auto&& keyspace = db.find_keyspace(dropped_type->_keyspace);
    mlogger.info("Drop User Type: {}", dropped_type->get_name_as_string());
    auto mutations = db::schema_tables::make_drop_type_mutations(keyspace.metadata(), dropped_type, ts);
    return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}

static future<> add_view_building_tasks_mutations(storage_proxy& sp, view_ptr view, utils::chunked_vector<mutation>& out, api::timestamp_type ts) {
    using namespace db::view;

    auto& db = sp.local_db();
    auto& sys_ks = sp.system_keyspace();

    auto base_id = view->view_info()->base_id();
    auto& base_cf = db.find_column_family(base_id);
    auto erm = base_cf.get_effective_replication_map();
    auto& tablet_map = erm->get_token_metadata().tablets().get_tablet_map(base_id);

    co_await tablet_map.for_each_tablet([&] (auto tid, const auto& tablet_info) -> future<> {
        auto last_token = tablet_map.get_last_token(tid);
        for (auto& replica: tablet_info.replicas) {
            auto id = utils::UUID_gen::get_time_UUID();
            view_building_task task {
                id, view_building_task::task_type::build_range, false,
                base_id, view->id(), replica, last_token
            };

            auto mut = co_await sys_ks.make_view_building_task_mutation(ts, task);
            out.push_back(std::move(mut));
            mlogger.trace("Creating view building task: {} with ID: {} for replica: {}", task, id, replica);
        }
    });
}

future<utils::chunked_vector<mutation>> prepare_new_view_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts) {
    co_await validate(view);
    auto& db = sp.local_db();

    try {
        auto keyspace = db.find_keyspace(view->ks_name()).metadata();
        if (keyspace->cf_meta_data().contains(view->cf_name())) {
            throw exceptions::already_exists_exception(view->ks_name(), view->cf_name());
        }
        mlogger.info("Create new view: {}", view);
        co_return co_await seastar::async([&db, keyspace = std::move(keyspace), &sp, view = std::move(view), ts] {
            auto mutations = db::schema_tables::make_create_view_mutations(keyspace, view, ts);
            if (sp.features().view_building_coordinator && keyspace->uses_tablets()) {
                add_view_building_tasks_mutations(sp, view, mutations, ts).get();
            }
            // We don't have a separate on_before_create_view() listener to
            // call. But a view is also a column family, and we need to call
            // the on_before_create_column_family listener - notably, to
            // create tablets for the new view table.
            db.get_notifier().before_create_column_family(*keyspace, *view, mutations, ts);
            return include_keyspace(sp, *keyspace, std::move(mutations)).get();
        });
    } catch (const replica::no_such_keyspace& e) {
        auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot add view '{}' to non existing keyspace '{}'.",
                view->cf_name(), view->ks_name())));
        co_return coroutine::exception(std::move(ex));
    }
}

future<utils::chunked_vector<mutation>> prepare_view_update_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts) {
    co_await validate(view);
    auto db = sp.data_dictionary();
    try {
        auto&& keyspace = db.find_keyspace(view->ks_name()).metadata();
        auto& old_view = keyspace->cf_meta_data().at(view->cf_name());
        if (!old_view->is_view()) {
            co_await coroutine::return_exception(exceptions::invalid_request_exception("Cannot use ALTER MATERIALIZED VIEW on Table. (Did you mean ALTER TABLE?)"));
        }
        mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view);
        auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), ts, true);
        co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
    } catch (const std::out_of_range& e) {
        auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot update non existing materialized view '{}' in keyspace '{}'.",
                view->cf_name(), view->ks_name())));
        co_return coroutine::exception(std::move(ex));
    }
}

static future<> add_cleanup_view_building_state_drop_view_mutations(storage_proxy& sp, schema_ptr view, utils::chunked_vector<mutation>& out, api::timestamp_type ts) {
    using namespace db::view;
    mlogger.info("Cleaning view building state for view {} ({}.{})", view->id(), view->ks_name(), view->cf_name());

    auto& sys_ks = sp.system_keyspace();
    auto& vb_state_machine = sp.view_building_state_machine();

    // Drop view building tasks - this also automatically aborts any task that has already started.
    auto base_id = view->view_info()->base_id();
    if (vb_state_machine.building_state.tasks_state.contains(base_id)) {
        for (auto& [_, replica_tasks]: vb_state_machine.building_state.tasks_state.at(base_id)) {
            if (!replica_tasks.view_tasks.contains(view->id())) {
                continue;
            }

            // Abort all view building tasks for this view
            for (auto& [id, _]: replica_tasks.view_tasks.at(view->id())) {
                auto mut = co_await sys_ks.make_remove_view_building_task_mutation(ts, id);
                out.push_back(std::move(mut));
                mlogger.trace("Aborting view building task with ID: {} because the view is being dropped", id);
            }
        }
    }

    // Remove entries from `system.view_build_status_v2`
    auto build_status_mut = co_await sys_ks.make_remove_view_build_status_mutation(ts, {view->ks_name(), view->cf_name()});
    out.push_back(std::move(build_status_mut));
}

future<utils::chunked_vector<mutation>> prepare_view_drop_announcement(storage_proxy& sp, const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts) {
    auto& db = sp.local_db();
    try {
        auto& view = db.find_column_family(ks_name, cf_name).schema();
        if (!view->is_view()) {
            throw exceptions::invalid_request_exception("Cannot use DROP MATERIALIZED VIEW on Table. (Did you mean DROP TABLE?)");
        }
        if (db.find_column_family(view->view_info()->base_id()).get_index_manager().is_index(view_ptr(view))) {
            throw exceptions::invalid_request_exception("Cannot use DROP MATERIALIZED VIEW on Index. (Did you mean DROP INDEX?)");
        }
        auto keyspace = db.find_keyspace(ks_name).metadata();
        mlogger.info("Drop view '{}.{}'", view->ks_name(), view->cf_name());
        auto mutations = db::schema_tables::make_drop_view_mutations(keyspace, view_ptr(std::move(view)), ts);
        if (sp.features().view_building_coordinator && keyspace->uses_tablets()) {
            co_await add_cleanup_view_building_state_drop_view_mutations(sp, view, mutations, ts);
        }
        // Notifiers must run in a Seastar thread.
        co_await seastar::async([&] {
            db.get_notifier().before_drop_column_family(*view, mutations, ts);
        });
        co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
    } catch (const replica::no_such_column_family& e) {
        throw exceptions::configuration_exception(format("Cannot drop non existing materialized view '{}' in keyspace '{}'.",
                cf_name, ks_name));
    }
}

template<typename mutation_type>
future<> migration_manager::announce_with_raft(utils::chunked_vector<mutation> schema, group0_guard guard, std::string_view description, std::optional<raft_timeout> timeout) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    auto schema_features = _feat.cluster_schema_features();
    auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(std::move(schema), schema_features);

    auto group0_cmd = _group0_client.prepare_command(
        mutation_type {
            .mutations{adjusted_schema.begin(), adjusted_schema.end()},
        },
        guard, std::move(description));

    return _group0_client.add_entry(std::move(group0_cmd), std::move(guard), _as, timeout.value_or(raft_timeout{}));
}
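
// Records the group 0 state id of the command being built as the cluster-wide
// schema version in system.scylla_local (key "group0_schema_version"). Nodes
// then adopt this persisted version instead of recalculating a schema digest;
// see the comment in the migration_request handler above.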
static mutation make_group0_schema_version_mutation(const data_dictionary::database db, const group0_guard& guard) {
    auto s = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto* cdef = s->get_column_definition("value");
    SCYLLA_ASSERT(cdef);

    mutation m(s, partition_key::from_singular(*s, "group0_schema_version"));
    auto cell = atomic_cell::make_live(*cdef->type, guard.write_timestamp(),
            cdef->type->decompose(fmt::to_string(guard.new_group0_state_id())));
    m.set_clustered_cell(clustering_key::make_empty(), *cdef, std::move(cell));
    return m;
}

// Precondition: the GROUP0_SCHEMA_VERSIONING feature is enabled in the cluster.
//
// See the description of this column in db/schema_tables.cc.
static void add_committed_by_group0_flag(utils::chunked_vector<mutation>& schema, const group0_guard& guard) {
    auto timestamp = guard.write_timestamp();

    for (auto& mut: schema) {
        if (mut.schema()->cf_name() != db::schema_tables::v3::SCYLLA_TABLES) {
            continue;
        }

        auto& scylla_tables_schema = *mut.schema();
        auto cdef = scylla_tables_schema.get_column_definition("committed_by_group0");
        SCYLLA_ASSERT(cdef);

        for (auto& cr: mut.partition().clustered_rows()) {
            cr.row().cells().apply(*cdef, atomic_cell::make_live(
                    *cdef->type, timestamp, cdef->type->decompose(true)));
        }
    }
}

// Returns a future on the local application of the schema.
template<typename mutation_type>
future<> migration_manager::announce(utils::chunked_vector<mutation> schema, group0_guard guard, std::string_view description, std::optional<raft_timeout> timeout) {
    schema.push_back(make_group0_schema_version_mutation(_storage_proxy.data_dictionary(), guard));
    add_committed_by_group0_flag(schema, guard);
    return announce_with_raft<mutation_type>(std::move(schema), std::move(guard), std::move(description), std::move(timeout));
}
template
future<> migration_manager::announce_with_raft<schema_change>(utils::chunked_vector<mutation> schema, group0_guard, std::string_view description, std::optional<raft_timeout> timeout);
template
future<> migration_manager::announce_with_raft<topology_change>(utils::chunked_vector<mutation> schema, group0_guard, std::string_view description, std::optional<raft_timeout> timeout);

template
future<> migration_manager::announce<schema_change>(utils::chunked_vector<mutation> schema, group0_guard, std::string_view description, std::optional<raft_timeout> timeout);
template
future<> migration_manager::announce<topology_change>(utils::chunked_vector<mutation> schema, group0_guard, std::string_view description, std::optional<raft_timeout> timeout);

future<group0_guard> migration_manager::start_group0_operation(std::optional<raft_timeout> timeout) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    return _group0_client.start_operation(_as, timeout.value_or(raft_timeout{}));
}
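
// A sketch of the typical DDL path through group 0 (illustrative only; the
// real call sites live in the CQL statement execution code):
//
//   auto guard = co_await mm.start_group0_operation();
//   auto muts = co_await prepare_new_column_family_announcement(sp, cfm, guard.write_timestamp());
//   co_await mm.announce<schema_change>(std::move(muts), std::move(guard), "create table");
//
// The guard carries the new group 0 state id and write timestamp; announce()
// appends the schema mutations as a single raft command, so concurrent DDL
// operations are serialized by group 0.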

/**
 * Announce my version passively over gossip.
 * Used to notify nodes as they arrive in the cluster.
 *
 * @param version The schema version to announce
 */
void migration_manager::passive_announce(table_schema_version version) {
    _schema_version_to_publish = version;
    (void)_schema_push.trigger().handle_exception([version = std::move(version)] (std::exception_ptr ex) {
        mlogger.warn("Passive announcing of version {} failed: {}. Ignored.", version, ex);
    });
}

future<> migration_manager::passive_announce() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    mlogger.info("Gossiping my schema version {}", _schema_version_to_publish);
    return _gossiper.add_local_application_state(gms::application_state::SCHEMA, gms::versioned_value::schema(_schema_version_to_publish));
}

// Returns the schema of the given version, either from the cache or from the remote node identified by 'dst'.
// Doesn't affect the current node's schema in any way.
static future<schema_ptr> get_schema_definition(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, service::storage_proxy& storage_proxy) {
    return local_schema_registry().get_or_load(v, [&ms, &storage_proxy, dst, shard] (table_schema_version v) {
        mlogger.debug("Requesting schema {} from {}", v, dst);
        return ser::migration_manager_rpc_verbs::send_get_schema_version(&ms, dst, shard, v).then([&storage_proxy] (frozen_schema s) {
            auto& proxy = storage_proxy.container();
            // Since the latest schema version is always present in the schema registry,
            // we only ever query an already outdated schema version, which is
            // referenced by the incoming request.
            // That means the column mapping for the schema should always be inserted
            // with a TTL (refreshing the TTL in case the column mapping already existed).
            // We don't set the CDC schema here because it's not included in the RPC and we're
            // not using raft mode.
            auto us = s.unfreeze(db::schema_ctxt(proxy), nullptr);
            // If this is a view - sanity check that its schema doesn't need fixing.
            schema_ptr base_schema;
            if (us->is_view()) {
                auto& db = proxy.local().local_db();
                base_schema = db.find_schema(us->view_info()->base_id());
                db::schema_tables::check_no_legacy_secondary_index_mv_schema(db, view_ptr(us), base_schema);
            }
            return db::schema_tables::store_column_mapping(proxy, us, true).then([us, base_schema] -> extended_frozen_schema {
                return extended_frozen_schema(us);
            });
        });
    });
}

future<schema_ptr> migration_manager::get_schema_for_read(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, abort_source& as) {
    return get_schema_for_write(v, dst, shard, ms, as);
}

future<schema_ptr> migration_manager::get_schema_for_write(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, abort_source& as) {
    if (_as.abort_requested()) {
        co_return coroutine::exception(std::make_exception_ptr(abort_requested_exception()));
    }

    auto s = local_schema_registry().get_or_null(v);
    if (!s || !s->is_synced()) {
        // Schema is synchronized through Raft, so perform a group 0 read barrier.
        // Batch the barriers so we don't invoke them redundantly.
        mlogger.trace("Performing raft read barrier because schema is not synced, version: {}", v);
        co_await _group0_barrier.trigger(as);
    }

    if (!s) {
        // The schema returned by get_schema_definition comes (eventually) from the schema registry,
        // so if it is a view, it already has base info and we don't need to set it later.
        s = co_await get_schema_definition(v, dst, shard, ms, _storage_proxy);
    }

    // Schema is synced already (through the barrier above), mark it as such.
    mlogger.trace("Mark schema {} as synced", v);
    s->registry_entry()->mark_synced();
    co_return s;
}
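
// Compares our schema version against each of the given nodes via the
// SCHEMA_CHECK verb; if any node reports a different version, runs a group 0
// read barrier so that this node converges to the latest committed schema.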
future<> migration_manager::sync_schema(const replica::database& db, const std::vector<locator::host_id>& nodes) {
    using schema_and_hosts = std::unordered_map<table_schema_version, std::vector<locator::host_id>>;
    schema_and_hosts schema_map;
    co_await coroutine::parallel_for_each(nodes, [this, &schema_map, &db] (const locator::host_id& node) -> future<> {
        const auto& my_version = db.get_version();
        abort_source as;
        auto remote_version = co_await ser::migration_manager_rpc_verbs::send_schema_check(&_messaging, node, as);
        if (my_version != remote_version) {
            schema_map[remote_version].emplace_back(node);
        }
    });
    if (schema_map.empty()) {
        co_return;
    }
    co_await _group0_barrier.trigger(_as);
}
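
// Returns the column mapping for the given table and schema version: served
// from the schema registry when the version is still cached there, otherwise
// read back from the system tables where store_column_mapping() persisted it.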
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version v) {
    schema_ptr s = local_schema_registry().get_or_null(v);
    if (s) {
        return make_ready_future<column_mapping>(s->get_column_mapping());
    }
    return db::schema_tables::get_column_mapping(sys_ks, table_id, v);
}
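
// With gossip-driven schema pulls removed (see the commit log above), these
// gossiper callbacks are intentionally no-ops: schema changes propagate
// exclusively through group 0.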
future<> migration_manager::on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id) {
    return make_ready_future();
}

future<> migration_manager::on_change(gms::inet_address endpoint, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) {
    return make_ready_future();
}

future<> migration_manager::on_alive(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) {
    return make_ready_future();
}

void migration_manager::set_concurrent_ddl_retries(size_t n) {
    _concurrent_ddl_retries = n;
}

void migration_listener::on_before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>& cfms, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp) {
    for (auto cfm : cfms) {
        on_before_create_column_family(ksm, *cfm, mutations, timestamp);
    }
}

} // namespace service