diff --git a/api/storage_service.cc b/api/storage_service.cc index 82835c9b70..04a4dd4bb6 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -323,8 +323,8 @@ void set_storage_service(http_context& ctx, routes& r) { for (auto cf : column_families) { column_families_vec.push_back(&db.find_column_family(keyspace, cf)); } - return parallel_for_each(column_families_vec, [&cm] (column_family* cf) { - return cm.perform_cleanup(cf); + return parallel_for_each(column_families_vec, [&cm, &db] (column_family* cf) { + return cm.perform_cleanup(db, cf); }); }).then([]{ return make_ready_future<json::json_return_type>(0); diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 9514caa7c9..84af3e6c61 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -168,15 +168,33 @@ insert_token_range_to_sorted_container_while_unwrapping( dht::token_range_vector abstract_replication_strategy::get_ranges(inet_address ep) const { - return get_ranges(ep, _token_metadata); + return do_get_ranges(ep, _token_metadata, false); +} + +dht::token_range_vector +abstract_replication_strategy::get_ranges_in_thread(inet_address ep) const { + return do_get_ranges(ep, _token_metadata, true); } dht::token_range_vector abstract_replication_strategy::get_ranges(inet_address ep, token_metadata& tm) const { + return do_get_ranges(ep, tm, false); +} + +dht::token_range_vector +abstract_replication_strategy::get_ranges_in_thread(inet_address ep, token_metadata& tm) const { + return do_get_ranges(ep, tm, true); +} + +dht::token_range_vector +abstract_replication_strategy::do_get_ranges(inet_address ep, token_metadata& tm, bool can_yield) const { dht::token_range_vector ret; auto prev_tok = tm.sorted_tokens().back(); for (auto tok : tm.sorted_tokens()) { for (inet_address a : calculate_natural_endpoints(tok, tm)) { + if (can_yield) { + seastar::thread::maybe_yield(); + } if (a == ep) { 
insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret); break; diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index cc28cdcf06..49812c43b8 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -113,10 +113,15 @@ public: // It the analogue of Origin's getAddressRanges().get(endpoint). // This function is not efficient, and not meant for the fast path. dht::token_range_vector get_ranges(inet_address ep) const; + dht::token_range_vector get_ranges_in_thread(inet_address ep) const; // Use the token_metadata provided by the caller instead of _token_metadata dht::token_range_vector get_ranges(inet_address ep, token_metadata& tm) const; + dht::token_range_vector get_ranges_in_thread(inet_address ep, token_metadata& tm) const; +private: + dht::token_range_vector do_get_ranges(inet_address ep, token_metadata& tm, bool can_yield) const; +public: // get_primary_ranges() returns the list of "primary ranges" for the given // endpoint. 
"Primary ranges" are the ranges that the node is responsible // for storing replica primarily, which means this is the first node diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc index 1779f968f8..5c859454c8 100644 --- a/sstables/compaction_manager.cc +++ b/sstables/compaction_manager.cc @@ -732,20 +732,25 @@ bool needs_cleanup(const sstables::shared_sstable& sst, return true; } -future<> compaction_manager::perform_cleanup(column_family* cf) { +future<> compaction_manager::perform_cleanup(database& db, column_family* cf) { if (check_for_cleanup(cf)) { - throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}", - cf->schema()->ks_name(), cf->schema()->cf_name())); + return make_exception_future<>(std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}", + cf->schema()->ks_name(), cf->schema()->cf_name()))); } - return rewrite_sstables(cf, sstables::compaction_options::make_cleanup(), [this] (const table& table) { - auto schema = table.schema(); - auto sorted_owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name()); + return seastar::async([this, cf, &db] { + auto schema = cf->schema(); + auto& rs = db.find_keyspace(schema->ks_name()).get_replication_strategy(); + auto sorted_owned_ranges = rs.get_ranges_in_thread(utils::fb_utilities::get_broadcast_address()); auto sstables = std::vector<sstables::shared_sstable>{}; - const auto candidates = table.candidates_for_compaction(); + const auto candidates = cf->candidates_for_compaction(); std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) { + seastar::thread::maybe_yield(); return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema); }); return sstables; + }).then([this, cf] (std::vector<sstables::shared_sstable> sstables) { + return rewrite_sstables(cf, sstables::compaction_options::make_cleanup(), + [sstables = 
std::move(sstables)] (const table&) { return sstables; }); }); } diff --git a/sstables/compaction_manager.hh b/sstables/compaction_manager.hh index c413c41410..e499bbbabc 100644 --- a/sstables/compaction_manager.hh +++ b/sstables/compaction_manager.hh @@ -205,7 +205,7 @@ public: // Cleanup is about discarding keys that are no longer relevant for a // given sstable, e.g. after node loses part of its token range because // of a newly added node. - future<> perform_cleanup(column_family* cf); + future<> perform_cleanup(database& db, column_family* cf); // Submit a column family to be upgraded and wait for its termination. future<> perform_sstable_upgrade(column_family* cf, bool exclude_current_version);