range_streamer: Implement get_all_ranges_with_strict_sources_for

This commit is contained in:
Asias He
2015-10-12 11:28:28 +08:00
parent 84de936e43
commit d47ea88aa8
2 changed files with 67 additions and 56 deletions

View File

@@ -40,6 +40,7 @@
#include "utils/fb_utilities.hh"
#include "locator/snitch_base.hh"
#include "database.hh"
#include "gms/gossiper.hh"
namespace dht {
@@ -137,4 +138,68 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, st
return range_sources;
}
/**
 * Get a map of all ranges and the source that will be cleaned up once this
 * bootstrapped node is added for the given ranges. Every desired range must
 * resolve to exactly one source endpoint, so that data can be migrated
 * without violating consistency.
 *
 * @param keyspace_name  keyspace whose replication strategy is used to compute
 *                       the active and pending range-to-endpoint mappings
 * @param desired_ranges ranges for which a source has to be found
 * @return multimap with one (range, source endpoint) entry per desired range
 * @throws std::runtime_error if a desired range has no source or more than one
 *         source, if the strict endpoint difference does not leave exactly one
 *         endpoint, or if the selected source is known to gossip and is not alive
 */
std::unordered_multimap<range<token>, inet_address>
range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector<range<token>> desired_ranges) {
    logger.debug("{} ks={}", __func__, keyspace_name);
    assert (_tokens.empty() == false);

    auto& ks = _db.local().find_keyspace(keyspace_name);
    auto& strat = ks.get_replication_strategy();

    //Active ranges
    auto metadata_clone = _metadata.clone_only_token_map();
    auto range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(metadata_clone));

    //Pending ranges: the same token map once this node's tokens are added
    metadata_clone.update_normal_tokens(_tokens, _address);
    auto pending_range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(metadata_clone));

    //Collects the source that will have its range moved to the new node
    std::unordered_multimap<range<token>, inet_address> range_sources;

    for (auto& desired_range : desired_ranges) {
        for (auto& x : range_addresses) {
            const range<token>& src_range = x.first;
            if (!src_range.contains(desired_range, dht::tri_compare)) {
                continue;
            }

            // Copied on purpose: the set is pruned below.
            auto old_endpoints = x.second;
            auto it = pending_range_addresses.find(desired_range);
            assert (it != pending_range_addresses.end());
            const auto& new_endpoints = it->second;

            //Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
            //So we need to be careful to only be strict when endpoints == RF
            if (old_endpoints.size() == strat.get_replication_factor()) {
                // Remove every endpoint that still owns the range afterwards.
                // Erasing by key is used because std::set_difference requires
                // sorted input ranges, which unordered sets do not provide.
                for (const auto& ep : new_endpoints) {
                    old_endpoints.erase(ep);
                }
                if (old_endpoints.size() != 1) {
                    throw std::runtime_error(sprint("Expected 1 endpoint but found %d", old_endpoints.size()));
                }
            }
            range_sources.emplace(desired_range, *(old_endpoints.begin()));
        }

        //Validate
        auto nr = range_sources.count(desired_range);
        if (nr < 1) {
            throw std::runtime_error(sprint("No sources found for %s", desired_range));
        }
        if (nr > 1) {
            throw std::runtime_error(sprint("Multiple endpoints found for %s", desired_range));
        }

        inet_address source_ip = range_sources.find(desired_range)->second;
        auto& gossiper = gms::get_local_gossiper();
        auto source_state = gossiper.get_endpoint_state_for_endpoint(source_ip);
        // NOTE(review): a source with no gossip state at all is not rejected here,
        // only one whose state says it is not alive -- confirm this is intended.
        if (gossiper.is_enabled() && source_state && !source_state->is_alive()) {
            throw std::runtime_error(sprint("A node required to move the data consistently is down (%s). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false", source_ip));
        }
    }

    return range_sources;
}
} // dht

View File

@@ -163,67 +163,13 @@ private:
*/
std::unordered_multimap<range<token>, inet_address>
get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector<range<token>> desired_ranges);
#if 0
/**
* Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
* For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
* consistency.
*
* @param table name passed to Keyspace.open() to obtain the replication strategy
* @param desiredRanges the ranges for which a single source has to be found
* @return multimap holding one source endpoint per desired range
* @throws IllegalStateException if a desired range has no source or more than one source
* @throws RuntimeException if gossip is enabled and the chosen source has no endpoint state or is not alive
*/
private Multimap<Range<Token>, InetAddress> getAllRangesWithStrictSourcesFor(String table, Collection<Range<Token>> desiredRanges)
{
assert tokens != null;
AbstractReplicationStrategy strat = Keyspace.open(table).getReplicationStrategy();
//Active ranges
TokenMetadata metadataClone = metadata.cloneOnlyTokenMap();
Multimap<Range<Token>,InetAddress> addressRanges = strat.getRangeAddresses(metadataClone);
//Pending ranges: same token map after this node's tokens have been added
metadataClone.updateNormalTokens(tokens, address);
Multimap<Range<Token>,InetAddress> pendingRangeAddresses = strat.getRangeAddresses(metadataClone);
//Collects the source that will have its range moved to the new node
Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create();
for (Range<Token> desiredRange : desiredRanges)
{
for (Map.Entry<Range<Token>, Collection<InetAddress>> preEntry : addressRanges.asMap().entrySet())
{
if (preEntry.getKey().contains(desiredRange))
{
Set<InetAddress> oldEndpoints = Sets.newHashSet(preEntry.getValue());
Set<InetAddress> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange));
//Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
//So we need to be careful to only be strict when endpoints == RF
if (oldEndpoints.size() == strat.getReplicationFactor())
{
//Drop the endpoints that are still listed for the pending range; exactly one must remain
oldEndpoints.removeAll(newEndpoints);
assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size();
}
rangeSources.put(desiredRange, oldEndpoints.iterator().next());
}
}
//Validate: exactly one source per desired range, and that source must be reachable
Collection<InetAddress> addressList = rangeSources.get(desiredRange);
if (addressList == null || addressList.isEmpty())
throw new IllegalStateException("No sources found for " + desiredRange);
if (addressList.size() > 1)
throw new IllegalStateException("Multiple endpoints found for " + desiredRange);
InetAddress sourceIp = addressList.iterator().next();
EndpointState sourceState = Gossiper.instance.getEndpointStateForEndpoint(sourceIp);
if (Gossiper.instance.isEnabled() && (sourceState == null || !sourceState.isAlive()))
throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+"). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false");
}
return rangeSources;
}
#endif
/**
 * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
 * For each range, only a single source is accepted. This allows us to consistently migrate data without violating
 * consistency.
 *
 * @param keyspace_name name of the keyspace whose replication strategy is consulted
 * @param desired_ranges the ranges for which a source has to be found
 * @return multimap from each desired range to its source endpoint
 * @throws std::runtime_error if a range has no source, more than one source, or its source is reported as not alive
 */
std::unordered_multimap<range<token>, inet_address>
get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector<range<token>> desired_ranges);
private:
/**
* @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)