From 11940ca39e26bebcb67c074ea48da170836e75f9 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 23 Apr 2018 12:10:01 -0300 Subject: [PATCH] sstables: Fix bloom filter size after resharding by properly estimating partition count We were feeding the total estimated partition count of an input shared sstable to the output unshared ones. So the sstable writer thinks, *from estimation*, that each sstable created by resharding will have the same amount of data as the shared sstable it is being created from. That's a problem because the estimation is fed to bloom filter creation, which directly influences its size. So if we're resharding all sstables that belong to all shards, the disk usage taken by filter components will be multiplied by the number of shards. That becomes more of a problem with #3302. Partition count estimation for a shard S will now be done as follows: // // TE, the total estimated partition count for a shard S, is defined as // TE = Sum(i = 0...N) { Ei / Si }. // // where i is an input sstable that belongs to shard S, // Ei is the estimated partition count for sstable i, // Si is the total number of shards that own sstable i. Fixes #2672. Refs #3302. Signed-off-by: Raphael S. 
Carvalho Message-Id: <20180423151001.9995-1-raphaelsc@scylladb.com> --- sstables/compaction.cc | 36 +++++++++++++++++++++++++--- tests/sstable_resharding_test.cc | 40 ++++++++++++++++++++++++-------- tests/sstable_test.hh | 8 +++++++ 3 files changed, 71 insertions(+), 13 deletions(-) diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 4423f2bb6f..1215dd0deb 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -610,6 +610,27 @@ class resharding_compaction final : public compaction { shard_id _shard; // shard of current sstable writer std::function _sstable_creator; compaction_backlog_tracker _resharding_backlog_tracker; + + // Partition count estimation for a shard S: + // + // TE, the total estimated partition count for a shard S, is defined as + // TE = Sum(i = 0...N) { Ei / Si }. + // + // where i is an input sstable that belongs to shard S, + // Ei is the estimated partition count for sstable i, + // Si is the total number of shards that own sstable i. + // + struct estimated_values { + uint64_t estimated_size = 0; + uint64_t estimated_partitions = 0; + }; + std::vector _estimation_per_shard; +private: + // return estimated partitions per sstable for a given shard + uint64_t partitions_per_sstable(shard_id s) const { + uint64_t estimated_sstables = std::max(uint64_t(1), uint64_t(ceil(double(_estimation_per_shard[s].estimated_size) / _max_sstable_size))); + return ceil(double(_estimation_per_shard[s].estimated_partitions) / estimated_sstables); + } public: resharding_compaction(std::vector sstables, column_family& cf, std::function creator, uint64_t max_sstable_size, uint32_t sstable_level) @@ -617,10 +638,19 @@ public: , _output_sstables(smp::count) , _sstable_creator(std::move(creator)) , _resharding_backlog_tracker(std::make_unique()) + , _estimation_per_shard(smp::count) { cf.get_compaction_manager().register_backlog_tracker(_resharding_backlog_tracker); - for (auto& s : _sstables) { - _resharding_backlog_tracker.add_sstable(s); 
+ for (auto& sst : _sstables) { + _resharding_backlog_tracker.add_sstable(sst); + + const auto& shards = sst->get_shards_for_this_sstable(); + auto size = sst->bytes_on_disk(); + auto estimated_partitions = sst->get_estimated_key_count(); + for (auto& s : shards) { + _estimation_per_shard[s].estimated_size += std::max(uint64_t(1), uint64_t(ceil(double(size) / shards.size()))); + _estimation_per_shard[s].estimated_partitions += std::max(uint64_t(1), uint64_t(ceil(double(estimated_partitions) / shards.size()))); + } } _info->type = compaction_type::Reshard; } @@ -666,7 +696,7 @@ public: sstable_writer_config cfg; cfg.max_sstable_size = _max_sstable_size; auto&& priority = service::get_local_compaction_priority(); - writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, priority, _shard)); + writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, priority, _shard)); } return &*writer; } diff --git a/tests/sstable_resharding_test.cc b/tests/sstable_resharding_test.cc index acd956a8d1..17c21ff734 100644 --- a/tests/sstable_resharding_test.cc +++ b/tests/sstable_resharding_test.cc @@ -69,7 +69,8 @@ void run_sstable_resharding_test() { auto cl_stats = make_lw_shared(); auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm, *cl_stats); cf->mark_ready_for_writes(); - std::unordered_map muts; + std::unordered_map> muts; + static constexpr auto keys_per_shard = 1000u; // create sst shared by all shards { @@ -81,18 +82,29 @@ void run_sstable_resharding_test() { return m; }; for (auto i : boost::irange(0u, smp::count)) { - auto key_token_pair = token_generation_for_shard(i, 1); - BOOST_REQUIRE(key_token_pair.size() == 1); - auto m = get_mutation(key_token_pair[0].first, i); - muts.emplace(i, m); - mt->apply(std::move(m)); + auto key_token_pair = token_generation_for_shard(i, keys_per_shard); + BOOST_REQUIRE(key_token_pair.size() == keys_per_shard); + muts[i].reserve(keys_per_shard); + for (auto k 
: boost::irange(0u, keys_per_shard)) { + auto m = get_mutation(key_token_pair[k].first, i); + muts[i].push_back(m); + mt->apply(std::move(m)); + } } auto sst = sstables::make_sstable(s, tmp->path, 0, version, sstables::sstable::format_types::big); write_memtable_to_sstable(*mt, sst).get(); } auto sst = sstables::make_sstable(s, tmp->path, 0, version, sstables::sstable::format_types::big); sst->load().get(); - sst->set_unshared(); + + // FIXME: sstable write has a limitation in which it will generate sharding metadata only + // for a single shard. workaround that by setting shards manually. from this test perspective, + // it doesn't matter because we check each partition individually of each sstable created + // for a shard that owns the shared input sstable. + sstables::test(sst).set_shards(boost::copy_range>(boost::irange(0u, smp::count))); + + auto filter_fname = sstables::test(sst).filename(sstable::component_type::Filter); + uint64_t bloom_filter_size_before = file_size(filter_fname).get0(); auto creator = [&cf, tmp, version] (shard_id shard) mutable { // we need generation calculated by instance of cf at requested shard, @@ -109,19 +121,27 @@ void run_sstable_resharding_test() { auto new_sstables = sstables::reshard_sstables({ sst }, *cf, creator, std::numeric_limits::max(), 0).get0(); BOOST_REQUIRE(new_sstables.size() == smp::count); + uint64_t bloom_filter_size_after = 0; + for (auto& sstable : new_sstables) { auto new_sst = sstables::make_sstable(s, tmp->path, sstable->generation(), version, sstables::sstable::format_types::big); new_sst->load().get(); + filter_fname = sstables::test(new_sst).filename(sstable::component_type::Filter); + bloom_filter_size_after += file_size(filter_fname).get0(); auto shards = new_sst->get_shards_for_this_sstable(); BOOST_REQUIRE(shards.size() == 1); // check sstable is unshared. 
auto shard = shards.front(); BOOST_REQUIRE(column_family_test::calculate_shard_from_sstable_generation(new_sst->generation()) == shard); - assert_that(new_sst->as_mutation_source().make_reader(s)) - .produces(muts.at(shard)) - .produces_end_of_stream(); + auto rd = assert_that(new_sst->as_mutation_source().make_reader(s)); + BOOST_REQUIRE(muts[shard].size() == keys_per_shard); + for (auto k : boost::irange(0u, keys_per_shard)) { + rd.produces(muts[shard][k]); + } + rd.produces_end_of_stream(); } + BOOST_REQUIRE_CLOSE_FRACTION(float(bloom_filter_size_before), float(bloom_filter_size_after), 0.1); } } diff --git a/tests/sstable_test.hh b/tests/sstable_test.hh index 83348622b4..2f0185b4ee 100644 --- a/tests/sstable_test.hh +++ b/tests/sstable_test.hh @@ -203,6 +203,14 @@ public: future<> remove_component(sstable::component_type c) { return remove_file(_sst->filename(c)); } + + const sstring filename(sstable::component_type c) const { + return _sst->filename(c); + } + + void set_shards(std::vector shards) { + _sst->_shards = std::move(shards); + } }; inline sstring get_test_dir(const sstring& name, const sstring& ks, const sstring& cf)