migration_manager: Make the migration callbacks runs inside seastar thread

At the momment, the callbacks returns void, it is impossible to wait for
the callbacks to complete. Make the callbacks runs inside seastar
thread, so if we need to wait for the callback, we can make it call
foo_operation().get() in the callback. It is easier than making the
callbacks return future<>.
This commit is contained in:
Asias He
2016-03-14 16:34:11 +08:00
parent 5076f4878b
commit 93015bcc54
4 changed files with 31 additions and 22 deletions

View File

@@ -607,10 +607,10 @@ future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector
#endif
proxy.local().get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) {
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
for (auto&& keyspace_to_drop : keyspaces_to_drop) {
return do_for_each(keyspaces_to_drop, [&db] (auto keyspace_to_drop) {
db.drop_keyspace(keyspace_to_drop);
service::get_local_migration_manager().notify_drop_keyspace(keyspace_to_drop);
}
return service::get_local_migration_manager().notify_drop_keyspace(keyspace_to_drop);
});
}).get0();
});
}
@@ -650,7 +650,7 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
return do_for_each(created, [&db](auto&& val) {
auto ksm = create_keyspace_from_schema_partition(val);
return db.create_keyspace(ksm).then([ksm] {
service::get_local_migration_manager().notify_create_keyspace(ksm);
return service::get_local_migration_manager().notify_create_keyspace(ksm);
});
}).then([&altered, &db] () mutable {
for (auto&& name : altered) {

View File

@@ -50,18 +50,21 @@ public:
virtual ~migration_listener()
{ }
// The callback runs inside seastar thread
virtual void on_create_keyspace(const sstring& ks_name) = 0;
virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) = 0;
virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) = 0;
virtual void on_create_function(const sstring& ks_name, const sstring& function_name) = 0;
virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0;
// The callback runs inside seastar thread
virtual void on_update_keyspace(const sstring& ks_name) = 0;
virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) = 0;
virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) = 0;
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) = 0;
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0;
// The callback runs inside seastar thread
virtual void on_drop_keyspace(const sstring& ks_name) = 0;
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) = 0;
virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) = 0;

View File

@@ -231,8 +231,8 @@ bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoin
&& !gms::get_local_gossiper().is_gossip_only_member(endpoint);
}
void migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm)
{
future<> migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
return seastar::async([this, ksm] {
auto&& name = ksm->name();
for (auto&& listener : _listeners) {
try {
@@ -241,10 +241,11 @@ void migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_meta
logger.warn("Create keyspace notification failed {}: {}", name, std::current_exception());
}
}
});
}
void migration_manager::notify_create_column_family(const schema_ptr& cfm)
{
future<> migration_manager::notify_create_column_family(const schema_ptr& cfm) {
return seastar::async([this, cfm] {
auto&& ks_name = cfm->ks_name();
auto&& cf_name = cfm->cf_name();
for (auto&& listener : _listeners) {
@@ -254,6 +255,7 @@ void migration_manager::notify_create_column_family(const schema_ptr& cfm)
logger.warn("Create column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
}
#if 0
@@ -276,8 +278,8 @@ public void notifyCreateAggregate(UDAggregate udf)
}
#endif
void migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm)
{
future<> migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
return seastar::async([this, ksm] {
auto&& name = ksm->name();
for (auto&& listener : _listeners) {
try {
@@ -286,10 +288,11 @@ void migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_meta
logger.warn("Update keyspace notification failed {}: {}", name, std::current_exception());
}
}
});
}
void migration_manager::notify_update_column_family(const schema_ptr& cfm, bool columns_changed)
{
future<> migration_manager::notify_update_column_family(const schema_ptr& cfm, bool columns_changed) {
return seastar::async([this, cfm, columns_changed] {
auto&& ks_name = cfm->ks_name();
auto&& cf_name = cfm->cf_name();
for (auto&& listener : _listeners) {
@@ -299,6 +302,7 @@ void migration_manager::notify_update_column_family(const schema_ptr& cfm, bool
logger.warn("Update column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
}
#if 0
@@ -321,8 +325,8 @@ public void notifyUpdateAggregate(UDAggregate udf)
}
#endif
void migration_manager::notify_drop_keyspace(const sstring& ks_name)
{
future<> migration_manager::notify_drop_keyspace(const sstring& ks_name) {
return seastar::async([this, ks_name] {
for (auto&& listener : _listeners) {
try {
listener->on_drop_keyspace(ks_name);
@@ -330,10 +334,11 @@ void migration_manager::notify_drop_keyspace(const sstring& ks_name)
logger.warn("Drop keyspace notification failed {}: {}", ks_name, std::current_exception());
}
}
});
}
void migration_manager::notify_drop_column_family(const schema_ptr& cfm)
{
future<> migration_manager::notify_drop_column_family(const schema_ptr& cfm) {
return seastar::async([this, cfm] {
auto&& cf_name = cfm->cf_name();
auto&& ks_name = cfm->ks_name();
for (auto&& listener : _listeners) {
@@ -343,6 +348,7 @@ void migration_manager::notify_drop_column_family(const schema_ptr& cfm)
logger.warn("Drop column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
}
#if 0

View File

@@ -81,12 +81,12 @@ public:
// Keep mutations alive around whole async operation.
future<> merge_schema_from(net::messaging_service::msg_addr src, const std::vector<frozen_mutation>& mutations);
void notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
void notify_create_column_family(const schema_ptr& cfm);
void notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
void notify_update_column_family(const schema_ptr& cfm, bool columns_changed);
void notify_drop_keyspace(const sstring& ks_name);
void notify_drop_column_family(const schema_ptr& cfm);
future<> notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
future<> notify_create_column_family(const schema_ptr& cfm);
future<> notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
future<> notify_update_column_family(const schema_ptr& cfm, bool columns_changed);
future<> notify_drop_keyspace(const sstring& ks_name);
future<> notify_drop_column_family(const schema_ptr& cfm);
bool should_pull_schema_from(const gms::inet_address& endpoint);