storage_service: Implement stream_ranges

Needed by unbootstrap.
This commit is contained in:
Asias He
2015-10-21 09:45:57 +08:00
parent d1eaccd234
commit 3f5e9baa17
2 changed files with 45 additions and 48 deletions

View File

@@ -1926,4 +1926,43 @@ void storage_service::leave_ring() {
sleep(delay).get();
}
future<streaming::stream_state>
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> ranges_to_stream_by_keyspace) {
using stream_plan = streaming::stream_plan;
// First, we build a list of ranges to stream to each host, per table
std::unordered_map<sstring, std::unordered_map<inet_address, std::vector<range<token>>>> sessions_to_stream_by_keyspace;
for (auto& entry : ranges_to_stream_by_keyspace) {
const auto& keyspace = entry.first;
auto& ranges_with_endpoints = entry.second;
if (ranges_with_endpoints.empty()) {
continue;
}
std::unordered_map<inet_address, std::vector<range<token>>> ranges_per_endpoint;
for (auto& end_point_entry : ranges_with_endpoints) {
range<token> r = end_point_entry.first;
inet_address endpoint = end_point_entry.second;
ranges_per_endpoint[endpoint].emplace_back(r);
}
sessions_to_stream_by_keyspace.emplace(keyspace, std::move(ranges_per_endpoint));
}
stream_plan sp("Unbootstrap", true);
for (auto& entry : sessions_to_stream_by_keyspace) {
const auto& keyspace_name = entry.first;
// TODO: we can move to avoid copy of std::vector
auto& ranges_per_endpoint = entry.second;
for (auto& ranges_entry : ranges_per_endpoint) {
auto& ranges = ranges_entry.second;
auto new_endpoint = ranges_entry.first;
auto preferred = new_endpoint; // FIXME: SystemKeyspace.getPreferredIP(newEndpoint);
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
sp.transfer_ranges(new_endpoint, preferred, keyspace_name, ranges);
}
}
return sp.execute();
}
} // namespace service

View File

@@ -54,6 +54,7 @@
#include "utils/fb_utilities.hh"
#include "database.hh"
#include <seastar/core/distributed.hh>
#include "streaming/stream_state.hh"
namespace service {
@@ -2401,62 +2402,18 @@ public:
if (oldSnitch instanceof DynamicEndpointSnitch)
((DynamicEndpointSnitch)oldSnitch).unregisterMBean();
}
#endif
private:
/**
* Seed data to the endpoints that will be responsible for it at the future
*
* @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each
* @return async Future for whether stream was success
*/
private Future<StreamState> streamRanges(Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByKeyspace)
{
// First, we build a list of ranges to stream to each host, per table
Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByKeyspace = new HashMap<>();
for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : rangesToStreamByKeyspace.entrySet())
{
String keyspace = entry.getKey();
Multimap<Range<Token>, InetAddress> rangesWithEndpoints = entry.getValue();
if (rangesWithEndpoints.isEmpty())
continue;
Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<>();
for (Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries())
{
Range<Token> range = endPointEntry.getKey();
InetAddress endpoint = endPointEntry.getValue();
List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint);
if (curRanges == null)
{
curRanges = new LinkedList<>();
rangesPerEndpoint.put(endpoint, curRanges);
}
curRanges.add(range);
}
sessionsToStreamByKeyspace.put(keyspace, rangesPerEndpoint);
}
StreamPlan streamPlan = new StreamPlan("Unbootstrap");
for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : sessionsToStreamByKeyspace.entrySet())
{
String keyspaceName = entry.getKey();
Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = entry.getValue();
for (Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet())
{
List<Range<Token>> ranges = rangesEntry.getValue();
InetAddress newEndpoint = rangesEntry.getKey();
InetAddress preferred = SystemKeyspace.getPreferredIP(newEndpoint);
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
streamPlan.transferRanges(newEndpoint, preferred, keyspaceName, ranges);
}
}
return streamPlan.execute();
}
future<streaming::stream_state> stream_ranges(std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> ranges_to_stream_by_keyspace);
#if 0
/**
* Calculate pair of ranges to stream/fetch for given two range collections
* (current ranges for keyspace and ranges after move to new token)
@@ -2564,6 +2521,7 @@ public:
return loader.stream();
}
#endif
public:
int32_t get_exception_count();
#if 0
public void rescheduleFailedDeletions()