migration_manager: Make the migration callbacks runs inside seastar thread
At the momment, the callbacks returns void, it is impossible to wait for the callbacks to complete. Make the callbacks runs inside seastar thread, so if we need to wait for the callback, we can make it call foo_operation().get() in the callback. It is easier than making the callbacks return future<>.
This commit is contained in:
@@ -607,10 +607,10 @@ future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector
|
||||
#endif
|
||||
proxy.local().get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) {
|
||||
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
|
||||
for (auto&& keyspace_to_drop : keyspaces_to_drop) {
|
||||
return do_for_each(keyspaces_to_drop, [&db] (auto keyspace_to_drop) {
|
||||
db.drop_keyspace(keyspace_to_drop);
|
||||
service::get_local_migration_manager().notify_drop_keyspace(keyspace_to_drop);
|
||||
}
|
||||
return service::get_local_migration_manager().notify_drop_keyspace(keyspace_to_drop);
|
||||
});
|
||||
}).get0();
|
||||
});
|
||||
}
|
||||
@@ -650,7 +650,7 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
|
||||
return do_for_each(created, [&db](auto&& val) {
|
||||
auto ksm = create_keyspace_from_schema_partition(val);
|
||||
return db.create_keyspace(ksm).then([ksm] {
|
||||
service::get_local_migration_manager().notify_create_keyspace(ksm);
|
||||
return service::get_local_migration_manager().notify_create_keyspace(ksm);
|
||||
});
|
||||
}).then([&altered, &db] () mutable {
|
||||
for (auto&& name : altered) {
|
||||
|
||||
@@ -50,18 +50,21 @@ public:
|
||||
virtual ~migration_listener()
|
||||
{ }
|
||||
|
||||
// The callback runs inside seastar thread
|
||||
virtual void on_create_keyspace(const sstring& ks_name) = 0;
|
||||
virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) = 0;
|
||||
virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) = 0;
|
||||
virtual void on_create_function(const sstring& ks_name, const sstring& function_name) = 0;
|
||||
virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0;
|
||||
|
||||
// The callback runs inside seastar thread
|
||||
virtual void on_update_keyspace(const sstring& ks_name) = 0;
|
||||
virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) = 0;
|
||||
virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) = 0;
|
||||
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) = 0;
|
||||
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0;
|
||||
|
||||
// The callback runs inside seastar thread
|
||||
virtual void on_drop_keyspace(const sstring& ks_name) = 0;
|
||||
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) = 0;
|
||||
virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) = 0;
|
||||
|
||||
@@ -231,8 +231,8 @@ bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoin
|
||||
&& !gms::get_local_gossiper().is_gossip_only_member(endpoint);
|
||||
}
|
||||
|
||||
void migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm)
|
||||
{
|
||||
future<> migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
|
||||
return seastar::async([this, ksm] {
|
||||
auto&& name = ksm->name();
|
||||
for (auto&& listener : _listeners) {
|
||||
try {
|
||||
@@ -241,10 +241,11 @@ void migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_meta
|
||||
logger.warn("Create keyspace notification failed {}: {}", name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void migration_manager::notify_create_column_family(const schema_ptr& cfm)
|
||||
{
|
||||
future<> migration_manager::notify_create_column_family(const schema_ptr& cfm) {
|
||||
return seastar::async([this, cfm] {
|
||||
auto&& ks_name = cfm->ks_name();
|
||||
auto&& cf_name = cfm->cf_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
@@ -254,6 +255,7 @@ void migration_manager::notify_create_column_family(const schema_ptr& cfm)
|
||||
logger.warn("Create column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
@@ -276,8 +278,8 @@ public void notifyCreateAggregate(UDAggregate udf)
|
||||
}
|
||||
#endif
|
||||
|
||||
void migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm)
|
||||
{
|
||||
future<> migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
|
||||
return seastar::async([this, ksm] {
|
||||
auto&& name = ksm->name();
|
||||
for (auto&& listener : _listeners) {
|
||||
try {
|
||||
@@ -286,10 +288,11 @@ void migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_meta
|
||||
logger.warn("Update keyspace notification failed {}: {}", name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void migration_manager::notify_update_column_family(const schema_ptr& cfm, bool columns_changed)
|
||||
{
|
||||
future<> migration_manager::notify_update_column_family(const schema_ptr& cfm, bool columns_changed) {
|
||||
return seastar::async([this, cfm, columns_changed] {
|
||||
auto&& ks_name = cfm->ks_name();
|
||||
auto&& cf_name = cfm->cf_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
@@ -299,6 +302,7 @@ void migration_manager::notify_update_column_family(const schema_ptr& cfm, bool
|
||||
logger.warn("Update column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
@@ -321,8 +325,8 @@ public void notifyUpdateAggregate(UDAggregate udf)
|
||||
}
|
||||
#endif
|
||||
|
||||
void migration_manager::notify_drop_keyspace(const sstring& ks_name)
|
||||
{
|
||||
future<> migration_manager::notify_drop_keyspace(const sstring& ks_name) {
|
||||
return seastar::async([this, ks_name] {
|
||||
for (auto&& listener : _listeners) {
|
||||
try {
|
||||
listener->on_drop_keyspace(ks_name);
|
||||
@@ -330,10 +334,11 @@ void migration_manager::notify_drop_keyspace(const sstring& ks_name)
|
||||
logger.warn("Drop keyspace notification failed {}: {}", ks_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void migration_manager::notify_drop_column_family(const schema_ptr& cfm)
|
||||
{
|
||||
future<> migration_manager::notify_drop_column_family(const schema_ptr& cfm) {
|
||||
return seastar::async([this, cfm] {
|
||||
auto&& cf_name = cfm->cf_name();
|
||||
auto&& ks_name = cfm->ks_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
@@ -343,6 +348,7 @@ void migration_manager::notify_drop_column_family(const schema_ptr& cfm)
|
||||
logger.warn("Drop column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
@@ -81,12 +81,12 @@ public:
|
||||
// Keep mutations alive around whole async operation.
|
||||
future<> merge_schema_from(net::messaging_service::msg_addr src, const std::vector<frozen_mutation>& mutations);
|
||||
|
||||
void notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
|
||||
void notify_create_column_family(const schema_ptr& cfm);
|
||||
void notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
|
||||
void notify_update_column_family(const schema_ptr& cfm, bool columns_changed);
|
||||
void notify_drop_keyspace(const sstring& ks_name);
|
||||
void notify_drop_column_family(const schema_ptr& cfm);
|
||||
future<> notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
|
||||
future<> notify_create_column_family(const schema_ptr& cfm);
|
||||
future<> notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
|
||||
future<> notify_update_column_family(const schema_ptr& cfm, bool columns_changed);
|
||||
future<> notify_drop_keyspace(const sstring& ks_name);
|
||||
future<> notify_drop_column_family(const schema_ptr& cfm);
|
||||
|
||||
bool should_pull_schema_from(const gms::inet_address& endpoint);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user