sstables: do not recompute shards for all tables after each compaction
For every finished compaction, we were calculating shards for all existing tables. With ignore_msb set to 0, it's probably not a big deal, but if ignore_msb is like 12 and LCS is used (meaning thousands of tables possibly), the operation may stall the reactor for a considerable amount of time. That's fixed by caching shards. Fixes #2875. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com> Message-Id: <20171011053424.22308-1-raphaelsc@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
66a15ccd18
commit
67c5c8dc67
@@ -1289,6 +1289,8 @@ future<> sstable::open_data() {
|
||||
_index_file = std::get<file>(std::get<0>(files).get());
|
||||
_data_file = std::get<file>(std::get<1>(files).get());
|
||||
return this->update_info_for_opened_data();
|
||||
}).then([this] {
|
||||
_shards = compute_shards_for_this_sstable();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1395,6 +1397,7 @@ future<> sstable::load(sstables::foreign_sstable_open_info info) {
|
||||
_components = std::move(info.components);
|
||||
_data_file = make_checked_file(_read_error_handler, info.data.to_file());
|
||||
_index_file = make_checked_file(_read_error_handler, info.index.to_file());
|
||||
_shards = std::move(info.owners);
|
||||
validate_min_max_metadata();
|
||||
return update_info_for_opened_data();
|
||||
});
|
||||
@@ -1404,9 +1407,8 @@ future<sstable_open_info> sstable::load_shared_components(const schema_ptr& s, s
|
||||
const io_priority_class& pc) {
|
||||
auto sst = sstables::make_sstable(s, dir, generation, v, f);
|
||||
return sst->load(pc).then([sst] () mutable {
|
||||
auto shards = sst->get_shards_for_this_sstable();
|
||||
auto info = sstable_open_info{make_lw_shared<shareable_components>(std::move(*sst->_components)),
|
||||
std::move(shards), std::move(sst->_data_file), std::move(sst->_index_file)};
|
||||
std::move(sst->_shards), std::move(sst->_data_file), std::move(sst->_index_file)};
|
||||
return make_ready_future<sstable_open_info>(std::move(info));
|
||||
});
|
||||
}
|
||||
@@ -2906,7 +2908,7 @@ uint64_t sstable::estimated_keys_for_range(const dht::token_range& range) {
|
||||
}
|
||||
|
||||
std::vector<unsigned>
|
||||
sstable::get_shards_for_this_sstable() const {
|
||||
sstable::compute_shards_for_this_sstable() const {
|
||||
std::unordered_set<unsigned> shards;
|
||||
dht::partition_range_vector token_ranges;
|
||||
const auto* sm = _components->scylla_metadata
|
||||
|
||||
@@ -488,6 +488,7 @@ private:
|
||||
uint64_t _bytes_on_disk = 0;
|
||||
db_clock::time_point _data_file_write_time;
|
||||
std::vector<nonwrapping_range<bytes_view>> _clustering_components_ranges;
|
||||
std::vector<unsigned> _shards;
|
||||
stdx::optional<dht::decorated_key> _first;
|
||||
stdx::optional<dht::decorated_key> _last;
|
||||
|
||||
@@ -626,6 +627,8 @@ private:
|
||||
void write_deletion_time(file_writer& out, const tombstone t);
|
||||
|
||||
stdx::optional<std::pair<uint64_t, uint64_t>> get_sample_indexes_for_range(const dht::token_range& range);
|
||||
|
||||
std::vector<unsigned> compute_shards_for_this_sstable() const;
|
||||
public:
|
||||
std::unique_ptr<index_reader> get_index_reader(const io_priority_class& pc);
|
||||
|
||||
@@ -690,7 +693,9 @@ public:
|
||||
const compaction_metadata& s = *static_cast<compaction_metadata *>(p.get());
|
||||
return s;
|
||||
}
|
||||
std::vector<unsigned> get_shards_for_this_sstable() const;
|
||||
std::vector<unsigned> get_shards_for_this_sstable() const {
|
||||
return _shards;
|
||||
}
|
||||
|
||||
uint32_t get_sstable_level() const {
|
||||
return get_stats_metadata().sstable_level;
|
||||
|
||||
@@ -38,6 +38,7 @@
|
||||
#include "sstables/compaction_manager.hh"
|
||||
#include "tmpdir.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "dht/murmur3_partitioner.hh"
|
||||
#include "range.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "sstables/compaction_strategy_impl.hh"
|
||||
@@ -1706,18 +1707,23 @@ static range<dht::token> create_token_range_from_keys(sstring start_key, sstring
|
||||
return range<dht::token>::make(start, end);
|
||||
}
|
||||
|
||||
static std::vector<std::pair<sstring, dht::token>> token_generation_for_current_shard(unsigned tokens_to_generate) {
|
||||
namespace dht {
|
||||
extern std::unique_ptr<i_partitioner> default_partitioner;
|
||||
}
|
||||
|
||||
static std::vector<std::pair<sstring, dht::token>> token_generation_for_shard(unsigned tokens_to_generate, unsigned shard,
|
||||
unsigned ignore_msb = 0, unsigned smp_count = smp::count) {
|
||||
unsigned tokens = 0;
|
||||
unsigned key_id = 0;
|
||||
std::vector<std::pair<sstring, dht::token>> key_and_token_pair;
|
||||
|
||||
key_and_token_pair.reserve(tokens_to_generate);
|
||||
dht::set_global_partitioner(to_sstring("org.apache.cassandra.dht.Murmur3Partitioner"));
|
||||
dht::default_partitioner = std::make_unique<dht::murmur3_partitioner>(smp_count, ignore_msb);
|
||||
|
||||
while (tokens < tokens_to_generate) {
|
||||
sstring key = to_sstring(key_id++);
|
||||
dht::token token = create_token_from_key(key);
|
||||
if (engine().cpu_id() != dht::global_partitioner().shard_of(token)) {
|
||||
if (shard != dht::global_partitioner().shard_of(token)) {
|
||||
continue;
|
||||
}
|
||||
tokens++;
|
||||
@@ -1732,6 +1738,10 @@ static std::vector<std::pair<sstring, dht::token>> token_generation_for_current_
|
||||
return key_and_token_pair;
|
||||
}
|
||||
|
||||
static std::vector<std::pair<sstring, dht::token>> token_generation_for_current_shard(unsigned tokens_to_generate) {
|
||||
return token_generation_for_shard(tokens_to_generate, engine().cpu_id());
|
||||
}
|
||||
|
||||
static void add_sstable_for_leveled_test(lw_shared_ptr<column_family>& cf, int64_t gen, uint64_t fake_data_size,
|
||||
uint32_t sstable_level, sstring first_key, sstring last_key, int64_t max_timestamp = 0) {
|
||||
auto sst = make_sstable(cf->schema(), "", gen, la, big);
|
||||
@@ -4057,6 +4067,77 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(sstable_owner_shards) {
|
||||
return seastar::async([] {
|
||||
cell_locker_stats cl_stats;
|
||||
|
||||
auto builder = schema_builder("tests", "test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("value", int32_type);
|
||||
auto s = builder.build();
|
||||
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
auto sst_gen = [s, tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
|
||||
auto sst = make_sstable(s, tmp->path, (*gen)++, la, big);
|
||||
sst->set_unshared();
|
||||
return sst;
|
||||
};
|
||||
auto make_insert = [&] (auto p) {
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(p.first)});
|
||||
mutation m(key, s);
|
||||
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 1);
|
||||
BOOST_REQUIRE(m.decorated_key().token() == p.second);
|
||||
return m;
|
||||
};
|
||||
|
||||
auto make_shared_sstable = [&] (std::unordered_set<unsigned> shards, unsigned ignore_msb, unsigned smp_count) {
|
||||
auto mut = [&] (auto shard) {
|
||||
auto tokens = token_generation_for_shard(1, shard, ignore_msb, smp_count);
|
||||
return make_insert(tokens[0]);
|
||||
};
|
||||
auto muts = boost::copy_range<std::vector<mutation>>(shards
|
||||
| boost::adaptors::transformed([&] (auto shard) { return mut(shard); }));
|
||||
dht::default_partitioner = std::make_unique<dht::murmur3_partitioner>(1, ignore_msb);
|
||||
auto sst = make_sstable_containing(sst_gen, std::move(muts));
|
||||
dht::default_partitioner = std::make_unique<dht::murmur3_partitioner>(smp_count, ignore_msb);
|
||||
sst = reusable_sst(s, tmp->path, sst->generation()).get0();
|
||||
// restore partitioner
|
||||
dht::default_partitioner = std::make_unique<dht::murmur3_partitioner>(smp::count);
|
||||
return sst;
|
||||
};
|
||||
|
||||
auto assert_sstable_owners = [&] (std::unordered_set<unsigned> expected_owners, unsigned ignore_msb, unsigned smp_count) {
|
||||
assert(expected_owners.size() <= smp_count);
|
||||
auto sst = make_shared_sstable(expected_owners, ignore_msb, smp_count);
|
||||
dht::default_partitioner = std::make_unique<dht::murmur3_partitioner>(smp_count, ignore_msb);
|
||||
auto owners = boost::copy_range<std::unordered_set<unsigned>>(sst->get_shards_for_this_sstable());
|
||||
BOOST_REQUIRE(boost::algorithm::all_of(expected_owners, [&] (unsigned expected_owner) {
|
||||
return owners.count(expected_owner);
|
||||
}));
|
||||
};
|
||||
|
||||
assert_sstable_owners({ 0 }, 0, 1);
|
||||
assert_sstable_owners({ 0 }, 0, 1);
|
||||
|
||||
assert_sstable_owners({ 0 }, 0, 4);
|
||||
assert_sstable_owners({ 0, 1 }, 0, 4);
|
||||
assert_sstable_owners({ 0, 2 }, 0, 4);
|
||||
assert_sstable_owners({ 0, 1, 2, 3 }, 0, 4);
|
||||
|
||||
assert_sstable_owners({ 0 }, 12, 4);
|
||||
assert_sstable_owners({ 0, 1 }, 12, 4);
|
||||
assert_sstable_owners({ 0, 2 }, 12, 4);
|
||||
assert_sstable_owners({ 0, 1, 2, 3 }, 12, 4);
|
||||
|
||||
assert_sstable_owners({ 10 }, 0, 63);
|
||||
assert_sstable_owners({ 10 }, 12, 63);
|
||||
assert_sstable_owners({ 10, 15 }, 0, 63);
|
||||
assert_sstable_owners({ 10, 15 }, 12, 63);
|
||||
assert_sstable_owners({ 0, 10, 15, 20, 30, 40, 50 }, 0, 63);
|
||||
assert_sstable_owners({ 0, 10, 15, 20, 30, 40, 50 }, 12, 63);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_summary_entry_spanning_more_keys_than_min_interval) {
|
||||
return seastar::async([] {
|
||||
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
|
||||
|
||||
Reference in New Issue
Block a user