From e4ac4db1c57f83f03bad42fdc59e253539798b37 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 23 Sep 2019 14:36:28 +0200 Subject: [PATCH 01/15] token_metadata::update_normal_tokens: take tokens by const ref --- locator/token_metadata.cc | 4 ++-- locator/token_metadata.hh | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index f5769f2cd3..c9b0c8913e 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -102,7 +102,7 @@ void token_metadata::update_normal_tokens(std::unordered_set tokens, inet * * @param endpointTokens */ -void token_metadata::update_normal_tokens(std::unordered_map>& endpoint_tokens) { +void token_metadata::update_normal_tokens(const std::unordered_map>& endpoint_tokens) { if (endpoint_tokens.empty()) { return; } @@ -110,7 +110,7 @@ void token_metadata::update_normal_tokens(std::unordered_map& tokens = i.second; + const auto& tokens = i.second; if (tokens.empty()) { auto msg = format("tokens is empty in update_normal_tokens"); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 04fba6ab94..e5a86007d0 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -234,7 +234,7 @@ public: const std::vector& sorted_tokens() const; void update_normal_token(token token, inet_address endpoint); void update_normal_tokens(std::unordered_set tokens, inet_address endpoint); - void update_normal_tokens(std::unordered_map>& endpoint_tokens); + void update_normal_tokens(const std::unordered_map>& endpoint_tokens); const token& first_token(const token& start) const; size_t first_token_index(const token& start) const; std::optional get_endpoint(const token& token) const; From 00dcea34783de85ca1edd6de22d341c44b278c78 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 23 Sep 2019 14:42:20 +0200 Subject: [PATCH 02/15] db::system_keyspace::prepare_tokens: make static, take tokens by const ref --- db/system_keyspace.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 6b0ad8e7b6..05f16a1941 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1454,7 +1454,7 @@ future get_truncated_at(utils::UUID cf_id) { }); } -set_type_impl::native_type prepare_tokens(std::unordered_set& tokens) { +static set_type_impl::native_type prepare_tokens(const std::unordered_set& tokens) { set_type_impl::native_type tset; for (auto& t: tokens) { tset.push_back(dht::global_partitioner().to_sstring(t)); From 8c8a17a0fe287a54471f8a2079fd78056a16d6ad Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 23 Sep 2019 14:43:16 +0200 Subject: [PATCH 03/15] db::system_keyspace::update_tokens: take tokens by const ref --- db/system_keyspace.cc | 4 ++-- db/system_keyspace.hh | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 05f16a1941..3e547d44bc 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1475,7 +1475,7 @@ std::unordered_set decode_tokens(set_type_impl::native_type& tokens) /** * Record tokens being used by another node */ -future<> update_tokens(gms::inet_address ep, std::unordered_set tokens) +future<> update_tokens(gms::inet_address ep, const std::unordered_set& tokens) { if (ep == utils::fb_utilities::get_broadcast_address()) { return remove_endpoint(ep); @@ -1651,7 +1651,7 @@ future<> remove_endpoint(gms::inet_address ep) { /** * This method is used to update the System Keyspace with the new tokens for this node */ -future<> update_tokens(std::unordered_set tokens) { +future<> update_tokens(const std::unordered_set& tokens) { if (tokens.empty()) { throw std::invalid_argument("remove_endpoint should be used instead"); } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 48ee1b877c..57afb22206 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -162,8 +162,8 @@ future<> setup(distributed& db, distributed& qp, distributed& ss); future<> update_schema_version(utils::UUID version); -future<> update_tokens(std::unordered_set tokens); -future<> update_tokens(gms::inet_address ep, std::unordered_set tokens); +future<> update_tokens(const std::unordered_set& tokens); +future<> update_tokens(gms::inet_address ep, const std::unordered_set& tokens); future<> update_preferred_ip(gms::inet_address ep, gms::inet_address preferred_ip); future> get_preferred_ips(); From 2db07c697fe2f9106c12a69086c8747eb769e655 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 23 Sep 2019 14:47:55 +0200 Subject: [PATCH 04/15] storage_service::handle_state_normal: remove local_tokens_to_remove That was dead code. Removing tokens is handled inside remove_endpoint, using the endpoints_to_remove set. --- service/storage_service.cc | 4 ---- 1 file changed, 4 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 5ef721238a..78c661b2b6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -969,7 +969,6 @@ void storage_service::handle_state_normal(inet_address endpoint) { std::unordered_set tokens_to_update_in_metadata; std::unordered_set tokens_to_update_in_system_keyspace; - std::unordered_set local_tokens_to_remove; std::unordered_set endpoints_to_remove; slogger.debug("Node {} state normal, token {}", endpoint, tokens); @@ -1071,9 +1070,6 @@ void storage_service::handle_state_normal(inet_address endpoint) { return make_ready_future<>(); }).get(); } - if (!local_tokens_to_remove.empty()) { - db::system_keyspace::update_local_tokens(std::unordered_set(), local_tokens_to_remove).discard_result().get(); - } // Send joined notification only when this node was not a member prior to this if (!is_member) { From 602c7268cc5ad7400d04fc89282a506b3e307af5 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 23 Sep 2019 15:23:09 +0200 Subject: [PATCH 05/15] storage_service::handle_state_normal: tokens_to_update* -> owned_tokens Replace the two variables: tokens_to_update_in_metadata tokens_to_update_in_system_keyspace which were exactly the same, with one variable owned_tokens. The new name describes what the variable IS instead what's it used for. Add a comment to clarify what "owned" means: those are the tokens the node chose and any collision was resolved positively for this node. Move the variable definition further down in the code, where it's actually needed. --- service/storage_service.cc | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 78c661b2b6..280087e7b9 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -967,10 +967,6 @@ void storage_service::handle_state_normal(inet_address endpoint) { slogger.debug("endpoint={} handle_state_normal", endpoint); auto tokens = get_tokens_for(endpoint); - std::unordered_set tokens_to_update_in_metadata; - std::unordered_set tokens_to_update_in_system_keyspace; - std::unordered_set endpoints_to_remove; - slogger.debug("Node {} state normal, token {}", endpoint, tokens); if (_token_metadata.is_member(endpoint)) { @@ -978,6 +974,8 @@ void storage_service::handle_state_normal(inet_address endpoint) { } update_peer_info(endpoint); + std::unordered_set endpoints_to_remove; + // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). if (_gossiper.uses_host_id(endpoint)) { auto host_id = _gossiper.get_host_id(endpoint); @@ -1009,22 +1007,24 @@ void storage_service::handle_state_normal(inet_address endpoint) { } } + // Tokens owned by the handled endpoint. + // The endpoint broadcasts its set of chosen tokens. If a token was also chosen by another endpoint, + // the collision is resolved by assigning the token to the endpoint which started later. + std::unordered_set owned_tokens; + for (auto t : tokens) { // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint. auto current_owner = _token_metadata.get_endpoint(t); if (!current_owner) { slogger.debug("handle_state_normal: New node {} at token {}", endpoint, t); - tokens_to_update_in_metadata.insert(t); - tokens_to_update_in_system_keyspace.insert(t); + owned_tokens.insert(t); } else if (endpoint == *current_owner) { slogger.debug("handle_state_normal: endpoint={} == current_owner={} token {}", endpoint, *current_owner, t); // set state back to normal, since the node may have tried to leave, but failed and is now back up - tokens_to_update_in_metadata.insert(t); - tokens_to_update_in_system_keyspace.insert(t); + owned_tokens.insert(t); } else if (_gossiper.compare_endpoint_startup(endpoint, *current_owner) > 0) { slogger.debug("handle_state_normal: endpoint={} > current_owner={}, token {}", endpoint, *current_owner, t); - tokens_to_update_in_metadata.insert(t); - tokens_to_update_in_system_keyspace.insert(t); + owned_tokens.insert(t); // currentOwner is no longer current, endpoint is. Keep track of these moves, because when // a host no longer has any tokens, we'll want to remove it. std::multimap ep_to_token_copy = get_token_metadata().get_endpoint_to_token_map_for_reading(); @@ -1049,7 +1049,7 @@ void storage_service::handle_state_normal(inet_address endpoint) { // Update pending ranges after update of normal tokens immediately to avoid // a race where natural endpoint was updated to contain node A, but A was // not yet removed from pending endpoints - _token_metadata.update_normal_tokens(tokens_to_update_in_metadata, endpoint); + _token_metadata.update_normal_tokens(owned_tokens, endpoint); _update_pending_ranges_action.trigger_later().get(); for (auto ep : endpoints_to_remove) { @@ -1059,9 +1059,9 @@ void storage_service::handle_state_normal(inet_address endpoint) { _gossiper.replacement_quarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260 } } - slogger.debug("handle_state_normal: endpoint={} tokens_to_update_in_system_keyspace = {}", endpoint, tokens_to_update_in_system_keyspace); - if (!tokens_to_update_in_system_keyspace.empty()) { - db::system_keyspace::update_tokens(endpoint, tokens_to_update_in_system_keyspace).then_wrapped([endpoint] (auto&& f) { + slogger.debug("handle_state_normal: endpoint={} owned_tokens = {}", endpoint, owned_tokens); + if (!owned_tokens.empty()) { + db::system_keyspace::update_tokens(endpoint, owned_tokens).then_wrapped([endpoint] (auto&& f) { try { f.get(); } catch (...) { From 36ccf72f3c8c535495433b58f5675febfcdfb21a Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 23 Sep 2019 18:04:29 +0200 Subject: [PATCH 06/15] storage_service: remove is_survey_mode That was dead, untested code, making it unnecessarily hard to implement new features. --- service/storage_service.cc | 79 +++++++++++++------------------------- service/storage_service.hh | 4 -- 2 files changed, 27 insertions(+), 56 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 280087e7b9..08161f98c2 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -599,11 +599,9 @@ static auth::service_config auth_service_config_from_db_config(const db::config& } void storage_service::maybe_start_sys_dist_ks() { - if (!_is_survey_mode) { - supervisor::notify("starting system distributed keyspace"); - _sys_dist_ks.start(std::ref(cql3::get_query_processor()), std::ref(service::get_migration_manager())).get(); - _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start).get(); - } + supervisor::notify("starting system distributed keyspace"); + _sys_dist_ks.start(std::ref(cql3::get_query_processor()), std::ref(service::get_migration_manager())).get(); + _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start).get(); } // Runs inside seastar::async context @@ -756,35 +754,31 @@ void storage_service::join_token_ring(int delay) { MigrationManager.announceNewKeyspace(TraceKeyspace.definition(), 0, false); #endif - if (!_is_survey_mode) { - // start participating in the ring. - db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get(); - set_tokens(_bootstrap_tokens); - // remove the existing info about the replaced node. - if (!current.empty()) { - for (auto existing : current) { - _gossiper.replaced_endpoint(existing); - } + // start participating in the ring. + db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get(); + set_tokens(_bootstrap_tokens); + // remove the existing info about the replaced node. + if (!current.empty()) { + for (auto existing : current) { + _gossiper.replaced_endpoint(existing); } - if (_token_metadata.sorted_tokens().empty()) { - auto err = format("join_token_ring: Sorted token in token_metadata is empty"); - slogger.error("{}", err); - throw std::runtime_error(err); - } - - _auth_service.start( - permissions_cache_config_from_db_config(_db.local().get_config()), - std::ref(cql3::get_query_processor()), - std::ref(service::get_migration_manager()), - auth_service_config_from_db_config(_db.local().get_config())).get(); - - _auth_service.invoke_on_all(&auth::service::start).get(); - - supervisor::notify("starting tracing"); - tracing::tracing::start_tracing().get(); - } else { - slogger.info("Startup complete, but write survey mode is active, not becoming an active ring member. Use JMX (StorageService->joinRing()) to finalize ring joining."); } + if (_token_metadata.sorted_tokens().empty()) { + auto err = format("join_token_ring: Sorted token in token_metadata is empty"); + slogger.error("{}", err); + throw std::runtime_error(err); + } + + _auth_service.start( + permissions_cache_config_from_db_config(_db.local().get_config()), + std::ref(cql3::get_query_processor()), + std::ref(service::get_migration_manager()), + auth_service_config_from_db_config(_db.local().get_config())).get(); + + _auth_service.invoke_on_all(&auth::service::start).get(); + + supervisor::notify("starting tracing"); + tracing::tracing::start_tracing().get(); } future<> storage_service::join_ring() { @@ -793,25 +787,6 @@ future<> storage_service::join_ring() { if (!ss._joined) { slogger.info("Joining ring by operator request"); ss.join_token_ring(0); - } else if (ss._is_survey_mode) { - auto tokens = db::system_keyspace::get_saved_tokens().get0(); - ss.set_tokens(std::move(tokens)); - db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get(); - ss._is_survey_mode = false; - slogger.info("Leaving write survey mode and joining ring at operator request"); - if (ss._token_metadata.sorted_tokens().empty()) { - auto err = format("join_ring: Sorted token in token_metadata is empty"); - slogger.error("{}", err); - throw std::runtime_error(err); - } - - ss._auth_service.start( - permissions_cache_config_from_db_config(ss._db.local().get_config()), - std::ref(cql3::get_query_processor()), - std::ref(service::get_migration_manager()), - auth_service_config_from_db_config(ss._db.local().get_config())).get(); - - ss._auth_service.invoke_on_all(&auth::service::start).get(); } }); }); @@ -820,7 +795,7 @@ future<> storage_service::join_ring() { bool storage_service::is_joined() { // Every time we set _joined, we do it on all shards, so we can read its // value locally. - return _joined && !_is_survey_mode; + return _joined; } void storage_service::mark_existing_views_as_built() { diff --git a/service/storage_service.hh b/service/storage_service.hh index f9e961fa47..95e2e34663 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -262,10 +262,6 @@ private: /* Are we starting this node in bootstrap mode? */ bool _is_bootstrap_mode; - /* we bootstrap but do NOT join the ring unless told to do so */ - // FIXME: System.getProperty("cassandra.write_survey", "false") - bool _is_survey_mode = false; - bool _initialized; bool _joined = false; From a223864f81f771516de0f98c0cb19e60d0339307 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 4 Oct 2019 17:06:01 +0200 Subject: [PATCH 07/15] storage_service: remove storage_service::set_tokens method. After commit 36ccf72f3c8c535495433b58f5675febfcdfb21a, this method was used only in one place. Its name did not make it obvious what it does and when is it safe to call it. This commit pulls out the code from set_tokens to the point where it was called (join_token_ring). The code is only possible to understand in context. This code was also saving the tokens to the LOCAL table before retrieving them from this table again. There is no point in doing that: 1. there are no races, since when join_token_ring is running, it is the only function which can call system_keyspace::update_tokens (which saves them to the LOCAL table). There can be no multiple instances of join_token_ring. 2. Even if there was a race, this wouldn't fix anything. The tokens we retrieve from LOCAL by calling get_local_tokens().get0() could already be different in the LOCAL table when the get0() returns. --- service/storage_service.cc | 20 ++++++++------------ service/storage_service.hh | 2 -- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 08161f98c2..6ac8390b81 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -754,9 +754,15 @@ void storage_service::join_token_ring(int delay) { MigrationManager.announceNewKeyspace(TraceKeyspace.definition(), 0, false); #endif - // start participating in the ring. db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get(); - set_tokens(_bootstrap_tokens); + slogger.debug("Setting tokens to {}", _bootstrap_tokens); + db::system_keyspace::update_tokens(_bootstrap_tokens).get(); + _token_metadata.update_normal_tokens(_bootstrap_tokens, get_broadcast_address()); + replicate_to_all_cores().get(); + // start participating in the ring. + set_gossip_tokens(_bootstrap_tokens); + set_mode(mode::NORMAL, "node is now in normal status", true); + // remove the existing info about the replaced node. if (!current.empty()) { for (auto existing : current) { @@ -1361,16 +1367,6 @@ std::unordered_set storage_service::get_tokens_for(inet_address } // Runs inside seastar::async context -void storage_service::set_tokens(std::unordered_set tokens) { - slogger.debug("Setting tokens to {}", tokens); - db::system_keyspace::update_tokens(tokens).get(); - auto local_tokens = get_local_tokens().get0(); - _token_metadata.update_normal_tokens(tokens, get_broadcast_address()); - replicate_to_all_cores().get(); - set_gossip_tokens(local_tokens); - set_mode(mode::NORMAL, "node is now in normal status", true); -} - void storage_service::set_gossip_tokens(const std::unordered_set& local_tokens) { _gossiper.add_local_application_state({ { gms::application_state::TOKENS, value_factory.tokens(local_tokens) }, diff --git a/service/storage_service.hh b/service/storage_service.hh index 95e2e34663..e602fe0eb7 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -351,8 +351,6 @@ public: _is_bootstrap_mode = false; } - /** This method updates the local token on disk */ - void set_tokens(std::unordered_set tokens); void set_gossip_tokens(const std::unordered_set& local_tokens); #if 0 From 06cc7d409dab1a5e2b5ad4471fcda04068e3c036 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 4 Oct 2019 17:41:41 +0200 Subject: [PATCH 08/15] storage_service: remove redundant call to update_tokens during join_token_ring When a non-seed node was bootstrapping, system_keyspace::update_tokens was called twice: first right after the tokens were generated (or received if we were replacing a different node) in the call to `bootstrap`, and then later in join_token_ring. The second call was redundant. The join_token_ring call was also redundant if we were not bootstrapping and had tokens saved previously (e.g. when restarting). In that case we would have read them from LOCAL and then save the same tokens again. This commit removes the redundant call and inserts calls to update_tokens where they are necessary, when new tokens are generated. The aim is to make the code easier to understand. It also adds a comment which explains why the tokens don't need to be generated in one of the cases. --- service/storage_service.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 6ac8390b81..8fef591895 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -710,9 +710,11 @@ void storage_service::join_token_ring(int delay) { std::stringstream ss; ss << _bootstrap_tokens; set_mode(mode::JOINING, format("Replacing a node with token(s): {}", ss.str()), true); + // _bootstrap_tokens was previously set in prepare_to_join using tokens gossiped by the replaced node } maybe_start_sys_dist_ks(); mark_existing_views_as_built(); + db::system_keyspace::update_tokens(_bootstrap_tokens).get(); bootstrap(_bootstrap_tokens); // bootstrap will block until finished if (_is_bootstrap_mode) { @@ -740,6 +742,7 @@ void storage_service::join_token_ring(int delay) { } slogger.info("Saved tokens not found. Using configuration value: {}", _bootstrap_tokens); } + db::system_keyspace::update_tokens(_bootstrap_tokens).get(); } else { if (_bootstrap_tokens.size() != num_tokens) { throw std::runtime_error(format("Cannot change the number of tokens from {:d} to {:d}", _bootstrap_tokens.size(), num_tokens)); @@ -756,7 +759,6 @@ void storage_service::join_token_ring(int delay) { db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get(); slogger.debug("Setting tokens to {}", _bootstrap_tokens); - db::system_keyspace::update_tokens(_bootstrap_tokens).get(); _token_metadata.update_normal_tokens(_bootstrap_tokens, get_broadcast_address()); replicate_to_all_cores().get(); // start participating in the ring. @@ -819,8 +821,6 @@ void storage_service::mark_existing_views_as_built() { // Runs inside seastar::async context void storage_service::bootstrap(std::unordered_set tokens) { _is_bootstrap_mode = true; - // DON'T use set_token, that makes us part of the ring locally which is incorrect until we are done bootstrapping - db::system_keyspace::update_tokens(tokens).get(); if (!db().local().is_replacing()) { // Wait until we know tokens of existing node before announcing join status. _gossiper.wait_for_range_setup().get(); From 2ff4f9b8f4f36667dd9afc3a63e44059b5b966db Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 4 Oct 2019 17:55:56 +0200 Subject: [PATCH 09/15] storage_service: remove unnecessary use of stringstream --- service/storage_service.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 8fef591895..1fc046ceeb 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -707,9 +707,7 @@ void storage_service::join_token_ring(int delay) { } else { sleep_abortable(get_ring_delay(), _abort_source).get(); } - std::stringstream ss; - ss << _bootstrap_tokens; - set_mode(mode::JOINING, format("Replacing a node with token(s): {}", ss.str()), true); + set_mode(mode::JOINING, format("Replacing a node with token(s): {}", _bootstrap_tokens), true); // _bootstrap_tokens was previously set in prepare_to_join using tokens gossiped by the replaced node } maybe_start_sys_dist_ks(); From 84b41bd89b19ba6d5885f72755113ac226505ea4 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 7 Oct 2019 13:24:59 +0200 Subject: [PATCH 10/15] storage_service: fix typo in handle_state_moving --- service/storage_service.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 1fc046ceeb..f836285bed 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1112,7 +1112,7 @@ void storage_service::handle_state_left(inet_address endpoint, std::vector pieces) { - throw std::runtime_error(format("Move opeartion is not supported only more, endpoint={}", endpoint)); + throw std::runtime_error(format("Move operation is not supported anymore, endpoint={}", endpoint)); } void storage_service::handle_state_removing(inet_address endpoint, std::vector pieces) { From b757a19f846ebaefe35272fd4feeb63ca197aa82 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 4 Oct 2019 18:48:15 +0200 Subject: [PATCH 11/15] storage_service: simplify storage_service::bootstrap method The storage_service::bootstrap method took a parameter: tokens to bootstrap with. However, this method is only called in one place (join_token_ring) with only one parameter: _bootstrap_tokens. It doesn't make sense to call this method anywhere else with any other parameter. This commit also adds a comment explaining what the method does and moves it into the private section of storage_service. --- service/storage_service.cc | 17 +++++++++-------- service/storage_service.hh | 8 ++++++-- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index f836285bed..60d2814667 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -713,7 +713,7 @@ void storage_service::join_token_ring(int delay) { maybe_start_sys_dist_ks(); mark_existing_views_as_built(); db::system_keyspace::update_tokens(_bootstrap_tokens).get(); - bootstrap(_bootstrap_tokens); + bootstrap(); // bootstrap will block until finished if (_is_bootstrap_mode) { auto err = format("We are not supposed in bootstrap mode any more"); @@ -817,21 +817,21 @@ void storage_service::mark_existing_views_as_built() { } // Runs inside seastar::async context -void storage_service::bootstrap(std::unordered_set tokens) { +void storage_service::bootstrap() { _is_bootstrap_mode = true; if (!db().local().is_replacing()) { // Wait until we know tokens of existing node before announcing join status. _gossiper.wait_for_range_setup().get(); // if not an existing token then bootstrap _gossiper.add_local_application_state({ - { gms::application_state::TOKENS, value_factory.tokens(tokens) }, - { gms::application_state::STATUS, value_factory.bootstrapping(tokens) }, + { gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens) }, + { gms::application_state::STATUS, value_factory.bootstrapping(_bootstrap_tokens) }, }).get(); set_mode(mode::JOINING, format("sleeping {} ms for pending range setup", get_ring_delay().count()), true); _gossiper.wait_for_range_setup().get(); } else { // Dont set any state for the node which is bootstrapping the existing token... - _token_metadata.update_normal_tokens(tokens, get_broadcast_address()); + _token_metadata.update_normal_tokens(_bootstrap_tokens, get_broadcast_address()); replicate_to_all_cores().get(); auto replace_addr = db().local().get_replace_address(); if (replace_addr) { @@ -843,9 +843,10 @@ void storage_service::bootstrap(std::unordered_set tokens) { _gossiper.check_seen_seeds(); set_mode(mode::JOINING, "Starting to bootstrap...", true); - dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), tokens, _token_metadata); - bs.bootstrap().get(); // handles token update - slogger.info("Bootstrap completed! for the tokens {}", tokens); + dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata); + // Does the actual streaming of newly replicated token ranges. + bs.bootstrap().get(); + slogger.info("Bootstrap completed! for the tokens {}", _bootstrap_tokens); } sstring diff --git a/service/storage_service.hh b/service/storage_service.hh index e602fe0eb7..984a119efc 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -547,9 +547,13 @@ private: void set_mode(mode m, bool log); void set_mode(mode m, sstring msg, bool log); void mark_existing_views_as_built(); -public: - void bootstrap(std::unordered_set tokens); + // Stream data for which we become a new replica. + // Before that, if we're not replacing another node, inform other nodes about our chosen tokens (_bootstrap_tokens) + // and wait for RING_DELAY ms so that we receive new writes from coordinators during streaming. + void bootstrap(); + +public: bool is_bootstrap_mode() { return _is_bootstrap_mode; } From dbca327b469184ea19d0b11f15f323dcdea8b26b Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 4 Oct 2019 19:07:51 +0200 Subject: [PATCH 12/15] storage_service: remove storage_service::_is_bootstrap_mode. The flag did nothing. It was used in one place to check if there's a bug, but it can easily by proven by reading the code that the check would never pass. --- dht/boot_strapper.cc | 4 +--- service/storage_service.cc | 6 ------ service/storage_service.hh | 11 ----------- 3 files changed, 1 insertion(+), 20 deletions(-) diff --git a/dht/boot_strapper.cc b/dht/boot_strapper.cc index be24b9119e..4eabe492b9 100644 --- a/dht/boot_strapper.cc +++ b/dht/boot_strapper.cc @@ -62,9 +62,7 @@ future<> boot_strapper::bootstrap() { return streamer->add_ranges(keyspace_name, ranges); }).then([this, streamer] { _abort_source.check(); - return streamer->stream_async().then([streamer] () { - service::get_local_storage_service().finish_bootstrapping(); - }).handle_exception([streamer] (std::exception_ptr eptr) { + return streamer->stream_async().handle_exception([streamer] (std::exception_ptr eptr) { blogger.warn("Error during bootstrap: {}", eptr); return make_exception_future<>(std::move(eptr)); }); diff --git a/service/storage_service.cc b/service/storage_service.cc index 60d2814667..0cc05bea14 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -715,11 +715,6 @@ void storage_service::join_token_ring(int delay) { db::system_keyspace::update_tokens(_bootstrap_tokens).get(); bootstrap(); // bootstrap will block until finished - if (_is_bootstrap_mode) { - auto err = format("We are not supposed in bootstrap mode any more"); - slogger.warn("{}", err); - throw std::runtime_error(err); - } } else { maybe_start_sys_dist_ks(); size_t num_tokens = _db.local().get_config().num_tokens(); @@ -818,7 +813,6 @@ void storage_service::mark_existing_views_as_built() { // Runs inside seastar::async context void storage_service::bootstrap() { - _is_bootstrap_mode = true; if (!db().local().is_replacing()) { // Wait until we know tokens of existing node before announcing join status. _gossiper.wait_for_range_setup().get(); diff --git a/service/storage_service.hh b/service/storage_service.hh index 984a119efc..3a480390ca 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -259,9 +259,6 @@ private: std::optional _removing_node; - /* Are we starting this node in bootstrap mode? */ - bool _is_bootstrap_mode; - bool _initialized; bool _joined = false; @@ -347,10 +344,6 @@ public: sstables::sstable_version_types sstables_format() const { return _sstables_format; } void enable_all_features(); - void finish_bootstrapping() { - _is_bootstrap_mode = false; - } - void set_gossip_tokens(const std::unordered_set& local_tokens); #if 0 @@ -554,10 +547,6 @@ private: void bootstrap(); public: - bool is_bootstrap_mode() { - return _is_bootstrap_mode; - } - #if 0 public TokenMetadata getTokenMetadata() From 1b0c8e5d995c827b53a8b5b50602c1aa06f331dd Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 4 Oct 2019 16:06:22 +0200 Subject: [PATCH 13/15] db: improve documentation for update_tokens and get_saved_tokens in system_keyspace --- db/system_keyspace.cc | 6 ------ db/system_keyspace.hh | 12 ++++++++++++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 3e547d44bc..207038dc57 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1472,9 +1472,6 @@ std::unordered_set decode_tokens(set_type_impl::native_type& tokens) return tset; } -/** - * Record tokens being used by another node - */ future<> update_tokens(gms::inet_address ep, const std::unordered_set& tokens) { if (ep == utils::fb_utilities::get_broadcast_address()) { @@ -1648,9 +1645,6 @@ future<> remove_endpoint(gms::inet_address ep) { }); } - /** - * This method is used to update the System Keyspace with the new tokens for this node - */ future<> update_tokens(const std::unordered_set& tokens) { if (tokens.empty()) { throw std::invalid_argument("remove_endpoint should be used instead"); diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 57afb22206..be30b989d8 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -162,7 +162,15 @@ future<> setup(distributed& db, distributed& qp, distributed& ss); future<> update_schema_version(utils::UUID version); + +/* + * Save tokens used by this node in the LOCAL table. + */ future<> update_tokens(const std::unordered_set& tokens); + +/** + * Record tokens being used by another node in the PEERS table. + */ future<> update_tokens(gms::inet_address ep, const std::unordered_set& tokens); future<> update_preferred_ip(gms::inet_address ep, gms::inet_address preferred_ip); @@ -500,6 +508,10 @@ enum class bootstrap_state { */ future> load_host_ids(); + /* + * Read this node's tokens stored in the LOCAL table. + * Used to initialize a restarting node. + */ future> get_saved_tokens(); future> load_peer_features(); From fb1e35f032f9a31e855deb0ac0584afa79cb8fe1 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 4 Oct 2019 16:30:56 +0200 Subject: [PATCH 14/15] db: remove system_keyspace::update_local_tokens That was dead code. --- db/system_keyspace.cc | 15 --------------- db/system_keyspace.hh | 11 ----------- 2 files changed, 26 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 207038dc57..c06f6c4a2d 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1485,21 +1485,6 @@ future<> update_tokens(gms::inet_address ep, const std::unordered_set> update_local_tokens( - const std::unordered_set add_tokens, - const std::unordered_set rm_tokens) { - return get_saved_tokens().then([add_tokens = std::move(add_tokens), rm_tokens = std::move(rm_tokens)] (auto tokens) { - for (auto& x : rm_tokens) { - tokens.erase(x); - } - for (auto& x : add_tokens) { - tokens.insert(x); - } - return update_tokens(tokens).then([tokens] { - return tokens; - }); - }); -} future>> load_tokens() { sstring req = format("SELECT peer, tokens FROM system.{}", PEERS); diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index be30b989d8..3aece0ab94 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -485,17 +485,6 @@ enum class bootstrap_state { } #endif - /** - * Convenience method to update the list of tokens in the local system keyspace. - * - * @param addTokens tokens to add - * @param rmTokens tokens to remove - * @return the collection of persisted tokens - */ - future> update_local_tokens( - const std::unordered_set add_tokens, - const std::unordered_set rm_tokens); - /** * Return a map of stored tokens to IP addresses * From f1c26bf5c9214916b318e86813c4a31a8c3c898e Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 17 Oct 2019 16:19:23 +0200 Subject: [PATCH 15/15] storage_service: more comments in join_token_ring Explain why a call to update_normal_tokens is needed. --- service/storage_service.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index 0cc05bea14..db11287887 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -750,9 +750,15 @@ void storage_service::join_token_ring(int delay) { MigrationManager.announceNewKeyspace(TraceKeyspace.definition(), 0, false); #endif + // At this point our local tokens are chosen (_bootstrap_tokens) and will not be changed unless we bootstrap again. db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get(); + slogger.debug("Setting tokens to {}", _bootstrap_tokens); + // This node must know about its chosen tokens before other nodes do + // since they may start sending writes to this node after it gossips status = NORMAL. + // Therefore, in case we haven't updated _token_metadata with our tokens yet, do it now. _token_metadata.update_normal_tokens(_bootstrap_tokens, get_broadcast_address()); + replicate_to_all_cores().get(); // start participating in the ring. set_gossip_tokens(_bootstrap_tokens);