diff --git a/service/storage_service.cc b/service/storage_service.cc index 57523a5811..923f2c1acf 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1965,4 +1965,44 @@ storage_service::stream_ranges(std::unordered_map storage_service::stream_hints() { + // FIXME: flush hits column family +#if 0 + // StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well) + ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS); + FBUtilities.waitOnFuture(hintsCF.forceFlush()); +#endif + + // gather all live nodes in the cluster that aren't also leaving + auto candidates = get_local_storage_service().get_token_metadata().clone_after_all_left().get_all_endpoints(); + auto beg = candidates.begin(); + auto end = candidates.end(); + auto remove_fn = [br = get_broadcast_address()] (const inet_address& ep) { + return ep == br || !gms::get_local_failure_detector().is_alive(ep); + }; + candidates.erase(std::remove_if(beg, end, remove_fn), end); + + if (candidates.empty()) { + logger.warn("Unable to stream hints since no live endpoints seen"); + throw std::runtime_error("Unable to stream hints since no live endpoints seen"); + } else { + // stream to the closest peer as chosen by the snitch + auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); + + snitch->sort_by_proximity(get_broadcast_address(), candidates); + auto hints_destination_host = candidates.front(); + auto preferred = hints_destination_host; // FIXME: SystemKeyspace.getPreferredIP(hints_destination_host); + + // stream all hints -- range list will be a singleton of "the entire ring" + auto t = dht::global_partitioner().get_minimum_token(); + std::vector> ranges = {range(t)}; + + streaming::stream_plan sp("Hints", true); + std::vector column_families = { db::system_keyspace::HINTS }; + auto keyspace = db::system_keyspace::NAME; + sp.transfer_ranges(hints_destination_host, preferred, keyspace, ranges, column_families); + return sp.execute(); + } +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index d183d1c5b2..e92159e16e 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -1891,47 +1891,8 @@ public: private: void leave_ring(); future<> unbootstrap(); + future stream_hints(); #if 0 - private Future streamHints() - { - // StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well) - ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS); - FBUtilities.waitOnFuture(hintsCF.forceFlush()); - - // gather all live nodes in the cluster that aren't also leaving - List candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints()); - candidates.remove(FBUtilities.getBroadcastAddress()); - for (Iterator iter = candidates.iterator(); iter.hasNext(); ) - { - InetAddress address = iter.next(); - if (!FailureDetector.instance.isAlive(address)) - iter.remove(); - } - - if (candidates.isEmpty()) - { - logger.warn("Unable to stream hints since no live endpoints seen"); - return Futures.immediateFuture(null); - } - else - { - // stream to the closest peer as chosen by the snitch - DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates); - InetAddress hintsDestinationHost = candidates.get(0); - InetAddress preferred = SystemKeyspace.getPreferredIP(hintsDestinationHost); - - // stream all hints -- range list will be a singleton of "the entire ring" - Token token = StorageService.getPartitioner().getMinimumToken(); - List> ranges = Collections.singletonList(new Range<>(token, token)); - - return new StreamPlan("Hints").transferRanges(hintsDestinationHost, - preferred, - SystemKeyspace.NAME, - ranges, - SystemKeyspace.HINTS) - .execute(); - } - } public void move(String newToken) throws IOException {