Merge "Event notification exception safety" from Pekka

"Fix both migration manager and storage service to catch and log
exceptions for listeners to ensure all listeners are notified.

Spotted by Avi."
This commit is contained in:
Avi Kivity
2016-01-04 11:03:01 +02:00
2 changed files with 55 additions and 11 deletions

View File

@@ -173,7 +173,11 @@ future<> migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_
{
return get_migration_manager().invoke_on_all([name = ksm->name()] (auto&& mm) {
for (auto&& listener : mm._listeners) {
listener->on_create_keyspace(name);
try {
listener->on_create_keyspace(name);
} catch (...) {
logger.warn("Create keyspace notification failed {}: {}", name, std::current_exception());
}
}
});
}
@@ -182,7 +186,11 @@ future<> migration_manager::notify_create_column_family(schema_ptr cfm)
{
return get_migration_manager().invoke_on_all([ks_name = cfm->ks_name(), cf_name = cfm->cf_name()] (auto&& mm) {
for (auto&& listener : mm._listeners) {
listener->on_create_column_family(ks_name, cf_name);
try {
listener->on_create_column_family(ks_name, cf_name);
} catch (...) {
logger.warn("Create column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
}
@@ -211,7 +219,11 @@ future<> migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_
{
return get_migration_manager().invoke_on_all([name = ksm->name()] (auto&& mm) {
for (auto&& listener : mm._listeners) {
listener->on_update_keyspace(name);
try {
listener->on_update_keyspace(name);
} catch (...) {
logger.warn("Update keyspace notification failed {}: {}", name, std::current_exception());
}
}
});
}
@@ -220,7 +232,11 @@ future<> migration_manager::notify_update_column_family(schema_ptr cfm)
{
return get_migration_manager().invoke_on_all([ks_name = cfm->ks_name(), cf_name = cfm->cf_name()] (auto&& mm) {
for (auto&& listener : mm._listeners) {
listener->on_update_column_family(ks_name, cf_name);
try {
listener->on_update_column_family(ks_name, cf_name);
} catch (...) {
logger.warn("Update column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
}
@@ -249,7 +265,11 @@ future<> migration_manager::notify_drop_keyspace(sstring ks_name)
{
return get_migration_manager().invoke_on_all([ks_name] (auto&& mm) {
for (auto&& listener : mm._listeners) {
listener->on_drop_keyspace(ks_name);
try {
listener->on_drop_keyspace(ks_name);
} catch (...) {
logger.warn("Drop keyspace notification failed {}: {}", ks_name, std::current_exception());
}
}
});
}
@@ -258,7 +278,11 @@ future<> migration_manager::notify_drop_column_family(schema_ptr cfm)
{
return get_migration_manager().invoke_on_all([ks_name = cfm->ks_name(), cf_name = cfm->cf_name()] (auto&& mm) {
for (auto&& listener : mm._listeners) {
listener->on_drop_column_family(ks_name, cf_name);
try {
listener->on_drop_column_family(ks_name, cf_name);
} catch (...) {
logger.warn("Drop column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
}

View File

@@ -588,13 +588,21 @@ void storage_service::handle_state_normal(inet_address endpoint) {
_token_metadata.remove_from_moving(endpoint);
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
for (auto&& subscriber : ss._lifecycle_subscribers) {
subscriber->on_move(endpoint);
try {
subscriber->on_move(endpoint);
} catch (...) {
logger.warn("Move notification failed {}: {}", endpoint, std::current_exception());
}
}
}).get();
} else {
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
for (auto&& subscriber : ss._lifecycle_subscribers) {
subscriber->on_join_cluster(endpoint);
try {
subscriber->on_join_cluster(endpoint);
} catch (...) {
logger.warn("Join cluster notification failed {}: {}", endpoint, std::current_exception());
}
}
}).get();
}
@@ -722,7 +730,11 @@ void storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state s
#endif
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
for (auto&& subscriber : ss._lifecycle_subscribers) {
subscriber->on_up(endpoint);
try {
subscriber->on_up(endpoint);
} catch (...) {
logger.warn("Up notification failed {}: {}", endpoint, std::current_exception());
}
}
});
}
@@ -785,7 +797,11 @@ void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state st
net::get_local_messaging_service().remove_rpc_client(net::shard_id{endpoint, 0});
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
for (auto&& subscriber : ss._lifecycle_subscribers) {
subscriber->on_down(endpoint);
try {
subscriber->on_down(endpoint);
} catch (...) {
logger.warn("Down notification failed {}: {}", endpoint, std::current_exception());
}
}
}).get();
}
@@ -2092,7 +2108,11 @@ void storage_service::excise(std::unordered_set<token> tokens, inet_address endp
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
for (auto&& subscriber : ss._lifecycle_subscribers) {
subscriber->on_leave_cluster(endpoint);
try {
subscriber->on_leave_cluster(endpoint);
} catch (...) {
logger.warn("Leave cluster notification failed {}: {}", endpoint, std::current_exception());
}
}
}).get();