From 268f2fa8a1866f9a89f556b9dab7eb374575ab53 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 17 Jul 2015 09:17:34 +0800 Subject: [PATCH 1/6] db/system_keyspace: Change host_id to uuid_type --- db/system_keyspace.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 36288317c6..63cc546cd5 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -198,7 +198,7 @@ schema_ptr built_indexes() { // regular columns { {"data_center", utf8_type}, - {"host_id", utf8_type}, + {"host_id", uuid_type}, {"preferred_ip", inet_addr_type}, {"rack", utf8_type}, {"release_version", utf8_type}, From b8be01393486b2869b44d323d67d716ca72d6a3c Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 17 Jul 2015 10:33:58 +0800 Subject: [PATCH 2/6] messaging_service: Fix undefined reference to current_version /usr/include/boost/format/feed_args.hpp:135: undefined reference to `net::messaging_service::current_version' --- message/messaging_service.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 947b0996ad..06e73272b0 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -135,6 +135,9 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc struct messaging_service::rpc_protocol_client_wrapper : public rpc_protocol::client { using rpc_protocol::client::client; }; struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; }; + +constexpr int32_t messaging_service::current_version; + distributed _the_messaging_service; future<> deinit_messaging_service() { From ba04f0bc6bf8fa6a7c6ae8e4f32c6fe3f1097626 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 17 Jul 2015 10:37:32 +0800 Subject: [PATCH 3/6] gms/versioned_value: Fix network_version --- gms/versioned_value.hh | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index 0abb93ef6f..5d63e679e8 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -31,6 +31,7 @@ #include "to_string.hh" #include #include +#include "message/messaging_service.hh" namespace gms { @@ -250,9 +251,7 @@ public: versioned_value network_version() { - // FIXME: MessagingService.current_version - sstring ver("ms_1_0"); - return versioned_value(ver); + return versioned_value(sprint("%s",net::messaging_service::current_version)); } versioned_value internalIP(const sstring &private_ip) From b16eb27c581ac8aa895eb287d9326be83893a231 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 17 Jul 2015 10:37:56 +0800 Subject: [PATCH 4/6] gms/versioned_value: Fix rpcaddress --- gms/versioned_value.hh | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index 5d63e679e8..24189e6324 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -237,9 +237,7 @@ public: versioned_value rpcaddress(gms::inet_address endpoint) { - // FIXME: endpoint.getHostAddress() - sstring addr; - return versioned_value(addr); + return versioned_value(sprint("%s", endpoint)); } versioned_value release_version() From d8a281e81158074f5d9f4f17f5af16c4a7eae6df Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 17 Jul 2015 10:38:31 +0800 Subject: [PATCH 5/6] storage_service: Use broadcast_address as broadcast_rpc_address for now Until we can get it from the config system. It is better than a empty address. --- service/storage_service.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 2acbfad266..c034deb640 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -53,7 +53,7 @@ future<> storage_service::prepare_to_join() { _token_metadata.update_host_id(local_host_id, this->get_broadcast_address()); // FIXME: DatabaseDescriptor.getBroadcastRpcAddress() - gms::inet_address broadcast_rpc_address; + auto broadcast_rpc_address = this->get_broadcast_address(); app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version()); app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id)); app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address)); From 344d8e95d106bb14dd41cb49f66ce05067321c98 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 17 Jul 2015 08:47:27 +0800 Subject: [PATCH 6/6] storage_service: Update system.peers table Before: cqlsh> SELECT * from system.peers ; cqlsh:system> SELECT * FROM system.peers ; peer | data_center | host_id | preferred_ip | rack | release_version | rpc_address | schema_version | tokens ------+-------------+---------+--------------+------+-----------------+-------------+----------------+-------- (0 rows) After: cqlsh> SELECT * from system.peers ; peer | data_center | host_id | preferred_ip | rack | release_version | rpc_address | schema_version | tokens -----------+-------------+--------------------------------------+--------------+-------+-----------------+-------------+----------------+-------- 127.0.0.2 | datacenter1 | 7daa116a-03d0-4623-8084-f213701f3136 | null | rack1 | urchin_1_0 | 127.0.0.2 | null | null (1 rows) --- service/storage_service.cc | 79 +++++++++++++++++++++++--------------- service/storage_service.hh | 1 + 2 files changed, 49 insertions(+), 31 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index c034deb640..a7dab3c140 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6,6 +6,8 @@ #include "core/distributed.hh" #include "locator/snitch_base.hh" #include "db/system_keyspace.hh" +#include "utils/UUID.hh" +#include "gms/inet_address.hh" namespace service { @@ -675,24 +677,9 @@ void storage_service::on_change(inet_address endpoint, application_state state, // logger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); return; } - - if (state == application_state::RELEASE_VERSION) { - // SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value); - } else if (state == application_state::DC) { - // SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value); - } else if (state == application_state::RACK) { - // SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value); - } else if (state == application_state::RPC_ADDRESS) { - // try { - // SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value)); - // } catch (UnknownHostException e) { - // throw new RuntimeException(e); - // } - } else if (state == application_state::SCHEMA) { - // SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value)); + do_update_system_peers_table(endpoint, state, value); + if (state == application_state::SCHEMA) { // MigrationManager.instance.scheduleSchemaPull(endpoint, epState); - } else if (state == application_state::HOST_ID) { - // SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value)); } } replicate_to_all_cores(); @@ -722,6 +709,48 @@ void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state #endif } +void storage_service::do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value) { + ss_debug("storage_service:: Update ep=%s, state=%d, value=%s\n", endpoint, int(state), value.value); + if (state == application_state::RELEASE_VERSION) { + auto col = sstring("release_version"); + db::system_keyspace::update_peer_info(endpoint, col, value.value).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } else if (state == application_state::DC) { + auto col = sstring("data_center"); + db::system_keyspace::update_peer_info(endpoint, col, value.value).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } else if (state == application_state::RACK) { + auto col = sstring("rack"); + db::system_keyspace::update_peer_info(endpoint, col, value.value).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } else if (state == application_state::RPC_ADDRESS) { + auto col = sstring("rpc_address"); + inet_address ep; + try { + ep = gms::inet_address(value.value); + } catch (...) { + print("storage_service: fail to update %s for %s: invalid rcpaddr %s\n", col, endpoint, value.value); + return; + } + db::system_keyspace::update_peer_info(endpoint, col, ep.addr()).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } else if (state == application_state::SCHEMA) { + auto col = sstring("schema_version"); + db::system_keyspace::update_peer_info(endpoint, col, utils::UUID(value.value)).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } else if (state == application_state::HOST_ID) { + auto col = sstring("host_id"); + db::system_keyspace::update_peer_info(endpoint, col, utils::UUID(value.value)).then_wrapped([col, endpoint] (auto&& f) { + try { f.get(); } catch (...) { print("storage_service: fail to update %s for %s\n", col, endpoint); } + }); + } +} + void storage_service::update_peer_info(gms::inet_address endpoint) { using namespace gms; auto& gossiper = gms::get_local_gossiper(); @@ -731,20 +760,8 @@ void storage_service::update_peer_info(gms::inet_address endpoint) { } for (auto& entry : ep_state->get_application_state_map()) { auto& app_state = entry.first; - //auto& value = entry.second.value - if (app_state == application_state::RELEASE_VERSION) { - // SystemKeyspace.updatePeerInfo(endpoint, "release_version", value); - } else if (app_state == application_state::DC) { - // SystemKeyspace.updatePeerInfo(endpoint, "data_center", value); - } else if (app_state == application_state::RACK) { - // SystemKeyspace.updatePeerInfo(endpoint, "rack", value); - } else if (app_state == application_state::RPC_ADDRESS) { - // SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value)); - } else if (app_state == application_state::SCHEMA) { - // SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value)); - } else if (app_state == application_state::HOST_ID) { - // SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value)); - } + auto& value = entry.second; + do_update_system_peers_table(endpoint, app_state, value); } } diff --git a/service/storage_service.hh b/service/storage_service.hh index 2c28cbca76..6af3cb54f3 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -840,6 +840,7 @@ public: virtual void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override; private: void update_peer_info(inet_address endpoint); + void do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value); sstring get_application_state_value(inet_address endpoint, application_state appstate); std::unordered_set get_tokens_for(inet_address endpoint); void replicate_to_all_cores();