compaction: mc: re-calculate encoding_stats based on column stats

When compacting several sstables, get and merge their encoding_stats
for encoding the result.

Introduce sstable::get_encoding_stats_for_compaction to return encoding_stats
based on the sstable's column stats.

Use encoding_stats_collector to keep track of the minimum encoding_stats
values of all input sstables.

Fixes #3971

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2018-12-31 14:23:14 +02:00
parent e2c4d2d60a
commit 1d483bc424
4 changed files with 32 additions and 4 deletions

View File

@@ -76,6 +76,12 @@ public:
min_ttl.update(ttl);
}
void update(const encoding_stats& other) {
update_timestamp(other.min_timestamp);
update_local_deletion_time(other.min_local_deletion_time);
update_ttl(other.min_ttl);
}
encoding_stats get() const {
return { min_timestamp.get(), min_local_deletion_time.get(), min_ttl.get() };
}

View File

@@ -321,6 +321,7 @@ protected:
uint64_t _estimated_partitions = 0;
std::vector<unsigned long> _ancestors;
db::replay_position _rp;
encoding_stats_collector _stats_collector;
protected:
compaction(column_family& cf, std::vector<shared_sstable> sstables, uint64_t max_sstable_size, uint32_t sstable_level)
: _cf(cf)
@@ -329,6 +330,9 @@ protected:
, _sstable_level(sstable_level)
{
_info->cf = &cf;
for (auto sst : sstables) {
_stats_collector.update(sst->get_encoding_stats_for_compaction());
}
_cf.get_compaction_manager().register_compaction(_info);
}
@@ -359,6 +363,10 @@ protected:
});
return (*m)->get_stats_metadata().max_timestamp;
}
encoding_stats get_encoding_stats() const {
return _stats_collector.get();
}
public:
compaction& operator=(const compaction&) = delete;
compaction(const compaction&) = delete;
@@ -589,8 +597,7 @@ public:
cfg.monitor = &_active_write_monitors.back();
cfg.large_partition_handler = _cf.get_large_partition_handler();
cfg.run_identifier = _run_identifier;
// TODO: calculate encoding_stats based on statistics of compacted sstables
_writer.emplace(_sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, encoding_stats{}, priority));
_writer.emplace(_sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, get_encoding_stats(), priority));
}
do_pending_replacements();
return &*_writer;
@@ -824,8 +831,7 @@ public:
cfg.max_sstable_size = _max_sstable_size;
cfg.large_partition_handler = _cf.get_large_partition_handler();
auto&& priority = service::get_local_compaction_priority();
// TODO: calculate encoding_stats based on statistics of compacted sstables
writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, encoding_stats{}, priority, _shard));
writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, get_encoding_stats(), priority, _shard));
}
return &*writer;
}

View File

@@ -2383,6 +2383,20 @@ sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partition
return sstable_writer(*this, s, estimated_partitions, cfg, enc_stats, pc, shard);
}
// Encoding stats for compaction are based on the sstable's stats metadata
// since, in contract to the mc-format encoding_stats that are evaluated
// before the sstable data is written, the stats metadata is updated during
// writing so it provides actual minimum values of the written timestamps.
encoding_stats sstable::get_encoding_stats_for_compaction() const {
encoding_stats enc_stats;
enc_stats.min_timestamp = _c_stats.timestamp_tracker.min();
enc_stats.min_local_deletion_time = _c_stats.local_deletion_time_tracker.min();
enc_stats.min_ttl = _c_stats.ttl_tracker.min();
return enc_stats;
}
future<> sstable::write_components(
flat_mutation_reader mr,
uint64_t estimated_partitions,

View File

@@ -268,6 +268,8 @@ public:
const io_priority_class& pc = default_priority_class(),
shard_id shard = engine().cpu_id());
encoding_stats get_encoding_stats_for_compaction() const;
future<> seal_sstable(bool backup);
static uint64_t get_estimated_key_count(const uint32_t size_at_full_sampling, const uint32_t min_index_interval) {