From baf5d05631491792c0809f6ea657f9aa60ad0cd2 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 24 Feb 2021 18:46:47 +0200 Subject: [PATCH] storage_service: use atomic_vector for lifecycle_subscribers So it can be modified while walked to dispatch subscribed event notifications. In #8143, there is a race between scylla shutdown and notify_down(), causing use-after-free of cql_server. Using an atomic vector instead and futurizing unregister_subscriber allows deleting from _lifecycle_subscribers while walked using atomic_vector::for_each. Fixes #8143 Test: unit(release) DTest: update_cluster_layout_tests:TestUpdateClusterLayout.add_node_with_large_partition4_test(release) materialized_views_test.py:TestMaterializedViews.double_node_failure_during_mv_insert_4_nodes_test(release) Signed-off-by: Benny Halevy Message-Id: <20210224164647.561493-2-bhalevy@scylladb.com> --- db/hints/manager.cc | 6 +++--- service/storage_service.cc | 25 +++++++++++++------------ service/storage_service.hh | 4 ++-- transport/event_notifier.cc | 3 ++- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 1b1e5f8961..53f7bc3e2e 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -120,10 +120,9 @@ future<> manager::start(shared_ptr proxy_ptr, shared_ptr future<> manager::stop() { manager_logger.info("Asked to stop"); - if (_strorage_service_anchor) { - _strorage_service_anchor->unregister_subscriber(this); - } + auto f = _strorage_service_anchor ? 
_strorage_service_anchor->unregister_subscriber(this) : make_ready_future<>(); + return f.finally([this] { set_stopping(); return _draining_eps_gate.close().finally([this] { @@ -134,6 +133,7 @@ future<> manager::stop() { manager_logger.info("Stopped"); }).discard_result(); }); + }); } future<> manager::compute_hints_dir_device_id() { diff --git a/service/storage_service.cc b/service/storage_service.cc index 6f11969e9c..279abb14e0 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1288,12 +1288,12 @@ void storage_service::set_gossip_tokens( void storage_service::register_subscriber(endpoint_lifecycle_subscriber* subscriber) { - _lifecycle_subscribers.emplace_back(subscriber); + _lifecycle_subscribers.add(subscriber); } -void storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subscriber) +future<> storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subscriber) noexcept { - _lifecycle_subscribers.erase(std::remove(_lifecycle_subscribers.begin(), _lifecycle_subscribers.end(), subscriber), _lifecycle_subscribers.end()); + return _lifecycle_subscribers.remove(subscriber); } static std::optional> drain_in_progress; @@ -1341,8 +1341,9 @@ future<> storage_service::drain_on_shutdown() { get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) mutable { auto& ss = service::get_local_storage_service(); - ss.unregister_subscriber(&local_proxy); + return ss.unregister_subscriber(&local_proxy).finally([&local_proxy] { return local_proxy.drain_on_shutdown(); + }); }).get(); slogger.info("Drain on shutdown: hints manager is stopped"); @@ -3188,13 +3189,13 @@ void storage_service::notify_down(inet_address endpoint) { container().invoke_on_all([endpoint] (auto&& ss) { ss._messaging.local().remove_rpc_client(netw::msg_addr{endpoint, 0}); return seastar::async([&ss, endpoint] { - for (auto&& subscriber : ss._lifecycle_subscribers) { + ss._lifecycle_subscribers.for_each([endpoint] (endpoint_lifecycle_subscriber* 
subscriber) { try { subscriber->on_down(endpoint); } catch (...) { slogger.warn("Down notification failed {}: {}", endpoint, std::current_exception()); } - } + }); }); }).get(); slogger.debug("Notify node {} has been down", endpoint); @@ -3203,13 +3204,13 @@ void storage_service::notify_down(inet_address endpoint) { void storage_service::notify_left(inet_address endpoint) { container().invoke_on_all([endpoint] (auto&& ss) { return seastar::async([&ss, endpoint] { - for (auto&& subscriber : ss._lifecycle_subscribers) { + ss._lifecycle_subscribers.for_each([endpoint] (endpoint_lifecycle_subscriber* subscriber) { try { subscriber->on_leave_cluster(endpoint); } catch (...) { slogger.warn("Leave cluster notification failed {}: {}", endpoint, std::current_exception()); } - } + }); }); }).get(); slogger.debug("Notify node {} has left the cluster", endpoint); @@ -3222,13 +3223,13 @@ void storage_service::notify_up(inet_address endpoint) } container().invoke_on_all([endpoint] (auto&& ss) { return seastar::async([&ss, endpoint] { - for (auto&& subscriber : ss._lifecycle_subscribers) { + ss._lifecycle_subscribers.for_each([endpoint] (endpoint_lifecycle_subscriber* subscriber) { try { subscriber->on_up(endpoint); } catch (...) { slogger.warn("Up notification failed {}: {}", endpoint, std::current_exception()); } - } + }); }); }).get(); slogger.debug("Notify node {} has been up", endpoint); @@ -3242,13 +3243,13 @@ void storage_service::notify_joined(inet_address endpoint) container().invoke_on_all([endpoint] (auto&& ss) { return seastar::async([&ss, endpoint] { - for (auto&& subscriber : ss._lifecycle_subscribers) { + ss._lifecycle_subscribers.for_each([endpoint] (endpoint_lifecycle_subscriber* subscriber) { try { subscriber->on_join_cluster(endpoint); } catch (...) 
{ slogger.warn("Join cluster notification failed {}: {}", endpoint, std::current_exception()); } - } + }); }); }).get(); slogger.debug("Notify node {} has joined the cluster", endpoint); diff --git a/service/storage_service.hh b/service/storage_service.hh index 56e7cda697..e65b430689 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -326,7 +326,7 @@ private: drain_progress _drain_progress{}; - std::vector _lifecycle_subscribers; + atomic_vector _lifecycle_subscribers; std::unordered_set _bootstrap_tokens; @@ -354,7 +354,7 @@ public: void register_subscriber(endpoint_lifecycle_subscriber* subscriber); - void unregister_subscriber(endpoint_lifecycle_subscriber* subscriber); + future<> unregister_subscriber(endpoint_lifecycle_subscriber* subscriber) noexcept; // should only be called via JMX future<> stop_gossiping(); diff --git a/transport/event_notifier.cc b/transport/event_notifier.cc index 4ec4db1a09..0d938285bb 100644 --- a/transport/event_notifier.cc +++ b/transport/event_notifier.cc @@ -42,8 +42,9 @@ cql_server::event_notifier::~event_notifier() future<> cql_server::event_notifier::stop() { return _mnotifier.unregister_listener(this).then([this]{ - service::get_local_storage_service().unregister_subscriber(this); + return service::get_local_storage_service().unregister_subscriber(this).finally([this] { _stopped = true; + }); }); }