token_metadata: add clear_gently

clear_gently gently clears the token_metadata members.
It uses continuations to allow yielding if needed
to prevent reactor stalls.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2020-12-17 10:32:25 +02:00
parent 56aa49ca81
commit 322aa2f8b5
6 changed files with 82 additions and 7 deletions

View File

@@ -117,6 +117,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, dh
auto tm = get_token_metadata().clone_only_token_map().get0();
auto range_addresses = strat.get_range_addresses(tm, utils::can_yield::yes);
tm.clear_gently().get();
logger.debug("keyspace={}, desired_ranges.size={}, range_addresses.size={}", keyspace_name, desired_ranges.size(), range_addresses.size());
@@ -163,6 +164,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
//Pending ranges
metadata_clone.update_normal_tokens(_tokens, _address).get();
auto pending_range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes);
metadata_clone.clear_gently().get();
//Collects the source that will have its range moved to the new node
std::unordered_map<dht::token_range, std::vector<inet_address>> range_sources;

View File

@@ -304,6 +304,9 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p
for (auto& x : get_address_ranges(temp, pending_address, can_yield)) {
ret.push_back(x.second);
}
if (can_yield) {
temp.clear_gently().get();
}
return ret;
}

View File

@@ -448,6 +448,12 @@ public:
});
}
/**
* Destroy the token_metadata members using continuations
* to prevent reactor stalls.
*/
future<> clear_gently() noexcept;
public:
dht::token_range_vector get_primary_ranges_for(std::unordered_set<token> tokens) const;
@@ -1009,6 +1015,43 @@ future<token_metadata_impl> token_metadata_impl::clone_only_token_map(bool clone
});
}
template <typename Container>
static future<> clear_container_gently(Container& c) noexcept;

// Fast-path overload for std::vector: the vector element types used in this
// file (token / inet_address) are assumed to have trivial destructors, so the
// whole vector can be dropped in one bulk clear() without yielding.
//
// NOTE(review): the previous spelling, `template <typename T, typename
// Container = std::vector<T>>` with a `Container&` parameter, left T
// non-deducible, so overload resolution could never select this overload and
// every vector fell through to the element-by-element generic coroutine.
// Deducing T directly from `std::vector<T>&` fixes that: partial ordering now
// prefers this overload for vector arguments.
template <typename T>
static future<> clear_container_gently(std::vector<T>& vect) noexcept {
    vect.clear();
    return make_ready_future<>();
}
// Generic fallback: erase one element at a time, awaiting between erasures so
// the reactor gets a chance to preempt on large containers.
template <typename Container>
static future<> clear_container_gently(Container& c) noexcept {
    auto it = c.begin();
    while (it != c.end()) {
        co_await make_ready_future<>(); // may yield if the scheduler wants to preempt
        it = c.erase(it);
    }
}
// Clear a map-like container of containers: gently drain each mapped
// container first, then erase its entry from the outer container.
template <typename Container>
static future<> clear_nested_container_gently(Container& c) noexcept {
    auto it = c.begin();
    while (it != c.end()) {
        co_await clear_container_gently(it->second);
        it = c.erase(it);
    }
}
/**
 * Gently clear all token_metadata_impl member containers, one container at a
 * time, co_await-ing between (and within) them so the reactor can yield and
 * avoid stalls when the maps are large.
 *
 * NOTE(review): declared noexcept on a coroutine — any exception escaping
 * (including coroutine-frame allocation failure) would terminate; presumably
 * acceptable on this teardown path, but confirm against the project's seastar
 * error-handling policy.
 */
future<> token_metadata_impl::clear_gently() noexcept {
    co_await clear_container_gently(_token_to_endpoint_map);
    co_await clear_container_gently(_endpoint_to_host_id_map);
    co_await clear_container_gently(_bootstrap_tokens);
    co_await clear_container_gently(_leaving_endpoints);
    co_await clear_container_gently(_replacing_endpoints);
    co_await clear_container_gently(_pending_ranges_interval_map);
    // _sorted_tokens is a vector of trivially-destructible tokens; cleared via
    // the same helper (bulk path when the vector overload is selectable).
    co_await clear_container_gently(_sorted_tokens);
    // Finally clear the nested topology maps gently as well.
    co_await _topology.clear_gently();
    co_return;
}
void token_metadata_impl::sort_tokens() {
std::vector<token> sorted;
sorted.reserve(_token_to_endpoint_map.size());
@@ -1518,6 +1561,7 @@ void token_metadata_impl::calculate_pending_ranges_for_leaving(
new_pending_ranges.emplace(r, ep);
}
}
metadata.clear_gently().get();
tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} ends", affected_ranges_size);
}
@@ -1594,6 +1638,7 @@ future<> token_metadata_impl::update_pending_ranges(
// At this stage newPendingRanges has been updated according to leave operations. We can
// now continue the calculation by checking bootstrapping nodes.
calculate_pending_ranges_for_bootstrap(strategy, new_pending_ranges, all_left_metadata);
all_left_metadata->clear_gently().get();
// At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
set_pending_ranges(keyspace_name, std::move(new_pending_ranges), can_yield::yes);
@@ -1951,6 +1996,10 @@ token_metadata::clone_after_all_left() const noexcept {
});
}
// Public facade: forward to the pimpl'ed implementation, which clears each
// member container with yields in between to prevent reactor stalls.
future<> token_metadata::clear_gently() noexcept {
    return _impl->clear_gently();
}
dht::token_range_vector
token_metadata::get_primary_ranges_for(std::unordered_set<token> tokens) const {
return _impl->get_primary_ranges_for(std::move(tokens));
@@ -2027,10 +2076,11 @@ token_metadata::invalidate_cached_rings() {
}
/////////////////// class topology /////////////////////////////////////////////
inline void topology::clear() {
_dc_endpoints.clear();
_dc_racks.clear();
_current_locations.clear();
// Gently clear the topology maps, yielding between (and within) containers to
// avoid reactor stalls on large cluster layouts. The DC maps are nested
// (DC -> endpoints / racks), so their inner containers are drained first.
inline future<> topology::clear_gently() noexcept {
    co_await clear_nested_container_gently(_dc_endpoints);
    co_await clear_nested_container_gently(_dc_racks);
    co_await clear_container_gently(_current_locations);
    co_return;
}
topology::topology(const topology& other) {

View File

@@ -73,7 +73,7 @@ public:
topology() {}
topology(const topology& other);
void clear();
future<> clear_gently() noexcept;
/**
* Stores current DC/rack assignment for ep
@@ -286,6 +286,12 @@ public:
*/
future<token_metadata> clone_after_all_left() const noexcept;
/**
* Gently clear the token_metadata members.
* Yield if needed to prevent reactor stalls.
*/
future<> clear_gently() noexcept;
dht::token_range_vector get_primary_ranges_for(std::unordered_set<token> tokens) const;
dht::token_range_vector get_primary_ranges_for(token right) const;

View File

@@ -1812,6 +1812,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<
//Pending ranges
metadata_clone.update_normal_tokens(tokens, myip).get();
auto pending_range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes);
metadata_clone.clear_gently().get();
//Collects the source that will have its range moved to the new node
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
@@ -2110,6 +2111,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
}
}
}
temp.clear_gently().get();
if (reason == streaming::stream_reason::decommission) {
db.invoke_on_all([nr_ranges_skipped] (database&) {
_node_ops_metrics.decommission_finished_ranges += nr_ranges_skipped;

View File

@@ -1706,7 +1706,15 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
}).then_wrapped([this] (future<> f) {
if (f.failed()) {
return container().invoke_on_all([] (storage_service& ss) {
ss._pending_token_metadata_ptr = {};
if (auto tmptr = std::move(ss._pending_token_metadata_ptr)) {
return tmptr->clear_gently().then_wrapped([tmptr = std::move(tmptr)] (future<> f) {
if (f.failed()) {
slogger.warn("Failure to reset pending token_metadata in cleanup path: {}. Ignored.", f.get_exception());
}
});
} else {
return make_ready_future<>();
}
}).finally([ep = f.get_exception()] () mutable {
return make_exception_future<>(std::move(ep));
});
@@ -2116,7 +2124,10 @@ future<> storage_service::decommission() {
throw std::runtime_error("local node is not a member of the token ring yet");
}
if (tmptr->clone_after_all_left().get0().sorted_tokens().size() < 2) {
auto temp = tmptr->clone_after_all_left().get0();
auto num_tokens_after_all_left = temp.sorted_tokens().size();
temp.clear_gently().get();
if (num_tokens_after_all_left < 2) {
throw std::runtime_error("no other normal nodes in the ring; decommission would be pointless");
}
@@ -2544,6 +2555,7 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
changed_ranges.emplace(r, ep);
}
}
temp.clear_gently().get();
return changed_ranges;
}