diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc
index 2d8d9faa45..bb87d06f1e 100644
--- a/dht/range_streamer.cc
+++ b/dht/range_streamer.cc
@@ -117,6 +117,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, dh
     auto tm = get_token_metadata().clone_only_token_map().get0();
     auto range_addresses = strat.get_range_addresses(tm, utils::can_yield::yes);
+    tm.clear_gently().get();
 
     logger.debug("keyspace={}, desired_ranges.size={}, range_addresses.size={}",
             keyspace_name, desired_ranges.size(), range_addresses.size());
@@ -163,6 +164,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
     //Pending ranges
     metadata_clone.update_normal_tokens(_tokens, _address).get();
     auto pending_range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes);
+    metadata_clone.clear_gently().get();
 
     //Collects the source that will have its range moved to the new node
     std::unordered_map<dht::token_range, std::vector<inet_address>> range_sources;
diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc
index d97e35c65e..aae401ca9c 100644
--- a/locator/abstract_replication_strategy.cc
+++ b/locator/abstract_replication_strategy.cc
@@ -304,6 +304,9 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p
     for (auto& x : get_address_ranges(temp, pending_address, can_yield)) {
         ret.push_back(x.second);
     }
+    if (can_yield) {
+        temp.clear_gently().get();
+    }
     return ret;
 }
diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc
index 3a97d03a82..e4491e6afb 100644
--- a/locator/token_metadata.cc
+++ b/locator/token_metadata.cc
@@ -448,6 +448,12 @@ public:
         });
     }
 
+    /**
+     * Destroy the token_metadata members using continuations
+     * to prevent reactor stalls.
+     */
+    future<> clear_gently() noexcept;
+
 public:
     dht::token_range_vector get_primary_ranges_for(std::unordered_set<token> tokens) const;
@@ -1009,6 +1015,43 @@ future<token_metadata> token_metadata_impl::clone_only_token_map(bool clone
     });
 }
 
+template <typename Container>
+static future<> clear_container_gently(Container& c) noexcept;
+
+// The vector elements we use here (token / inet_address) have trivial destructors
+// so they can be safely cleared in bulk
+template <typename T, typename Alloc = std::allocator<T>>
+static future<> clear_container_gently(std::vector<T, Alloc>& vect) noexcept {
+    vect.clear();
+    return make_ready_future<>();
+}
+
+template <typename Container>
+static future<> clear_container_gently(Container& c) noexcept {
+    for (auto b = c.begin(); b != c.end(); b = c.erase(b)) {
+        co_await make_ready_future<>(); // maybe yield
+    }
+}
+
+template <typename Container>
+static future<> clear_nested_container_gently(Container& c) noexcept {
+    for (auto b = c.begin(); b != c.end(); b = c.erase(b)) {
+        co_await clear_container_gently(b->second);
+    }
+}
+
+future<> token_metadata_impl::clear_gently() noexcept {
+    co_await clear_container_gently(_token_to_endpoint_map);
+    co_await clear_container_gently(_endpoint_to_host_id_map);
+    co_await clear_container_gently(_bootstrap_tokens);
+    co_await clear_container_gently(_leaving_endpoints);
+    co_await clear_container_gently(_replacing_endpoints);
+    co_await clear_container_gently(_pending_ranges_interval_map);
+    co_await clear_container_gently(_sorted_tokens);
+    co_await _topology.clear_gently();
+    co_return;
+}
+
 void token_metadata_impl::sort_tokens() {
     std::vector<token> sorted;
     sorted.reserve(_token_to_endpoint_map.size());
@@ -1518,6 +1561,7 @@ void token_metadata_impl::calculate_pending_ranges_for_leaving(
             new_pending_ranges.emplace(r, ep);
         }
     }
+    metadata.clear_gently().get();
 
     tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} ends", affected_ranges_size);
 }
@@ -1594,6 +1638,7 @@ future<> token_metadata_impl::update_pending_ranges(
     // At this stage newPendingRanges has been updated according to leave operations. We can
     // now continue the calculation by checking bootstrapping nodes.
     calculate_pending_ranges_for_bootstrap(strategy, new_pending_ranges, all_left_metadata);
+    all_left_metadata->clear_gently().get();
 
     // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
     set_pending_ranges(keyspace_name, std::move(new_pending_ranges), can_yield::yes);
@@ -1951,6 +1996,10 @@ token_metadata::clone_after_all_left() const noexcept {
     });
 }
 
+future<> token_metadata::clear_gently() noexcept {
+    return _impl->clear_gently();
+}
+
 dht::token_range_vector
 token_metadata::get_primary_ranges_for(std::unordered_set<token> tokens) const {
     return _impl->get_primary_ranges_for(std::move(tokens));
@@ -2027,10 +2076,11 @@ token_metadata::invalidate_cached_rings() {
 }
 
 /////////////////// class topology /////////////////////////////////////////////
-inline void topology::clear() {
-    _dc_endpoints.clear();
-    _dc_racks.clear();
-    _current_locations.clear();
+inline future<> topology::clear_gently() noexcept {
+    co_await clear_nested_container_gently(_dc_endpoints);
+    co_await clear_nested_container_gently(_dc_racks);
+    co_await clear_container_gently(_current_locations);
+    co_return;
 }
 
 topology::topology(const topology& other) {
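The heart of the change is the erase-one-element-then-maybe-yield loop in clear_container_gently(). The sketch below is illustrative and standalone, not part of the patch (clear_gently_demo is a made-up name): awaiting a ready future costs almost nothing while the task quota lasts, but suspends the coroutine and lets other tasks run once the quota is exhausted, so destroying a huge container can no longer monopolize the reactor.

    #include <seastar/core/app-template.hh>
    #include <seastar/core/coroutine.hh>
    #include <seastar/core/future.hh>
    #include <unordered_map>

    using namespace seastar;

    // Erase one element per iteration, re-entering the reactor between steps.
    // co_await on a ready future continues immediately while task budget
    // remains, and suspends once need_preempt() fires.
    template <typename Container>
    future<> clear_gently_demo(Container& c) noexcept {
        for (auto it = c.begin(); it != c.end(); it = c.erase(it)) {
            co_await make_ready_future<>(); // maybe yield
        }
    }

    int main(int argc, char** argv) {
        app_template app;
        return app.run(argc, argv, [] () -> future<> {
            std::unordered_map<int, int> m;
            for (int i = 0; i < 1000000; ++i) {
                m.emplace(i, i);
            }
            co_await clear_gently_demo(m);
        });
    }

Later Seastar versions spell this idiom explicitly as co_await seastar::coroutine::maybe_yield().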
affected_ranges.size={} ends", affected_ranges_size); } @@ -1594,6 +1638,7 @@ future<> token_metadata_impl::update_pending_ranges( // At this stage newPendingRanges has been updated according to leave operations. We can // now continue the calculation by checking bootstrapping nodes. calculate_pending_ranges_for_bootstrap(strategy, new_pending_ranges, all_left_metadata); + all_left_metadata->clear_gently().get(); // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. set_pending_ranges(keyspace_name, std::move(new_pending_ranges), can_yield::yes); @@ -1951,6 +1996,10 @@ token_metadata::clone_after_all_left() const noexcept { }); } +future<> token_metadata::clear_gently() noexcept { + return _impl->clear_gently(); +} + dht::token_range_vector token_metadata::get_primary_ranges_for(std::unordered_set tokens) const { return _impl->get_primary_ranges_for(std::move(tokens)); @@ -2027,10 +2076,11 @@ token_metadata::invalidate_cached_rings() { } /////////////////// class topology ///////////////////////////////////////////// -inline void topology::clear() { - _dc_endpoints.clear(); - _dc_racks.clear(); - _current_locations.clear(); +inline future<> topology::clear_gently() noexcept { + co_await clear_nested_container_gently(_dc_endpoints); + co_await clear_nested_container_gently(_dc_racks); + co_await clear_container_gently(_current_locations); + co_return; } topology::topology(const topology& other) { diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index ff41e9e9f8..6fe5994c4d 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -73,7 +73,7 @@ public: topology() {} topology(const topology& other); - void clear(); + future<> clear_gently() noexcept; /** * Stores current DC/rack assignment for ep @@ -286,6 +286,12 @@ public: */ future clone_after_all_left() const noexcept; + /** + * Gently clear the token_metadata members. + * Yield if needed to prevent reactor stalls. + */ + future<> clear_gently() noexcept; + dht::token_range_vector get_primary_ranges_for(std::unordered_set tokens) const; dht::token_range_vector get_primary_ranges_for(token right) const; diff --git a/repair/repair.cc b/repair/repair.cc index b8539a00f6..16653edb05 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1812,6 +1812,7 @@ future<> bootstrap_with_repair(seastar::sharded& db, seastar::sharded< //Pending ranges metadata_clone.update_normal_tokens(tokens, myip).get(); auto pending_range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes); + metadata_clone.clear_gently().get(); //Collects the source that will have its range moved to the new node std::unordered_map range_sources; @@ -2110,6 +2111,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt }).then_wrapped([this] (future<> f) { if (f.failed()) { return container().invoke_on_all([] (storage_service& ss) { - ss._pending_token_metadata_ptr = {}; + if (auto tmptr = std::move(ss._pending_token_metadata_ptr)) { + return tmptr->clear_gently().then_wrapped([tmptr = std::move(tmptr)] (future<> f) { + if (f.failed()) { + slogger.warn("Failure to reset pending token_metadata in cleanup path: {}. 
Ignored.", f.get_exception()); + } + }); + } else { + return make_ready_future<>(); + } }).finally([ep = f.get_exception()] () mutable { return make_exception_future<>(std::move(ep)); }); @@ -2116,7 +2124,10 @@ future<> storage_service::decommission() { throw std::runtime_error("local node is not a member of the token ring yet"); } - if (tmptr->clone_after_all_left().get0().sorted_tokens().size() < 2) { + auto temp = tmptr->clone_after_all_left().get0(); + auto num_tokens_after_all_left = temp.sorted_tokens().size(); + temp.clear_gently().get(); + if (num_tokens_after_all_left < 2) { throw std::runtime_error("no other normal nodes in the ring; decommission would be pointless"); } @@ -2544,6 +2555,7 @@ std::unordered_multimap storage_service::get_cha changed_ranges.emplace(r, ep); } } + temp.clear_gently().get(); return changed_ranges; }