transport: Fix duplicate up/down messages sent to native clients

This patch plus pekka's previous commit 3c72ea9f96

   "gms: Fix gossiper::handle_major_state_change() restart logic"

fix CASSANDRA-7816.

Backported from:

   def4835 Add missing follow on fix for 7816 only applied to
           cassandra-2.1 branch in 763130bdbde2f4cec2e8973bcd5203caf51cc89f
   763130b Followup commit for 7816
   2199a87 Fix duplicate up/down messages sent to native clients

Tested by:
   pushed_notifications_test.py:TestPushedNotifications.restart_node_test
This commit is contained in:
Asias He
2015-11-27 09:48:18 +08:00
parent 25bb889c2a
commit ed9cd23a2d
2 changed files with 19 additions and 10 deletions

View File

@@ -244,21 +244,29 @@ void cql_server::event_notifier::on_move(const gms::inet_address& endpoint)
void cql_server::event_notifier::on_up(const gms::inet_address& endpoint)
{
for (auto&& conn : _status_change_listeners) {
using namespace transport;
with_gate(conn->_pending_requests_gate, [&] {
return conn->write_response(conn->make_status_change_event(event::status_change::node_up(endpoint, _port)));
});
bool was_up = _last_status_change.count(endpoint) && _last_status_change.at(endpoint) == event::status_change::status_type::UP;
_last_status_change[endpoint] = event::status_change::status_type::UP;
if (!was_up) {
for (auto&& conn : _status_change_listeners) {
using namespace transport;
with_gate(conn->_pending_requests_gate, [&] {
return conn->write_response(conn->make_status_change_event(event::status_change::node_up(endpoint, _port)));
});
}
}
}
void cql_server::event_notifier::on_down(const gms::inet_address& endpoint)
{
for (auto&& conn : _status_change_listeners) {
using namespace transport;
with_gate(conn->_pending_requests_gate, [&] {
return conn->write_response(conn->make_status_change_event(event::status_change::node_down(endpoint, _port)));
});
bool was_down = _last_status_change.count(endpoint) && _last_status_change.at(endpoint) == event::status_change::status_type::DOWN;
_last_status_change[endpoint] = event::status_change::status_type::DOWN;
if (!was_down) {
for (auto&& conn : _status_change_listeners) {
using namespace transport;
with_gate(conn->_pending_requests_gate, [&] {
return conn->write_response(conn->make_status_change_event(event::status_change::node_down(endpoint, _port)));
});
}
}
}

View File

@@ -209,6 +209,7 @@ class cql_server::event_notifier : public service::migration_listener,
std::set<cql_server::connection*> _topology_change_listeners;
std::set<cql_server::connection*> _status_change_listeners;
std::set<cql_server::connection*> _schema_change_listeners;
std::unordered_map<gms::inet_address, event::status_change::status_type> _last_status_change;
public:
event_notifier(uint16_t port);
void register_event(transport::event::event_type et, cql_server::connection* conn);