Merge "Update system.peers table" from Asias

This commit is contained in:
Avi Kivity
2015-07-21 13:03:40 +03:00
5 changed files with 57 additions and 39 deletions

View File

@@ -198,7 +198,7 @@ schema_ptr built_indexes() {
// regular columns
{
{"data_center", utf8_type},
{"host_id", utf8_type},
{"host_id", uuid_type},
{"preferred_ip", inet_addr_type},
{"rack", utf8_type},
{"release_version", utf8_type},

View File

@@ -31,6 +31,7 @@
#include "to_string.hh"
#include <unordered_set>
#include <vector>
#include "message/messaging_service.hh"
namespace gms {
@@ -236,9 +237,7 @@ public:
versioned_value rpcaddress(gms::inet_address endpoint)
{
// FIXME: endpoint.getHostAddress()
sstring addr;
return versioned_value(addr);
return versioned_value(sprint("%s", endpoint));
}
versioned_value release_version()
@@ -250,9 +249,7 @@ public:
versioned_value network_version()
{
// FIXME: MessagingService.current_version
sstring ver("ms_1_0");
return versioned_value(ver);
return versioned_value(sprint("%s",net::messaging_service::current_version));
}
versioned_value internalIP(const sstring &private_ip)

View File

@@ -135,6 +135,9 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc
struct messaging_service::rpc_protocol_client_wrapper : public rpc_protocol::client { using rpc_protocol::client::client; };
struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; };
constexpr int32_t messaging_service::current_version;
distributed<messaging_service> _the_messaging_service;
future<> deinit_messaging_service() {

View File

@@ -6,6 +6,8 @@
#include "core/distributed.hh"
#include "locator/snitch_base.hh"
#include "db/system_keyspace.hh"
#include "utils/UUID.hh"
#include "gms/inet_address.hh"
namespace service {
@@ -53,7 +55,7 @@ future<> storage_service::prepare_to_join() {
_token_metadata.update_host_id(local_host_id, this->get_broadcast_address());
// FIXME: DatabaseDescriptor.getBroadcastRpcAddress()
gms::inet_address broadcast_rpc_address;
auto broadcast_rpc_address = this->get_broadcast_address();
app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version());
app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id));
app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address));
@@ -675,24 +677,9 @@ void storage_service::on_change(inet_address endpoint, application_state state,
// logger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
return;
}
if (state == application_state::RELEASE_VERSION) {
// SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value);
} else if (state == application_state::DC) {
// SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value);
} else if (state == application_state::RACK) {
// SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value);
} else if (state == application_state::RPC_ADDRESS) {
// try {
// SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value));
// } catch (UnknownHostException e) {
// throw new RuntimeException(e);
// }
} else if (state == application_state::SCHEMA) {
// SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value));
do_update_system_peers_table(endpoint, state, value);
if (state == application_state::SCHEMA) {
// MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
} else if (state == application_state::HOST_ID) {
// SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value));
}
}
replicate_to_all_cores();
@@ -722,6 +709,48 @@ void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state
#endif
}
void storage_service::do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value) {
ss_debug("storage_service:: Update ep=%s, state=%d, value=%s\n", endpoint, int(state), value.value);
if (state == application_state::RELEASE_VERSION) {
auto col = sstring("release_version");
db::system_keyspace::update_peer_info(endpoint, col, value.value).then_wrapped([col, endpoint] (auto&& f) {
try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); }
});
} else if (state == application_state::DC) {
auto col = sstring("data_center");
db::system_keyspace::update_peer_info(endpoint, col, value.value).then_wrapped([col, endpoint] (auto&& f) {
try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); }
});
} else if (state == application_state::RACK) {
auto col = sstring("rack");
db::system_keyspace::update_peer_info(endpoint, col, value.value).then_wrapped([col, endpoint] (auto&& f) {
try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); }
});
} else if (state == application_state::RPC_ADDRESS) {
auto col = sstring("rpc_address");
inet_address ep;
try {
ep = gms::inet_address(value.value);
} catch (...) {
print("storage_service: fail to update %s for %s: invalid rcpaddr %s\n", col, endpoint, value.value);
return;
}
db::system_keyspace::update_peer_info(endpoint, col, ep.addr()).then_wrapped([col, endpoint] (auto&& f) {
try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); }
});
} else if (state == application_state::SCHEMA) {
auto col = sstring("schema_version");
db::system_keyspace::update_peer_info(endpoint, col, utils::UUID(value.value)).then_wrapped([col, endpoint] (auto&& f) {
try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); }
});
} else if (state == application_state::HOST_ID) {
auto col = sstring("host_id");
db::system_keyspace::update_peer_info(endpoint, col, utils::UUID(value.value)).then_wrapped([col, endpoint] (auto&& f) {
try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); }
});
}
}
void storage_service::update_peer_info(gms::inet_address endpoint) {
using namespace gms;
auto& gossiper = gms::get_local_gossiper();
@@ -731,20 +760,8 @@ void storage_service::update_peer_info(gms::inet_address endpoint) {
}
for (auto& entry : ep_state->get_application_state_map()) {
auto& app_state = entry.first;
//auto& value = entry.second.value
if (app_state == application_state::RELEASE_VERSION) {
// SystemKeyspace.updatePeerInfo(endpoint, "release_version", value);
} else if (app_state == application_state::DC) {
// SystemKeyspace.updatePeerInfo(endpoint, "data_center", value);
} else if (app_state == application_state::RACK) {
// SystemKeyspace.updatePeerInfo(endpoint, "rack", value);
} else if (app_state == application_state::RPC_ADDRESS) {
// SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value));
} else if (app_state == application_state::SCHEMA) {
// SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value));
} else if (app_state == application_state::HOST_ID) {
// SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value));
}
auto& value = entry.second;
do_update_system_peers_table(endpoint, app_state, value);
}
}

View File

@@ -840,6 +840,7 @@ public:
virtual void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override;
private:
void update_peer_info(inet_address endpoint);
void do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value);
sstring get_application_state_value(inet_address endpoint, application_state appstate);
std::unordered_set<token> get_tokens_for(inet_address endpoint);
void replicate_to_all_cores();