From 986491447b981bdb88e8076a30b2dd67af7d7d32 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 19 Jun 2023 17:56:15 -0300 Subject: [PATCH] table: Optimize creation of reader excluding staging for view building View building from staging creates a reader from scratch (memtable + sstables - staging) for every partition, in order to calculate the diff between new staging data and data in base sstable set, and then pushes the result into the view replicas. perf shows that the reader creation is very expensive: + 12.15% 10.75% reactor-3 scylla [.] lexicographical_tri_compare::iterator, compound_type<(allow_prefixes)0>::iterator, legacy_compound_view >::tri_comparator::operator()(managed_bytes_basic_view<(mutable_view)0>, managed_bytes + 10.01% 9.99% reactor-3 scylla [.] boost::icl::is_empty > + 8.95% 8.94% reactor-3 scylla [.] legacy_compound_view >::tri_comparator::operator() + 7.29% 7.28% reactor-3 scylla [.] dht::ring_position_tri_compare + 6.28% 6.27% reactor-3 scylla [.] dht::tri_compare + 4.11% 3.52% reactor-3 scylla [.] boost::icl::interval_base_map, std::hash >, std::equal_to::process_state + 3.46% 0.93% reactor-3 scylla [.] sstables::sstable_run::will_introduce_overlapping + 2.53% 2.53% reactor-3 libstdc++.so.6 [.] std::_Rb_tree_increment + 2.45% 2.45% reactor-3 scylla [.] boost::icl::non_empty::exclusive_less > + 2.14% 2.13% reactor-3 scylla [.] boost::icl::exclusive_less > + 2.07% 2.07% reactor-3 scylla [.] logalloc::region_impl::free + 2.06% 1.91% reactor-3 scylla [.] sstables::index_consumer::consume_entry(sstables::parsed_partition_index_entry&&)::{lambda()#1}::operator()() const::{lambda()#1}::operator() + 2.04% 2.04% reactor-3 scylla [.] boost::icl::interval_base_map, std::hash >, std::equal_to, std::hash >, std::equal_to, std::hash >, std::equal_to (cherry picked from commit 1d8cb32a5d467d483ae3b84af70b94eccfc3f847) Signed-off-by: Raphael S. Carvalho Closes #14764 --- replica/database.hh | 8 +-- replica/table.cc | 32 +++++----- sstables/sstable_set.cc | 53 +++++++++++----- sstables/sstable_set.hh | 10 ++- sstables/sstable_set_impl.hh | 9 ++- test/boost/sstable_compaction_test.cc | 52 ++++++++++++++++ test/boost/sstable_datafile_test.cc | 90 +++++++++++++++++++++++++++ 7 files changed, 211 insertions(+), 43 deletions(-) diff --git a/replica/database.hh b/replica/database.hh index 3cd99643fb..4a278460eb 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -593,7 +593,8 @@ private: const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const; + mutation_reader::forwarding fwd_mr, + const sstables::sstable_predicate& = sstables::default_sstable_predicate()) const; lw_shared_ptr make_maintenance_sstable_set() const; lw_shared_ptr make_compound_sstable_set(); @@ -665,9 +666,8 @@ public: tracing::trace_state_ptr trace_state = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const; - flat_mutation_reader_v2 make_reader_v2_excluding_sstables(schema_ptr schema, + flat_mutation_reader_v2 make_reader_v2_excluding_staging(schema_ptr schema, reader_permit permit, - std::vector& sst, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc = default_priority_class(), @@ -705,7 +705,7 @@ public: sstables::shared_sstable make_streaming_staging_sstable(); mutation_source as_mutation_source() const; - mutation_source as_mutation_source_excluding(std::vector& sst) const; + mutation_source as_mutation_source_excluding_staging() const; void set_virtual_reader(mutation_source virtual_reader) { _virtual_reader = std::move(virtual_reader); diff --git a/replica/table.cc b/replica/table.cc index 39f325c5b5..ebfc3e8787 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -73,7 +73,8 @@ table::make_sstable_reader(schema_ptr s, const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstables::sstable_predicate& predicate) const { // CAVEAT: if make_sstable_reader() is called on a single partition // we want to optimize and read exactly this partition. As a // consequence, fast_forward_to() will *NOT* work on the result, @@ -85,10 +86,10 @@ table::make_sstable_reader(schema_ptr s, } return sstables->create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), - _stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); + _stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr, predicate); } else { return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, pc, - std::move(trace_state), fwd, fwd_mr); + std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), predicate); } } @@ -2439,9 +2440,8 @@ table::disable_auto_compaction() { } flat_mutation_reader_v2 -table::make_reader_v2_excluding_sstables(schema_ptr s, +table::make_reader_v2_excluding_staging(schema_ptr s, reader_permit permit, - std::vector& excluded, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -2454,16 +2454,12 @@ table::make_reader_v2_excluding_sstables(schema_ptr s, readers.reserve(memtable_count + 1); }); - auto excluded_ssts = boost::copy_range>(excluded); - auto effective_sstables = make_lw_shared(_compaction_strategy.make_sstable_set(_schema)); - _sstables->for_each_sstable([&excluded_ssts, &effective_sstables] (const sstables::shared_sstable& sst) mutable { - if (excluded_ssts.contains(sst)) { - return; - } - effective_sstables->insert(sst); - }); + static std::predicate auto excl_staging_predicate = [] (const sstables::sstable& sst) { + return !sst.requires_view_building(); + }; + + readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr, excl_staging_predicate)); - readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr)); return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr); } @@ -2593,7 +2589,7 @@ table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeou s, std::move(m), timeout, - as_mutation_source_excluding(excluded_sstables), + as_mutation_source_excluding_staging(), tracing::trace_state_ptr(), *_config.streaming_read_concurrency_semaphore, service::get_local_streaming_priority(), @@ -2601,8 +2597,8 @@ table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeou } mutation_source -table::as_mutation_source_excluding(std::vector& ssts) const { - return mutation_source([this, &ssts] (schema_ptr s, +table::as_mutation_source_excluding_staging() const { + return mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, @@ -2610,7 +2606,7 @@ table::as_mutation_source_excluding(std::vector& ssts) tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return this->make_reader_v2_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr); + return this->make_reader_v2_excluding_staging(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 78720aa8bc..5f5cf184b7 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -771,10 +771,22 @@ make_pk_filter(const dht::ring_position& pos, const schema& schema) { }; } -// Filter out sstables for reader using bloom filter +const sstable_predicate& default_sstable_predicate() { + static const sstable_predicate predicate = [] (const sstable&) { return true; }; + return predicate; +} + +static std::predicate auto +make_sstable_filter(const dht::ring_position& pos, const schema& schema, const sstable_predicate& predicate) { + return [pk_filter = make_pk_filter(pos, schema), &predicate] (const sstable& sst) { + return predicate(sst) && pk_filter(sst); + }; +} + +// Filter out sstables for reader using bloom filter and supplied predicate static std::vector -filter_sstable_for_reader_by_pk(std::vector&& sstables, const schema& schema, const dht::ring_position& pos) { - auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); }; +filter_sstable_for_reader(std::vector&& sstables, const schema& schema, const dht::ring_position& pos, const sstable_predicate& predicate) { + auto filter = [_filter = make_sstable_filter(pos, schema, predicate)] (const shared_sstable& sst) { return !_filter(*sst); }; sstables.erase(boost::remove_if(sstables, filter), sstables.end()); return std::move(sstables); } @@ -829,10 +841,11 @@ sstable_set_impl::create_single_key_sstable_reader( const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { const auto& pos = pr.start()->value(); - auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *schema, pos); + auto selected_sstables = filter_sstable_for_reader(select(pr), *schema, pos, predicate); auto num_sstables = selected_sstables.size(); if (!num_sstables) { return make_empty_flat_reader_v2(schema, permit); @@ -872,7 +885,8 @@ time_series_sstable_set::create_single_key_sstable_reader( const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd_sm, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { const auto& pos = pr.start()->value(); // First check if the optimized algorithm for TWCS single partition queries can be applied. // Multiple conditions must be satisfied: @@ -894,11 +908,11 @@ time_series_sstable_set::create_single_key_sstable_reader( // Some of the conditions were not satisfied so we use the standard query path. return sstable_set_impl::create_single_key_sstable_reader( cf, std::move(schema), std::move(permit), sstable_histogram, - pr, slice, pc, std::move(trace_state), fwd_sm, fwd_mr); + pr, slice, pc, std::move(trace_state), fwd_sm, fwd_mr, predicate); } - auto pk_filter = make_pk_filter(pos, *schema); - auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return pk_filter(*e.second); }); + auto sst_filter = make_sstable_filter(pos, *schema, predicate); + auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return sst_filter(*e.second); }); if (it == _sstables->end()) { // No sstables contain data for the queried partition. return make_empty_flat_reader_v2(std::move(schema), std::move(permit)); @@ -911,6 +925,7 @@ time_series_sstable_set::create_single_key_sstable_reader( return sst.make_reader(schema, permit, pr, slice, pc, trace_state, fwd_sm); }; + auto pk_filter = make_pk_filter(pos, *schema); auto ck_filter = [ranges = slice.get_all_ranges()] (const sstable& sst) { return sst.may_contain_rows(ranges); }; // We're going to pass this filter into sstable_position_reader_queue. The queue guarantees that @@ -1096,7 +1111,8 @@ compound_sstable_set::create_single_key_sstable_reader( const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { auto sets = _sets; auto it = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return !set->all()->empty(); }); auto non_empty_set_count = std::distance(sets.begin(), it); @@ -1107,13 +1123,13 @@ compound_sstable_set::create_single_key_sstable_reader( // optimize for common case where only 1 set is populated, avoiding the expensive combined reader if (non_empty_set_count == 1) { const auto& non_empty_set = *std::begin(sets); - return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr); + return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr, predicate); } auto readers = boost::copy_range>( boost::make_iterator_range(sets.begin(), it) | boost::adaptors::transformed([&] (const lw_shared_ptr& non_empty_set) { - return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr); + return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr, predicate); }) ); return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr); @@ -1130,10 +1146,11 @@ sstable_set::create_single_key_sstable_reader( const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { assert(pr.is_singular() && pr.start()->value().has_key()); return _impl->create_single_key_sstable_reader(cf, std::move(schema), - std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); + std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr, predicate); } flat_mutation_reader_v2 @@ -1171,11 +1188,15 @@ sstable_set::make_local_shard_sstable_reader( tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, - read_monitor_generator& monitor_generator) const + read_monitor_generator& monitor_generator, + const sstable_predicate& predicate) const { - auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator, &predicate] (shared_sstable& sst, const dht::partition_range& pr) mutable { assert(!sst->is_shared()); + if (!predicate(*sst)) { + return make_empty_flat_reader_v2(s, permit); + } return sst->make_reader(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst)); }; if (auto sstables = _impl->all(); sstables->size() == 1) [[unlikely]] { diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index fc01815663..559666d865 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -49,6 +49,10 @@ public: double estimate_droppable_tombstone_ratio(gc_clock::time_point gc_before) const; }; +using sstable_predicate = noncopyable_function; +// Default predicate includes everything +const sstable_predicate& default_sstable_predicate(); + class sstable_set : public enable_lw_shared_from_this { std::unique_ptr _impl; schema_ptr _schema; @@ -115,7 +119,8 @@ public: const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const; + mutation_reader::forwarding, + const sstable_predicate& p = default_sstable_predicate()) const; /// Read a range from the sstable set. /// @@ -142,7 +147,8 @@ public: tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding, - read_monitor_generator& rmg = default_read_monitor_generator()) const; + read_monitor_generator& rmg = default_read_monitor_generator(), + const sstable_predicate& p = default_sstable_predicate()) const; flat_mutation_reader_v2 make_crawling_reader( schema_ptr, diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh index 309f37010f..19387b274b 100644 --- a/sstables/sstable_set_impl.hh +++ b/sstables/sstable_set_impl.hh @@ -45,7 +45,8 @@ public: const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const; + mutation_reader::forwarding, + const sstable_predicate&) const; }; // specialized when sstables are partitioned in the token range space @@ -141,7 +142,8 @@ public: const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const override; + mutation_reader::forwarding, + const sstable_predicate&) const override; friend class sstable_position_reader_queue; }; @@ -173,7 +175,8 @@ public: const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const override; + mutation_reader::forwarding, + const sstable_predicate&) const override; class incremental_selector; }; diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index ba190a8561..faf7cf42c0 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -5117,3 +5117,55 @@ SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) { BOOST_REQUIRE(sstables_closed_during_cleanup >= sstables_nr / 2); }); } + +SEASTAR_TEST_CASE(test_sstables_excluding_staging_correctness) { + return test_env::do_with_async([] (test_env& env) { + simple_schema ss; + auto s = ss.schema(); + auto pks = ss.make_pkeys(2); + + auto make_mut = [&] (auto pkey) { + auto mut1 = mutation(s, pkey); + mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp()); + return mut1; + }; + std::set sorted_muts; + sorted_muts.insert(make_mut(pks[0])); + sorted_muts.insert(make_mut(pks[1])); + + auto tmp = tmpdir(); + table_for_tests t(env.manager(), s, tmp.path().string()); + auto close_t = deferred_stop(t); + t->mark_ready_for_writes(); + + auto sst_gen = [&env, s, &tmp, gen = make_lw_shared(1)]() { + return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::get_highest_sstable_version(), big); + }; + + auto staging_sst = make_sstable_containing(sst_gen, {*sorted_muts.begin()}); + staging_sst->move_to_new_dir((tmp.path() / fs::path(sstables::staging_dir)).string(), staging_sst->generation()).get(); + BOOST_REQUIRE(staging_sst->requires_view_building()); + + auto regular_sst = make_sstable_containing(sst_gen, {*sorted_muts.rbegin()}); + + t->add_sstable_and_update_cache(staging_sst).get(); + t->add_sstable_and_update_cache(regular_sst).get(); + + { + testlog.info("table::as_mutation_source_excluding_staging()"); + auto ms_excluding_staging = t->as_mutation_source_excluding_staging(); + assert_that(ms_excluding_staging.make_reader_v2(s, env.make_reader_permit(), query::full_partition_range)) + .produces(*sorted_muts.rbegin()) + .produces_end_of_stream(); + } + + { + testlog.info("table::as_mutation_source()"); + auto ms_inclusive = t->as_mutation_source(); + assert_that(ms_inclusive.make_reader_v2(s, env.make_reader_permit(), query::full_partition_range)) + .produces(*sorted_muts.begin()) + .produces(*sorted_muts.rbegin()) + .produces_end_of_stream(); + } + }); +} diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 2643dafe9d..d780b439a5 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -3393,3 +3393,93 @@ SEASTAR_TEST_CASE(find_first_position_in_partition_from_sstable_test) { } }); } + +SEASTAR_TEST_CASE(test_sstable_set_predicate) { + return test_env::do_with_async([this] (test_env& env) { + auto random_spec = tests::make_random_schema_specification( + get_name(), + std::uniform_int_distribution(1, 4), + std::uniform_int_distribution(2, 4), + std::uniform_int_distribution(2, 8), + std::uniform_int_distribution(2, 8)); + auto random_schema = tests::random_schema{tests::random::get_int(), *random_spec}; + auto s = random_schema.schema(); + + testlog.info("Random schema:\n{}", random_schema.cql()); + + const auto muts = tests::generate_random_mutations(random_schema, 20).get(); + + auto tmp = tmpdir(); + auto sst_gen = [&env, s, &tmp, gen = make_lw_shared(1)]() { + return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::get_highest_sstable_version(), big); + }; + auto sst = make_sstable_containing(sst_gen, muts); + + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + sstable_set set = cs.make_sstable_set(s); + set.insert(sst); + + auto first_key_pr = dht::partition_range::make_singular(sst->get_first_decorated_key()); + + auto make_point_query_reader = [&] (std::predicate auto& pred) { + table_for_tests t(env.manager(), s); + auto close_t = deferred_stop(t); + utils::estimated_histogram eh; + return set.create_single_key_sstable_reader(&*t, s, env.make_reader_permit(), eh, + first_key_pr, + s->full_slice(), + default_priority_class(), + tracing::trace_state_ptr(), + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no, + pred); + }; + + auto make_full_scan_reader = [&] (std::predicate auto& pred) { + return set.make_local_shard_sstable_reader(s, env.make_reader_permit(), + query::full_partition_range, + s->full_slice(), + default_priority_class(), + tracing::trace_state_ptr(), + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no, + default_read_monitor_generator(), + pred); + }; + + auto verify_reader_result = [&] (flat_mutation_reader_v2 sst_mr, bool expect_eos) { + auto close_mr = deferred_close(sst_mr); + auto sst_mut = read_mutation_from_flat_mutation_reader(sst_mr).get0(); + + if (expect_eos) { + BOOST_REQUIRE(sst_mr.is_buffer_empty()); + BOOST_REQUIRE(sst_mr.is_end_of_stream()); + BOOST_REQUIRE(!sst_mut); + } else { + BOOST_REQUIRE(sst_mut); + } + }; + + { + static std::predicate auto excluding_pred = [] (const sstable&) { + return false; + }; + + testlog.info("excluding_pred: point query"); + verify_reader_result(make_point_query_reader(excluding_pred), true); + testlog.info("excluding_pred: range query"); + verify_reader_result(make_full_scan_reader(excluding_pred), true); + } + + { + static std::predicate auto inclusive_pred = [] (const sstable&) { + return true; + }; + + testlog.info("inclusive_pred: point query"); + verify_reader_result(make_point_query_reader(inclusive_pred), false); + testlog.info("inclusive_pred: range query"); + verify_reader_result(make_full_scan_reader(inclusive_pred), false); + } + }); +}