From ed9cd23a2dc10f14d5e7bc39bbc7aec5dff57705 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 27 Nov 2015 09:48:18 +0800 Subject: [PATCH] transport: Fix duplicate up/down messages sent to native clients This patch plus pekka's previous commit 3c72ea9f96a02bdb "gms: Fix gossiper::handle_major_state_change() restart logic" fix CASSANDRA-7816. Backported from: def4835 Add missing follow on fix for 7816 only applied to cassandra-2.1 branch in 763130bdbde2f4cec2e8973bcd5203caf51cc89f 763130b Followup commit for 7816 2199a87 Fix duplicate up/down messages sent to native clients Tested by: pushed_notifications_test.py:TestPushedNotifications.restart_node_test --- transport/event_notifier.cc | 28 ++++++++++++++++++---------- transport/server.hh | 1 + 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/transport/event_notifier.cc b/transport/event_notifier.cc index 36bd9f6bf5..333394caa4 100644 --- a/transport/event_notifier.cc +++ b/transport/event_notifier.cc @@ -244,21 +244,29 @@ void cql_server::event_notifier::on_move(const gms::inet_address& endpoint) void cql_server::event_notifier::on_up(const gms::inet_address& endpoint) { - for (auto&& conn : _status_change_listeners) { - using namespace transport; - with_gate(conn->_pending_requests_gate, [&] { - return conn->write_response(conn->make_status_change_event(event::status_change::node_up(endpoint, _port))); - }); + bool was_up = _last_status_change.count(endpoint) && _last_status_change.at(endpoint) == event::status_change::status_type::UP; + _last_status_change[endpoint] = event::status_change::status_type::UP; + if (!was_up) { + for (auto&& conn : _status_change_listeners) { + using namespace transport; + with_gate(conn->_pending_requests_gate, [&] { + return conn->write_response(conn->make_status_change_event(event::status_change::node_up(endpoint, _port))); + }); + } } } void cql_server::event_notifier::on_down(const gms::inet_address& endpoint) { - for (auto&& conn : _status_change_listeners) { - using namespace transport; - with_gate(conn->_pending_requests_gate, [&] { - return conn->write_response(conn->make_status_change_event(event::status_change::node_down(endpoint, _port))); - }); + bool was_down = _last_status_change.count(endpoint) && _last_status_change.at(endpoint) == event::status_change::status_type::DOWN; + _last_status_change[endpoint] = event::status_change::status_type::DOWN; + if (!was_down) { + for (auto&& conn : _status_change_listeners) { + using namespace transport; + with_gate(conn->_pending_requests_gate, [&] { + return conn->write_response(conn->make_status_change_event(event::status_change::node_down(endpoint, _port))); + }); + } } } diff --git a/transport/server.hh b/transport/server.hh index 989ab11018..581615e974 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -209,6 +209,7 @@ class cql_server::event_notifier : public service::migration_listener, std::set _topology_change_listeners; std::set _status_change_listeners; std::set _schema_change_listeners; + std::unordered_map _last_status_change; public: event_notifier(uint16_t port); void register_event(transport::event::event_type et, cql_server::connection* conn);