From fe154efffed2cf26658ed07db2dac4d07f13394c Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 9 Jul 2015 18:01:47 -0400 Subject: [PATCH] system_keyspace: implement (remote) update_tokens There are two versions of update_tokens: one for the tokens used by this node, which goes to the local table, and another for the remote tokens, used by remote nodes, which goes to the peers table. The former was implemented, the latter was not. Implement it. One node: Origin does not issue a flush here, at least in the version of the code we imported. However, a flush is present in all other variants, and won't hurt, aside from creating an extra, probably very small, sstable. So I'm flushing. Signed-off-by: Glauber Costa --- db/system_keyspace.cc | 42 +++++++++++++++++++++++------------------- db/system_keyspace.hh | 1 + 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index d115d2b354..e7823f73db 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -588,22 +588,31 @@ future get_truncated_at(cql3::query_processor& qp, utils:: }); } -#if 0 - /** - * Record tokens being used by another node - */ - public static synchronized void updateTokens(InetAddress ep, Collection tokens) - { - if (ep.equals(FBUtilities.getBroadcastAddress())) - { - removeEndpoint(ep); - return; - } - String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)"; - executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens)); +set_type_impl::native_type prepare_tokens(std::unordered_set& tokens) { + set_type_impl::native_type tset; + for (auto& t: tokens) { + tset.push_back(boost::any(dht::global_partitioner().to_sstring(t))); + } + return tset; +} + +/** + * Record tokens being used by another node + */ +future<> update_tokens(gms::inet_address ep, std::unordered_set tokens) +{ + if (ep == utils::fb_utilities::get_broadcast_address()) { + return remove_endpoint(ep); } + sstring req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)"; + return execute_cql(req, PEERS, ep, prepare_tokens(tokens)).discard_result().then([] { + return force_blocking_flush(PEERS); + }); +} + +#if 0 public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip) { String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)"; @@ -704,13 +713,8 @@ future<> update_tokens(std::unordered_set tokens) { throw std::invalid_argument("remove_endpoint should be used instead"); } - set_type_impl::native_type tset; - for (auto& t: tokens) { - tset.push_back(boost::any(dht::global_partitioner().to_sstring(t))); - } - sstring req = "INSERT INTO system.%s (key, tokens) VALUES (?, ?)"; - return execute_cql(req, LOCAL, sstring(LOCAL), std::move(tset)).discard_result().then([] { + return execute_cql(req, LOCAL, sstring(LOCAL), prepare_tokens(tokens)).discard_result().then([] { return force_blocking_flush(LOCAL); }); } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 4c04570ab8..245fbf21a0 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -68,6 +68,7 @@ extern schema_ptr built_indexes(); // TODO (from Cassandra): make private future<> setup(distributed& db, distributed& qp); future<> update_schema_version(utils::UUID version); future<> update_tokens(std::unordered_set tokens); +future<> update_tokens(gms::inet_address ep, std::unordered_set tokens); template future<> update_peer_info(gms::inet_address ep, sstring column_name, Value value);