diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 4423f2bb6f..1215dd0deb 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -610,6 +610,27 @@ class resharding_compaction final : public compaction { shard_id _shard; // shard of current sstable writer std::function _sstable_creator; compaction_backlog_tracker _resharding_backlog_tracker; + + // Partition count estimation for a shard S: + // + // TE, the total estimated partition count for a shard S, is defined as + // TE = Sum(i = 0...N) { Ei / Si }. + // + // where i is an input sstable that belongs to shard S, + // Ei is the estimated partition count for sstable i, + // Si is the total number of shards that own sstable i. + // + struct estimated_values { + uint64_t estimated_size = 0; + uint64_t estimated_partitions = 0; + }; + std::vector _estimation_per_shard; +private: + // return estimated partitions per sstable for a given shard + uint64_t partitions_per_sstable(shard_id s) const { + uint64_t estimated_sstables = std::max(uint64_t(1), uint64_t(ceil(double(_estimation_per_shard[s].estimated_size) / _max_sstable_size))); + return ceil(double(_estimation_per_shard[s].estimated_partitions) / estimated_sstables); + } public: resharding_compaction(std::vector sstables, column_family& cf, std::function creator, uint64_t max_sstable_size, uint32_t sstable_level) @@ -617,10 +638,19 @@ public: , _output_sstables(smp::count) , _sstable_creator(std::move(creator)) , _resharding_backlog_tracker(std::make_unique()) + , _estimation_per_shard(smp::count) { cf.get_compaction_manager().register_backlog_tracker(_resharding_backlog_tracker); - for (auto& s : _sstables) { - _resharding_backlog_tracker.add_sstable(s); + for (auto& sst : _sstables) { + _resharding_backlog_tracker.add_sstable(sst); + + const auto& shards = sst->get_shards_for_this_sstable(); + auto size = sst->bytes_on_disk(); + auto estimated_partitions = sst->get_estimated_key_count(); + for (auto& s : shards) { + _estimation_per_shard[s].estimated_size += std::max(uint64_t(1), uint64_t(ceil(double(size) / shards.size()))); + _estimation_per_shard[s].estimated_partitions += std::max(uint64_t(1), uint64_t(ceil(double(estimated_partitions) / shards.size()))); + } } _info->type = compaction_type::Reshard; } @@ -666,7 +696,7 @@ public: sstable_writer_config cfg; cfg.max_sstable_size = _max_sstable_size; auto&& priority = service::get_local_compaction_priority(); - writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, priority, _shard)); + writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, priority, _shard)); } return &*writer; } diff --git a/tests/sstable_resharding_test.cc b/tests/sstable_resharding_test.cc index acd956a8d1..17c21ff734 100644 --- a/tests/sstable_resharding_test.cc +++ b/tests/sstable_resharding_test.cc @@ -69,7 +69,8 @@ void run_sstable_resharding_test() { auto cl_stats = make_lw_shared(); auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm, *cl_stats); cf->mark_ready_for_writes(); - std::unordered_map muts; + std::unordered_map> muts; + static constexpr auto keys_per_shard = 1000u; // create sst shared by all shards { @@ -81,18 +82,29 @@ void run_sstable_resharding_test() { return m; }; for (auto i : boost::irange(0u, smp::count)) { - auto key_token_pair = token_generation_for_shard(i, 1); - BOOST_REQUIRE(key_token_pair.size() == 1); - auto m = get_mutation(key_token_pair[0].first, i); - muts.emplace(i, m); - mt->apply(std::move(m)); + auto key_token_pair = token_generation_for_shard(i, keys_per_shard); + BOOST_REQUIRE(key_token_pair.size() == keys_per_shard); + muts[i].reserve(keys_per_shard); + for (auto k : boost::irange(0u, keys_per_shard)) { + auto m = get_mutation(key_token_pair[k].first, i); + muts[i].push_back(m); + mt->apply(std::move(m)); + } } auto sst = sstables::make_sstable(s, tmp->path, 0, version, sstables::sstable::format_types::big); write_memtable_to_sstable(*mt, sst).get(); } auto sst = sstables::make_sstable(s, tmp->path, 0, version, sstables::sstable::format_types::big); sst->load().get(); - sst->set_unshared(); + + // FIXME: sstable write has a limitation in which it will generate sharding metadata only + // for a single shard. workaround that by setting shards manually. from this test perspective, + // it doesn't matter because we check each partition individually of each sstable created + // for a shard that owns the shared input sstable. + sstables::test(sst).set_shards(boost::copy_range>(boost::irange(0u, smp::count))); + + auto filter_fname = sstables::test(sst).filename(sstable::component_type::Filter); + uint64_t bloom_filter_size_before = file_size(filter_fname).get0(); auto creator = [&cf, tmp, version] (shard_id shard) mutable { // we need generation calculated by instance of cf at requested shard, @@ -109,19 +121,27 @@ void run_sstable_resharding_test() { auto new_sstables = sstables::reshard_sstables({ sst }, *cf, creator, std::numeric_limits::max(), 0).get0(); BOOST_REQUIRE(new_sstables.size() == smp::count); + uint64_t bloom_filter_size_after = 0; + for (auto& sstable : new_sstables) { auto new_sst = sstables::make_sstable(s, tmp->path, sstable->generation(), version, sstables::sstable::format_types::big); new_sst->load().get(); + filter_fname = sstables::test(new_sst).filename(sstable::component_type::Filter); + bloom_filter_size_after += file_size(filter_fname).get0(); auto shards = new_sst->get_shards_for_this_sstable(); BOOST_REQUIRE(shards.size() == 1); // check sstable is unshared. auto shard = shards.front(); BOOST_REQUIRE(column_family_test::calculate_shard_from_sstable_generation(new_sst->generation()) == shard); - assert_that(new_sst->as_mutation_source().make_reader(s)) - .produces(muts.at(shard)) - .produces_end_of_stream(); + auto rd = assert_that(new_sst->as_mutation_source().make_reader(s)); + BOOST_REQUIRE(muts[shard].size() == keys_per_shard); + for (auto k : boost::irange(0u, keys_per_shard)) { + rd.produces(muts[shard][k]); + } + rd.produces_end_of_stream(); } + BOOST_REQUIRE_CLOSE_FRACTION(float(bloom_filter_size_before), float(bloom_filter_size_after), 0.1); } } diff --git a/tests/sstable_test.hh b/tests/sstable_test.hh index 83348622b4..2f0185b4ee 100644 --- a/tests/sstable_test.hh +++ b/tests/sstable_test.hh @@ -203,6 +203,14 @@ public: future<> remove_component(sstable::component_type c) { return remove_file(_sst->filename(c)); } + + const sstring filename(sstable::component_type c) const { + return _sst->filename(c); + } + + void set_shards(std::vector shards) { + _sst->_shards = std::move(shards); + } }; inline sstring get_test_dir(const sstring& name, const sstring& ks, const sstring& cf)