Merge 'Make SSTable cleanup more efficient by fast forwarding to next owned range' from Raphael "Raph" Carvalho

Today, SSTable cleanup skips to the next partition, one at a time, when it finds that the current partition is no longer owned by this node.

That's very inefficient because, when a cluster is growing in size, existing nodes lose multiple sequential tokens in their owned ranges. Another inefficiency comes from fetching index pages spanning all unowned tokens, which was described in https://github.com/scylladb/scylladb/issues/14317.

To solve both problems, cleanup will now use a multi-range reader, which guarantees that it only processes owned data and, as a result, skips unowned data. Cleanup now scans an owned range and then fast-forwards to the next one, until it is done with them all. This significantly reduces the amount of data held in the index cache, as the index will only be consulted at each range boundary.

Without further ado,

before:

`INFO  2023-07-01 07:10:26,281 [shard 0] compaction - [Cleanup keyspace2.standard1 701af580-17f7-11ee-8b85-a479a1a77573] Cleaned 1 sstables to [./tmp/1/keyspace2/standard1-b490ee20179f11ee9134afb16b3e10fd/me-3g7a_0s8o_06uww24drzrroaodpv-big-Data.db:level=0]. 2GB to 1GB (~50% of original) in 26248ms = 81MB/s. ~9443072 total partitions merged to 4750028.`

after:

`INFO  2023-07-01 07:07:52,354 [shard 0] compaction - [Cleanup keyspace2.standard1 199dff90-17f7-11ee-b592-b4f5d81717b9] Cleaned 1 sstables to [./tmp/1/keyspace2/standard1-b490ee20179f11ee9134afb16b3e10fd/me-3g7a_0s4m_5hehd2rejj8w15d2nt-big-Data.db:level=0]. 2GB to 1GB (~50% of original) in 17424ms = 123MB/s. ~9443072 total partitions merged to 4750028.`

Fixes #12998.
Fixes #14317.

Closes #14469

* github.com:scylladb/scylladb:
  test: Extend cleanup correctness test to cover more cases
  compaction: Make SSTable cleanup more efficient by fast forwarding to next owned range
  sstables: Close SSTable reader if index exhaustion is detected in fast forward call
  sstables: Simplify sstable reader initialization
  compaction: Extend make_sstable_reader() interface to work with mutation_source
  test: Extend sstable partition skipping test to cover fast forward using token
This commit is contained in:
Avi Kivity
2023-07-11 23:28:15 +03:00
6 changed files with 173 additions and 57 deletions

View File

@@ -50,7 +50,7 @@
#include "utils/utf8.hh"
#include "utils/fmt-compat.hh"
#include "utils/error_injection.hh"
#include "readers/filtering.hh"
#include "readers/multi_range.hh"
#include "readers/compacting.hh"
#include "tombstone_gc.hh"
#include "keys.hh"
@@ -610,16 +610,6 @@ protected:
// Tells whether the garbage-collected sstable writer should be enabled.
// Both conditions must hold: the input contains multi-fragment runs, and
// the maximum sstable size differs from the largest uint64_t value
// (presumably the "no size limit" sentinel -- confirm against the caller).
bool enable_garbage_collected_sstable_writer() const noexcept {
    constexpr auto uint64_max = std::numeric_limits<uint64_t>::max();
    if (!_contains_multi_fragment_runs) {
        return false;
    }
    return _max_sstable_size != uint64_max;
}
// Builds a per-partition predicate backed by the owned-ranges checker.
// The predicate accepts a partition when the checker reports the key's
// token as belonging to the current node; otherwise it logs a trace
// message and rejects the partition.
// The closure captures `this`, so it must not be used after this object
// is gone.
flat_mutation_reader_v2::filter make_partition_filter() const {
    return [this] (const dht::decorated_key& dk) {
        const bool owned = _owned_ranges_checker->belongs_to_current_node(dk.token());
        if (owned) {
            return true;
        }
        log_trace("Token {} does not belong to this node, skipping", dk.token());
        return false;
    };
}
public:
compaction& operator=(const compaction&) = delete;
compaction(const compaction&) = delete;
@@ -631,17 +621,55 @@ public:
}
private:
// Default range sstable reader that will only return mutations that belong to the current shard.
virtual flat_mutation_reader_v2 make_sstable_reader() const = 0;
// Pure-virtual reader factory; every concrete compaction type supplies its
// own override. The parameter list matches the callable that
// setup_sstable_reader() wraps into a mutation_source, so an override can be
// driven either with the full partition range or with one range at a time.
virtual flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding) const = 0;
// Make a filtering reader if needed
// FIXME: the sstable reader itself should be passed the owned ranges
// so it can skip over the disowned ranges efficiently using the index.
// Ref https://github.com/scylladb/scylladb/issues/12998
flat_mutation_reader_v2 setup_sstable_reader() const {
if (!_owned_ranges_checker) {
return make_sstable_reader();
return make_sstable_reader(_schema,
_permit,
query::full_partition_range,
_schema->full_slice(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no);
}
return make_filtering_reader(make_sstable_reader(), make_partition_filter());
auto source = mutation_source([this] (schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
log_trace("Creating sstable set reader with range {}", range);
return make_sstable_reader(std::move(s),
std::move(permit),
range,
slice,
std::move(trace_state),
fwd,
fwd_mr);
});
auto owned_range_generator = [this] () -> std::optional<dht::partition_range> {
auto r = _owned_ranges_checker->next_owned_range();
if (r == nullptr) {
return std::nullopt;
}
log_trace("Skipping to the next owned range {}", *r);
return dht::to_partition_range(*r);
};
return make_flat_multi_range_reader(_schema, _permit, std::move(source),
std::move(owned_range_generator),
_schema->full_slice(),
tracing::trace_state_ptr());
}
virtual sstables::sstable_set make_sstable_set_for_input() const {
@@ -1019,14 +1047,20 @@ public:
return sstables::make_partitioned_sstable_set(_schema, false);
}
flat_mutation_reader_v2 make_sstable_reader() const override {
return _compacting->make_local_shard_sstable_reader(_schema,
_permit,
query::full_partition_range,
_schema->full_slice(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no,
// Reader factory override: passes all seven arguments straight through to
// _compacting->make_local_shard_sstable_reader() and appends
// default_read_monitor_generator() as the final argument.
// s, permit and trace are taken by value and moved into the call; range and
// slice are forwarded as the same references the caller supplied.
flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace,
streamed_mutation::forwarding sm_fwd,
mutation_reader::forwarding mr_fwd) const override {
return _compacting->make_local_shard_sstable_reader(std::move(s),
std::move(permit),
range,
slice,
std::move(trace),
sm_fwd,
mr_fwd,
default_read_monitor_generator());
}
@@ -1064,14 +1098,20 @@ public:
{
}
flat_mutation_reader_v2 make_sstable_reader() const override {
return _compacting->make_local_shard_sstable_reader(_schema,
_permit,
query::full_partition_range,
_schema->full_slice(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no,
// Reader factory override: passes all seven arguments straight through to
// _compacting->make_local_shard_sstable_reader() and appends this object's
// _monitor_generator as the final argument.
// s, permit and trace are taken by value and moved into the call; range and
// slice are forwarded as the same references the caller supplied.
flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace,
streamed_mutation::forwarding sm_fwd,
mutation_reader::forwarding mr_fwd) const override {
return _compacting->make_local_shard_sstable_reader(std::move(s),
std::move(permit),
range,
slice,
std::move(trace),
sm_fwd,
mr_fwd,
_monitor_generator);
}
@@ -1456,8 +1496,17 @@ public:
return _scrub_finish_description;
}
flat_mutation_reader_v2 make_sstable_reader() const override {
auto crawling_reader = _compacting->make_crawling_reader(_schema, _permit, nullptr);
// Reader factory override that wraps a crawling reader over the input.
// Only the full partition range is accepted; any other range is reported
// through on_internal_error(). The slice, trace, sm_fwd and mr_fwd
// arguments are accepted to satisfy the interface but are not consulted.
flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
        reader_permit permit,
        const dht::partition_range& range,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace,
        streamed_mutation::forwarding sm_fwd,
        mutation_reader::forwarding mr_fwd) const override {
    const bool covers_everything = range.is_full();
    if (!covers_everything) {
        on_internal_error(clogger, fmt::format("Scrub compaction in mode {} expected full partition range, but got {} instead", _options.operation_mode, range));
    }
    // The crawling reader is built from the schema and permit only, then
    // handed to the wrapping reader together with the mode and the
    // validation-error counter.
    return make_flat_mutation_reader_v2<reader>(
            _compacting->make_crawling_reader(std::move(s), std::move(permit), nullptr),
            _options.operation_mode,
            _validation_errors);
}
@@ -1548,14 +1597,20 @@ public:
~resharding_compaction() { }
// Use a reader that makes sure non-local mutations are not filtered out.
flat_mutation_reader_v2 make_sstable_reader() const override {
return _compacting->make_range_sstable_reader(_schema,
_permit,
query::full_partition_range,
_schema->full_slice(),
flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace,
streamed_mutation::forwarding sm_fwd,
mutation_reader::forwarding mr_fwd) const override {
return _compacting->make_range_sstable_reader(std::move(s),
std::move(permit),
range,
slice,
nullptr,
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no);
sm_fwd,
mr_fwd);
}

View File

@@ -39,6 +39,13 @@ public:
return _it != _sorted_owned_ranges.end() && _it->contains(t, dht::token_comparator());
}
// Hands out the owned ranges one at a time.
// Returns a pointer to the range the internal cursor `_it` currently
// designates and then moves the cursor one step forward; returns nullptr
// once the cursor has reached the end of `_sorted_owned_ranges`.
// Note that the cursor advances even though the method is const.
const dht::token_range* next_owned_range() const noexcept {
    if (_it != _sorted_owned_ranges.end()) {
        const dht::token_range* current = &*_it;
        ++_it;
        return current;
    }
    return nullptr;
}
static flat_mutation_reader_v2::filter make_partition_filter(const dht::token_range_vector& sorted_owned_ranges);
};

View File

@@ -532,6 +532,7 @@ public:
return make_ready_future<>();
}
if (auto r = next()) {
mrlog.trace("flat_multi_range_mutation_reader {}: fast forwarding to range {}", fmt::ptr(this), *r);
return _reader.fast_forward_to(*r);
} else {
_end_of_stream = true;

View File

@@ -1511,10 +1511,25 @@ private:
}
});
}
// Reports whether `_sst` currently converts to true, i.e. whether the
// reader still has an sstable attached.
bool has_sstable_attached() const noexcept {
    return static_cast<bool>(_sst);
}
// Reports whether `_context` currently converts to true, which this
// reader treats as "initialization has already happened".
bool is_initialized() const {
    return static_cast<bool>(_context);
}
future<> initialize() {
// Returns true if reader is initialized, by either a previous or current request
future<bool> maybe_initialize() {
if (is_initialized()) {
co_return true;
}
// If the reader has no SSTable attached, the reader was proactively closed in the
// context of fast-forward calls. The higher level code has no way to know that
// underlying reader is really exhausted, so reader is responsible for releasing
// its resources beforehand. From there on, the reader has the same semantics
// as that of an empty reader.
if (!has_sstable_attached()) {
co_return false;
}
if (_single_partition_read) {
_sst->get_stats().on_single_partition_read();
const auto& key = dht::ring_position_view(_pr.start()->value());
@@ -1523,7 +1538,7 @@ private:
if (!present) {
_sst->get_filter_tracker().add_false_positive();
co_return;
co_return false;
}
_sst->get_filter_tracker().add_true_positive();
@@ -1558,12 +1573,7 @@ private:
_monitor.on_read_started(_context->reader_position());
_index_in_current_partition = true;
_will_likely_slice = will_likely_slice(_slice);
}
future<> ensure_initialized() {
if (is_initialized()) {
return make_ready_future<>();
}
return initialize();
co_return true;
}
future<> skip_to(indexable_element el, uint64_t begin) {
sstlog.trace("sstable_reader: {}: skip_to({} -> {}, el={})", fmt::ptr(_context.get()), _context->position(), begin, static_cast<int>(el));
@@ -1591,8 +1601,8 @@ public:
on_internal_error(sstlog, "mx reader: fast_forward_to(partition_range) not supported for reversed queries");
}
return ensure_initialized().then([this, &pr] {
if (!is_initialized()) {
return maybe_initialize().then([this, &pr] (bool initialized) {
if (!initialized) {
_end_of_stream = true;
return make_ready_future<>();
} else {
@@ -1613,6 +1623,15 @@ public:
}
_index_in_current_partition = false;
_read_enabled = false;
if (_index_reader->eof()) {
// Close the SSTable reader proactively, if the index is completely exhausted
// and no partition was found in the current fast-forward call. This allows
// disk space of SSTables to be reclaimed earlier if they take part in a
// long-living read and they're deleted midway.
sstlog.trace("Closing reader {} for {} after fast-forward call found that index reached EOF and there's nothing left to read",
fmt::ptr(this), _sst->get_filename());
return close();
}
return make_ready_future<>();
});
}
@@ -1623,8 +1642,8 @@ public:
return make_ready_future<>();
}
if (!is_initialized()) {
return initialize().then([this] {
if (!is_initialized()) {
return maybe_initialize().then([this] (bool initialized) {
if (!initialized) {
_end_of_stream = true;
return make_ready_future<>();
} else {
@@ -1700,7 +1719,7 @@ public:
close_index_reader = _index_reader->close().finally([_ = std::move(_index_reader)] {});
}
return when_all_succeed(std::move(close_context), std::move(close_index_reader)).discard_result().handle_exception([] (std::exception_ptr ep) {
return when_all_succeed(std::move(close_context), std::move(close_index_reader)).discard_result().handle_exception([sst = std::move(_sst)] (std::exception_ptr ep) {
// close can not fail as it is called either from the destructor or from flat_mutation_reader::close
sstlog.warn("Failed closing of sstable_mutation_reader: {}. Ignored since the reader is already done.", ep);
});

View File

@@ -1966,6 +1966,8 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {
auto total_partitions = 10000U;
auto local_keys = tests::generate_partition_keys(total_partitions, s);
dht::decorated_key::less_comparator cmp(s);
std::sort(local_keys.begin(), local_keys.end(), cmp);
std::vector<mutation> mutations;
for (auto i = 0U; i < total_partitions; i++) {
mutations.push_back(make_insert(local_keys.at(i)));
@@ -1978,7 +1980,7 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {
cf->start();
auto local_ranges = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(ks_name));
auto descriptor = sstables::compaction_descriptor({std::move(sst)}, compaction_descriptor::default_level,
auto descriptor = sstables::compaction_descriptor({sst}, compaction_descriptor::default_level,
compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_type_options::make_cleanup(), std::move(local_ranges));
auto ret = compact_sstables(std::move(descriptor), cf, sst_gen).get0();
@@ -1986,6 +1988,25 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {
BOOST_REQUIRE(ret.new_sstables.front()->get_estimated_key_count() >= total_partitions);
BOOST_REQUIRE((ret.new_sstables.front()->get_estimated_key_count() - total_partitions) <= uint64_t(s->min_index_interval()));
BOOST_REQUIRE(ret.new_sstables.front()->run_identifier() == run_identifier);
dht::token_range_vector ranges;
ranges.push_back(dht::token_range::make_singular(local_keys.at(0).token()));
ranges.push_back(dht::token_range::make_singular(local_keys.at(10).token()));
ranges.push_back(dht::token_range::make_singular(local_keys.at(100).token()));
ranges.push_back(dht::token_range::make_singular(local_keys.at(900).token()));
local_ranges = compaction::make_owned_ranges_ptr(std::move(ranges));
descriptor = sstables::compaction_descriptor({sst}, compaction_descriptor::default_level,
compaction_descriptor::default_max_sstable_bytes, run_identifier,
compaction_type_options::make_cleanup(), std::move(local_ranges));
ret = compact_sstables(std::move(descriptor), cf, sst_gen).get0();
BOOST_REQUIRE(ret.new_sstables.size() == 1);
auto reader = ret.new_sstables[0]->as_mutation_source().make_reader_v2(s, env.make_reader_permit(), query::full_partition_range, s->full_slice());
assert_that(std::move(reader))
.produces(local_keys[0])
.produces(local_keys[10])
.produces(local_keys[100])
.produces(local_keys[900])
.produces_end_of_stream();
});
});
}

View File

@@ -1643,6 +1643,19 @@ SEASTAR_TEST_CASE(test_partition_skipping) {
.produces_end_of_stream()
.fast_forward_to(dht::partition_range::make({ dht::ring_position(keys[8]), false }, { dht::ring_position(keys[9]), false }))
.produces_end_of_stream();
pr = dht::partition_range::make(dht::ring_position(keys[0]), dht::ring_position(keys[1]));
assert_that(sstable_reader_v2(sst, s, env.make_reader_permit(), pr))
.fast_forward_to(dht::partition_range::make(dht::ring_position::starting_at(keys[0].token()), dht::ring_position::ending_at(keys[1].token())))
.produces(keys[0])
.produces(keys[1])
.fast_forward_to(dht::partition_range::make(dht::ring_position::starting_at(keys[3].token()), dht::ring_position::ending_at(keys[4].token())))
.produces(keys[3])
.produces(keys[4])
.fast_forward_to(dht::partition_range::make_starting_with(dht::ring_position::starting_at(keys[8].token())))
.produces(keys[8])
.produces(keys[9])
.produces_end_of_stream();
}
});
}