From 94995acedb42fc7d15c5e65fcd5def9dcfca1984 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 29 Jun 2020 10:47:22 +0800 Subject: [PATCH 1/3] abstract_replication_strategy: Add get_ranges_in_thread Add a version that runs inside a seastar thread. The benefit is that get_ranges can yield to avoid stalls. Refs #6662 --- locator/abstract_replication_strategy.cc | 20 +++++++++++++++++++- locator/abstract_replication_strategy.hh | 5 +++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 9514caa7c9..84af3e6c61 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -168,15 +168,33 @@ insert_token_range_to_sorted_container_while_unwrapping( dht::token_range_vector abstract_replication_strategy::get_ranges(inet_address ep) const { - return get_ranges(ep, _token_metadata); + return do_get_ranges(ep, _token_metadata, false); +} + +dht::token_range_vector +abstract_replication_strategy::get_ranges_in_thread(inet_address ep) const { + return do_get_ranges(ep, _token_metadata, true); } dht::token_range_vector abstract_replication_strategy::get_ranges(inet_address ep, token_metadata& tm) const { + return do_get_ranges(ep, tm, false); +} + +dht::token_range_vector +abstract_replication_strategy::get_ranges_in_thread(inet_address ep, token_metadata& tm) const { + return do_get_ranges(ep, tm, true); +} + +dht::token_range_vector +abstract_replication_strategy::do_get_ranges(inet_address ep, token_metadata& tm, bool can_yield) const { dht::token_range_vector ret; auto prev_tok = tm.sorted_tokens().back(); for (auto tok : tm.sorted_tokens()) { for (inet_address a : calculate_natural_endpoints(tok, tm)) { + if (can_yield) { + seastar::thread::maybe_yield(); + } if (a == ep) { insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret); break; diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index cc28cdcf06..49812c43b8 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -113,10 +113,15 @@ public: // It the analogue of Origin's getAddressRanges().get(endpoint). // This function is not efficient, and not meant for the fast path. dht::token_range_vector get_ranges(inet_address ep) const; + dht::token_range_vector get_ranges_in_thread(inet_address ep) const; // Use the token_metadata provided by the caller instead of _token_metadata dht::token_range_vector get_ranges(inet_address ep, token_metadata& tm) const; + dht::token_range_vector get_ranges_in_thread(inet_address ep, token_metadata& tm) const; +private: + dht::token_range_vector do_get_ranges(inet_address ep, token_metadata& tm, bool can_yield) const; +public: // get_primary_ranges() returns the list of "primary ranges" for the given // endpoint. "Primary ranges" are the ranges that the node is responsible // for storing replica primarily, which means this is the first node From 868e2da1c4c723699e797268dda8672d82442552 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 19 Jun 2020 09:56:10 +0800 Subject: [PATCH 2/3] compaction_manager: Return exception future in perform_cleanup We should return the exception future instead of throw a plain exception. Refs #6662 --- sstables/compaction_manager.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc index 1779f968f8..38d8e865c1 100644 --- a/sstables/compaction_manager.cc +++ b/sstables/compaction_manager.cc @@ -734,8 +734,8 @@ bool needs_cleanup(const sstables::shared_sstable& sst, future<> compaction_manager::perform_cleanup(column_family* cf) { if (check_for_cleanup(cf)) { - throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}", - cf->schema()->ks_name(), cf->schema()->cf_name())); + return make_exception_future<>(std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}", + cf->schema()->ks_name(), cf->schema()->cf_name()))); } return rewrite_sstables(cf, sstables::compaction_options::make_cleanup(), [this] (const table& table) { auto schema = table.schema(); From 07e253542d6b2611ee4b1c8df2653b0af4ef085a Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 1 Jul 2020 15:03:50 +0800 Subject: [PATCH 3/3] compaction_manager: Avoid stall in perform_cleanup The following stall was seen during a cleanup operation: scylla: Reactor stalled for 16262 ms on shard 4. | std::_MakeUniq::__single_object std::make_unique(locator::tokens_iterator_impl&) at /usr/include/fmt/format.h:1158 | (inlined by) locator::token_metadata::tokens_iterator::tokens_iterator(locator::token_metadata::tokens_iterator const&) at ./locator/token_metadata.cc:1602 | locator::simple_strategy::calculate_natural_endpoints(dht::token const&, locator::token_metadata&) const at simple_strategy.cc:? | (inlined by) locator::simple_strategy::calculate_natural_endpoints(dht::token const&, locator::token_metadata&) const at ./locator/simple_strategy.cc:56 | locator::abstract_replication_strategy::get_ranges(gms::inet_address, locator::token_metadata&) const at /usr/include/fmt/format.h:1158 | locator::abstract_replication_strategy::get_ranges(gms::inet_address) const at /usr/include/fmt/format.h:1158 | service::storage_service::get_ranges_for_endpoint(seastar::basic_sstring const&, gms::inet_address const&) const at /usr/include/fmt/format.h:1158 | service::storage_service::get_local_ranges(seastar::basic_sstring const&) const at /usr/include/fmt/format.h:1158 | (inlined by) operator() at ./sstables/compaction_manager.cc:691 | (inlined by) _M_invoke at /usr/include/c++/9/bits/std_function.h:286 | std::function, std::allocator > > (table const&)>::operator()(table const&) const at /usr/include/fmt/format.h:1158 | (inlined by) compaction_manager::rewrite_sstables(table*, sstables::compaction_options, std::function, std::allocator > > (table const&)>) at ./sstables/compaction_manager.cc:604 | compaction_manager::perform_cleanup(table*) at /usr/include/fmt/format.h:1158 To fix, we furturize the function to get local ranges and sstables. In addition, this patch removes the dependency to global storage_service object. Fixes #6662 --- api/storage_service.cc | 4 ++-- sstables/compaction_manager.cc | 15 ++++++++++----- sstables/compaction_manager.hh | 2 +- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 82835c9b70..04a4dd4bb6 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -323,8 +323,8 @@ void set_storage_service(http_context& ctx, routes& r) { for (auto cf : column_families) { column_families_vec.push_back(&db.find_column_family(keyspace, cf)); } - return parallel_for_each(column_families_vec, [&cm] (column_family* cf) { - return cm.perform_cleanup(cf); + return parallel_for_each(column_families_vec, [&cm, &db] (column_family* cf) { + return cm.perform_cleanup(db, cf); }); }).then([]{ return make_ready_future(0); diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc index 38d8e865c1..5c859454c8 100644 --- a/sstables/compaction_manager.cc +++ b/sstables/compaction_manager.cc @@ -732,20 +732,25 @@ bool needs_cleanup(const sstables::shared_sstable& sst, return true; } -future<> compaction_manager::perform_cleanup(column_family* cf) { +future<> compaction_manager::perform_cleanup(database& db, column_family* cf) { if (check_for_cleanup(cf)) { return make_exception_future<>(std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}", cf->schema()->ks_name(), cf->schema()->cf_name()))); } - return rewrite_sstables(cf, sstables::compaction_options::make_cleanup(), [this] (const table& table) { - auto schema = table.schema(); - auto sorted_owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name()); + return seastar::async([this, cf, &db] { + auto schema = cf->schema(); + auto& rs = db.find_keyspace(schema->ks_name()).get_replication_strategy(); + auto sorted_owned_ranges = rs.get_ranges_in_thread(utils::fb_utilities::get_broadcast_address()); auto sstables = std::vector{}; - const auto candidates = table.candidates_for_compaction(); + const auto candidates = cf->candidates_for_compaction(); std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) { + seastar::thread::maybe_yield(); return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema); }); return sstables; + }).then([this, cf] (std::vector sstables) { + return rewrite_sstables(cf, sstables::compaction_options::make_cleanup(), + [sstables = std::move(sstables)] (const table&) { return sstables; }); }); } diff --git a/sstables/compaction_manager.hh b/sstables/compaction_manager.hh index c413c41410..e499bbbabc 100644 --- a/sstables/compaction_manager.hh +++ b/sstables/compaction_manager.hh @@ -205,7 +205,7 @@ public: // Cleanup is about discarding keys that are no longer relevant for a // given sstable, e.g. after node loses part of its token range because // of a newly added node. - future<> perform_cleanup(column_family* cf); + future<> perform_cleanup(database& db, column_family* cf); // Submit a column family to be upgraded and wait for its termination. future<> perform_sstable_upgrade(column_family* cf, bool exclude_current_version);