diff --git a/configure.py b/configure.py index 11d28287e6..95e90e0d9b 100755 --- a/configure.py +++ b/configure.py @@ -390,6 +390,7 @@ urchin_core = (['database.cc', 'utils/uuid.cc', 'types.cc', 'validation.cc', + 'service/migration_manager.cc', 'service/storage_proxy.cc', 'cql3/operator.cc', 'cql3/relation.cc', diff --git a/service/migration_manager.cc b/service/migration_manager.cc new file mode 100644 index 0000000000..05c34efd86 --- /dev/null +++ b/service/migration_manager.cc @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#include "service/migration_manager.hh" + +namespace service { + +#if 0 +public void register(IMigrationListener listener) +{ + listeners.add(listener); +} + +public void unregister(IMigrationListener listener) +{ + listeners.remove(listener); +} + +public void scheduleSchemaPull(InetAddress endpoint, EndpointState state) +{ + VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA); + + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null) + maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint); +} + +/** + * If versions differ this node sends request with local migration list to the endpoint + * and expecting to receive a list of migrations to apply locally. + */ +private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint) +{ + if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint)) + { + logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false"); + return; + } + + if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS) + { + // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately + logger.debug("Submitting migration task for {}", endpoint); + submitMigrationTask(endpoint); + } + else + { + // Include a delay to make sure we have a chance to apply any changes being + // pushed out simultaneously. See CASSANDRA-5025 + Runnable runnable = new Runnable() + { + public void run() + { + // grab the latest version of the schema since it may have changed again since the initial scheduling + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (epState == null) + { + logger.debug("epState vanished for {}, not submitting migration task", endpoint); + return; + } + VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA); + UUID currentVersion = UUID.fromString(value.value); + if (Schema.instance.getVersion().equals(currentVersion)) + { + logger.debug("not submitting migration task for {} because our versions match", endpoint); + return; + } + logger.debug("submitting migration task for {}", endpoint); + submitMigrationTask(endpoint); + } + }; + ScheduledExecutors.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); + } +} + +private static Future submitMigrationTask(InetAddress endpoint) +{ + /* + * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are + * running in the gossip stage. + */ + return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); +} + +private static boolean shouldPullSchemaFrom(InetAddress endpoint) +{ + /* + * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema) + * Don't request schema from fat clients + */ + return MessagingService.instance().knowsVersion(endpoint) + && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version + && !Gossiper.instance.isGossipOnlyMember(endpoint); +} + +public static boolean isReadyForBootstrap() +{ + return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0; +} + +public void notifyCreateKeyspace(KSMetaData ksm) +{ + for (IMigrationListener listener : listeners) + listener.onCreateKeyspace(ksm.name); +} + +public void notifyCreateColumnFamily(CFMetaData cfm) +{ + for (IMigrationListener listener : listeners) + listener.onCreateColumnFamily(cfm.ksName, cfm.cfName); +} + +public void notifyCreateUserType(UserType ut) +{ + for (IMigrationListener listener : listeners) + listener.onCreateUserType(ut.keyspace, ut.getNameAsString()); +} + +public void notifyCreateFunction(UDFunction udf) +{ + for (IMigrationListener listener : listeners) + listener.onCreateFunction(udf.name().keyspace, udf.name().name); +} + +public void notifyCreateAggregate(UDAggregate udf) +{ + for (IMigrationListener listener : listeners) + listener.onCreateAggregate(udf.name().keyspace, udf.name().name); +} + +public void notifyUpdateKeyspace(KSMetaData ksm) +{ + for (IMigrationListener listener : listeners) + listener.onUpdateKeyspace(ksm.name); +} + +public void notifyUpdateColumnFamily(CFMetaData cfm) +{ + for (IMigrationListener listener : listeners) + listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName); +} + +public void notifyUpdateUserType(UserType ut) +{ + for (IMigrationListener listener : listeners) + listener.onUpdateUserType(ut.keyspace, ut.getNameAsString()); +} + +public void notifyUpdateFunction(UDFunction udf) +{ + for (IMigrationListener listener : listeners) + listener.onUpdateFunction(udf.name().keyspace, udf.name().name); +} + +public void notifyUpdateAggregate(UDAggregate udf) +{ + for (IMigrationListener listener : listeners) + listener.onUpdateAggregate(udf.name().keyspace, udf.name().name); +} + +public void notifyDropKeyspace(KSMetaData ksm) +{ + for (IMigrationListener listener : listeners) + listener.onDropKeyspace(ksm.name); +} + +public void notifyDropColumnFamily(CFMetaData cfm) +{ + for (IMigrationListener listener : listeners) + listener.onDropColumnFamily(cfm.ksName, cfm.cfName); +} + +public void notifyDropUserType(UserType ut) +{ + for (IMigrationListener listener : listeners) + listener.onDropUserType(ut.keyspace, ut.getNameAsString()); +} + +public void notifyDropFunction(UDFunction udf) +{ + for (IMigrationListener listener : listeners) + listener.onDropFunction(udf.name().keyspace, udf.name().name); +} + +public void notifyDropAggregate(UDAggregate udf) +{ + for (IMigrationListener listener : listeners) + listener.onDropAggregate(udf.name().keyspace, udf.name().name); +} +#endif + +future<>migration_manager::announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr ksm, bool announce_locally) +{ + return announce_new_keyspace(proxy, ksm, db_clock::now_in_usecs(), announce_locally); +} + +future<> migration_manager::announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr ksm, api::timestamp_type timestamp, bool announce_locally) +{ +#if 0 + ksm.validate(); + + if (Schema.instance.getKSMetaData(ksm.name) != null) + throw new AlreadyExistsException(ksm.name); + + logger.info(String.format("Create new Keyspace: %s", ksm)); +#endif + auto mutations = db::legacy_schema_tables::make_create_keyspace_mutations(ksm, timestamp); + return announce(proxy, std::move(mutations), announce_locally); +} + +future<> migration_manager::announce_new_column_family(service::storage_proxy& proxy, schema_ptr cfm, bool announce_locally) { + warn(unimplemented::cause::MIGRATIONS); + return make_ready_future<>(); +#if 0 + cfm.validate(); + + KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName); + if (ksm == null) + throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", cfm.cfName, cfm.ksName)); + else if (ksm.cfMetaData().containsKey(cfm.cfName)) + throw new AlreadyExistsException(cfm.ksName, cfm.cfName); + + logger.info(String.format("Create new table: %s", cfm)); + announce(LegacySchemaTables.makeCreateTableMutation(ksm, cfm, FBUtilities.timestampMicros()), announceLocally); +#endif +} + +#if 0 +public static void announceNewType(UserType newType, boolean announceLocally) +{ + KSMetaData ksm = Schema.instance.getKSMetaData(newType.keyspace); + announce(LegacySchemaTables.makeCreateTypeMutation(ksm, newType, FBUtilities.timestampMicros()), announceLocally); +} + +public static void announceNewFunction(UDFunction udf, boolean announceLocally) +{ + logger.info(String.format("Create scalar function '%s'", udf.name())); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeCreateFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); +} + +public static void announceNewAggregate(UDAggregate udf, boolean announceLocally) +{ + logger.info(String.format("Create aggregate function '%s'", udf.name())); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeCreateAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); +} + +public static void announceKeyspaceUpdate(KSMetaData ksm) throws ConfigurationException +{ + announceKeyspaceUpdate(ksm, false); +} + +public static void announceKeyspaceUpdate(KSMetaData ksm, boolean announceLocally) throws ConfigurationException +{ + ksm.validate(); + + KSMetaData oldKsm = Schema.instance.getKSMetaData(ksm.name); + if (oldKsm == null) + throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name)); + + logger.info(String.format("Update Keyspace '%s' From %s To %s", ksm.name, oldKsm, ksm)); + announce(LegacySchemaTables.makeCreateKeyspaceMutation(ksm, FBUtilities.timestampMicros()), announceLocally); +} + +public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift) throws ConfigurationException +{ + announceColumnFamilyUpdate(cfm, fromThrift, false); +} + +public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift, boolean announceLocally) throws ConfigurationException +{ + cfm.validate(); + + CFMetaData oldCfm = Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName); + if (oldCfm == null) + throw new ConfigurationException(String.format("Cannot update non existing table '%s' in keyspace '%s'.", cfm.cfName, cfm.ksName)); + KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName); + + oldCfm.validateCompatility(cfm); + + logger.info(String.format("Update table '%s/%s' From %s To %s", cfm.ksName, cfm.cfName, oldCfm, cfm)); + announce(LegacySchemaTables.makeUpdateTableMutation(ksm, oldCfm, cfm, FBUtilities.timestampMicros(), fromThrift), announceLocally); +} + +public static void announceTypeUpdate(UserType updatedType, boolean announceLocally) +{ + announceNewType(updatedType, announceLocally); +} + +public static void announceKeyspaceDrop(String ksName) throws ConfigurationException +{ + announceKeyspaceDrop(ksName, false); +} + +public static void announceKeyspaceDrop(String ksName, boolean announceLocally) throws ConfigurationException +{ + KSMetaData oldKsm = Schema.instance.getKSMetaData(ksName); + if (oldKsm == null) + throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName)); + + logger.info(String.format("Drop Keyspace '%s'", oldKsm.name)); + announce(LegacySchemaTables.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), announceLocally); +} + +public static void announceColumnFamilyDrop(String ksName, String cfName) throws ConfigurationException +{ + announceColumnFamilyDrop(ksName, cfName, false); +} + +public static void announceColumnFamilyDrop(String ksName, String cfName, boolean announceLocally) throws ConfigurationException +{ + CFMetaData oldCfm = Schema.instance.getCFMetaData(ksName, cfName); + if (oldCfm == null) + throw new ConfigurationException(String.format("Cannot drop non existing table '%s' in keyspace '%s'.", cfName, ksName)); + KSMetaData ksm = Schema.instance.getKSMetaData(ksName); + + logger.info(String.format("Drop table '%s/%s'", oldCfm.ksName, oldCfm.cfName)); + announce(LegacySchemaTables.makeDropTableMutation(ksm, oldCfm, FBUtilities.timestampMicros()), announceLocally); +} + +public static void announceTypeDrop(UserType droppedType) +{ + announceTypeDrop(droppedType, false); +} + +public static void announceTypeDrop(UserType droppedType, boolean announceLocally) +{ + KSMetaData ksm = Schema.instance.getKSMetaData(droppedType.keyspace); + announce(LegacySchemaTables.dropTypeFromSchemaMutation(ksm, droppedType, FBUtilities.timestampMicros()), announceLocally); +} + +public static void announceFunctionDrop(UDFunction udf, boolean announceLocally) +{ + logger.info(String.format("Drop scalar function overload '%s' args '%s'", udf.name(), udf.argTypes())); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeDropFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); +} + +public static void announceAggregateDrop(UDAggregate udf, boolean announceLocally) +{ + logger.info(String.format("Drop aggregate function overload '%s' args '%s'", udf.name(), udf.argTypes())); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeDropAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); +} +#endif + +/** + * actively announce a new version to active hosts via rpc + * @param schema The schema mutation to be applied + */ +future<> migration_manager::announce(service::storage_proxy& proxy, mutation schema, bool announce_locally) +{ + std::vector mutations; + mutations.emplace_back(std::move(schema)); + return announce(proxy, std::move(mutations), announce_locally); +} + +future<> migration_manager::announce(service::storage_proxy& proxy, std::vector mutations, bool announce_locally) +{ + if (announce_locally) { + return db::legacy_schema_tables::merge_schema(proxy, std::move(mutations), false); + } else { + return announce(proxy, std::move(mutations)); + } +} + +#if 0 +private static void pushSchemaMutation(InetAddress endpoint, Collection schema) +{ + MessageOut> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE, + schema, + MigrationsSerializer.instance); + MessagingService.instance().sendOneWay(msg, endpoint); +} +#endif + + // Returns a future on the local application of the schema +future<> migration_manager::announce(service::storage_proxy& proxy, std::vector schema) +{ + auto f = db::legacy_schema_tables::merge_schema(proxy, std::move(schema)); +#if 0 + for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) + { + // only push schema to nodes with known and equal versions + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && + MessagingService.instance().knowsVersion(endpoint) && + MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version) + pushSchemaMutation(endpoint, schema); + } +#endif + return f; +} + +#if 0 +/** + * Announce my version passively over gossip. + * Used to notify nodes as they arrive in the cluster. + * + * @param version The schema version to announce + */ +public static void passiveAnnounce(UUID version) +{ + Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version)); + logger.debug("Gossiping my schema version {}", version); +} + +/** + * Clear all locally stored schema information and reset schema to initial state. + * Called by user (via JMX) who wants to get rid of schema disagreement. + * + * @throws IOException if schema tables truncation fails + */ +public static void resetLocalSchema() throws IOException +{ + logger.info("Starting local schema reset..."); + + logger.debug("Truncating schema tables..."); + + LegacySchemaTables.truncateSchemaTables(); + + logger.debug("Clearing local schema keyspace definitions..."); + + Schema.instance.clear(); + + Set liveEndpoints = Gossiper.instance.getLiveMembers(); + liveEndpoints.remove(FBUtilities.getBroadcastAddress()); + + // force migration if there are nodes around + for (InetAddress node : liveEndpoints) + { + if (shouldPullSchemaFrom(node)) + { + logger.debug("Requesting schema from {}", node); + FBUtilities.waitOnFuture(submitMigrationTask(node)); + break; + } + } + + logger.info("Local schema reset is complete."); +} + +public static class MigrationsSerializer implements IVersionedSerializer> +{ + public static MigrationsSerializer instance = new MigrationsSerializer(); + + public void serialize(Collection schema, DataOutputPlus out, int version) throws IOException + { + out.writeInt(schema.size()); + for (Mutation mutation : schema) + Mutation.serializer.serialize(mutation, out, version); + } + + public Collection deserialize(DataInput in, int version) throws IOException + { + int count = in.readInt(); + Collection schema = new ArrayList<>(count); + + for (int i = 0; i < count; i++) + schema.add(Mutation.serializer.deserialize(in, version)); + + return schema; + } + + public long serializedSize(Collection schema, int version) + { + int size = TypeSizes.NATIVE.sizeof(schema.size()); + for (Mutation mutation : schema) + size += Mutation.serializer.serializedSize(mutation, version); + return size; + } +} +#endif + +} diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 0c0c6a105d..1198ab3e38 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -26,46 +26,6 @@ #include "db/legacy_schema_tables.hh" -#if 0 -package org.apache.cassandra.service; - -import java.io.DataInput; -import java.io.IOException; -import java.net.InetAddress; -import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.*; - -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.concurrent.ScheduledExecutors; -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.cql3.functions.UDAggregate; -import org.apache.cassandra.cql3.functions.UDFunction; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.marshal.UserType; -import org.apache.cassandra.exceptions.AlreadyExistsException; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.gms.*; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.LegacySchemaTables; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.WrappedRunnable; -#endif - namespace service { class migration_manager { @@ -81,485 +41,24 @@ class migration_manager { private final List listeners = new CopyOnWriteArrayList<>(); private MigrationManager() {} - - public void register(IMigrationListener listener) - { - listeners.add(listener); - } - - public void unregister(IMigrationListener listener) - { - listeners.remove(listener); - } - - public void scheduleSchemaPull(InetAddress endpoint, EndpointState state) - { - VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA); - - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null) - maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint); - } - - /** - * If versions differ this node sends request with local migration list to the endpoint - * and expecting to receive a list of migrations to apply locally. - */ - private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint) - { - if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint)) - { - logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false"); - return; - } - - if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS) - { - // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately - logger.debug("Submitting migration task for {}", endpoint); - submitMigrationTask(endpoint); - } - else - { - // Include a delay to make sure we have a chance to apply any changes being - // pushed out simultaneously. See CASSANDRA-5025 - Runnable runnable = new Runnable() - { - public void run() - { - // grab the latest version of the schema since it may have changed again since the initial scheduling - EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - if (epState == null) - { - logger.debug("epState vanished for {}, not submitting migration task", endpoint); - return; - } - VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA); - UUID currentVersion = UUID.fromString(value.value); - if (Schema.instance.getVersion().equals(currentVersion)) - { - logger.debug("not submitting migration task for {} because our versions match", endpoint); - return; - } - logger.debug("submitting migration task for {}", endpoint); - submitMigrationTask(endpoint); - } - }; - ScheduledExecutors.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); - } - } - - private static Future submitMigrationTask(InetAddress endpoint) - { - /* - * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are - * running in the gossip stage. - */ - return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); - } - - private static boolean shouldPullSchemaFrom(InetAddress endpoint) - { - /* - * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema) - * Don't request schema from fat clients - */ - return MessagingService.instance().knowsVersion(endpoint) - && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version - && !Gossiper.instance.isGossipOnlyMember(endpoint); - } #endif public: -#if 0 - public static boolean isReadyForBootstrap() - { - return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0; - } + static future<> announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr ksm, bool announce_locally = false); - public void notifyCreateKeyspace(KSMetaData ksm) - { - for (IMigrationListener listener : listeners) - listener.onCreateKeyspace(ksm.name); - } + static future<> announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr ksm, api::timestamp_type timestamp, bool announce_locally); - public void notifyCreateColumnFamily(CFMetaData cfm) - { - for (IMigrationListener listener : listeners) - listener.onCreateColumnFamily(cfm.ksName, cfm.cfName); - } - - public void notifyCreateUserType(UserType ut) - { - for (IMigrationListener listener : listeners) - listener.onCreateUserType(ut.keyspace, ut.getNameAsString()); - } - - public void notifyCreateFunction(UDFunction udf) - { - for (IMigrationListener listener : listeners) - listener.onCreateFunction(udf.name().keyspace, udf.name().name); - } - - public void notifyCreateAggregate(UDAggregate udf) - { - for (IMigrationListener listener : listeners) - listener.onCreateAggregate(udf.name().keyspace, udf.name().name); - } - - public void notifyUpdateKeyspace(KSMetaData ksm) - { - for (IMigrationListener listener : listeners) - listener.onUpdateKeyspace(ksm.name); - } - - public void notifyUpdateColumnFamily(CFMetaData cfm) - { - for (IMigrationListener listener : listeners) - listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName); - } - - public void notifyUpdateUserType(UserType ut) - { - for (IMigrationListener listener : listeners) - listener.onUpdateUserType(ut.keyspace, ut.getNameAsString()); - } - - public void notifyUpdateFunction(UDFunction udf) - { - for (IMigrationListener listener : listeners) - listener.onUpdateFunction(udf.name().keyspace, udf.name().name); - } - - public void notifyUpdateAggregate(UDAggregate udf) - { - for (IMigrationListener listener : listeners) - listener.onUpdateAggregate(udf.name().keyspace, udf.name().name); - } - - public void notifyDropKeyspace(KSMetaData ksm) - { - for (IMigrationListener listener : listeners) - listener.onDropKeyspace(ksm.name); - } - - public void notifyDropColumnFamily(CFMetaData cfm) - { - for (IMigrationListener listener : listeners) - listener.onDropColumnFamily(cfm.ksName, cfm.cfName); - } - - public void notifyDropUserType(UserType ut) - { - for (IMigrationListener listener : listeners) - listener.onDropUserType(ut.keyspace, ut.getNameAsString()); - } - - public void notifyDropFunction(UDFunction udf) - { - for (IMigrationListener listener : listeners) - listener.onDropFunction(udf.name().keyspace, udf.name().name); - } - - public void notifyDropAggregate(UDAggregate udf) - { - for (IMigrationListener listener : listeners) - listener.onDropAggregate(udf.name().keyspace, udf.name().name); - } -#endif - - static future<> announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr ksm) - { - return announce_new_keyspace(proxy, ksm, false); - } - - static future<> announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr ksm, bool announce_locally) - { - return announce_new_keyspace(proxy, ksm, db_clock::now_in_usecs(), announce_locally); - } - - static future<> announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr ksm, api::timestamp_type timestamp, bool announce_locally) - { -#if 0 - ksm.validate(); - - if (Schema.instance.getKSMetaData(ksm.name) != null) - throw new AlreadyExistsException(ksm.name); - - logger.info(String.format("Create new Keyspace: %s", ksm)); -#endif - auto mutations = db::legacy_schema_tables::make_create_keyspace_mutations(ksm, timestamp); - return announce(proxy, std::move(mutations), announce_locally); - } - -#if 0 - public static void announceNewColumnFamily(CFMetaData cfm) throws ConfigurationException - { - announceNewColumnFamily(cfm, false); - } -#endif - - static future<> announce_new_column_family(service::storage_proxy& proxy, schema_ptr cfm, bool announce_locally) { - warn(unimplemented::cause::MIGRATIONS); - return make_ready_future<>(); -#if 0 - cfm.validate(); - - KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName); - if (ksm == null) - throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", cfm.cfName, cfm.ksName)); - else if (ksm.cfMetaData().containsKey(cfm.cfName)) - throw new AlreadyExistsException(cfm.ksName, cfm.cfName); - - logger.info(String.format("Create new table: %s", cfm)); - announce(LegacySchemaTables.makeCreateTableMutation(ksm, cfm, FBUtilities.timestampMicros()), announceLocally); -#endif - } - -#if 0 - public static void announceNewType(UserType newType, boolean announceLocally) - { - KSMetaData ksm = Schema.instance.getKSMetaData(newType.keyspace); - announce(LegacySchemaTables.makeCreateTypeMutation(ksm, newType, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceNewFunction(UDFunction udf, boolean announceLocally) - { - logger.info(String.format("Create scalar function '%s'", udf.name())); - KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); - announce(LegacySchemaTables.makeCreateFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceNewAggregate(UDAggregate udf, boolean announceLocally) - { - logger.info(String.format("Create aggregate function '%s'", udf.name())); - KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); - announce(LegacySchemaTables.makeCreateAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceKeyspaceUpdate(KSMetaData ksm) throws ConfigurationException - { - announceKeyspaceUpdate(ksm, false); - } - - public static void announceKeyspaceUpdate(KSMetaData ksm, boolean announceLocally) throws ConfigurationException - { - ksm.validate(); - - KSMetaData oldKsm = Schema.instance.getKSMetaData(ksm.name); - if (oldKsm == null) - throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name)); - - logger.info(String.format("Update Keyspace '%s' From %s To %s", ksm.name, oldKsm, ksm)); - announce(LegacySchemaTables.makeCreateKeyspaceMutation(ksm, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift) throws ConfigurationException - { - announceColumnFamilyUpdate(cfm, fromThrift, false); - } - - public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift, boolean announceLocally) throws ConfigurationException - { - cfm.validate(); - - CFMetaData oldCfm = Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName); - if (oldCfm == null) - throw new ConfigurationException(String.format("Cannot update non existing table '%s' in keyspace '%s'.", cfm.cfName, cfm.ksName)); - KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName); - - oldCfm.validateCompatility(cfm); - - logger.info(String.format("Update table '%s/%s' From %s To %s", cfm.ksName, cfm.cfName, oldCfm, cfm)); - announce(LegacySchemaTables.makeUpdateTableMutation(ksm, oldCfm, cfm, FBUtilities.timestampMicros(), fromThrift), announceLocally); - } - - public static void announceTypeUpdate(UserType updatedType, boolean announceLocally) - { - announceNewType(updatedType, announceLocally); - } - - public static void announceKeyspaceDrop(String ksName) throws ConfigurationException - { - announceKeyspaceDrop(ksName, false); - } - - public static void announceKeyspaceDrop(String ksName, boolean announceLocally) throws ConfigurationException - { - KSMetaData oldKsm = Schema.instance.getKSMetaData(ksName); - if (oldKsm == null) - throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName)); - - logger.info(String.format("Drop Keyspace '%s'", oldKsm.name)); - announce(LegacySchemaTables.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceColumnFamilyDrop(String ksName, String cfName) throws ConfigurationException - { - announceColumnFamilyDrop(ksName, cfName, false); - } - - public static void announceColumnFamilyDrop(String ksName, String cfName, boolean announceLocally) throws ConfigurationException - { - CFMetaData oldCfm = Schema.instance.getCFMetaData(ksName, cfName); - if (oldCfm == null) - throw new ConfigurationException(String.format("Cannot drop non existing table '%s' in keyspace '%s'.", cfName, ksName)); - KSMetaData ksm = Schema.instance.getKSMetaData(ksName); - - logger.info(String.format("Drop table '%s/%s'", oldCfm.ksName, oldCfm.cfName)); - announce(LegacySchemaTables.makeDropTableMutation(ksm, oldCfm, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceTypeDrop(UserType droppedType) - { - announceTypeDrop(droppedType, false); - } - - public static void announceTypeDrop(UserType droppedType, boolean announceLocally) - { - KSMetaData ksm = Schema.instance.getKSMetaData(droppedType.keyspace); - announce(LegacySchemaTables.dropTypeFromSchemaMutation(ksm, droppedType, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceFunctionDrop(UDFunction udf, boolean announceLocally) - { - logger.info(String.format("Drop scalar function overload '%s' args '%s'", udf.name(), udf.argTypes())); - KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); - announce(LegacySchemaTables.makeDropFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceAggregateDrop(UDAggregate udf, boolean announceLocally) - { - logger.info(String.format("Drop aggregate function overload '%s' args '%s'", udf.name(), udf.argTypes())); - KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); - announce(LegacySchemaTables.makeDropAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } -#endif + static future<> announce_new_column_family(service::storage_proxy& proxy, schema_ptr cfm, bool announce_locally = false); /** * actively announce a new version to active hosts via rpc * @param schema The schema mutation to be applied */ - static future<> announce(service::storage_proxy& proxy, mutation schema, bool announce_locally) - { - std::vector mutations; - mutations.emplace_back(std::move(schema)); - return announce(proxy, std::move(mutations), announce_locally); - } + static future<> announce(service::storage_proxy& proxy, mutation schema, bool announce_locally); - static future<> announce(service::storage_proxy& proxy, std::vector mutations, bool announce_locally) - { - if (announce_locally) { - return db::legacy_schema_tables::merge_schema(proxy, std::move(mutations), false); - } else { - return announce(proxy, std::move(mutations)); - } - } - -#if 0 - private static void pushSchemaMutation(InetAddress endpoint, Collection schema) - { - MessageOut> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE, - schema, - MigrationsSerializer.instance); - MessagingService.instance().sendOneWay(msg, endpoint); - } -#endif + static future<> announce(service::storage_proxy& proxy, std::vector mutations, bool announce_locally); // Returns a future on the local application of the schema - static future<> announce(service::storage_proxy& proxy, std::vector schema) - { - auto f = db::legacy_schema_tables::merge_schema(proxy, std::move(schema)); -#if 0 - for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) - { - // only push schema to nodes with known and equal versions - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && - MessagingService.instance().knowsVersion(endpoint) && - MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version) - pushSchemaMutation(endpoint, schema); - } -#endif - return f; - } - -#if 0 - /** - * Announce my version passively over gossip. - * Used to notify nodes as they arrive in the cluster. - * - * @param version The schema version to announce - */ - public static void passiveAnnounce(UUID version) - { - Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version)); - logger.debug("Gossiping my schema version {}", version); - } - - /** - * Clear all locally stored schema information and reset schema to initial state. - * Called by user (via JMX) who wants to get rid of schema disagreement. - * - * @throws IOException if schema tables truncation fails - */ - public static void resetLocalSchema() throws IOException - { - logger.info("Starting local schema reset..."); - - logger.debug("Truncating schema tables..."); - - LegacySchemaTables.truncateSchemaTables(); - - logger.debug("Clearing local schema keyspace definitions..."); - - Schema.instance.clear(); - - Set liveEndpoints = Gossiper.instance.getLiveMembers(); - liveEndpoints.remove(FBUtilities.getBroadcastAddress()); - - // force migration if there are nodes around - for (InetAddress node : liveEndpoints) - { - if (shouldPullSchemaFrom(node)) - { - logger.debug("Requesting schema from {}", node); - FBUtilities.waitOnFuture(submitMigrationTask(node)); - break; - } - } - - logger.info("Local schema reset is complete."); - } - - public static class MigrationsSerializer implements IVersionedSerializer> - { - public static MigrationsSerializer instance = new MigrationsSerializer(); - - public void serialize(Collection schema, DataOutputPlus out, int version) throws IOException - { - out.writeInt(schema.size()); - for (Mutation mutation : schema) - Mutation.serializer.serialize(mutation, out, version); - } - - public Collection deserialize(DataInput in, int version) throws IOException - { - int count = in.readInt(); - Collection schema = new ArrayList<>(count); - - for (int i = 0; i < count; i++) - schema.add(Mutation.serializer.deserialize(in, version)); - - return schema; - } - - public long serializedSize(Collection schema, int version) - { - int size = TypeSizes.NATIVE.sizeof(schema.size()); - for (Mutation mutation : schema) - size += Mutation.serializer.serializedSize(mutation, version); - return size; - } - } -#endif + static future<> announce(service::storage_proxy& proxy, std::vector schema); }; }