diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 5d51fb3561..8dcea0f6d0 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -547,7 +547,7 @@ protected: auto max_sstable_size = std::max(_max_sstable_size, 1); uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(_compacting_data_file_size) / max_sstable_size))); return std::min(uint64_t(ceil(double(_estimated_partitions) / estimated_sstables)), - _table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimated_partitions)); + _table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimated_partitions, _schema)); } void setup_new_sstable(shared_sstable& sst) { @@ -1636,7 +1636,7 @@ private: uint64_t partitions_per_sstable(shard_id s) const { uint64_t estimated_sstables = std::max(uint64_t(1), uint64_t(ceil(double(_estimation_per_shard[s].estimated_size) / _max_sstable_size))); return std::min(uint64_t(ceil(double(_estimation_per_shard[s].estimated_partitions) / estimated_sstables)), - _table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions)); + _table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions, _schema)); } public: resharding_compaction(table_state& table_s, sstables::compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor) diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc index 027fb08b95..8280888839 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -66,7 +66,7 @@ bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& s return sst->estimate_droppable_tombstone_ratio(gc_before) >= _tombstone_threshold; } -uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const { +uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const { return partition_estimate; } @@ -704,8 +704,8 @@ compaction_strategy::get_reshaping_job(std::vector input, schema return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, mode); } -uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const { - return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate); +uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const { + return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate, std::move(schema)); } reader_consumer_v2 compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const { diff --git a/compaction/compaction_strategy.hh b/compaction/compaction_strategy.hh index d734ef7d67..fe8d05c8e1 100644 --- a/compaction/compaction_strategy.hh +++ b/compaction/compaction_strategy.hh @@ -104,7 +104,7 @@ public: compaction_backlog_tracker make_backlog_tracker() const; - uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const; + uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr) const; reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const; diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh index 6b61c2941e..4aebb69184 100644 --- a/compaction/compaction_strategy_impl.hh +++ b/compaction/compaction_strategy_impl.hh @@ -68,7 +68,7 @@ public: virtual std::unique_ptr make_backlog_tracker() const = 0; - virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const; + virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const; virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const; diff --git a/compaction/time_window_compaction_strategy.cc b/compaction/time_window_compaction_strategy.cc index d03096eba3..71736394f5 100644 --- a/compaction/time_window_compaction_strategy.cc +++ b/compaction/time_window_compaction_strategy.cc @@ -184,16 +184,27 @@ public: }; }; -uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const { - if (!ms_meta.min_timestamp || !ms_meta.max_timestamp) { - // Not enough information, we assume the worst - return partition_estimate / max_data_segregation_window_count; - } - const auto min_window = get_window_for(_options, *ms_meta.min_timestamp); - const auto max_window = get_window_for(_options, *ms_meta.max_timestamp); - const auto window_size = get_window_size(_options); +uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr s) const { + // If not enough information, we assume the worst + auto estimated_window_count = max_data_segregation_window_count; + auto default_ttl = std::chrono::duration_cast(s->default_time_to_live()); + bool min_and_max_ts_available = ms_meta.min_timestamp && ms_meta.max_timestamp; + auto estimate_window_count = [this] (timestamp_type min_window, timestamp_type max_window) { + const auto window_size = get_window_size(_options); + return (max_window + (window_size - 1) - min_window) / window_size; + }; - auto estimated_window_count = (max_window + (window_size - 1) - min_window) / window_size; + if (!min_and_max_ts_available && default_ttl.count()) { + auto min_window = get_window_for(_options, timestamp_type(0)); + auto max_window = get_window_for(_options, timestamp_type(default_ttl.count())); + + estimated_window_count = estimate_window_count(min_window, max_window); + } else if (min_and_max_ts_available) { + auto min_window = get_window_for(_options, *ms_meta.min_timestamp); + auto max_window = get_window_for(_options, *ms_meta.max_timestamp); + + estimated_window_count = estimate_window_count(min_window, max_window); + } return partition_estimate / std::max(1UL, uint64_t(estimated_window_count)); } diff --git a/compaction/time_window_compaction_strategy.hh b/compaction/time_window_compaction_strategy.hh index 28361f3466..26609ba371 100644 --- a/compaction/time_window_compaction_strategy.hh +++ b/compaction/time_window_compaction_strategy.hh @@ -162,7 +162,7 @@ public: virtual std::unique_ptr make_backlog_tracker() const override; - virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const override; + virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr s) const override; virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const override; diff --git a/replica/table.cc b/replica/table.cc index 051f2114f9..83af828d14 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -995,7 +995,7 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptrget_min_timestamp(); metadata.max_timestamp = old->get_max_timestamp(); - auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count()); + auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count(), _schema); if (!cg.async_gate().is_closed()) { co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(cg.as_table_state()); diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 4281a4c2fd..6acbbe1fa4 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -35,7 +35,7 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin auto& cs = cf->get_compaction_strategy(); // Data segregation is postponed to happen during off-strategy if latter is enabled, which // means partition estimation shouldn't be adjusted. - const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions); + const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema()); auto make_interposer_consumer = [&cs, offstrategy] (const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) mutable { if (offstrategy) { return end_consumer; diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 3f0212c576..7c3f98fa8a 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -48,6 +48,7 @@ #include "mutation_writer/partition_based_splitting_writer.hh" #include "compaction/table_state.hh" #include "mutation/mutation_rebuilder.hh" +#include "mutation/mutation_source_metadata.hh" #include #include @@ -3583,6 +3584,28 @@ SEASTAR_TEST_CASE(test_twcs_partition_estimate) { auto close_cf = deferred_stop(cf); cf->start(); + auto ceil_div = [] (int dividend, int divisor) { return (dividend + divisor - 1) / divisor; }; + + auto estimation_test = [ceil_div] (schema_ptr s, uint64_t window_count) { + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, s->compaction_strategy_options()); + mutation_source_metadata ms_metadata{}; + const int partitions = 100; + BOOST_REQUIRE_EQUAL(cs.adjust_partition_estimate(ms_metadata, partitions, s), + ceil_div(partitions, window_count)); + }; + { + static constexpr int window_count = 20; + builder.set_default_time_to_live(std::chrono::duration_cast(std::chrono::hours(window_count))); + auto s = builder.build(); + estimation_test(s, window_count); + } + + { + builder.set_default_time_to_live(0s); + auto s = builder.build(); + estimation_test(s, time_window_compaction_strategy::max_data_segregation_window_count); + } + std::vector sstables_spanning_many_windows = { make_sstable(0), make_sstable(1),