table: Optimize creation of reader excluding staging for view building

View building from staging creates a reader from scratch (memtable
+ sstables - staging) for every partition, in order to calculate
the diff between new staging data and data in base sstable set,
and then pushes the result into the view replicas.

perf shows that the reader creation is very expensive:
+   12.15%    10.75%  reactor-3        scylla             [.] lexicographical_tri_compare<compound_type<(allow_prefixes)0>::iterator, compound_type<(allow_prefixes)0>::iterator, legacy_compound_view<compound_type<(allow_prefixes)0> >::tri_comparator::operator()(managed_bytes_basic_view<(mutable_view)0>, managed_bytes
+   10.01%     9.99%  reactor-3        scylla             [.] boost::icl::is_empty<boost::icl::continuous_interval<compatible_ring_position_or_view, std::less> >
+    8.95%     8.94%  reactor-3        scylla             [.] legacy_compound_view<compound_type<(allow_prefixes)0> >::tri_comparator::operator()
+    7.29%     7.28%  reactor-3        scylla             [.] dht::ring_position_tri_compare
+    6.28%     6.27%  reactor-3        scylla             [.] dht::tri_compare
+    4.11%     3.52%  reactor-3        scylla             [.] boost::icl::interval_base_map<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::sst+    4.09%     4.07%  reactor-3        scylla             [.] sstables::index_consume_entry_context<sstables::index_consumer>::process_state
+    3.46%     0.93%  reactor-3        scylla             [.] sstables::sstable_run::will_introduce_overlapping
+    2.53%     2.53%  reactor-3        libstdc++.so.6     [.] std::_Rb_tree_increment
+    2.45%     2.45%  reactor-3        scylla             [.] boost::icl::non_empty::exclusive_less<boost::icl::continuous_interval<compatible_ring_position_or_view, std::less> >
+    2.14%     2.13%  reactor-3        scylla             [.] boost::icl::exclusive_less<boost::icl::continuous_interval<compatible_ring_position_or_view, std::less> >
+    2.07%     2.07%  reactor-3        scylla             [.] logalloc::region_impl::free
+    2.06%     1.91%  reactor-3        scylla             [.] sstables::index_consumer::consume_entry(sstables::parsed_partition_index_entry&&)::{lambda()#1}::operator()() const::{lambda()#1}::operator()
+    2.04%     2.04%  reactor-3        scylla             [.] boost::icl::interval_base_map<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::sst+    1.87%     0.00%  reactor-3        [kernel.kallsyms]  [k] entry_SYSCALL_64_after_hwframe
+    1.86%     0.00%  reactor-3        [kernel.kallsyms]  [k] do_syscall_64
+    1.39%     1.38%  reactor-3        libc.so.6          [.] __memcmp_avx2_movbe
+    1.37%     0.92%  reactor-3        scylla             [.] boost::icl::segmental::join_left<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::
+    1.34%     1.33%  reactor-3        scylla             [.] logalloc::region_impl::alloc_small
+    1.33%     1.33%  reactor-3        scylla             [.] seastar::memory::small_pool::add_more_objects
+    1.30%     0.35%  reactor-3        scylla             [.] seastar::reactor::do_run
+    1.29%     1.29%  reactor-3        scylla             [.] seastar::memory::allocate
+    1.19%     0.05%  reactor-3        libc.so.6          [.] syscall
+    1.16%     1.04%  reactor-3        scylla             [.] boost::icl::interval_base_map<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::sst
+    1.07%     0.79%  reactor-3        scylla             [.] sstables::partitioned_sstable_set::insert

That shows some significant amount of work for inserting sstables
into the interval map and maintaining the sstable run (which sorts
fragments by first key and checks for overlapping).

The interval map is known for having issues with L0 sstables, as
it will have to be replicated almost to every single interval
stored by the map, causing terrible space and time complexity.
With enough L0 sstables, it can fall into quadratic behavior.

This overhead is fixed by not building a new fresh sstable set
when recreating the reader, but rather supplying a predicate
to sstable set that will filter out staging sstables when
creating either a single-key or range scan reader.

This could have another benefit over today's approach which
may incorrectly consider a staging sstable as non-staging, if
the staging sst wasn't included in the current batch for view
building.

With this improvement, view building was measured to be 3x faster.

from
INFO  2023-06-16 12:36:40,014 [shard 0] view_update_generator - Processed keyspace1.standard1: 5 sstables in 963957ms = 50kB/s

to
INFO  2023-06-16 14:47:12,129 [shard 0] view_update_generator - Processed keyspace1.standard1: 5 sstables in 319899ms = 150kB/s

Refs #14089.
Fixes #14244.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 1d8cb32a5d)
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes #14764
This commit is contained in:
Raphael S. Carvalho
2023-06-19 17:56:15 -03:00
committed by Botond Dénes
parent a05bb26cd6
commit 986491447b
7 changed files with 211 additions and 43 deletions

View File

@@ -593,7 +593,8 @@ private:
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const;
mutation_reader::forwarding fwd_mr,
const sstables::sstable_predicate& = sstables::default_sstable_predicate()) const;
lw_shared_ptr<sstables::sstable_set> make_maintenance_sstable_set() const;
lw_shared_ptr<sstables::sstable_set> make_compound_sstable_set();
@@ -665,9 +666,8 @@ public:
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const;
flat_mutation_reader_v2 make_reader_v2_excluding_sstables(schema_ptr schema,
flat_mutation_reader_v2 make_reader_v2_excluding_staging(schema_ptr schema,
reader_permit permit,
std::vector<sstables::shared_sstable>& sst,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc = default_priority_class(),
@@ -705,7 +705,7 @@ public:
sstables::shared_sstable make_streaming_staging_sstable();
mutation_source as_mutation_source() const;
mutation_source as_mutation_source_excluding(std::vector<sstables::shared_sstable>& sst) const;
mutation_source as_mutation_source_excluding_staging() const;
void set_virtual_reader(mutation_source virtual_reader) {
_virtual_reader = std::move(virtual_reader);

View File

@@ -73,7 +73,8 @@ table::make_sstable_reader(schema_ptr s,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstables::sstable_predicate& predicate) const {
// CAVEAT: if make_sstable_reader() is called on a single partition
// we want to optimize and read exactly this partition. As a
// consequence, fast_forward_to() will *NOT* work on the result,
@@ -85,10 +86,10 @@ table::make_sstable_reader(schema_ptr s,
}
return sstables->create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit),
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr, predicate);
} else {
return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, pc,
std::move(trace_state), fwd, fwd_mr);
std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), predicate);
}
}
@@ -2439,9 +2440,8 @@ table::disable_auto_compaction() {
}
flat_mutation_reader_v2
table::make_reader_v2_excluding_sstables(schema_ptr s,
table::make_reader_v2_excluding_staging(schema_ptr s,
reader_permit permit,
std::vector<sstables::shared_sstable>& excluded,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -2454,16 +2454,12 @@ table::make_reader_v2_excluding_sstables(schema_ptr s,
readers.reserve(memtable_count + 1);
});
auto excluded_ssts = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(excluded);
auto effective_sstables = make_lw_shared(_compaction_strategy.make_sstable_set(_schema));
_sstables->for_each_sstable([&excluded_ssts, &effective_sstables] (const sstables::shared_sstable& sst) mutable {
if (excluded_ssts.contains(sst)) {
return;
}
effective_sstables->insert(sst);
});
static std::predicate<const sstables::sstable&> auto excl_staging_predicate = [] (const sstables::sstable& sst) {
return !sst.requires_view_building();
};
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr, excl_staging_predicate));
readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
}
@@ -2593,7 +2589,7 @@ table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeou
s,
std::move(m),
timeout,
as_mutation_source_excluding(excluded_sstables),
as_mutation_source_excluding_staging(),
tracing::trace_state_ptr(),
*_config.streaming_read_concurrency_semaphore,
service::get_local_streaming_priority(),
@@ -2601,8 +2597,8 @@ table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeou
}
mutation_source
table::as_mutation_source_excluding(std::vector<sstables::shared_sstable>& ssts) const {
return mutation_source([this, &ssts] (schema_ptr s,
table::as_mutation_source_excluding_staging() const {
return mutation_source([this] (schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
@@ -2610,7 +2606,7 @@ table::as_mutation_source_excluding(std::vector<sstables::shared_sstable>& ssts)
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
return this->make_reader_v2_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
return this->make_reader_v2_excluding_staging(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
});
}

View File

@@ -771,10 +771,22 @@ make_pk_filter(const dht::ring_position& pos, const schema& schema) {
};
}
// Filter out sstables for reader using bloom filter
const sstable_predicate& default_sstable_predicate() {
static const sstable_predicate predicate = [] (const sstable&) { return true; };
return predicate;
}
static std::predicate<const sstable&> auto
make_sstable_filter(const dht::ring_position& pos, const schema& schema, const sstable_predicate& predicate) {
return [pk_filter = make_pk_filter(pos, schema), &predicate] (const sstable& sst) {
return predicate(sst) && pk_filter(sst);
};
}
// Filter out sstables for reader using bloom filter and supplied predicate
static std::vector<shared_sstable>
filter_sstable_for_reader_by_pk(std::vector<shared_sstable>&& sstables, const schema& schema, const dht::ring_position& pos) {
auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); };
filter_sstable_for_reader(std::vector<shared_sstable>&& sstables, const schema& schema, const dht::ring_position& pos, const sstable_predicate& predicate) {
auto filter = [_filter = make_sstable_filter(pos, schema, predicate)] (const shared_sstable& sst) { return !_filter(*sst); };
sstables.erase(boost::remove_if(sstables, filter), sstables.end());
return std::move(sstables);
}
@@ -829,10 +841,11 @@ sstable_set_impl::create_single_key_sstable_reader(
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const
{
const auto& pos = pr.start()->value();
auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *schema, pos);
auto selected_sstables = filter_sstable_for_reader(select(pr), *schema, pos, predicate);
auto num_sstables = selected_sstables.size();
if (!num_sstables) {
return make_empty_flat_reader_v2(schema, permit);
@@ -872,7 +885,8 @@ time_series_sstable_set::create_single_key_sstable_reader(
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const {
const auto& pos = pr.start()->value();
// First check if the optimized algorithm for TWCS single partition queries can be applied.
// Multiple conditions must be satisfied:
@@ -894,11 +908,11 @@ time_series_sstable_set::create_single_key_sstable_reader(
// Some of the conditions were not satisfied so we use the standard query path.
return sstable_set_impl::create_single_key_sstable_reader(
cf, std::move(schema), std::move(permit), sstable_histogram,
pr, slice, pc, std::move(trace_state), fwd_sm, fwd_mr);
pr, slice, pc, std::move(trace_state), fwd_sm, fwd_mr, predicate);
}
auto pk_filter = make_pk_filter(pos, *schema);
auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return pk_filter(*e.second); });
auto sst_filter = make_sstable_filter(pos, *schema, predicate);
auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return sst_filter(*e.second); });
if (it == _sstables->end()) {
// No sstables contain data for the queried partition.
return make_empty_flat_reader_v2(std::move(schema), std::move(permit));
@@ -911,6 +925,7 @@ time_series_sstable_set::create_single_key_sstable_reader(
return sst.make_reader(schema, permit, pr, slice, pc, trace_state, fwd_sm);
};
auto pk_filter = make_pk_filter(pos, *schema);
auto ck_filter = [ranges = slice.get_all_ranges()] (const sstable& sst) { return sst.may_contain_rows(ranges); };
// We're going to pass this filter into sstable_position_reader_queue. The queue guarantees that
@@ -1096,7 +1111,8 @@ compound_sstable_set::create_single_key_sstable_reader(
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const {
auto sets = _sets;
auto it = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return !set->all()->empty(); });
auto non_empty_set_count = std::distance(sets.begin(), it);
@@ -1107,13 +1123,13 @@ compound_sstable_set::create_single_key_sstable_reader(
// optimize for common case where only 1 set is populated, avoiding the expensive combined reader
if (non_empty_set_count == 1) {
const auto& non_empty_set = *std::begin(sets);
return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr);
return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr, predicate);
}
auto readers = boost::copy_range<std::vector<flat_mutation_reader_v2>>(
boost::make_iterator_range(sets.begin(), it)
| boost::adaptors::transformed([&] (const lw_shared_ptr<sstable_set>& non_empty_set) {
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr);
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr, predicate);
})
);
return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
@@ -1130,10 +1146,11 @@ sstable_set::create_single_key_sstable_reader(
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const {
assert(pr.is_singular() && pr.start()->value().has_key());
return _impl->create_single_key_sstable_reader(cf, std::move(schema),
std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr, predicate);
}
flat_mutation_reader_v2
@@ -1171,11 +1188,15 @@ sstable_set::make_local_shard_sstable_reader(
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor_generator& monitor_generator) const
read_monitor_generator& monitor_generator,
const sstable_predicate& predicate) const
{
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator]
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator, &predicate]
(shared_sstable& sst, const dht::partition_range& pr) mutable {
assert(!sst->is_shared());
if (!predicate(*sst)) {
return make_empty_flat_reader_v2(s, permit);
}
return sst->make_reader(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst));
};
if (auto sstables = _impl->all(); sstables->size() == 1) [[unlikely]] {

View File

@@ -49,6 +49,10 @@ public:
double estimate_droppable_tombstone_ratio(gc_clock::time_point gc_before) const;
};
using sstable_predicate = noncopyable_function<bool(const sstable&)>;
// Default predicate includes everything
const sstable_predicate& default_sstable_predicate();
class sstable_set : public enable_lw_shared_from_this<sstable_set> {
std::unique_ptr<sstable_set_impl> _impl;
schema_ptr _schema;
@@ -115,7 +119,8 @@ public:
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const;
mutation_reader::forwarding,
const sstable_predicate& p = default_sstable_predicate()) const;
/// Read a range from the sstable set.
///
@@ -142,7 +147,8 @@ public:
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
read_monitor_generator& rmg = default_read_monitor_generator()) const;
read_monitor_generator& rmg = default_read_monitor_generator(),
const sstable_predicate& p = default_sstable_predicate()) const;
flat_mutation_reader_v2 make_crawling_reader(
schema_ptr,

View File

@@ -45,7 +45,8 @@ public:
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const;
mutation_reader::forwarding,
const sstable_predicate&) const;
};
// specialized when sstables are partitioned in the token range space
@@ -141,7 +142,8 @@ public:
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const override;
mutation_reader::forwarding,
const sstable_predicate&) const override;
friend class sstable_position_reader_queue;
};
@@ -173,7 +175,8 @@ public:
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const override;
mutation_reader::forwarding,
const sstable_predicate&) const override;
class incremental_selector;
};

View File

@@ -5117,3 +5117,55 @@ SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
BOOST_REQUIRE(sstables_closed_during_cleanup >= sstables_nr / 2);
});
}
SEASTAR_TEST_CASE(test_sstables_excluding_staging_correctness) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto s = ss.schema();
auto pks = ss.make_pkeys(2);
auto make_mut = [&] (auto pkey) {
auto mut1 = mutation(s, pkey);
mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp());
return mut1;
};
std::set<mutation, mutation_decorated_key_less_comparator> sorted_muts;
sorted_muts.insert(make_mut(pks[0]));
sorted_muts.insert(make_mut(pks[1]));
auto tmp = tmpdir();
table_for_tests t(env.manager(), s, tmp.path().string());
auto close_t = deferred_stop(t);
t->mark_ready_for_writes();
auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)]() {
return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::get_highest_sstable_version(), big);
};
auto staging_sst = make_sstable_containing(sst_gen, {*sorted_muts.begin()});
staging_sst->move_to_new_dir((tmp.path() / fs::path(sstables::staging_dir)).string(), staging_sst->generation()).get();
BOOST_REQUIRE(staging_sst->requires_view_building());
auto regular_sst = make_sstable_containing(sst_gen, {*sorted_muts.rbegin()});
t->add_sstable_and_update_cache(staging_sst).get();
t->add_sstable_and_update_cache(regular_sst).get();
{
testlog.info("table::as_mutation_source_excluding_staging()");
auto ms_excluding_staging = t->as_mutation_source_excluding_staging();
assert_that(ms_excluding_staging.make_reader_v2(s, env.make_reader_permit(), query::full_partition_range))
.produces(*sorted_muts.rbegin())
.produces_end_of_stream();
}
{
testlog.info("table::as_mutation_source()");
auto ms_inclusive = t->as_mutation_source();
assert_that(ms_inclusive.make_reader_v2(s, env.make_reader_permit(), query::full_partition_range))
.produces(*sorted_muts.begin())
.produces(*sorted_muts.rbegin())
.produces_end_of_stream();
}
});
}

View File

@@ -3393,3 +3393,93 @@ SEASTAR_TEST_CASE(find_first_position_in_partition_from_sstable_test) {
}
});
}
SEASTAR_TEST_CASE(test_sstable_set_predicate) {
return test_env::do_with_async([this] (test_env& env) {
auto random_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 4),
std::uniform_int_distribution<size_t>(2, 4),
std::uniform_int_distribution<size_t>(2, 8),
std::uniform_int_distribution<size_t>(2, 8));
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
auto s = random_schema.schema();
testlog.info("Random schema:\n{}", random_schema.cql());
const auto muts = tests::generate_random_mutations(random_schema, 20).get();
auto tmp = tmpdir();
auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)]() {
return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::get_highest_sstable_version(), big);
};
auto sst = make_sstable_containing(sst_gen, muts);
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
sstable_set set = cs.make_sstable_set(s);
set.insert(sst);
auto first_key_pr = dht::partition_range::make_singular(sst->get_first_decorated_key());
auto make_point_query_reader = [&] (std::predicate<const sstable&> auto& pred) {
table_for_tests t(env.manager(), s);
auto close_t = deferred_stop(t);
utils::estimated_histogram eh;
return set.create_single_key_sstable_reader(&*t, s, env.make_reader_permit(), eh,
first_key_pr,
s->full_slice(),
default_priority_class(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no,
pred);
};
auto make_full_scan_reader = [&] (std::predicate<const sstable&> auto& pred) {
return set.make_local_shard_sstable_reader(s, env.make_reader_permit(),
query::full_partition_range,
s->full_slice(),
default_priority_class(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no,
default_read_monitor_generator(),
pred);
};
auto verify_reader_result = [&] (flat_mutation_reader_v2 sst_mr, bool expect_eos) {
auto close_mr = deferred_close(sst_mr);
auto sst_mut = read_mutation_from_flat_mutation_reader(sst_mr).get0();
if (expect_eos) {
BOOST_REQUIRE(sst_mr.is_buffer_empty());
BOOST_REQUIRE(sst_mr.is_end_of_stream());
BOOST_REQUIRE(!sst_mut);
} else {
BOOST_REQUIRE(sst_mut);
}
};
{
static std::predicate<const sstable&> auto excluding_pred = [] (const sstable&) {
return false;
};
testlog.info("excluding_pred: point query");
verify_reader_result(make_point_query_reader(excluding_pred), true);
testlog.info("excluding_pred: range query");
verify_reader_result(make_full_scan_reader(excluding_pred), true);
}
{
static std::predicate<const sstable&> auto inclusive_pred = [] (const sstable&) {
return true;
};
testlog.info("inclusive_pred: point query");
verify_reader_result(make_point_query_reader(inclusive_pred), false);
testlog.info("inclusive_pred: range query");
verify_reader_result(make_full_scan_reader(inclusive_pred), false);
}
});
}