diff --git a/service/migration_manager.cc b/service/migration_manager.cc index fd7d049782..db4641cef8 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -26,12 +26,16 @@ #include "message/messaging_service.hh" #include "service/storage_service.hh" +#include "service/migration_task.hh" +#include "utils/runtime.hh" #include "gms/gossiper.hh" namespace service { static thread_local logging::logger logger("Migration Manager"); +const int32_t migration_manager::MIGRATION_DELAY_IN_MS = 60000; + #if 0 public void register(IMigrationListener listener) { @@ -42,83 +46,78 @@ public void unregister(IMigrationListener listener) { listeners.remove(listener); } +#endif -public void scheduleSchemaPull(InetAddress endpoint, EndpointState state) +future<> migration_manager::schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state) { - VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA); + auto& proxy = service::get_local_storage_proxy(); - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null) - maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint); + const auto& value = state.get_application_state(gms::application_state::SCHEMA); + + if (endpoint != utils::fb_utilities::get_broadcast_address() && value) { + return maybe_schedule_schema_pull(proxy, utils::UUID{value->value}, endpoint); + } + return make_ready_future<>(); } /** * If versions differ this node sends request with local migration list to the endpoint * and expecting to receive a list of migrations to apply locally. */ -private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint) +future<> migration_manager::maybe_schedule_schema_pull(storage_proxy& proxy, const utils::UUID& their_version, const gms::inet_address& endpoint) { - if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint)) - { + auto& db = proxy.get_db().local(); + if (db.get_version() == their_version || !should_pull_schema_from(endpoint)) { logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false"); - return; + return make_ready_future<>(); } - if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS) - { + if (db.get_version() == database::empty_version || runtime::get_uptime() < MIGRATION_DELAY_IN_MS) { // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately logger.debug("Submitting migration task for {}", endpoint); - submitMigrationTask(endpoint); - } - else - { + return submit_migration_task(proxy, endpoint); + } else { // Include a delay to make sure we have a chance to apply any changes being // pushed out simultaneously. See CASSANDRA-5025 - Runnable runnable = new Runnable() - { - public void run() - { - // grab the latest version of the schema since it may have changed again since the initial scheduling - EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - if (epState == null) - { - logger.debug("epState vanished for {}, not submitting migration task", endpoint); - return; - } - VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA); - UUID currentVersion = UUID.fromString(value.value); - if (Schema.instance.getVersion().equals(currentVersion)) - { - logger.debug("not submitting migration task for {} because our versions match", endpoint); - return; - } - logger.debug("submitting migration task for {}", endpoint); - submitMigrationTask(endpoint); + return sleep(std::chrono::milliseconds(MIGRATION_DELAY_IN_MS)).then([&proxy, endpoint] { + // grab the latest version of the schema since it may have changed again since the initial scheduling + auto& gossiper = gms::get_local_gossiper(); + auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint); + if (!ep_state) { + logger.debug("epState vanished for {}, not submitting migration task", endpoint); + return make_ready_future<>(); } - }; - ScheduledExecutors.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); + const auto& value = ep_state->get_application_state(gms::application_state::SCHEMA); + utils::UUID current_version{value->value}; + auto& db = proxy.get_db().local(); + if (db.get_version() == current_version) { + logger.debug("not submitting migration task for {} because our versions match", endpoint); + return make_ready_future<>(); + } + logger.debug("submitting migration task for {}", endpoint); + return submit_migration_task(proxy, endpoint); + }); } } -private static Future submitMigrationTask(InetAddress endpoint) +future<> migration_manager::submit_migration_task(service::storage_proxy& proxy, const gms::inet_address& endpoint) { - /* - * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are - * running in the gossip stage. - */ - return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); + return service::migration_task::run_may_throw(proxy, endpoint); } -private static boolean shouldPullSchemaFrom(InetAddress endpoint) +bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoint) { /* * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema) * Don't request schema from fat clients */ - return MessagingService.instance().knowsVersion(endpoint) - && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version - && !Gossiper.instance.isGossipOnlyMember(endpoint); + auto& ms = net::get_local_messaging_service(); + return ms.knows_version(endpoint) + && ms.get_raw_version(endpoint) == net::messaging_service::current_version + && !gms::get_gossiper().local().is_gossip_only_member(endpoint); } +#if 0 public static boolean isReadyForBootstrap() { return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0; diff --git a/service/migration_manager.hh b/service/migration_manager.hh index f57defdaa1..41af284472 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -26,7 +26,9 @@ #include "db/legacy_schema_tables.hh" +#include "gms/endpoint_state.hh" #include "gms/inet_address.hh" +#include "utils/UUID.hh" #include @@ -39,14 +41,24 @@ class migration_manager { public static final MigrationManager instance = new MigrationManager(); private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); +#endif - public static final int MIGRATION_DELAY_IN_MS = 60000; + static const int32_t MIGRATION_DELAY_IN_MS; +#if 0 private final List listeners = new CopyOnWriteArrayList<>(); private MigrationManager() {} #endif public: + static future<> schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state); + + static future<> maybe_schedule_schema_pull(service::storage_proxy& proxy, const utils::UUID& their_version, const gms::inet_address& endpoint); + + static future<> submit_migration_task(service::storage_proxy& proxy, const gms::inet_address& endpoint); + + static bool should_pull_schema_from(const gms::inet_address& endpoint); + static future<> announce_new_keyspace(distributed& proxy, lw_shared_ptr ksm, bool announce_locally = false); static future<> announce_new_keyspace(distributed& proxy, lw_shared_ptr ksm, api::timestamp_type timestamp, bool announce_locally);