mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-25 19:10:42 +00:00
Merge "CQL topology and status change events" from Pekka
"This series implements CQL topology and status change events. We send out all other events except for the "leave cluster" topology event which doesn't have the relevant code converted in storage service. This fixes #88 and fixes #89."
This commit is contained in:
81
service/endpoint_lifecycle_subscriber.hh
Normal file
81
service/endpoint_lifecycle_subscriber.hh
Normal file
@@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*
|
||||
* Modified by Cloudius Systems
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "gms/inet_address.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
/**
|
||||
* Interface on which interested parties can be notified of high level endpoint
|
||||
* state changes.
|
||||
*
|
||||
* Note that while IEndpointStateChangeSubscriber notify about gossip related
|
||||
* changes (IEndpointStateChangeSubscriber.onJoin() is called when a node join
|
||||
* gossip), this interface allows to be notified about higher level events.
|
||||
*/
|
||||
class endpoint_lifecycle_subscriber {
|
||||
public:
|
||||
virtual ~endpoint_lifecycle_subscriber()
|
||||
{ }
|
||||
|
||||
/**
|
||||
* Called when a new node joins the cluster, i.e. either has just been
|
||||
* bootstrapped or "instajoins".
|
||||
*
|
||||
* @param endpoint the newly added endpoint.
|
||||
*/
|
||||
virtual void on_join_cluster(const gms::inet_address& endpoint) = 0;
|
||||
|
||||
/**
|
||||
* Called when a new node leave the cluster (decommission or removeToken).
|
||||
*
|
||||
* @param endpoint the endpoint that is leaving.
|
||||
*/
|
||||
virtual void on_leave_cluster(const gms::inet_address& endpoint) = 0;
|
||||
|
||||
/**
|
||||
* Called when a node is marked UP.
|
||||
*
|
||||
* @param endpoint the endpoint marked UP.
|
||||
*/
|
||||
virtual void on_up(const gms::inet_address& endpoint) = 0;
|
||||
|
||||
/**
|
||||
* Called when a node is marked DOWN.
|
||||
*
|
||||
* @param endpoint the endpoint marked DOWN.
|
||||
*/
|
||||
virtual void on_down(const gms::inet_address& endpoint) = 0;
|
||||
|
||||
/**
|
||||
* Called when a node has moved (to a new token).
|
||||
*
|
||||
* @param endpoint the endpoint that has moved.
|
||||
*/
|
||||
virtual void on_move(const gms::inet_address& endpoint) = 0;
|
||||
};
|
||||
|
||||
}
|
||||
@@ -511,11 +511,17 @@ void storage_service::handle_state_normal(inet_address endpoint) {
|
||||
|
||||
if (is_moving) {
|
||||
_token_metadata.remove_from_moving(endpoint);
|
||||
// for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
|
||||
// subscriber.onMove(endpoint);
|
||||
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
|
||||
for (auto&& subscriber : ss._lifecycle_subscribers) {
|
||||
subscriber->on_move(endpoint);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
|
||||
// subscriber.onJoinCluster(endpoint);
|
||||
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
|
||||
for (auto&& subscriber : ss._lifecycle_subscribers) {
|
||||
subscriber->on_join_cluster(endpoint);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// PendingRangeCalculatorService.instance.update();
|
||||
@@ -649,15 +655,16 @@ void storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state ep
|
||||
void storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
logger.debug("on_alive endpoint={}", endpoint);
|
||||
get_local_migration_manager().schedule_schema_pull(endpoint, state);
|
||||
if (_token_metadata.is_member(endpoint)) {
|
||||
#if 0
|
||||
|
||||
if (_token_metadata.isMember(endpoint))
|
||||
{
|
||||
HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true);
|
||||
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
|
||||
subscriber.onUp(endpoint);
|
||||
}
|
||||
#endif
|
||||
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
|
||||
for (auto&& subscriber : ss._lifecycle_subscribers) {
|
||||
subscriber->on_up(endpoint);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) {
|
||||
@@ -713,9 +720,12 @@ void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state st
|
||||
logger.debug("on_restart endpoint={}", endpoint);
|
||||
#if 0
|
||||
MessagingService.instance().convict(endpoint);
|
||||
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
|
||||
subscriber.onDown(endpoint);
|
||||
#endif
|
||||
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
|
||||
for (auto&& subscriber : ss._lifecycle_subscribers) {
|
||||
subscriber->on_down(endpoint);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
@@ -820,6 +830,16 @@ future<> storage_service::set_tokens(std::unordered_set<token> tokens) {
|
||||
});
|
||||
}
|
||||
|
||||
void storage_service::register_subscriber(endpoint_lifecycle_subscriber* subscriber)
|
||||
{
|
||||
_lifecycle_subscribers.emplace_back(subscriber);
|
||||
}
|
||||
|
||||
void storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subscriber)
|
||||
{
|
||||
_lifecycle_subscribers.erase(std::remove(_lifecycle_subscribers.begin(), _lifecycle_subscribers.end(), subscriber), _lifecycle_subscribers.end());
|
||||
}
|
||||
|
||||
future<> storage_service::init_server(int delay) {
|
||||
#if 0
|
||||
logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString());
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "gms/i_endpoint_state_change_subscriber.hh"
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
@@ -158,9 +159,11 @@ private:
|
||||
private volatile int totalCFs, remainingCFs;
|
||||
|
||||
private static final AtomicInteger nextRepairCommand = new AtomicInteger();
|
||||
#endif
|
||||
|
||||
private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<>();
|
||||
std::vector<endpoint_lifecycle_subscriber*> _lifecycle_subscribers;
|
||||
|
||||
#if 0
|
||||
private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor();
|
||||
|
||||
private final ObjectName jmxObjectName;
|
||||
@@ -182,17 +185,13 @@ public:
|
||||
{
|
||||
this.daemon = daemon;
|
||||
}
|
||||
#endif
|
||||
|
||||
public void register(IEndpointLifecycleSubscriber subscriber)
|
||||
{
|
||||
lifecycleSubscribers.add(subscriber);
|
||||
}
|
||||
void register_subscriber(endpoint_lifecycle_subscriber* subscriber);
|
||||
|
||||
public void unregister(IEndpointLifecycleSubscriber subscriber)
|
||||
{
|
||||
lifecycleSubscribers.remove(subscriber);
|
||||
}
|
||||
void unregister_subscriber(endpoint_lifecycle_subscriber* subscriber);
|
||||
|
||||
#if 0
|
||||
// should only be called via JMX
|
||||
public void stopGossiping()
|
||||
{
|
||||
|
||||
@@ -24,7 +24,10 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "core/sstring.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/net/api.hh>
|
||||
|
||||
#include <experimental/optional>
|
||||
|
||||
@@ -71,39 +74,40 @@ public:
|
||||
protected abstract void serializeEvent(ByteBuf dest, int version);
|
||||
protected abstract int eventSerializedSize(int version);
|
||||
#endif
|
||||
class topology_change;
|
||||
class status_change;
|
||||
class schema_change;
|
||||
};
|
||||
|
||||
class event::topology_change : public event {
|
||||
public:
|
||||
enum class change_type { NEW_NODE, REMOVED_NODE, MOVED_NODE };
|
||||
|
||||
const change_type change;
|
||||
const ipv4_addr node;
|
||||
|
||||
topology_change(change_type change, const ipv4_addr& node)
|
||||
: event{event_type::TOPOLOGY_CHANGE}
|
||||
, change{change}
|
||||
, node{node}
|
||||
{ }
|
||||
|
||||
static topology_change new_node(const gms::inet_address& host, uint16_t port)
|
||||
{
|
||||
return topology_change{change_type::NEW_NODE, ipv4_addr{host.raw_addr(), port}};
|
||||
}
|
||||
|
||||
static topology_change removed_node(const gms::inet_address& host, uint16_t port)
|
||||
{
|
||||
return topology_change{change_type::REMOVED_NODE, ipv4_addr{host.raw_addr(), port}};
|
||||
}
|
||||
|
||||
static topology_change moved_node(const gms::inet_address& host, uint16_t port)
|
||||
{
|
||||
return topology_change{change_type::MOVED_NODE, ipv4_addr{host.raw_addr(), port}};
|
||||
}
|
||||
|
||||
#if 0
|
||||
public static class TopologyChange extends Event
|
||||
{
|
||||
public enum Change { NEW_NODE, REMOVED_NODE, MOVED_NODE }
|
||||
|
||||
public final Change change;
|
||||
public final InetSocketAddress node;
|
||||
|
||||
private TopologyChange(Change change, InetSocketAddress node)
|
||||
{
|
||||
super(Type.TOPOLOGY_CHANGE);
|
||||
this.change = change;
|
||||
this.node = node;
|
||||
}
|
||||
|
||||
public static TopologyChange newNode(InetAddress host, int port)
|
||||
{
|
||||
return new TopologyChange(Change.NEW_NODE, new InetSocketAddress(host, port));
|
||||
}
|
||||
|
||||
public static TopologyChange removedNode(InetAddress host, int port)
|
||||
{
|
||||
return new TopologyChange(Change.REMOVED_NODE, new InetSocketAddress(host, port));
|
||||
}
|
||||
|
||||
public static TopologyChange movedNode(InetAddress host, int port)
|
||||
{
|
||||
return new TopologyChange(Change.MOVED_NODE, new InetSocketAddress(host, port));
|
||||
}
|
||||
|
||||
// Assumes the type has already been deserialized
|
||||
private static TopologyChange deserializeEvent(ByteBuf cb, int version)
|
||||
{
|
||||
@@ -145,32 +149,33 @@ public:
|
||||
return Objects.equal(change, tpc.change)
|
||||
&& Objects.equal(node, tpc.node);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
};
|
||||
|
||||
public static class StatusChange extends Event
|
||||
{
|
||||
public enum Status { UP, DOWN }
|
||||
class event::status_change : public event {
|
||||
public:
|
||||
enum class status_type { UP, DOWN };
|
||||
|
||||
public final Status status;
|
||||
public final InetSocketAddress node;
|
||||
const status_type status;
|
||||
const ipv4_addr node;
|
||||
|
||||
private StatusChange(Status status, InetSocketAddress node)
|
||||
status_change(status_type status, const ipv4_addr& node)
|
||||
: event{event_type::STATUS_CHANGE}
|
||||
, status{status}
|
||||
, node{node}
|
||||
{ }
|
||||
|
||||
static status_change node_up(const gms::inet_address& host, uint16_t port)
|
||||
{
|
||||
super(Type.STATUS_CHANGE);
|
||||
this.status = status;
|
||||
this.node = node;
|
||||
return status_change{status_type::UP, ipv4_addr{host.raw_addr(), port}};
|
||||
}
|
||||
|
||||
public static StatusChange nodeUp(InetAddress host, int port)
|
||||
static status_change node_down(const gms::inet_address& host, uint16_t port)
|
||||
{
|
||||
return new StatusChange(Status.UP, new InetSocketAddress(host, port));
|
||||
}
|
||||
|
||||
public static StatusChange nodeDown(InetAddress host, int port)
|
||||
{
|
||||
return new StatusChange(Status.DOWN, new InetSocketAddress(host, port));
|
||||
return status_change{status_type::DOWN, ipv4_addr{host.raw_addr(), port}};
|
||||
}
|
||||
|
||||
#if 0
|
||||
// Assumes the type has already been deserialized
|
||||
private static StatusChange deserializeEvent(ByteBuf cb, int version)
|
||||
{
|
||||
@@ -212,8 +217,8 @@ public:
|
||||
return Objects.equal(status, stc.status)
|
||||
&& Objects.equal(node, stc.node);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
};
|
||||
|
||||
class event::schema_change : public event {
|
||||
public:
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include <boost/range/adaptor/sliced.hpp>
|
||||
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "db/consistency_level.hh"
|
||||
#include "core/future-util.hh"
|
||||
#include "core/reactor.hh"
|
||||
@@ -124,6 +125,25 @@ inline int16_t consistency_to_wire(db::consistency_level c)
|
||||
}
|
||||
}
|
||||
|
||||
sstring to_string(const transport::event::topology_change::change_type t) {
|
||||
using type = transport::event::topology_change::change_type;
|
||||
switch (t) {
|
||||
case type::NEW_NODE: return "NEW_NODE";
|
||||
case type::REMOVED_NODE: return "REMOVED_NODE";
|
||||
case type::MOVED_NODE: return "MOVED_NODE";
|
||||
}
|
||||
throw std::invalid_argument("unknown change type");
|
||||
}
|
||||
|
||||
sstring to_string(const transport::event::status_change::status_type t) {
|
||||
using type = transport::event::status_change::status_type;
|
||||
switch (t) {
|
||||
case type::UP: return "NEW_NODE";
|
||||
case type::DOWN: return "REMOVED_NODE";
|
||||
}
|
||||
throw std::invalid_argument("unknown change type");
|
||||
}
|
||||
|
||||
sstring to_string(const transport::event::schema_change::change_type t) {
|
||||
switch (t) {
|
||||
case transport::event::schema_change::change_type::CREATED: return "CREATED";
|
||||
@@ -220,6 +240,8 @@ private:
|
||||
future<> write_ready(int16_t stream);
|
||||
future<> write_supported(int16_t stream);
|
||||
future<> write_result(int16_t stream, shared_ptr<transport::messages::result_message> msg);
|
||||
future<> write_topology_change_event(const transport::event::topology_change& event);
|
||||
future<> write_status_change_event(const transport::event::status_change& event);
|
||||
future<> write_schema_change_event(const transport::event::schema_change& event);
|
||||
future<> write_response(shared_ptr<cql_server::response> response);
|
||||
|
||||
@@ -259,15 +281,18 @@ private:
|
||||
friend event_notifier;
|
||||
};
|
||||
|
||||
cql_server::event_notifier::event_notifier(uint16_t port)
|
||||
: _port{port}
|
||||
{
|
||||
}
|
||||
|
||||
void cql_server::event_notifier::register_event(transport::event::event_type et, cql_server::connection* conn)
|
||||
{
|
||||
switch (et) {
|
||||
case transport::event::event_type::TOPOLOGY_CHANGE:
|
||||
logger.warn("TOPOLOGY_CHANGE events are not supported");
|
||||
_topology_change_listeners.emplace(conn);
|
||||
break;
|
||||
case transport::event::event_type::STATUS_CHANGE:
|
||||
logger.warn("STATUS_CHANGE events are not supported");
|
||||
_status_change_listeners.emplace(conn);
|
||||
break;
|
||||
case transport::event::event_type::SCHEMA_CHANGE:
|
||||
@@ -424,6 +449,46 @@ void cql_server::event_notifier::on_drop_aggregate(const sstring& ks_name, const
|
||||
logger.warn("%s event ignored", __func__);
|
||||
}
|
||||
|
||||
void cql_server::event_notifier::on_join_cluster(const gms::inet_address& endpoint)
|
||||
{
|
||||
for (auto&& conn : _topology_change_listeners) {
|
||||
using namespace transport;
|
||||
conn->write_topology_change_event(event::topology_change::new_node(endpoint, _port));
|
||||
}
|
||||
}
|
||||
|
||||
void cql_server::event_notifier::on_leave_cluster(const gms::inet_address& endpoint)
|
||||
{
|
||||
for (auto&& conn : _topology_change_listeners) {
|
||||
using namespace transport;
|
||||
conn->write_topology_change_event(event::topology_change::removed_node(endpoint, _port));
|
||||
}
|
||||
}
|
||||
|
||||
void cql_server::event_notifier::on_move(const gms::inet_address& endpoint)
|
||||
{
|
||||
for (auto&& conn : _topology_change_listeners) {
|
||||
using namespace transport;
|
||||
conn->write_topology_change_event(event::topology_change::moved_node(endpoint, _port));
|
||||
}
|
||||
}
|
||||
|
||||
void cql_server::event_notifier::on_up(const gms::inet_address& endpoint)
|
||||
{
|
||||
for (auto&& conn : _status_change_listeners) {
|
||||
using namespace transport;
|
||||
conn->write_status_change_event(event::status_change::node_up(endpoint, _port));
|
||||
}
|
||||
}
|
||||
|
||||
void cql_server::event_notifier::on_down(const gms::inet_address& endpoint)
|
||||
{
|
||||
for (auto&& conn : _status_change_listeners) {
|
||||
using namespace transport;
|
||||
conn->write_status_change_event(event::status_change::node_down(endpoint, _port));
|
||||
}
|
||||
}
|
||||
|
||||
class cql_server::response {
|
||||
int16_t _stream;
|
||||
cql_binary_opcode _opcode;
|
||||
@@ -460,7 +525,6 @@ cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<c
|
||||
: _proxy(proxy)
|
||||
, _query_processor(qp)
|
||||
, _collectd_registrations(std::make_unique<scollectd::registrations>(setup_collectd()))
|
||||
, _notifier{std::make_unique<event_notifier>()}
|
||||
{
|
||||
}
|
||||
|
||||
@@ -487,13 +551,16 @@ cql_server::setup_collectd() {
|
||||
}
|
||||
|
||||
future<> cql_server::stop() {
|
||||
service::get_local_storage_service().unregister_subscriber(_notifier.get());
|
||||
service::get_local_migration_manager().unregister_listener(_notifier.get());
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<>
|
||||
cql_server::listen(ipv4_addr addr) {
|
||||
_notifier = std::make_unique<event_notifier>(addr.port);
|
||||
service::get_local_migration_manager().register_listener(_notifier.get());
|
||||
service::get_local_storage_service().register_subscriber(_notifier.get());
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
_listeners.push_back(engine().listen(make_ipv4_address(addr), lo));
|
||||
@@ -918,6 +985,24 @@ future<> cql_server::connection::write_result(int16_t stream, shared_ptr<transpo
|
||||
return write_response(response);
|
||||
}
|
||||
|
||||
future<> cql_server::connection::write_topology_change_event(const transport::event::topology_change& event)
|
||||
{
|
||||
auto response = make_shared<cql_server::response>(-1, cql_binary_opcode::EVENT);
|
||||
response->write_string("TOPOLOGY_CHANGE");
|
||||
response->write_string(to_string(event.change));
|
||||
response->write_inet(event.node);
|
||||
return write_response(response);
|
||||
}
|
||||
|
||||
future<> cql_server::connection::write_status_change_event(const transport::event::status_change& event)
|
||||
{
|
||||
auto response = make_shared<cql_server::response>(-1, cql_binary_opcode::EVENT);
|
||||
response->write_string("STATUS_CHANGE");
|
||||
response->write_string(to_string(event.status));
|
||||
response->write_inet(event.node);
|
||||
return write_response(response);
|
||||
}
|
||||
|
||||
future<> cql_server::connection::write_schema_change_event(const transport::event::schema_change& event)
|
||||
{
|
||||
auto response = make_shared<cql_server::response>(-1, cql_binary_opcode::EVENT);
|
||||
@@ -1207,7 +1292,7 @@ sstring cql_server::response::make_frame(uint8_t version, size_t length)
|
||||
|
||||
void cql_server::response::write_byte(uint8_t b)
|
||||
{
|
||||
_body.insert(_body.end(), b, 1);
|
||||
_body.insert(_body.end(), b);
|
||||
}
|
||||
|
||||
void cql_server::response::write_int(int32_t n)
|
||||
@@ -1288,8 +1373,12 @@ void cql_server::response::write_option_list(std::vector<std::pair<int16_t, boos
|
||||
|
||||
void cql_server::response::write_inet(ipv4_addr inet)
|
||||
{
|
||||
// FIXME
|
||||
assert(0);
|
||||
write_byte(4);
|
||||
write_byte(((inet.ip & 0xff000000) >> 24));
|
||||
write_byte(((inet.ip & 0x00ff0000) >> 16));
|
||||
write_byte(((inet.ip & 0x0000ff00) >> 8 ));
|
||||
write_byte(((inet.ip & 0x000000ff) ));
|
||||
write_int(inet.port);
|
||||
}
|
||||
|
||||
void cql_server::response::write_consistency(db::consistency_level c)
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
#define CQL_SERVER_HH
|
||||
|
||||
#include "core/reactor.hh"
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "service/migration_listener.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
@@ -46,11 +47,15 @@ private:
|
||||
friend class type_codec;
|
||||
};
|
||||
|
||||
class cql_server::event_notifier : public service::migration_listener {
|
||||
class cql_server::event_notifier : public service::migration_listener,
|
||||
public service::endpoint_lifecycle_subscriber
|
||||
{
|
||||
uint16_t _port;
|
||||
std::set<cql_server::connection*> _topology_change_listeners;
|
||||
std::set<cql_server::connection*> _status_change_listeners;
|
||||
std::set<cql_server::connection*> _schema_change_listeners;
|
||||
public:
|
||||
event_notifier(uint16_t port);
|
||||
void register_event(transport::event::event_type et, cql_server::connection* conn);
|
||||
void unregister_connection(cql_server::connection* conn);
|
||||
|
||||
@@ -71,6 +76,12 @@ public:
|
||||
virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override;
|
||||
virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) override;
|
||||
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
|
||||
|
||||
virtual void on_join_cluster(const gms::inet_address& endpoint) override;
|
||||
virtual void on_leave_cluster(const gms::inet_address& endpoint) override;
|
||||
virtual void on_up(const gms::inet_address& endpoint) override;
|
||||
virtual void on_down(const gms::inet_address& endpoint) override;
|
||||
virtual void on_move(const gms::inet_address& endpoint) override;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user