Merge "Functions to update the peer table" from Glauber

This commit is contained in:
Avi Kivity
2015-07-09 19:33:10 +03:00
4 changed files with 107 additions and 32 deletions

View File

@@ -372,12 +372,51 @@ static future<> setup_version() {
future<> check_health();
future<> force_blocking_flush(sstring cfname);
// Changing the real load_dc_rack_info into a future would trigger a tidal wave of futurization that would spread
// even into simple string operations like get_rack() / get_dc(). We will cache those at startup, and then change
// our view of it every time we do updates on those values.
//
// The cache must be distributed, because the values themselves may not update atomically, so a shard reading that
// is different than the one that wrote, may see a corrupted value. invoke_on_all will be used to guarantee that all
// updates are propagated correctly.
struct local_cache {
std::unordered_map<gms::inet_address, locator::endpoint_dc_rack> _cached_dc_rack_info;
future<> stop() {
return make_ready_future<>();
}
};
static distributed<local_cache> _local_cache;
static future<> build_dc_rack_info() {
return _local_cache.start().then([] {
return execute_cql("SELECT peer, data_center, rack from system.%s", PEERS).then([] (::shared_ptr<cql3::untyped_result_set> msg) {
return do_for_each(*msg, [] (auto& row) {
// Not ideal to assume ipv4 here, but currently this is what the cql types wraps.
net::ipv4_address peer = row.template get_as<net::ipv4_address>("peer");
if (!row.has("data_center") || !row.has("rack")) {
return make_ready_future<>();
}
gms::inet_address gms_addr(std::move(peer));
sstring dc = row.template get_as<sstring>("data_center");
sstring rack = row.template get_as<sstring>("rack");
locator::endpoint_dc_rack element = { dc, rack };
return _local_cache.invoke_on_all([gms_addr = std::move(gms_addr), element = std::move(element)] (local_cache& lc) {
lc._cached_dc_rack_info.emplace(gms_addr, element);
});
});
});
});
}
future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp) {
auto new_ctx = std::make_unique<query_context>(db, qp);
qctx.swap(new_ctx);
assert(!new_ctx);
return setup_version().then([] {
return update_schema_version(utils::make_random_uuid()); // FIXME: should not be random
}).then([] {
return build_dc_rack_info();
}).then([] {
return check_health();
}).then([] {
@@ -571,16 +610,46 @@ future<db_clock::time_point> get_truncated_at(cql3::query_processor& qp, utils::
executeInternal(String.format(req, PEERS), ep, preferred_ip);
forceBlockingFlush(PEERS);
}
#endif
public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value)
{
if (ep.equals(FBUtilities.getBroadcastAddress()))
return;
template <typename Value>
static future<> update_cached_values(gms::inet_address ep, sstring column_name, Value value) {
return make_ready_future<>();
}
String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
executeInternal(String.format(req, PEERS, columnName), ep, value);
template <>
future<> update_cached_values(gms::inet_address ep, sstring column_name, sstring value) {
return _local_cache.invoke_on_all([ep = std::move(ep),
column_name = std::move(column_name),
value = std::move(value)] (local_cache& lc) {
if (column_name == "data_center") {
lc._cached_dc_rack_info[ep].dc = value;
} else if (column_name == "rack") {
lc._cached_dc_rack_info[ep].rack = value;
}
return make_ready_future<>();
});
}
template <typename Value>
future<> update_peer_info(gms::inet_address ep, sstring column_name, Value value) {
if (ep == utils::fb_utilities::get_broadcast_address()) {
return make_ready_future<>();
}
return update_cached_values(ep, column_name, value).then([ep, column_name, value] {
sstring clause = sprint("(peer, %s) VALUES (?, ?)", column_name);
sstring req = "INSERT INTO system.%s " + clause;
return execute_cql(req, PEERS, ep.addr(), value).discard_result();
});
}
// sets are not needed, since tokens are updated by another method
template future<> update_peer_info<sstring>(gms::inet_address ep, sstring column_name, sstring);
template future<> update_peer_info<utils::UUID>(gms::inet_address ep, sstring column_name, utils::UUID);
template future<> update_peer_info<net::ipv4_address>(gms::inet_address ep, sstring column_name, net::ipv4_address);
#if 0
public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
{
// with 30 day TTL
@@ -614,16 +683,19 @@ future<> update_schema_version(utils::UUID version) {
return tokens;
}
/**
* Remove stored tokens being used by another node
*/
public static synchronized void removeEndpoint(InetAddress ep)
{
String req = "DELETE FROM system.%s WHERE peer = ?";
executeInternal(String.format(req, PEERS), ep);
}
#endif
/**
* Remove stored tokens being used by another node
*/
future<> remove_endpoint(gms::inet_address ep) {
return _local_cache.invoke_on_all([ep] (local_cache& lc) {
lc._cached_dc_rack_info.erase(ep);
}).then([ep] {
sstring req = "DELETE FROM system.%s WHERE peer = ?";
return execute_cql(req, PEERS, ep).discard_result();
});
}
/**
* This method is used to update the System Keyspace with the new tokens for this node
*/
@@ -1086,23 +1158,8 @@ future<utils::UUID> set_local_host_id(const utils::UUID& host_id) {
}
std::unordered_map<gms::inet_address, locator::endpoint_dc_rack>
load_dc_rack_info()
{
std::unordered_map<gms::inet_address, locator::endpoint_dc_rack> result;
#if 0 //TODO
for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS))
{
InetAddress peer = row.getInetAddress("peer");
if (row.has("data_center") && row.has("rack"))
{
Map<String, String> dcRack = new HashMap<>();
dcRack.put("data_center", row.getString("data_center"));
dcRack.put("rack", row.getString("rack"));
result.put(peer, dcRack);
}
}
#endif
return result;
load_dc_rack_info() {
return _local_cache.local()._cached_dc_rack_info;
}
future<lw_shared_ptr<query::result_set>>

View File

@@ -69,6 +69,11 @@ future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp
future<> update_schema_version(utils::UUID version);
future<> update_tokens(std::unordered_set<dht::token> tokens);
template <typename Value>
future<> update_peer_info(gms::inet_address ep, sstring column_name, Value value);
future<> remove_endpoint(gms::inet_address ep);
std::vector<schema_ptr> all_tables();
void make(database& db, bool durable);

View File

@@ -20,6 +20,12 @@ public:
inet_address(int32_t ip)
: _addr(uint32_t(ip)) {
}
inet_address(net::ipv4_address&& addr) : _addr(std::move(addr)) {}
const net::ipv4_address& addr() const {
return _addr;
}
inet_address(const sstring& addr) {
// FIXME: We need a real DNS resolver
if (addr == "localhost") {

View File

@@ -25,6 +25,7 @@
#include <boost/range/algorithm/for_each.hpp>
#include <boost/range/numeric.hpp>
#include <boost/range/combine.hpp>
#include "net/ip.hh"
class tuple_type_impl;
@@ -674,6 +675,12 @@ shared_ptr<const abstract_type> data_type_for<db_clock::time_point>() {
return date_type;
}
template <>
inline
shared_ptr<const abstract_type> data_type_for<net::ipv4_address>() {
return inet_addr_type;
}
namespace std {
template <>