mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
service: storage_service: coroutinize get_changed_ranges_for_leaving()
Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
This commit is contained in:
@@ -2639,8 +2639,7 @@ int32_t storage_service::get_exception_count() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
std::unordered_multimap<dht::token_range, inet_address> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) {
|
||||
future<std::unordered_multimap<dht::token_range, inet_address>> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) {
|
||||
// First get all ranges the leaving endpoint is responsible for
|
||||
auto ranges = get_ranges_for_endpoint(keyspace_name, endpoint);
|
||||
|
||||
@@ -2655,10 +2654,10 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto eps = erm->get_natural_endpoints(end_token);
|
||||
current_replica_endpoints.emplace(r, std::move(eps));
|
||||
seastar::thread::maybe_yield();
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
auto temp = get_token_metadata_ptr()->clone_after_all_left().get0();
|
||||
auto temp = co_await get_token_metadata_ptr()->clone_after_all_left();
|
||||
|
||||
// endpoint might or might not be 'leaving'. If it was not leaving (that is, removenode
|
||||
// command was used), it is still present in temp and must be removed.
|
||||
@@ -2676,7 +2675,7 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
for (auto& r : ranges) {
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto new_replica_endpoints = rs.calculate_natural_endpoints(end_token, temp).get0();
|
||||
auto new_replica_endpoints = co_await rs.calculate_natural_endpoints(end_token, temp);
|
||||
|
||||
auto rg = current_replica_endpoints.equal_range(r);
|
||||
for (auto it = rg.first; it != rg.second; it++) {
|
||||
@@ -2702,11 +2701,11 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
|
||||
}
|
||||
// Replication strategy doesn't necessarily yield in calculate_natural_endpoints.
|
||||
// E.g. everywhere_replication_strategy
|
||||
seastar::thread::maybe_yield();
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
temp.clear_gently().get();
|
||||
co_await temp.clear_gently();
|
||||
|
||||
return changed_ranges;
|
||||
co_return changed_ranges;
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
@@ -2719,7 +2718,7 @@ void storage_service::unbootstrap() {
|
||||
|
||||
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address());
|
||||
auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address()).get0();
|
||||
if (slogger.is_enabled(logging::log_level::debug)) {
|
||||
std::vector<range<token>> ranges;
|
||||
for (auto& x : ranges_mm) {
|
||||
@@ -2758,7 +2757,7 @@ void storage_service::removenode_add_ranges(lw_shared_ptr<dht::range_streamer> s
|
||||
auto my_address = get_broadcast_address();
|
||||
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
std::unordered_multimap<dht::token_range, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, leaving_node);
|
||||
std::unordered_multimap<dht::token_range, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, leaving_node).get0();
|
||||
dht::token_range_vector my_new_ranges;
|
||||
for (auto& x : changed_ranges) {
|
||||
if (x.second == my_address) {
|
||||
|
||||
@@ -639,7 +639,7 @@ private:
|
||||
void removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);
|
||||
|
||||
// needs to be modified to accept either a keyspace or ARS.
|
||||
std::unordered_multimap<dht::token_range, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
|
||||
future<std::unordered_multimap<dht::token_range, inet_address>> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
|
||||
|
||||
public:
|
||||
|
||||
|
||||
Reference in New Issue
Block a user