diff --git a/compaction/compaction.cc b/compaction/compaction.cc index fce3fe1e5d..85d0c1f37c 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -50,7 +50,7 @@ #include "utils/utf8.hh" #include "utils/fmt-compat.hh" #include "utils/error_injection.hh" -#include "readers/filtering.hh" +#include "readers/multi_range.hh" #include "readers/compacting.hh" #include "tombstone_gc.hh" #include "keys.hh" @@ -610,16 +610,6 @@ protected: bool enable_garbage_collected_sstable_writer() const noexcept { return _contains_multi_fragment_runs && _max_sstable_size != std::numeric_limits::max(); } - - flat_mutation_reader_v2::filter make_partition_filter() const { - return [this] (const dht::decorated_key& dk) { - if (!_owned_ranges_checker->belongs_to_current_node(dk.token())) { - log_trace("Token {} does not belong to this node, skipping", dk.token()); - return false; - } - return true; - }; - } public: compaction& operator=(const compaction&) = delete; compaction(const compaction&) = delete; @@ -631,17 +621,55 @@ public: } private: // Default range sstable reader that will only return mutation that belongs to current shard. - virtual flat_mutation_reader_v2 make_sstable_reader() const = 0; + virtual flat_mutation_reader_v2 make_sstable_reader(schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + tracing::trace_state_ptr, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding) const = 0; - // Make a filtering reader if needed - // FIXME: the sstable reader itself should be pass the owned ranges - // so it can skip over the disowned ranges efficiently using the index. - // Ref https://github.com/scylladb/scylladb/issues/12998 flat_mutation_reader_v2 setup_sstable_reader() const { if (!_owned_ranges_checker) { - return make_sstable_reader(); + return make_sstable_reader(_schema, + _permit, + query::full_partition_range, + _schema->full_slice(), + tracing::trace_state_ptr(), + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no); } - return make_filtering_reader(make_sstable_reader(), make_partition_filter()); + + auto source = mutation_source([this] (schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) { + log_trace("Creating sstable set reader with range {}", range); + return make_sstable_reader(std::move(s), + std::move(permit), + range, + slice, + std::move(trace_state), + fwd, + fwd_mr); + }); + + auto owned_range_generator = [this] () -> std::optional { + auto r = _owned_ranges_checker->next_owned_range(); + if (r == nullptr) { + return std::nullopt; + } + log_trace("Skipping to the next owned range {}", *r); + return dht::to_partition_range(*r); + }; + + return make_flat_multi_range_reader(_schema, _permit, std::move(source), + std::move(owned_range_generator), + _schema->full_slice(), + tracing::trace_state_ptr()); } virtual sstables::sstable_set make_sstable_set_for_input() const { @@ -1019,14 +1047,20 @@ public: return sstables::make_partitioned_sstable_set(_schema, false); } - flat_mutation_reader_v2 make_sstable_reader() const override { - return _compacting->make_local_shard_sstable_reader(_schema, - _permit, - query::full_partition_range, - _schema->full_slice(), - tracing::trace_state_ptr(), - ::streamed_mutation::forwarding::no, - ::mutation_reader::forwarding::no, + flat_mutation_reader_v2 make_sstable_reader(schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + tracing::trace_state_ptr trace, + streamed_mutation::forwarding sm_fwd, + mutation_reader::forwarding mr_fwd) const override { + return _compacting->make_local_shard_sstable_reader(std::move(s), + std::move(permit), + range, + slice, + std::move(trace), + sm_fwd, + mr_fwd, default_read_monitor_generator()); } @@ -1064,14 +1098,20 @@ public: { } - flat_mutation_reader_v2 make_sstable_reader() const override { - return _compacting->make_local_shard_sstable_reader(_schema, - _permit, - query::full_partition_range, - _schema->full_slice(), - tracing::trace_state_ptr(), - ::streamed_mutation::forwarding::no, - ::mutation_reader::forwarding::no, + flat_mutation_reader_v2 make_sstable_reader(schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + tracing::trace_state_ptr trace, + streamed_mutation::forwarding sm_fwd, + mutation_reader::forwarding mr_fwd) const override { + return _compacting->make_local_shard_sstable_reader(std::move(s), + std::move(permit), + range, + slice, + std::move(trace), + sm_fwd, + mr_fwd, _monitor_generator); } @@ -1456,8 +1496,17 @@ public: return _scrub_finish_description; } - flat_mutation_reader_v2 make_sstable_reader() const override { - auto crawling_reader = _compacting->make_crawling_reader(_schema, _permit, nullptr); + flat_mutation_reader_v2 make_sstable_reader(schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + tracing::trace_state_ptr trace, + streamed_mutation::forwarding sm_fwd, + mutation_reader::forwarding mr_fwd) const override { + if (!range.is_full()) { + on_internal_error(clogger, fmt::format("Scrub compaction in mode {} expected full partition range, but got {} instead", _options.operation_mode, range)); + } + auto crawling_reader = _compacting->make_crawling_reader(std::move(s), std::move(permit), nullptr); return make_flat_mutation_reader_v2(std::move(crawling_reader), _options.operation_mode, _validation_errors); } @@ -1548,14 +1597,20 @@ public: ~resharding_compaction() { } // Use reader that makes sure no non-local mutation will not be filtered out. - flat_mutation_reader_v2 make_sstable_reader() const override { - return _compacting->make_range_sstable_reader(_schema, - _permit, - query::full_partition_range, - _schema->full_slice(), + flat_mutation_reader_v2 make_sstable_reader(schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + tracing::trace_state_ptr trace, + streamed_mutation::forwarding sm_fwd, + mutation_reader::forwarding mr_fwd) const override { + return _compacting->make_range_sstable_reader(std::move(s), + std::move(permit), + range, + slice, nullptr, - ::streamed_mutation::forwarding::no, - ::mutation_reader::forwarding::no); + sm_fwd, + mr_fwd); } diff --git a/dht/partition_filter.hh b/dht/partition_filter.hh index 8557a39963..fc4cc0bfda 100644 --- a/dht/partition_filter.hh +++ b/dht/partition_filter.hh @@ -39,6 +39,13 @@ public: return _it != _sorted_owned_ranges.end() && _it->contains(t, dht::token_comparator()); } + const dht::token_range* next_owned_range() const noexcept { + if (_it == _sorted_owned_ranges.end()) { + return nullptr; + } + return &*_it++; + } + static flat_mutation_reader_v2::filter make_partition_filter(const dht::token_range_vector& sorted_owned_ranges); }; diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc index d109902356..5f6d3c8680 100644 --- a/readers/mutation_readers.cc +++ b/readers/mutation_readers.cc @@ -532,6 +532,7 @@ public: return make_ready_future<>(); } if (auto r = next()) { + mrlog.trace("flat_multi_range_mutation_reader {}: fast forwarding to range {}", fmt::ptr(this), *r); return _reader.fast_forward_to(*r); } else { _end_of_stream = true; diff --git a/sstables/mx/reader.cc b/sstables/mx/reader.cc index 5bb8b8eb71..88595172b2 100644 --- a/sstables/mx/reader.cc +++ b/sstables/mx/reader.cc @@ -1511,10 +1511,25 @@ private: } }); } + bool has_sstable_attached() const noexcept { + return bool(_sst); + } bool is_initialized() const { return bool(_context); } - future<> initialize() { + // Returns true if reader is initialized, by either a previous or current request + future maybe_initialize() { + if (is_initialized()) { + co_return true; + } + // If the reader has no SSTable attached, the reader was proactively closed in the + // context of fast-forward calls. The higher level code has no way to know that + // underlying reader is really exhausted, so reader is responsible for releasing + // its resources beforehand. From there on, the reader has the same semantics + // as that of an empty reader. + if (!has_sstable_attached()) { + co_return false; + } if (_single_partition_read) { _sst->get_stats().on_single_partition_read(); const auto& key = dht::ring_position_view(_pr.start()->value()); @@ -1523,7 +1538,7 @@ private: if (!present) { _sst->get_filter_tracker().add_false_positive(); - co_return; + co_return false; } _sst->get_filter_tracker().add_true_positive(); @@ -1558,12 +1573,7 @@ private: _monitor.on_read_started(_context->reader_position()); _index_in_current_partition = true; _will_likely_slice = will_likely_slice(_slice); - } - future<> ensure_initialized() { - if (is_initialized()) { - return make_ready_future<>(); - } - return initialize(); + co_return true; } future<> skip_to(indexable_element el, uint64_t begin) { sstlog.trace("sstable_reader: {}: skip_to({} -> {}, el={})", fmt::ptr(_context.get()), _context->position(), begin, static_cast(el)); @@ -1591,8 +1601,8 @@ public: on_internal_error(sstlog, "mx reader: fast_forward_to(partition_range) not supported for reversed queries"); } - return ensure_initialized().then([this, &pr] { - if (!is_initialized()) { + return maybe_initialize().then([this, &pr] (bool initialized) { + if (!initialized) { _end_of_stream = true; return make_ready_future<>(); } else { @@ -1613,6 +1623,15 @@ public: } _index_in_current_partition = false; _read_enabled = false; + if (_index_reader->eof()) { + // Close the SSTable reader proactively, if the index is completely exhausted + // and no partition was found in the current fast-forward call. This allows + // disk space of SSTables to be reclaimed earlier if they take part in a + // long-living read and they're deleted midway. + sstlog.trace("Closing reader {} for {} after fast-forward call found that index reached EOF and there's nothing left to read", + fmt::ptr(this), _sst->get_filename()); + return close(); + } return make_ready_future<>(); }); } @@ -1623,8 +1642,8 @@ public: return make_ready_future<>(); } if (!is_initialized()) { - return initialize().then([this] { - if (!is_initialized()) { + return maybe_initialize().then([this] (bool initialized) { + if (!initialized) { _end_of_stream = true; return make_ready_future<>(); } else { @@ -1700,7 +1719,7 @@ public: close_index_reader = _index_reader->close().finally([_ = std::move(_index_reader)] {}); } - return when_all_succeed(std::move(close_context), std::move(close_index_reader)).discard_result().handle_exception([] (std::exception_ptr ep) { + return when_all_succeed(std::move(close_context), std::move(close_index_reader)).discard_result().handle_exception([sst = std::move(_sst)] (std::exception_ptr ep) { // close can not fail as it is called either from the destructor or from flat_mutation_reader::close sstlog.warn("Failed closing of sstable_mutation_reader: {}. Ignored since the reader is already done.", ep); }); diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index cfae56cf31..33dc8fa40a 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -1966,6 +1966,8 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) { auto total_partitions = 10000U; auto local_keys = tests::generate_partition_keys(total_partitions, s); + dht::decorated_key::less_comparator cmp(s); + std::sort(local_keys.begin(), local_keys.end(), cmp); std::vector mutations; for (auto i = 0U; i < total_partitions; i++) { mutations.push_back(make_insert(local_keys.at(i))); @@ -1978,7 +1980,7 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) { cf->start(); auto local_ranges = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(ks_name)); - auto descriptor = sstables::compaction_descriptor({std::move(sst)}, compaction_descriptor::default_level, + auto descriptor = sstables::compaction_descriptor({sst}, compaction_descriptor::default_level, compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_type_options::make_cleanup(), std::move(local_ranges)); auto ret = compact_sstables(std::move(descriptor), cf, sst_gen).get0(); @@ -1986,6 +1988,25 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) { BOOST_REQUIRE(ret.new_sstables.front()->get_estimated_key_count() >= total_partitions); BOOST_REQUIRE((ret.new_sstables.front()->get_estimated_key_count() - total_partitions) <= uint64_t(s->min_index_interval())); BOOST_REQUIRE(ret.new_sstables.front()->run_identifier() == run_identifier); + + dht::token_range_vector ranges; + ranges.push_back(dht::token_range::make_singular(local_keys.at(0).token())); + ranges.push_back(dht::token_range::make_singular(local_keys.at(10).token())); + ranges.push_back(dht::token_range::make_singular(local_keys.at(100).token())); + ranges.push_back(dht::token_range::make_singular(local_keys.at(900).token())); + local_ranges = compaction::make_owned_ranges_ptr(std::move(ranges)); + descriptor = sstables::compaction_descriptor({sst}, compaction_descriptor::default_level, + compaction_descriptor::default_max_sstable_bytes, run_identifier, + compaction_type_options::make_cleanup(), std::move(local_ranges)); + ret = compact_sstables(std::move(descriptor), cf, sst_gen).get0(); + BOOST_REQUIRE(ret.new_sstables.size() == 1); + auto reader = ret.new_sstables[0]->as_mutation_source().make_reader_v2(s, env.make_reader_permit(), query::full_partition_range, s->full_slice()); + assert_that(std::move(reader)) + .produces(local_keys[0]) + .produces(local_keys[10]) + .produces(local_keys[100]) + .produces(local_keys[900]) + .produces_end_of_stream(); }); }); } diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 81ae7928f9..a96159ff2c 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -1643,6 +1643,19 @@ SEASTAR_TEST_CASE(test_partition_skipping) { .produces_end_of_stream() .fast_forward_to(dht::partition_range::make({ dht::ring_position(keys[8]), false }, { dht::ring_position(keys[9]), false })) .produces_end_of_stream(); + + pr = dht::partition_range::make(dht::ring_position(keys[0]), dht::ring_position(keys[1])); + assert_that(sstable_reader_v2(sst, s, env.make_reader_permit(), pr)) + .fast_forward_to(dht::partition_range::make(dht::ring_position::starting_at(keys[0].token()), dht::ring_position::ending_at(keys[1].token()))) + .produces(keys[0]) + .produces(keys[1]) + .fast_forward_to(dht::partition_range::make(dht::ring_position::starting_at(keys[3].token()), dht::ring_position::ending_at(keys[4].token()))) + .produces(keys[3]) + .produces(keys[4]) + .fast_forward_to(dht::partition_range::make_starting_with(dht::ring_position::starting_at(keys[8].token()))) + .produces(keys[8]) + .produces(keys[9]) + .produces_end_of_stream(); } }); }