streaming: Improve partition estimation with TWCS

When off-strategy is disabled, data segregation is not postponed,
meaning that getting the partition estimate right is important to
reduce the filter's false positives. With streaming, we don't
have the min and max timestamps at the destination. We could have
extended the RPC verb to send them, but it turns out we can easily
deduce the number of windows from the default TTL. Given the
partitioner's random nature, it's not absurd to assume that a given
range being streamed may overlap with all windows, meaning that each
range will yield one sstable for each window when segregating
incoming data. Today, we assume the worst case of 100 windows (which
is the max number of sstables the input data can be segregated into)
due to the lack of metadata for estimating the window count.
But given that users are recommended to target a max of ~20
windows, the partition estimate is being downsized 5x more
than needed. Let's improve it by using the default TTL when
estimating the window count, so that even in the absence of
timestamp metadata, the partition estimation won't be way off.

Fixes #15704.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2023-11-03 15:20:50 +02:00
parent cca85f5454
commit b551f4abd2
9 changed files with 53 additions and 19 deletions

View File

@@ -547,7 +547,7 @@ protected:
auto max_sstable_size = std::max<uint64_t>(_max_sstable_size, 1);
uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(_compacting_data_file_size) / max_sstable_size)));
return std::min(uint64_t(ceil(double(_estimated_partitions) / estimated_sstables)),
_table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimated_partitions));
_table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimated_partitions, _schema));
}
void setup_new_sstable(shared_sstable& sst) {
@@ -1636,7 +1636,7 @@ private:
// Estimates how many partitions each output sstable of shard `s` will hold.
//
// The shard's estimated data size is divided by _max_sstable_size (rounded up,
// never less than 1) to obtain an expected sstable count. The shard's
// estimated partition count is then spread over those sstables, and the result
// is capped by the compaction strategy's own adjusted estimate.
//
// NOTE(review): this is a diff rendered without +/- markers. The two
// consecutive adjust_partition_estimate(...) lines appear to be the removed
// (two-argument) and added (three-argument, passing _schema) versions of the
// same line; only one of them exists in either revision — TODO confirm.
// NOTE(review): _max_sstable_size is used as a divisor without being clamped
// to >= 1 here — confirm it can never be 0.
uint64_t partitions_per_sstable(shard_id s) const {
uint64_t estimated_sstables = std::max(uint64_t(1), uint64_t(ceil(double(_estimation_per_shard[s].estimated_size) / _max_sstable_size)));
return std::min(uint64_t(ceil(double(_estimation_per_shard[s].estimated_partitions) / estimated_sstables)),
_table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions));
_table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions, _schema));
}
public:
resharding_compaction(table_state& table_s, sstables::compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor)

View File

@@ -66,7 +66,7 @@ bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& s
return sst->estimate_droppable_tombstone_ratio(gc_before) >= _tombstone_threshold;
}
// Base-class implementation of the partition-estimate adjustment: returns
// `partition_estimate` unchanged and ignores every other argument.
//
// NOTE(review): this is a diff rendered without +/- markers. The two signature
// lines appear to be the pre-change (two-parameter) and post-change (extra
// schema_ptr parameter) versions of the same definition, sharing the single
// body below — TODO confirm.
uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const {
uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const {
return partition_estimate;
}
@@ -704,8 +704,8 @@ compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema
return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, mode);
}
// Forwards the partition-estimate adjustment to the wrapped strategy
// implementation (_compaction_strategy_impl) and returns its result.
//
// NOTE(review): this is a diff rendered without +/- markers. The first
// signature/return pair appears to be the removed two-parameter version and
// the second pair the added version, which also takes a schema_ptr and moves
// it into the implementation call — TODO confirm.
uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const {
return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate);
uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const {
return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate, std::move(schema));
}
reader_consumer_v2 compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const {

View File

@@ -104,7 +104,7 @@ public:
compaction_backlog_tracker make_backlog_tracker() const;
uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const;
uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr) const;
reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const;

View File

@@ -68,7 +68,7 @@ public:
virtual std::unique_ptr<compaction_backlog_tracker::impl> make_backlog_tracker() const = 0;
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const;
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const;
virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const;

View File

@@ -184,16 +184,27 @@ public:
};
};
// Scales a partition estimate down by the number of time windows the incoming
// data is expected to span: the estimate is divided by the estimated window
// count.
//
// NOTE(review): this is a diff rendered without +/- markers, so the pre-change
// and post-change implementations are interleaved. The two-parameter signature
// through the first `window_size` declaration, and the later standalone
// `auto estimated_window_count = (max_window ...` line, appear to be the
// removed code; the three-parameter signature and the rest appear to be the
// added code — TODO confirm against the real diff.
//
// Pre-change behaviour (as shown): without both min and max timestamps, divide
// by max_data_segregation_window_count; otherwise divide by the number of
// windows between the min and max timestamps.
uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const {
if (!ms_meta.min_timestamp || !ms_meta.max_timestamp) {
// Not enough information, we assume the worst
return partition_estimate / max_data_segregation_window_count;
}
const auto min_window = get_window_for(_options, *ms_meta.min_timestamp);
const auto max_window = get_window_for(_options, *ms_meta.max_timestamp);
const auto window_size = get_window_size(_options);
// Post-change behaviour: the schema `s` supplies the table's default TTL,
// used as a fallback when timestamp metadata is missing.
uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr s) const {
// If not enough information, we assume the worst
auto estimated_window_count = max_data_segregation_window_count;
// Default TTL expressed in microseconds so its count can be passed to
// get_window_for() as a timestamp.
// NOTE(review): presumably timestamps are in microseconds — verify.
auto default_ttl = std::chrono::duration_cast<std::chrono::microseconds>(s->default_time_to_live());
bool min_and_max_ts_available = ms_meta.min_timestamp && ms_meta.max_timestamp;
// Distance between the two windows divided by the window size, rounded up.
auto estimate_window_count = [this] (timestamp_type min_window, timestamp_type max_window) {
const auto window_size = get_window_size(_options);
return (max_window + (window_size - 1) - min_window) / window_size;
};
auto estimated_window_count = (max_window + (window_size - 1) - min_window) / window_size;
// No timestamp metadata, but a non-zero default TTL is set: estimate the
// window count for data spanning from timestamp 0 up to the default TTL.
if (!min_and_max_ts_available && default_ttl.count()) {
auto min_window = get_window_for(_options, timestamp_type(0));
auto max_window = get_window_for(_options, timestamp_type(default_ttl.count()));
estimated_window_count = estimate_window_count(min_window, max_window);
// Timestamp metadata available: use the windows of the actual min and max
// timestamps.
} else if (min_and_max_ts_available) {
auto min_window = get_window_for(_options, *ms_meta.min_timestamp);
auto max_window = get_window_for(_options, *ms_meta.max_timestamp);
estimated_window_count = estimate_window_count(min_window, max_window);
}
// Clamp the divisor to at least 1 so a zero window count cannot cause a
// division by zero.
return partition_estimate / std::max(1UL, uint64_t(estimated_window_count));
}

View File

@@ -162,7 +162,7 @@ public:
virtual std::unique_ptr<compaction_backlog_tracker::impl> make_backlog_tracker() const override;
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const override;
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr s) const override;
virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const override;

View File

@@ -995,7 +995,7 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtabl
auto metadata = mutation_source_metadata{};
metadata.min_timestamp = old->get_min_timestamp();
metadata.max_timestamp = old->get_max_timestamp();
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count());
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count(), _schema);
if (!cg.async_gate().is_closed()) {
co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(cg.as_table_state());

View File

@@ -35,7 +35,7 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
auto& cs = cf->get_compaction_strategy();
// Data segregation is postponed to happen during off-strategy if latter is enabled, which
// means partition estimation shouldn't be adjusted.
const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions);
const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema());
auto make_interposer_consumer = [&cs, offstrategy] (const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) mutable {
if (offstrategy) {
return end_consumer;

View File

@@ -48,6 +48,7 @@
#include "mutation_writer/partition_based_splitting_writer.hh"
#include "compaction/table_state.hh"
#include "mutation/mutation_rebuilder.hh"
#include "mutation/mutation_source_metadata.hh"
#include <stdio.h>
#include <ftw.h>
@@ -3583,6 +3584,28 @@ SEASTAR_TEST_CASE(test_twcs_partition_estimate) {
auto close_cf = deferred_stop(cf);
cf->start();
auto ceil_div = [] (int dividend, int divisor) { return (dividend + divisor - 1) / divisor; };
auto estimation_test = [ceil_div] (schema_ptr s, uint64_t window_count) {
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, s->compaction_strategy_options());
mutation_source_metadata ms_metadata{};
const int partitions = 100;
BOOST_REQUIRE_EQUAL(cs.adjust_partition_estimate(ms_metadata, partitions, s),
ceil_div(partitions, window_count));
};
{
static constexpr int window_count = 20;
builder.set_default_time_to_live(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::hours(window_count)));
auto s = builder.build();
estimation_test(s, window_count);
}
{
builder.set_default_time_to_live(0s);
auto s = builder.build();
estimation_test(s, time_window_compaction_strategy::max_data_segregation_window_count);
}
std::vector<shared_sstable> sstables_spanning_many_windows = {
make_sstable(0),
make_sstable(1),