diff --git a/service/storage_service.cc b/service/storage_service.cc index e40fc92eec..57523a5811 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1926,4 +1926,43 @@ void storage_service::leave_ring() { sleep(delay).get(); } +future +storage_service::stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace) { + using stream_plan = streaming::stream_plan; + // First, we build a list of ranges to stream to each host, per table + std::unordered_map>>> sessions_to_stream_by_keyspace; + for (auto& entry : ranges_to_stream_by_keyspace) { + const auto& keyspace = entry.first; + auto& ranges_with_endpoints = entry.second; + + if (ranges_with_endpoints.empty()) { + continue; + } + + std::unordered_map>> ranges_per_endpoint; + for (auto& end_point_entry : ranges_with_endpoints) { + range r = end_point_entry.first; + inet_address endpoint = end_point_entry.second; + ranges_per_endpoint[endpoint].emplace_back(r); + } + sessions_to_stream_by_keyspace.emplace(keyspace, std::move(ranges_per_endpoint)); + } + stream_plan sp("Unbootstrap", true); + for (auto& entry : sessions_to_stream_by_keyspace) { + const auto& keyspace_name = entry.first; + // TODO: we can move to avoid copy of std::vector + auto& ranges_per_endpoint = entry.second; + + for (auto& ranges_entry : ranges_per_endpoint) { + auto& ranges = ranges_entry.second; + auto new_endpoint = ranges_entry.first; + auto preferred = new_endpoint; // FIXME: SystemKeyspace.getPreferredIP(newEndpoint); + + // TODO each call to transferRanges re-flushes, this is potentially a lot of waste + sp.transfer_ranges(new_endpoint, preferred, keyspace_name, ranges); + } + } + return sp.execute(); +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 9176f102fd..d183d1c5b2 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -54,6 +54,7 @@ #include "utils/fb_utilities.hh" #include "database.hh" #include +#include "streaming/stream_state.hh" namespace service { @@ -2401,62 +2402,18 @@ public: if (oldSnitch instanceof DynamicEndpointSnitch) ((DynamicEndpointSnitch)oldSnitch).unregisterMBean(); } +#endif +private: /** * Seed data to the endpoints that will be responsible for it at the future * * @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each * @return async Future for whether stream was success */ - private Future streamRanges(Map, InetAddress>> rangesToStreamByKeyspace) - { - // First, we build a list of ranges to stream to each host, per table - Map>>> sessionsToStreamByKeyspace = new HashMap<>(); - for (Map.Entry, InetAddress>> entry : rangesToStreamByKeyspace.entrySet()) - { - String keyspace = entry.getKey(); - Multimap, InetAddress> rangesWithEndpoints = entry.getValue(); - - if (rangesWithEndpoints.isEmpty()) - continue; - - Map>> rangesPerEndpoint = new HashMap<>(); - for (Map.Entry, InetAddress> endPointEntry : rangesWithEndpoints.entries()) - { - Range range = endPointEntry.getKey(); - InetAddress endpoint = endPointEntry.getValue(); - - List> curRanges = rangesPerEndpoint.get(endpoint); - if (curRanges == null) - { - curRanges = new LinkedList<>(); - rangesPerEndpoint.put(endpoint, curRanges); - } - curRanges.add(range); - } - - sessionsToStreamByKeyspace.put(keyspace, rangesPerEndpoint); - } - - StreamPlan streamPlan = new StreamPlan("Unbootstrap"); - for (Map.Entry>>> entry : sessionsToStreamByKeyspace.entrySet()) - { - String keyspaceName = entry.getKey(); - Map>> rangesPerEndpoint = entry.getValue(); - - for (Map.Entry>> rangesEntry : rangesPerEndpoint.entrySet()) - { - List> ranges = rangesEntry.getValue(); - InetAddress newEndpoint = rangesEntry.getKey(); - InetAddress preferred = SystemKeyspace.getPreferredIP(newEndpoint); - - // TODO each call to transferRanges re-flushes, this is potentially a lot of waste - streamPlan.transferRanges(newEndpoint, preferred, keyspaceName, ranges); - } - } - return streamPlan.execute(); - } + future stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace); +#if 0 /** * Calculate pair of ranges to stream/fetch for given two range collections * (current ranges for keyspace and ranges after move to new token) @@ -2564,6 +2521,7 @@ public: return loader.stream(); } #endif +public: int32_t get_exception_count(); #if 0 public void rescheduleFailedDeletions()