From b551f4abd2ef0857ff69a612c092b16f4a2887f6 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 3 Nov 2023 15:20:50 +0200 Subject: [PATCH] streaming: Improve partition estimation with TWCS When off-strategy is disabled, data segregation is not postponed, meaning that getting the partition estimate right is important to reduce the filter's false positives. With streaming, we don't have min and max timestamps at the destination. We could have extended the RPC verb to send them, but it turns out we can easily deduce the number of windows from the default TTL. Given the partitioner's random nature, it's not absurd to assume that a given range being streamed may overlap with all windows, meaning that each range will yield one sstable for each window when segregating incoming data. Today, we assume the worst case of 100 windows (which is the max number of sstables the input data can be segregated into) due to the lack of metadata for estimating the window count. But given that users are recommended to target a max of ~20 windows, it means the partition estimate is being downsized 5x more than needed. Let's improve it by using the default TTL when estimating the window count, so that even in the absence of timestamp metadata, the partition estimation won't be way off. Fixes #15704. Signed-off-by: Raphael S. 
Carvalho --- compaction/compaction.cc | 4 +-- compaction/compaction_strategy.cc | 6 ++-- compaction/compaction_strategy.hh | 2 +- compaction/compaction_strategy_impl.hh | 2 +- compaction/time_window_compaction_strategy.cc | 29 +++++++++++++------ compaction/time_window_compaction_strategy.hh | 2 +- replica/table.cc | 2 +- streaming/consumer.cc | 2 +- test/boost/sstable_compaction_test.cc | 23 +++++++++++++++ 9 files changed, 53 insertions(+), 19 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 5d51fb3561..8dcea0f6d0 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -547,7 +547,7 @@ protected: auto max_sstable_size = std::max(_max_sstable_size, 1); uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(_compacting_data_file_size) / max_sstable_size))); return std::min(uint64_t(ceil(double(_estimated_partitions) / estimated_sstables)), - _table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimated_partitions)); + _table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimated_partitions, _schema)); } void setup_new_sstable(shared_sstable& sst) { @@ -1636,7 +1636,7 @@ private: uint64_t partitions_per_sstable(shard_id s) const { uint64_t estimated_sstables = std::max(uint64_t(1), uint64_t(ceil(double(_estimation_per_shard[s].estimated_size) / _max_sstable_size))); return std::min(uint64_t(ceil(double(_estimation_per_shard[s].estimated_partitions) / estimated_sstables)), - _table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions)); + _table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions, _schema)); } public: resharding_compaction(table_state& table_s, sstables::compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor) diff --git a/compaction/compaction_strategy.cc 
b/compaction/compaction_strategy.cc index 027fb08b95..8280888839 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -66,7 +66,7 @@ bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& s return sst->estimate_droppable_tombstone_ratio(gc_before) >= _tombstone_threshold; } -uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const { +uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const { return partition_estimate; } @@ -704,8 +704,8 @@ compaction_strategy::get_reshaping_job(std::vector input, schema return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, mode); } -uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const { - return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate); +uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const { + return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate, std::move(schema)); } reader_consumer_v2 compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const { diff --git a/compaction/compaction_strategy.hh b/compaction/compaction_strategy.hh index d734ef7d67..fe8d05c8e1 100644 --- a/compaction/compaction_strategy.hh +++ b/compaction/compaction_strategy.hh @@ -104,7 +104,7 @@ public: compaction_backlog_tracker make_backlog_tracker() const; - uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const; + uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr) 
const; reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const; diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh index 6b61c2941e..4aebb69184 100644 --- a/compaction/compaction_strategy_impl.hh +++ b/compaction/compaction_strategy_impl.hh @@ -68,7 +68,7 @@ public: virtual std::unique_ptr make_backlog_tracker() const = 0; - virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const; + virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const; virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const; diff --git a/compaction/time_window_compaction_strategy.cc b/compaction/time_window_compaction_strategy.cc index d03096eba3..71736394f5 100644 --- a/compaction/time_window_compaction_strategy.cc +++ b/compaction/time_window_compaction_strategy.cc @@ -184,16 +184,27 @@ public: }; }; -uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const { - if (!ms_meta.min_timestamp || !ms_meta.max_timestamp) { - // Not enough information, we assume the worst - return partition_estimate / max_data_segregation_window_count; - } - const auto min_window = get_window_for(_options, *ms_meta.min_timestamp); - const auto max_window = get_window_for(_options, *ms_meta.max_timestamp); - const auto window_size = get_window_size(_options); +uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr s) const { + // If not enough information, we assume the worst + auto estimated_window_count = max_data_segregation_window_count; + auto default_ttl = std::chrono::duration_cast(s->default_time_to_live()); 
+ bool min_and_max_ts_available = ms_meta.min_timestamp && ms_meta.max_timestamp; + auto estimate_window_count = [this] (timestamp_type min_window, timestamp_type max_window) { + const auto window_size = get_window_size(_options); + return (max_window + (window_size - 1) - min_window) / window_size; + }; - auto estimated_window_count = (max_window + (window_size - 1) - min_window) / window_size; + if (!min_and_max_ts_available && default_ttl.count()) { + auto min_window = get_window_for(_options, timestamp_type(0)); + auto max_window = get_window_for(_options, timestamp_type(default_ttl.count())); + + estimated_window_count = estimate_window_count(min_window, max_window); + } else if (min_and_max_ts_available) { + auto min_window = get_window_for(_options, *ms_meta.min_timestamp); + auto max_window = get_window_for(_options, *ms_meta.max_timestamp); + + estimated_window_count = estimate_window_count(min_window, max_window); + } return partition_estimate / std::max(1UL, uint64_t(estimated_window_count)); } diff --git a/compaction/time_window_compaction_strategy.hh b/compaction/time_window_compaction_strategy.hh index 28361f3466..26609ba371 100644 --- a/compaction/time_window_compaction_strategy.hh +++ b/compaction/time_window_compaction_strategy.hh @@ -162,7 +162,7 @@ public: virtual std::unique_ptr make_backlog_tracker() const override; - virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const override; + virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr s) const override; virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const override; diff --git a/replica/table.cc b/replica/table.cc index 051f2114f9..83af828d14 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -995,7 +995,7 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, 
lw_shared_ptrget_min_timestamp(); metadata.max_timestamp = old->get_max_timestamp(); - auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count()); + auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count(), _schema); if (!cg.async_gate().is_closed()) { co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(cg.as_table_state()); diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 4281a4c2fd..6acbbe1fa4 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -35,7 +35,7 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin auto& cs = cf->get_compaction_strategy(); // Data segregation is postponed to happen during off-strategy if latter is enabled, which // means partition estimation shouldn't be adjusted. - const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions); + const auto adjusted_estimated_partitions = (offstrategy) ? 
estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema()); auto make_interposer_consumer = [&cs, offstrategy] (const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) mutable { if (offstrategy) { return end_consumer; diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 3f0212c576..7c3f98fa8a 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -48,6 +48,7 @@ #include "mutation_writer/partition_based_splitting_writer.hh" #include "compaction/table_state.hh" #include "mutation/mutation_rebuilder.hh" +#include "mutation/mutation_source_metadata.hh" #include #include @@ -3583,6 +3584,28 @@ SEASTAR_TEST_CASE(test_twcs_partition_estimate) { auto close_cf = deferred_stop(cf); cf->start(); + auto ceil_div = [] (int dividend, int divisor) { return (dividend + divisor - 1) / divisor; }; + + auto estimation_test = [ceil_div] (schema_ptr s, uint64_t window_count) { + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, s->compaction_strategy_options()); + mutation_source_metadata ms_metadata{}; + const int partitions = 100; + BOOST_REQUIRE_EQUAL(cs.adjust_partition_estimate(ms_metadata, partitions, s), + ceil_div(partitions, window_count)); + }; + { + static constexpr int window_count = 20; + builder.set_default_time_to_live(std::chrono::duration_cast(std::chrono::hours(window_count))); + auto s = builder.build(); + estimation_test(s, window_count); + } + + { + builder.set_default_time_to_live(0s); + auto s = builder.build(); + estimation_test(s, time_window_compaction_strategy::max_data_segregation_window_count); + } + std::vector sstables_spanning_many_windows = { make_sstable(0), make_sstable(1),