diff --git a/gms/endpoint_state.hh b/gms/endpoint_state.hh index c02340a56d..95e2cffb03 100644 --- a/gms/endpoint_state.hh +++ b/gms/endpoint_state.hh @@ -27,6 +27,7 @@ #include "gms/application_state.hh" #include "gms/versioned_value.hh" #include "db_clock.hh" +#include namespace gms { @@ -42,6 +43,12 @@ private: db_clock::time_point _update_timestamp; bool _is_alive; public: + endpoint_state() + : _heart_beat_state(0) + , _update_timestamp(db_clock::now()) + , _is_alive(true) { + } + endpoint_state(heart_beat_state initial_hb_state) : _heart_beat_state(initial_hb_state) , _update_timestamp(db_clock::now()) @@ -57,15 +64,20 @@ public: _heart_beat_state = hbs; } - versioned_value getapplication_state(application_state key) { - return _application_state.at(key); + std::experimental::optional get_application_state(application_state key) { + auto it = _application_state.find(key); + if (it == _application_state.end()) { + return {}; + } else { + return _application_state.at(key); + } } /** * TODO replace this with operations that don't expose private state */ // @Deprecated - std::map get_application_state_map() { + std::map& get_application_state_map() { return _application_state; } diff --git a/gms/failure_detector.hh b/gms/failure_detector.hh index 55a75077a4..5756f67dff 100644 --- a/gms/failure_detector.hh +++ b/gms/failure_detector.hh @@ -27,6 +27,7 @@ #include "gms/i_failure_detector.hh" #include "core/sstring.hh" #include "core/shared_ptr.hh" +#include "core/distributed.hh" #include "gms/gossiper.hh" #include "utils/bounded_stats_deque.hh" #include @@ -160,7 +161,7 @@ public: sstring get_all_endpoint_states() { std::stringstream ss; - for (auto& entry : the_gossiper().endpoint_state_map) { + for (auto& entry : get_local_gossiper().endpoint_state_map) { auto& ep = entry.first; auto& state = entry.second; ss << ep << "\n"; @@ -171,7 +172,7 @@ public: std::map get_simple_states() { std::map nodes_status; - for (auto& entry : the_gossiper().endpoint_state_map) { + for (auto& entry : get_local_gossiper().endpoint_state_map) { auto& ep = entry.first; auto& state = entry.second; std::stringstream ss; @@ -186,7 +187,7 @@ public: int get_down_endpoint_count() { int count = 0; - for (auto& entry : the_gossiper().endpoint_state_map) { + for (auto& entry : get_local_gossiper().endpoint_state_map) { auto& state = entry.second; if (!state.is_alive()) { count++; @@ -197,7 +198,7 @@ public: int get_up_endpoint_count() { int count = 0; - for (auto& entry : the_gossiper().endpoint_state_map) { + for (auto& entry : get_local_gossiper().endpoint_state_map) { auto& state = entry.second; if (state.is_alive()) { count++; @@ -208,7 +209,7 @@ public: sstring get_endpoint_state(sstring address) { std::stringstream ss; - auto eps = the_gossiper().get_endpoint_state_for_endpoint(inet_address(address)); + auto eps = get_local_gossiper().get_endpoint_state_for_endpoint(inet_address(address)); if (eps) { append_endpoint_state(ss, *eps); return sstring(ss.str()); @@ -275,7 +276,7 @@ public: return true; } - auto eps = the_gossiper().get_endpoint_state_for_endpoint(ep); + auto eps = get_local_gossiper().get_endpoint_state_for_endpoint(ep); // we could assert not-null, but having isAlive fail screws a node over so badly that // it's worth being defensive here so minor bugs don't cause disproportionate // badness. (See CASSANDRA-1463 for an example). @@ -353,4 +354,10 @@ public: } }; +extern distributed _the_failure_detector; +inline failure_detector& get_local_failure_detector() { + assert(engine().cpu_id() == 0); + return _the_failure_detector.local(); +} + } // namespace gms diff --git a/gms/gms.cc b/gms/gms.cc index b503b5fc19..125c8a380d 100644 --- a/gms/gms.cc +++ b/gms/gms.cc @@ -19,6 +19,8 @@ #include "gms/i_failure_detection_event_listener.hh" #include "gms/failure_detector.hh" +#include "core/distributed.hh" namespace gms { - gossiper _the_gossiper; + distributed _the_gossiper; + distributed _the_failure_detector; } diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 927540eb8a..d899e8c2e1 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -21,33 +21,33 @@ #pragma once +#include "core/distributed.hh" +#include "core/shared_ptr.hh" +#include "core/print.hh" +#include "unimplemented.hh" #include "gms/inet_address.hh" #include "gms/endpoint_state.hh" +#include "gms/i_failure_detection_event_listener.hh" +#include "gms/i_endpoint_state_change_subscriber.hh" +#include "gms/i_failure_detector.hh" +#include "gms/gossip_digest.hh" +#include "utils/UUID.hh" +#include "gms/gossip_digest_syn.hh" +#include "gms/versioned_value.hh" + +#include #include +#include namespace gms { -class gossiper { -public: - std::map endpoint_state_map; - std::experimental::optional get_endpoint_state_for_endpoint(inet_address ep) { - auto it = endpoint_state_map.find(ep); - if (it == endpoint_state_map.end()) { - return {}; - } else { - return it->second; - } - } +// FIXME: Stub +template +class MessageOut { + T _msg; }; -extern gossiper _the_gossiper; - -inline gossiper& the_gossiper() { - return _the_gossiper; -} - -#if 0 /** * This module is responsible for Gossiping information for the local endpoint. This abstraction * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module @@ -60,65 +60,60 @@ inline gossiper& the_gossiper() { * Upon hearing a GossipShutdownMessage, this module will instantly mark the remote node as down in * the Failure Detector. */ +class gossiper : public i_failure_detection_event_listener { +private: + inet_address get_broadcast_address() { + // FIXME: Helper for FBUtilities.getBroadcastAddress + return inet_address(0xffffff); + } +public: + static int64_t now_millis() { + return db_clock::now().time_since_epoch().count(); + } + static int64_t now_nanos() { + return now_millis() * 1000; + } +public: + /* map where key is the endpoint and value is the state associated with the endpoint */ + std::map endpoint_state_map; -public class Gossiper implements IFailureDetectionEventListener, GossiperMBean -{ - public static final String MBEAN_NAME = "org.apache.cassandra.net:type=Gossiper"; - - private static final DebuggableScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("GossipTasks"); - - static final ApplicationState[] STATES = ApplicationState.values(); - static final List DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, - VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE); - - private ScheduledFuture scheduledGossipTask; - private static final ReentrantLock taskLock = new ReentrantLock(); - public final static int intervalInMillis = 1000; - public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2; - private static final Logger logger = LoggerFactory.getLogger(Gossiper.class); - public static final Gossiper instance = new Gossiper(); - - public static final long aVeryLongTime = 259200 * 1000; // 3 days + const std::vector DEAD_STATES = { versioned_value::REMOVING_TOKEN, versioned_value::REMOVED_TOKEN, + versioned_value::STATUS_LEFT, versioned_value::HIBERNATE }; + static constexpr const int INTERVAL_IN_MILLIS = 1000; + // FIXME: Define StorageService.RING_DELAY -> cassandra.ring_delay_ms + static constexpr const int QUARANTINE_DELAY = (30 * 1000) * 2; // StorageService.RING_DELAY * 2; + static constexpr const int64_t A_VERY_LONG_TIME = 259200 * 1000; // 3 days in milliseconds /** Maximimum difference in generation and version values we are willing to accept about a peer */ - private static final long MAX_GENERATION_DIFFERENCE = 86400 * 365; - private long FatClientTimeout; - private final Random random = new Random(); - private final Comparator inetcomparator = new Comparator() - { - public int compare(InetAddress addr1, InetAddress addr2) - { - return addr1.getHostAddress().compareTo(addr2.getHostAddress()); - } - }; - + static constexpr const int64_t MAX_GENERATION_DIFFERENCE = 86400 * 365; + int64_t fat_client_timeout; +private: + std::random_device _random; /* subscribers for interest in EndpointState change */ - private final List subscribers = new CopyOnWriteArrayList(); + std::list> _subscribers; /* live member set */ - private final Set liveEndpoints = new ConcurrentSkipListSet(inetcomparator); + std::set _live_endpoints; /* unreachable member set */ - private final Map unreachableEndpoints = new ConcurrentHashMap(); + std::map _unreachable_endpoints; /* initial seeds for joining the cluster */ - private final Set seeds = new ConcurrentSkipListSet(inetcomparator); - - /* map where key is the endpoint and value is the state associated with the endpoint */ - final ConcurrentMap endpointStateMap = new ConcurrentHashMap(); + std::set _seeds; /* map where key is endpoint and value is timestamp when this endpoint was removed from * gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time * after removal to prevent nodes from falsely reincarnating during the time when removal * gossip gets propagated to all nodes */ - private final Map justRemovedEndpoints = new ConcurrentHashMap(); + std::map _just_removed_endpoints; - private final Map expireTimeEndpointMap = new ConcurrentHashMap(); + std::map _expire_time_endpoint_map; - private boolean inShadowRound = false; + bool _in_shadow_round = false; - private volatile long lastProcessedMessageAt = System.currentTimeMillis(); + int64_t _last_processed_message_at = now_millis(); +#if 0 private class GossipTask implements Runnable { public void run() @@ -131,25 +126,25 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean taskLock.lock(); /* Update the local heartbeat counter. */ - endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat(); + endpoint_state_map.get(FBUtilities.getBroadcastAddress()).get_heart_beat_state().updateHeartBeat(); if (logger.isTraceEnabled()) - logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion()); - final List gDigests = new ArrayList(); - Gossiper.instance.makeRandomGossipDigest(gDigests); + logger.trace("My heartbeat is now {}", endpoint_state_map.get(FBUtilities.getBroadcastAddress()).get_heart_beat_state().get_heart_beat_version()); + final List g_digests = new ArrayList(); + Gossiper.instance.make_random_gossip_digest(g_digests); - if (gDigests.size() > 0) + if (g_digests.size() > 0) { GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(), DatabaseDescriptor.getPartitionerName(), - gDigests); + g_digests); MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_SYN, digestSynMessage, GossipDigestSyn.serializer); /* Gossip to some random live member */ - boolean gossipedToSeed = doGossipToLiveMember(message); + boolean gossipedToSeed = do_gossip_to_live_member(message); /* Gossip to some unreachable member with some probability to check if he is back up */ - doGossipToUnreachableMember(message); + do_gossip_to_unreachable_member(message); /* Gossip to a seed if we did not do so above, or we have seen less nodes than there are seeds. This prevents partitions where each group of nodes @@ -167,10 +162,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean gossipedToSeed check. See CASSANDRA-150 for more exposition. */ - if (!gossipedToSeed || liveEndpoints.size() < seeds.size()) - doGossipToSeed(message); + if (!gossipedToSeed || _live_endpoints.size() < _seeds.size()) + do_gossip_to_seed(message); - doStatusCheck(); + do_status_check(); } } catch (Exception e) @@ -184,45 +179,34 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } } +#endif - private Gossiper() - { - // half of QUARATINE_DELAY, to ensure justRemovedEndpoints has enough leeway to prevent re-gossip - FatClientTimeout = (long) (QUARANTINE_DELAY / 2); +public: + gossiper() { + // half of QUARATINE_DELAY, to ensure _just_removed_endpoints has enough leeway to prevent re-gossip + fat_client_timeout = (int64_t) (QUARANTINE_DELAY / 2); /* register with the Failure Detector for receiving Failure detector events */ - FailureDetector.instance.registerFailureDetectionEventListener(this); + // FIXME: FailureDetector includes Gossiper!!! + // + //FailureDetector.instance.registerFailureDetectionEventListener(this); + fail(unimplemented::cause::GOSSIP); // Register this instance with JMX - try - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } } - public void setLastProcessedMessageAt(long timeInMillis) - { - this.lastProcessedMessageAt = timeInMillis; + void set_last_processed_message_at(int64_t time_in_millis) { + _last_processed_message_at = time_in_millis; } - public boolean seenAnySeed() - { - for (Map.Entry entry : endpointStateMap.entrySet()) - { - if (seeds.contains(entry.getKey())) + bool seen_any_seed() { + for (auto& entry : endpoint_state_map) { + if (_seeds.count(entry.first)) { return true; - try - { - if (entry.getValue().getApplicationStateMap().containsKey(ApplicationState.INTERNAL_IP) && seeds.contains(InetAddress.getByName(entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP).value))) - return true; } - catch (UnknownHostException e) - { - throw new RuntimeException(e); + auto& state = entry.second; + if (state.get_application_state_map().count(application_state::INTERNAL_IP) && + _seeds.count(inet_address(state.get_application_state(application_state::INTERNAL_IP)->value))) { + return true; } } return false; @@ -233,9 +217,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * * @param subscriber module which implements the IEndpointStateChangeSubscriber */ - public void register(IEndpointStateChangeSubscriber subscriber) - { - subscribers.add(subscriber); + void register_(shared_ptr subscriber) { + _subscribers.push_back(std::move(subscriber)); } /** @@ -243,61 +226,65 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * * @param subscriber module which implements the IEndpointStateChangeSubscriber */ - public void unregister(IEndpointStateChangeSubscriber subscriber) - { - subscribers.remove(subscriber); + void unregister_(shared_ptr subscriber) { + _subscribers.remove(subscriber); } - public Set getLiveMembers() - { - Set liveMembers = new HashSet(liveEndpoints); - if (!liveMembers.contains(FBUtilities.getBroadcastAddress())) - liveMembers.add(FBUtilities.getBroadcastAddress()); - return liveMembers; - } - - public Set getLiveTokenOwners() - { - Set tokenOwners = new HashSet(); - for (InetAddress member : getLiveMembers()) - { - EndpointState epState = endpointStateMap.get(member); - if (epState != null && !isDeadState(epState) && StorageService.instance.getTokenMetadata().isMember(member)) - tokenOwners.add(member); + std::set get_live_members() { + std::set live_members(_live_endpoints); + if (!live_members.count(get_broadcast_address())) { + live_members.insert(get_broadcast_address()); } - return tokenOwners; + return live_members; + } + + std::set get_live_token_owners() { + std::set token_owners; + for (auto& member : get_live_members()) { + auto it = endpoint_state_map.find(member); + // FIXME: StorageService.instance.getTokenMetadata + if (it != endpoint_state_map.end() && !is_dead_state(it->second) /* && StorageService.instance.getTokenMetadata().isMember(member) */) { + token_owners.insert(member); + } + fail(unimplemented::cause::GOSSIP); + } + return token_owners; } /** * @return a list of unreachable gossip participants, including fat clients */ - public Set getUnreachableMembers() - { - return unreachableEndpoints.keySet(); + std::set get_unreachable_members() { + std::set ret; + for (auto&& x : _unreachable_endpoints) { + ret.insert(x.first); + } + return ret; } /** * @return a list of unreachable token owners */ - public Set getUnreachableTokenOwners() - { - Set tokenOwners = new HashSet<>(); - for (InetAddress endpoint : unreachableEndpoints.keySet()) - { - if (StorageService.instance.getTokenMetadata().isMember(endpoint)) - tokenOwners.add(endpoint); + std::set get_unreachable_token_owners() { + std::set token_owners; + for (auto&& x : _unreachable_endpoints) { + auto& endpoint = x.first; + fail(unimplemented::cause::GOSSIP); + if (true /* StorageService.instance.getTokenMetadata().isMember(endpoint) */) { + token_owners.insert(endpoint); + } } - - return tokenOwners; + return token_owners; } - public long getEndpointDowntime(InetAddress ep) - { - Long downtime = unreachableEndpoints.get(ep); - if (downtime != null) - return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - downtime); - else + int64_t get_endpoint_downtime(inet_address ep) { + auto it = _unreachable_endpoints.find(ep); + if (it != _unreachable_endpoints.end()) { + auto& downtime = it->second; + return (now_nanos() - downtime) / 1000; + } else { return 0L; + } } /** @@ -306,105 +293,107 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * * @param endpoint end point that is convicted. */ - public void convict(InetAddress endpoint, double phi) - { - EndpointState epState = endpointStateMap.get(endpoint); - if (epState == null) + virtual void convict(inet_address endpoint, double phi) override { + auto it = endpoint_state_map.find(endpoint); + if (it == endpoint_state_map.end()) { return; - if (epState.isAlive() && !isDeadState(epState)) - { - markDead(endpoint, epState); } - else - epState.markDead(); + auto& state = it->second; + if (state.is_alive() && is_dead_state(state)) { + mark_dead(endpoint, state); + } else { + state.mark_dead(); + } } + /** * Return either: the greatest heartbeat or application state * - * @param epState + * @param ep_state * @return */ - int getMaxEndpointStateVersion(EndpointState epState) - { - int maxVersion = epState.getHeartBeatState().getHeartBeatVersion(); - for (VersionedValue value : epState.getApplicationStateMap().values()) - maxVersion = Math.max(maxVersion, value.version); - return maxVersion; + int get_max_endpoint_state_version(endpoint_state state) { + int max_version = state.get_heart_beat_state().get_heart_beat_version(); + for (auto& entry : state.get_application_state_map()) { + auto& value = entry.second; + max_version = std::max(max_version, value.version); + } + return max_version; } + +private: /** * Removes the endpoint from gossip completely * * @param endpoint endpoint to be removed from the current membership. */ - private void evictFromMembership(InetAddress endpoint) - { - unreachableEndpoints.remove(endpoint); - endpointStateMap.remove(endpoint); - expireTimeEndpointMap.remove(endpoint); - quarantineEndpoint(endpoint); - if (logger.isDebugEnabled()) - logger.debug("evicting {} from gossip", endpoint); + void evict_from_membershipg(inet_address endpoint) { + _unreachable_endpoints.erase(endpoint); + endpoint_state_map.erase(endpoint); + _expire_time_endpoint_map.erase(endpoint); + quarantine_endpoint(endpoint); + // if (logger.isDebugEnabled()) + // logger.debug("evicting {} from gossip", endpoint); } - +public: /** * Removes the endpoint from Gossip but retains endpoint state */ - public void removeEndpoint(InetAddress endpoint) - { + void remove_endpoint(inet_address endpoint) { // do subscribers first so anything in the subscriber that depends on gossiper state won't get confused - for (IEndpointStateChangeSubscriber subscriber : subscribers) - subscriber.onRemove(endpoint); - - if(seeds.contains(endpoint)) - { - buildSeedsList(); - seeds.remove(endpoint); - logger.info("removed {} from seeds, updated seeds list = {}", endpoint, seeds); + for (shared_ptr& subscriber : _subscribers) { + subscriber->on_remove(endpoint); } - liveEndpoints.remove(endpoint); - unreachableEndpoints.remove(endpoint); - // do not remove endpointState until the quarantine expires - FailureDetector.instance.remove(endpoint); - MessagingService.instance().resetVersion(endpoint); - quarantineEndpoint(endpoint); - MessagingService.instance().destroyConnectionPool(endpoint); - if (logger.isDebugEnabled()) - logger.debug("removing endpoint {}", endpoint); - } + if(_seeds.count(endpoint)) { + build_seeds_list(); + _seeds.erase(endpoint); + //logger.info("removed {} from _seeds, updated _seeds list = {}", endpoint, _seeds); + } + _live_endpoints.erase(endpoint); + _unreachable_endpoints.erase(endpoint); + // do not remove endpointState until the quarantine expires + // FIXME + //FailureDetector.instance.remove(endpoint); + //MessagingService.instance().resetVersion(endpoint); + fail(unimplemented::cause::GOSSIP); + quarantine_endpoint(endpoint); + //MessagingService.instance().destroyConnectionPool(endpoint); + // if (logger.isDebugEnabled()) + // logger.debug("removing endpoint {}", endpoint); + } +private: /** * Quarantines the endpoint for QUARANTINE_DELAY * * @param endpoint */ - private void quarantineEndpoint(InetAddress endpoint) - { - quarantineEndpoint(endpoint, System.currentTimeMillis()); + void quarantine_endpoint(inet_address endpoint) { + quarantine_endpoint(endpoint, now_millis()); } /** - * Quarantines the endpoint until quarantineExpiration + QUARANTINE_DELAY + * Quarantines the endpoint until quarantine_expiration + QUARANTINE_DELAY * * @param endpoint - * @param quarantineExpiration + * @param quarantine_expiration */ - private void quarantineEndpoint(InetAddress endpoint, long quarantineExpiration) - { - justRemovedEndpoints.put(endpoint, quarantineExpiration); + void quarantine_endpoint(inet_address endpoint, int64_t quarantine_expiration) { + _just_removed_endpoints[endpoint] = quarantine_expiration; } +public: /** * Quarantine endpoint specifically for replacement purposes. * @param endpoint */ - public void replacementQuarantine(InetAddress endpoint) - { - // remember, quarantineEndpoint will effectively already add QUARANTINE_DELAY, so this is 2x - logger.debug(""); - quarantineEndpoint(endpoint, System.currentTimeMillis() + QUARANTINE_DELAY); + void replacement_quarantine(inet_address endpoint) { + // remember, quarantine_endpoint will effectively already add QUARANTINE_DELAY, so this is 2x + // logger.debug(""); + quarantine_endpoint(endpoint, now_millis() + QUARANTINE_DELAY); } /** @@ -413,105 +402,111 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * * @param endpoint The endpoint that has been replaced */ - public void replacedEndpoint(InetAddress endpoint) - { - removeEndpoint(endpoint); - evictFromMembership(endpoint); - replacementQuarantine(endpoint); + void replaced_endpoint(inet_address endpoint) { + remove_endpoint(endpoint); + evict_from_membershipg(endpoint); + replacement_quarantine(endpoint); } +private: /** * The gossip digest is built based on randomization * rather than just looping through the collection of live endpoints. * - * @param gDigests list of Gossip Digests. + * @param g_digests list of Gossip Digests. */ - private void makeRandomGossipDigest(List gDigests) - { - EndpointState epState; + void make_random_gossip_digest(std::list& g_digests) { int generation = 0; - int maxVersion = 0; + int max_version = 0; - // local epstate will be part of endpointStateMap - List endpoints = new ArrayList(endpointStateMap.keySet()); - Collections.shuffle(endpoints, random); - for (InetAddress endpoint : endpoints) - { - epState = endpointStateMap.get(endpoint); - if (epState != null) - { - generation = epState.getHeartBeatState().getGeneration(); - maxVersion = getMaxEndpointStateVersion(epState); - } - gDigests.add(new GossipDigest(endpoint, generation, maxVersion)); + // local epstate will be part of endpoint_state_map + std::vector endpoints; + for (auto&& x : endpoint_state_map) { + endpoints.push_back(x.first); } - - if (logger.isTraceEnabled()) - { + std::random_shuffle(endpoints.begin(), endpoints.end()); + for (auto& endpoint : endpoints) { + auto it = endpoint_state_map.find(endpoint); + if (it != endpoint_state_map.end()) { + auto& eps = it->second; + generation = eps.get_heart_beat_state().get_generation(); + max_version = get_max_endpoint_state_version(eps); + } + g_digests.push_back(gossip_digest(endpoint, generation, max_version)); + } +#if 0 + if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); - for (GossipDigest gDigest : gDigests) + for (GossipDigest g_digest : g_digests) { - sb.append(gDigest); + sb.append(g_digest); sb.append(" "); } logger.trace("Gossip Digests are : {}", sb); } +#endif } +public: /** * This method will begin removing an existing endpoint from the cluster by spoofing its state * This should never be called unless this coordinator has had 'removenode' invoked * * @param endpoint - the endpoint being removed - * @param hostId - the ID of the host being removed - * @param localHostId - my own host ID for replication coordination + * @param host_id - the ID of the host being removed + * @param local_host_id - my own host ID for replication coordination */ - public void advertiseRemoving(InetAddress endpoint, UUID hostId, UUID localHostId) - { - EndpointState epState = endpointStateMap.get(endpoint); + void advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) { + auto& state = endpoint_state_map.at(endpoint); // remember this node's generation - int generation = epState.getHeartBeatState().getGeneration(); - logger.info("Removing host: {}", hostId); - logger.info("Sleeping for {}ms to ensure {} does not change", StorageService.RING_DELAY, endpoint); - Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY, TimeUnit.MILLISECONDS); + int generation = state.get_heart_beat_state().get_generation(); + // logger.info("Removing host: {}", host_id); + // logger.info("Sleeping for {}ms to ensure {} does not change", StorageService.RING_DELAY, endpoint); + // FIXME: sleep + fail(unimplemented::cause::GOSSIP); + // Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY, TimeUnit.MILLISECONDS); // make sure it did not change - epState = endpointStateMap.get(endpoint); - if (epState.getHeartBeatState().getGeneration() != generation) - throw new RuntimeException("Endpoint " + endpoint + " generation changed while trying to remove it"); + auto& eps = endpoint_state_map.at(endpoint); + if (eps.get_heart_beat_state().get_generation() != generation) { + throw std::runtime_error(sprint("Endpoint %s generation changed while trying to remove it", endpoint)); + } + // update the other node's generation to mimic it as if it had changed it itself - logger.info("Advertising removal for {}", endpoint); - epState.updateTimestamp(); // make sure we don't evict it too soon - epState.getHeartBeatState().forceNewerGenerationUnsafe(); - epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId)); - epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId)); - endpointStateMap.put(endpoint, epState); + //logger.info("Advertising removal for {}", endpoint); + eps.update_timestamp(); // make sure we don't evict it too soon + eps.get_heart_beat_state().force_newer_generation_unsafe(); + // FIXME: StorageService.instance.valueFactory + // eps.add_application_state(application_state::STATUS, StorageService.instance.valueFactory.removingNonlocal(host_id)); + // eps.add_application_state(application_state::REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(local_host_id)); + endpoint_state_map[endpoint] = eps; } /** * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN - * This should only be called after advertiseRemoving + * This should only be called after advertise_removing * * @param endpoint - * @param hostId + * @param host_id */ - public void advertiseTokenRemoved(InetAddress endpoint, UUID hostId) - { - EndpointState epState = endpointStateMap.get(endpoint); - epState.updateTimestamp(); // make sure we don't evict it too soon - epState.getHeartBeatState().forceNewerGenerationUnsafe(); - long expireTime = computeExpireTime(); - epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime)); - logger.info("Completing removal of {}", endpoint); - addExpireTimeForEndpoint(endpoint, expireTime); - endpointStateMap.put(endpoint, epState); + void advertise_token_removed(inet_address endpoint, utils::UUID host_id) { + auto& eps = endpoint_state_map.at(endpoint); + eps.update_timestamp(); // make sure we don't evict it too soon + eps.get_heart_beat_state().force_newer_generation_unsafe(); + int64_t expire_time = compute_expire_time(); + // FIXME: StorageService.instance.valueFactory.removedNonlocal + // eps.add_application_state(application_state::STATUS, StorageService.instance.valueFactory.removedNonlocal(host_id, expire_time)); + //logger.info("Completing removal of {}", endpoint); + add_expire_time_for_endpoint(endpoint, expire_time); + endpoint_state_map[endpoint] = eps; // ensure at least one gossip round occurs before returning - Uninterruptibles.sleepUninterruptibly(intervalInMillis * 2, TimeUnit.MILLISECONDS); + // FIXME: sleep + //Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 2, TimeUnit.MILLISECONDS); + fail(unimplemented::cause::GOSSIP); } - public void unsafeAssassinateEndpoint(String address) throws UnknownHostException - { - logger.warn("Gossiper.unsafeAssassinateEndpoint is deprecated and will be removed in the next release; use assassinateEndpoint instead"); - assassinateEndpoint(address); + void unsafe_assassinate_endpoint(sstring address) { + //logger.warn("Gossiper.unsafeAssassinateEndpoint is deprecated and will be removed in the next release; use assassinate_endpoint instead"); + assassinate_endpoint(address); } /** @@ -522,63 +517,64 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * @param address * @throws UnknownHostException */ - public void assassinateEndpoint(String address) throws UnknownHostException - { - InetAddress endpoint = InetAddress.getByName(address); - EndpointState epState = endpointStateMap.get(endpoint); - Collection tokens = null; - logger.warn("Assassinating {} via gossip", endpoint); - - if (epState == null) - { - epState = new EndpointState(new HeartBeatState((int) ((System.currentTimeMillis() + 60000) / 1000), 9999)); - } - else - { - try - { + void assassinate_endpoint(sstring address) { + inet_address endpoint(address); + auto is_exist = endpoint_state_map.count(endpoint); + endpoint_state&& ep_state = is_exist ? endpoint_state_map.at(endpoint) : + endpoint_state(heart_beat_state((int) ((now_millis() + 60000) / 1000), 9999)); + //Collection tokens = null; + // logger.warn("Assassinating {} via gossip", endpoint); + if (is_exist) { + // FIXME: + fail(unimplemented::cause::GOSSIP); +#if 0 + try { tokens = StorageService.instance.getTokenMetadata().getTokens(endpoint); - } - catch (Throwable th) - { + } catch (Throwable th) { JVMStabilityInspector.inspectThrowable(th); // TODO this is broken logger.warn("Unable to calculate tokens for {}. Will use a random one", address); tokens = Collections.singletonList(StorageService.getPartitioner().getRandomToken()); } - int generation = epState.getHeartBeatState().getGeneration(); - int heartbeat = epState.getHeartBeatState().getHeartBeatVersion(); - logger.info("Sleeping for {}ms to ensure {} does not change", StorageService.RING_DELAY, endpoint); - Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY, TimeUnit.MILLISECONDS); +#endif + int generation = ep_state.get_heart_beat_state().get_generation(); + int heartbeat = ep_state.get_heart_beat_state().get_heart_beat_version(); + //logger.info("Sleeping for {}ms to ensure {} does not change", StorageService.RING_DELAY, endpoint); + //Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY, TimeUnit.MILLISECONDS); // make sure it did not change - EndpointState newState = endpointStateMap.get(endpoint); - if (newState == null) - logger.warn("Endpoint {} disappeared while trying to assassinate, continuing anyway", endpoint); - else if (newState.getHeartBeatState().getGeneration() != generation) - throw new RuntimeException("Endpoint still alive: " + endpoint + " generation changed while trying to assassinate it"); - else if (newState.getHeartBeatState().getHeartBeatVersion() != heartbeat) - throw new RuntimeException("Endpoint still alive: " + endpoint + " heartbeat changed while trying to assassinate it"); - epState.updateTimestamp(); // make sure we don't evict it too soon - epState.getHeartBeatState().forceNewerGenerationUnsafe(); + + auto it = endpoint_state_map.find(endpoint); + if (it == endpoint_state_map.end()) { + // logger.warn("Endpoint {} disappeared while trying to assassinate, continuing anyway", endpoint); + } else { + auto& new_state = it->second; + if (new_state.get_heart_beat_state().get_generation() != generation) { + throw std::runtime_error(sprint("Endpoint still alive: %s generation changed while trying to assassinate it", endpoint)); + } else if (new_state.get_heart_beat_state().get_heart_beat_version() != heartbeat) { + throw std::runtime_error(sprint("Endpoint still alive: %s heartbeat changed while trying to assassinate it", endpoint)); + } + } + ep_state.update_timestamp(); // make sure we don't evict it too soon + ep_state.get_heart_beat_state().force_newer_generation_unsafe(); } // do not pass go, do not collect 200 dollars, just gtfo - epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.left(tokens, computeExpireTime())); - handleMajorStateChange(endpoint, epState); - Uninterruptibles.sleepUninterruptibly(intervalInMillis * 4, TimeUnit.MILLISECONDS); - logger.warn("Finished assassinating {}", endpoint); + // FIXME: StorageService.instance and Sleep + // ep_state.add_application_state(application_state::STATUS, StorageService.instance.valueFactory.left(tokens, compute_expire_time())); + handle_major_state_change(endpoint, ep_state); + // Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 4, TimeUnit.MILLISECONDS); + //logger.warn("Finished assassinating {}", endpoint); } - public boolean isKnownEndpoint(InetAddress endpoint) - { - return endpointStateMap.containsKey(endpoint); +public: + bool is_known_endpoint(inet_address endpoint) { + return endpoint_state_map.count(endpoint); } - public int getCurrentGenerationNumber(InetAddress endpoint) - { - return endpointStateMap.get(endpoint).getHeartBeatState().getGeneration(); + int get_current_generation_number(inet_address endpoint) { + return endpoint_state_map.at(endpoint).get_heart_beat_state().get_generation(); } - +private: /** * Returns true if the chosen target was also a seed. False otherwise * @@ -586,206 +582,215 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * @param epSet a set of endpoint from which a random endpoint is chosen. * @return true if the chosen endpoint is also a seed. */ - private boolean sendGossip(MessageOut message, Set epSet) - { - List liveEndpoints = ImmutableList.copyOf(epSet); - - int size = liveEndpoints.size(); - if (size < 1) + bool send_gossip(MessageOut message, std::set epset) { + std::vector _live_endpoints(epset.begin(), epset.end()); + size_t size = _live_endpoints.size(); + if (size < 1) { return false; + } /* Generate a random number from 0 -> size */ - int index = (size == 1) ? 0 : random.nextInt(size); - InetAddress to = liveEndpoints.get(index); - if (logger.isTraceEnabled()) - logger.trace("Sending a GossipDigestSyn to {} ...", to); - MessagingService.instance().sendOneWay(message, to); - return seeds.contains(to); + std::uniform_int_distribution dist(0, size - 1); + int index = dist(_random); + inet_address to = _live_endpoints[index]; + // if (logger.isTraceEnabled()) + // logger.trace("Sending a GossipDigestSyn to {} ...", to); + // FIXME: Add MessagingService.instance().sendOneWay + // MessagingService.instance().sendOneWay(message, to); + return _seeds.count(to); } /* Sends a Gossip message to a live member and returns true if the recipient was a seed */ - private boolean doGossipToLiveMember(MessageOut message) - { - int size = liveEndpoints.size(); - if (size == 0) + bool do_gossip_to_live_member(MessageOut message) { + size_t size = _live_endpoints.size(); + if (size == 0) { return false; - return sendGossip(message, liveEndpoints); + } + return send_gossip(message, _live_endpoints); } /* Sends a Gossip message to an unreachable member */ - private void doGossipToUnreachableMember(MessageOut message) - { - double liveEndpointCount = liveEndpoints.size(); - double unreachableEndpointCount = unreachableEndpoints.size(); - if (unreachableEndpointCount > 0) - { + void do_gossip_to_unreachable_member(MessageOut message) { + double live_endpoint_count = _live_endpoints.size(); + double unreachable_endpoint_count = _unreachable_endpoints.size(); + if (unreachable_endpoint_count > 0) { /* based on some probability */ - double prob = unreachableEndpointCount / (liveEndpointCount + 1); - double randDbl = random.nextDouble(); - if (randDbl < prob) - sendGossip(message, unreachableEndpoints.keySet()); + double prob = unreachable_endpoint_count / (live_endpoint_count + 1); + std::uniform_real_distribution dist(0, 1); + double rand_dbl = dist(_random); + if (rand_dbl < prob) { + std::set addrs; + for (auto&& x : _unreachable_endpoints) { + addrs.insert(x.first); + } + send_gossip(message, addrs); + } } } /* Gossip to a seed for facilitating partition healing */ - private void doGossipToSeed(MessageOut prod) - { - int size = seeds.size(); - if (size > 0) - { - if (size == 1 && seeds.contains(FBUtilities.getBroadcastAddress())) - { + void do_gossip_to_seed(MessageOut prod) { + size_t size = _seeds.size(); + if (size > 0) { + // FIXME: FBUtilities.getBroadcastAddress + if (size == 1 /* && _seeds.contains(FBUtilities.getBroadcastAddress())*/) { return; } - if (liveEndpoints.size() == 0) - { - sendGossip(prod, seeds); - } - else - { + if (_live_endpoints.size() == 0) { + send_gossip(prod, _seeds); + } else { /* Gossip with the seed with some probability. */ - double probability = seeds.size() / (double) (liveEndpoints.size() + unreachableEndpoints.size()); - double randDbl = random.nextDouble(); - if (randDbl <= probability) - sendGossip(prod, seeds); + double probability = _seeds.size() / (double) (_live_endpoints.size() + _unreachable_endpoints.size()); + std::uniform_real_distribution dist(0, 1); + double rand_dbl = dist(_random); + if (rand_dbl <= probability) { + send_gossip(prod, _seeds); + } } } } - public boolean isGossipOnlyMember(InetAddress endpoint) - { - EndpointState epState = endpointStateMap.get(endpoint); - if (epState == null) - { + bool is_gossip_only_member(inet_address endpoint) { + auto it = endpoint_state_map.find(endpoint); + if (it == endpoint_state_map.end()) { return false; } - return !isDeadState(epState) && !StorageService.instance.getTokenMetadata().isMember(endpoint); + auto& eps = it->second; + // FIXME: StorageService.instance.getTokenMetadata + return !is_dead_state(eps) /* && !StorageService.instance.getTokenMetadata().isMember(endpoint); */; } - private void doStatusCheck() - { - if (logger.isTraceEnabled()) - logger.trace("Performing status check ..."); + void do_status_check() { + // if (logger.isTraceEnabled()) + // logger.trace("Performing status check ..."); - long now = System.currentTimeMillis(); - long nowNano = System.nanoTime(); + int64_t now = now_millis(); - long pending = ((JMXEnabledThreadPoolExecutor) StageManager.getStage(Stage.GOSSIP)).getPendingTasks(); - if (pending > 0 && lastProcessedMessageAt < now - 1000) - { + // FIXME: + // int64_t pending = ((JMXEnabledThreadPoolExecutor) StageManager.getStage(Stage.GOSSIP)).getPendingTasks(); + int64_t pending = 1; + if (pending > 0 && _last_processed_message_at < now - 1000) { + // FIXME: SLEEP // if some new messages just arrived, give the executor some time to work on them - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + //Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); // still behind? something's broke - if (lastProcessedMessageAt < now - 1000) - { - logger.warn("Gossip stage has {} pending tasks; skipping status check (no nodes will be marked down)", pending); + if (_last_processed_message_at < now - 1000) { + // logger.warn("Gossip stage has {} pending tasks; skipping status check (no nodes will be marked down)", pending); return; } } - Set eps = endpointStateMap.keySet(); - for (InetAddress endpoint : eps) - { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + for (auto& entry : endpoint_state_map) { + const inet_address& endpoint = entry.first; + if (endpoint == get_broadcast_address()) { continue; + } - FailureDetector.instance.interpret(endpoint); - EndpointState epState = endpointStateMap.get(endpoint); - if (epState != null) - { + // FIXME: FailureDetector.instance + // FailureDetector.instance.interpret(endpoint); + fail(unimplemented::cause::GOSSIP); + + auto it = endpoint_state_map.find(endpoint); + if (it != endpoint_state_map.end()) { + endpoint_state& ep_state = it->second; // check if this is a fat client. fat clients are removed automatically from // gossip after FatClientTimeout. Do not remove dead states here. - if (isGossipOnlyMember(endpoint) - && !justRemovedEndpoints.containsKey(endpoint) - && TimeUnit.NANOSECONDS.toMillis(nowNano - epState.getUpdateTimestamp()) > FatClientTimeout) - { - logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint, FatClientTimeout); - removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay - evictFromMembership(endpoint); // can get rid of the state immediately + if (is_gossip_only_member(endpoint) + && !_just_removed_endpoints.count(endpoint) + && ((now - ep_state.get_update_timestamp().time_since_epoch().count()) > fat_client_timeout)) { + // logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint, FatClientTimeout); + remove_endpoint(endpoint); // will put it in _just_removed_endpoints to respect quarantine delay + evict_from_membershipg(endpoint); // can get rid of the state immediately } // check for dead state removal - long expireTime = getExpireTimeForEndpoint(endpoint); - if (!epState.isAlive() && (now > expireTime) - && (!StorageService.instance.getTokenMetadata().isMember(endpoint))) - { - if (logger.isDebugEnabled()) - { - logger.debug("time is expiring for endpoint : {} ({})", endpoint, expireTime); - } - evictFromMembership(endpoint); + int64_t expire_time = get_expire_time_for_endpoint(endpoint); + if (!ep_state.is_alive() && (now > expire_time)) { + /* && (!StorageService.instance.getTokenMetadata().isMember(endpoint))) */ + // if (logger.isDebugEnabled()) { + // logger.debug("time is expiring for endpoint : {} ({})", endpoint, expire_time); + // } + evict_from_membershipg(endpoint); } } } - if (!justRemovedEndpoints.isEmpty()) - { - for (Entry entry : justRemovedEndpoints.entrySet()) - { - if ((now - entry.getValue()) > QUARANTINE_DELAY) - { - if (logger.isDebugEnabled()) - logger.debug("{} elapsed, {} gossip quarantine over", QUARANTINE_DELAY, entry.getKey()); - justRemovedEndpoints.remove(entry.getKey()); - } + for (auto it = _just_removed_endpoints.begin(); it != _just_removed_endpoints.end();) { + auto& t= it->second; + if ((now - t) > QUARANTINE_DELAY) { + // if (logger.isDebugEnabled()) + // logger.debug("{} elapsed, {} gossip quarantine over", QUARANTINE_DELAY, entry.getKey()); + it = _just_removed_endpoints.erase(it); + } else { + it++; } } } - protected long getExpireTimeForEndpoint(InetAddress endpoint) - { - /* default expireTime is aVeryLongTime */ - Long storedTime = expireTimeEndpointMap.get(endpoint); - return storedTime == null ? computeExpireTime() : storedTime; +public: + int64_t get_expire_time_for_endpoint(inet_address endpoint) { + /* default expire_time is A_VERY_LONG_TIME */ + auto it = _expire_time_endpoint_map.find(endpoint); + if (it == _expire_time_endpoint_map.end()) { + return compute_expire_time(); + } else { + int64_t stored_time = it->second; + return stored_time; + } } - public EndpointState getEndpointStateForEndpoint(InetAddress ep) - { - return endpointStateMap.get(ep); + std::experimental::optional get_endpoint_state_for_endpoint(inet_address ep) { + auto it = endpoint_state_map.find(ep); + if (it == endpoint_state_map.end()) { + return {}; + } else { + return it->second; + } } // removes ALL endpoint states; should only be called after shadow gossip - public void resetEndpointStateMap() - { - endpointStateMap.clear(); - unreachableEndpoints.clear(); - liveEndpoints.clear(); + void reset_endpoint_state_map() { + endpoint_state_map.clear(); + _unreachable_endpoints.clear(); + _live_endpoints.clear(); } - public Set> getEndpointStates() - { - return endpointStateMap.entrySet(); + + std::map& get_endpoint_states() { + return endpoint_state_map; } - public boolean usesHostId(InetAddress endpoint) - { - if (MessagingService.instance().knowsVersion(endpoint)) - return true; - else if (getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION) != null) + bool uses_host_id(inet_address endpoint) { + // FIXME + fail(unimplemented::cause::GOSSIP); + if (true /* MessagingService.instance().knowsVersion(endpoint) */) { + return true; + } else if (get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::NET_VERSION)) { return true; + } return false; } - public boolean usesVnodes(InetAddress endpoint) - { - return usesHostId(endpoint) && getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.TOKENS) != null; + bool uses_vnodes(inet_address endpoint) { + return uses_host_id(endpoint) && get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::TOKENS); } - public UUID getHostId(InetAddress endpoint) - { - if (!usesHostId(endpoint)) - throw new RuntimeException("Host " + endpoint + " does not use new-style tokens!"); - return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value); + utils::UUID get_host_id(inet_address endpoint) { + if (!uses_host_id(endpoint)) { + throw std::runtime_error(sprint("Host %s does not use new-style tokens!", endpoint)); + } + sstring uuid = get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::HOST_ID)->value; + // FIXME: Add UUID(const sstring& id) constructor + fail(unimplemented::cause::GOSSIP); + return utils::UUID(0, 0); } - EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version) - { - EndpointState epState = endpointStateMap.get(forEndpoint); - EndpointState reqdEndpointState = null; - - if (epState != null) - { + std::experimental::optional get_state_for_version_bigger_than(inet_address for_endpoint, int version) { + std::experimental::optional reqd_endpoint_state; + auto it = endpoint_state_map.find(for_endpoint); + if (it != endpoint_state_map.end()) { + auto& eps = it->second; /* * Here we try to include the Heart Beat state only if it is * greater than the version passed in. It might happen that @@ -794,103 +799,97 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * than the version passed in. In this case we also send the old * heart beat and throw it away on the receiver if it is redundant. */ - int localHbVersion = epState.getHeartBeatState().getHeartBeatVersion(); - if (localHbVersion > version) - { - reqdEndpointState = new EndpointState(epState.getHeartBeatState()); - if (logger.isTraceEnabled()) - logger.trace("local heartbeat version {} greater than {} for {}", localHbVersion, version, forEndpoint); + int local_hb_version = eps.get_heart_beat_state().get_heart_beat_version(); + if (local_hb_version > version) { + reqd_endpoint_state.emplace(eps.get_heart_beat_state()); + // if (logger.isTraceEnabled()) + // logger.trace("local heartbeat version {} greater than {} for {}", local_hb_version, version, for_endpoint); } /* Accumulate all application states whose versions are greater than "version" variable */ - for (Entry entry : epState.getApplicationStateMap().entrySet()) - { - VersionedValue value = entry.getValue(); - if (value.version > version) - { - if (reqdEndpointState == null) - { - reqdEndpointState = new EndpointState(epState.getHeartBeatState()); + for (auto& entry : eps.get_application_state_map()) { + auto& value = entry.second; + if (value.version > version) { + if (!reqd_endpoint_state) { + reqd_endpoint_state.emplace(eps.get_heart_beat_state()); } - final ApplicationState key = entry.getKey(); - if (logger.isTraceEnabled()) - logger.trace("Adding state {}: {}" , key, value.value); - reqdEndpointState.addApplicationState(key, value); + auto& key = entry.first; + // if (logger.isTraceEnabled()) + // logger.trace("Adding state {}: {}" , key, value.value); + reqd_endpoint_state->add_application_state(key, value); } } } - return reqdEndpointState; + return reqd_endpoint_state; } /** * determine which endpoint started up earlier */ - public int compareEndpointStartup(InetAddress addr1, InetAddress addr2) - { - EndpointState ep1 = getEndpointStateForEndpoint(addr1); - EndpointState ep2 = getEndpointStateForEndpoint(addr2); - assert ep1 != null && ep2 != null; - return ep1.getHeartBeatState().getGeneration() - ep2.getHeartBeatState().getGeneration(); + int compare_endpoint_startup(inet_address addr1, inet_address addr2) { + auto ep1 = get_endpoint_state_for_endpoint(addr1); + auto ep2 = get_endpoint_state_for_endpoint(addr2); + assert(ep1 && ep2); + return ep1->get_heart_beat_state().get_generation() - ep2->get_heart_beat_state().get_generation(); } - void notifyFailureDetector(Map remoteEpStateMap) - { - for (Entry entry : remoteEpStateMap.entrySet()) - { - notifyFailureDetector(entry.getKey(), entry.getValue()); + void notify_failure_detector(std::map remoteEpStateMap) { + for (auto& entry : remoteEpStateMap) { + notify_failure_detector(entry.first, entry.second); } } - void notifyFailureDetector(InetAddress endpoint, EndpointState remoteEndpointState) - { - EndpointState localEndpointState = endpointStateMap.get(endpoint); + + void notify_failure_detector(inet_address endpoint, endpoint_state remote_endpoint_state) { /* * If the local endpoint state exists then report to the FD only * if the versions workout. */ - if (localEndpointState != null) - { - IFailureDetector fd = FailureDetector.instance; - int localGeneration = localEndpointState.getHeartBeatState().getGeneration(); - int remoteGeneration = remoteEndpointState.getHeartBeatState().getGeneration(); - if (remoteGeneration > localGeneration) - { - localEndpointState.updateTimestamp(); + auto it = endpoint_state_map.find(endpoint); + if (it != endpoint_state_map.end()) { + auto& local_endpoint_state = it->second; + // FIXME: i_failure_detector!!!! + // i_failure_detector& fd = the_failure_detector(); + int local_generation = local_endpoint_state.get_heart_beat_state().get_generation(); + int remote_generation = remote_endpoint_state.get_heart_beat_state().get_generation(); + if (remote_generation > local_generation) { + local_endpoint_state.update_timestamp(); // this node was dead and the generation changed, this indicates a reboot, or possibly a takeover // we will clean the fd intervals for it and relearn them - if (!localEndpointState.isAlive()) - { - logger.debug("Clearing interval times for {} due to generation change", endpoint); - fd.remove(endpoint); + if (!local_endpoint_state.is_alive()) { + //logger.debug("Clearing interval times for {} due to generation change", endpoint); + // FIXME: i_failure_detector!!!! + // fd.remove(endpoint); } - fd.report(endpoint); + // FIXME: i_failure_detector!!!! + //fd.report(endpoint); return; } - if (remoteGeneration == localGeneration) - { - int localVersion = getMaxEndpointStateVersion(localEndpointState); - int remoteVersion = remoteEndpointState.getHeartBeatState().getHeartBeatVersion(); - if (remoteVersion > localVersion) - { - localEndpointState.updateTimestamp(); + if (remote_generation == local_generation) { + int local_version = get_max_endpoint_state_version(local_endpoint_state); + int remote_version = remote_endpoint_state.get_heart_beat_state().get_heart_beat_version(); + if (remote_version > local_version) { + local_endpoint_state.update_timestamp(); // just a version change, report to the fd - fd.report(endpoint); + // fd.report(endpoint); } } } } - private void markAlive(final InetAddress addr, final EndpointState localState) - { - if (MessagingService.instance().getVersion(addr) < MessagingService.VERSION_20) - { - realMarkAlive(addr, localState); - return; - } +private: + void mark_alive(inet_address addr, endpoint_state local_state) { + fail(unimplemented::cause::GOSSIP); - localState.markDead(); + // if (MessagingService.instance().getVersion(addr) < MessagingService.VERSION_20) { + // real_mark_alive(addr, local_state); + // return; + // } + local_state.mark_dead(); + +#if 0 MessageOut echoMessage = new MessageOut(MessagingService.Verb.ECHO, new EchoMessage(), EchoMessage.serializer); logger.trace("Sending a EchoMessage to {}", addr); IAsyncCallback echoHandler = new IAsyncCallback() @@ -902,340 +901,324 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void response(MessageIn msg) { - realMarkAlive(addr, localState); + real_mark_alive(addr, local_state); } }; MessagingService.instance().sendRR(echoMessage, addr, echoHandler); +#endif } - private void realMarkAlive(final InetAddress addr, final EndpointState localState) - { - if (logger.isTraceEnabled()) - logger.trace("marking as alive {}", addr); - localState.markAlive(); - localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime - liveEndpoints.add(addr); - unreachableEndpoints.remove(addr); - expireTimeEndpointMap.remove(addr); - logger.debug("removing expire time for endpoint : {}", addr); - logger.info("InetAddress {} is now UP", addr); - for (IEndpointStateChangeSubscriber subscriber : subscribers) - subscriber.onAlive(addr, localState); - if (logger.isTraceEnabled()) - logger.trace("Notified {}", subscribers); + void real_mark_alive(inet_address addr, endpoint_state local_state) { + // if (logger.isTraceEnabled()) + // logger.trace("marking as alive {}", addr); + local_state.mark_alive(); + local_state.update_timestamp(); // prevents do_status_check from racing us and evicting if it was down > A_VERY_LONG_TIME + _live_endpoints.insert(addr); + _unreachable_endpoints.erase(addr); + _expire_time_endpoint_map.erase(addr); + // logger.debug("removing expire time for endpoint : {}", addr); + // logger.info("inet_address {} is now UP", addr); + for (shared_ptr& subscriber : _subscribers) + subscriber->on_alive(addr, local_state); + // if (logger.isTraceEnabled()) + // logger.trace("Notified {}", _subscribers); } - private void markDead(InetAddress addr, EndpointState localState) - { - if (logger.isTraceEnabled()) - logger.trace("marking as down {}", addr); - localState.markDead(); - liveEndpoints.remove(addr); - unreachableEndpoints.put(addr, System.nanoTime()); - logger.info("InetAddress {} is now DOWN", addr); - for (IEndpointStateChangeSubscriber subscriber : subscribers) - subscriber.onDead(addr, localState); - if (logger.isTraceEnabled()) - logger.trace("Notified {}", subscribers); + void mark_dead(inet_address addr, endpoint_state local_state) { + // if (logger.isTraceEnabled()) + // logger.trace("marking as down {}", addr); + local_state.mark_dead(); + _live_endpoints.erase(addr); + _unreachable_endpoints[addr] = now_nanos(); + // logger.info("inet_address {} is now DOWN", addr); + for (shared_ptr& subscriber : _subscribers) + subscriber->on_dead(addr, local_state); + // if (logger.isTraceEnabled()) + // logger.trace("Notified {}", _subscribers); } /** * This method is called whenever there is a "big" change in ep state (a generation change for a known node). * * @param ep endpoint - * @param epState EndpointState for the endpoint + * @param ep_state EndpointState for the endpoint */ - private void handleMajorStateChange(InetAddress ep, EndpointState epState) - { - if (!isDeadState(epState)) - { - if (endpointStateMap.get(ep) != null) - logger.info("Node {} has restarted, now UP", ep); - else - logger.info("Node {} is now part of the cluster", ep); + void handle_major_state_change(inet_address ep, endpoint_state eps) { + if (!is_dead_state(eps)) { + if (endpoint_state_map.count(ep)) { + //logger.info("Node {} has restarted, now UP", ep); + } else { + //logger.info("Node {} is now part of the cluster", ep); + } } - if (logger.isTraceEnabled()) - logger.trace("Adding endpoint state for {}", ep); - endpointStateMap.put(ep, epState); + // if (logger.isTraceEnabled()) + // logger.trace("Adding endpoint state for {}", ep); + endpoint_state_map[ep] = eps; // the node restarted: it is up to the subscriber to take whatever action is necessary - for (IEndpointStateChangeSubscriber subscriber : subscribers) - subscriber.onRestart(ep, epState); - - if (!isDeadState(epState)) - markAlive(ep, epState); - else - { - logger.debug("Not marking {} alive due to dead state", ep); - markDead(ep, epState); + for (auto& subscriber : _subscribers) { + subscriber->on_restart(ep, eps); + } + + if (!is_dead_state(eps)) { + mark_alive(ep, eps); + } else { + //logger.debug("Not marking {} alive due to dead state", ep); + mark_dead(ep, eps); + } + for (auto& subscriber : _subscribers) { + subscriber->on_join(ep, eps); } - for (IEndpointStateChangeSubscriber subscriber : subscribers) - subscriber.onJoin(ep, epState); } - public boolean isDeadState(EndpointState epState) - { - if (epState.getApplicationState(ApplicationState.STATUS) == null) +public: + bool is_dead_state(endpoint_state eps) { + if (!eps.get_application_state(application_state::STATUS)) { return false; - String value = epState.getApplicationState(ApplicationState.STATUS).value; - String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); - assert (pieces.length > 0); - String state = pieces[0]; - for (String deadstate : DEAD_STATES) - { - if (state.equals(deadstate)) + } + auto value = eps.get_application_state(application_state::STATUS)->value; + std::vector pieces; + boost::split(pieces, value, boost::is_any_of(",")); + assert(pieces.size() > 0); + sstring state = pieces[0]; + for (auto& deadstate : DEAD_STATES) { + if (state == deadstate) { return true; + } } return false; } - void applyStateLocally(Map epStateMap) - { - for (Entry entry : epStateMap.entrySet()) - { - InetAddress ep = entry.getKey(); - if ( ep.equals(FBUtilities.getBroadcastAddress()) && !isInShadowRound()) - continue; - if (justRemovedEndpoints.containsKey(ep)) - { - if (logger.isTraceEnabled()) - logger.trace("Ignoring gossip for {} because it is quarantined", ep); + void apply_state_locally(std::map& map) { + for (auto& entry : map) { + auto& ep = entry.first; + if (ep == get_broadcast_address() && !is_in_shadow_round()) { + continue; + } + if (_just_removed_endpoints.count(ep)) { + // if (logger.isTraceEnabled()) + // logger.trace("Ignoring gossip for {} because it is quarantined", ep); continue; } - - EndpointState localEpStatePtr = endpointStateMap.get(ep); - EndpointState remoteState = entry.getValue(); /* If state does not exist just add it. If it does then add it if the remote generation is greater. If there is a generation tie, attempt to break it by heartbeat version. */ - if (localEpStatePtr != null) - { - int localGeneration = localEpStatePtr.getHeartBeatState().getGeneration(); - int remoteGeneration = remoteState.getHeartBeatState().getGeneration(); - if (logger.isTraceEnabled()) - logger.trace("{} local generation {}, remote generation {}", ep, localGeneration, remoteGeneration); - - if (localGeneration != 0 && remoteGeneration > localGeneration + MAX_GENERATION_DIFFERENCE) - { + endpoint_state& remote_state = entry.second; + auto it = endpoint_state_map.find(ep); + if (it != endpoint_state_map.end() ) { + endpoint_state& local_ep_state_ptr = it->second; + int local_generation = local_ep_state_ptr.get_heart_beat_state().get_generation(); + int remote_generation = remote_state.get_heart_beat_state().get_generation(); + // if (logger.isTraceEnabled()) { + // logger.trace("{} local generation {}, remote generation {}", ep, local_generation, remote_generation); + // } + if (local_generation != 0 && remote_generation > local_generation + MAX_GENERATION_DIFFERENCE) { // assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself) - logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}", ep, localGeneration, remoteGeneration); - } - else if (remoteGeneration > localGeneration) - { - if (logger.isTraceEnabled()) - logger.trace("Updating heartbeat state generation to {} from {} for {}", remoteGeneration, localGeneration, ep); + // logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}", + // ep, local_generation, remote_generation); + } else if (remote_generation > local_generation) { + // if (logger.isTraceEnabled()) + // logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, ep); // major state change will handle the update by inserting the remote state directly - handleMajorStateChange(ep, remoteState); - } - else if (remoteGeneration == localGeneration) // generation has not changed, apply new states - { + handle_major_state_change(ep, remote_state); + } else if (remote_generation == local_generation) { // generation has not changed, apply new states /* find maximum state */ - int localMaxVersion = getMaxEndpointStateVersion(localEpStatePtr); - int remoteMaxVersion = getMaxEndpointStateVersion(remoteState); - if (remoteMaxVersion > localMaxVersion) - { + int local_max_version = get_max_endpoint_state_version(local_ep_state_ptr); + int remote_max_version = get_max_endpoint_state_version(remote_state); + if (remote_max_version > local_max_version) { // apply states, but do not notify since there is no major change - applyNewStates(ep, localEpStatePtr, remoteState); + apply_new_states(ep, local_ep_state_ptr, remote_state); + } else { + // if (logger.isTraceEnabled()) { + // logger.trace("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, ep); } - else if (logger.isTraceEnabled()) - logger.trace("Ignoring remote version {} <= {} for {}", remoteMaxVersion, localMaxVersion, ep); - if (!localEpStatePtr.isAlive() && !isDeadState(localEpStatePtr)) // unless of course, it was dead - markAlive(ep, localEpStatePtr); + if (!local_ep_state_ptr.is_alive() && !is_dead_state(local_ep_state_ptr)) { // unless of course, it was dead + mark_alive(ep, local_ep_state_ptr); + } + } else { + // if (logger.isTraceEnabled()) + // logger.trace("Ignoring remote generation {} < {}", remote_generation, local_generation); } - else - { - if (logger.isTraceEnabled()) - logger.trace("Ignoring remote generation {} < {}", remoteGeneration, localGeneration); - } - } - else - { + } else { // this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not alive - FailureDetector.instance.report(ep); - handleMajorStateChange(ep, remoteState); + // FIXME: failure_detector !!! + // FailureDetector.instance.report(ep); + handle_major_state_change(ep, remote_state); + fail(unimplemented::cause::GOSSIP); } } } - private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState) - { +private: + void apply_new_states(inet_address addr, endpoint_state local_state, endpoint_state remote_state) { // don't assert here, since if the node restarts the version will go back to zero - int oldVersion = localState.getHeartBeatState().getHeartBeatVersion(); + //int oldVersion = local_state.get_heart_beat_state().get_heart_beat_version(); - localState.setHeartBeatState(remoteState.getHeartBeatState()); - if (logger.isTraceEnabled()) - logger.trace("Updating heartbeat state version to {} from {} for {} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, addr); + local_state.set_heart_beat_state(remote_state.get_heart_beat_state()); + // if (logger.isTraceEnabled()) { + // logger.trace("Updating heartbeat state version to {} from {} for {} ...", + // local_state.get_heart_beat_state().get_heart_beat_version(), oldVersion, addr); + // } - // we need to make two loops here, one to apply, then another to notify, this way all states in an update are present and current when the notifications are received - for (Entry remoteEntry : remoteState.getApplicationStateMap().entrySet()) - { - ApplicationState remoteKey = remoteEntry.getKey(); - VersionedValue remoteValue = remoteEntry.getValue(); - - assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); - localState.addApplicationState(remoteKey, remoteValue); + // we need to make two loops here, one to apply, then another to notify, + // this way all states in an update are present and current when the notifications are received + for (auto& remote_entry : remote_state.get_application_state_map()) { + auto& remote_key = remote_entry.first; + auto& remote_value = remote_entry.second; + assert(remote_state.get_heart_beat_state().get_generation() == local_state.get_heart_beat_state().get_generation()); + local_state.add_application_state(remote_key, remote_value); } - for (Entry remoteEntry : remoteState.getApplicationStateMap().entrySet()) - { - doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue()); + for (auto& entry : remote_state.get_application_state_map()) { + do_on_change_notifications(addr, entry.first, entry.second); } } - + // notify that a local application state is going to change (doesn't get triggered for remote changes) - private void doBeforeChangeNotifications(InetAddress addr, EndpointState epState, ApplicationState apState, VersionedValue newValue) - { - for (IEndpointStateChangeSubscriber subscriber : subscribers) - { - subscriber.beforeChange(addr, epState, apState, newValue); + void do_before_change_notifications(inet_address addr, endpoint_state& ep_state, application_state& ap_state, versioned_value& new_value) { + for (auto& subscriber : _subscribers) { + subscriber->before_change(addr, ep_state, ap_state, new_value); } } // notify that an application state has changed - private void doOnChangeNotifications(InetAddress addr, ApplicationState state, VersionedValue value) - { - for (IEndpointStateChangeSubscriber subscriber : subscribers) - { - subscriber.onChange(addr, state, value); + void do_on_change_notifications(inet_address addr, const application_state& state, versioned_value& value) { + for (auto& subscriber : _subscribers) { + subscriber->on_change(addr, state, value); + } + } + /* Request all the state for the endpoint in the g_digest */ + void request_all(gossip_digest g_digest, std::vector delta_gossip_digest_list, int remote_generation) { + /* We are here since we have no data for this endpoint locally so request everthing. */ + delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation, 0); + // if (logger.isTraceEnabled()) + // logger.trace("request_all for {}", g_digest.get_endpoint()); + } + + /* Send all the data with version greater than max_remote_version */ + void send_all(gossip_digest g_digest, std::map& delta_ep_state_map, int max_remote_version) { + auto ep = g_digest.get_endpoint(); + auto local_ep_state_ptr = get_state_for_version_bigger_than(ep, max_remote_version); + if (local_ep_state_ptr) { + delta_ep_state_map[ep] = *local_ep_state_ptr; } } - /* Request all the state for the endpoint in the gDigest */ - private void requestAll(GossipDigest gDigest, List deltaGossipDigestList, int remoteGeneration) - { - /* We are here since we have no data for this endpoint locally so request everthing. */ - deltaGossipDigestList.add(new GossipDigest(gDigest.getEndpoint(), remoteGeneration, 0)); - if (logger.isTraceEnabled()) - logger.trace("requestAll for {}", gDigest.getEndpoint()); - } - - /* Send all the data with version greater than maxRemoteVersion */ - private void sendAll(GossipDigest gDigest, Map deltaEpStateMap, int maxRemoteVersion) - { - EndpointState localEpStatePtr = getStateForVersionBiggerThan(gDigest.getEndpoint(), maxRemoteVersion); - if (localEpStatePtr != null) - deltaEpStateMap.put(gDigest.getEndpoint(), localEpStatePtr); - } - +public: /* This method is used to figure the state that the Gossiper has but Gossipee doesn't. The delta digests and the delta state are built up. */ - void examineGossiper(List gDigestList, List deltaGossipDigestList, Map deltaEpStateMap) - { - if (gDigestList.size() == 0) - { - /* we've been sent a *completely* empty syn, which should normally never happen since an endpoint will at least send a syn with itself. - If this is happening then the node is attempting shadow gossip, and we should reply with everything we know. - */ - logger.debug("Shadow request received, adding all states"); - for (Map.Entry entry : endpointStateMap.entrySet()) - { - gDigestList.add(new GossipDigest(entry.getKey(), 0, 0)); + void examine_gossiper(std::vector& g_digest_list, + std::vector& delta_gossip_digest_list, + std::map& delta_ep_state_map) { + if (g_digest_list.size() == 0) { + /* we've been sent a *completely* empty syn, which should normally + * never happen since an endpoint will at least send a syn with + * itself. If this is happening then the node is attempting shadow + * gossip, and we should reply with everything we know. + */ + // logger.debug("Shadow request received, adding all states"); + for (auto& entry : endpoint_state_map) { + g_digest_list.emplace_back(entry.first, 0, 0); } } - for ( GossipDigest gDigest : gDigestList ) - { - int remoteGeneration = gDigest.getGeneration(); - int maxRemoteVersion = gDigest.getMaxVersion(); + for (gossip_digest& g_digest : g_digest_list) { + int remote_generation = g_digest.get_generation(); + int max_remote_version = g_digest.get_max_version(); /* Get state associated with the end point in digest */ - EndpointState epStatePtr = endpointStateMap.get(gDigest.getEndpoint()); - /* - Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally - then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to - request all the data for this endpoint. - */ - if (epStatePtr != null) - { - int localGeneration = epStatePtr.getHeartBeatState().getGeneration(); + auto it = endpoint_state_map.find(g_digest.get_endpoint()); + /* Here we need to fire a GossipDigestAckMessage. If we have some + * data associated with this endpoint locally then we follow the + * "if" path of the logic. If we have absolutely nothing for this + * endpoint we need to request all the data for this endpoint. + */ + if (it != endpoint_state_map.end()) { + endpoint_state& ep_state_ptr = it->second; + int local_generation = ep_state_ptr.get_heart_beat_state().get_generation(); /* get the max version of all keys in the state associated with this endpoint */ - int maxLocalVersion = getMaxEndpointStateVersion(epStatePtr); - if (remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion) + int max_local_version = get_max_endpoint_state_version(ep_state_ptr); + if (remote_generation == local_generation && max_remote_version == max_local_version) { continue; + } - if (remoteGeneration > localGeneration) - { + if (remote_generation > local_generation) { /* we request everything from the gossiper */ - requestAll(gDigest, deltaGossipDigestList, remoteGeneration); - } - else if (remoteGeneration < localGeneration) - { + request_all(g_digest, delta_gossip_digest_list, remote_generation); + } else if (remote_generation < local_generation) { /* send all data with generation = localgeneration and version > 0 */ - sendAll(gDigest, deltaEpStateMap, 0); - } - else if (remoteGeneration == localGeneration) - { + send_all(g_digest, delta_ep_state_map, 0); + } else if (remote_generation == local_generation) { /* - If the max remote version is greater then we request the remote endpoint send us all the data - for this endpoint with version greater than the max version number we have locally for this - endpoint. - If the max remote version is lesser, then we send all the data we have locally for this endpoint - with version greater than the max remote version. - */ - if (maxRemoteVersion > maxLocalVersion) - { - deltaGossipDigestList.add(new GossipDigest(gDigest.getEndpoint(), remoteGeneration, maxLocalVersion)); - } - else if (maxRemoteVersion < maxLocalVersion) - { - /* send all data with generation = localgeneration and version > maxRemoteVersion */ - sendAll(gDigest, deltaEpStateMap, maxRemoteVersion); + * If the max remote version is greater then we request the + * remote endpoint send us all the data for this endpoint + * with version greater than the max version number we have + * locally for this endpoint. + * + * If the max remote version is lesser, then we send all + * the data we have locally for this endpoint with version + * greater than the max remote version. + */ + if (max_remote_version > max_local_version) { + delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation, max_local_version); + } else if (max_remote_version < max_local_version) { + /* send all data with generation = localgeneration and version > max_remote_version */ + send_all(g_digest, delta_ep_state_map, max_remote_version); } } - } - else - { + } else { /* We are here since we have no data for this endpoint locally so request everything. */ - requestAll(gDigest, deltaGossipDigestList, remoteGeneration); + request_all(g_digest, delta_gossip_digest_list, remote_generation); } } } - - public void start(int generationNumber) - { +#if 0 +public: + void start(int generationNumber) { start(generationNumber, new HashMap()); } /** * Start the gossiper with the generation number, preloading the map of application states before starting */ - public void start(int generationNbr, Map preloadLocalStates) - { - buildSeedsList(); + void start(int generation_nbr, Map preloadLocalStates) { + build_seeds_list(); /* initialize the heartbeat state for this localEndpoint */ - maybeInitializeLocalState(generationNbr); - EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress()); + maybe_initialize_local_state(generation_nbr); + endpoint_state local_state = endpoint_state_map.get(FBUtilities.getBroadcastAddress()); for (Map.Entry entry : preloadLocalStates.entrySet()) - localState.addApplicationState(entry.getKey(), entry.getValue()); + local_state.add_application_state(entry.getKey(), entry.getValue()); //notify snitches that Gossiper is about to start DatabaseDescriptor.getEndpointSnitch().gossiperStarting(); if (logger.isTraceEnabled()) - logger.trace("gossip started with generation {}", localState.getHeartBeatState().getGeneration()); + logger.trace("gossip started with generation {}", local_state.get_heart_beat_state().get_generation()); scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(), - Gossiper.intervalInMillis, - Gossiper.intervalInMillis, + Gossiper.INTERVAL_IN_MILLIS, + Gossiper.INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); } +#endif +#if 0 /** * Do a single 'shadow' round of gossip, where we do not modify any state * Only used when replacing a node, to get and assume its states */ public void doShadowRound() { - buildSeedsList(); + build_seeds_list(); // send a completely empty syn - List gDigests = new ArrayList(); + List g_digests = new ArrayList(); GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(), DatabaseDescriptor.getPartitionerName(), - gDigests); + g_digests); MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_SYN, digestSynMessage, GossipDigestSyn.serializer); - inShadowRound = true; - for (InetAddress seed : seeds) + _in_shadow_round = true; + for (inet_address seed : _seeds) MessagingService.instance().sendOneWay(message, seed); int slept = 0; try @@ -1243,11 +1226,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean while (true) { Thread.sleep(1000); - if (!inShadowRound) + if (!_in_shadow_round) break; slept += 1000; if (slept > StorageService.RING_DELAY) - throw new RuntimeException("Unable to gossip with any seeds"); + throw new RuntimeException("Unable to gossip with any _seeds"); } } catch (InterruptedException wtf) @@ -1255,98 +1238,89 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean throw new RuntimeException(wtf); } } +#endif +private: + void build_seeds_list() { + // for (inet_address seed : DatabaseDescriptor.getSeeds()) + // { + // if (seed.equals(FBUtilities.getBroadcastAddress())) + // continue; + // _seeds.add(seed); + // } + } - private void buildSeedsList() - { - for (InetAddress seed : DatabaseDescriptor.getSeeds()) - { - if (seed.equals(FBUtilities.getBroadcastAddress())) - continue; - seeds.add(seed); +public: + // initialize local HB state if needed, i.e., if gossiper has never been started before. + void maybe_initialize_local_state(int generation_nbr) { + heart_beat_state hb_state(generation_nbr); + endpoint_state local_state(hb_state); + local_state.mark_alive(); + // FIXME + // endpoint_state_map.putIfAbsent(FBUtilities.getBroadcastAddress(), local_state); + inet_address ep = get_broadcast_address(); + auto it = endpoint_state_map.find(ep); + if (it == endpoint_state_map.end()) { + endpoint_state_map.emplace(ep, local_state); } } - // initialize local HB state if needed, i.e., if gossiper has never been started before. - public void maybeInitializeLocalState(int generationNbr) - { - HeartBeatState hbState = new HeartBeatState(generationNbr); - EndpointState localState = new EndpointState(hbState); - localState.markAlive(); - endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddress(), localState); - } - - /** * Add an endpoint we knew about previously, but whose state is unknown */ - public void addSavedEndpoint(InetAddress ep) - { - if (ep.equals(FBUtilities.getBroadcastAddress())) - { - logger.debug("Attempt to add self as saved endpoint"); + void add_saved_endpoint(inet_address ep) { + if (ep == get_broadcast_address()) { + // logger.debug("Attempt to add self as saved endpoint"); return; } //preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on) - EndpointState epState = endpointStateMap.get(ep); - if (epState != null) - { - logger.debug("not replacing a previous epState for {}, but reusing it: {}", ep, epState); - epState.setHeartBeatState(new HeartBeatState(0)); + auto ep_state = endpoint_state(heart_beat_state(0)); + auto it = endpoint_state_map.find(ep); + if (it != endpoint_state_map.end()) { + ep_state = it->second; + // logger.debug("not replacing a previous ep_state for {}, but reusing it: {}", ep, ep_state); + ep_state.set_heart_beat_state(heart_beat_state(0)); } - else - { - epState = new EndpointState(new HeartBeatState(0)); - } - - epState.markDead(); - endpointStateMap.put(ep, epState); - unreachableEndpoints.put(ep, System.nanoTime()); - if (logger.isTraceEnabled()) - logger.trace("Adding saved endpoint {} {}", ep, epState.getHeartBeatState().getGeneration()); + ep_state.mark_dead(); + endpoint_state_map[ep] = ep_state; + _unreachable_endpoints[ep] = now_nanos(); + // if (logger.isTraceEnabled()) + // logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation()); } - public void addLocalApplicationState(ApplicationState state, VersionedValue value) - { - EndpointState epState = endpointStateMap.get(FBUtilities.getBroadcastAddress()); - InetAddress epAddr = FBUtilities.getBroadcastAddress(); - assert epState != null; + void add_local_application_state(application_state state, versioned_value value) { + inet_address ep_addr = get_broadcast_address(); + assert(endpoint_state_map.count(ep_addr)); + endpoint_state& ep_state = endpoint_state_map.at(ep_addr); // Fire "before change" notifications: - doBeforeChangeNotifications(epAddr, epState, state, value); + do_before_change_notifications(ep_addr, ep_state, state, value); // Notifications may have taken some time, so preventively raise the version // of the new value, otherwise it could be ignored by the remote node // if another value with a newer version was received in the meantime: - value = StorageService.instance.valueFactory.cloneWithHigherVersion(value); + // FIXME: + // value = StorageService.instance.valueFactory.cloneWithHigherVersion(value); // Add to local application state and fire "on change" notifications: - epState.addApplicationState(state, value); - doOnChangeNotifications(epAddr, state, value); + ep_state.add_application_state(state, value); + do_on_change_notifications(ep_addr, state, value); } - public void addLocalApplicationStates(List> states) - { - taskLock.lock(); - try - { - for (Pair pair : states) - { - addLocalApplicationState(pair.left, pair.right); - } + void add_lccal_application_states(std::list> states) { + // Note: The taskLock in Origin code is removed, we can probaby use a + // simple data structure here + for (std::pair& pair : states) { + add_local_application_state(pair.first, pair.second); } - finally - { - taskLock.unlock(); - } - } +#if 0 public void stop() { if (scheduledGossipTask != null) scheduledGossipTask.cancel(false); logger.info("Announcing shutdown"); - Uninterruptibles.sleepUninterruptibly(intervalInMillis * 2, TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 2, TimeUnit.MILLISECONDS); MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_SHUTDOWN); - for (InetAddress ep : liveEndpoints) + for (inet_address ep : _live_endpoints) MessagingService.instance().sendOneWay(message, ep); } @@ -1354,64 +1328,69 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean { return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled()); } +#endif - protected void finishShadowRound() - { - if (inShadowRound) - inShadowRound = false; +public: + void finish_shadow_round() { + if (_in_shadow_round) { + _in_shadow_round = false; + } } - protected boolean isInShadowRound() - { - return inShadowRound; + bool is_in_shadow_round() { + return _in_shadow_round; } +#if 0 @VisibleForTesting - public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr) + public void initializeNodeUnsafe(inet_address addr, UUID uuid, int generation_nbr) { - HeartBeatState hbState = new HeartBeatState(generationNbr); - EndpointState newState = new EndpointState(hbState); - newState.markAlive(); - EndpointState oldState = endpointStateMap.putIfAbsent(addr, newState); - EndpointState localState = oldState == null ? newState : oldState; + HeartBeatState hb_state = new HeartBeatState(generation_nbr); + endpoint_state new_state = new endpoint_state(hb_state); + new_state.mark_alive(); + endpoint_state oldState = endpoint_state_map.putIfAbsent(addr, new_state); + endpoint_state local_state = oldState == null ? new_state : oldState; // always add the version state - localState.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion()); - localState.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid)); + local_state.add_application_state(application_state::NET_VERSION, StorageService.instance.valueFactory.networkVersion()); + local_state.add_application_state(application_state::HOST_ID, StorageService.instance.valueFactory.host_id(uuid)); } @VisibleForTesting - public void injectApplicationState(InetAddress endpoint, ApplicationState state, VersionedValue value) + public void injectApplicationState(inet_address endpoint, application_state state, versioned_value value) { - EndpointState localState = endpointStateMap.get(endpoint); - localState.addApplicationState(state, value); + endpoint_state local_state = endpoint_state_map.get(endpoint); + local_state.add_application_state(state, value); } - public long getEndpointDowntime(String address) throws UnknownHostException + public int64_t get_endpoint_downtime(String address) throws UnknownHostException { - return getEndpointDowntime(InetAddress.getByName(address)); + return get_endpoint_downtime(inet_address.getByName(address)); } public int getCurrentGenerationNumber(String address) throws UnknownHostException { - return getCurrentGenerationNumber(InetAddress.getByName(address)); + return getCurrentGenerationNumber(inet_address.getByName(address)); } - - public void addExpireTimeForEndpoint(InetAddress endpoint, long expireTime) - { - if (logger.isDebugEnabled()) - { - logger.debug("adding expire time for endpoint : {} ({})", endpoint, expireTime); - } - expireTimeEndpointMap.put(endpoint, expireTime); - } - - public static long computeExpireTime() - { - return System.currentTimeMillis() + Gossiper.aVeryLongTime; - } - -} #endif +public: + void add_expire_time_for_endpoint(inet_address endpoint, int64_t expire_time) { + // if (logger.isDebugEnabled()) { + // logger.debug("adding expire time for endpoint : {} ({})", endpoint, expire_time); + // } + _expire_time_endpoint_map[endpoint] = expire_time; + } + + static int64_t compute_expire_time() { + return now_millis() + A_VERY_LONG_TIME; + } +}; + +extern distributed _the_gossiper; +inline gossiper& get_local_gossiper() { + assert(engine().cpu_id() == 0); + return _the_gossiper.local(); +} + } // namespace gms diff --git a/gms/i_endpoint_state_change_subscriber.hh b/gms/i_endpoint_state_change_subscriber.hh index 23eecf96c8..5ab5d0a16a 100644 --- a/gms/i_endpoint_state_change_subscriber.hh +++ b/gms/i_endpoint_state_change_subscriber.hh @@ -48,7 +48,7 @@ public: */ virtual void on_join(inet_address endpoint, endpoint_state ep_state) = 0; - virtual void beforechange(inet_address endpoint, endpoint_state current_state, application_state new_statekey, versioned_value newvalue) = 0; + virtual void before_change(inet_address endpoint, endpoint_state current_state, application_state new_statekey, versioned_value newvalue) = 0; virtual void on_change(inet_address endpoint, application_state state, versioned_value value) = 0; diff --git a/gms/inet_address.hh b/gms/inet_address.hh index 2bc3f12ca1..a099b05c93 100644 --- a/gms/inet_address.hh +++ b/gms/inet_address.hh @@ -39,6 +39,9 @@ public: size_t serialized_size() const { return serialize_int8_size + serialize_int32_size; } + friend inline bool operator==(const inet_address& x, const inet_address& y) { + return x._addr == y._addr; + } friend inline bool operator<(const inet_address& x, const inet_address& y) { return x._addr.ip < y._addr.ip; }