From c96bc8bbd2645cd9382e60517f2c895511bbbebf Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 19 Oct 2015 16:08:24 +0800 Subject: [PATCH] token_metadata: Implement calculate_pending_ranges --- locator/abstract_replication_strategy.hh | 1 + locator/token_metadata.cc | 102 +++++++++++++++++++++++ locator/token_metadata.hh | 87 +------------------ 3 files changed, 106 insertions(+), 84 deletions(-) diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 3c01a59b83..5ce4f20c65 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -80,6 +80,7 @@ protected: void validate_replication_factor(sstring rf) const; virtual std::vector calculate_natural_endpoints(const token& search_token, token_metadata& tm) const = 0; + friend token_metadata; public: abstract_replication_strategy( diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 4c3a8929e6..fa4c08c89f 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -23,9 +23,15 @@ #include "token_metadata.hh" #include #include "locator/snitch_base.hh" +#include "locator/abstract_replication_strategy.hh" +#include "log.hh" +#include +#include namespace locator { +static logging::logger logger("token_metadata"); + template static void remove_by_value(C& container, V value) { for (auto it = container.begin(); it != container.end();) { @@ -362,6 +368,102 @@ token_metadata::get_pending_ranges(sstring keyspace_name, inet_address endpoint) return ret; } +void token_metadata::calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name) { + std::unordered_multimap, inet_address> new_pending_ranges; + + if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _moving_endpoints.empty()) { + logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspace_name); + _pending_ranges[keyspace_name] = std::move(new_pending_ranges); + return; + } + + std::unordered_multimap> address_ranges = strategy.get_address_ranges(*this); + + // FIMXE + // Copy of metadata reflecting the situation after all leave operations are finished. + auto all_left_metadata = clone_after_all_left(); + + // get all ranges that will be affected by leaving nodes + std::unordered_set> affected_ranges; + for (auto endpoint : _leaving_endpoints) { + auto r = address_ranges.equal_range(endpoint); + for (auto x = r.first; x != r.second; x++) { + affected_ranges.emplace(x->second); + } + } + // for each of those ranges, find what new nodes will be responsible for the range when + // all leaving nodes are gone. + auto metadata = clone_only_token_map(); // don't do this in the loop! #7758 + for (const auto& r : affected_ranges) { + auto t = r.end()->value(); + auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata); + auto new_endpoints = strategy.calculate_natural_endpoints(t, all_left_metadata); + std::vector diff; + std::set_difference(new_endpoints.begin(), new_endpoints.end(), + current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff)); + for (auto& ep : diff) { + new_pending_ranges.emplace(r, ep); + } + } + + // At this stage newPendingRanges has been updated according to leave operations. We can + // now continue the calculation by checking bootstrapping nodes. + + // For each of the bootstrapping nodes, simply add and remove them one by one to + // allLeftMetadata and check in between what their ranges would be. + std::unordered_multimap bootstrap_addresses; + for (auto& x : _bootstrap_tokens) { + bootstrap_addresses.emplace(x.second, x.first); + } + + // TODO: share code with unordered_multimap_to_unordered_map + std::unordered_map> tmp; + for (auto& x : bootstrap_addresses) { + auto& addr = x.first; + auto& t = x.second; + tmp[addr].insert(t); + } + for (auto& x : tmp) { + auto& endpoint = x.first; + auto& tokens = x.second; + all_left_metadata.update_normal_tokens(tokens, endpoint); + for (auto& x : strategy.get_address_ranges(all_left_metadata)) { + if (x.first == endpoint) { + new_pending_ranges.emplace(x.second, endpoint); + } + } + all_left_metadata.remove_endpoint(endpoint); + } + + // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. + // We can now finish the calculation by checking moving nodes. + + // For each of the moving nodes, we do the same thing we did for bootstrapping: + // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. + for (auto& moving : _moving_endpoints) { + auto& t = moving.first; + auto& endpoint = moving.second; // address of the moving node + + // moving.left is a new token of the endpoint + all_left_metadata.update_normal_token(t, endpoint); + + for (auto& x : strategy.get_address_ranges(all_left_metadata)) { + if (x.first == endpoint) { + new_pending_ranges.emplace(x.second, endpoint); + } + } + + all_left_metadata.remove_endpoint(endpoint); + } + + _pending_ranges[keyspace_name] = std::move(new_pending_ranges); + + if (logger.is_enabled(logging::log_level::debug)) { + // TODO: Enable printPendingRanges + // logger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "" : printPendingRanges())); + } +} + /////////////////// class topology ///////////////////////////////////////////// inline void topology::clear() { _dc_endpoints.clear(); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index b673e6870b..a243c32793 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -54,6 +54,8 @@ class keyspace; namespace locator { +class abstract_replication_strategy; + using inet_address = gms::inet_address; using token = dht::token; @@ -654,7 +656,6 @@ public: std::unordered_map, std::unordered_set> get_pending_ranges(sstring keyspace_name); std::vector> get_pending_ranges(sstring keyspace_name, inet_address endpoint); -#if 0 /** * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is: * @@ -678,89 +679,7 @@ public: * NOTE: This is heavy and ineffective operation. This will be done only once when a node * changes state in the cluster, so it should be manageable. */ - public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName) - { - lock.readLock().lock(); - try - { - Multimap, InetAddress> newPendingRanges = HashMultimap.create(); - - if (_bootstrap_tokens.isEmpty() && _leaving_endpoints.isEmpty() && _moving_endpoints.isEmpty()) - { - if (logger.isDebugEnabled()) - logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName); - - _pending_ranges.put(keyspaceName, newPendingRanges); - return; - } - - Multimap> addressRanges = strategy.getAddressRanges(); - - // Copy of metadata reflecting the situation after all leave operations are finished. - TokenMetadata allLeftMetadata = cloneAfterAllLeft(); - - // get all ranges that will be affected by leaving nodes - Set> affectedRanges = new HashSet>(); - for (InetAddress endpoint : _leaving_endpoints) - affectedRanges.addAll(addressRanges.get(endpoint)); - - // for each of those ranges, find what new nodes will be responsible for the range when - // all leaving nodes are gone. - TokenMetadata metadata = cloneOnlyTokenMap(); // don't do this in the loop! #7758 - for (Range range : affectedRanges) - { - Set currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); - Set newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); - newPendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints)); - } - - // At this stage newPendingRanges has been updated according to leave operations. We can - // now continue the calculation by checking bootstrapping nodes. - - // For each of the bootstrapping nodes, simply add and remove them one by one to - // allLeftMetadata and check in between what their ranges would be. - Multimap bootstrapAddresses = _bootstrap_tokens.inverse(); - for (InetAddress endpoint : bootstrapAddresses.keySet()) - { - Collection tokens = bootstrapAddresses.get(endpoint); - - allLeftMetadata.updateNormalTokens(tokens, endpoint); - for (Range range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - newPendingRanges.put(range, endpoint); - allLeftMetadata.removeEndpoint(endpoint); - } - - // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. - // We can now finish the calculation by checking moving nodes. - - // For each of the moving nodes, we do the same thing we did for bootstrapping: - // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. - for (Pair moving : _moving_endpoints) - { - InetAddress endpoint = moving.right; // address of the moving node - - // moving.left is a new token of the endpoint - allLeftMetadata.updateNormalToken(moving.left, endpoint); - - for (Range range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - { - newPendingRanges.put(range, endpoint); - } - - allLeftMetadata.removeEndpoint(endpoint); - } - - _pending_ranges.put(keyspaceName, newPendingRanges); - - if (logger.isDebugEnabled()) - logger.debug("Pending ranges:\n{}", (_pending_ranges.isEmpty() ? "" : printPendingRanges())); - } - finally - { - lock.readLock().unlock(); - } - } -#endif + void calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name); public: token get_predecessor(token t);