diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 6b0ad8e7b6..c06f6c4a2d 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1454,7 +1454,7 @@ future get_truncated_at(utils::UUID cf_id) { }); } -set_type_impl::native_type prepare_tokens(std::unordered_set& tokens) { +static set_type_impl::native_type prepare_tokens(const std::unordered_set& tokens) { set_type_impl::native_type tset; for (auto& t: tokens) { tset.push_back(dht::global_partitioner().to_sstring(t)); @@ -1472,10 +1472,7 @@ std::unordered_set decode_tokens(set_type_impl::native_type& tokens) return tset; } -/** - * Record tokens being used by another node - */ -future<> update_tokens(gms::inet_address ep, std::unordered_set tokens) +future<> update_tokens(gms::inet_address ep, const std::unordered_set& tokens) { if (ep == utils::fb_utilities::get_broadcast_address()) { return remove_endpoint(ep); @@ -1488,21 +1485,6 @@ future<> update_tokens(gms::inet_address ep, std::unordered_set toke }); } -future> update_local_tokens( - const std::unordered_set add_tokens, - const std::unordered_set rm_tokens) { - return get_saved_tokens().then([add_tokens = std::move(add_tokens), rm_tokens = std::move(rm_tokens)] (auto tokens) { - for (auto& x : rm_tokens) { - tokens.erase(x); - } - for (auto& x : add_tokens) { - tokens.insert(x); - } - return update_tokens(tokens).then([tokens] { - return tokens; - }); - }); -} future>> load_tokens() { sstring req = format("SELECT peer, tokens FROM system.{}", PEERS); @@ -1648,10 +1630,7 @@ future<> remove_endpoint(gms::inet_address ep) { }); } - /** - * This method is used to update the System Keyspace with the new tokens for this node - */ -future<> update_tokens(std::unordered_set tokens) { +future<> update_tokens(const std::unordered_set& tokens) { if (tokens.empty()) { throw std::invalid_argument("remove_endpoint should be used instead"); } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 48ee1b877c..3aece0ab94 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -162,8 +162,16 @@ future<> setup(distributed& db, distributed& qp, distributed& ss); future<> update_schema_version(utils::UUID version); -future<> update_tokens(std::unordered_set tokens); -future<> update_tokens(gms::inet_address ep, std::unordered_set tokens); + +/* + * Save tokens used by this node in the LOCAL table. + */ +future<> update_tokens(const std::unordered_set& tokens); + +/** + * Record tokens being used by another node in the PEERS table. + */ +future<> update_tokens(gms::inet_address ep, const std::unordered_set& tokens); future<> update_preferred_ip(gms::inet_address ep, gms::inet_address preferred_ip); future> get_preferred_ips(); @@ -477,17 +485,6 @@ enum class bootstrap_state { } #endif - /** - * Convenience method to update the list of tokens in the local system keyspace. - * - * @param addTokens tokens to add - * @param rmTokens tokens to remove - * @return the collection of persisted tokens - */ - future> update_local_tokens( - const std::unordered_set add_tokens, - const std::unordered_set rm_tokens); - /** * Return a map of stored tokens to IP addresses * @@ -500,6 +497,10 @@ enum class bootstrap_state { */ future> load_host_ids(); + /* + * Read this node's tokens stored in the LOCAL table. + * Used to initialize a restarting node. + */ future> get_saved_tokens(); future> load_peer_features(); diff --git a/dht/boot_strapper.cc b/dht/boot_strapper.cc index be24b9119e..4eabe492b9 100644 --- a/dht/boot_strapper.cc +++ b/dht/boot_strapper.cc @@ -62,9 +62,7 @@ future<> boot_strapper::bootstrap() { return streamer->add_ranges(keyspace_name, ranges); }).then([this, streamer] { _abort_source.check(); - return streamer->stream_async().then([streamer] () { - service::get_local_storage_service().finish_bootstrapping(); - }).handle_exception([streamer] (std::exception_ptr eptr) { + return streamer->stream_async().handle_exception([streamer] (std::exception_ptr eptr) { blogger.warn("Error during bootstrap: {}", eptr); return make_exception_future<>(std::move(eptr)); }); diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index f5769f2cd3..c9b0c8913e 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -102,7 +102,7 @@ void token_metadata::update_normal_tokens(std::unordered_set tokens, inet * * @param endpointTokens */ -void token_metadata::update_normal_tokens(std::unordered_map>& endpoint_tokens) { +void token_metadata::update_normal_tokens(const std::unordered_map>& endpoint_tokens) { if (endpoint_tokens.empty()) { return; } @@ -110,7 +110,7 @@ void token_metadata::update_normal_tokens(std::unordered_map& tokens = i.second; + const auto& tokens = i.second; if (tokens.empty()) { auto msg = format("tokens is empty in update_normal_tokens"); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 04fba6ab94..e5a86007d0 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -234,7 +234,7 @@ public: const std::vector& sorted_tokens() const; void update_normal_token(token token, inet_address endpoint); void update_normal_tokens(std::unordered_set tokens, inet_address endpoint); - void update_normal_tokens(std::unordered_map>& endpoint_tokens); + void update_normal_tokens(const std::unordered_map>& endpoint_tokens); const token& first_token(const token& start) const; size_t first_token_index(const token& start) const; std::optional get_endpoint(const token& token) const; diff --git a/service/storage_service.cc b/service/storage_service.cc index 5ef721238a..db11287887 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -599,11 +599,9 @@ static auth::service_config auth_service_config_from_db_config(const db::config& } void storage_service::maybe_start_sys_dist_ks() { - if (!_is_survey_mode) { - supervisor::notify("starting system distributed keyspace"); - _sys_dist_ks.start(std::ref(cql3::get_query_processor()), std::ref(service::get_migration_manager())).get(); - _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start).get(); - } + supervisor::notify("starting system distributed keyspace"); + _sys_dist_ks.start(std::ref(cql3::get_query_processor()), std::ref(service::get_migration_manager())).get(); + _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start).get(); } // Runs inside seastar::async context @@ -709,19 +707,14 @@ void storage_service::join_token_ring(int delay) { } else { sleep_abortable(get_ring_delay(), _abort_source).get(); } - std::stringstream ss; - ss << _bootstrap_tokens; - set_mode(mode::JOINING, format("Replacing a node with token(s): {}", ss.str()), true); + set_mode(mode::JOINING, format("Replacing a node with token(s): {}", _bootstrap_tokens), true); + // _bootstrap_tokens was previously set in prepare_to_join using tokens gossiped by the replaced node } maybe_start_sys_dist_ks(); mark_existing_views_as_built(); - bootstrap(_bootstrap_tokens); + db::system_keyspace::update_tokens(_bootstrap_tokens).get(); + bootstrap(); // bootstrap will block until finished - if (_is_bootstrap_mode) { - auto err = format("We are not supposed in bootstrap mode any more"); - slogger.warn("{}", err); - throw std::runtime_error(err); - } } else { maybe_start_sys_dist_ks(); size_t num_tokens = _db.local().get_config().num_tokens(); @@ -742,6 +735,7 @@ void storage_service::join_token_ring(int delay) { } slogger.info("Saved tokens not found. Using configuration value: {}", _bootstrap_tokens); } + db::system_keyspace::update_tokens(_bootstrap_tokens).get(); } else { if (_bootstrap_tokens.size() != num_tokens) { throw std::runtime_error(format("Cannot change the number of tokens from {:d} to {:d}", _bootstrap_tokens.size(), num_tokens)); @@ -756,35 +750,42 @@ void storage_service::join_token_ring(int delay) { MigrationManager.announceNewKeyspace(TraceKeyspace.definition(), 0, false); #endif - if (!_is_survey_mode) { - // start participating in the ring. - db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get(); - set_tokens(_bootstrap_tokens); - // remove the existing info about the replaced node. - if (!current.empty()) { - for (auto existing : current) { - _gossiper.replaced_endpoint(existing); - } + // At this point our local tokens are chosen (_bootstrap_tokens) and will not be changed unless we bootstrap again. + db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get(); + + slogger.debug("Setting tokens to {}", _bootstrap_tokens); + // This node must know about its chosen tokens before other nodes do + // since they may start sending writes to this node after it gossips status = NORMAL. + // Therefore, in case we haven't updated _token_metadata with our tokens yet, do it now. + _token_metadata.update_normal_tokens(_bootstrap_tokens, get_broadcast_address()); + + replicate_to_all_cores().get(); + // start participating in the ring. + set_gossip_tokens(_bootstrap_tokens); + set_mode(mode::NORMAL, "node is now in normal status", true); + + // remove the existing info about the replaced node. + if (!current.empty()) { + for (auto existing : current) { + _gossiper.replaced_endpoint(existing); } - if (_token_metadata.sorted_tokens().empty()) { - auto err = format("join_token_ring: Sorted token in token_metadata is empty"); - slogger.error("{}", err); - throw std::runtime_error(err); - } - - _auth_service.start( - permissions_cache_config_from_db_config(_db.local().get_config()), - std::ref(cql3::get_query_processor()), - std::ref(service::get_migration_manager()), - auth_service_config_from_db_config(_db.local().get_config())).get(); - - _auth_service.invoke_on_all(&auth::service::start).get(); - - supervisor::notify("starting tracing"); - tracing::tracing::start_tracing().get(); - } else { - slogger.info("Startup complete, but write survey mode is active, not becoming an active ring member. Use JMX (StorageService->joinRing()) to finalize ring joining."); } + if (_token_metadata.sorted_tokens().empty()) { + auto err = format("join_token_ring: Sorted token in token_metadata is empty"); + slogger.error("{}", err); + throw std::runtime_error(err); + } + + _auth_service.start( + permissions_cache_config_from_db_config(_db.local().get_config()), + std::ref(cql3::get_query_processor()), + std::ref(service::get_migration_manager()), + auth_service_config_from_db_config(_db.local().get_config())).get(); + + _auth_service.invoke_on_all(&auth::service::start).get(); + + supervisor::notify("starting tracing"); + tracing::tracing::start_tracing().get(); } future<> storage_service::join_ring() { @@ -793,25 +794,6 @@ future<> storage_service::join_ring() { if (!ss._joined) { slogger.info("Joining ring by operator request"); ss.join_token_ring(0); - } else if (ss._is_survey_mode) { - auto tokens = db::system_keyspace::get_saved_tokens().get0(); - ss.set_tokens(std::move(tokens)); - db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get(); - ss._is_survey_mode = false; - slogger.info("Leaving write survey mode and joining ring at operator request"); - if (ss._token_metadata.sorted_tokens().empty()) { - auto err = format("join_ring: Sorted token in token_metadata is empty"); - slogger.error("{}", err); - throw std::runtime_error(err); - } - - ss._auth_service.start( - permissions_cache_config_from_db_config(ss._db.local().get_config()), - std::ref(cql3::get_query_processor()), - std::ref(service::get_migration_manager()), - auth_service_config_from_db_config(ss._db.local().get_config())).get(); - - ss._auth_service.invoke_on_all(&auth::service::start).get(); } }); }); @@ -820,7 +802,7 @@ future<> storage_service::join_ring() { bool storage_service::is_joined() { // Every time we set _joined, we do it on all shards, so we can read its // value locally. - return _joined && !_is_survey_mode; + return _joined; } void storage_service::mark_existing_views_as_built() { @@ -836,23 +818,20 @@ void storage_service::mark_existing_views_as_built() { } // Runs inside seastar::async context -void storage_service::bootstrap(std::unordered_set tokens) { - _is_bootstrap_mode = true; - // DON'T use set_token, that makes us part of the ring locally which is incorrect until we are done bootstrapping - db::system_keyspace::update_tokens(tokens).get(); +void storage_service::bootstrap() { if (!db().local().is_replacing()) { // Wait until we know tokens of existing node before announcing join status. _gossiper.wait_for_range_setup().get(); // if not an existing token then bootstrap _gossiper.add_local_application_state({ - { gms::application_state::TOKENS, value_factory.tokens(tokens) }, - { gms::application_state::STATUS, value_factory.bootstrapping(tokens) }, + { gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens) }, + { gms::application_state::STATUS, value_factory.bootstrapping(_bootstrap_tokens) }, }).get(); set_mode(mode::JOINING, format("sleeping {} ms for pending range setup", get_ring_delay().count()), true); _gossiper.wait_for_range_setup().get(); } else { // Dont set any state for the node which is bootstrapping the existing token... - _token_metadata.update_normal_tokens(tokens, get_broadcast_address()); + _token_metadata.update_normal_tokens(_bootstrap_tokens, get_broadcast_address()); replicate_to_all_cores().get(); auto replace_addr = db().local().get_replace_address(); if (replace_addr) { @@ -864,9 +843,10 @@ void storage_service::bootstrap(std::unordered_set tokens) { _gossiper.check_seen_seeds(); set_mode(mode::JOINING, "Starting to bootstrap...", true); - dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), tokens, _token_metadata); - bs.bootstrap().get(); // handles token update - slogger.info("Bootstrap completed! for the tokens {}", tokens); + dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata); + // Does the actual streaming of newly replicated token ranges. + bs.bootstrap().get(); + slogger.info("Bootstrap completed! for the tokens {}", _bootstrap_tokens); } sstring @@ -967,11 +947,6 @@ void storage_service::handle_state_normal(inet_address endpoint) { slogger.debug("endpoint={} handle_state_normal", endpoint); auto tokens = get_tokens_for(endpoint); - std::unordered_set tokens_to_update_in_metadata; - std::unordered_set tokens_to_update_in_system_keyspace; - std::unordered_set local_tokens_to_remove; - std::unordered_set endpoints_to_remove; - slogger.debug("Node {} state normal, token {}", endpoint, tokens); if (_token_metadata.is_member(endpoint)) { @@ -979,6 +954,8 @@ void storage_service::handle_state_normal(inet_address endpoint) { } update_peer_info(endpoint); + std::unordered_set endpoints_to_remove; + // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). if (_gossiper.uses_host_id(endpoint)) { auto host_id = _gossiper.get_host_id(endpoint); @@ -1010,22 +987,24 @@ void storage_service::handle_state_normal(inet_address endpoint) { } } + // Tokens owned by the handled endpoint. + // The endpoint broadcasts its set of chosen tokens. If a token was also chosen by another endpoint, + // the collision is resolved by assigning the token to the endpoint which started later. + std::unordered_set owned_tokens; + for (auto t : tokens) { // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint. auto current_owner = _token_metadata.get_endpoint(t); if (!current_owner) { slogger.debug("handle_state_normal: New node {} at token {}", endpoint, t); - tokens_to_update_in_metadata.insert(t); - tokens_to_update_in_system_keyspace.insert(t); + owned_tokens.insert(t); } else if (endpoint == *current_owner) { slogger.debug("handle_state_normal: endpoint={} == current_owner={} token {}", endpoint, *current_owner, t); // set state back to normal, since the node may have tried to leave, but failed and is now back up - tokens_to_update_in_metadata.insert(t); - tokens_to_update_in_system_keyspace.insert(t); + owned_tokens.insert(t); } else if (_gossiper.compare_endpoint_startup(endpoint, *current_owner) > 0) { slogger.debug("handle_state_normal: endpoint={} > current_owner={}, token {}", endpoint, *current_owner, t); - tokens_to_update_in_metadata.insert(t); - tokens_to_update_in_system_keyspace.insert(t); + owned_tokens.insert(t); // currentOwner is no longer current, endpoint is. Keep track of these moves, because when // a host no longer has any tokens, we'll want to remove it. std::multimap ep_to_token_copy = get_token_metadata().get_endpoint_to_token_map_for_reading(); @@ -1050,7 +1029,7 @@ void storage_service::handle_state_normal(inet_address endpoint) { // Update pending ranges after update of normal tokens immediately to avoid // a race where natural endpoint was updated to contain node A, but A was // not yet removed from pending endpoints - _token_metadata.update_normal_tokens(tokens_to_update_in_metadata, endpoint); + _token_metadata.update_normal_tokens(owned_tokens, endpoint); _update_pending_ranges_action.trigger_later().get(); for (auto ep : endpoints_to_remove) { @@ -1060,9 +1039,9 @@ void storage_service::handle_state_normal(inet_address endpoint) { _gossiper.replacement_quarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260 } } - slogger.debug("handle_state_normal: endpoint={} tokens_to_update_in_system_keyspace = {}", endpoint, tokens_to_update_in_system_keyspace); - if (!tokens_to_update_in_system_keyspace.empty()) { - db::system_keyspace::update_tokens(endpoint, tokens_to_update_in_system_keyspace).then_wrapped([endpoint] (auto&& f) { + slogger.debug("handle_state_normal: endpoint={} owned_tokens = {}", endpoint, owned_tokens); + if (!owned_tokens.empty()) { + db::system_keyspace::update_tokens(endpoint, owned_tokens).then_wrapped([endpoint] (auto&& f) { try { f.get(); } catch (...) { @@ -1071,9 +1050,6 @@ void storage_service::handle_state_normal(inet_address endpoint) { return make_ready_future<>(); }).get(); } - if (!local_tokens_to_remove.empty()) { - db::system_keyspace::update_local_tokens(std::unordered_set(), local_tokens_to_remove).discard_result().get(); - } // Send joined notification only when this node was not a member prior to this if (!is_member) { @@ -1137,7 +1113,7 @@ void storage_service::handle_state_left(inet_address endpoint, std::vector pieces) { - throw std::runtime_error(format("Move opeartion is not supported only more, endpoint={}", endpoint)); + throw std::runtime_error(format("Move operation is not supported anymore, endpoint={}", endpoint)); } void storage_service::handle_state_removing(inet_address endpoint, std::vector pieces) { @@ -1390,16 +1366,6 @@ std::unordered_set storage_service::get_tokens_for(inet_address } // Runs inside seastar::async context -void storage_service::set_tokens(std::unordered_set tokens) { - slogger.debug("Setting tokens to {}", tokens); - db::system_keyspace::update_tokens(tokens).get(); - auto local_tokens = get_local_tokens().get0(); - _token_metadata.update_normal_tokens(tokens, get_broadcast_address()); - replicate_to_all_cores().get(); - set_gossip_tokens(local_tokens); - set_mode(mode::NORMAL, "node is now in normal status", true); -} - void storage_service::set_gossip_tokens(const std::unordered_set& local_tokens) { _gossiper.add_local_application_state({ { gms::application_state::TOKENS, value_factory.tokens(local_tokens) }, diff --git a/service/storage_service.hh b/service/storage_service.hh index f9e961fa47..3a480390ca 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -259,13 +259,6 @@ private: std::optional _removing_node; - /* Are we starting this node in bootstrap mode? */ - bool _is_bootstrap_mode; - - /* we bootstrap but do NOT join the ring unless told to do so */ - // FIXME: System.getProperty("cassandra.write_survey", "false") - bool _is_survey_mode = false; - bool _initialized; bool _joined = false; @@ -351,12 +344,6 @@ public: sstables::sstable_version_types sstables_format() const { return _sstables_format; } void enable_all_features(); - void finish_bootstrapping() { - _is_bootstrap_mode = false; - } - - /** This method updates the local token on disk */ - void set_tokens(std::unordered_set tokens); void set_gossip_tokens(const std::unordered_set& local_tokens); #if 0 @@ -553,13 +540,13 @@ private: void set_mode(mode m, bool log); void set_mode(mode m, sstring msg, bool log); void mark_existing_views_as_built(); + + // Stream data for which we become a new replica. + // Before that, if we're not replacing another node, inform other nodes about our chosen tokens (_bootstrap_tokens) + // and wait for RING_DELAY ms so that we receive new writes from coordinators during streaming. + void bootstrap(); + public: - void bootstrap(std::unordered_set tokens); - - bool is_bootstrap_mode() { - return _is_bootstrap_mode; - } - #if 0 public TokenMetadata getTokenMetadata()