diff --git a/database.cc b/database.cc index 7995b1c707..24c7b24ae2 100644 --- a/database.cc +++ b/database.cc @@ -29,6 +29,7 @@ #include "frozen_mutation.hh" #include "mutation_partition_applier.hh" #include "core/do_with.hh" +#include "service/storage_service.hh" thread_local logging::logger dblog("database"); @@ -452,7 +453,7 @@ column_family::seal_active_memtable(database* db) { if (cl != nullptr) { cl->discard_completed_segments(_schema->id(), old->replay_position()); } - _memtables->erase(boost::find(*_memtables, old)); + _memtables->erase(boost::range::find(*_memtables, old)); } catch (std::exception& e) { dblog.error("failed to write sstable: {}", e.what()); } catch (...) { @@ -684,20 +685,10 @@ const column_family& database::find_column_family(const utils::UUID& uuid) const void keyspace::create_replication_strategy() { - static thread_local locator::token_metadata tm; static locator::simple_snitch snitch; static std::unordered_map options = {{"replication_factor", "3"}}; - auto d2t = [](double d) { - unsigned long l = net::hton(static_cast(d*(std::numeric_limits::max()))); - std::array a; - memcpy(a.data(), &l, 8); - return a; - }; - tm.update_normal_token({dht::token::kind::key, {d2t(0).data(), 8}}, to_sstring("127.0.0.1")); - tm.update_normal_token({dht::token::kind::key, {d2t(1.0/4).data(), 8}}, to_sstring("127.0.0.2")); - tm.update_normal_token({dht::token::kind::key, {d2t(2.0/4).data(), 8}}, to_sstring("127.0.0.3")); - tm.update_normal_token({dht::token::kind::key, {d2t(3.0/4).data(), 8}}, to_sstring("127.0.0.4")); - _replication_strategy = locator::abstract_replication_strategy::create_replication_strategy(_metadata->name(), _metadata->strategy_name(), tm, snitch, options); + auto& ss = service::get_local_storage_service(); + _replication_strategy = locator::abstract_replication_strategy::create_replication_strategy(_metadata->name(), _metadata->strategy_name(), ss.get_token_metadata(), snitch, options); } locator::abstract_replication_strategy& diff --git a/dht/boot_strapper.hh b/dht/boot_strapper.hh index 1cd22a54fc..55c29b5e08 100644 --- a/dht/boot_strapper.hh +++ b/dht/boot_strapper.hh @@ -22,6 +22,7 @@ #include "gms/inet_address.hh" #include "locator/token_metadata.hh" #include "dht/i_partitioner.hh" +#include namespace dht { @@ -76,7 +77,7 @@ public: * otherwise, if num_tokens == 1, pick a token to assume half the load of the most-loaded node. * else choose num_tokens tokens at random */ - static std::set get_bootstrap_tokens(token_metadata metadata) { + static std::unordered_set get_bootstrap_tokens(token_metadata metadata) { #if 0 Collection initialTokens = DatabaseDescriptor.getInitialTokens(); // if user specified tokens, use those @@ -106,8 +107,8 @@ public: return get_random_tokens(metadata, num_tokens); } - static std::set get_random_tokens(token_metadata metadata, size_t num_tokens) { - std::set tokens; + static std::unordered_set get_random_tokens(token_metadata metadata, size_t num_tokens) { + std::unordered_set tokens; while (tokens.size() < num_tokens) { auto token = global_partitioner().get_random_token(); auto ep = metadata.get_endpoint(token); diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index d531c8012a..64a1676320 100644 --- a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -138,7 +138,7 @@ std::ostream& operator<<(std::ostream& out, const token& t) { auto flags = out.flags(); for (auto c : t._data) { unsigned char x = c; - out << std::hex << +x << " "; + out << std::hex << std::setw(2) << std::setfill('0') << +x << " "; } out.flags(flags); return out; diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh index f9c95afd46..c047890c89 100644 --- a/dht/i_partitioner.hh +++ b/dht/i_partitioner.hh @@ -61,7 +61,7 @@ public: // [0x00, 0x80] == 1/512 // [0xff, 0x80] == 1 - 1/512 bytes _data; - token(kind k, bytes d) : _kind(k), _data(d) { + token(kind k, bytes d) : _kind(std::move(k)), _data(std::move(d)) { } }; diff --git a/gms/failure_detector.cc b/gms/failure_detector.cc index ce1ef7e655..5f378e2630 100644 --- a/gms/failure_detector.cc +++ b/gms/failure_detector.cc @@ -190,20 +190,7 @@ double failure_detector::get_phi_convict_threshold() { } bool failure_detector::is_alive(inet_address ep) { - if (ep.is_broadcast_address()) { - return true; - } - - auto eps = get_local_gossiper().get_endpoint_state_for_endpoint(ep); - // we could assert not-null, but having isAlive fail screws a node over so badly that - // it's worth being defensive here so minor bugs don't cause disproportionate - // badness. (See CASSANDRA-1463 for an example). - if (eps) { - return eps->is_alive(); - } else { - // logger.error("unknown endpoint {}", ep); - return false; - } + return get_local_gossiper().is_alive(ep); } void failure_detector::report(inet_address ep) { diff --git a/gms/failure_detector.hh b/gms/failure_detector.hh index d58dc4e331..6fcc872df6 100644 --- a/gms/failure_detector.hh +++ b/gms/failure_detector.hh @@ -167,7 +167,6 @@ public: extern distributed _the_failure_detector; inline failure_detector& get_local_failure_detector() { - assert(engine().cpu_id() == 0); return _the_failure_detector.local(); } inline distributed& get_failure_detector() { diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 0810fa3438..1509ae9694 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -787,9 +787,7 @@ utils::UUID gossiper::get_host_id(inet_address endpoint) { throw std::runtime_error(sprint("Host %s does not use new-style tokens!", endpoint)); } sstring uuid = get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::HOST_ID)->value; - // FIXME: Add UUID(const sstring& id) constructor - warn(unimplemented::cause::GOSSIP); - return utils::UUID(0, 0); + return utils::UUID(uuid); } std::experimental::optional gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, int version) { @@ -1240,4 +1238,20 @@ void gossiper::debug_show() { reporter->arm_periodic(std::chrono::milliseconds(1000)); } +bool gossiper::is_alive(inet_address ep) { + if (ep == get_broadcast_address()) { + return true; + } + auto eps = get_endpoint_state_for_endpoint(ep); + // we could assert not-null, but having isAlive fail screws a node over so badly that + // it's worth being defensive here so minor bugs don't cause disproportionate + // badness. (See CASSANDRA-1463 for an example). + if (eps) { + return eps->is_alive(); + } else { + // logger.error("unknown endpoint {}", ep); + return false; + } +} + } // namespace gms diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 4237822808..172fd857b2 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -360,6 +360,7 @@ private: void handle_major_state_change(inet_address ep, endpoint_state eps); public: + bool is_alive(inet_address ep); bool is_dead_state(endpoint_state eps); void apply_state_locally(std::map& map); diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index eabdd5f93a..6e7ce95cfa 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -21,7 +21,6 @@ #pragma once -#include #include "types.hh" #include "core/sstring.hh" #include "util/serialization.hh" @@ -30,6 +29,8 @@ #include "gms/inet_address.hh" #include "dht/i_partitioner.hh" #include "to_string.hh" +#include +#include namespace gms { @@ -125,17 +126,17 @@ public: return versioned_value(value.value); } - versioned_value bootstrapping(const std::set& tokens) { + versioned_value bootstrapping(const std::unordered_set& tokens) { return versioned_value(version_string({sstring(versioned_value::STATUS_BOOTSTRAPPING), make_token_string(tokens)})); } - versioned_value normal(const std::set& tokens) { + versioned_value normal(const std::unordered_set& tokens) { return versioned_value(version_string({sstring(versioned_value::STATUS_NORMAL), make_token_string(tokens)})); } - sstring make_token_string(const std::set& tokens) { + sstring make_token_string(const std::unordered_set& tokens) { // FIXME: // return partitioner.getTokenFactory().toString(Iterables.get(tokens, 0)); return "TOKENS"; @@ -151,19 +152,19 @@ public: return versioned_value(new_version.to_sstring()); } - versioned_value leaving(const std::set& tokens) { + versioned_value leaving(const std::unordered_set& tokens) { return versioned_value(version_string({sstring(versioned_value::STATUS_LEAVING), make_token_string(tokens)})); } - versioned_value left(const std::set& tokens, long expireTime) { + versioned_value left(const std::unordered_set& tokens, long expireTime) { return versioned_value(version_string({sstring(versioned_value::STATUS_LEFT), make_token_string(tokens), std::to_string(expireTime)})); } versioned_value moving(token t) { - std::set tokens = {t}; + std::unordered_set tokens = {t}; return versioned_value(version_string({sstring(versioned_value::STATUS_MOVING), make_token_string(tokens)})); } @@ -173,7 +174,7 @@ public: return versioned_value(hostId.to_sstring()); } - versioned_value tokens(const std::set tokens) { + versioned_value tokens(const std::unordered_set tokens) { sstring tokens_string; for (auto it = tokens.cbegin(); it != tokens.cend(); ) { tokens_string += to_hex(it->_data); diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 3331283a0c..91742f5e52 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -2,6 +2,7 @@ * Copyright (C) 2015 Cloudius Systems, Ltd. */ +#include "utils/UUID.hh" #include "token_metadata.hh" #include @@ -131,9 +132,13 @@ void token_metadata::debug_show() { print("inet_address=%s, token=%s\n", x.second, x.first); } print("Endpoint -> UUID\n"); - for (auto x: _endpoint_to_host_id_map) { + for (auto x : _endpoint_to_host_id_map) { print("inet_address=%s, uuid=%s\n", x.first, x.second); } + print("Sorted Token\n"); + for (auto x : _sorted_tokens) { + print("token=%s\n", x); + } }); reporter->arm_periodic(std::chrono::seconds(1)); } @@ -160,5 +165,76 @@ void token_metadata::update_host_id(const UUID& host_id, inet_address endpoint) _endpoint_to_host_id_map[endpoint] = host_id; } - +utils::UUID token_metadata::get_host_id(inet_address endpoint) { + assert(_endpoint_to_host_id_map.count(endpoint)); + return _endpoint_to_host_id_map.at(endpoint); } + +gms::inet_address token_metadata::get_endpoint_for_host_id(UUID host_id) { + auto beg = _endpoint_to_host_id_map.cbegin(); + auto end = _endpoint_to_host_id_map.cend(); + auto it = std::find_if(beg, end, [host_id] (auto x) { + return x.second == host_id; + }); + assert(it != end); + return (*it).first; +} + +const auto& token_metadata::get_endpoint_to_host_id_map_for_reading() { + return _endpoint_to_host_id_map; +} + +bool token_metadata::is_member(inet_address endpoint) { + auto beg = _token_to_endpoint_map.cbegin(); + auto end = _token_to_endpoint_map.cend(); + return end != std::find_if(beg, end, [endpoint] (const auto& x) { + return x.second == endpoint; + }); +} + +void token_metadata::add_bootstrap_token(token t, inet_address endpoint) { + std::unordered_set tokens{t}; + add_bootstrap_tokens(tokens, endpoint); +} + +void token_metadata::add_bootstrap_tokens(std::unordered_set tokens, inet_address endpoint) { + for (auto t : tokens) { + auto old_endpoint = _bootstrap_tokens.find(t); + if (old_endpoint != _bootstrap_tokens.end() && (*old_endpoint).second != endpoint) { + auto msg = sprint("Bootstrap Token collision between %s and %s (token %s", (*old_endpoint).second, endpoint, t); + throw std::runtime_error(msg); + } + + auto old_endpoint2 = _token_to_endpoint_map.find(t); + if (old_endpoint2 != _token_to_endpoint_map.end() && (*old_endpoint2).second != endpoint) { + auto msg = sprint("Bootstrap Token collision between %s and %s (token %s", (*old_endpoint2).second, endpoint, t); + throw std::runtime_error(msg); + } + } + + // Unfortunately, std::remove_if does not work with std::map + for (auto it = _bootstrap_tokens.begin(); it != _bootstrap_tokens.end();) { + if ((*it).second == endpoint) { + it = _bootstrap_tokens.erase(it); + } else { + it++; + } + } + + for (auto t : tokens) { + _bootstrap_tokens[t] = endpoint; + } +} + +void token_metadata::remove_bootstrap_tokens(std::unordered_set tokens) { + assert(!tokens.empty()); + for (auto t : tokens) { + _bootstrap_tokens.erase(t); + } +} + +bool token_metadata::is_leaving(inet_address endpoint) { + return _leaving_endpoints.count(endpoint); +} + +} // namespace locator diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index a1df113809..96e36b747c 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -53,6 +53,10 @@ private: /** Maintains endpoint to host ID map of every node in the cluster */ std::unordered_map _endpoint_to_host_id_map; + std::unordered_map _bootstrap_tokens; + std::unordered_set _leaving_endpoints; + std::unordered_map _moving_endpoints; + std::vector _sorted_tokens; topology _topology; @@ -118,7 +122,7 @@ public: private final ConcurrentMap, InetAddress>> pendingRanges = new ConcurrentHashMap, InetAddress>>(); // nodes which are migrating to the new tokens in the ring - private final Set> movingEndpoints = new HashSet>(); + private final Set> _moving_endpoints = new HashSet>(); /* Use this lock for manipulating the token map */ private final ReadWriteLock lock = new ReentrantReadWriteLock(true); @@ -165,7 +169,7 @@ public: lock.readLock().lock(); try { - for (Token token : bootstrapTokens.keySet()) + for (Token token : _bootstrap_tokens.keySet()) for (Range range : sourceRanges) if (range.contains(token)) n++; @@ -216,10 +220,10 @@ public: assert tokens != null && !tokens.isEmpty(); - bootstrapTokens.removeValue(endpoint); + _bootstrap_tokens.removeValue(endpoint); tokenToEndpointMap.removeValue(endpoint); topology.addEndpoint(endpoint); - leavingEndpoints.remove(endpoint); + _leaving_endpoints.remove(endpoint); removeFromMoving(endpoint); // also removing this endpoint from moving for (Token token : tokens) @@ -252,105 +256,22 @@ public: * @param endpoint */ void update_host_id(const UUID& host_id, inet_address endpoint); -#if 0 + /** Return the unique host ID for an end-point. */ - public UUID getHostId(InetAddress endpoint) - { - lock.readLock().lock(); - try - { - return _endpoint_to_host_id_map.get(endpoint); - } - finally - { - lock.readLock().unlock(); - } - } + UUID get_host_id(inet_address endpoint); /** Return the end-point for a unique host ID */ - public InetAddress getEndpointForHostId(UUID hostId) - { - lock.readLock().lock(); - try - { - return _endpoint_to_host_id_map.inverse().get(hostId); - } - finally - { - lock.readLock().unlock(); - } - } + inet_address get_endpoint_for_host_id(UUID host_id); /** @return a copy of the endpoint-to-id map for read-only operations */ - public Map getEndpointToHostIdMapForReading() - { - lock.readLock().lock(); - try - { - Map readMap = new HashMap(); - readMap.putAll(_endpoint_to_host_id_map); - return readMap; - } - finally - { - lock.readLock().unlock(); - } - } + const auto& get_endpoint_to_host_id_map_for_reading(); - @Deprecated - public void addBootstrapToken(Token token, InetAddress endpoint) - { - addBootstrapTokens(Collections.singleton(token), endpoint); - } + void add_bootstrap_token(token t, inet_address endpoint); - public void addBootstrapTokens(Collection tokens, InetAddress endpoint) - { - assert tokens != null && !tokens.isEmpty(); - assert endpoint != null; + void add_bootstrap_tokens(std::unordered_set tokens, inet_address endpoint); - lock.writeLock().lock(); - try - { - - InetAddress oldEndpoint; - - for (Token token : tokens) - { - oldEndpoint = bootstrapTokens.get(token); - if (oldEndpoint != null && !oldEndpoint.equals(endpoint)) - throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token); - - oldEndpoint = tokenToEndpointMap.get(token); - if (oldEndpoint != null && !oldEndpoint.equals(endpoint)) - throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token); - } - - bootstrapTokens.removeValue(endpoint); - - for (Token token : tokens) - bootstrapTokens.put(token, endpoint); - } - finally - { - lock.writeLock().unlock(); - } - } - - public void removeBootstrapTokens(Collection tokens) - { - assert tokens != null && !tokens.isEmpty(); - - lock.writeLock().lock(); - try - { - for (Token token : tokens) - bootstrapTokens.remove(token); - } - finally - { - lock.writeLock().unlock(); - } - } + void remove_bootstrap_tokens(std::unordered_set tokens); +#if 0 public void addLeavingEndpoint(InetAddress endpoint) { @@ -359,7 +280,7 @@ public: lock.writeLock().lock(); try { - leavingEndpoints.add(endpoint); + _leaving_endpoints.add(endpoint); } finally { @@ -380,7 +301,7 @@ public: try { - movingEndpoints.add(Pair.create(token, endpoint)); + _moving_endpoints.add(Pair.create(token, endpoint)); } finally { @@ -395,10 +316,10 @@ public: lock.writeLock().lock(); try { - bootstrapTokens.removeValue(endpoint); + _bootstrap_tokens.removeValue(endpoint); tokenToEndpointMap.removeValue(endpoint); topology.removeEndpoint(endpoint); - leavingEndpoints.remove(endpoint); + _leaving_endpoints.remove(endpoint); _endpoint_to_host_id_map.remove(endpoint); sortedTokens = sortTokens(); invalidateCachedRings(); @@ -420,11 +341,11 @@ public: lock.writeLock().lock(); try { - for (Pair pair : movingEndpoints) + for (Pair pair : _moving_endpoints) { if (pair.right.equals(endpoint)) { - movingEndpoints.remove(pair); + _moving_endpoints.remove(pair); break; } } @@ -459,58 +380,21 @@ public: return getTokens(endpoint).iterator().next(); } - public boolean isMember(InetAddress endpoint) - { - assert endpoint != null; +#endif - lock.readLock().lock(); - try - { - return tokenToEndpointMap.inverse().containsKey(endpoint); - } - finally - { - lock.readLock().unlock(); - } - } + bool is_member(inet_address endpoint); - public boolean isLeaving(InetAddress endpoint) - { - assert endpoint != null; + bool is_leaving(inet_address endpoint); - lock.readLock().lock(); - try - { - return leavingEndpoints.contains(endpoint); - } - finally - { - lock.readLock().unlock(); - } - } - - public boolean isMoving(InetAddress endpoint) - { - assert endpoint != null; - - lock.readLock().lock(); - - try - { - for (Pair pair : movingEndpoints) - { - if (pair.right.equals(endpoint)) - return true; + bool is_moving(inet_address endpoint) { + for (auto x : _moving_endpoints) { + if (x.second == endpoint) { + return true; } - - return false; - } - finally - { - lock.readLock().unlock(); } + return false; } - +#if 0 private final AtomicReference cachedTokenMap = new AtomicReference(); /** @@ -570,7 +454,7 @@ public: { TokenMetadata allLeftMetadata = cloneOnlyTokenMap(); - for (InetAddress endpoint : leavingEndpoints) + for (InetAddress endpoint : _leaving_endpoints) allLeftMetadata.removeEndpoint(endpoint); return allLeftMetadata; @@ -595,11 +479,11 @@ public: { TokenMetadata metadata = cloneOnlyTokenMap(); - for (InetAddress endpoint : leavingEndpoints) + for (InetAddress endpoint : _leaving_endpoints) metadata.removeEndpoint(endpoint); - for (Pair pair : movingEndpoints) + for (Pair pair : _moving_endpoints) metadata.updateNormalToken(pair.left, pair.right); return metadata; @@ -704,7 +588,7 @@ public: { Multimap, InetAddress> newPendingRanges = HashMultimap.create(); - if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty()) + if (_bootstrap_tokens.isEmpty() && _leaving_endpoints.isEmpty() && _moving_endpoints.isEmpty()) { if (logger.isDebugEnabled()) logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName); @@ -720,7 +604,7 @@ public: // get all ranges that will be affected by leaving nodes Set> affectedRanges = new HashSet>(); - for (InetAddress endpoint : leavingEndpoints) + for (InetAddress endpoint : _leaving_endpoints) affectedRanges.addAll(addressRanges.get(endpoint)); // for each of those ranges, find what new nodes will be responsible for the range when @@ -738,7 +622,7 @@ public: // For each of the bootstrapping nodes, simply add and remove them one by one to // allLeftMetadata and check in between what their ranges would be. - Multimap bootstrapAddresses = bootstrapTokens.inverse(); + Multimap bootstrapAddresses = _bootstrap_tokens.inverse(); for (InetAddress endpoint : bootstrapAddresses.keySet()) { Collection tokens = bootstrapAddresses.get(endpoint); @@ -754,7 +638,7 @@ public: // For each of the moving nodes, we do the same thing we did for bootstrapping: // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. - for (Pair moving : movingEndpoints) + for (Pair moving : _moving_endpoints) { InetAddress endpoint = moving.right; // address of the moving node @@ -802,7 +686,7 @@ public: lock.readLock().lock(); try { - return new BiMultiValMap(bootstrapTokens); + return new BiMultiValMap(_bootstrap_tokens); } finally { @@ -823,13 +707,13 @@ public: } } - /** caller should not modify leavingEndpoints */ + /** caller should not modify _leaving_endpoints */ public Set getLeavingEndpoints() { lock.readLock().lock(); try { - return ImmutableSet.copyOf(leavingEndpoints); + return ImmutableSet.copyOf(_leaving_endpoints); } finally { @@ -846,7 +730,7 @@ public: lock.readLock().lock(); try { - return ImmutableSet.copyOf(movingEndpoints); + return ImmutableSet.copyOf(_moving_endpoints); } finally { @@ -922,10 +806,10 @@ public: { tokenToEndpointMap.clear(); _endpoint_to_host_id_map.clear(); - bootstrapTokens.clear(); - leavingEndpoints.clear(); + _bootstrap_tokens.clear(); + _leaving_endpoints.clear(); pendingRanges.clear(); - movingEndpoints.clear(); + _moving_endpoints.clear(); sortedTokens.clear(); topology.clear(); invalidateCachedRings(); @@ -957,22 +841,22 @@ public: } } - if (!bootstrapTokens.isEmpty()) + if (!_bootstrap_tokens.isEmpty()) { sb.append("Bootstrapping Tokens:" ); sb.append(System.getProperty("line.separator")); - for (Map.Entry entry : bootstrapTokens.entrySet()) + for (Map.Entry entry : _bootstrap_tokens.entrySet()) { sb.append(entry.getValue()).append(":").append(entry.getKey()); sb.append(System.getProperty("line.separator")); } } - if (!leavingEndpoints.isEmpty()) + if (!_leaving_endpoints.isEmpty()) { sb.append("Leaving Endpoints:"); sb.append(System.getProperty("line.separator")); - for (InetAddress ep : leavingEndpoints) + for (InetAddress ep : _leaving_endpoints) { sb.append(ep); sb.append(System.getProperty("line.separator")); @@ -1060,9 +944,9 @@ public: lock.readLock().lock(); try { - Map map = new HashMap(tokenToEndpointMap.size() + bootstrapTokens.size()); + Map map = new HashMap(tokenToEndpointMap.size() + _bootstrap_tokens.size()); map.putAll(tokenToEndpointMap); - map.putAll(bootstrapTokens); + map.putAll(_bootstrap_tokens); return map; } finally diff --git a/main.cc b/main.cc index bd4b407fe6..43dd22e04a 100644 --- a/main.cc +++ b/main.cc @@ -28,7 +28,7 @@ read_config(bpo::variables_map& opts, db::config& cfg) { } future<> init_storage_service() { - return service::get_storage_service().start_single().then([] { + return service::get_storage_service().start().then([] { print("Start Storage service ...\n"); }); } @@ -45,7 +45,7 @@ future<> init_messaging_service(auto listen_address, auto seed_provider) { return net::get_messaging_service().start(listen).then([seeds] { auto& ms = net::get_local_messaging_service(); print("Messaging server listening on ip %s port %d ...\n", ms.listen_address(), ms.port()); - return gms::get_failure_detector().start_single().then([seeds] { + return gms::get_failure_detector().start().then([seeds] { return gms::get_gossiper().start_single().then([seeds] { auto& gossiper = gms::get_local_gossiper(); gossiper.set_seeds(seeds); diff --git a/service/storage_service.cc b/service/storage_service.cc index 46823a694b..77063158d7 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -20,7 +20,7 @@ bool storage_service::should_bootstrap() { } future<> storage_service::prepare_to_join() { - if (!joined) { + if (!_joined) { std::map app_states; #if 0 if (DatabaseDescriptor.isReplacing() && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))) @@ -89,9 +89,8 @@ future<> storage_service::prepare_to_join() { future<> storage_service::join_token_ring(int delay) { auto f = make_ready_future<>(); + _joined = true; #if 0 - joined = true; - // We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed. // If we are a seed, or if the user manually sets auto_bootstrap to false, // we'll skip streaming data from other nodes and jump directly into the ring. @@ -212,7 +211,7 @@ future<> storage_service::join_token_ring(int delay) { } bootstrap(_bootstrap_tokens); - assert !isBootstrapMode; // bootstrap will block until finished + assert !_is_bootstrap_mode; // bootstrap will block until finished #endif } else { // FIXME: DatabaseDescriptor.getNumTokens() @@ -256,7 +255,7 @@ future<> storage_service::join_token_ring(int delay) { if (Schema.instance.getKSMetaData(TraceKeyspace.NAME) == null) MigrationManager.announceNewKeyspace(TraceKeyspace.definition(), 0, false); - if (!isSurveyMode) + if (!_is_survey_mode) { // start participating in the ring. SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); @@ -277,8 +276,32 @@ future<> storage_service::join_token_ring(int delay) { }); } -future<> storage_service::bootstrap(std::set tokens) { - // isBootstrapMode = true; +void storage_service::join_ring() { +#if 0 + if (!joined) { + logger.info("Joining ring by operator request"); + try + { + joinTokenRing(0); + } + catch (ConfigurationException e) + { + throw new IOException(e.getMessage()); + } + } else if (_is_survey_mode) { + set_tokens(SystemKeyspace.getSavedTokens()); + SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); + _is_survey_mode = false; + logger.info("Leaving write survey mode and joining ring at operator request"); + assert _token_metadata.sortedTokens().size() > 0; + + Auth.setup(); + } +#endif +} + +future<> storage_service::bootstrap(std::unordered_set tokens) { + _is_bootstrap_mode = true; // SystemKeyspace.updateTokens(tokens); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping // FIXME: DatabaseDescriptor.isReplacing() auto is_replacing = false; @@ -309,5 +332,586 @@ future<> storage_service::bootstrap(std::set tokens) { }); } +void storage_service::handle_state_bootstrap(inet_address endpoint) { + ss_debug("SS::handle_state_bootstrap endpoint=%s\n", endpoint); + // explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified + auto tokens = get_tokens_for(endpoint); + // if (logger.isDebugEnabled()) + // logger.debug("Node {} state bootstrapping, token {}", endpoint, tokens); + + // if this node is present in token metadata, either we have missed intermediate states + // or the node had crashed. Print warning if needed, clear obsolete stuff and + // continue. + if (_token_metadata.is_member(endpoint)) { + // If isLeaving is false, we have missed both LEAVING and LEFT. However, if + // isLeaving is true, we have only missed LEFT. Waiting time between completing + // leave operation and rebootstrapping is relatively short, so the latter is quite + // common (not enough time for gossip to spread). Therefore we report only the + // former in the log. + if (!_token_metadata.is_leaving(endpoint)) { + // logger.info("Node {} state jump to bootstrap", endpoint); + } + // _token_metadata.removeEndpoint(endpoint); + } + + _token_metadata.add_bootstrap_tokens(tokens, endpoint); + // FIXME + // PendingRangeCalculatorService.instance.update(); + + auto& gossiper = gms::get_local_gossiper(); + if (gossiper.uses_host_id(endpoint)) { + _token_metadata.update_host_id(gossiper.get_host_id(endpoint), endpoint); + } } + +void storage_service::handle_state_normal(inet_address endpoint) { + ss_debug("SS::handle_state_bootstrap endpoint=%s\n", endpoint); + auto tokens = get_tokens_for(endpoint); + auto& gossiper = gms::get_local_gossiper(); + + std::unordered_set tokensToUpdateInMetadata; + std::unordered_set tokensToUpdateInSystemKeyspace; + std::unordered_set localTokensToRemove; + std::unordered_set endpointsToRemove; + + // if (logger.isDebugEnabled()) + // logger.debug("Node {} state normal, token {}", endpoint, tokens); + + if (_token_metadata.is_member(endpoint)) { + // logger.info("Node {} state jump to normal", endpoint); + } + update_peer_info(endpoint); +#if 1 + // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). + if (gossiper.uses_host_id(endpoint)) { + auto host_id = gossiper.get_host_id(endpoint); + //inet_address existing = _token_metadata.get_endpoint_for_host_id(host_id); + // if (DatabaseDescriptor.isReplacing() && + // Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && + // (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) { + if (false) { + // logger.warn("Not updating token metadata for {} because I am replacing it", endpoint); + } else { + if (false /*existing != null && !existing.equals(endpoint)*/) { +#if 0 + if (existing == get_broadcast_address()) { + logger.warn("Not updating host ID {} for {} because it's mine", hostId, endpoint); + _token_metadata.removeEndpoint(endpoint); + endpointsToRemove.add(endpoint); + } else if (gossiper.compare_endpoint_startup(endpoint, existing) > 0) { + logger.warn("Host ID collision for {} between {} and {}; {} is the new owner", hostId, existing, endpoint, endpoint); + _token_metadata.removeEndpoint(existing); + endpointsToRemove.add(existing); + _token_metadata.update_host_id(hostId, endpoint); + } else { + logger.warn("Host ID collision for {} between {} and {}; ignored {}", hostId, existing, endpoint, endpoint); + _token_metadata.removeEndpoint(endpoint); + endpointsToRemove.add(endpoint); + } +#endif + } else { + _token_metadata.update_host_id(host_id, endpoint); + } + } + } +#endif + + for (auto t : tokens) { + // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint. + auto current_owner = _token_metadata.get_endpoint(t); + if (!current_owner) { + // logger.debug("New node {} at token {}", endpoint, t); + tokensToUpdateInMetadata.insert(t); + tokensToUpdateInSystemKeyspace.insert(t); + } else if (endpoint == *current_owner) { + // set state back to normal, since the node may have tried to leave, but failed and is now back up + tokensToUpdateInMetadata.insert(t); + tokensToUpdateInSystemKeyspace.insert(t); + } else if (gossiper.compare_endpoint_startup(endpoint, *current_owner) > 0) { + tokensToUpdateInMetadata.insert(t); + tokensToUpdateInSystemKeyspace.insert(t); +#if 0 + + // currentOwner is no longer current, endpoint is. Keep track of these moves, because when + // a host no longer has any tokens, we'll want to remove it. + Multimap epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading(); + epToTokenCopy.get(currentOwner).remove(token); + if (epToTokenCopy.get(currentOwner).size() < 1) + endpointsToRemove.add(currentOwner); + + logger.info(String.format("Nodes %s and %s have the same token %s. %s is the new owner", + endpoint, + currentOwner, + token, + endpoint)); +#endif + } else { +#if 0 + logger.info(String.format("Nodes %s and %s have the same token %s. Ignoring %s", + endpoint, + currentOwner, + token, + endpoint)); +#endif + } + } + + bool is_moving = _token_metadata.is_moving(endpoint); // capture because updateNormalTokens clears moving status + _token_metadata.update_normal_tokens(tokensToUpdateInMetadata, endpoint); + // for (auto ep : endpointsToRemove) { + // removeEndpoint(ep); + // if (DatabaseDescriptor.isReplacing() && DatabaseDescriptor.getReplaceAddress().equals(ep)) + // Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260 + // } + if (!tokensToUpdateInSystemKeyspace.empty()) { + // SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace); + } + if (!localTokensToRemove.empty()) { + // SystemKeyspace.updateLocalTokens(Collections.emptyList(), localTokensToRemove); + } + + if (is_moving) { + // _token_metadata.remove_from_moving(endpoint); + // for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + // subscriber.onMove(endpoint); + } else { + // for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + // subscriber.onJoinCluster(endpoint); + } + + // PendingRangeCalculatorService.instance.update(); +} + +void storage_service::handle_state_leaving(inet_address endpoint) { +#if 0 + Collection tokens; + tokens = get_tokens_for(endpoint); + + if (logger.isDebugEnabled()) + logger.debug("Node {} state leaving, tokens {}", endpoint, tokens); + + // If the node is previously unknown or tokens do not match, update tokenmetadata to + // have this node as 'normal' (it must have been using this token before the + // leave). This way we'll get pending ranges right. + if (!_token_metadata.isMember(endpoint)) + { + logger.info("Node {} state jump to leaving", endpoint); + _token_metadata.updateNormalTokens(tokens, endpoint); + } + else if (!_token_metadata.getTokens(endpoint).containsAll(tokens)) + { + logger.warn("Node {} 'leaving' token mismatch. Long network partition?", endpoint); + _token_metadata.updateNormalTokens(tokens, endpoint); + } + + // at this point the endpoint is certainly a member with this token, so let's proceed + // normally + _token_metadata.addLeavingEndpoint(endpoint); + PendingRangeCalculatorService.instance.update(); +#endif +} + +void storage_service::handle_state_left(inet_address endpoint, std::vector pieces) { +#if 0 + assert pieces.length >= 2; + Collection tokens; + tokens = get_tokens_for(endpoint); + + if (logger.isDebugEnabled()) + logger.debug("Node {} state left, tokens {}", endpoint, tokens); + + excise(tokens, endpoint, extractExpireTime(pieces)); +#endif +} + +void storage_service::handle_state_moving(inet_address endpoint, std::vector pieces) { +#if 0 + assert pieces.length >= 2; + Token token = getPartitioner().getTokenFactory().fromString(pieces[1]); + + if (logger.isDebugEnabled()) + logger.debug("Node {} state moving, new token {}", endpoint, token); + + _token_metadata.addMovingEndpoint(token, endpoint); + + PendingRangeCalculatorService.instance.update(); +#endif +} + +void storage_service::handle_state_removing(inet_address endpoint, std::vector pieces) { +#if 0 + assert (pieces.length > 0); + + if (endpoint.equals(FBUtilities.getBroadcastAddress())) + { + logger.info("Received removenode gossip about myself. Is this node rejoining after an explicit removenode?"); + try + { + drain(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + return; + } + if (_token_metadata.isMember(endpoint)) + { + String state = pieces[0]; + Collection removeTokens = _token_metadata.getTokens(endpoint); + + if (VersionedValue.REMOVED_TOKEN.equals(state)) + { + excise(removeTokens, endpoint, extractExpireTime(pieces)); + } + else if (VersionedValue.REMOVING_TOKEN.equals(state)) + { + if (logger.isDebugEnabled()) + logger.debug("Tokens {} removed manually (endpoint was {})", removeTokens, endpoint); + + // Note that the endpoint is being removed + _token_metadata.addLeavingEndpoint(endpoint); + PendingRangeCalculatorService.instance.update(); + + // find the endpoint coordinating this removal that we need to notify when we're done + String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1); + UUID hostId = UUID.fromString(coordinator[1]); + // grab any data we are now responsible for and notify responsible node + restoreReplicaCount(endpoint, _token_metadata.getEndpointForHostId(hostId)); + } + } + else // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it + { + if (VersionedValue.REMOVED_TOKEN.equals(pieces[0])) + addExpireTimeIfFound(endpoint, extractExpireTime(pieces)); + removeEndpoint(endpoint); + } +#endif +} + +void storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) { + ss_debug("SS::on_join endpoint=%s\n", endpoint); + auto tokens = get_tokens_for(endpoint); + for (auto t : tokens) { + ss_debug("t=%s\n", t); + } + for (auto e : ep_state.get_application_state_map()) { + on_change(endpoint, e.first, e.second); + } + // MigrationManager.instance.scheduleSchemaPull(endpoint, epState); +} + +void storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state) { + ss_debug("SS::on_alive endpoint=%s\n", endpoint); +#if 0 + MigrationManager.instance.scheduleSchemaPull(endpoint, state); + + if (_token_metadata.isMember(endpoint)) + { + HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true); + for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + subscriber.onUp(endpoint); + } +#endif +} + +void storage_service::before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) { + // no-op +} + +void storage_service::on_change(inet_address endpoint, application_state state, versioned_value value) { + ss_debug("SS::on_change endpoint=%s\n", endpoint); + if (state == application_state::STATUS) { + std::vector pieces; + boost::split(pieces, value.value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); + assert(pieces.size() > 0); + sstring move_name = pieces[0]; + if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) { + handle_state_bootstrap(endpoint); + } else if (move_name == sstring(versioned_value::STATUS_NORMAL)) { + handle_state_normal(endpoint); + } else if (move_name == sstring(versioned_value::REMOVING_TOKEN) || + move_name == sstring(versioned_value::REMOVED_TOKEN)) { + handle_state_removing(endpoint, pieces); + } else if (move_name == sstring(versioned_value::STATUS_LEAVING)) { + handle_state_leaving(endpoint); + } else if (move_name == sstring(versioned_value::STATUS_LEFT)) { + handle_state_left(endpoint, pieces); + } else if (move_name == sstring(versioned_value::STATUS_MOVING)) { + handle_state_moving(endpoint, pieces); + } + } else { + auto& gossiper = gms::get_local_gossiper(); + auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint); + if (!ep_state || gossiper.is_dead_state(*ep_state)) { + // logger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); + return; + } + + if (state == application_state::RELEASE_VERSION) { + // SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value); + } else if (state == application_state::DC) { + // SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value); + } else if (state == application_state::RACK) { + // SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value); + } else if (state == application_state::RPC_ADDRESS) { + // try { + // SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value)); + // } catch (UnknownHostException e) { + // throw new RuntimeException(e); + // } + } else if (state == application_state::SCHEMA) { + // SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value)); + // MigrationManager.instance.scheduleSchemaPull(endpoint, epState); + } else if (state == application_state::HOST_ID) { + // SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value)); + } + } + replicate_to_all_cores(); +} + + +void storage_service::on_remove(gms::inet_address endpoint) { +#if 0 + _token_metadata.removeEndpoint(endpoint); + PendingRangeCalculatorService.instance.update(); +#endif +} + +void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) { +#if 0 + MessagingService.instance().convict(endpoint); + for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + subscriber.onDown(endpoint); +#endif +} + +void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) { +#if 0 + // If we have restarted before the node was even marked down, we need to reset the connection pool + if (state.isAlive()) + onDead(endpoint, state); +#endif +} + +void storage_service::update_peer_info(gms::inet_address endpoint) { + using namespace gms; + auto& gossiper = gms::get_local_gossiper(); + auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint); + if (!ep_state) { + return; + } + for (auto& entry : ep_state->get_application_state_map()) { + auto& app_state = entry.first; + //auto& value = entry.second.value + if (app_state == application_state::RELEASE_VERSION) { + // SystemKeyspace.updatePeerInfo(endpoint, "release_version", value); + } else if (app_state == application_state::DC) { + // SystemKeyspace.updatePeerInfo(endpoint, "data_center", value); + } else if (app_state == application_state::RACK) { + // SystemKeyspace.updatePeerInfo(endpoint, "rack", value); + } else if (app_state == application_state::RPC_ADDRESS) { + // SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value)); + } else if (app_state == application_state::SCHEMA) { + // SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value)); + } else if (app_state == application_state::HOST_ID) { + // SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value)); + } + } +} + +sstring storage_service::get_application_state_value(inet_address endpoint, application_state appstate) { + auto& gossiper = gms::get_local_gossiper(); + auto eps = gossiper.get_endpoint_state_for_endpoint(endpoint); + if (!eps) { + return {}; + } + auto v = eps->get_application_state(appstate); + if (!v) { + return {}; + } + return v->value; +} + +std::unordered_set storage_service::get_tokens_for(inet_address endpoint) { + auto tokens_string = get_application_state_value(endpoint, application_state::TOKENS); + ss_debug("endpoint=%s, tokens_string=%s\n", endpoint, tokens_string); + std::vector tokens; + std::unordered_set ret; + boost::split(tokens, tokens_string, boost::is_any_of(";")); + for (auto str : tokens) { + ss_debug("token=%s\n", str); + sstring_view sv(str); + bytes b = from_hex(sv); + ret.emplace(token::kind::key, b); + } + return ret; +} + +void storage_service::set_tokens(std::unordered_set tokens) { + // if (logger.isDebugEnabled()) + // logger.debug("Setting tokens to {}", tokens); + // SystemKeyspace.updateTokens(tokens); + for (auto t : tokens) { + _token_metadata.update_normal_token(t, get_broadcast_address()); + } + // Collection localTokens = getLocalTokens(); + auto local_tokens = _bootstrap_tokens; + auto& gossiper = gms::get_local_gossiper(); + gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens)); + gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.normal(local_tokens)); + //setMode(Mode.NORMAL, false); + replicate_to_all_cores(); +} + +future<> storage_service::init_server(int delay) { +#if 0 + logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString()); + logger.info("Thrift API version: {}", cassandraConstants.VERSION); + logger.info("CQL supported versions: {} (default: {})", StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION); +#endif + _initialized = true; +#if 0 + try + { + // Ensure StorageProxy is initialized on start-up; see CASSANDRA-3797. + Class.forName("org.apache.cassandra.service.StorageProxy"); + // also IndexSummaryManager, which is otherwise unreferenced + Class.forName("org.apache.cassandra.io.sstable.IndexSummaryManager"); + } + catch (ClassNotFoundException e) + { + throw new AssertionError(e); + } + + if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true"))) + { + logger.info("Loading persisted ring state"); + Multimap loadedTokens = SystemKeyspace.loadTokens(); + Map loadedHostIds = SystemKeyspace.loadHostIds(); + for (InetAddress ep : loadedTokens.keySet()) + { + if (ep.equals(FBUtilities.getBroadcastAddress())) + { + // entry has been mistakenly added, delete it + SystemKeyspace.removeEndpoint(ep); + } + else + { + _token_metadata.updateNormalTokens(loadedTokens.get(ep), ep); + if (loadedHostIds.containsKey(ep)) + _token_metadata.update_host_id(loadedHostIds.get(ep), ep); + Gossiper.instance.addSavedEndpoint(ep); + } + } + } + + // daemon threads, like our executors', continue to run while shutdown hooks are invoked + drainOnShutdown = new Thread(new WrappedRunnable() + { + @Override + public void runMayThrow() throws InterruptedException + { + ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION); + ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION); + if (mutationStage.isShutdown() && counterMutationStage.isShutdown()) + return; // drained already + + if (daemon != null) + shutdownClientServers(); + ScheduledExecutors.optionalTasks.shutdown(); + Gossiper.instance.stop(); + + // In-progress writes originating here could generate hints to be written, so shut down MessagingService + // before mutation stage, so we can get all the hints saved before shutting down + MessagingService.instance().shutdown(); + counterMutationStage.shutdown(); + mutationStage.shutdown(); + counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS); + mutationStage.awaitTermination(3600, TimeUnit.SECONDS); + StorageProxy.instance.verifyNoHintsInProgress(); + + List> flushes = new ArrayList<>(); + for (Keyspace keyspace : Keyspace.all()) + { + KSMetaData ksm = Schema.instance.getKSMetaData(keyspace.getName()); + if (!ksm.durableWrites) + { + for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) + flushes.add(cfs.forceFlush()); + } + } + try + { + FBUtilities.waitOnFutures(flushes); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + // don't let this stop us from shutting down the commitlog and other thread pools + logger.warn("Caught exception while waiting for memtable flushes during shutdown hook", t); + } + + CommitLog.instance.shutdownBlocking(); + + // wait for miscellaneous tasks like sstable and commitlog segment deletion + ScheduledExecutors.nonPeriodicTasks.shutdown(); + if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) + logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown"); + } + }, "StorageServiceShutdownHook"); + Runtime.getRuntime().addShutdownHook(drainOnShutdown); +#endif + return prepare_to_join().then([this, delay] { + return join_token_ring(delay); + }); +#if 0 + // Has to be called after the host id has potentially changed in prepareToJoin(). + for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) + if (cfs.metadata.isCounter()) + cfs.initCounterCache(); + + if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))) + { + joinTokenRing(delay); + } + else + { + Collection tokens = SystemKeyspace.getSavedTokens(); + if (!tokens.isEmpty()) + { + _token_metadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress()); + // order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa. + List> states = new ArrayList>(); + states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens))); + states.add(Pair.create(ApplicationState.STATUS, valueFactory.hibernate(true))); + Gossiper.instance.addLocalApplicationStates(states); + } + logger.info("Not joining ring as requested. Use JMX (StorageService->joinRing()) to initiate ring joining"); + } +#endif +} + +void storage_service::replicate_to_all_cores() { + assert(engine().cpu_id() == 0); + // FIXME: There is no back pressure. If the remote cores are slow, and + // replication is called often, it will queue tasks to the semaphore + // without end. + _replicate_task.wait().then([this] { + return _the_storage_service.invoke_on_all([tm = _token_metadata] (storage_service& local_ss) { + if (engine().cpu_id() != 0) { + local_ss._token_metadata = tm; + } + }); + }).then_wrapped([this] (auto&& f) { + try { + _replicate_task.signal(); + f.get(); + } catch (...) { + print("storage_service: Fail to replicate _token_metadata\n"); + } + }); +} + +} // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 6bdaa93013..67c62a8a93 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -32,6 +32,7 @@ #include "core/sleep.hh" #include "gms/application_state.hh" #include "db/system_keyspace.hh" +#include "core/semaphore.hh" namespace service { @@ -64,7 +65,7 @@ class storage_service : public gms::i_endpoint_state_change_subscriber public: static int RING_DELAY; // delay after which we assume ring has stablized - const locator::token_metadata& get_token_metadata() const { + locator::token_metadata& get_token_metadata() { return _token_metadata; } private: @@ -82,7 +83,7 @@ private: } else #endif - return 30 * 1000; + return 5 * 1000; } /* This abstraction maintains the token/endpoint metadata information */ token_metadata _token_metadata; @@ -119,16 +120,19 @@ private: private InetAddress removingNode; +#endif + +private: /* Are we starting this node in bootstrap mode? */ - private boolean isBootstrapMode; + bool _is_bootstrap_mode; /* we bootstrap but do NOT join the ring unless told to do so */ - private boolean isSurveyMode= Boolean.parseBoolean(System.getProperty("cassandra.write_survey", "false")); -#endif -private: - bool initialized; + // FIXME: System.getProperty("cassandra.write_survey", "false") + bool _is_survey_mode = false; - bool joined = false; + bool _initialized; + + bool _joined = false; #if 0 /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */ @@ -150,33 +154,15 @@ private: #endif private: - std::set _bootstrap_tokens; -#if 0 + std::unordered_set _bootstrap_tokens; - public void finishBootstrapping() - { - isBootstrapMode = false; +public: + void finish_bootstrapping() { + _is_bootstrap_mode = false; } /** This method updates the local token on disk */ - -#endif - - void set_tokens(std::set tokens) { - // if (logger.isDebugEnabled()) - // logger.debug("Setting tokens to {}", tokens); - // SystemKeyspace.updateTokens(tokens); - for (auto t : tokens) { - _token_metadata.update_normal_token(t, get_broadcast_address()); - } - // Collection localTokens = getLocalTokens(); - auto local_tokens = _bootstrap_tokens; - auto& gossiper = gms::get_local_gossiper(); - gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens)); - gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.bootstrapping(local_tokens)); - //setMode(Mode.NORMAL, false); - } - + void set_tokens(std::unordered_set tokens); #if 0 public void registerDaemon(CassandraDaemon daemon) @@ -197,22 +183,22 @@ private: // should only be called via JMX public void stopGossiping() { - if (initialized) + if (_initialized) { logger.warn("Stopping gossip by operator request"); Gossiper.instance.stop(); - initialized = false; + _initialized = false; } } // should only be called via JMX public void startGossiping() { - if (!initialized) + if (!_initialized) { logger.warn("Starting gossip by operator request"); Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); - initialized = true; + _initialized = true; } } @@ -321,11 +307,12 @@ private: Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); StageManager.shutdownNow(); } - - public boolean isInitialized() - { - return initialized; +#endif +public: + bool is_initialized() { + return _initialized; } +#if 0 public void stopDaemon() { @@ -397,7 +384,7 @@ private: // for testing only public void unsafeInitialize() throws ConfigurationException { - initialized = true; + _initialized = true; Gossiper.instance.register(this); Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); // needed for node-ring gathering. Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion()); @@ -410,133 +397,7 @@ public: return init_server(RING_DELAY); } - future<> init_server(int delay) { -#if 0 - logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString()); - logger.info("Thrift API version: {}", cassandraConstants.VERSION); - logger.info("CQL supported versions: {} (default: {})", StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION); -#endif - initialized = true; -#if 0 - try - { - // Ensure StorageProxy is initialized on start-up; see CASSANDRA-3797. - Class.forName("org.apache.cassandra.service.StorageProxy"); - // also IndexSummaryManager, which is otherwise unreferenced - Class.forName("org.apache.cassandra.io.sstable.IndexSummaryManager"); - } - catch (ClassNotFoundException e) - { - throw new AssertionError(e); - } - - if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true"))) - { - logger.info("Loading persisted ring state"); - Multimap loadedTokens = SystemKeyspace.loadTokens(); - Map loadedHostIds = SystemKeyspace.loadHostIds(); - for (InetAddress ep : loadedTokens.keySet()) - { - if (ep.equals(FBUtilities.getBroadcastAddress())) - { - // entry has been mistakenly added, delete it - SystemKeyspace.removeEndpoint(ep); - } - else - { - _token_metadata.updateNormalTokens(loadedTokens.get(ep), ep); - if (loadedHostIds.containsKey(ep)) - _token_metadata.update_host_id(loadedHostIds.get(ep), ep); - Gossiper.instance.addSavedEndpoint(ep); - } - } - } - - // daemon threads, like our executors', continue to run while shutdown hooks are invoked - drainOnShutdown = new Thread(new WrappedRunnable() - { - @Override - public void runMayThrow() throws InterruptedException - { - ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION); - ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION); - if (mutationStage.isShutdown() && counterMutationStage.isShutdown()) - return; // drained already - - if (daemon != null) - shutdownClientServers(); - ScheduledExecutors.optionalTasks.shutdown(); - Gossiper.instance.stop(); - - // In-progress writes originating here could generate hints to be written, so shut down MessagingService - // before mutation stage, so we can get all the hints saved before shutting down - MessagingService.instance().shutdown(); - counterMutationStage.shutdown(); - mutationStage.shutdown(); - counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS); - mutationStage.awaitTermination(3600, TimeUnit.SECONDS); - StorageProxy.instance.verifyNoHintsInProgress(); - - List> flushes = new ArrayList<>(); - for (Keyspace keyspace : Keyspace.all()) - { - KSMetaData ksm = Schema.instance.getKSMetaData(keyspace.getName()); - if (!ksm.durableWrites) - { - for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) - flushes.add(cfs.forceFlush()); - } - } - try - { - FBUtilities.waitOnFutures(flushes); - } - catch (Throwable t) - { - JVMStabilityInspector.inspectThrowable(t); - // don't let this stop us from shutting down the commitlog and other thread pools - logger.warn("Caught exception while waiting for memtable flushes during shutdown hook", t); - } - - CommitLog.instance.shutdownBlocking(); - - // wait for miscellaneous tasks like sstable and commitlog segment deletion - ScheduledExecutors.nonPeriodicTasks.shutdown(); - if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) - logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown"); - } - }, "StorageServiceShutdownHook"); - Runtime.getRuntime().addShutdownHook(drainOnShutdown); -#endif - return prepare_to_join().then([this, delay] { - return join_token_ring(delay); - }); -#if 0 - // Has to be called after the host id has potentially changed in prepareToJoin(). - for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) - if (cfs.metadata.isCounter()) - cfs.initCounterCache(); - - if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))) - { - joinTokenRing(delay); - } - else - { - Collection tokens = SystemKeyspace.getSavedTokens(); - if (!tokens.isEmpty()) - { - _token_metadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress()); - // order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa. - List> states = new ArrayList>(); - states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens))); - states.add(Pair.create(ApplicationState.STATUS, valueFactory.hibernate(true))); - Gossiper.instance.addLocalApplicationStates(states); - } - logger.info("Not joining ring as requested. Use JMX (StorageService->joinRing()) to initiate ring joining"); - } -#endif - } + future<> init_server(int delay); #if 0 /** * In the event of forceful termination we need to remove the shutdown hook to prevent hanging (OOM for instance) @@ -560,38 +421,13 @@ private: Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.instance.valueFactory.datacenter(dc)); Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack)); } - - public synchronized void joinRing() throws IOException - { - if (!joined) - { - logger.info("Joining ring by operator request"); - try - { - joinTokenRing(0); - } - catch (ConfigurationException e) - { - throw new IOException(e.getMessage()); - } - } - else if (isSurveyMode) - { - set_tokens(SystemKeyspace.getSavedTokens()); - SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); - isSurveyMode = false; - logger.info("Leaving write survey mode and joining ring at operator request"); - assert _token_metadata.sortedTokens().size() > 0; - - Auth.setup(); - } +#endif +public: + void join_ring(); + bool is_joined() { + return _joined; } - - public boolean isJoined() - { - return joined; - } - +#if 0 public void rebuild(String sourceDc) { logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc); @@ -667,13 +503,14 @@ private: } #endif - future<> bootstrap(std::set tokens); -#if 0 - public boolean isBootstrapMode() - { - return isBootstrapMode; + future<> bootstrap(std::unordered_set tokens); + + bool is_bootstrap_mode() { + return _is_bootstrap_mode; } +#if 0 + public TokenMetadata getTokenMetadata() { return _token_metadata; @@ -961,11 +798,8 @@ private: } #endif public: - void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) override - { - // no-op - } - + virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; + virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) override; /* * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update @@ -998,159 +832,24 @@ public: * Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that * you should never bootstrap a new node during a removenode, decommission or move. */ - void on_change(inet_address endpoint, application_state state, versioned_value value) override { - ss_debug("SS::on_change endpoint=%s\n", endpoint); - if (state == application_state::STATUS) { - std::vector pieces; - boost::split(pieces, value.value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); - assert(pieces.size() > 0); - sstring move_name = pieces[0]; - if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) { - handle_state_bootstrap(endpoint); - } else if (move_name == sstring(versioned_value::STATUS_NORMAL)) { - handle_state_normal(endpoint); - } else if (move_name == sstring(versioned_value::REMOVING_TOKEN) || - move_name == sstring(versioned_value::REMOVED_TOKEN)) { - handle_state_removing(endpoint, pieces); - } else if (move_name == sstring(versioned_value::STATUS_LEAVING)) { - handle_state_leaving(endpoint); - } else if (move_name == sstring(versioned_value::STATUS_LEFT)) { - handle_state_left(endpoint, pieces); - } else if (move_name == sstring(versioned_value::STATUS_MOVING)) { - handle_state_moving(endpoint, pieces); - } - } else { - auto& gossiper = gms::get_local_gossiper(); - auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint); - if (!ep_state || gossiper.is_dead_state(*ep_state)) { - // logger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); - return; - } - - if (state == application_state::RELEASE_VERSION) { - // SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value); - } else if (state == application_state::DC) { - // SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value); - } else if (state == application_state::RACK) { - // SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value); - } else if (state == application_state::RPC_ADDRESS) { - // try { - // SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value)); - // } catch (UnknownHostException e) { - // throw new RuntimeException(e); - // } - } else if (state == application_state::SCHEMA) { - // SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value)); - // MigrationManager.instance.scheduleSchemaPull(endpoint, epState); - } else if (state == application_state::HOST_ID) { - // SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value)); - } - } - } - -#if 0 - private void updatePeerInfo(InetAddress endpoint) - { - EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - for (Map.Entry entry : epState.getApplicationStateMap().entrySet()) - { - switch (entry.getKey()) - { - case RELEASE_VERSION: - SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value); - break; - case DC: - SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value); - break; - case RACK: - SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value); - break; - case RPC_ADDRESS: - try - { - SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value)); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - break; - case SCHEMA: - SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value)); - break; - case HOST_ID: - SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value)); - break; - } - } - } -#endif + virtual void on_change(inet_address endpoint, application_state state, versioned_value value) override; + virtual void on_alive(gms::inet_address endpoint, gms::endpoint_state state) override; + virtual void on_dead(gms::inet_address endpoint, gms::endpoint_state state) override; + virtual void on_remove(gms::inet_address endpoint) override; + virtual void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override; private: - sstring get_application_state_value(inet_address endpoint, application_state appstate) { - auto& gossiper = gms::get_local_gossiper(); - auto eps = gossiper.get_endpoint_state_for_endpoint(endpoint); - if (!eps) { - return {}; - } - auto v = eps->get_application_state(appstate); - if (!v) { - return {}; - } - return v->value; - } - - std::set get_tokens_for(inet_address endpoint) { - auto tokens_string = get_application_state_value(endpoint, application_state::TOKENS); - ss_debug("endpoint=%s, tokens_string=%s\n", endpoint, tokens_string); - std::vector tokens; - std::set ret; - boost::split(tokens, tokens_string, boost::is_any_of(";")); - for (auto str : tokens) { - ss_debug("token=%s\n", str); - sstring_view sv(str); - bytes b = from_hex(sv); - ret.emplace(token::kind::key, b); - } - return ret; - } - + void update_peer_info(inet_address endpoint); + sstring get_application_state_value(inet_address endpoint, application_state appstate); + std::unordered_set get_tokens_for(inet_address endpoint); + void replicate_to_all_cores(); + semaphore _replicate_task{1}; private: /** * Handle node bootstrap * * @param endpoint bootstrapping node */ - void handle_state_bootstrap(inet_address endpoint) { -#if 0 - Collection tokens; - // explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified - tokens = get_tokens_for(endpoint); - - if (logger.isDebugEnabled()) - logger.debug("Node {} state bootstrapping, token {}", endpoint, tokens); - - // if this node is present in token metadata, either we have missed intermediate states - // or the node had crashed. Print warning if needed, clear obsolete stuff and - // continue. - if (_token_metadata.isMember(endpoint)) - { - // If isLeaving is false, we have missed both LEAVING and LEFT. However, if - // isLeaving is true, we have only missed LEFT. Waiting time between completing - // leave operation and rebootstrapping is relatively short, so the latter is quite - // common (not enough time for gossip to spread). Therefore we report only the - // former in the log. - if (!_token_metadata.isLeaving(endpoint)) - logger.info("Node {} state jump to bootstrap", endpoint); - _token_metadata.removeEndpoint(endpoint); - } - - _token_metadata.addBootstrapTokens(tokens, endpoint); - PendingRangeCalculatorService.instance.update(); - - if (Gossiper.instance.usesHostId(endpoint)) - _token_metadata.update_host_id(Gossiper.instance.getHostId(endpoint), endpoint); -#endif - } + void handle_state_bootstrap(inet_address endpoint); /** * Handle node move to normal state. That is, node is entering token ring and participating @@ -1158,167 +857,14 @@ private: * * @param endpoint node */ - void handle_state_normal(inet_address endpoint) { -#if 0 - Collection tokens; - - tokens = get_tokens_for(endpoint); - - Set tokensToUpdateInMetadata = new HashSet<>(); - Set tokensToUpdateInSystemKeyspace = new HashSet<>(); - Set localTokensToRemove = new HashSet<>(); - Set endpointsToRemove = new HashSet<>(); - - - if (logger.isDebugEnabled()) - logger.debug("Node {} state normal, token {}", endpoint, tokens); - - if (_token_metadata.isMember(endpoint)) - logger.info("Node {} state jump to normal", endpoint); - - updatePeerInfo(endpoint); - // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). - if (Gossiper.instance.usesHostId(endpoint)) - { - UUID hostId = Gossiper.instance.getHostId(endpoint); - InetAddress existing = _token_metadata.getEndpointForHostId(hostId); - if (DatabaseDescriptor.isReplacing() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) - logger.warn("Not updating token metadata for {} because I am replacing it", endpoint); - else - { - if (existing != null && !existing.equals(endpoint)) - { - if (existing.equals(FBUtilities.getBroadcastAddress())) - { - logger.warn("Not updating host ID {} for {} because it's mine", hostId, endpoint); - _token_metadata.removeEndpoint(endpoint); - endpointsToRemove.add(endpoint); - } - else if (Gossiper.instance.compareEndpointStartup(endpoint, existing) > 0) - { - logger.warn("Host ID collision for {} between {} and {}; {} is the new owner", hostId, existing, endpoint, endpoint); - _token_metadata.removeEndpoint(existing); - endpointsToRemove.add(existing); - _token_metadata.update_host_id(hostId, endpoint); - } - else - { - logger.warn("Host ID collision for {} between {} and {}; ignored {}", hostId, existing, endpoint, endpoint); - _token_metadata.removeEndpoint(endpoint); - endpointsToRemove.add(endpoint); - } - } - else - _token_metadata.update_host_id(hostId, endpoint); - } - } - - for (final Token token : tokens) - { - // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint. - InetAddress currentOwner = _token_metadata.getEndpoint(token); - if (currentOwner == null) - { - logger.debug("New node {} at token {}", endpoint, token); - tokensToUpdateInMetadata.add(token); - tokensToUpdateInSystemKeyspace.add(token); - } - else if (endpoint.equals(currentOwner)) - { - // set state back to normal, since the node may have tried to leave, but failed and is now back up - tokensToUpdateInMetadata.add(token); - tokensToUpdateInSystemKeyspace.add(token); - } - else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0) - { - tokensToUpdateInMetadata.add(token); - tokensToUpdateInSystemKeyspace.add(token); - - // currentOwner is no longer current, endpoint is. Keep track of these moves, because when - // a host no longer has any tokens, we'll want to remove it. - Multimap epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading(); - epToTokenCopy.get(currentOwner).remove(token); - if (epToTokenCopy.get(currentOwner).size() < 1) - endpointsToRemove.add(currentOwner); - - logger.info(String.format("Nodes %s and %s have the same token %s. %s is the new owner", - endpoint, - currentOwner, - token, - endpoint)); - } - else - { - logger.info(String.format("Nodes %s and %s have the same token %s. Ignoring %s", - endpoint, - currentOwner, - token, - endpoint)); - } - } - - boolean isMoving = _token_metadata.isMoving(endpoint); // capture because updateNormalTokens clears moving status - _token_metadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint); - for (InetAddress ep : endpointsToRemove) - { - removeEndpoint(ep); - if (DatabaseDescriptor.isReplacing() && DatabaseDescriptor.getReplaceAddress().equals(ep)) - Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260 - } - if (!tokensToUpdateInSystemKeyspace.isEmpty()) - SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace); - if (!localTokensToRemove.isEmpty()) - SystemKeyspace.updateLocalTokens(Collections.emptyList(), localTokensToRemove); - - if (isMoving) - { - _token_metadata.removeFromMoving(endpoint); - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onMove(endpoint); - } - else - { - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onJoinCluster(endpoint); - } - - PendingRangeCalculatorService.instance.update(); -#endif - } + void handle_state_normal(inet_address endpoint); /** * Handle node preparing to leave the ring * * @param endpoint node */ - void handle_state_leaving(inet_address endpoint) { -#if 0 - Collection tokens; - tokens = get_tokens_for(endpoint); - - if (logger.isDebugEnabled()) - logger.debug("Node {} state leaving, tokens {}", endpoint, tokens); - - // If the node is previously unknown or tokens do not match, update tokenmetadata to - // have this node as 'normal' (it must have been using this token before the - // leave). This way we'll get pending ranges right. - if (!_token_metadata.isMember(endpoint)) - { - logger.info("Node {} state jump to leaving", endpoint); - _token_metadata.updateNormalTokens(tokens, endpoint); - } - else if (!_token_metadata.getTokens(endpoint).containsAll(tokens)) - { - logger.warn("Node {} 'leaving' token mismatch. Long network partition?", endpoint); - _token_metadata.updateNormalTokens(tokens, endpoint); - } - - // at this point the endpoint is certainly a member with this token, so let's proceed - // normally - _token_metadata.addLeavingEndpoint(endpoint); - PendingRangeCalculatorService.instance.update(); -#endif - } + void handle_state_leaving(inet_address endpoint); /** * Handle node leaving the ring. This will happen when a node is decommissioned @@ -1326,18 +872,7 @@ private: * @param endpoint If reason for leaving is decommission, endpoint is the leaving node. * @param pieces STATE_LEFT,token */ - void handle_state_left(inet_address endpoint, std::vector pieces) { -#if 0 - assert pieces.length >= 2; - Collection tokens; - tokens = get_tokens_for(endpoint); - - if (logger.isDebugEnabled()) - logger.debug("Node {} state left, tokens {}", endpoint, tokens); - - excise(tokens, endpoint, extractExpireTime(pieces)); -#endif - } + void handle_state_left(inet_address endpoint, std::vector pieces); /** * Handle node moving inside the ring. @@ -1345,19 +880,7 @@ private: * @param endpoint moving endpoint address * @param pieces STATE_MOVING, token */ - void handle_state_moving(inet_address endpoint, std::vector pieces) { -#if 0 - assert pieces.length >= 2; - Token token = getPartitioner().getTokenFactory().fromString(pieces[1]); - - if (logger.isDebugEnabled()) - logger.debug("Node {} state moving, new token {}", endpoint, token); - - _token_metadata.addMovingEndpoint(token, endpoint); - - PendingRangeCalculatorService.instance.update(); -#endif - } + void handle_state_moving(inet_address endpoint, std::vector pieces); /** * Handle notification that a node being actively removed from the ring via 'removenode' @@ -1365,59 +888,9 @@ private: * @param endpoint node * @param pieces either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored) */ - void handle_state_removing(inet_address endpoint, std::vector pieces) { -#if 0 - assert (pieces.length > 0); - - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - { - logger.info("Received removenode gossip about myself. Is this node rejoining after an explicit removenode?"); - try - { - drain(); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - return; - } - if (_token_metadata.isMember(endpoint)) - { - String state = pieces[0]; - Collection removeTokens = _token_metadata.getTokens(endpoint); - - if (VersionedValue.REMOVED_TOKEN.equals(state)) - { - excise(removeTokens, endpoint, extractExpireTime(pieces)); - } - else if (VersionedValue.REMOVING_TOKEN.equals(state)) - { - if (logger.isDebugEnabled()) - logger.debug("Tokens {} removed manually (endpoint was {})", removeTokens, endpoint); - - // Note that the endpoint is being removed - _token_metadata.addLeavingEndpoint(endpoint); - PendingRangeCalculatorService.instance.update(); - - // find the endpoint coordinating this removal that we need to notify when we're done - String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1); - UUID hostId = UUID.fromString(coordinator[1]); - // grab any data we are now responsible for and notify responsible node - restoreReplicaCount(endpoint, _token_metadata.getEndpointForHostId(hostId)); - } - } - else // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it - { - if (VersionedValue.REMOVED_TOKEN.equals(pieces[0])) - addExpireTimeIfFound(endpoint, extractExpireTime(pieces)); - removeEndpoint(endpoint); - } -#endif - } + void handle_state_removing(inet_address endpoint, std::vector pieces); #if 0 - private void excise(Collection tokens, InetAddress endpoint) { logger.info("Removing tokens {} for {}", tokens, endpoint); @@ -1625,61 +1098,7 @@ private: return changedRanges; } -#endif -public: - void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override { - ss_debug("SS::on_join endpoint=%s\n", endpoint); - auto tokens = get_tokens_for(endpoint); - for (auto t : tokens) { - ss_debug("t=%s\n", t); - } - for (auto e : ep_state.get_application_state_map()) { - on_change(endpoint, e.first, e.second); - } - // MigrationManager.instance.scheduleSchemaPull(endpoint, epState); - } - void on_alive(gms::inet_address endpoint, gms::endpoint_state state) override - { - ss_debug("SS::on_alive endpoint=%s\n", endpoint); -#if 0 - MigrationManager.instance.scheduleSchemaPull(endpoint, state); - - if (_token_metadata.isMember(endpoint)) - { - HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true); - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onUp(endpoint); - } -#endif - } - - void on_remove(gms::inet_address endpoint) override - { -#if 0 - _token_metadata.removeEndpoint(endpoint); - PendingRangeCalculatorService.instance.update(); -#endif - } - - void on_dead(gms::inet_address endpoint, gms::endpoint_state state) override - { -#if 0 - MessagingService.instance().convict(endpoint); - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onDown(endpoint); -#endif - } - - void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override - { -#if 0 - // If we have restarted before the node was even marked down, we need to reset the connection pool - if (state.isAlive()) - onDead(endpoint, state); -#endif - } -#if 0 /** raw load value */ public double getLoad() { diff --git a/tests/urchin/gossip.cc b/tests/urchin/gossip.cc index 92d8c17419..c3be6c49e6 100644 --- a/tests/urchin/gossip.cc +++ b/tests/urchin/gossip.cc @@ -20,7 +20,7 @@ int main(int ac, char ** av) { auto port = server.port(); auto listen = server.listen_address(); print("Messaging server listening on ip %s port %d ...\n", listen, port); - gms::get_failure_detector().start_single().then([config] { + gms::get_failure_detector().start().then([config] { gms::get_gossiper().start_single().then([config] { std::set seeds; for (auto s : config["seed"].as>()) { diff --git a/tests/urchin/gossip_test.cc b/tests/urchin/gossip_test.cc index b0928b8c78..acf5ec2d11 100644 --- a/tests/urchin/gossip_test.cc +++ b/tests/urchin/gossip_test.cc @@ -10,7 +10,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){ return net::get_messaging_service().start(gms::inet_address("127.0.0.1")).then( [] () { - return gms::get_failure_detector().start_single().then([] { + return gms::get_failure_detector().start().then([] { return gms::get_gossiper().start_single().then([] { return gms::get_gossiper().stop().then( [] (){ return gms::get_failure_detector().stop().then( [] (){ diff --git a/utils/UUID.hh b/utils/UUID.hh index cea34ef552..d700be90ec 100644 --- a/utils/UUID.hh +++ b/utils/UUID.hh @@ -28,6 +28,7 @@ public: UUID() : most_sig_bits(0), least_sig_bits(0) {} UUID(int64_t most_sig_bits, int64_t least_sig_bits) : most_sig_bits(most_sig_bits), least_sig_bits(least_sig_bits) {} + explicit UUID(const sstring& uuid_string); int64_t get_most_significant_bits() const { return most_sig_bits; diff --git a/utils/uuid.cc b/utils/uuid.cc index aa2e115936..e1218832e4 100644 --- a/utils/uuid.cc +++ b/utils/uuid.cc @@ -7,6 +7,9 @@ #include "net/byteorder.hh" #include #include +#include +#include +#include "core/sstring.hh" namespace utils { @@ -35,4 +38,16 @@ std::ostream& operator<<(std::ostream& out, const UUID& uuid) { return out << uuid.to_sstring(); } +UUID::UUID(const sstring& uuid) { + auto uuid_string = uuid; + boost::erase_all(uuid_string, "-"); + auto size = uuid_string.size() / 2; + assert(size == 16); + sstring most = sstring(uuid_string.begin(), uuid_string.begin() + size); + sstring least = sstring(uuid_string.begin() + size, uuid_string.end()); + int base = 16; + this->most_sig_bits = std::stoull(most, nullptr, base); + this->least_sig_bits = std::stoull(least, nullptr, base); +} + }