service: run notifying code in threaded context
In order to allow yielding when handling endpoint lifecycle changes, notifiers now run in threaded context. Implementations which used this assumption before are supplemented with assertions that they indeed run in seastar::async mode. Fixes #4317 Message-Id: <45bbaf2d25dac314e4f322a91350705fad8b81ed.1552567666.git.sarna@scylladb.com>
This commit is contained in:
committed by
Duarte Nunes
parent
a7602bd2f1
commit
9c544df217
@@ -3610,6 +3610,7 @@ void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint) {};
|
||||
void storage_proxy::on_up(const gms::inet_address& endpoint) {};
|
||||
|
||||
void storage_proxy::on_down(const gms::inet_address& endpoint) {
|
||||
assert(thread::running_in_thread());
|
||||
for (auto it = _view_update_handlers_list->begin(); it != _view_update_handlers_list->end(); ++it) {
|
||||
auto guard = it->shared_from_this();
|
||||
if (it->get_targets().count(endpoint) > 0) {
|
||||
|
||||
@@ -3335,26 +3335,26 @@ future<> storage_service::set_cql_ready(bool ready) {
|
||||
void storage_service::notify_down(inet_address endpoint) {
|
||||
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
|
||||
netw::get_local_messaging_service().remove_rpc_client(netw::msg_addr{endpoint, 0});
|
||||
for (auto&& subscriber : ss._lifecycle_subscribers) {
|
||||
try {
|
||||
return do_for_each(ss._lifecycle_subscribers, [endpoint] (endpoint_lifecycle_subscriber* subscriber) {
|
||||
return seastar::async([endpoint, subscriber] {
|
||||
subscriber->on_down(endpoint);
|
||||
} catch (...) {
|
||||
slogger.warn("Down notification failed {}: {}", endpoint, std::current_exception());
|
||||
}
|
||||
}
|
||||
}).handle_exception([endpoint] (std::exception_ptr ex) {
|
||||
slogger.warn("Down notification failed {}: {}", endpoint, ex);
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
slogger.debug("Notify node {} has been down", endpoint);
|
||||
}
|
||||
|
||||
void storage_service::notify_left(inet_address endpoint) {
|
||||
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
|
||||
for (auto&& subscriber : ss._lifecycle_subscribers) {
|
||||
try {
|
||||
return do_for_each(ss._lifecycle_subscribers, [endpoint] (endpoint_lifecycle_subscriber* subscriber) {
|
||||
return seastar::async([endpoint, subscriber] {
|
||||
subscriber->on_leave_cluster(endpoint);
|
||||
} catch (...) {
|
||||
slogger.warn("Leave cluster notification failed {}: {}", endpoint, std::current_exception());
|
||||
}
|
||||
}
|
||||
}).handle_exception([endpoint] (std::exception_ptr ex) {
|
||||
slogger.warn("Leave cluster notification failed {}: {}", ex);
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
slogger.debug("Notify node {} has left the cluster", endpoint);
|
||||
}
|
||||
@@ -3366,13 +3366,13 @@ void storage_service::notify_up(inet_address endpoint)
|
||||
return;
|
||||
}
|
||||
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
|
||||
for (auto&& subscriber : ss._lifecycle_subscribers) {
|
||||
try {
|
||||
return do_for_each(ss._lifecycle_subscribers, [endpoint] (endpoint_lifecycle_subscriber* subscriber) {
|
||||
return seastar::async([endpoint, subscriber] {
|
||||
subscriber->on_up(endpoint);
|
||||
} catch (...) {
|
||||
slogger.warn("Up notification failed {}: {}", endpoint, std::current_exception());
|
||||
}
|
||||
}
|
||||
}).handle_exception([endpoint] (std::exception_ptr ex) {
|
||||
slogger.warn("Up notification failed {}: {}", endpoint, ex);
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
slogger.debug("Notify node {} has been up", endpoint);
|
||||
}
|
||||
@@ -3384,13 +3384,13 @@ void storage_service::notify_joined(inet_address endpoint)
|
||||
}
|
||||
|
||||
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
|
||||
for (auto&& subscriber : ss._lifecycle_subscribers) {
|
||||
try {
|
||||
return do_for_each(ss._lifecycle_subscribers, [endpoint] (endpoint_lifecycle_subscriber* subscriber) {
|
||||
return seastar::async([endpoint, subscriber] {
|
||||
subscriber->on_join_cluster(endpoint);
|
||||
} catch (...) {
|
||||
slogger.warn("Join cluster notification failed {}: {}", endpoint, std::current_exception());
|
||||
}
|
||||
}
|
||||
}).handle_exception([endpoint] (std::exception_ptr ex) {
|
||||
slogger.warn("Join cluster notification failed {}: {}", endpoint, ex);
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
slogger.debug("Notify node {} has joined the cluster", endpoint);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user