service: run notifying code in threaded context

In order to allow yielding when handling endpoint lifecycle changes,
notifiers now run in threaded context.
Implementations which used this assumption before are supplemented
with assertions that they indeed run in seastar::async mode.

Fixes #4317
Message-Id: <45bbaf2d25dac314e4f322a91350705fad8b81ed.1552567666.git.sarna@scylladb.com>
This commit is contained in:
Piotr Sarna
2019-03-14 13:50:14 +01:00
committed by Duarte Nunes
parent a7602bd2f1
commit 9c544df217
2 changed files with 25 additions and 24 deletions

View File

@@ -3610,6 +3610,7 @@ void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint) {};
void storage_proxy::on_up(const gms::inet_address& endpoint) {};
void storage_proxy::on_down(const gms::inet_address& endpoint) {
assert(thread::running_in_thread());
for (auto it = _view_update_handlers_list->begin(); it != _view_update_handlers_list->end(); ++it) {
auto guard = it->shared_from_this();
if (it->get_targets().count(endpoint) > 0) {

View File

@@ -3335,26 +3335,26 @@ future<> storage_service::set_cql_ready(bool ready) {
void storage_service::notify_down(inet_address endpoint) {
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
netw::get_local_messaging_service().remove_rpc_client(netw::msg_addr{endpoint, 0});
for (auto&& subscriber : ss._lifecycle_subscribers) {
try {
return do_for_each(ss._lifecycle_subscribers, [endpoint] (endpoint_lifecycle_subscriber* subscriber) {
return seastar::async([endpoint, subscriber] {
subscriber->on_down(endpoint);
} catch (...) {
slogger.warn("Down notification failed {}: {}", endpoint, std::current_exception());
}
}
}).handle_exception([endpoint] (std::exception_ptr ex) {
slogger.warn("Down notification failed {}: {}", endpoint, ex);
});
});
}).get();
slogger.debug("Notify node {} has been down", endpoint);
}
void storage_service::notify_left(inet_address endpoint) {
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
for (auto&& subscriber : ss._lifecycle_subscribers) {
try {
return do_for_each(ss._lifecycle_subscribers, [endpoint] (endpoint_lifecycle_subscriber* subscriber) {
return seastar::async([endpoint, subscriber] {
subscriber->on_leave_cluster(endpoint);
} catch (...) {
slogger.warn("Leave cluster notification failed {}: {}", endpoint, std::current_exception());
}
}
}).handle_exception([endpoint] (std::exception_ptr ex) {
slogger.warn("Leave cluster notification failed {}: {}", ex);
});
});
}).get();
slogger.debug("Notify node {} has left the cluster", endpoint);
}
@@ -3366,13 +3366,13 @@ void storage_service::notify_up(inet_address endpoint)
return;
}
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
for (auto&& subscriber : ss._lifecycle_subscribers) {
try {
return do_for_each(ss._lifecycle_subscribers, [endpoint] (endpoint_lifecycle_subscriber* subscriber) {
return seastar::async([endpoint, subscriber] {
subscriber->on_up(endpoint);
} catch (...) {
slogger.warn("Up notification failed {}: {}", endpoint, std::current_exception());
}
}
}).handle_exception([endpoint] (std::exception_ptr ex) {
slogger.warn("Up notification failed {}: {}", endpoint, ex);
});
});
}).get();
slogger.debug("Notify node {} has been up", endpoint);
}
@@ -3384,13 +3384,13 @@ void storage_service::notify_joined(inet_address endpoint)
}
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
for (auto&& subscriber : ss._lifecycle_subscribers) {
try {
return do_for_each(ss._lifecycle_subscribers, [endpoint] (endpoint_lifecycle_subscriber* subscriber) {
return seastar::async([endpoint, subscriber] {
subscriber->on_join_cluster(endpoint);
} catch (...) {
slogger.warn("Join cluster notification failed {}: {}", endpoint, std::current_exception());
}
}
}).handle_exception([endpoint] (std::exception_ptr ex) {
slogger.warn("Join cluster notification failed {}: {}", endpoint, ex);
});
});
}).get();
slogger.debug("Notify node {} has joined the cluster", endpoint);
}