storage_service: Implement get_tokens_for

This commit is contained in:
Asias He
2015-05-29 10:50:57 +08:00
parent 322b27c12b
commit 03b526f06b

View File

@@ -390,7 +390,7 @@ private:
{
if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(get_application_state_value(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
@@ -1435,25 +1435,37 @@ public:
}
}
}
private byte[] getApplicationStateValue(InetAddress endpoint, ApplicationState appstate)
{
String vvalue = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(appstate).value;
return vvalue.getBytes(ISO_8859_1);
#endif
private:
sstring get_application_state_value(inet_address endpoint, application_state appstate) {
auto& gossiper = gms::get_local_gossiper();
auto eps = gossiper.get_endpoint_state_for_endpoint(endpoint);
if (!eps) {
return {};
}
auto v = eps->get_application_state(appstate);
if (!v) {
return {};
}
return v->value;
}
private Collection<Token> getTokensFor(InetAddress endpoint)
{
try
{
return TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(endpoint, ApplicationState.TOKENS))));
}
catch (IOException e)
{
throw new RuntimeException(e);
std::set<token> get_tokens_for(inet_address endpoint) {
auto tokens_string = get_application_state_value(endpoint, application_state::TOKENS);
ss_debug("endpoint=%s, tokens_string=%s\n", endpoint, tokens_string);
std::vector<sstring> tokens;
std::set<token> ret;
boost::split(tokens, tokens_string, boost::is_any_of(";"));
for (auto str : tokens) {
ss_debug("token=%s\n", str);
sstring_view sv(str);
bytes b = from_hex(sv);
ret.emplace(token::kind::key, b);
}
return ret;
}
#if 0
/**
* Handle node bootstrap
*
@@ -1463,7 +1475,7 @@ public:
{
Collection<Token> tokens;
// explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified
tokens = getTokensFor(endpoint);
tokens = get_tokens_for(endpoint);
if (logger.isDebugEnabled())
logger.debug("Node {} state bootstrapping, token {}", endpoint, tokens);
@@ -1500,7 +1512,7 @@ public:
{
Collection<Token> tokens;
tokens = getTokensFor(endpoint);
tokens = get_tokens_for(endpoint);
Set<Token> tokensToUpdateInMetadata = new HashSet<>();
Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
@@ -1631,7 +1643,7 @@ public:
private void handleStateLeaving(InetAddress endpoint)
{
Collection<Token> tokens;
tokens = getTokensFor(endpoint);
tokens = get_tokens_for(endpoint);
if (logger.isDebugEnabled())
logger.debug("Node {} state leaving, tokens {}", endpoint, tokens);
@@ -1666,7 +1678,7 @@ public:
{
assert pieces.length >= 2;
Collection<Token> tokens;
tokens = getTokensFor(endpoint);
tokens = get_tokens_for(endpoint);
if (logger.isDebugEnabled())
logger.debug("Node {} state left, tokens {}", endpoint, tokens);