From bbe2c52d9b7fe66551fddda5a96c97b812260743 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 6 Aug 2015 12:29:35 +0300 Subject: [PATCH 1/8] service: Import IEndpointLifecycleSubscriber.java Signed-off-by: Pekka Enberg --- service/IEndpointLifecycleSubscriber.java | 67 +++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 service/IEndpointLifecycleSubscriber.java diff --git a/service/IEndpointLifecycleSubscriber.java b/service/IEndpointLifecycleSubscriber.java new file mode 100644 index 0000000000..24cb3d780f --- /dev/null +++ b/service/IEndpointLifecycleSubscriber.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import java.net.InetAddress; + +/** + * Interface on which interested parties can be notified of high level endpoint + * state changes. + * + * Note that while IEndpointStateChangeSubscriber notify about gossip related + * changes (IEndpointStateChangeSubscriber.onJoin() is called when a node join + * gossip), this interface allows to be notified about higher level events. + */ +public interface IEndpointLifecycleSubscriber +{ + /** + * Called when a new node joins the cluster, i.e. either has just been + * bootstrapped or "instajoins". + * + * @param endpoint the newly added endpoint. + */ + public void onJoinCluster(InetAddress endpoint); + + /** + * Called when a new node leave the cluster (decommission or removeToken). + * + * @param endpoint the endpoint that is leaving. + */ + public void onLeaveCluster(InetAddress endpoint); + + /** + * Called when a node is marked UP. + * + * @param endpoint the endpoint marked UP. + */ + public void onUp(InetAddress endpoint); + + /** + * Called when a node is marked DOWN. + * + * @param endpoint the endpoint marked DOWN. + */ + public void onDown(InetAddress endpoint); + + /** + * Called when a node has moved (to a new token). + * + * @param endpoint the endpoint that has moved. + */ + public void onMove(InetAddress endpoint); +} From 8d0d60168e72d7f1107eed94e2fb06f8ece710fa Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 6 Aug 2015 12:31:41 +0300 Subject: [PATCH 2/8] service: Convert IEndpointLifecycleSubscriber to C++ Signed-off-by: Pekka Enberg --- ....java => endpoint_lifecycle_subscriber.hh} | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) rename service/{IEndpointLifecycleSubscriber.java => endpoint_lifecycle_subscriber.hh} (75%) diff --git a/service/IEndpointLifecycleSubscriber.java b/service/endpoint_lifecycle_subscriber.hh similarity index 75% rename from service/IEndpointLifecycleSubscriber.java rename to service/endpoint_lifecycle_subscriber.hh index 24cb3d780f..453e8c46f0 100644 --- a/service/IEndpointLifecycleSubscriber.java +++ b/service/endpoint_lifecycle_subscriber.hh @@ -15,9 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.cassandra.service; -import java.net.InetAddress; +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#pragma once + +#include "gms/inet_address.hh" + +namespace service { /** * Interface on which interested parties can be notified of high level endpoint @@ -27,41 +36,46 @@ import java.net.InetAddress; * changes (IEndpointStateChangeSubscriber.onJoin() is called when a node join * gossip), this interface allows to be notified about higher level events. */ -public interface IEndpointLifecycleSubscriber -{ +class endpoint_lifecycle_subscriber { +public: + virtual ~endpoint_lifecycle_subscriber() + { } + /** * Called when a new node joins the cluster, i.e. either has just been * bootstrapped or "instajoins". * * @param endpoint the newly added endpoint. */ - public void onJoinCluster(InetAddress endpoint); + virtual void on_join_cluster(const gms::inet_address& endpoint) = 0; /** * Called when a new node leave the cluster (decommission or removeToken). * * @param endpoint the endpoint that is leaving. */ - public void onLeaveCluster(InetAddress endpoint); + virtual void on_leave_cluster(const gms::inet_address& endpoint) = 0; /** * Called when a node is marked UP. * * @param endpoint the endpoint marked UP. */ - public void onUp(InetAddress endpoint); + virtual void on_up(const gms::inet_address& endpoint) = 0; /** * Called when a node is marked DOWN. * * @param endpoint the endpoint marked DOWN. */ - public void onDown(InetAddress endpoint); + virtual void on_down(const gms::inet_address& endpoint) = 0; /** * Called when a node has moved (to a new token). * * @param endpoint the endpoint that has moved. */ - public void onMove(InetAddress endpoint); + virtual void on_move(const gms::inet_address& endpoint) = 0; +}; + } From 921d9386cc1f9fe635db2721d06311db87383524 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 6 Aug 2015 12:36:55 +0300 Subject: [PATCH 3/8] service/storage_service: Endpoint lifecycle subscriber hooks Signed-off-by: Pekka Enberg --- service/storage_service.cc | 44 +++++++++++++++++++++++++++----------- service/storage_service.hh | 17 +++++++-------- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index be0ca705f4..3969707fa6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -511,11 +511,17 @@ void storage_service::handle_state_normal(inet_address endpoint) { if (is_moving) { _token_metadata.remove_from_moving(endpoint); - // for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - // subscriber.onMove(endpoint); + get_storage_service().invoke_on_all([endpoint] (auto&& ss) { + for (auto&& subscriber : ss._lifecycle_subscribers) { + subscriber->on_move(endpoint); + } + }); } else { - // for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - // subscriber.onJoinCluster(endpoint); + get_storage_service().invoke_on_all([endpoint] (auto&& ss) { + for (auto&& subscriber : ss._lifecycle_subscribers) { + subscriber->on_join_cluster(endpoint); + } + }); } // PendingRangeCalculatorService.instance.update(); @@ -649,15 +655,16 @@ void storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state ep void storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state) { logger.debug("on_alive endpoint={}", endpoint); get_local_migration_manager().schedule_schema_pull(endpoint, state); + if (_token_metadata.is_member(endpoint)) { #if 0 - - if (_token_metadata.isMember(endpoint)) - { HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true); - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onUp(endpoint); - } #endif + get_storage_service().invoke_on_all([endpoint] (auto&& ss) { + for (auto&& subscriber : ss._lifecycle_subscribers) { + subscriber->on_up(endpoint); + } + }); + } } void storage_service::before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) { @@ -713,9 +720,12 @@ void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state st logger.debug("on_restart endpoint={}", endpoint); #if 0 MessagingService.instance().convict(endpoint); - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onDown(endpoint); #endif + get_storage_service().invoke_on_all([endpoint] (auto&& ss) { + for (auto&& subscriber : ss._lifecycle_subscribers) { + subscriber->on_down(endpoint); + } + }); } void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) { @@ -820,6 +830,16 @@ future<> storage_service::set_tokens(std::unordered_set tokens) { }); } +void storage_service::register_subscriber(endpoint_lifecycle_subscriber* subscriber) +{ + _lifecycle_subscribers.emplace_back(subscriber); +} + +void storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subscriber) +{ + _lifecycle_subscribers.erase(std::remove(_lifecycle_subscribers.begin(), _lifecycle_subscribers.end(), subscriber), _lifecycle_subscribers.end()); +} + future<> storage_service::init_server(int delay) { #if 0 logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString()); diff --git a/service/storage_service.hh b/service/storage_service.hh index a126faf934..2563b3d9bf 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -23,6 +23,7 @@ #pragma once #include "gms/i_endpoint_state_change_subscriber.hh" +#include "service/endpoint_lifecycle_subscriber.hh" #include "locator/token_metadata.hh" #include "gms/gossiper.hh" #include "utils/UUID_gen.hh" @@ -158,9 +159,11 @@ private: private volatile int totalCFs, remainingCFs; private static final AtomicInteger nextRepairCommand = new AtomicInteger(); +#endif - private final List lifecycleSubscribers = new CopyOnWriteArrayList<>(); + std::vector _lifecycle_subscribers; +#if 0 private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor(); private final ObjectName jmxObjectName; @@ -182,17 +185,13 @@ public: { this.daemon = daemon; } +#endif - public void register(IEndpointLifecycleSubscriber subscriber) - { - lifecycleSubscribers.add(subscriber); - } + void register_subscriber(endpoint_lifecycle_subscriber* subscriber); - public void unregister(IEndpointLifecycleSubscriber subscriber) - { - lifecycleSubscribers.remove(subscriber); - } + void unregister_subscriber(endpoint_lifecycle_subscriber* subscriber); +#if 0 // should only be called via JMX public void stopGossiping() { From f5ac4a3738de7bce39988805ec29e11188833cf1 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 6 Aug 2015 13:48:53 +0300 Subject: [PATCH 4/8] transport/event: Convert TopologyChange and StatusChange to C++ Signed-off-by: Pekka Enberg --- transport/event.hh | 101 ++++++++++++++++++++++++--------------------- 1 file changed, 53 insertions(+), 48 deletions(-) diff --git a/transport/event.hh b/transport/event.hh index 4fb8b7dd2e..180f160597 100644 --- a/transport/event.hh +++ b/transport/event.hh @@ -24,7 +24,10 @@ #pragma once -#include "core/sstring.hh" +#include "gms/inet_address.hh" + +#include +#include #include @@ -71,39 +74,40 @@ public: protected abstract void serializeEvent(ByteBuf dest, int version); protected abstract int eventSerializedSize(int version); #endif + class topology_change; + class status_change; class schema_change; }; + class event::topology_change : public event { + public: + enum class change_type { NEW_NODE, REMOVED_NODE, MOVED_NODE }; + + const change_type change; + const ipv4_addr node; + + topology_change(change_type change, const ipv4_addr& node) + : event{event_type::TOPOLOGY_CHANGE} + , change{change} + , node{node} + { } + + static topology_change new_node(const gms::inet_address& host, uint16_t port) + { + return topology_change{change_type::NEW_NODE, ipv4_addr{host.raw_addr(), port}}; + } + + static topology_change removed_node(const gms::inet_address& host, uint16_t port) + { + return topology_change{change_type::REMOVED_NODE, ipv4_addr{host.raw_addr(), port}}; + } + + static topology_change moved_node(const gms::inet_address& host, uint16_t port) + { + return topology_change{change_type::MOVED_NODE, ipv4_addr{host.raw_addr(), port}}; + } + #if 0 - public static class TopologyChange extends Event - { - public enum Change { NEW_NODE, REMOVED_NODE, MOVED_NODE } - - public final Change change; - public final InetSocketAddress node; - - private TopologyChange(Change change, InetSocketAddress node) - { - super(Type.TOPOLOGY_CHANGE); - this.change = change; - this.node = node; - } - - public static TopologyChange newNode(InetAddress host, int port) - { - return new TopologyChange(Change.NEW_NODE, new InetSocketAddress(host, port)); - } - - public static TopologyChange removedNode(InetAddress host, int port) - { - return new TopologyChange(Change.REMOVED_NODE, new InetSocketAddress(host, port)); - } - - public static TopologyChange movedNode(InetAddress host, int port) - { - return new TopologyChange(Change.MOVED_NODE, new InetSocketAddress(host, port)); - } - // Assumes the type has already been deserialized private static TopologyChange deserializeEvent(ByteBuf cb, int version) { @@ -145,32 +149,33 @@ public: return Objects.equal(change, tpc.change) && Objects.equal(node, tpc.node); } - } +#endif + }; - public static class StatusChange extends Event - { - public enum Status { UP, DOWN } + class event::status_change : public event { + public: + enum class status_type { UP, DOWN }; - public final Status status; - public final InetSocketAddress node; + const status_type status; + const ipv4_addr node; - private StatusChange(Status status, InetSocketAddress node) + status_change(status_type status, const ipv4_addr& node) + : event{event_type::STATUS_CHANGE} + , status{status} + , node{node} + { } + + static status_change node_up(const gms::inet_address& host, uint16_t port) { - super(Type.STATUS_CHANGE); - this.status = status; - this.node = node; + return status_change{status_type::UP, ipv4_addr{host.raw_addr(), port}}; } - public static StatusChange nodeUp(InetAddress host, int port) + static status_change node_down(const gms::inet_address& host, uint16_t port) { - return new StatusChange(Status.UP, new InetSocketAddress(host, port)); - } - - public static StatusChange nodeDown(InetAddress host, int port) - { - return new StatusChange(Status.DOWN, new InetSocketAddress(host, port)); + return status_change{status_type::DOWN, ipv4_addr{host.raw_addr(), port}}; } +#if 0 // Assumes the type has already been deserialized private static StatusChange deserializeEvent(ByteBuf cb, int version) { @@ -212,8 +217,8 @@ public: return Objects.equal(status, stc.status) && Objects.equal(node, stc.node); } - } #endif + }; class event::schema_change : public event { public: From 92c813d5620958157617d12cb7ae2a561cac9462 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 6 Aug 2015 14:11:30 +0300 Subject: [PATCH 5/8] transport/server: Topology and status change event encoding Signed-off-by: Pekka Enberg --- transport/server.cc | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/transport/server.cc b/transport/server.cc index 786162f0b6..f5811bf1a0 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -124,6 +124,25 @@ inline int16_t consistency_to_wire(db::consistency_level c) } } +sstring to_string(const transport::event::topology_change::change_type t) { + using type = transport::event::topology_change::change_type; + switch (t) { + case type::NEW_NODE: return "NEW_NODE"; + case type::REMOVED_NODE: return "REMOVED_NODE"; + case type::MOVED_NODE: return "MOVED_NODE"; + } + throw std::invalid_argument("unknown change type"); +} + +sstring to_string(const transport::event::status_change::status_type t) { + using type = transport::event::status_change::status_type; + switch (t) { + case type::UP: return "NEW_NODE"; + case type::DOWN: return "REMOVED_NODE"; + } + throw std::invalid_argument("unknown change type"); +} + sstring to_string(const transport::event::schema_change::change_type t) { switch (t) { case transport::event::schema_change::change_type::CREATED: return "CREATED"; @@ -220,6 +239,8 @@ private: future<> write_ready(int16_t stream); future<> write_supported(int16_t stream); future<> write_result(int16_t stream, shared_ptr msg); + future<> write_topology_change_event(const transport::event::topology_change& event); + future<> write_status_change_event(const transport::event::status_change& event); future<> write_schema_change_event(const transport::event::schema_change& event); future<> write_response(shared_ptr response); @@ -918,6 +939,24 @@ future<> cql_server::connection::write_result(int16_t stream, shared_ptr cql_server::connection::write_topology_change_event(const transport::event::topology_change& event) +{ + auto response = make_shared(-1, cql_binary_opcode::EVENT); + response->write_string("TOPOLOGY_CHANGE"); + response->write_string(to_string(event.change)); + response->write_inet(event.node); + return write_response(response); +} + +future<> cql_server::connection::write_status_change_event(const transport::event::status_change& event) +{ + auto response = make_shared(-1, cql_binary_opcode::EVENT); + response->write_string("STATUS_CHANGE"); + response->write_string(to_string(event.status)); + response->write_inet(event.node); + return write_response(response); +} + future<> cql_server::connection::write_schema_change_event(const transport::event::schema_change& event) { auto response = make_shared(-1, cql_binary_opcode::EVENT); From 096e02469e84cff6761c4ea6b915bbc438643d93 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 6 Aug 2015 15:22:56 +0300 Subject: [PATCH 6/8] transport/server: Fix response::write_byte() The last two arguments to vector::insert() are flipped which causes us to fill the vector with 'b' bytes of value 0x01. Switch to the single element insertion variant to fix the issue. Spotted by dtest push notification tests. Signed-off-by: Pekka Enberg --- transport/server.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/server.cc b/transport/server.cc index f5811bf1a0..6ffded2535 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -1246,7 +1246,7 @@ sstring cql_server::response::make_frame(uint8_t version, size_t length) void cql_server::response::write_byte(uint8_t b) { - _body.insert(_body.end(), b, 1); + _body.insert(_body.end(), b); } void cql_server::response::write_int(int32_t n) From dc4171e8d75f74b4945b905f160fc94bd47f9bb2 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 6 Aug 2015 14:48:30 +0300 Subject: [PATCH 7/8] transport/server: Fix response::write_inet() Signed-off-by: Pekka Enberg --- transport/server.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/transport/server.cc b/transport/server.cc index 6ffded2535..7719413568 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -1327,8 +1327,12 @@ void cql_server::response::write_option_list(std::vector> 24)); + write_byte(((inet.ip & 0x00ff0000) >> 16)); + write_byte(((inet.ip & 0x0000ff00) >> 8 )); + write_byte(((inet.ip & 0x000000ff) )); + write_int(inet.port); } void cql_server::response::write_consistency(db::consistency_level c) From c8128de562364436f1ee8c46f5001929c7e623c5 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 6 Aug 2015 12:54:29 +0300 Subject: [PATCH 8/8] transport/server: Hook event notifier to lifecycle events Signed-off-by: Pekka Enberg --- transport/server.cc | 52 ++++++++++++++++++++++++++++++++++++++++++--- transport/server.hh | 13 +++++++++++- 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/transport/server.cc b/transport/server.cc index 7719413568..3cd5fcf8b3 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -12,6 +12,7 @@ #include #include "service/migration_manager.hh" +#include "service/storage_service.hh" #include "db/consistency_level.hh" #include "core/future-util.hh" #include "core/reactor.hh" @@ -280,15 +281,18 @@ private: friend event_notifier; }; +cql_server::event_notifier::event_notifier(uint16_t port) + : _port{port} +{ +} + void cql_server::event_notifier::register_event(transport::event::event_type et, cql_server::connection* conn) { switch (et) { case transport::event::event_type::TOPOLOGY_CHANGE: - logger.warn("TOPOLOGY_CHANGE events are not supported"); _topology_change_listeners.emplace(conn); break; case transport::event::event_type::STATUS_CHANGE: - logger.warn("STATUS_CHANGE events are not supported"); _status_change_listeners.emplace(conn); break; case transport::event::event_type::SCHEMA_CHANGE: @@ -445,6 +449,46 @@ void cql_server::event_notifier::on_drop_aggregate(const sstring& ks_name, const logger.warn("%s event ignored", __func__); } +void cql_server::event_notifier::on_join_cluster(const gms::inet_address& endpoint) +{ + for (auto&& conn : _topology_change_listeners) { + using namespace transport; + conn->write_topology_change_event(event::topology_change::new_node(endpoint, _port)); + } +} + +void cql_server::event_notifier::on_leave_cluster(const gms::inet_address& endpoint) +{ + for (auto&& conn : _topology_change_listeners) { + using namespace transport; + conn->write_topology_change_event(event::topology_change::removed_node(endpoint, _port)); + } +} + +void cql_server::event_notifier::on_move(const gms::inet_address& endpoint) +{ + for (auto&& conn : _topology_change_listeners) { + using namespace transport; + conn->write_topology_change_event(event::topology_change::moved_node(endpoint, _port)); + } +} + +void cql_server::event_notifier::on_up(const gms::inet_address& endpoint) +{ + for (auto&& conn : _status_change_listeners) { + using namespace transport; + conn->write_status_change_event(event::status_change::node_up(endpoint, _port)); + } +} + +void cql_server::event_notifier::on_down(const gms::inet_address& endpoint) +{ + for (auto&& conn : _status_change_listeners) { + using namespace transport; + conn->write_status_change_event(event::status_change::node_down(endpoint, _port)); + } +} + class cql_server::response { int16_t _stream; cql_binary_opcode _opcode; @@ -481,7 +525,6 @@ cql_server::cql_server(distributed& proxy, distributed(setup_collectd())) - , _notifier{std::make_unique()} { } @@ -508,13 +551,16 @@ cql_server::setup_collectd() { } future<> cql_server::stop() { + service::get_local_storage_service().unregister_subscriber(_notifier.get()); service::get_local_migration_manager().unregister_listener(_notifier.get()); return make_ready_future<>(); } future<> cql_server::listen(ipv4_addr addr) { + _notifier = std::make_unique(addr.port); service::get_local_migration_manager().register_listener(_notifier.get()); + service::get_local_storage_service().register_subscriber(_notifier.get()); listen_options lo; lo.reuse_address = true; _listeners.push_back(engine().listen(make_ipv4_address(addr), lo)); diff --git a/transport/server.hh b/transport/server.hh index f8263de877..19669780b3 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -6,6 +6,7 @@ #define CQL_SERVER_HH #include "core/reactor.hh" +#include "service/endpoint_lifecycle_subscriber.hh" #include "service/migration_listener.hh" #include "service/storage_proxy.hh" #include "cql3/query_processor.hh" @@ -46,11 +47,15 @@ private: friend class type_codec; }; -class cql_server::event_notifier : public service::migration_listener { +class cql_server::event_notifier : public service::migration_listener, + public service::endpoint_lifecycle_subscriber +{ + uint16_t _port; std::set _topology_change_listeners; std::set _status_change_listeners; std::set _schema_change_listeners; public: + event_notifier(uint16_t port); void register_event(transport::event::event_type et, cql_server::connection* conn); void unregister_connection(cql_server::connection* conn); @@ -71,6 +76,12 @@ public: virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override; virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) override; virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override; + + virtual void on_join_cluster(const gms::inet_address& endpoint) override; + virtual void on_leave_cluster(const gms::inet_address& endpoint) override; + virtual void on_up(const gms::inet_address& endpoint) override; + virtual void on_down(const gms::inet_address& endpoint) override; + virtual void on_move(const gms::inet_address& endpoint) override; }; #endif