diff --git a/gms/FailureDetector.java b/gms/FailureDetector.java deleted file mode 100644 index ec72379255..0000000000 --- a/gms/FailureDetector.java +++ /dev/null @@ -1,368 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.gms; - -import java.io.*; -import java.lang.management.ManagementFactory; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.BoundedStatsDeque; -import org.apache.cassandra.utils.FBUtilities; - -/** - * This FailureDetector is an implementation of the paper titled - * "The Phi Accrual Failure Detector" by Hayashibara. - * Check the paper and the IFailureDetector interface for details. 
- */ -public class FailureDetector implements IFailureDetector, FailureDetectorMBean -{ - private static final Logger logger = LoggerFactory.getLogger(FailureDetector.class); - public static final String MBEAN_NAME = "org.apache.cassandra.net:type=FailureDetector"; - private static final int SAMPLE_SIZE = 1000; - protected static final long INITIAL_VALUE_NANOS = TimeUnit.NANOSECONDS.convert(getInitialValue(), TimeUnit.MILLISECONDS); - - public static final IFailureDetector instance = new FailureDetector(); - - // this is useless except to provide backwards compatibility in phi_convict_threshold, - // because everyone seems pretty accustomed to the default of 8, and users who have - // already tuned their phi_convict_threshold for their own environments won't need to - // change. - private final double PHI_FACTOR = 1.0 / Math.log(10.0); // 0.434... - - private final Map arrivalSamples = new Hashtable(); - private final List fdEvntListeners = new CopyOnWriteArrayList(); - - public FailureDetector() - { - // Register this instance with JMX - try - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - private static long getInitialValue() - { - String newvalue = System.getProperty("cassandra.fd_initial_value_ms"); - if (newvalue == null) - { - return Gossiper.intervalInMillis * 2; - } - else - { - logger.info("Overriding FD INITIAL_VALUE to {}ms", newvalue); - return Integer.parseInt(newvalue); - } - } - - public String getAllEndpointStates() - { - StringBuilder sb = new StringBuilder(); - for (Map.Entry entry : Gossiper.instance.endpointStateMap.entrySet()) - { - sb.append(entry.getKey()).append("\n"); - appendEndpointState(sb, entry.getValue()); - } - return sb.toString(); - } - - public Map getSimpleStates() - { - Map nodesStatus = new HashMap(Gossiper.instance.endpointStateMap.size()); - for (Map.Entry entry : 
Gossiper.instance.endpointStateMap.entrySet()) - { - if (entry.getValue().isAlive()) - nodesStatus.put(entry.getKey().toString(), "UP"); - else - nodesStatus.put(entry.getKey().toString(), "DOWN"); - } - return nodesStatus; - } - - public int getDownEndpointCount() - { - int count = 0; - for (Map.Entry entry : Gossiper.instance.endpointStateMap.entrySet()) - { - if (!entry.getValue().isAlive()) - count++; - } - return count; - } - - public int getUpEndpointCount() - { - int count = 0; - for (Map.Entry entry : Gossiper.instance.endpointStateMap.entrySet()) - { - if (entry.getValue().isAlive()) - count++; - } - return count; - } - - public String getEndpointState(String address) throws UnknownHostException - { - StringBuilder sb = new StringBuilder(); - EndpointState endpointState = Gossiper.instance.getEndpointStateForEndpoint(InetAddress.getByName(address)); - appendEndpointState(sb, endpointState); - return sb.toString(); - } - - private void appendEndpointState(StringBuilder sb, EndpointState endpointState) - { - sb.append(" generation:").append(endpointState.getHeartBeatState().getGeneration()).append("\n"); - sb.append(" heartbeat:").append(endpointState.getHeartBeatState().getHeartBeatVersion()).append("\n"); - for (Map.Entry state : endpointState.applicationState.entrySet()) - { - if (state.getKey() == ApplicationState.TOKENS) - continue; - sb.append(" ").append(state.getKey()).append(":").append(state.getValue().value).append("\n"); - } - } - - /** - * Dump the inter arrival times for examination if necessary. 
- */ - public void dumpInterArrivalTimes() - { - File file = FileUtils.createTempFile("failuredetector-", ".dat"); - - OutputStream os = null; - try - { - os = new BufferedOutputStream(new FileOutputStream(file, true)); - os.write(toString().getBytes()); - } - catch (IOException e) - { - throw new FSWriteError(e, file); - } - finally - { - FileUtils.closeQuietly(os); - } - } - - public void setPhiConvictThreshold(double phi) - { - DatabaseDescriptor.setPhiConvictThreshold(phi); - } - - public double getPhiConvictThreshold() - { - return DatabaseDescriptor.getPhiConvictThreshold(); - } - - public boolean isAlive(InetAddress ep) - { - if (ep.equals(FBUtilities.getBroadcastAddress())) - return true; - - EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(ep); - // we could assert not-null, but having isAlive fail screws a node over so badly that - // it's worth being defensive here so minor bugs don't cause disproportionate - // badness. (See CASSANDRA-1463 for an example). 
- if (epState == null) - logger.error("unknown endpoint {}", ep); - return epState != null && epState.isAlive(); - } - - public void report(InetAddress ep) - { - if (logger.isTraceEnabled()) - logger.trace("reporting {}", ep); - long now = System.nanoTime(); - ArrivalWindow heartbeatWindow = arrivalSamples.get(ep); - if (heartbeatWindow == null) - { - // avoid adding an empty ArrivalWindow to the Map - heartbeatWindow = new ArrivalWindow(SAMPLE_SIZE); - heartbeatWindow.add(now); - arrivalSamples.put(ep, heartbeatWindow); - } - else - { - heartbeatWindow.add(now); - } - } - - public void interpret(InetAddress ep) - { - ArrivalWindow hbWnd = arrivalSamples.get(ep); - if (hbWnd == null) - { - return; - } - long now = System.nanoTime(); - double phi = hbWnd.phi(now); - if (logger.isTraceEnabled()) - logger.trace("PHI for {} : {}", ep, phi); - - if (PHI_FACTOR * phi > getPhiConvictThreshold()) - { - logger.trace("notifying listeners that {} is down", ep); - logger.trace("intervals: {} mean: {}", hbWnd, hbWnd.mean()); - for (IFailureDetectionEventListener listener : fdEvntListeners) - { - listener.convict(ep, phi); - } - } - } - - public void forceConviction(InetAddress ep) - { - logger.debug("Forcing conviction of {}", ep); - for (IFailureDetectionEventListener listener : fdEvntListeners) - { - listener.convict(ep, getPhiConvictThreshold()); - } - } - - public void remove(InetAddress ep) - { - arrivalSamples.remove(ep); - } - - public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) - { - fdEvntListeners.add(listener); - } - - public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) - { - fdEvntListeners.remove(listener); - } - - public String toString() - { - StringBuilder sb = new StringBuilder(); - Set eps = arrivalSamples.keySet(); - - sb.append("-----------------------------------------------------------------------"); - for (InetAddress ep : eps) - { - ArrivalWindow hWnd = 
arrivalSamples.get(ep); - sb.append(ep + " : "); - sb.append(hWnd); - sb.append(System.getProperty("line.separator")); - } - sb.append("-----------------------------------------------------------------------"); - return sb.toString(); - } - - public static void main(String[] args) - { - } -} - -class ArrivalWindow -{ - private static final Logger logger = LoggerFactory.getLogger(ArrivalWindow.class); - private long tLast = 0L; - private final BoundedStatsDeque arrivalIntervals; - - // this is useless except to provide backwards compatibility in phi_convict_threshold, - // because everyone seems pretty accustomed to the default of 8, and users who have - // already tuned their phi_convict_threshold for their own environments won't need to - // change. - private final double PHI_FACTOR = 1.0 / Math.log(10.0); - - // in the event of a long partition, never record an interval longer than the rpc timeout, - // since if a host is regularly experiencing connectivity problems lasting this long we'd - // rather mark it down quickly instead of adapting - // this value defaults to the same initial value the FD is seeded with - private final long MAX_INTERVAL_IN_NANO = getMaxInterval(); - - ArrivalWindow(int size) - { - arrivalIntervals = new BoundedStatsDeque(size); - } - - private static long getMaxInterval() - { - String newvalue = System.getProperty("cassandra.fd_max_interval_ms"); - if (newvalue == null) - { - return FailureDetector.INITIAL_VALUE_NANOS; - } - else - { - logger.info("Overriding FD MAX_INTERVAL to {}ms", newvalue); - return TimeUnit.NANOSECONDS.convert(Integer.parseInt(newvalue), TimeUnit.MILLISECONDS); - } - } - - synchronized void add(long value) - { - assert tLast >= 0; - if (tLast > 0L) - { - long interArrivalTime = (value - tLast); - if (interArrivalTime <= MAX_INTERVAL_IN_NANO) - arrivalIntervals.add(interArrivalTime); - else - logger.debug("Ignoring interval time of {}", interArrivalTime); - } - else - { - // We use a very large initial interval 
since the "right" average depends on the cluster size - // and it's better to err high (false negatives, which will be corrected by waiting a bit longer) - // than low (false positives, which cause "flapping"). - arrivalIntervals.add(FailureDetector.INITIAL_VALUE_NANOS); - } - tLast = value; - } - - double mean() - { - return arrivalIntervals.mean(); - } - - // see CASSANDRA-2597 for an explanation of the math at work here. - double phi(long tnow) - { - assert arrivalIntervals.size() > 0 && tLast > 0; // should not be called before any samples arrive - long t = tnow - tLast; - return t / mean(); - } - - public String toString() - { - return StringUtils.join(arrivalIntervals.iterator(), " "); - } -} - diff --git a/gms/failure_detector.hh b/gms/failure_detector.hh new file mode 100644 index 0000000000..55a75077a4 --- /dev/null +++ b/gms/failure_detector.hh @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. 
+ */
+
+#pragma once
+
+#include "unimplemented.hh"
+#include "db_clock.hh"
+#include "gms/inet_address.hh"
+#include "gms/i_failure_detector.hh"
+#include "core/sstring.hh"
+#include "core/shared_ptr.hh"
+#include "gms/gossiper.hh"
+#include "utils/bounded_stats_deque.hh"
+#include <iostream>
+#include <cmath>
+#include <list>
+
+namespace gms {
+
+class failure_detector_helper {
+public:
+    static long get_initial_value() {
+#if 0
+        String newvalue = System.getProperty("cassandra.fd_initial_value_ms");
+        if (newvalue == null)
+        {
+            return Gossiper.intervalInMillis * 2;
+        }
+        else
+        {
+            logger.info("Overriding FD INITIAL_VALUE to {}ms", newvalue);
+            return Integer.parseInt(newvalue);
+        }
+#endif
+        warn(unimplemented::cause::GOSSIP);
+        return 1000 * 2;
+    }
+
+    static long INITIAL_VALUE_NANOS() {
+        // Convert from milliseconds to nanoseconds: 1 ms = 1,000,000 ns (a factor of 1000 would give microseconds)
+        return get_initial_value() * 1000000L;
+    }
+};
+
+class arrival_window {
+private:
+    long _tlast = 0;
+    utils::bounded_stats_deque _arrival_intervals;
+
+    // this is useless except to provide backwards compatibility in phi_convict_threshold,
+    // because everyone seems pretty accustomed to the default of 8, and users who have
+    // already tuned their phi_convict_threshold for their own environments won't need to
+    // change.
+    static constexpr const double PHI_FACTOR{1.0 / std::log(10.0)};
+
+    // in the event of a long partition, never record an interval longer than the rpc timeout,
+    // since if a host is regularly experiencing connectivity problems lasting this long we'd
+    // rather mark it down quickly instead of adapting
+    // this value defaults to the same initial value the FD is seeded with
+    long MAX_INTERVAL_IN_NANO = get_max_interval();
+
+public:
+    arrival_window(int size)
+        : _arrival_intervals(size) {
+    }
+
+    static long get_max_interval() {
+#if 0
+        sstring newvalue = System.getProperty("cassandra.fd_max_interval_ms");
+        if (newvalue == null)
+        {
+            return failure_detector.INITIAL_VALUE_NANOS;
+        }
+        else
+        {
+            logger.info("Overriding FD MAX_INTERVAL to {}ms", newvalue);
+            return TimeUnit.NANOSECONDS.convert(Integer.parseInt(newvalue), TimeUnit.MILLISECONDS);
+        }
+#endif
+        warn(unimplemented::cause::GOSSIP);
+        return failure_detector_helper::INITIAL_VALUE_NANOS();
+    }
+
+    void add(long value) {
+        assert(_tlast >= 0);
+        if (_tlast > 0L) {
+            long inter_arrival_time = value - _tlast;
+            if (inter_arrival_time <= MAX_INTERVAL_IN_NANO) {
+                _arrival_intervals.add(inter_arrival_time);
+            } else {
+                //logger.debug("Ignoring interval time of {}", interArrivalTime);
+            }
+        } else {
+            // We use a very large initial interval since the "right" average depends on the cluster size
+            // and it's better to err high (false negatives, which will be corrected by waiting a bit longer)
+            // than low (false positives, which cause "flapping").
+            _arrival_intervals.add(failure_detector_helper::INITIAL_VALUE_NANOS());
+        }
+        _tlast = value;
+    }
+
+    double mean() {
+        return _arrival_intervals.mean();
+    }
+
+    // see CASSANDRA-2597 for an explanation of the math at work here.
+    double phi(long tnow) {
+        assert(_arrival_intervals.size() > 0 && _tlast > 0); // should not be called before any samples arrive
+        long t = tnow - _tlast;
+        return t / mean();
+    }
+
+    friend inline std::ostream& operator<<(std::ostream& os, const arrival_window& w) {
+        for (auto& x : w._arrival_intervals.deque()) {
+            os << x << " ";
+        }
+        return os;
+    }
+
+};
+
+
+/**
+ * This FailureDetector is an implementation of the paper titled
+ * "The Phi Accrual Failure Detector" by Hayashibara.
+ * Check the paper and the IFailureDetector interface for details.
+ */
+class failure_detector : public i_failure_detector {
+private:
+    static constexpr const int SAMPLE_SIZE = 1000;
+    // this is useless except to provide backwards compatibility in phi_convict_threshold,
+    // because everyone seems pretty accustomed to the default of 8, and users who have
+    // already tuned their phi_convict_threshold for their own environments won't need to
+    // change.
+    static constexpr const double PHI_FACTOR{1.0 / std::log(10.0)}; // 0.434...
+    std::map<inet_address, arrival_window> _arrival_samples;
+    std::list<shared_ptr<i_failure_detection_event_listener>> _fd_evnt_listeners;
+
+public:
+    failure_detector() {
+    }
+
+    sstring get_all_endpoint_states() {
+        std::stringstream ss;
+        for (auto& entry : the_gossiper().endpoint_state_map) {
+            auto& ep = entry.first;
+            auto& state = entry.second;
+            ss << ep << "\n";
+            append_endpoint_state(ss, state);
+        }
+        return sstring(ss.str());
+    }
+
+    std::map<sstring, sstring> get_simple_states() {
+        std::map<sstring, sstring> nodes_status;
+        for (auto& entry : the_gossiper().endpoint_state_map) {
+            auto& ep = entry.first;
+            auto& state = entry.second;
+            std::stringstream ss;
+            ss << ep;
+            if (state.is_alive())
+                nodes_status.emplace(sstring(ss.str()), "UP");
+            else
+                nodes_status.emplace(sstring(ss.str()), "DOWN");
+        }
+        return nodes_status;
+    }
+
+    int get_down_endpoint_count() {
+        int count = 0;
+        for (auto& entry : the_gossiper().endpoint_state_map) {
+            auto& state = entry.second;
+            if (!state.is_alive()) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    int get_up_endpoint_count() {
+        int count = 0;
+        for (auto& entry : the_gossiper().endpoint_state_map) {
+            auto& state = entry.second;
+            if (state.is_alive()) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    sstring get_endpoint_state(sstring address) {
+        std::stringstream ss;
+        auto eps = the_gossiper().get_endpoint_state_for_endpoint(inet_address(address));
+        if (eps) {
+            append_endpoint_state(ss, *eps);
+            return sstring(ss.str());
+        } else {
+            return sstring("unknown endpoint ") + address;
+        }
+    }
+
+private:
+    void append_endpoint_state(std::stringstream& ss, endpoint_state& state) {
+        ss << " generation:" << state.get_heart_beat_state().get_generation() << "\n";
+        ss << " heartbeat:" << state.get_heart_beat_state().get_heart_beat_version() << "\n";
+        for (auto& entry : state.get_application_state_map()) {
+            auto& app_state = entry.first;
+            auto& value = entry.second;
+            if (app_state == application_state::TOKENS) {
+                continue;
+            }
+            // FIXME: Add operator<< for application_state
+            ss << " " << int32_t(app_state) << ":" << value.value << "\n";
+        }
+    }
+
+public:
+    /**
+     * Dump the inter arrival times for examination if necessary.
+     */
+#if 0
+    void dumpInterArrivalTimes() {
+        File file = FileUtils.createTempFile("failuredetector-", ".dat");
+
+        OutputStream os = null;
+        try
+        {
+            os = new BufferedOutputStream(new FileOutputStream(file, true));
+            os.write(toString().getBytes());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, file);
+        }
+        finally
+        {
+            FileUtils.closeQuietly(os);
+        }
+    }
+#endif
+
+    void set_phi_convict_threshold(double phi) {
+        // FIXME
+        // DatabaseDescriptor.setPhiConvictThreshold(phi);
+    }
+
+    double get_phi_convict_threshold() {
+        // FIXME
+        // return DatabaseDescriptor.getPhiConvictThreshold();
+        fail(unimplemented::cause::GOSSIP);
+        return 0;
+    }
+
+
+    bool is_alive(inet_address ep) {
+        if (ep.is_broadcast_address()) {
+            return true;
+        }
+
+        auto eps = the_gossiper().get_endpoint_state_for_endpoint(ep);
+        // we could assert not-null, but having isAlive fail screws a node over so badly that
+        // it's worth being defensive here so minor bugs don't cause disproportionate
+        // badness. (See CASSANDRA-1463 for an example).
+        if (eps) {
+            return eps->is_alive();
+        } else {
+            // logger.error("unknown endpoint {}", ep);
+            return false;
+        }
+    }
+
+    void report(inet_address ep) {
+        // if (logger.isTraceEnabled())
+        // logger.trace("reporting {}", ep);
+        long now = std::chrono::duration_cast<std::chrono::nanoseconds>(db_clock::now().time_since_epoch()).count(); // ns, to match MAX_INTERVAL_IN_NANO
+        auto it = _arrival_samples.find(ep);
+        if (it == _arrival_samples.end()) {
+            // avoid adding an empty ArrivalWindow to the Map
+            auto heartbeat_window = arrival_window(SAMPLE_SIZE);
+            heartbeat_window.add(now);
+            _arrival_samples.emplace(ep, heartbeat_window);
+        } else {
+            it->second.add(now);
+        }
+    }
+
+    void interpret(inet_address ep) {
+        auto it = _arrival_samples.find(ep);
+        if (it == _arrival_samples.end()) {
+            return;
+        }
+        arrival_window& hb_wnd = it->second;
+        long now = std::chrono::duration_cast<std::chrono::nanoseconds>(db_clock::now().time_since_epoch()).count(); // same unit as report()
+        double phi = hb_wnd.phi(now);
+        // if (logger.isTraceEnabled())
+        // logger.trace("PHI for {} : {}", ep, phi);
+
+        if (PHI_FACTOR * phi > get_phi_convict_threshold()) {
+            // logger.trace("notifying listeners that {} is down", ep);
+            // logger.trace("intervals: {} mean: {}", hb_wnd, hb_wnd.mean());
+            for (auto& listener : _fd_evnt_listeners) {
+                listener->convict(ep, phi);
+            }
+        }
+    }
+
+    void force_conviction(inet_address ep) {
+        //logger.debug("Forcing conviction of {}", ep);
+        for (auto& listener : _fd_evnt_listeners) {
+            listener->convict(ep, get_phi_convict_threshold());
+        }
+    }
+
+    void remove(inet_address ep) {
+        _arrival_samples.erase(ep);
+    }
+
+    void register_failure_detection_event_listener(shared_ptr<i_failure_detection_event_listener> listener) {
+        _fd_evnt_listeners.push_back(std::move(listener));
+    }
+
+    void unregister_failure_detection_event_listener(shared_ptr<i_failure_detection_event_listener> listener) {
+        _fd_evnt_listeners.remove(listener);
+    }
+
+    friend inline std::ostream& operator<<(std::ostream& os, const failure_detector& x) {
+        os << "-----------------------------------------------------------------------";
+        for (auto& entry : x._arrival_samples) {
+            const inet_address& ep = entry.first;
+            const arrival_window& win = entry.second;
+            os << ep << " : " << win << "\n";
+        }
+        os << "-----------------------------------------------------------------------";
+        return os;
+    }
+};
+
+} // namespace gms
diff --git a/gms/gms.cc b/gms/gms.cc
index ac0d64a581..b503b5fc19 100644
--- a/gms/gms.cc
+++ b/gms/gms.cc
@@ -17,6 +17,7 @@
 #include "gms/token_serializer.hh"
 #include "gms/i_endpoint_state_change_subscriber.hh"
 #include "gms/i_failure_detection_event_listener.hh"
+#include "gms/failure_detector.hh"
 
 namespace gms {
 gossiper _the_gossiper;