storage_service: Implement stream_hints

Needed by unbootstrap.
This commit is contained in:
Asias He
2015-10-21 10:29:25 +08:00
parent 3f5e9baa17
commit 8a9374b331
2 changed files with 41 additions and 40 deletions

View File

@@ -1965,4 +1965,44 @@ storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multim
return sp.execute();
}
future<streaming::stream_state> storage_service::stream_hints() {
// FIXME: flush hits column family
#if 0
// StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well)
ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
FBUtilities.waitOnFuture(hintsCF.forceFlush());
#endif
// gather all live nodes in the cluster that aren't also leaving
auto candidates = get_local_storage_service().get_token_metadata().clone_after_all_left().get_all_endpoints();
auto beg = candidates.begin();
auto end = candidates.end();
auto remove_fn = [br = get_broadcast_address()] (const inet_address& ep) {
return ep == br || !gms::get_local_failure_detector().is_alive(ep);
};
candidates.erase(std::remove_if(beg, end, remove_fn), end);
if (candidates.empty()) {
logger.warn("Unable to stream hints since no live endpoints seen");
throw std::runtime_error("Unable to stream hints since no live endpoints seen");
} else {
// stream to the closest peer as chosen by the snitch
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
snitch->sort_by_proximity(get_broadcast_address(), candidates);
auto hints_destination_host = candidates.front();
auto preferred = hints_destination_host; // FIXME: SystemKeyspace.getPreferredIP(hints_destination_host);
// stream all hints -- range list will be a singleton of "the entire ring"
auto t = dht::global_partitioner().get_minimum_token();
std::vector<range<token>> ranges = {range<token>(t)};
streaming::stream_plan sp("Hints", true);
std::vector<sstring> column_families = { db::system_keyspace::HINTS };
auto keyspace = db::system_keyspace::NAME;
sp.transfer_ranges(hints_destination_host, preferred, keyspace, ranges, column_families);
return sp.execute();
}
}
} // namespace service

View File

@@ -1891,47 +1891,8 @@ public:
private:
void leave_ring();
future<> unbootstrap();
future<streaming::stream_state> stream_hints();
#if 0
private Future<StreamState> streamHints()
{
// StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well)
ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
FBUtilities.waitOnFuture(hintsCF.forceFlush());
// gather all live nodes in the cluster that aren't also leaving
List<InetAddress> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
candidates.remove(FBUtilities.getBroadcastAddress());
for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); )
{
InetAddress address = iter.next();
if (!FailureDetector.instance.isAlive(address))
iter.remove();
}
if (candidates.isEmpty())
{
logger.warn("Unable to stream hints since no live endpoints seen");
return Futures.immediateFuture(null);
}
else
{
// stream to the closest peer as chosen by the snitch
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates);
InetAddress hintsDestinationHost = candidates.get(0);
InetAddress preferred = SystemKeyspace.getPreferredIP(hintsDestinationHost);
// stream all hints -- range list will be a singleton of "the entire ring"
Token token = StorageService.getPartitioner().getMinimumToken();
List<Range<Token>> ranges = Collections.singletonList(new Range<>(token, token));
return new StreamPlan("Hints").transferRanges(hintsDestinationHost,
preferred,
SystemKeyspace.NAME,
ranges,
SystemKeyspace.HINTS)
.execute();
}
}
public void move(String newToken) throws IOException
{