sstables: Fix bloom filter size after resharding by properly estimating partition count

We were feeding the total estimated partition count of an input shared
sstable to the output unshared ones.

So sstable writer thinks, *from estimation*, that each sstable created
by resharding will have the same data amount as the shared sstable they
are being created from. That's a problem because the estimation is fed
into bloom filter creation, which directly influences its size.
So if we're resharding all sstables that belong to all shards, the
disk usage taken by filter components will be multiplied by the number
of shards. That becomes more of a problem with #3302.

Partition count estimation for a shard S will now be done as follows:
    //
    // TE, the total estimated partition count for a shard S, is defined as
    // TE = Sum(i = 0...N) { Ei / Si }.
    //
    // where i is an input sstable that belongs to shard S,
    //       Ei is the estimated partition count for sstable i,
    //       Si is the total number of shards that own sstable i.

Fixes #2672.
Refs #3302.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20180423151001.9995-1-raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2018-04-23 12:10:01 -03:00
committed by Avi Kivity
parent 8a8f688dbf
commit 11940ca39e
3 changed files with 71 additions and 13 deletions

View File

@@ -610,6 +610,27 @@ class resharding_compaction final : public compaction {
shard_id _shard; // shard of current sstable writer
std::function<shared_sstable(shard_id)> _sstable_creator;
compaction_backlog_tracker _resharding_backlog_tracker;
// Partition count estimation for a shard S:
//
// TE, the total estimated partition count for a shard S, is defined as
// TE = Sum(i = 0...N) { Ei / Si }.
//
// where i is an input sstable that belongs to shard S,
// Ei is the estimated partition count for sstable i,
// Si is the total number of shards that own sstable i.
//
// Per-shard accumulators filled in by the constructor: for every input
// shared sstable, its size and estimated partition count are divided
// evenly among the shards that own it and added to that shard's entry.
struct estimated_values {
uint64_t estimated_size = 0; // bytes on disk attributed to this shard
uint64_t estimated_partitions = 0; // estimated partitions attributed to this shard
};
// Indexed by shard id; sized to smp::count when the compaction is constructed.
std::vector<estimated_values> _estimation_per_shard;
private:
// Estimate how many partitions each output sstable created for shard s
// will hold: the shard's estimated partition total divided by the number
// of sstables we expect to write for it (never fewer than one), where the
// expected sstable count comes from the shard's estimated size versus the
// configured maximum sstable size.
uint64_t partitions_per_sstable(shard_id s) const {
const auto& est = _estimation_per_shard[s];
auto nr_sstables = uint64_t(ceil(double(est.estimated_size) / _max_sstable_size));
if (nr_sstables < uint64_t(1)) {
nr_sstables = uint64_t(1);
}
return ceil(double(est.estimated_partitions) / nr_sstables);
}
public:
resharding_compaction(std::vector<shared_sstable> sstables, column_family& cf, std::function<shared_sstable(shard_id)> creator,
uint64_t max_sstable_size, uint32_t sstable_level)
@@ -617,10 +638,19 @@ public:
, _output_sstables(smp::count)
, _sstable_creator(std::move(creator))
, _resharding_backlog_tracker(std::make_unique<resharding_backlog_tracker>())
, _estimation_per_shard(smp::count)
{
cf.get_compaction_manager().register_backlog_tracker(_resharding_backlog_tracker);
for (auto& s : _sstables) {
_resharding_backlog_tracker.add_sstable(s);
for (auto& sst : _sstables) {
_resharding_backlog_tracker.add_sstable(sst);
const auto& shards = sst->get_shards_for_this_sstable();
auto size = sst->bytes_on_disk();
auto estimated_partitions = sst->get_estimated_key_count();
for (auto& s : shards) {
_estimation_per_shard[s].estimated_size += std::max(uint64_t(1), uint64_t(ceil(double(size) / shards.size())));
_estimation_per_shard[s].estimated_partitions += std::max(uint64_t(1), uint64_t(ceil(double(estimated_partitions) / shards.size())));
}
}
_info->type = compaction_type::Reshard;
}
@@ -666,7 +696,7 @@ public:
sstable_writer_config cfg;
cfg.max_sstable_size = _max_sstable_size;
auto&& priority = service::get_local_compaction_priority();
writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, priority, _shard));
writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, priority, _shard));
}
return &*writer;
}

View File

@@ -69,7 +69,8 @@ void run_sstable_resharding_test() {
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm, *cl_stats);
cf->mark_ready_for_writes();
std::unordered_map<shard_id, mutation> muts;
std::unordered_map<shard_id, std::vector<mutation>> muts;
static constexpr auto keys_per_shard = 1000u;
// create sst shared by all shards
{
@@ -81,18 +82,29 @@ void run_sstable_resharding_test() {
return m;
};
for (auto i : boost::irange(0u, smp::count)) {
auto key_token_pair = token_generation_for_shard(i, 1);
BOOST_REQUIRE(key_token_pair.size() == 1);
auto m = get_mutation(key_token_pair[0].first, i);
muts.emplace(i, m);
mt->apply(std::move(m));
auto key_token_pair = token_generation_for_shard(i, keys_per_shard);
BOOST_REQUIRE(key_token_pair.size() == keys_per_shard);
muts[i].reserve(keys_per_shard);
for (auto k : boost::irange(0u, keys_per_shard)) {
auto m = get_mutation(key_token_pair[k].first, i);
muts[i].push_back(m);
mt->apply(std::move(m));
}
}
auto sst = sstables::make_sstable(s, tmp->path, 0, version, sstables::sstable::format_types::big);
write_memtable_to_sstable(*mt, sst).get();
}
auto sst = sstables::make_sstable(s, tmp->path, 0, version, sstables::sstable::format_types::big);
sst->load().get();
sst->set_unshared();
// FIXME: sstable write has a limitation in which it will generate sharding metadata only
// for a single shard. workaround that by setting shards manually. from this test perspective,
// it doesn't matter because we check each partition individually of each sstable created
// for a shard that owns the shared input sstable.
sstables::test(sst).set_shards(boost::copy_range<std::vector<unsigned>>(boost::irange(0u, smp::count)));
auto filter_fname = sstables::test(sst).filename(sstable::component_type::Filter);
uint64_t bloom_filter_size_before = file_size(filter_fname).get0();
auto creator = [&cf, tmp, version] (shard_id shard) mutable {
// we need generation calculated by instance of cf at requested shard,
@@ -109,19 +121,27 @@ void run_sstable_resharding_test() {
auto new_sstables = sstables::reshard_sstables({ sst }, *cf, creator, std::numeric_limits<uint64_t>::max(), 0).get0();
BOOST_REQUIRE(new_sstables.size() == smp::count);
uint64_t bloom_filter_size_after = 0;
for (auto& sstable : new_sstables) {
auto new_sst = sstables::make_sstable(s, tmp->path, sstable->generation(),
version, sstables::sstable::format_types::big);
new_sst->load().get();
filter_fname = sstables::test(new_sst).filename(sstable::component_type::Filter);
bloom_filter_size_after += file_size(filter_fname).get0();
auto shards = new_sst->get_shards_for_this_sstable();
BOOST_REQUIRE(shards.size() == 1); // check sstable is unshared.
auto shard = shards.front();
BOOST_REQUIRE(column_family_test::calculate_shard_from_sstable_generation(new_sst->generation()) == shard);
assert_that(new_sst->as_mutation_source().make_reader(s))
.produces(muts.at(shard))
.produces_end_of_stream();
auto rd = assert_that(new_sst->as_mutation_source().make_reader(s));
BOOST_REQUIRE(muts[shard].size() == keys_per_shard);
for (auto k : boost::irange(0u, keys_per_shard)) {
rd.produces(muts[shard][k]);
}
rd.produces_end_of_stream();
}
BOOST_REQUIRE_CLOSE_FRACTION(float(bloom_filter_size_before), float(bloom_filter_size_after), 0.1);
}
}

View File

@@ -203,6 +203,14 @@ public:
// Delete the on-disk file backing component c of the wrapped sstable.
future<> remove_component(sstable::component_type c) {
return remove_file(_sst->filename(c));
}
// Expose the on-disk path of component c of the wrapped sstable, so tests
// can inspect component files directly (e.g. measuring the Filter
// component's size before and after resharding).
const sstring filename(sstable::component_type c) const {
return _sst->filename(c);
}
// Test-only hook: overwrite the sstable's owner-shard list directly.
// Works around sstable write producing sharding metadata for a single
// shard only, letting a test mark one sstable as shared by many shards.
void set_shards(std::vector<unsigned> shards) {
_sst->_shards = std::move(shards);
}
};
inline sstring get_test_dir(const sstring& name, const sstring& ks, const sstring& cf)