diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 45789bbc17..26eaa91794 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -372,12 +372,51 @@ static future<> setup_version() { future<> check_health(); future<> force_blocking_flush(sstring cfname); +// Changing the real load_dc_rack_info into a future would trigger a tidal wave of futurization that would spread +// even into simple string operations like get_rack() / get_dc(). We will cache those at startup, and then change +// our view of it every time we do updates on those values. +// +// The cache must be distributed, because the values themselves may not update atomically, so a shard other than +// the one that wrote may read a corrupted value. invoke_on_all will be used to guarantee that all +// updates are propagated correctly. +struct local_cache { + std::unordered_map _cached_dc_rack_info; + future<> stop() { + return make_ready_future<>(); + } +}; +static distributed _local_cache; + +static future<> build_dc_rack_info() { + return _local_cache.start().then([] { + return execute_cql("SELECT peer, data_center, rack from system.%s", PEERS).then([] (::shared_ptr msg) { + return do_for_each(*msg, [] (auto& row) { + // Not ideal to assume ipv4 here, but currently this is what the cql type wraps. 
+ net::ipv4_address peer = row.template get_as("peer"); + if (!row.has("data_center") || !row.has("rack")) { + return make_ready_future<>(); + } + gms::inet_address gms_addr(std::move(peer)); + sstring dc = row.template get_as("data_center"); + sstring rack = row.template get_as("rack"); + + locator::endpoint_dc_rack element = { dc, rack }; + return _local_cache.invoke_on_all([gms_addr = std::move(gms_addr), element = std::move(element)] (local_cache& lc) { + lc._cached_dc_rack_info.emplace(gms_addr, element); + }); + }); + }); + }); +} + future<> setup(distributed& db, distributed& qp) { auto new_ctx = std::make_unique(db, qp); qctx.swap(new_ctx); assert(!new_ctx); return setup_version().then([] { return update_schema_version(utils::make_random_uuid()); // FIXME: should not be random + }).then([] { + return build_dc_rack_info(); }).then([] { return check_health(); }).then([] { @@ -571,16 +610,46 @@ future get_truncated_at(cql3::query_processor& qp, utils:: executeInternal(String.format(req, PEERS), ep, preferred_ip); forceBlockingFlush(PEERS); } +#endif - public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value) - { - if (ep.equals(FBUtilities.getBroadcastAddress())) - return; +template +static future<> update_cached_values(gms::inet_address ep, sstring column_name, Value value) { + return make_ready_future<>(); +} - String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)"; - executeInternal(String.format(req, PEERS, columnName), ep, value); +template <> +future<> update_cached_values(gms::inet_address ep, sstring column_name, sstring value) { + return _local_cache.invoke_on_all([ep = std::move(ep), + column_name = std::move(column_name), + value = std::move(value)] (local_cache& lc) { + if (column_name == "data_center") { + lc._cached_dc_rack_info[ep].dc = value; + } else if (column_name == "rack") { + lc._cached_dc_rack_info[ep].rack = value; + } + return make_ready_future<>(); + }); +} + +template +future<> 
update_peer_info(gms::inet_address ep, sstring column_name, Value value) { + if (ep == utils::fb_utilities::get_broadcast_address()) { + return make_ready_future<>(); } + return update_cached_values(ep, column_name, value).then([ep, column_name, value] { + sstring clause = sprint("(peer, %s) VALUES (?, ?)", column_name); + sstring req = "INSERT INTO system.%s " + clause; + return execute_cql(req, PEERS, ep.addr(), value).discard_result(); + }); +} +// sets are not needed, since tokens are updated by another method +template future<> update_peer_info(gms::inet_address ep, sstring column_name, sstring); +template future<> update_peer_info(gms::inet_address ep, sstring column_name, utils::UUID); +template future<> update_peer_info(gms::inet_address ep, sstring column_name, net::ipv4_address); + +#if 0 + public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value) { // with 30 day TTL @@ -614,16 +683,19 @@ future<> update_schema_version(utils::UUID version) { return tokens; } - /** - * Remove stored tokens being used by another node - */ - public static synchronized void removeEndpoint(InetAddress ep) - { - String req = "DELETE FROM system.%s WHERE peer = ?"; - executeInternal(String.format(req, PEERS), ep); - } - #endif +/** + * Remove stored tokens being used by another node + */ +future<> remove_endpoint(gms::inet_address ep) { + return _local_cache.invoke_on_all([ep] (local_cache& lc) { + lc._cached_dc_rack_info.erase(ep); + }).then([ep] { + sstring req = "DELETE FROM system.%s WHERE peer = ?"; + return execute_cql(req, PEERS, ep).discard_result(); + }); +} + /** * This method is used to update the System Keyspace with the new tokens for this node */ @@ -1086,23 +1158,8 @@ future set_local_host_id(const utils::UUID& host_id) { } std::unordered_map -load_dc_rack_info() -{ - std::unordered_map result; -#if 0 //TODO - for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." 
+ PEERS)) - { - InetAddress peer = row.getInetAddress("peer"); - if (row.has("data_center") && row.has("rack")) - { - Map dcRack = new HashMap<>(); - dcRack.put("data_center", row.getString("data_center")); - dcRack.put("rack", row.getString("rack")); - result.put(peer, dcRack); - } - } -#endif - return result; +load_dc_rack_info() { + return _local_cache.local()._cached_dc_rack_info; } future> diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index fb9b09c70d..c481f178fb 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -69,6 +69,11 @@ future<> setup(distributed& db, distributed& qp future<> update_schema_version(utils::UUID version); future<> update_tokens(std::unordered_set tokens); +template +future<> update_peer_info(gms::inet_address ep, sstring column_name, Value value); + +future<> remove_endpoint(gms::inet_address ep); + std::vector all_tables(); void make(database& db, bool durable); diff --git a/gms/inet_address.hh b/gms/inet_address.hh index 95fcfb75a9..8813c91fce 100644 --- a/gms/inet_address.hh +++ b/gms/inet_address.hh @@ -20,6 +20,12 @@ public: inet_address(int32_t ip) : _addr(uint32_t(ip)) { } + inet_address(net::ipv4_address&& addr) : _addr(std::move(addr)) {} + + const net::ipv4_address& addr() const { + return _addr; + } + inet_address(const sstring& addr) { // FIXME: We need a real DNS resolver if (addr == "localhost") { diff --git a/types.hh b/types.hh index 55f4b28c30..c72b0f79a8 100644 --- a/types.hh +++ b/types.hh @@ -25,6 +25,7 @@ #include #include #include +#include "net/ip.hh" class tuple_type_impl; @@ -674,6 +675,12 @@ shared_ptr data_type_for() { return date_type; } +template <> +inline +shared_ptr data_type_for() { + return inet_addr_type; +} + namespace std { template <>