system_keyspace: implement (remote) update_tokens

There are two versions of update_tokens: one for the tokens used by this node,
which goes to the local table, and another for the remote tokens, used by
remote nodes, which goes to the peers table.

The former was implemented, the latter was not. Implement it.

One node: Origin does not issue a flush here, at least in the version of the
code we imported. However, a flush is present in all other variants, and won't
hurt, aside from creating an extra, probably very small, sstable. So I'm
flushing.

Signed-off-by: Glauber Costa <glommer@cloudius-systems.com>
This commit is contained in:
Glauber Costa
2015-07-09 18:01:47 -04:00
committed by Tomasz Grabiec
parent 31f4601329
commit fe154efffe
2 changed files with 24 additions and 19 deletions

View File

@@ -588,22 +588,31 @@ future<db_clock::time_point> get_truncated_at(cql3::query_processor& qp, utils::
});
}
#if 0
/**
* Record tokens being used by another node
*/
public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
{
if (ep.equals(FBUtilities.getBroadcastAddress()))
{
removeEndpoint(ep);
return;
}
String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens));
set_type_impl::native_type prepare_tokens(std::unordered_set<dht::token>& tokens) {
set_type_impl::native_type tset;
for (auto& t: tokens) {
tset.push_back(boost::any(dht::global_partitioner().to_sstring(t)));
}
return tset;
}
/**
* Record tokens being used by another node
*/
future<> update_tokens(gms::inet_address ep, std::unordered_set<dht::token> tokens)
{
if (ep == utils::fb_utilities::get_broadcast_address()) {
return remove_endpoint(ep);
}
sstring req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
return execute_cql(req, PEERS, ep, prepare_tokens(tokens)).discard_result().then([] {
return force_blocking_flush(PEERS);
});
}
#if 0
public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
{
String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
@@ -704,13 +713,8 @@ future<> update_tokens(std::unordered_set<dht::token> tokens) {
throw std::invalid_argument("remove_endpoint should be used instead");
}
set_type_impl::native_type tset;
for (auto& t: tokens) {
tset.push_back(boost::any(dht::global_partitioner().to_sstring(t)));
}
sstring req = "INSERT INTO system.%s (key, tokens) VALUES (?, ?)";
return execute_cql(req, LOCAL, sstring(LOCAL), std::move(tset)).discard_result().then([] {
return execute_cql(req, LOCAL, sstring(LOCAL), prepare_tokens(tokens)).discard_result().then([] {
return force_blocking_flush(LOCAL);
});
}

View File

@@ -68,6 +68,7 @@ extern schema_ptr built_indexes(); // TODO (from Cassandra): make private
future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp);
future<> update_schema_version(utils::UUID version);
future<> update_tokens(std::unordered_set<dht::token> tokens);
future<> update_tokens(gms::inet_address ep, std::unordered_set<dht::token> tokens);
template <typename Value>
future<> update_peer_info(gms::inet_address ep, sstring column_name, Value value);