Merge 'Optimize creation of reader excluding staging for view building' from Raphael "Raph" Carvalho

View building from staging creates a reader from scratch (memtable +
sstables - staging) for every partition, in order to calculate
the diff between new staging data and data in base sstable set,
and then pushes the result into the view replicas.

perf shows that the reader creation is very expensive:
```
+   12.15%    10.75%  reactor-3        scylla             [.] lexicographical_tri_compare<compound_type<(allow_prefixes)0>::iterator, compound_type<(allow_prefixes)0>::iterator, legacy_compound_view<compound_type<(allow_prefixes)0> >::tri_comparator::operator()(managed_bytes_basic_view<(mutable_view)0>, managed_bytes
+   10.01%     9.99%  reactor-3        scylla             [.] boost::icl::is_empty<boost::icl::continuous_interval<compatible_ring_position_or_view, std::less> >
+    8.95%     8.94%  reactor-3        scylla             [.] legacy_compound_view<compound_type<(allow_prefixes)0> >::tri_comparator::operator()
+    7.29%     7.28%  reactor-3        scylla             [.] dht::ring_position_tri_compare
+    6.28%     6.27%  reactor-3        scylla             [.] dht::tri_compare
+    4.11%     3.52%  reactor-3        scylla             [.] boost::icl::interval_base_map<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::sst
+    4.09%     4.07%  reactor-3        scylla             [.] sstables::index_consume_entry_context<sstables::index_consumer>::process_state
+    3.46%     0.93%  reactor-3        scylla             [.] sstables::sstable_run::will_introduce_overlapping
+    2.53%     2.53%  reactor-3        libstdc++.so.6     [.] std::_Rb_tree_increment
+    2.45%     2.45%  reactor-3        scylla             [.] boost::icl::non_empty::exclusive_less<boost::icl::continuous_interval<compatible_ring_position_or_view, std::less> >
+    2.14%     2.13%  reactor-3        scylla             [.] boost::icl::exclusive_less<boost::icl::continuous_interval<compatible_ring_position_or_view, std::less> >
+    2.07%     2.07%  reactor-3        scylla             [.] logalloc::region_impl::free
+    2.06%     1.91%  reactor-3        scylla             [.] sstables::index_consumer::consume_entry(sstables::parsed_partition_index_entry&&)::{lambda()#1}::operator()() const::{lambda()#1}::operator()
+    2.04%     2.04%  reactor-3        scylla             [.] boost::icl::interval_base_map<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::sst
+    1.87%     0.00%  reactor-3        [kernel.kallsyms]  [k] entry_SYSCALL_64_after_hwframe
+    1.86%     0.00%  reactor-3        [kernel.kallsyms]  [k] do_syscall_64
+    1.39%     1.38%  reactor-3        libc.so.6          [.] __memcmp_avx2_movbe
+    1.37%     0.92%  reactor-3        scylla             [.] boost::icl::segmental::join_left<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::
+    1.34%     1.33%  reactor-3        scylla             [.] logalloc::region_impl::alloc_small
+    1.33%     1.33%  reactor-3        scylla             [.] seastar::memory::small_pool::add_more_objects
+    1.30%     0.35%  reactor-3        scylla             [.] seastar::reactor::do_run
+    1.29%     1.29%  reactor-3        scylla             [.] seastar::memory::allocate
+    1.19%     0.05%  reactor-3        libc.so.6          [.] syscall
+    1.16%     1.04%  reactor-3        scylla             [.] boost::icl::interval_base_map<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::sst
+    1.07%     0.79%  reactor-3        scylla             [.] sstables::partitioned_sstable_set::insert

```
That shows some significant amount of work for inserting sstables
into the interval map and maintaining the sstable run (which sorts
fragments by first key and checks for overlapping).

The interval map is known for having issues with L0 sstables, as
each of them has to be replicated into almost every single interval
stored by the map, causing terrible space and time complexity.
With enough L0 sstables, it can fall into quadratic behavior.

This overhead is fixed by not building a new fresh sstable set
when recreating the reader, but rather supplying a predicate
to sstable set that will filter out staging sstables when
creating either a single-key or range scan reader.

This could have another benefit over today's approach which
may incorrectly consider a staging sstable as non-staging, if
the staging sst wasn't included in the current batch for view
building.

With this improvement, view building was measured to be 3x faster.

from
`INFO  2023-06-16 12:36:40,014 [shard 0] view_update_generator - Processed keyspace1.standard1: 5 sstables in 963957ms = 50kB/s`

to
`INFO  2023-06-16 14:47:12,129 [shard 0] view_update_generator - Processed keyspace1.standard1: 5 sstables in 319899ms = 150kB/s`

Refs https://github.com/scylladb/scylladb/issues/14089.
Fixes scylladb/scylladb#14244.

Closes #14364

* github.com:scylladb/scylladb:
  table: Optimize creation of reader excluding staging for view building
  view_update_generator: Dump throughput and duration for view update from staging
  utils: Extract pretty printers into a header
This commit is contained in:
Botond Dénes
2023-06-27 07:25:30 +03:00
15 changed files with 244 additions and 90 deletions

View File

@@ -148,25 +148,6 @@ std::ostream& operator<<(std::ostream& os, compaction_type_options::scrub::quara
return os << to_string(quarantine_mode);
}
std::ostream& operator<<(std::ostream& os, pretty_printed_data_size data) {
static constexpr const char* suffixes[] = { " bytes", "kB", "MB", "GB", "TB", "PB" };
unsigned exp = 0;
while ((data._size >= 1000) && (exp < sizeof(suffixes))) {
exp++;
data._size /= 1000;
}
os << data._size << suffixes[exp];
return os;
}
std::ostream& operator<<(std::ostream& os, pretty_printed_throughput tp) {
uint64_t throughput = tp._duration.count() > 0 ? tp._size / tp._duration.count() : 0;
os << pretty_printed_data_size(throughput) << "/s";
return os;
}
static api::timestamp_type get_max_purgeable_timestamp(const table_state& table_s, sstable_set::incremental_selector& selector,
const std::unordered_set<shared_sstable>& compacting_set, const dht::decorated_key& dk, uint64_t& bloom_filter_checks) {
if (!table_s.tombstone_gc_enabled()) [[unlikely]] {
@@ -806,8 +787,8 @@ protected:
// By the time being, using estimated key count.
log_info("{} {} sstables to {}. {} to {} (~{}% of original) in {}ms = {}. ~{} total partitions merged to {}.",
report_finish_desc(),
_input_sstable_generations.size(), new_sstables_msg, pretty_printed_data_size(_start_size), pretty_printed_data_size(_end_size), int(ratio * 100),
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_end_size, duration),
_input_sstable_generations.size(), new_sstables_msg, utils::pretty_printed_data_size(_start_size), utils::pretty_printed_data_size(_end_size), int(ratio * 100),
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), utils::pretty_printed_throughput(_end_size, duration),
_cdata.total_partitions, _cdata.total_keys_written);
return ret;
@@ -944,7 +925,7 @@ void compacted_fragments_writer::split_large_partition() {
_c.log_debug("Closing active tombstone {} with {} for partition {}", _current_partition.current_emitted_tombstone, rtc, *_current_partition.dk);
_compaction_writer->writer.consume(std::move(rtc));
}
_c.log_debug("Splitting large partition {} in order to respect SSTable size limit of {}", *_current_partition.dk, pretty_printed_data_size(_c._max_sstable_size));
_c.log_debug("Splitting large partition {} in order to respect SSTable size limit of {}", *_current_partition.dk, utils::pretty_printed_data_size(_c._max_sstable_size));
// Close partition in current writer, and open it again in a new writer.
do_consume_end_of_partition();
stop_current_writer();

View File

@@ -14,6 +14,7 @@
#include "gc_clock.hh"
#include "compaction_weight_registration.hh"
#include "utils/UUID.hh"
#include "utils/pretty_printers.hh"
#include "table_state.hh"
#include <seastar/core/thread.hh>
#include <seastar/core/abort_source.hh>
@@ -24,21 +25,6 @@ namespace sstables {
bool is_eligible_for_compaction(const sstables::shared_sstable& sst) noexcept;
class pretty_printed_data_size {
uint64_t _size;
public:
pretty_printed_data_size(uint64_t size) : _size(size) {}
friend std::ostream& operator<<(std::ostream&, pretty_printed_data_size);
};
class pretty_printed_throughput {
uint64_t _size;
std::chrono::duration<float> _duration;
public:
pretty_printed_throughput(uint64_t size, std::chrono::duration<float> dur) : _size(size), _duration(std::move(dur)) {}
friend std::ostream& operator<<(std::ostream&, pretty_printed_throughput);
};
// Return the name of the compaction type
// as used over the REST api, e.g. "COMPACTION" or "CLEANUP".
sstring compaction_name(compaction_type type);

View File

@@ -11,6 +11,7 @@
#include "replica/database.hh"
#include "sstables/sstables.hh"
#include "sstables/sstable_directory.hh"
#include "utils/pretty_printers.hh"
namespace compaction {
@@ -295,7 +296,7 @@ future<> table_reshaping_compaction_task_impl::run() {
if (total_size > 0) {
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
dblog.info("Reshaped {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration));
dblog.info("Reshaped {} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), duration.count(), utils::pretty_printed_throughput(total_size, duration));
}
}

View File

@@ -762,6 +762,7 @@ scylla_core = (['message/messaging_service.cc',
'utils/rjson.cc',
'utils/human_readable.cc',
'utils/histogram_metrics_helper.cc',
'utils/pretty_printers.cc',
'converting_mutation_partition_applier.cc',
'readers/combined.cc',
'readers/multishard.cc',

View File

@@ -16,6 +16,7 @@
#include "sstables/progress_monitor.hh"
#include "readers/evictable.hh"
#include "dht/partition_filter.hh"
#include "utils/pretty_printers.hh"
static logging::logger vug_logger("view_update_generator");
@@ -127,9 +128,9 @@ future<> view_update_generator::start() {
auto& [t, sstables] = *table_it;
schema_ptr s = t->schema();
vug_logger.trace("Processing {}.{}: {} sstables", s->ks_name(), s->cf_name(), sstables.size());
const auto num_sstables = sstables.size();
auto start_time = db_clock::now();
uint64_t input_size = 0;
try {
// Exploit the fact that sstables in the staging directory
@@ -138,8 +139,12 @@ future<> view_update_generator::start() {
auto ssts = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, false));
for (auto& sst : sstables) {
ssts->insert(sst);
input_size += sst->data_size();
}
vug_logger.info("Processing {}.{}: {} in {} sstables",
s->ks_name(), s->cf_name(), utils::pretty_printed_data_size(input_size), sstables.size());
auto permit = _db.obtain_reader_permit(*t, "view_update_generator", db::no_timeout, {}).get0();
auto ms = mutation_source([this, ssts] (
schema_ptr s,
@@ -184,6 +189,12 @@ future<> view_update_generator::start() {
vug_logger.warn("Moving {} from staging failed: {}:{}. Ignoring...", s->ks_name(), s->cf_name(), std::current_exception());
}
_registration_sem.signal(num_sstables);
auto end_time = db_clock::now();
auto duration = std::chrono::duration<float>(end_time - start_time);
vug_logger.info("Processed {}.{}: {} sstables in {}ms = {}", s->ks_name(), s->cf_name(), sstables.size(),
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
utils::pretty_printed_throughput(input_size, duration));
}
// For each table, move the processed staging sstables into the table's base dir.
for (auto it = _sstables_to_move.begin(); it != _sstables_to_move.end(); ) {

View File

@@ -590,7 +590,8 @@ private:
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const;
mutation_reader::forwarding fwd_mr,
const sstables::sstable_predicate& = sstables::default_sstable_predicate()) const;
lw_shared_ptr<sstables::sstable_set> make_maintenance_sstable_set() const;
lw_shared_ptr<sstables::sstable_set> make_compound_sstable_set();
@@ -667,9 +668,8 @@ public:
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const;
flat_mutation_reader_v2 make_reader_v2_excluding_sstables(schema_ptr schema,
flat_mutation_reader_v2 make_reader_v2_excluding_staging(schema_ptr schema,
reader_permit permit,
std::vector<sstables::shared_sstable>& sst,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state = nullptr,
@@ -706,7 +706,7 @@ public:
sstables::shared_sstable make_streaming_staging_sstable();
mutation_source as_mutation_source() const;
mutation_source as_mutation_source_excluding(std::vector<sstables::shared_sstable>& sst) const;
mutation_source as_mutation_source_excluding_staging() const;
void set_virtual_reader(mutation_source virtual_reader) {
_virtual_reader = std::move(virtual_reader);

View File

@@ -278,7 +278,7 @@ future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vec
}
auto start = std::chrono::steady_clock::now();
dblog.info("Resharding {} for {}.{}", sstables::pretty_printed_data_size(total_size), ks_name, table_name);
dblog.info("Resharding {} for {}.{}", utils::pretty_printed_data_size(total_size), ks_name, table_name);
co_await dir.invoke_on_all(coroutine::lambda([&] (sstables::sstable_directory& d) -> future<> {
auto& table = db.local().find_column_family(ks_name, table_name);
@@ -293,7 +293,7 @@ future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vec
}));
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), ks_name, table_name, duration.count(), sstables::pretty_printed_throughput(total_size, duration));
dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), ks_name, table_name, duration.count(), utils::pretty_printed_throughput(total_size, duration));
}
// Global resharding function. Done in two parts:

View File

@@ -97,7 +97,8 @@ table::make_sstable_reader(schema_ptr s,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstables::sstable_predicate& predicate) const {
// CAVEAT: if make_sstable_reader() is called on a single partition
// we want to optimize and read exactly this partition. As a
// consequence, fast_forward_to() will *NOT* work on the result,
@@ -109,10 +110,10 @@ table::make_sstable_reader(schema_ptr s,
}
return sstables->create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit),
_stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr);
_stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate);
} else {
return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice,
std::move(trace_state), fwd, fwd_mr);
std::move(trace_state), fwd, fwd_mr, default_read_monitor_generator(), predicate);
}
}
@@ -2551,9 +2552,8 @@ void table::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {
}
flat_mutation_reader_v2
table::make_reader_v2_excluding_sstables(schema_ptr s,
table::make_reader_v2_excluding_staging(schema_ptr s,
reader_permit permit,
std::vector<sstables::shared_sstable>& excluded,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
@@ -2565,16 +2565,11 @@ table::make_reader_v2_excluding_sstables(schema_ptr s,
readers.reserve(memtable_count + 1);
});
auto excluded_ssts = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(excluded);
auto effective_sstables = make_lw_shared(_compaction_strategy.make_sstable_set(_schema));
_sstables->for_each_sstable([&excluded_ssts, &effective_sstables] (const sstables::shared_sstable& sst) mutable {
if (excluded_ssts.contains(sst)) {
return;
}
effective_sstables->insert(sst);
});
static std::predicate<const sstable&> auto excl_staging_predicate = [] (const sstable& sst) {
return !sst.requires_view_building();
};
readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, std::move(trace_state), fwd, fwd_mr));
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr, excl_staging_predicate));
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
}
@@ -2714,22 +2709,22 @@ table::stream_view_replica_updates(shared_ptr<db::view::view_update_generator> g
s,
std::move(m),
timeout,
as_mutation_source_excluding(excluded_sstables),
as_mutation_source_excluding_staging(),
tracing::trace_state_ptr(),
*_config.streaming_read_concurrency_semaphore,
query::partition_slice::option_set::of<query::partition_slice::option::bypass_cache>());
}
mutation_source
table::as_mutation_source_excluding(std::vector<sstables::shared_sstable>& ssts) const {
return mutation_source([this, &ssts] (schema_ptr s,
table::as_mutation_source_excluding_staging() const {
return mutation_source([this] (schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
return this->make_reader_v2_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, std::move(trace_state), fwd, fwd_mr);
return this->make_reader_v2_excluding_staging(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
});
}

View File

@@ -830,10 +830,22 @@ make_pk_filter(const dht::ring_position& pos, const schema& schema) {
};
}
// Filter out sstables for reader using bloom filter
// Returns the predicate used when a caller does not supply one: it accepts
// every sstable, so no sstable is filtered out on its account.
const sstable_predicate& default_sstable_predicate() {
    // Function-local static, so every caller receives a reference to the
    // same predicate object.
    static const sstable_predicate accept_everything = [] (const sstable&) {
        return true;
    };
    return accept_everything;
}
// Builds the per-sstable filter used by single-key reads: an sstable passes
// only if the caller-supplied predicate accepts it AND the partition-key
// filter built by make_pk_filter() for `pos` accepts it.
//
// The supplied predicate is captured by reference, so it must outlive the
// returned filter. It is evaluated first; the key filter is only consulted
// for sstables the predicate accepted.
static std::predicate<const sstable&> auto
make_sstable_filter(const dht::ring_position& pos, const schema& schema, const sstable_predicate& predicate) {
    auto key_filter = make_pk_filter(pos, schema);
    return [key_filter = std::move(key_filter), &predicate] (const sstable& candidate) {
        return predicate(candidate) && key_filter(candidate);
    };
}
// Filter out sstables for reader using bloom filter and supplied predicate
static std::vector<shared_sstable>
filter_sstable_for_reader_by_pk(std::vector<shared_sstable>&& sstables, const schema& schema, const dht::ring_position& pos) {
auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); };
filter_sstable_for_reader(std::vector<shared_sstable>&& sstables, const schema& schema, const dht::ring_position& pos, const sstable_predicate& predicate) {
auto filter = [_filter = make_sstable_filter(pos, schema, predicate)] (const shared_sstable& sst) { return !_filter(*sst); };
sstables.erase(boost::remove_if(sstables, filter), sstables.end());
return std::move(sstables);
}
@@ -887,10 +899,11 @@ sstable_set_impl::create_single_key_sstable_reader(
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const
{
const auto& pos = pr.start()->value();
auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *schema, pos);
auto selected_sstables = filter_sstable_for_reader(select(pr), *schema, pos, predicate);
auto num_sstables = selected_sstables.size();
if (!num_sstables) {
return make_empty_flat_reader_v2(schema, permit);
@@ -929,7 +942,8 @@ time_series_sstable_set::create_single_key_sstable_reader(
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const {
const auto& pos = pr.start()->value();
// First check if the optimized algorithm for TWCS single partition queries can be applied.
// Multiple conditions must be satisfied:
@@ -951,11 +965,11 @@ time_series_sstable_set::create_single_key_sstable_reader(
// Some of the conditions were not satisfied so we use the standard query path.
return sstable_set_impl::create_single_key_sstable_reader(
cf, std::move(schema), std::move(permit), sstable_histogram,
pr, slice, std::move(trace_state), fwd_sm, fwd_mr);
pr, slice, std::move(trace_state), fwd_sm, fwd_mr, predicate);
}
auto pk_filter = make_pk_filter(pos, *schema);
auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return pk_filter(*e.second); });
auto sst_filter = make_sstable_filter(pos, *schema, predicate);
auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return sst_filter(*e.second); });
if (it == _sstables->end()) {
// No sstables contain data for the queried partition.
return make_empty_flat_reader_v2(std::move(schema), std::move(permit));
@@ -968,6 +982,7 @@ time_series_sstable_set::create_single_key_sstable_reader(
return sst.make_reader(schema, permit, pr, slice, trace_state, fwd_sm);
};
auto pk_filter = make_pk_filter(pos, *schema);
auto ck_filter = [ranges = slice.get_all_ranges()] (const sstable& sst) { return sst.may_contain_rows(ranges); };
// We're going to pass this filter into sstable_position_reader_queue. The queue guarantees that
@@ -1168,7 +1183,8 @@ compound_sstable_set::create_single_key_sstable_reader(
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const {
auto sets = _sets;
auto it = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return set->size() > 0; });
auto non_empty_set_count = std::distance(sets.begin(), it);
@@ -1179,13 +1195,13 @@ compound_sstable_set::create_single_key_sstable_reader(
// optimize for common case where only 1 set is populated, avoiding the expensive combined reader
if (non_empty_set_count == 1) {
const auto& non_empty_set = *std::begin(sets);
return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr);
return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate);
}
auto readers = boost::copy_range<std::vector<flat_mutation_reader_v2>>(
boost::make_iterator_range(sets.begin(), it)
| boost::adaptors::transformed([&] (const lw_shared_ptr<sstable_set>& non_empty_set) {
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, trace_state, fwd, fwd_mr);
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate);
})
);
return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
@@ -1201,10 +1217,11 @@ sstable_set::create_single_key_sstable_reader(
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const {
assert(pr.is_singular() && pr.start()->value().has_key());
return _impl->create_single_key_sstable_reader(cf, std::move(schema),
std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr);
std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate);
}
flat_mutation_reader_v2
@@ -1240,11 +1257,15 @@ sstable_set::make_local_shard_sstable_reader(
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor_generator& monitor_generator) const
read_monitor_generator& monitor_generator,
const sstable_predicate& predicate) const
{
auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator]
auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator, &predicate]
(shared_sstable& sst, const dht::partition_range& pr) mutable {
assert(!sst->is_shared());
if (!predicate(*sst)) {
return make_empty_flat_reader_v2(s, permit);
}
return sst->make_reader(s, permit, pr, slice, trace_state, fwd, fwd_mr, monitor_generator(sst));
};
if (_impl->size() == 1) [[unlikely]] {

View File

@@ -55,6 +55,10 @@ public:
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_ext> select(const dht::ring_position_view&) = 0;
};
using sstable_predicate = noncopyable_function<bool(const sstable&)>;
// Default predicate includes everything
const sstable_predicate& default_sstable_predicate();
class sstable_set_impl {
public:
virtual ~sstable_set_impl() {}
@@ -78,7 +82,8 @@ public:
const query::partition_slice&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const;
mutation_reader::forwarding,
const sstable_predicate&) const;
};
class sstable_set : public enable_lw_shared_from_this<sstable_set> {
@@ -167,7 +172,8 @@ public:
const query::partition_slice&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const;
mutation_reader::forwarding,
const sstable_predicate& p = default_sstable_predicate()) const;
/// Read a range from the sstable set.
///
@@ -192,7 +198,8 @@ public:
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
read_monitor_generator& rmg = default_read_monitor_generator()) const;
read_monitor_generator& rmg = default_read_monitor_generator(),
const sstable_predicate& p = default_sstable_predicate()) const;
flat_mutation_reader_v2 make_crawling_reader(
schema_ptr,

View File

@@ -114,7 +114,8 @@ public:
const query::partition_slice&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const override;
mutation_reader::forwarding,
const sstable_predicate&) const override;
friend class sstable_position_reader_queue;
};
@@ -147,7 +148,8 @@ public:
const query::partition_slice&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const override;
mutation_reader::forwarding,
const sstable_predicate&) const override;
class incremental_selector;
};

View File

@@ -4518,7 +4518,7 @@ SEASTAR_TEST_CASE(simple_backlog_controller_test) {
auto backlog_before = t.as_table_state().get_backlog_tracker().backlog();
t->add_sstable_and_update_cache(sst).get();
testlog.debug("\tNew sstable of size={} level={}; Backlog diff={};",
sstables::pretty_printed_data_size(data_size), level,
utils::pretty_printed_data_size(data_size), level,
t.as_table_state().get_backlog_tracker().backlog() - backlog_before);
};
@@ -4556,7 +4556,7 @@ SEASTAR_TEST_CASE(simple_backlog_controller_test) {
for (auto target_table_count : target_table_count_s) {
const uint64_t per_table_max_disk_usage = std::ceil(all_tables_disk_usage / target_table_count);
testlog.info("Creating tables, with max size={}", sstables::pretty_printed_data_size(per_table_max_disk_usage));
testlog.info("Creating tables, with max size={}", utils::pretty_printed_data_size(per_table_max_disk_usage));
std::vector<table_for_tests> tables;
uint64_t tables_total_size = 0;
@@ -4577,18 +4577,18 @@ SEASTAR_TEST_CASE(simple_backlog_controller_test) {
}
auto table_size = t->get_stats().live_disk_space_used;
testlog.debug("T{}: {} tiers, with total size={}", t_idx, tiers, sstables::pretty_printed_data_size(table_size));
testlog.debug("T{}: {} tiers, with total size={}", t_idx, tiers, utils::pretty_printed_data_size(table_size));
tables.push_back(t);
tables_total_size += table_size;
}
testlog.debug("Created {} tables, with total size={}", tables.size(), sstables::pretty_printed_data_size(tables_total_size));
testlog.debug("Created {} tables, with total size={}", tables.size(), utils::pretty_printed_data_size(tables_total_size));
results.push_back(result{ tables.size(), per_table_max_disk_usage, normalize_backlog(manager.backlog()) });
for (auto& t : tables) {
t.stop().get();
}
}
for (auto& r : results) {
testlog.info("Tables={} with max size={} -> NormalizedBacklog={}", r.table_count, sstables::pretty_printed_data_size(r.per_table_max_disk_usage), r.normalized_backlog);
testlog.info("Tables={} with max size={} -> NormalizedBacklog={}", r.table_count, utils::pretty_printed_data_size(r.per_table_max_disk_usage), r.normalized_backlog);
// Expect 0 backlog as tiers are all perfectly compacted
// With LCS, the size of levels *set up by the test* can slightly exceed their target size,
// so let's account for the microscopical amount of backlog returned.

View File

@@ -3108,3 +3108,87 @@ SEASTAR_TEST_CASE(test_sstable_bytes_on_disk_correctness) {
SEASTAR_TEST_CASE(test_sstable_bytes_on_s3_correctness) {
return test_sstable_bytes_correctness(get_name() + "_s3", test_env_config{ .storage = make_test_object_storage_options() });
}
// Checks that the sstable predicate passed to the sstable_set reader
// factories decides whether the set's sstable contributes data, for both
// the single-key path (create_single_key_sstable_reader) and the range-scan
// path (make_local_shard_sstable_reader).
SEASTAR_TEST_CASE(test_sstable_set_predicate) {
return test_env::do_with_async([] (test_env& env) {
// Random schema, logged below so a failing run can be inspected.
auto random_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 4),
std::uniform_int_distribution<size_t>(2, 4),
std::uniform_int_distribution<size_t>(2, 8),
std::uniform_int_distribution<size_t>(2, 8));
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
auto s = random_schema.schema();
testlog.info("Random schema:\n{}", random_schema.cql());
// One sstable holding the generated mutations; it is the only member of the set.
const auto muts = tests::generate_random_mutations(random_schema, 20).get();
auto sst = make_sstable_containing(env.make_sstable(s), muts);
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
sstable_set set = cs.make_sstable_set(s);
set.insert(sst);
// Singular range on the sstable's first key, used by the point-query reader.
auto first_key_pr = dht::partition_range::make_singular(sst->get_first_decorated_key());
// Creates a single-key reader over `set`, filtered by `pred`.
// NOTE(review): `t` is stopped by `close_t` when this lambda returns, while
// the reader created with `&*t` is handed back to the caller — confirm the
// reader does not rely on the table after that point.
auto make_point_query_reader = [&] (std::predicate<const sstable&> auto& pred) {
auto t = env.make_table_for_tests(s);
auto close_t = deferred_stop(t);
utils::estimated_histogram eh;
return set.create_single_key_sstable_reader(&*t, s, env.make_reader_permit(), eh,
first_key_pr,
s->full_slice(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no,
pred);
};
// Creates a full-range scan reader over `set`, filtered by `pred`.
auto make_full_scan_reader = [&] (std::predicate<const sstable&> auto& pred) {
return set.make_local_shard_sstable_reader(s, env.make_reader_permit(),
query::full_partition_range,
s->full_slice(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no,
default_read_monitor_generator(),
pred);
};
// Reads one mutation from the reader. With expect_eos the reader must yield
// nothing and be at end of stream; otherwise it must yield a mutation.
auto verify_reader_result = [&] (flat_mutation_reader_v2 sst_mr, bool expect_eos) {
auto close_mr = deferred_close(sst_mr);
auto sst_mut = read_mutation_from_flat_mutation_reader(sst_mr).get0();
if (expect_eos) {
BOOST_REQUIRE(sst_mr.is_buffer_empty());
BOOST_REQUIRE(sst_mr.is_end_of_stream());
BOOST_REQUIRE(!sst_mut);
} else {
BOOST_REQUIRE(sst_mut);
}
};
// A predicate rejecting every sstable: both reader kinds must come back empty.
{
static std::predicate<const sstable&> auto excluding_pred = [] (const sstable&) {
return false;
};
testlog.info("excluding_pred: point query");
verify_reader_result(make_point_query_reader(excluding_pred), true);
testlog.info("excluding_pred: range query");
verify_reader_result(make_full_scan_reader(excluding_pred), true);
}
// A predicate accepting every sstable: both reader kinds must produce data.
{
static std::predicate<const sstable&> auto inclusive_pred = [] (const sstable&) {
return true;
};
testlog.info("inclusive_pred: point query");
verify_reader_result(make_point_query_reader(inclusive_pred), false);
testlog.info("inclusive_pred: range query");
verify_reader_result(make_full_scan_reader(inclusive_pred), false);
}
});
}

32
utils/pretty_printers.cc Normal file
View File

@@ -0,0 +1,32 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "pretty_printers.hh"
namespace utils {

// Streams a byte count scaled down to the largest decimal unit (powers of
// 1000) that keeps the integer part below 1000, followed by the unit suffix,
// e.g. 1500 -> "1kB". The division is integral, so the value is truncated.
// Values too large for the biggest suffix are printed in that suffix (PB).
std::ostream& operator<<(std::ostream& os, pretty_printed_data_size data) {
    static constexpr const char* suffixes[] = {" bytes", "kB", "MB", "GB", "TB", "PB"};
    // Index of the last valid suffix. The bound must be expressed in
    // elements: sizeof(suffixes) alone is the array size in bytes, which
    // would let exp run past the end of the array for sizes >= 10^18.
    static constexpr unsigned max_exp = sizeof(suffixes) / sizeof(suffixes[0]) - 1;
    unsigned exp = 0;
    while ((data._size >= 1000) && (exp < max_exp)) {
        exp++;
        data._size /= 1000;
    }
    os << data._size << suffixes[exp];
    return os;
}

// Streams size/duration as a pretty-printed data size followed by "/s".
// A non-positive duration is reported as a throughput of 0 rather than
// dividing by it.
std::ostream& operator<<(std::ostream& os, pretty_printed_throughput tp) {
    uint64_t throughput = tp._duration.count() > 0 ? tp._size / tp._duration.count() : 0;
    os << pretty_printed_data_size(throughput) << "/s";
    return os;
}

}

33
utils/pretty_printers.hh Normal file
View File

@@ -0,0 +1,33 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <chrono>
#include <ostream>
namespace utils {

// Tags a byte count so that streaming it to an std::ostream goes through
// the dedicated operator<< declared below instead of printing a raw integer.
class pretty_printed_data_size {
public:
    // Intentionally implicit: a plain integer can be passed where a
    // pretty_printed_data_size is expected.
    pretty_printed_data_size(uint64_t size) : _size(size) {}
    friend std::ostream& operator<<(std::ostream&, pretty_printed_data_size);
private:
    uint64_t _size;
};

// Pairs a byte count with the time it took to process it, so that the
// dedicated operator<< declared below can print the resulting throughput.
class pretty_printed_throughput {
public:
    // `dur` is a float-based duration with the default period (seconds).
    pretty_printed_throughput(uint64_t size, std::chrono::duration<float> dur) : _size(size), _duration(dur) {}
    friend std::ostream& operator<<(std::ostream&, pretty_printed_throughput);
private:
    uint64_t _size;
    std::chrono::duration<float> _duration;
};

}