diff --git a/service/endpoint_lifecycle_subscriber.hh b/service/endpoint_lifecycle_subscriber.hh new file mode 100644 index 0000000000..453e8c46f0 --- /dev/null +++ b/service/endpoint_lifecycle_subscriber.hh @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#pragma once + +#include "gms/inet_address.hh" + +namespace service { + +/** + * Interface on which interested parties can be notified of high level endpoint + * state changes. + * + * Note that while IEndpointStateChangeSubscriber notify about gossip related + * changes (IEndpointStateChangeSubscriber.onJoin() is called when a node join + * gossip), this interface allows to be notified about higher level events. + */ +class endpoint_lifecycle_subscriber { +public: + virtual ~endpoint_lifecycle_subscriber() + { } + + /** + * Called when a new node joins the cluster, i.e. either has just been + * bootstrapped or "instajoins". + * + * @param endpoint the newly added endpoint. + */ + virtual void on_join_cluster(const gms::inet_address& endpoint) = 0; + + /** + * Called when a new node leave the cluster (decommission or removeToken). + * + * @param endpoint the endpoint that is leaving. + */ + virtual void on_leave_cluster(const gms::inet_address& endpoint) = 0; + + /** + * Called when a node is marked UP. + * + * @param endpoint the endpoint marked UP. + */ + virtual void on_up(const gms::inet_address& endpoint) = 0; + + /** + * Called when a node is marked DOWN. + * + * @param endpoint the endpoint marked DOWN. + */ + virtual void on_down(const gms::inet_address& endpoint) = 0; + + /** + * Called when a node has moved (to a new token). + * + * @param endpoint the endpoint that has moved. + */ + virtual void on_move(const gms::inet_address& endpoint) = 0; +}; + +} diff --git a/service/storage_service.cc b/service/storage_service.cc index be0ca705f4..3969707fa6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -511,11 +511,17 @@ void storage_service::handle_state_normal(inet_address endpoint) { if (is_moving) { _token_metadata.remove_from_moving(endpoint); - // for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - // subscriber.onMove(endpoint); + get_storage_service().invoke_on_all([endpoint] (auto&& ss) { + for (auto&& subscriber : ss._lifecycle_subscribers) { + subscriber->on_move(endpoint); + } + }); } else { - // for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - // subscriber.onJoinCluster(endpoint); + get_storage_service().invoke_on_all([endpoint] (auto&& ss) { + for (auto&& subscriber : ss._lifecycle_subscribers) { + subscriber->on_join_cluster(endpoint); + } + }); } // PendingRangeCalculatorService.instance.update(); @@ -649,15 +655,16 @@ void storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state ep void storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state) { logger.debug("on_alive endpoint={}", endpoint); get_local_migration_manager().schedule_schema_pull(endpoint, state); + if (_token_metadata.is_member(endpoint)) { #if 0 - - if (_token_metadata.isMember(endpoint)) - { HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true); - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onUp(endpoint); - } #endif + get_storage_service().invoke_on_all([endpoint] (auto&& ss) { + for (auto&& subscriber : ss._lifecycle_subscribers) { + subscriber->on_up(endpoint); + } + }); + } } void storage_service::before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) { @@ -713,9 +720,12 @@ void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state st logger.debug("on_restart endpoint={}", endpoint); #if 0 MessagingService.instance().convict(endpoint); - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onDown(endpoint); #endif + get_storage_service().invoke_on_all([endpoint] (auto&& ss) { + for (auto&& subscriber : ss._lifecycle_subscribers) { + subscriber->on_down(endpoint); + } + }); } void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) { @@ -820,6 +830,16 @@ future<> storage_service::set_tokens(std::unordered_set tokens) { }); } +void storage_service::register_subscriber(endpoint_lifecycle_subscriber* subscriber) +{ + _lifecycle_subscribers.emplace_back(subscriber); +} + +void storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subscriber) +{ + _lifecycle_subscribers.erase(std::remove(_lifecycle_subscribers.begin(), _lifecycle_subscribers.end(), subscriber), _lifecycle_subscribers.end()); +} + future<> storage_service::init_server(int delay) { #if 0 logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString()); diff --git a/service/storage_service.hh b/service/storage_service.hh index a126faf934..2563b3d9bf 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -23,6 +23,7 @@ #pragma once #include "gms/i_endpoint_state_change_subscriber.hh" +#include "service/endpoint_lifecycle_subscriber.hh" #include "locator/token_metadata.hh" #include "gms/gossiper.hh" #include "utils/UUID_gen.hh" @@ -158,9 +159,11 @@ private: private volatile int totalCFs, remainingCFs; private static final AtomicInteger nextRepairCommand = new AtomicInteger(); +#endif - private final List lifecycleSubscribers = new CopyOnWriteArrayList<>(); + std::vector _lifecycle_subscribers; +#if 0 private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor(); private final ObjectName jmxObjectName; @@ -182,17 +185,13 @@ public: { this.daemon = daemon; } +#endif - public void register(IEndpointLifecycleSubscriber subscriber) - { - lifecycleSubscribers.add(subscriber); - } + void register_subscriber(endpoint_lifecycle_subscriber* subscriber); - public void unregister(IEndpointLifecycleSubscriber subscriber) - { - lifecycleSubscribers.remove(subscriber); - } + void unregister_subscriber(endpoint_lifecycle_subscriber* subscriber); +#if 0 // should only be called via JMX public void stopGossiping() { diff --git a/transport/event.hh b/transport/event.hh index 4fb8b7dd2e..180f160597 100644 --- a/transport/event.hh +++ b/transport/event.hh @@ -24,7 +24,10 @@ #pragma once -#include "core/sstring.hh" +#include "gms/inet_address.hh" + +#include +#include #include @@ -71,39 +74,40 @@ public: protected abstract void serializeEvent(ByteBuf dest, int version); protected abstract int eventSerializedSize(int version); #endif + class topology_change; + class status_change; class schema_change; }; + class event::topology_change : public event { + public: + enum class change_type { NEW_NODE, REMOVED_NODE, MOVED_NODE }; + + const change_type change; + const ipv4_addr node; + + topology_change(change_type change, const ipv4_addr& node) + : event{event_type::TOPOLOGY_CHANGE} + , change{change} + , node{node} + { } + + static topology_change new_node(const gms::inet_address& host, uint16_t port) + { + return topology_change{change_type::NEW_NODE, ipv4_addr{host.raw_addr(), port}}; + } + + static topology_change removed_node(const gms::inet_address& host, uint16_t port) + { + return topology_change{change_type::REMOVED_NODE, ipv4_addr{host.raw_addr(), port}}; + } + + static topology_change moved_node(const gms::inet_address& host, uint16_t port) + { + return topology_change{change_type::MOVED_NODE, ipv4_addr{host.raw_addr(), port}}; + } + #if 0 - public static class TopologyChange extends Event - { - public enum Change { NEW_NODE, REMOVED_NODE, MOVED_NODE } - - public final Change change; - public final InetSocketAddress node; - - private TopologyChange(Change change, InetSocketAddress node) - { - super(Type.TOPOLOGY_CHANGE); - this.change = change; - this.node = node; - } - - public static TopologyChange newNode(InetAddress host, int port) - { - return new TopologyChange(Change.NEW_NODE, new InetSocketAddress(host, port)); - } - - public static TopologyChange removedNode(InetAddress host, int port) - { - return new TopologyChange(Change.REMOVED_NODE, new InetSocketAddress(host, port)); - } - - public static TopologyChange movedNode(InetAddress host, int port) - { - return new TopologyChange(Change.MOVED_NODE, new InetSocketAddress(host, port)); - } - // Assumes the type has already been deserialized private static TopologyChange deserializeEvent(ByteBuf cb, int version) { @@ -145,32 +149,33 @@ public: return Objects.equal(change, tpc.change) && Objects.equal(node, tpc.node); } - } +#endif + }; - public static class StatusChange extends Event - { - public enum Status { UP, DOWN } + class event::status_change : public event { + public: + enum class status_type { UP, DOWN }; - public final Status status; - public final InetSocketAddress node; + const status_type status; + const ipv4_addr node; - private StatusChange(Status status, InetSocketAddress node) + status_change(status_type status, const ipv4_addr& node) + : event{event_type::STATUS_CHANGE} + , status{status} + , node{node} + { } + + static status_change node_up(const gms::inet_address& host, uint16_t port) { - super(Type.STATUS_CHANGE); - this.status = status; - this.node = node; + return status_change{status_type::UP, ipv4_addr{host.raw_addr(), port}}; } - public static StatusChange nodeUp(InetAddress host, int port) + static status_change node_down(const gms::inet_address& host, uint16_t port) { - return new StatusChange(Status.UP, new InetSocketAddress(host, port)); - } - - public static StatusChange nodeDown(InetAddress host, int port) - { - return new StatusChange(Status.DOWN, new InetSocketAddress(host, port)); + return status_change{status_type::DOWN, ipv4_addr{host.raw_addr(), port}}; } +#if 0 // Assumes the type has already been deserialized private static StatusChange deserializeEvent(ByteBuf cb, int version) { @@ -212,8 +217,8 @@ public: return Objects.equal(status, stc.status) && Objects.equal(node, stc.node); } - } #endif + }; class event::schema_change : public event { public: diff --git a/transport/server.cc b/transport/server.cc index 786162f0b6..3cd5fcf8b3 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -12,6 +12,7 @@ #include #include "service/migration_manager.hh" +#include "service/storage_service.hh" #include "db/consistency_level.hh" #include "core/future-util.hh" #include "core/reactor.hh" @@ -124,6 +125,25 @@ inline int16_t consistency_to_wire(db::consistency_level c) } } +sstring to_string(const transport::event::topology_change::change_type t) { + using type = transport::event::topology_change::change_type; + switch (t) { + case type::NEW_NODE: return "NEW_NODE"; + case type::REMOVED_NODE: return "REMOVED_NODE"; + case type::MOVED_NODE: return "MOVED_NODE"; + } + throw std::invalid_argument("unknown change type"); +} + +sstring to_string(const transport::event::status_change::status_type t) { + using type = transport::event::status_change::status_type; + switch (t) { + case type::UP: return "NEW_NODE"; + case type::DOWN: return "REMOVED_NODE"; + } + throw std::invalid_argument("unknown change type"); +} + sstring to_string(const transport::event::schema_change::change_type t) { switch (t) { case transport::event::schema_change::change_type::CREATED: return "CREATED"; @@ -220,6 +240,8 @@ private: future<> write_ready(int16_t stream); future<> write_supported(int16_t stream); future<> write_result(int16_t stream, shared_ptr msg); + future<> write_topology_change_event(const transport::event::topology_change& event); + future<> write_status_change_event(const transport::event::status_change& event); future<> write_schema_change_event(const transport::event::schema_change& event); future<> write_response(shared_ptr response); @@ -259,15 +281,18 @@ private: friend event_notifier; }; +cql_server::event_notifier::event_notifier(uint16_t port) + : _port{port} +{ +} + void cql_server::event_notifier::register_event(transport::event::event_type et, cql_server::connection* conn) { switch (et) { case transport::event::event_type::TOPOLOGY_CHANGE: - logger.warn("TOPOLOGY_CHANGE events are not supported"); _topology_change_listeners.emplace(conn); break; case transport::event::event_type::STATUS_CHANGE: - logger.warn("STATUS_CHANGE events are not supported"); _status_change_listeners.emplace(conn); break; case transport::event::event_type::SCHEMA_CHANGE: @@ -424,6 +449,46 @@ void cql_server::event_notifier::on_drop_aggregate(const sstring& ks_name, const logger.warn("%s event ignored", __func__); } +void cql_server::event_notifier::on_join_cluster(const gms::inet_address& endpoint) +{ + for (auto&& conn : _topology_change_listeners) { + using namespace transport; + conn->write_topology_change_event(event::topology_change::new_node(endpoint, _port)); + } +} + +void cql_server::event_notifier::on_leave_cluster(const gms::inet_address& endpoint) +{ + for (auto&& conn : _topology_change_listeners) { + using namespace transport; + conn->write_topology_change_event(event::topology_change::removed_node(endpoint, _port)); + } +} + +void cql_server::event_notifier::on_move(const gms::inet_address& endpoint) +{ + for (auto&& conn : _topology_change_listeners) { + using namespace transport; + conn->write_topology_change_event(event::topology_change::moved_node(endpoint, _port)); + } +} + +void cql_server::event_notifier::on_up(const gms::inet_address& endpoint) +{ + for (auto&& conn : _status_change_listeners) { + using namespace transport; + conn->write_status_change_event(event::status_change::node_up(endpoint, _port)); + } +} + +void cql_server::event_notifier::on_down(const gms::inet_address& endpoint) +{ + for (auto&& conn : _status_change_listeners) { + using namespace transport; + conn->write_status_change_event(event::status_change::node_down(endpoint, _port)); + } +} + class cql_server::response { int16_t _stream; cql_binary_opcode _opcode; @@ -460,7 +525,6 @@ cql_server::cql_server(distributed& proxy, distributed(setup_collectd())) - , _notifier{std::make_unique()} { } @@ -487,13 +551,16 @@ cql_server::setup_collectd() { } future<> cql_server::stop() { + service::get_local_storage_service().unregister_subscriber(_notifier.get()); service::get_local_migration_manager().unregister_listener(_notifier.get()); return make_ready_future<>(); } future<> cql_server::listen(ipv4_addr addr) { + _notifier = std::make_unique(addr.port); service::get_local_migration_manager().register_listener(_notifier.get()); + service::get_local_storage_service().register_subscriber(_notifier.get()); listen_options lo; lo.reuse_address = true; _listeners.push_back(engine().listen(make_ipv4_address(addr), lo)); @@ -918,6 +985,24 @@ future<> cql_server::connection::write_result(int16_t stream, shared_ptr cql_server::connection::write_topology_change_event(const transport::event::topology_change& event) +{ + auto response = make_shared(-1, cql_binary_opcode::EVENT); + response->write_string("TOPOLOGY_CHANGE"); + response->write_string(to_string(event.change)); + response->write_inet(event.node); + return write_response(response); +} + +future<> cql_server::connection::write_status_change_event(const transport::event::status_change& event) +{ + auto response = make_shared(-1, cql_binary_opcode::EVENT); + response->write_string("STATUS_CHANGE"); + response->write_string(to_string(event.status)); + response->write_inet(event.node); + return write_response(response); +} + future<> cql_server::connection::write_schema_change_event(const transport::event::schema_change& event) { auto response = make_shared(-1, cql_binary_opcode::EVENT); @@ -1207,7 +1292,7 @@ sstring cql_server::response::make_frame(uint8_t version, size_t length) void cql_server::response::write_byte(uint8_t b) { - _body.insert(_body.end(), b, 1); + _body.insert(_body.end(), b); } void cql_server::response::write_int(int32_t n) @@ -1288,8 +1373,12 @@ void cql_server::response::write_option_list(std::vector> 24)); + write_byte(((inet.ip & 0x00ff0000) >> 16)); + write_byte(((inet.ip & 0x0000ff00) >> 8 )); + write_byte(((inet.ip & 0x000000ff) )); + write_int(inet.port); } void cql_server::response::write_consistency(db::consistency_level c) diff --git a/transport/server.hh b/transport/server.hh index f8263de877..19669780b3 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -6,6 +6,7 @@ #define CQL_SERVER_HH #include "core/reactor.hh" +#include "service/endpoint_lifecycle_subscriber.hh" #include "service/migration_listener.hh" #include "service/storage_proxy.hh" #include "cql3/query_processor.hh" @@ -46,11 +47,15 @@ private: friend class type_codec; }; -class cql_server::event_notifier : public service::migration_listener { +class cql_server::event_notifier : public service::migration_listener, + public service::endpoint_lifecycle_subscriber +{ + uint16_t _port; std::set _topology_change_listeners; std::set _status_change_listeners; std::set _schema_change_listeners; public: + event_notifier(uint16_t port); void register_event(transport::event::event_type et, cql_server::connection* conn); void unregister_connection(cql_server::connection* conn); @@ -71,6 +76,12 @@ public: virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override; virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) override; virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override; + + virtual void on_join_cluster(const gms::inet_address& endpoint) override; + virtual void on_leave_cluster(const gms::inet_address& endpoint) override; + virtual void on_up(const gms::inet_address& endpoint) override; + virtual void on_down(const gms::inet_address& endpoint) override; + virtual void on_move(const gms::inet_address& endpoint) override; }; #endif