diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index ae20d0a096..d95eb57505 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -40,6 +40,7 @@ #include "utils/fb_utilities.hh" #include "locator/snitch_base.hh" #include "database.hh" +#include "gms/gossiper.hh" namespace dht { @@ -137,4 +138,68 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, st return range_sources; } +std::unordered_multimap, inet_address> +range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector> desired_ranges) { + logger.debug("{} ks={}", __func__, keyspace_name); + assert (_tokens.empty() == false); + + auto& ks = _db.local().find_keyspace(keyspace_name); + auto& strat = ks.get_replication_strategy(); + + //Active ranges + auto metadata_clone = _metadata.clone_only_token_map(); + auto range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(metadata_clone)); + + //Pending ranges + metadata_clone.update_normal_tokens(_tokens, _address); + auto pending_range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(metadata_clone)); + + //Collects the source that will have its range moved to the new node + std::unordered_multimap, inet_address> range_sources; + + for (auto& desired_range : desired_ranges) { + for (auto& x : range_addresses) { + const range& src_range = x.first; + if (src_range.contains(desired_range, dht::tri_compare)) { + auto old_endpoints = x.second; + auto it = pending_range_addresses.find(desired_range); + assert (it != pending_range_addresses.end()); + auto new_endpoints = it->second; + + //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. + //So we need to be careful to only be strict when endpoints == RF + if (old_endpoints.size() == strat.get_replication_factor()) { + std::unordered_set diff; + std::set_difference(old_endpoints.begin(), old_endpoints.end(), + new_endpoints.begin(), new_endpoints.end(), std::inserter(diff, diff.begin())); + old_endpoints = std::move(diff); + if (old_endpoints.size() != 1) { + throw std::runtime_error(sprint("Expected 1 endpoint but found ", old_endpoints.size())); + } + } + range_sources.emplace(desired_range, *(old_endpoints.begin())); + } + } + + //Validate + auto nr = range_sources.count(desired_range); + if (nr < 1) { + throw std::runtime_error(sprint("No sources found for %s", desired_range)); + } + + if (nr > 1) { + throw std::runtime_error(sprint("Multiple endpoints found for %s", desired_range)); + } + + inet_address source_ip = range_sources.find(desired_range)->second; + auto& gossiper = gms::get_local_gossiper(); + auto source_state = gossiper.get_endpoint_state_for_endpoint(source_ip); + if (gossiper.is_enabled() && source_state && !source_state->is_alive()) { + throw std::runtime_error(sprint("A node required to move the data consistently is down (%s). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false", source_ip)); + } + } + + return range_sources; +} + } // dht diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index 6903221f49..71093ab18d 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -163,67 +163,13 @@ private: */ std::unordered_multimap, inet_address> get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector> desired_ranges); -#if 0 /** * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges. * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating * consistency. */ - private Multimap, InetAddress> getAllRangesWithStrictSourcesFor(String table, Collection> desiredRanges) - { - - assert tokens != null; - AbstractReplicationStrategy strat = Keyspace.open(table).getReplicationStrategy(); - - //Active ranges - TokenMetadata metadataClone = metadata.cloneOnlyTokenMap(); - Multimap,InetAddress> addressRanges = strat.getRangeAddresses(metadataClone); - - //Pending ranges - metadataClone.updateNormalTokens(tokens, address); - Multimap,InetAddress> pendingRangeAddresses = strat.getRangeAddresses(metadataClone); - - //Collects the source that will have its range moved to the new node - Multimap, InetAddress> rangeSources = ArrayListMultimap.create(); - - for (Range desiredRange : desiredRanges) - { - for (Map.Entry, Collection> preEntry : addressRanges.asMap().entrySet()) - { - if (preEntry.getKey().contains(desiredRange)) - { - Set oldEndpoints = Sets.newHashSet(preEntry.getValue()); - Set newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange)); - - //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. - //So we need to be careful to only be strict when endpoints == RF - if (oldEndpoints.size() == strat.getReplicationFactor()) - { - oldEndpoints.removeAll(newEndpoints); - assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size(); - } - - rangeSources.put(desiredRange, oldEndpoints.iterator().next()); - } - } - - //Validate - Collection addressList = rangeSources.get(desiredRange); - if (addressList == null || addressList.isEmpty()) - throw new IllegalStateException("No sources found for " + desiredRange); - - if (addressList.size() > 1) - throw new IllegalStateException("Multiple endpoints found for " + desiredRange); - - InetAddress sourceIp = addressList.iterator().next(); - EndpointState sourceState = Gossiper.instance.getEndpointStateForEndpoint(sourceIp); - if (Gossiper.instance.isEnabled() && (sourceState == null || !sourceState.isAlive())) - throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+"). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false"); - } - - return rangeSources; - } -#endif + std::unordered_multimap, inet_address> + get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector> desired_ranges); private: /** * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)