storage_service: use atomic_vector for lifecycle_subscribers

So it can be modified while it is being walked to dispatch
subscriber event notifications.

In #8143, there is a race between scylla shutdown and
notify_down(), causing use-after-free of cql_server.

Using an atomic vector instead and futurizing
unregister_subscriber allows deleting from _lifecycle_subscribers
while it is being walked using atomic_vector::for_each.

Fixes #8143

Test: unit(release)
DTest:
  update_cluster_layout_tests:TestUpdateClusterLayout.add_node_with_large_partition4_test(release)
  materialized_views_test.py:TestMaterializedViews.double_node_failure_during_mv_insert_4_nodes_test(release)
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20210224164647.561493-2-bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2021-02-24 18:46:47 +02:00
committed by Avi Kivity
parent 1ed04affab
commit baf5d05631
4 changed files with 20 additions and 18 deletions

View File

@@ -120,10 +120,9 @@ future<> manager::start(shared_ptr<service::storage_proxy> proxy_ptr, shared_ptr
future<> manager::stop() {
manager_logger.info("Asked to stop");
if (_strorage_service_anchor) {
_strorage_service_anchor->unregister_subscriber(this);
}
auto f = _strorage_service_anchor ? _strorage_service_anchor->unregister_subscriber(this) : make_ready_future<>();
return f.finally([this] {
set_stopping();
return _draining_eps_gate.close().finally([this] {
@@ -134,6 +133,7 @@ future<> manager::stop() {
manager_logger.info("Stopped");
}).discard_result();
});
});
}
future<> manager::compute_hints_dir_device_id() {

View File

@@ -1288,12 +1288,12 @@ void storage_service::set_gossip_tokens(
void storage_service::register_subscriber(endpoint_lifecycle_subscriber* subscriber)
{
_lifecycle_subscribers.emplace_back(subscriber);
_lifecycle_subscribers.add(subscriber);
}
void storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subscriber)
future<> storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subscriber) noexcept
{
_lifecycle_subscribers.erase(std::remove(_lifecycle_subscribers.begin(), _lifecycle_subscribers.end(), subscriber), _lifecycle_subscribers.end());
return _lifecycle_subscribers.remove(subscriber);
}
static std::optional<future<>> drain_in_progress;
@@ -1341,8 +1341,9 @@ future<> storage_service::drain_on_shutdown() {
get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) mutable {
auto& ss = service::get_local_storage_service();
ss.unregister_subscriber(&local_proxy);
return ss.unregister_subscriber(&local_proxy).finally([&local_proxy] {
return local_proxy.drain_on_shutdown();
});
}).get();
slogger.info("Drain on shutdown: hints manager is stopped");
@@ -3188,13 +3189,13 @@ void storage_service::notify_down(inet_address endpoint) {
container().invoke_on_all([endpoint] (auto&& ss) {
ss._messaging.local().remove_rpc_client(netw::msg_addr{endpoint, 0});
return seastar::async([&ss, endpoint] {
for (auto&& subscriber : ss._lifecycle_subscribers) {
ss._lifecycle_subscribers.for_each([endpoint] (endpoint_lifecycle_subscriber* subscriber) {
try {
subscriber->on_down(endpoint);
} catch (...) {
slogger.warn("Down notification failed {}: {}", endpoint, std::current_exception());
}
}
});
});
}).get();
slogger.debug("Notify node {} has been down", endpoint);
@@ -3203,13 +3204,13 @@ void storage_service::notify_down(inet_address endpoint) {
void storage_service::notify_left(inet_address endpoint) {
container().invoke_on_all([endpoint] (auto&& ss) {
return seastar::async([&ss, endpoint] {
for (auto&& subscriber : ss._lifecycle_subscribers) {
ss._lifecycle_subscribers.for_each([endpoint] (endpoint_lifecycle_subscriber* subscriber) {
try {
subscriber->on_leave_cluster(endpoint);
} catch (...) {
slogger.warn("Leave cluster notification failed {}: {}", endpoint, std::current_exception());
}
}
});
});
}).get();
slogger.debug("Notify node {} has left the cluster", endpoint);
@@ -3222,13 +3223,13 @@ void storage_service::notify_up(inet_address endpoint)
}
container().invoke_on_all([endpoint] (auto&& ss) {
return seastar::async([&ss, endpoint] {
for (auto&& subscriber : ss._lifecycle_subscribers) {
ss._lifecycle_subscribers.for_each([endpoint] (endpoint_lifecycle_subscriber* subscriber) {
try {
subscriber->on_up(endpoint);
} catch (...) {
slogger.warn("Up notification failed {}: {}", endpoint, std::current_exception());
}
}
});
});
}).get();
slogger.debug("Notify node {} has been up", endpoint);
@@ -3242,13 +3243,13 @@ void storage_service::notify_joined(inet_address endpoint)
container().invoke_on_all([endpoint] (auto&& ss) {
return seastar::async([&ss, endpoint] {
for (auto&& subscriber : ss._lifecycle_subscribers) {
ss._lifecycle_subscribers.for_each([endpoint] (endpoint_lifecycle_subscriber* subscriber) {
try {
subscriber->on_join_cluster(endpoint);
} catch (...) {
slogger.warn("Join cluster notification failed {}: {}", endpoint, std::current_exception());
}
}
});
});
}).get();
slogger.debug("Notify node {} has joined the cluster", endpoint);

View File

@@ -326,7 +326,7 @@ private:
drain_progress _drain_progress{};
std::vector<endpoint_lifecycle_subscriber*> _lifecycle_subscribers;
atomic_vector<endpoint_lifecycle_subscriber*> _lifecycle_subscribers;
std::unordered_set<token> _bootstrap_tokens;
@@ -354,7 +354,7 @@ public:
void register_subscriber(endpoint_lifecycle_subscriber* subscriber);
void unregister_subscriber(endpoint_lifecycle_subscriber* subscriber);
future<> unregister_subscriber(endpoint_lifecycle_subscriber* subscriber) noexcept;
// should only be called via JMX
future<> stop_gossiping();

View File

@@ -42,8 +42,9 @@ cql_server::event_notifier::~event_notifier()
future<> cql_server::event_notifier::stop() {
return _mnotifier.unregister_listener(this).then([this]{
service::get_local_storage_service().unregister_subscriber(this);
return service::get_local_storage_service().unregister_subscriber(this).finally([this] {
_stopped = true;
});
});
}