storage_service: Implement move

Moves the node on the token ring to a new token.

$ nodetool move <new token>
This commit is contained in:
Asias He
2015-11-05 14:26:35 +08:00
parent b12d95d955
commit e637faefbe
2 changed files with 61 additions and 73 deletions

View File

@@ -2474,5 +2474,59 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
}
}
future<> storage_service::move(token new_token) {
return run_with_write_api_lock([new_token] (storage_service& ss) mutable {
return seastar::async([new_token, &ss] {
auto tokens = ss._token_metadata.sorted_tokens();
if (std::find(tokens.begin(), tokens.end(), new_token) != tokens.end()) {
throw std::runtime_error(sprint("target token %s is already owned by another node.", new_token));
}
// address of the current node
auto local_address = ss.get_broadcast_address();
// This doesn't make any sense in a vnodes environment.
if (ss.get_token_metadata().get_tokens(local_address).size() > 1) {
logger.error("Invalid request to move(Token); This node has more than one token and cannot be moved thusly.");
throw std::runtime_error("This node has more than one token and cannot be moved thusly.");
}
auto keyspaces_to_process = ss._db.local().get_non_system_keyspaces();
get_local_pending_range_calculator_service().block_until_finished().get();
// checking if data is moving to this node
for (auto keyspace_name : keyspaces_to_process) {
if (ss._token_metadata.get_pending_ranges(keyspace_name, local_address).size() > 0) {
throw std::runtime_error("data is currently moving to this node; unable to leave the ring");
}
}
gms::get_local_gossiper().add_local_application_state(application_state::STATUS, ss.value_factory.moving(new_token)).get();
ss.set_mode(mode::MOVING, sprint("Moving %s from %s to %s.", local_address, *(ss.get_local_tokens().begin()), new_token), true);
ss.set_mode(mode::MOVING, sprint("Sleeping %d ms before start streaming/fetching ranges", RING_DELAY), true);
sleep(std::chrono::milliseconds(RING_DELAY)).get();
storage_service::range_relocator relocator(std::unordered_set<token>{new_token}, keyspaces_to_process);
if (relocator.streams_needed()) {
ss.set_mode(mode::MOVING, "fetching new ranges and streaming old ranges", true);
try {
relocator.stream().get();
} catch (...) {
throw std::runtime_error(sprint("Interrupted while waiting for stream/fetch ranges to finish: %s", std::current_exception()));
}
} else {
ss.set_mode(mode::MOVING, "No ranges to fetch/stream", true);
}
ss.set_tokens(std::unordered_set<token>{new_token}); // setting new token as we have everything settled
logger.debug("Successfully moved to new token {}", *(ss.get_local_tokens().begin()));
});
});
}
} // namespace service

View File

@@ -1803,21 +1803,14 @@ private:
void leave_ring();
void unbootstrap();
future<> stream_hints();
#if 0
public void move(String newToken) throws IOException
{
try
{
getPartitioner().getTokenFactory().validate(newToken);
}
catch (ConfigurationException e)
{
throw new IOException(e.getMessage());
}
move(getPartitioner().getTokenFactory().fromString(newToken));
public:
future<> move(sstring new_token) {
// FIXME: getPartitioner().getTokenFactory().validate(newToken);
return move(dht::global_partitioner().from_sstring(new_token));
}
private:
/**
* move the node to new token or find a new token to boot to according to load
*
@@ -1825,67 +1818,8 @@ private:
*
* @throws IOException on any I/O operation error
*/
private void move(Token newToken) throws IOException
{
if (newToken == null)
throw new IOException("Can't move to the undefined (null) token.");
if (_token_metadata.sortedTokens().contains(newToken))
throw new IOException("target token " + newToken + " is already owned by another node.");
// address of the current node
InetAddress localAddress = FBUtilities.getBroadcastAddress();
// This doesn't make any sense in a vnodes environment.
if (getTokenMetadata().getTokens(localAddress).size() > 1)
{
logger.error("Invalid request to move(Token); This node has more than one token and cannot be moved thusly.");
throw new UnsupportedOperationException("This node has more than one token and cannot be moved thusly.");
}
List<String> keyspacesToProcess = Schema.instance.getNonSystemKeyspaces();
PendingRangeCalculatorService.instance.blockUntilFinished();
// checking if data is moving to this node
for (String keyspaceName : keyspacesToProcess)
{
if (_token_metadata.getPendingRanges(keyspaceName, localAddress).size() > 0)
throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
}
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.moving(newToken));
setMode(Mode.MOVING, String.format("Moving %s from %s to %s.", localAddress, getLocalTokens().iterator().next(), newToken), true);
setMode(Mode.MOVING, String.format("Sleeping %s ms before start streaming/fetching ranges", RING_DELAY), true);
Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS);
RangeRelocator relocator = new RangeRelocator(Collections.singleton(newToken), keyspacesToProcess);
if (relocator.streamsNeeded())
{
setMode(Mode.MOVING, "fetching new ranges and streaming old ranges", true);
try
{
relocator.stream().get();
}
catch (ExecutionException | InterruptedException e)
{
throw new RuntimeException("Interrupted while waiting for stream/fetch ranges to finish: " + e.getMessage());
}
}
else
{
setMode(Mode.MOVING, "No ranges to fetch/stream", true);
}
set_tokens(Collections.singleton(newToken)); // setting new token as we have everything settled
if (logger.isDebugEnabled())
logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next());
}
#endif
private:
future<> move(token new_token);
public:
class range_relocator {
private: