diff --git a/service/storage_service.cc b/service/storage_service.cc index f886b94c68..57efe1e68f 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2474,5 +2474,59 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_ } } +future<> storage_service::move(token new_token) { + return run_with_write_api_lock([new_token] (storage_service& ss) mutable { + return seastar::async([new_token, &ss] { + auto tokens = ss._token_metadata.sorted_tokens(); + if (std::find(tokens.begin(), tokens.end(), new_token) != tokens.end()) { + throw std::runtime_error(sprint("target token %s is already owned by another node.", new_token)); + } + + // address of the current node + auto local_address = ss.get_broadcast_address(); + + // This doesn't make any sense in a vnodes environment. + if (ss.get_token_metadata().get_tokens(local_address).size() > 1) { + logger.error("Invalid request to move(Token); This node has more than one token and cannot be moved thusly."); + throw std::runtime_error("This node has more than one token and cannot be moved thusly."); + } + + auto keyspaces_to_process = ss._db.local().get_non_system_keyspaces(); + + get_local_pending_range_calculator_service().block_until_finished().get(); + + // checking if data is moving to this node + for (auto keyspace_name : keyspaces_to_process) { + if (ss._token_metadata.get_pending_ranges(keyspace_name, local_address).size() > 0) { + throw std::runtime_error("data is currently moving to this node; unable to leave the ring"); + } + } + + gms::get_local_gossiper().add_local_application_state(application_state::STATUS, ss.value_factory.moving(new_token)).get(); + ss.set_mode(mode::MOVING, sprint("Moving %s from %s to %s.", local_address, *(ss.get_local_tokens().begin()), new_token), true); + + ss.set_mode(mode::MOVING, sprint("Sleeping %d ms before start streaming/fetching ranges", RING_DELAY), true); + sleep(std::chrono::milliseconds(RING_DELAY)).get(); + + storage_service::range_relocator relocator(std::unordered_set{new_token}, keyspaces_to_process); + + if (relocator.streams_needed()) { + ss.set_mode(mode::MOVING, "fetching new ranges and streaming old ranges", true); + try { + relocator.stream().get(); + } catch (...) { + throw std::runtime_error(sprint("Interrupted while waiting for stream/fetch ranges to finish: %s", std::current_exception())); + } + } else { + ss.set_mode(mode::MOVING, "No ranges to fetch/stream", true); + } + + ss.set_tokens(std::unordered_set{new_token}); // setting new token as we have everything settled + + logger.debug("Successfully moved to new token {}", *(ss.get_local_tokens().begin())); + }); + }); +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 250dd42cfb..b06abc43bd 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -1803,21 +1803,14 @@ private: void leave_ring(); void unbootstrap(); future<> stream_hints(); -#if 0 - public void move(String newToken) throws IOException - { - try - { - getPartitioner().getTokenFactory().validate(newToken); - } - catch (ConfigurationException e) - { - throw new IOException(e.getMessage()); - } - move(getPartitioner().getTokenFactory().fromString(newToken)); +public: + future<> move(sstring new_token) { + // FIXME: getPartitioner().getTokenFactory().validate(newToken); + return move(dht::global_partitioner().from_sstring(new_token)); } +private: /** * move the node to new token or find a new token to boot to according to load * @@ -1825,67 +1818,8 @@ private: * * @throws IOException on any I/O operation error */ - private void move(Token newToken) throws IOException - { - if (newToken == null) - throw new IOException("Can't move to the undefined (null) token."); - - if (_token_metadata.sortedTokens().contains(newToken)) - throw new IOException("target token " + newToken + " is already owned by another node."); - - // address of the current node - InetAddress localAddress = FBUtilities.getBroadcastAddress(); - - // This doesn't make any sense in a vnodes environment. - if (getTokenMetadata().getTokens(localAddress).size() > 1) - { - logger.error("Invalid request to move(Token); This node has more than one token and cannot be moved thusly."); - throw new UnsupportedOperationException("This node has more than one token and cannot be moved thusly."); - } - - List keyspacesToProcess = Schema.instance.getNonSystemKeyspaces(); - - PendingRangeCalculatorService.instance.blockUntilFinished(); - // checking if data is moving to this node - for (String keyspaceName : keyspacesToProcess) - { - if (_token_metadata.getPendingRanges(keyspaceName, localAddress).size() > 0) - throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring"); - } - - Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.moving(newToken)); - setMode(Mode.MOVING, String.format("Moving %s from %s to %s.", localAddress, getLocalTokens().iterator().next(), newToken), true); - - setMode(Mode.MOVING, String.format("Sleeping %s ms before start streaming/fetching ranges", RING_DELAY), true); - Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS); - - RangeRelocator relocator = new RangeRelocator(Collections.singleton(newToken), keyspacesToProcess); - - if (relocator.streamsNeeded()) - { - setMode(Mode.MOVING, "fetching new ranges and streaming old ranges", true); - try - { - relocator.stream().get(); - } - catch (ExecutionException | InterruptedException e) - { - throw new RuntimeException("Interrupted while waiting for stream/fetch ranges to finish: " + e.getMessage()); - } - } - else - { - setMode(Mode.MOVING, "No ranges to fetch/stream", true); - } - - set_tokens(Collections.singleton(newToken)); // setting new token as we have everything settled - - if (logger.isDebugEnabled()) - logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next()); - } -#endif - -private: + future<> move(token new_token); +public: class range_relocator { private: