diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 36288317c6..63cc546cd5 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -198,7 +198,7 @@ schema_ptr built_indexes() { // regular columns { {"data_center", utf8_type}, - {"host_id", utf8_type}, + {"host_id", uuid_type}, {"preferred_ip", inet_addr_type}, {"rack", utf8_type}, {"release_version", utf8_type}, diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index 0abb93ef6f..24189e6324 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -31,6 +31,7 @@ #include "to_string.hh" #include #include +#include "message/messaging_service.hh" namespace gms { @@ -236,9 +237,7 @@ public: versioned_value rpcaddress(gms::inet_address endpoint) { - // FIXME: endpoint.getHostAddress() - sstring addr; - return versioned_value(addr); + return versioned_value(sprint("%s", endpoint)); } versioned_value release_version() @@ -250,9 +249,7 @@ public: versioned_value network_version() { - // FIXME: MessagingService.current_version - sstring ver("ms_1_0"); - return versioned_value(ver); + return versioned_value(sprint("%s",net::messaging_service::current_version)); } versioned_value internalIP(const sstring &private_ip) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 947b0996ad..06e73272b0 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -135,6 +135,9 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc struct messaging_service::rpc_protocol_client_wrapper : public rpc_protocol::client { using rpc_protocol::client::client; }; struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; }; + +constexpr int32_t messaging_service::current_version; + distributed _the_messaging_service; future<> deinit_messaging_service() { diff --git a/service/storage_service.cc b/service/storage_service.cc index 2acbfad266..a7dab3c140 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6,6 +6,8 @@ #include "core/distributed.hh" #include "locator/snitch_base.hh" #include "db/system_keyspace.hh" +#include "utils/UUID.hh" +#include "gms/inet_address.hh" namespace service { @@ -53,7 +55,7 @@ future<> storage_service::prepare_to_join() { _token_metadata.update_host_id(local_host_id, this->get_broadcast_address()); // FIXME: DatabaseDescriptor.getBroadcastRpcAddress() - gms::inet_address broadcast_rpc_address; + auto broadcast_rpc_address = this->get_broadcast_address(); app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version()); app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id)); app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address)); @@ -675,24 +677,9 @@ void storage_service::on_change(inet_address endpoint, application_state state, // logger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); return; } - - if (state == application_state::RELEASE_VERSION) { - // SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value); - } else if (state == application_state::DC) { - // SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value); - } else if (state == application_state::RACK) { - // SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value); - } else if (state == application_state::RPC_ADDRESS) { - // try { - // SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value)); - // } catch (UnknownHostException e) { - // throw new RuntimeException(e); - // } - } else if (state == application_state::SCHEMA) { - // SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value)); + do_update_system_peers_table(endpoint, state, value); + if (state == application_state::SCHEMA) { // MigrationManager.instance.scheduleSchemaPull(endpoint, epState); - } else if (state == application_state::HOST_ID) { - // SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value)); } } replicate_to_all_cores(); @@ -722,6 +709,48 @@ void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state #endif } +void storage_service::do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value) { + ss_debug("storage_service:: Update ep=%s, state=%d, value=%s\n", endpoint, int(state), value.value); + if (state == application_state::RELEASE_VERSION) { + auto col = sstring("release_version"); + db::system_keyspace::update_peer_info(endpoint, col, value.value).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } else if (state == application_state::DC) { + auto col = sstring("data_center"); + db::system_keyspace::update_peer_info(endpoint, col, value.value).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } else if (state == application_state::RACK) { + auto col = sstring("rack"); + db::system_keyspace::update_peer_info(endpoint, col, value.value).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } else if (state == application_state::RPC_ADDRESS) { + auto col = sstring("rpc_address"); + inet_address ep; + try { + ep = gms::inet_address(value.value); + } catch (...) { + print("storage_service: fail to update %s for %s: invalid rcpaddr %s\n", col, endpoint, value.value); + return; + } + db::system_keyspace::update_peer_info(endpoint, col, ep.addr()).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } else if (state == application_state::SCHEMA) { + auto col = sstring("schema_version"); + db::system_keyspace::update_peer_info(endpoint, col, utils::UUID(value.value)).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } else if (state == application_state::HOST_ID) { + auto col = sstring("host_id"); + db::system_keyspace::update_peer_info(endpoint, col, utils::UUID(value.value)).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } +} + void storage_service::update_peer_info(gms::inet_address endpoint) { using namespace gms; auto& gossiper = gms::get_local_gossiper(); @@ -731,20 +760,8 @@ void storage_service::update_peer_info(gms::inet_address endpoint) { } for (auto& entry : ep_state->get_application_state_map()) { auto& app_state = entry.first; - //auto& value = entry.second.value - if (app_state == application_state::RELEASE_VERSION) { - // SystemKeyspace.updatePeerInfo(endpoint, "release_version", value); - } else if (app_state == application_state::DC) { - // SystemKeyspace.updatePeerInfo(endpoint, "data_center", value); - } else if (app_state == application_state::RACK) { - // SystemKeyspace.updatePeerInfo(endpoint, "rack", value); - } else if (app_state == application_state::RPC_ADDRESS) { - // SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value)); - } else if (app_state == application_state::SCHEMA) { - // SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value)); - } else if (app_state == application_state::HOST_ID) { - // SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value)); - } + auto& value = entry.second; + do_update_system_peers_table(endpoint, app_state, value); } } diff --git a/service/storage_service.hh b/service/storage_service.hh index 2c28cbca76..6af3cb54f3 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -840,6 +840,7 @@ public: virtual void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override; private: void update_peer_info(inet_address endpoint); + void do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value); sstring get_application_state_value(inet_address endpoint, application_state appstate); std::unordered_set get_tokens_for(inet_address endpoint); void replicate_to_all_cores();