diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 18cdf0d369..57ab7cd9cf 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -24,6 +24,7 @@ #include "service/migration_manager.hh" +#include "service/migration_listener.hh" #include "message/messaging_service.hh" #include "service/storage_service.hh" #include "service/migration_task.hh" @@ -41,6 +42,7 @@ using namespace std::chrono_literals; const std::chrono::milliseconds migration_manager::MIGRATION_DELAY_IN_MS = 60000ms; migration_manager::migration_manager() + : _listeners{} { } @@ -49,17 +51,15 @@ future<> migration_manager::stop() return make_ready_future<>(); } -#if 0 -public void register(IMigrationListener listener) +void migration_manager::register_listener(migration_listener* listener) { - listeners.add(listener); + _listeners.emplace_back(listener); } -public void unregister(IMigrationListener listener) +void migration_manager::unregister_listener(migration_listener* listener) { - listeners.remove(listener); + _listeners.erase(std::remove(_listeners.begin(), _listeners.end(), listener), _listeners.end()); } -#endif future<> migration_manager::schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state) { @@ -135,19 +135,27 @@ public static boolean isReadyForBootstrap() { return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0; } +#endif -public void notifyCreateKeyspace(KSMetaData ksm) +future<> migration_manager::notify_create_keyspace(const lw_shared_ptr& ksm) { - for (IMigrationListener listener : listeners) - listener.onCreateKeyspace(ksm.name); + return get_migration_manager().invoke_on_all([name = ksm->name()] (auto&& mm) { + for (auto&& listener : mm._listeners) { + listener->on_create_keyspace(name); + } + }); } -public void notifyCreateColumnFamily(CFMetaData cfm) +future<> migration_manager::notify_create_column_family(schema_ptr cfm) { - for (IMigrationListener listener : listeners) - listener.onCreateColumnFamily(cfm.ksName, cfm.cfName); + return get_migration_manager().invoke_on_all([ks_name = cfm->ks_name(), cf_name = cfm->cf_name()] (auto&& mm) { + for (auto&& listener : mm._listeners) { + listener->on_create_column_family(ks_name, cf_name); + } + }); } +#if 0 public void notifyCreateUserType(UserType ut) { for (IMigrationListener listener : listeners) @@ -165,19 +173,27 @@ public void notifyCreateAggregate(UDAggregate udf) for (IMigrationListener listener : listeners) listener.onCreateAggregate(udf.name().keyspace, udf.name().name); } +#endif -public void notifyUpdateKeyspace(KSMetaData ksm) +future<> migration_manager::notify_update_keyspace(const lw_shared_ptr& ksm) { - for (IMigrationListener listener : listeners) - listener.onUpdateKeyspace(ksm.name); + return get_migration_manager().invoke_on_all([name = ksm->name()] (auto&& mm) { + for (auto&& listener : mm._listeners) { + listener->on_update_keyspace(name); + } + }); } -public void notifyUpdateColumnFamily(CFMetaData cfm) +future<> migration_manager::notify_update_column_family(schema_ptr cfm) { - for (IMigrationListener listener : listeners) - listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName); + return get_migration_manager().invoke_on_all([ks_name = cfm->ks_name(), cf_name = cfm->cf_name()] (auto&& mm) { + for (auto&& listener : mm._listeners) { + listener->on_update_column_family(ks_name, cf_name); + } + }); } +#if 0 public void notifyUpdateUserType(UserType ut) { for (IMigrationListener listener : listeners) @@ -195,19 +211,27 @@ public void notifyUpdateAggregate(UDAggregate udf) for (IMigrationListener listener : listeners) listener.onUpdateAggregate(udf.name().keyspace, udf.name().name); } +#endif -public void notifyDropKeyspace(KSMetaData ksm) +future<> migration_manager::notify_drop_keyspace(const lw_shared_ptr& ksm) { - for (IMigrationListener listener : listeners) - listener.onDropKeyspace(ksm.name); + return get_migration_manager().invoke_on_all([name = ksm->name()] (auto&& mm) { + for (auto&& listener : mm._listeners) { + listener->on_drop_keyspace(name); + } + }); } -public void notifyDropColumnFamily(CFMetaData cfm) +future<> migration_manager::notify_drop_column_family(schema_ptr cfm) { - for (IMigrationListener listener : listeners) - listener.onDropColumnFamily(cfm.ksName, cfm.cfName); + return get_migration_manager().invoke_on_all([ks_name = cfm->ks_name(), cf_name = cfm->cf_name()] (auto&& mm) { + for (auto&& listener : mm._listeners) { + listener->on_drop_column_family(ks_name, cf_name); + } + }); } +#if 0 public void notifyDropUserType(UserType ut) { for (IMigrationListener listener : listeners) diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 6a68410ca3..c2e10617d2 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -24,8 +24,8 @@ #pragma once +#include "service/migration_listener.hh" #include "db/legacy_schema_tables.hh" - #include "gms/endpoint_state.hh" #include "core/distributed.hh" #include "gms/inet_address.hh" @@ -42,20 +42,36 @@ class migration_manager { private final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); #endif - static const std::chrono::milliseconds MIGRATION_DELAY_IN_MS; + std::vector _listeners; -#if 0 - private final List listeners = new CopyOnWriteArrayList<>(); -#endif + static const std::chrono::milliseconds MIGRATION_DELAY_IN_MS; public: migration_manager(); + /// Register a migration listener on current shard. + void register_listener(migration_listener* listener); + + /// Unregister a migration listener on current shard. + void unregister_listener(migration_listener* listener); + future<> schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state); future<> maybe_schedule_schema_pull(service::storage_proxy& proxy, const utils::UUID& their_version, const gms::inet_address& endpoint); future<> submit_migration_task(service::storage_proxy& proxy, const gms::inet_address& endpoint); + static future<> notify_create_keyspace(const lw_shared_ptr& ksm); + + static future<> notify_create_column_family(schema_ptr cfm); + + static future<> notify_update_keyspace(const lw_shared_ptr& ksm); + + static future<> notify_update_column_family(schema_ptr cfm); + + static future<> notify_drop_keyspace(const lw_shared_ptr& ksm); + + static future<> notify_drop_column_family(schema_ptr cfm); + bool should_pull_schema_from(const gms::inet_address& endpoint); future<> announce_new_keyspace(distributed& proxy, lw_shared_ptr ksm, bool announce_locally = false);