mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-20 00:20:47 +00:00
token_metadata: add clear_gently
clear_gently gently clears the token_metadata members. It uses continuations to allow yielding if needed to prevent reactor stalls. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -117,6 +117,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, dh
|
||||
|
||||
auto tm = get_token_metadata().clone_only_token_map().get0();
|
||||
auto range_addresses = strat.get_range_addresses(tm, utils::can_yield::yes);
|
||||
tm.clear_gently().get();
|
||||
|
||||
logger.debug("keyspace={}, desired_ranges.size={}, range_addresses.size={}", keyspace_name, desired_ranges.size(), range_addresses.size());
|
||||
|
||||
@@ -163,6 +164,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
|
||||
//Pending ranges
|
||||
metadata_clone.update_normal_tokens(_tokens, _address).get();
|
||||
auto pending_range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes);
|
||||
metadata_clone.clear_gently().get();
|
||||
|
||||
//Collects the source that will have its range moved to the new node
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>> range_sources;
|
||||
|
||||
@@ -304,6 +304,9 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p
|
||||
for (auto& x : get_address_ranges(temp, pending_address, can_yield)) {
|
||||
ret.push_back(x.second);
|
||||
}
|
||||
if (can_yield) {
|
||||
temp.clear_gently().get();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@@ -448,6 +448,12 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroy the token_metadata members using continuations
|
||||
* to prevent reactor stalls.
|
||||
*/
|
||||
future<> clear_gently() noexcept;
|
||||
|
||||
public:
|
||||
dht::token_range_vector get_primary_ranges_for(std::unordered_set<token> tokens) const;
|
||||
|
||||
@@ -1009,6 +1015,43 @@ future<token_metadata_impl> token_metadata_impl::clone_only_token_map(bool clone
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Container>
|
||||
static future<> clear_container_gently(Container& c) noexcept;
|
||||
|
||||
// The vector elements we use here (token / inet_address) have trivial destructors
|
||||
// so they can be safely cleared in bulk
|
||||
template <typename T, typename Container = std::vector<T>>
|
||||
static future<> clear_container_gently(Container& vect) noexcept {
|
||||
vect.clear();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
template <typename Container>
|
||||
static future<> clear_container_gently(Container& c) noexcept {
|
||||
for (auto b = c.begin(); b != c.end(); b = c.erase(b)) {
|
||||
co_await make_ready_future<>(); // maybe yield
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Container>
|
||||
static future<> clear_nested_container_gently(Container& c) noexcept {
|
||||
for (auto b = c.begin(); b != c.end(); b = c.erase(b)) {
|
||||
co_await clear_container_gently(b->second);
|
||||
}
|
||||
}
|
||||
|
||||
future<> token_metadata_impl::clear_gently() noexcept {
|
||||
co_await clear_container_gently(_token_to_endpoint_map);
|
||||
co_await clear_container_gently(_endpoint_to_host_id_map);
|
||||
co_await clear_container_gently(_bootstrap_tokens);
|
||||
co_await clear_container_gently(_leaving_endpoints);
|
||||
co_await clear_container_gently(_replacing_endpoints);
|
||||
co_await clear_container_gently(_pending_ranges_interval_map);
|
||||
co_await clear_container_gently(_sorted_tokens);
|
||||
co_await _topology.clear_gently();
|
||||
co_return;
|
||||
}
|
||||
|
||||
void token_metadata_impl::sort_tokens() {
|
||||
std::vector<token> sorted;
|
||||
sorted.reserve(_token_to_endpoint_map.size());
|
||||
@@ -1518,6 +1561,7 @@ void token_metadata_impl::calculate_pending_ranges_for_leaving(
|
||||
new_pending_ranges.emplace(r, ep);
|
||||
}
|
||||
}
|
||||
metadata.clear_gently().get();
|
||||
tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} ends", affected_ranges_size);
|
||||
}
|
||||
|
||||
@@ -1594,6 +1638,7 @@ future<> token_metadata_impl::update_pending_ranges(
|
||||
// At this stage newPendingRanges has been updated according to leave operations. We can
|
||||
// now continue the calculation by checking bootstrapping nodes.
|
||||
calculate_pending_ranges_for_bootstrap(strategy, new_pending_ranges, all_left_metadata);
|
||||
all_left_metadata->clear_gently().get();
|
||||
|
||||
// At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
|
||||
set_pending_ranges(keyspace_name, std::move(new_pending_ranges), can_yield::yes);
|
||||
@@ -1951,6 +1996,10 @@ token_metadata::clone_after_all_left() const noexcept {
|
||||
});
|
||||
}
|
||||
|
||||
future<> token_metadata::clear_gently() noexcept {
|
||||
return _impl->clear_gently();
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
token_metadata::get_primary_ranges_for(std::unordered_set<token> tokens) const {
|
||||
return _impl->get_primary_ranges_for(std::move(tokens));
|
||||
@@ -2027,10 +2076,11 @@ token_metadata::invalidate_cached_rings() {
|
||||
}
|
||||
|
||||
/////////////////// class topology /////////////////////////////////////////////
|
||||
inline void topology::clear() {
|
||||
_dc_endpoints.clear();
|
||||
_dc_racks.clear();
|
||||
_current_locations.clear();
|
||||
inline future<> topology::clear_gently() noexcept {
|
||||
co_await clear_nested_container_gently(_dc_endpoints);
|
||||
co_await clear_nested_container_gently(_dc_racks);
|
||||
co_await clear_container_gently(_current_locations);
|
||||
co_return;
|
||||
}
|
||||
|
||||
topology::topology(const topology& other) {
|
||||
|
||||
@@ -73,7 +73,7 @@ public:
|
||||
topology() {}
|
||||
topology(const topology& other);
|
||||
|
||||
void clear();
|
||||
future<> clear_gently() noexcept;
|
||||
|
||||
/**
|
||||
* Stores current DC/rack assignment for ep
|
||||
@@ -286,6 +286,12 @@ public:
|
||||
*/
|
||||
future<token_metadata> clone_after_all_left() const noexcept;
|
||||
|
||||
/**
|
||||
* Gently clear the token_metadata members.
|
||||
* Yield if needed to prevent reactor stalls.
|
||||
*/
|
||||
future<> clear_gently() noexcept;
|
||||
|
||||
dht::token_range_vector get_primary_ranges_for(std::unordered_set<token> tokens) const;
|
||||
|
||||
dht::token_range_vector get_primary_ranges_for(token right) const;
|
||||
|
||||
@@ -1812,6 +1812,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<
|
||||
//Pending ranges
|
||||
metadata_clone.update_normal_tokens(tokens, myip).get();
|
||||
auto pending_range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes);
|
||||
metadata_clone.clear_gently().get();
|
||||
|
||||
//Collects the source that will have its range moved to the new node
|
||||
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
|
||||
@@ -2110,6 +2111,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
|
||||
}
|
||||
}
|
||||
}
|
||||
temp.clear_gently().get();
|
||||
if (reason == streaming::stream_reason::decommission) {
|
||||
db.invoke_on_all([nr_ranges_skipped] (database&) {
|
||||
_node_ops_metrics.decommission_finished_ranges += nr_ranges_skipped;
|
||||
|
||||
@@ -1706,7 +1706,15 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
|
||||
}).then_wrapped([this] (future<> f) {
|
||||
if (f.failed()) {
|
||||
return container().invoke_on_all([] (storage_service& ss) {
|
||||
ss._pending_token_metadata_ptr = {};
|
||||
if (auto tmptr = std::move(ss._pending_token_metadata_ptr)) {
|
||||
return tmptr->clear_gently().then_wrapped([tmptr = std::move(tmptr)] (future<> f) {
|
||||
if (f.failed()) {
|
||||
slogger.warn("Failure to reset pending token_metadata in cleanup path: {}. Ignored.", f.get_exception());
|
||||
}
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}).finally([ep = f.get_exception()] () mutable {
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
@@ -2116,7 +2124,10 @@ future<> storage_service::decommission() {
|
||||
throw std::runtime_error("local node is not a member of the token ring yet");
|
||||
}
|
||||
|
||||
if (tmptr->clone_after_all_left().get0().sorted_tokens().size() < 2) {
|
||||
auto temp = tmptr->clone_after_all_left().get0();
|
||||
auto num_tokens_after_all_left = temp.sorted_tokens().size();
|
||||
temp.clear_gently().get();
|
||||
if (num_tokens_after_all_left < 2) {
|
||||
throw std::runtime_error("no other normal nodes in the ring; decommission would be pointless");
|
||||
}
|
||||
|
||||
@@ -2544,6 +2555,7 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
|
||||
changed_ranges.emplace(r, ep);
|
||||
}
|
||||
}
|
||||
temp.clear_gently().get();
|
||||
|
||||
return changed_ranges;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user