diff --git a/replica/database.hh b/replica/database.hh index 3cd99643fb..4a278460eb 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -593,7 +593,8 @@ private: const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const; + mutation_reader::forwarding fwd_mr, + const sstables::sstable_predicate& = sstables::default_sstable_predicate()) const; lw_shared_ptr make_maintenance_sstable_set() const; lw_shared_ptr make_compound_sstable_set(); @@ -665,9 +666,8 @@ public: tracing::trace_state_ptr trace_state = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const; - flat_mutation_reader_v2 make_reader_v2_excluding_sstables(schema_ptr schema, + flat_mutation_reader_v2 make_reader_v2_excluding_staging(schema_ptr schema, reader_permit permit, - std::vector& sst, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc = default_priority_class(), @@ -705,7 +705,7 @@ public: sstables::shared_sstable make_streaming_staging_sstable(); mutation_source as_mutation_source() const; - mutation_source as_mutation_source_excluding(std::vector& sst) const; + mutation_source as_mutation_source_excluding_staging() const; void set_virtual_reader(mutation_source virtual_reader) { _virtual_reader = std::move(virtual_reader); diff --git a/replica/table.cc b/replica/table.cc index 39f325c5b5..ebfc3e8787 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -73,7 +73,8 @@ table::make_sstable_reader(schema_ptr s, const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstables::sstable_predicate& predicate) const { // CAVEAT: if make_sstable_reader() is called on a single partition // we want to optimize and read exactly this partition. As a // consequence, fast_forward_to() will *NOT* work on the result, @@ -85,10 +86,10 @@ table::make_sstable_reader(schema_ptr s, } return sstables->create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), - _stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); + _stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr, predicate); } else { return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, pc, - std::move(trace_state), fwd, fwd_mr); + std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), predicate); } } @@ -2439,9 +2440,8 @@ table::disable_auto_compaction() { } flat_mutation_reader_v2 -table::make_reader_v2_excluding_sstables(schema_ptr s, +table::make_reader_v2_excluding_staging(schema_ptr s, reader_permit permit, - std::vector& excluded, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -2454,16 +2454,12 @@ table::make_reader_v2_excluding_sstables(schema_ptr s, readers.reserve(memtable_count + 1); }); - auto excluded_ssts = boost::copy_range>(excluded); - auto effective_sstables = make_lw_shared(_compaction_strategy.make_sstable_set(_schema)); - _sstables->for_each_sstable([&excluded_ssts, &effective_sstables] (const sstables::shared_sstable& sst) mutable { - if (excluded_ssts.contains(sst)) { - return; - } - effective_sstables->insert(sst); - }); + static std::predicate auto excl_staging_predicate = [] (const sstables::sstable& sst) { + return !sst.requires_view_building(); + }; + + readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr, excl_staging_predicate)); - readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr)); return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr); } @@ -2593,7 +2589,7 @@ table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeou s, std::move(m), timeout, - as_mutation_source_excluding(excluded_sstables), + as_mutation_source_excluding_staging(), tracing::trace_state_ptr(), *_config.streaming_read_concurrency_semaphore, service::get_local_streaming_priority(), @@ -2601,8 +2597,8 @@ table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeou } mutation_source -table::as_mutation_source_excluding(std::vector& ssts) const { - return mutation_source([this, &ssts] (schema_ptr s, +table::as_mutation_source_excluding_staging() const { + return mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, @@ -2610,7 +2606,7 @@ table::as_mutation_source_excluding(std::vector& ssts) tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return this->make_reader_v2_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr); + return this->make_reader_v2_excluding_staging(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 78720aa8bc..5f5cf184b7 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -771,10 +771,22 @@ make_pk_filter(const dht::ring_position& pos, const schema& schema) { }; } -// Filter out sstables for reader using bloom filter +const sstable_predicate& default_sstable_predicate() { + static const sstable_predicate predicate = [] (const sstable&) { return true; }; + return predicate; +} + +static std::predicate auto +make_sstable_filter(const dht::ring_position& pos, const schema& schema, const sstable_predicate& predicate) { + return [pk_filter = make_pk_filter(pos, schema), &predicate] (const sstable& sst) { + return predicate(sst) && pk_filter(sst); + }; +} + +// Filter out sstables for reader using bloom filter and supplied predicate static std::vector -filter_sstable_for_reader_by_pk(std::vector&& sstables, const schema& schema, const dht::ring_position& pos) { - auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); }; +filter_sstable_for_reader(std::vector&& sstables, const schema& schema, const dht::ring_position& pos, const sstable_predicate& predicate) { + auto filter = [_filter = make_sstable_filter(pos, schema, predicate)] (const shared_sstable& sst) { return !_filter(*sst); }; sstables.erase(boost::remove_if(sstables, filter), sstables.end()); return std::move(sstables); } @@ -829,10 +841,11 @@ sstable_set_impl::create_single_key_sstable_reader( const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { const auto& pos = pr.start()->value(); - auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *schema, pos); + auto selected_sstables = filter_sstable_for_reader(select(pr), *schema, pos, predicate); auto num_sstables = selected_sstables.size(); if (!num_sstables) { return make_empty_flat_reader_v2(schema, permit); @@ -872,7 +885,8 @@ time_series_sstable_set::create_single_key_sstable_reader( const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd_sm, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { const auto& pos = pr.start()->value(); // First check if the optimized algorithm for TWCS single partition queries can be applied. // Multiple conditions must be satisfied: @@ -894,11 +908,11 @@ time_series_sstable_set::create_single_key_sstable_reader( // Some of the conditions were not satisfied so we use the standard query path. return sstable_set_impl::create_single_key_sstable_reader( cf, std::move(schema), std::move(permit), sstable_histogram, - pr, slice, pc, std::move(trace_state), fwd_sm, fwd_mr); + pr, slice, pc, std::move(trace_state), fwd_sm, fwd_mr, predicate); } - auto pk_filter = make_pk_filter(pos, *schema); - auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return pk_filter(*e.second); }); + auto sst_filter = make_sstable_filter(pos, *schema, predicate); + auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return sst_filter(*e.second); }); if (it == _sstables->end()) { // No sstables contain data for the queried partition. return make_empty_flat_reader_v2(std::move(schema), std::move(permit)); @@ -911,6 +925,7 @@ time_series_sstable_set::create_single_key_sstable_reader( return sst.make_reader(schema, permit, pr, slice, pc, trace_state, fwd_sm); }; + auto pk_filter = make_pk_filter(pos, *schema); auto ck_filter = [ranges = slice.get_all_ranges()] (const sstable& sst) { return sst.may_contain_rows(ranges); }; // We're going to pass this filter into sstable_position_reader_queue. The queue guarantees that @@ -1096,7 +1111,8 @@ compound_sstable_set::create_single_key_sstable_reader( const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { auto sets = _sets; auto it = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return !set->all()->empty(); }); auto non_empty_set_count = std::distance(sets.begin(), it); @@ -1107,13 +1123,13 @@ compound_sstable_set::create_single_key_sstable_reader( // optimize for common case where only 1 set is populated, avoiding the expensive combined reader if (non_empty_set_count == 1) { const auto& non_empty_set = *std::begin(sets); - return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr); + return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr, predicate); } auto readers = boost::copy_range>( boost::make_iterator_range(sets.begin(), it) | boost::adaptors::transformed([&] (const lw_shared_ptr& non_empty_set) { - return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr); + return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr, predicate); }) ); return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr); @@ -1130,10 +1146,11 @@ sstable_set::create_single_key_sstable_reader( const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { assert(pr.is_singular() && pr.start()->value().has_key()); return _impl->create_single_key_sstable_reader(cf, std::move(schema), - std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); + std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr, predicate); } flat_mutation_reader_v2 @@ -1171,11 +1188,15 @@ sstable_set::make_local_shard_sstable_reader( tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, - read_monitor_generator& monitor_generator) const + read_monitor_generator& monitor_generator, + const sstable_predicate& predicate) const { - auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator, &predicate] (shared_sstable& sst, const dht::partition_range& pr) mutable { assert(!sst->is_shared()); + if (!predicate(*sst)) { + return make_empty_flat_reader_v2(s, permit); + } return sst->make_reader(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst)); }; if (auto sstables = _impl->all(); sstables->size() == 1) [[unlikely]] { diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index fc01815663..559666d865 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -49,6 +49,10 @@ public: double estimate_droppable_tombstone_ratio(gc_clock::time_point gc_before) const; }; +using sstable_predicate = noncopyable_function; +// Default predicate includes everything +const sstable_predicate& default_sstable_predicate(); + class sstable_set : public enable_lw_shared_from_this { std::unique_ptr _impl; schema_ptr _schema; @@ -115,7 +119,8 @@ public: const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const; + mutation_reader::forwarding, + const sstable_predicate& p = default_sstable_predicate()) const; /// Read a range from the sstable set. /// @@ -142,7 +147,8 @@ public: tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding, - read_monitor_generator& rmg = default_read_monitor_generator()) const; + read_monitor_generator& rmg = default_read_monitor_generator(), + const sstable_predicate& p = default_sstable_predicate()) const; flat_mutation_reader_v2 make_crawling_reader( schema_ptr, diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh index 309f37010f..19387b274b 100644 --- a/sstables/sstable_set_impl.hh +++ b/sstables/sstable_set_impl.hh @@ -45,7 +45,8 @@ public: const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const; + mutation_reader::forwarding, + const sstable_predicate&) const; }; // specialized when sstables are partitioned in the token range space @@ -141,7 +142,8 @@ public: const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const override; + mutation_reader::forwarding, + const sstable_predicate&) const override; friend class sstable_position_reader_queue; }; @@ -173,7 +175,8 @@ public: const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const override; + mutation_reader::forwarding, + const sstable_predicate&) const override; class incremental_selector; }; diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index ba190a8561..faf7cf42c0 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -5117,3 +5117,55 @@ SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) { BOOST_REQUIRE(sstables_closed_during_cleanup >= sstables_nr / 2); }); } + +SEASTAR_TEST_CASE(test_sstables_excluding_staging_correctness) { + return test_env::do_with_async([] (test_env& env) { + simple_schema ss; + auto s = ss.schema(); + auto pks = ss.make_pkeys(2); + + auto make_mut = [&] (auto pkey) { + auto mut1 = mutation(s, pkey); + mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp()); + return mut1; + }; + std::set sorted_muts; + sorted_muts.insert(make_mut(pks[0])); + sorted_muts.insert(make_mut(pks[1])); + + auto tmp = tmpdir(); + table_for_tests t(env.manager(), s, tmp.path().string()); + auto close_t = deferred_stop(t); + t->mark_ready_for_writes(); + + auto sst_gen = [&env, s, &tmp, gen = make_lw_shared(1)]() { + return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::get_highest_sstable_version(), big); + }; + + auto staging_sst = make_sstable_containing(sst_gen, {*sorted_muts.begin()}); + staging_sst->move_to_new_dir((tmp.path() / fs::path(sstables::staging_dir)).string(), staging_sst->generation()).get(); + BOOST_REQUIRE(staging_sst->requires_view_building()); + + auto regular_sst = make_sstable_containing(sst_gen, {*sorted_muts.rbegin()}); + + t->add_sstable_and_update_cache(staging_sst).get(); + t->add_sstable_and_update_cache(regular_sst).get(); + + { + testlog.info("table::as_mutation_source_excluding_staging()"); + auto ms_excluding_staging = t->as_mutation_source_excluding_staging(); + assert_that(ms_excluding_staging.make_reader_v2(s, env.make_reader_permit(), query::full_partition_range)) + .produces(*sorted_muts.rbegin()) + .produces_end_of_stream(); + } + + { + testlog.info("table::as_mutation_source()"); + auto ms_inclusive = t->as_mutation_source(); + assert_that(ms_inclusive.make_reader_v2(s, env.make_reader_permit(), query::full_partition_range)) + .produces(*sorted_muts.begin()) + .produces(*sorted_muts.rbegin()) + .produces_end_of_stream(); + } + }); +} diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 2643dafe9d..d780b439a5 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -3393,3 +3393,93 @@ SEASTAR_TEST_CASE(find_first_position_in_partition_from_sstable_test) { } }); } + +SEASTAR_TEST_CASE(test_sstable_set_predicate) { + return test_env::do_with_async([this] (test_env& env) { + auto random_spec = tests::make_random_schema_specification( + get_name(), + std::uniform_int_distribution(1, 4), + std::uniform_int_distribution(2, 4), + std::uniform_int_distribution(2, 8), + std::uniform_int_distribution(2, 8)); + auto random_schema = tests::random_schema{tests::random::get_int(), *random_spec}; + auto s = random_schema.schema(); + + testlog.info("Random schema:\n{}", random_schema.cql()); + + const auto muts = tests::generate_random_mutations(random_schema, 20).get(); + + auto tmp = tmpdir(); + auto sst_gen = [&env, s, &tmp, gen = make_lw_shared(1)]() { + return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::get_highest_sstable_version(), big); + }; + auto sst = make_sstable_containing(sst_gen, muts); + + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + sstable_set set = cs.make_sstable_set(s); + set.insert(sst); + + auto first_key_pr = dht::partition_range::make_singular(sst->get_first_decorated_key()); + + auto make_point_query_reader = [&] (std::predicate auto& pred) { + table_for_tests t(env.manager(), s); + auto close_t = deferred_stop(t); + utils::estimated_histogram eh; + return set.create_single_key_sstable_reader(&*t, s, env.make_reader_permit(), eh, + first_key_pr, + s->full_slice(), + default_priority_class(), + tracing::trace_state_ptr(), + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no, + pred); + }; + + auto make_full_scan_reader = [&] (std::predicate auto& pred) { + return set.make_local_shard_sstable_reader(s, env.make_reader_permit(), + query::full_partition_range, + s->full_slice(), + default_priority_class(), + tracing::trace_state_ptr(), + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no, + default_read_monitor_generator(), + pred); + }; + + auto verify_reader_result = [&] (flat_mutation_reader_v2 sst_mr, bool expect_eos) { + auto close_mr = deferred_close(sst_mr); + auto sst_mut = read_mutation_from_flat_mutation_reader(sst_mr).get0(); + + if (expect_eos) { + BOOST_REQUIRE(sst_mr.is_buffer_empty()); + BOOST_REQUIRE(sst_mr.is_end_of_stream()); + BOOST_REQUIRE(!sst_mut); + } else { + BOOST_REQUIRE(sst_mut); + } + }; + + { + static std::predicate auto excluding_pred = [] (const sstable&) { + return false; + }; + + testlog.info("excluding_pred: point query"); + verify_reader_result(make_point_query_reader(excluding_pred), true); + testlog.info("excluding_pred: range query"); + verify_reader_result(make_full_scan_reader(excluding_pred), true); + } + + { + static std::predicate auto inclusive_pred = [] (const sstable&) { + return true; + }; + + testlog.info("inclusive_pred: point query"); + verify_reader_result(make_point_query_reader(inclusive_pred), false); + testlog.info("inclusive_pred: range query"); + verify_reader_result(make_full_scan_reader(inclusive_pred), false); + } + }); +}