From 6b8d823c82b6371f6ff9533a7376094f3216eb50 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 8 Jul 2015 12:16:59 -0400 Subject: [PATCH 1/6] gms: allow the construction of the object from a net address That is what is going to be stored in the data_type(), so provide the conversion Signed-off-by: Glauber Costa --- gms/inet_address.hh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gms/inet_address.hh b/gms/inet_address.hh index 95fcfb75a9..0e55548978 100644 --- a/gms/inet_address.hh +++ b/gms/inet_address.hh @@ -20,6 +20,8 @@ public: inet_address(int32_t ip) : _addr(uint32_t(ip)) { } + inet_address(net::ipv4_address&& addr) : _addr(std::move(addr)) {} + inet_address(const sstring& addr) { // FIXME: We need a real DNS resolver if (addr == "localhost") { From 4ed027bf898d6a50fac0393bd2f1c1efca123f3e Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 8 Jul 2015 12:17:55 -0400 Subject: [PATCH 2/6] types: add data_type for inet_address That was still missing Signed-off-by: Glauber Costa --- types.hh | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/types.hh b/types.hh index 55f4b28c30..c72b0f79a8 100644 --- a/types.hh +++ b/types.hh @@ -25,6 +25,7 @@ #include #include #include +#include "net/ip.hh" class tuple_type_impl; @@ -674,6 +675,12 @@ shared_ptr data_type_for() { return date_type; } +template <> +inline +shared_ptr data_type_for() { + return inet_addr_type; +} + namespace std { template <> From d43933e64225196653d54176531f50488c5d7903 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 8 Jul 2015 21:02:43 -0400 Subject: [PATCH 3/6] gms: add addr method to inet_addr Because the cql types deal with a raw inet address and not the gms container, we need a method to fetch it Signed-off-by: Glauber Costa --- gms/inet_address.hh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gms/inet_address.hh b/gms/inet_address.hh index 0e55548978..8813c91fce 100644 --- a/gms/inet_address.hh +++ b/gms/inet_address.hh @@ -22,6 +22,10 @@ public: } inet_address(net::ipv4_address&& addr) : _addr(std::move(addr)) {} + const net::ipv4_address& addr() const { + return _addr; + } + inet_address(const sstring& addr) { // FIXME: We need a real DNS resolver if (addr == "localhost") { From 38465dea70f3295a7b09550eb164dd75be8a31d2 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 8 Jul 2015 11:29:50 -0400 Subject: [PATCH 4/6] system_keyspaces: implement load_dc_rack_info To implement that, we will resort to a cache mechanism, instead of doing the query all the time. This is mainly because we want to avoid overfuturization of the callers, that are usually just interested in passing simple strings around. We will be able to intercept all updates to it, and maintain consistency with our internal cache. The updates are not done in this patchset. Signed-off-by: Glauber Costa --- db/system_keyspace.cc | 58 ++++++++++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 45789bbc17..4877e37673 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -372,12 +372,51 @@ static future<> setup_version() { future<> check_health(); future<> force_blocking_flush(sstring cfname); +// Changing the real load_dc_rack_info into a future would trigger a tidal wave of futurization that would spread +// even into simple string operations like get_rack() / get_dc(). We will cache those at startup, and then change +// our view of it every time we do updates on those values. +// +// The cache must be distributed, because the values themselves may not update atomically, so a shard reading that +// is different than the one that wrote, may see a corrupted value. invoke_on_all will be used to guarantee that all +// updates are propagated correctly. +struct local_cache { + std::unordered_map _cached_dc_rack_info; + future<> stop() { + return make_ready_future<>(); + } +}; +static distributed _local_cache; + +static future<> build_dc_rack_info() { + return _local_cache.start().then([] { + return execute_cql("SELECT peer, data_center, rack from system.%s", PEERS).then([] (::shared_ptr msg) { + return do_for_each(*msg, [] (auto& row) { + // Not ideal to assume ipv4 here, but currently this is what the cql types wraps. + net::ipv4_address peer = row.template get_as("peer"); + if (!row.has("data_center") || !row.has("rack")) { + return make_ready_future<>(); + } + gms::inet_address gms_addr(std::move(peer)); + sstring dc = row.template get_as("data_center"); + sstring rack = row.template get_as("rack"); + + locator::endpoint_dc_rack element = { dc, rack }; + return _local_cache.invoke_on_all([gms_addr = std::move(gms_addr), element = std::move(element)] (local_cache& lc) { + lc._cached_dc_rack_info.emplace(gms_addr, element); + }); + }); + }); + }); +} + future<> setup(distributed& db, distributed& qp) { auto new_ctx = std::make_unique(db, qp); qctx.swap(new_ctx); assert(!new_ctx); return setup_version().then([] { return update_schema_version(utils::make_random_uuid()); // FIXME: should not be random + }).then([] { + return build_dc_rack_info(); }).then([] { return check_health(); }).then([] { @@ -1086,23 +1125,8 @@ future set_local_host_id(const utils::UUID& host_id) { } std::unordered_map -load_dc_rack_info() -{ - std::unordered_map result; -#if 0 //TODO - for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS)) - { - InetAddress peer = row.getInetAddress("peer"); - if (row.has("data_center") && row.has("rack")) - { - Map dcRack = new HashMap<>(); - dcRack.put("data_center", row.getString("data_center")); - dcRack.put("rack", row.getString("rack")); - result.put(peer, dcRack); - } - } -#endif - return result; +load_dc_rack_info() { + return _local_cache.local()._cached_dc_rack_info; } future> From e559959632734e88801b20e4155cd81d3ce92869 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 8 Jul 2015 20:32:13 -0400 Subject: [PATCH 5/6] system_keyspace: implement update_peer_info Signed-off-by: Glauber Costa --- db/system_keyspace.cc | 42 ++++++++++++++++++++++++++++++++++++------ db/system_keyspace.hh | 3 +++ 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 4877e37673..1b51d637a8 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -610,16 +610,46 @@ future get_truncated_at(cql3::query_processor& qp, utils:: executeInternal(String.format(req, PEERS), ep, preferred_ip); forceBlockingFlush(PEERS); } +#endif - public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value) - { - if (ep.equals(FBUtilities.getBroadcastAddress())) - return; +template +static future<> update_cached_values(gms::inet_address ep, sstring column_name, Value value) { + return make_ready_future<>(); +} - String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)"; - executeInternal(String.format(req, PEERS, columnName), ep, value); +template <> +future<> update_cached_values(gms::inet_address ep, sstring column_name, sstring value) { + return _local_cache.invoke_on_all([ep = std::move(ep), + column_name = std::move(column_name), + value = std::move(value)] (local_cache& lc) { + if (column_name == "data_center") { + lc._cached_dc_rack_info[ep].dc = value; + } else if (column_name == "rack") { + lc._cached_dc_rack_info[ep].rack = value; + } + return make_ready_future<>(); + }); +} + +template +future<> update_peer_info(gms::inet_address ep, sstring column_name, Value value) { + if (ep == utils::fb_utilities::get_broadcast_address()) { + return make_ready_future<>(); } + return update_cached_values(ep, column_name, value).then([ep, column_name, value] { + sstring clause = sprint("(peer, %s) VALUES (?, ?)", column_name); + sstring req = "INSERT INTO system.%s " + clause; + return execute_cql(req, PEERS, ep.addr(), value).discard_result(); + }); +} +// sets are not needed, since tokens are updated by another method +template future<> update_peer_info(gms::inet_address ep, sstring column_name, sstring); +template future<> update_peer_info(gms::inet_address ep, sstring column_name, utils::UUID); +template future<> update_peer_info(gms::inet_address ep, sstring column_name, net::ipv4_address); + +#if 0 + public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value) { // with 30 day TTL diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index fb9b09c70d..659e256a17 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -69,6 +69,9 @@ future<> setup(distributed& db, distributed& qp future<> update_schema_version(utils::UUID version); future<> update_tokens(std::unordered_set tokens); +template +future<> update_peer_info(gms::inet_address ep, sstring column_name, Value value); + std::vector all_tables(); void make(database& db, bool durable); From afee9ab72a1be539d130ecb19271d79ee9639ace Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 9 Jul 2015 11:32:46 -0400 Subject: [PATCH 6/6] system_keyspaces: implement remove_endpoint Signed-off-by: Glauber Costa --- db/system_keyspace.cc | 21 ++++++++++++--------- db/system_keyspace.hh | 2 ++ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 1b51d637a8..26eaa91794 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -683,16 +683,19 @@ future<> update_schema_version(utils::UUID version) { return tokens; } - /** - * Remove stored tokens being used by another node - */ - public static synchronized void removeEndpoint(InetAddress ep) - { - String req = "DELETE FROM system.%s WHERE peer = ?"; - executeInternal(String.format(req, PEERS), ep); - } - #endif +/** + * Remove stored tokens being used by another node + */ +future<> remove_endpoint(gms::inet_address ep) { + return _local_cache.invoke_on_all([ep] (local_cache& lc) { + lc._cached_dc_rack_info.erase(ep); + }).then([ep] { + sstring req = "DELETE FROM system.%s WHERE peer = ?"; + return execute_cql(req, PEERS, ep).discard_result(); + }); +} + /** * This method is used to update the System Keyspace with the new tokens for this node */ diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 659e256a17..c481f178fb 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -72,6 +72,8 @@ future<> update_tokens(std::unordered_set tokens); template future<> update_peer_info(gms::inet_address ep, sstring column_name, Value value); +future<> remove_endpoint(gms::inet_address ep); + std::vector all_tables(); void make(database& db, bool durable);