diff --git a/db/size_estimates_virtual_reader.cc b/db/size_estimates_virtual_reader.cc index 0d4d78c1ed..aa73548630 100644 --- a/db/size_estimates_virtual_reader.cc +++ b/db/size_estimates_virtual_reader.cc @@ -255,10 +255,18 @@ future<> size_estimates_mutation_reader::get_next_partition() { ++_current_partition; std::vector ms; ms.emplace_back(std::move(mutations)); - _partition_reader = flat_mutation_reader_from_mutations(_permit, std::move(ms), _fwd); + auto reader = flat_mutation_reader_from_mutations(_permit, std::move(ms), _fwd); + auto close_partition_reader = _partition_reader ? _partition_reader->close() : make_ready_future<>(); + return close_partition_reader.then([this, reader = std::move(reader)] () mutable { + _partition_reader = std::move(reader); + }); }); } +future<> size_estimates_mutation_reader::close_partition_reader() noexcept { + return _partition_reader ? _partition_reader->close() : make_ready_future<>(); +} + future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_point timeout) { return do_until([this, timeout] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] { if (!_partition_reader) { @@ -269,8 +277,9 @@ future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_poi return stop_iteration(is_buffer_full()); }, timeout).then([this] { if (_partition_reader->is_end_of_stream() && _partition_reader->is_buffer_empty()) { - _partition_reader = std::nullopt; + return _partition_reader->close(); } + return make_ready_future<>(); }); }); } @@ -278,7 +287,7 @@ future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_poi future<> size_estimates_mutation_reader::next_partition() { clear_buffer_to_next_partition(); if (is_buffer_empty()) { - _partition_reader = std::nullopt; + return close_partition_reader(); } return make_ready_future<>(); } @@ -287,9 +296,8 @@ future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_ra clear_buffer(); _prange = ≺ _keyspaces = std::nullopt; - _partition_reader = std::nullopt; _end_of_stream = false; - return make_ready_future<>(); + return close_partition_reader(); } future<> size_estimates_mutation_reader::fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) { @@ -301,6 +309,10 @@ future<> size_estimates_mutation_reader::fast_forward_to(position_range pr, db:: return make_ready_future<>(); } +future<> size_estimates_mutation_reader::close() noexcept { + return close_partition_reader(); +} + std::vector size_estimates_mutation_reader::estimates_for_current_keyspace(std::vector local_ranges) const { // For each specified range, estimate (crudely) mean partition size and partitions count. diff --git a/db/size_estimates_virtual_reader.hh b/db/size_estimates_virtual_reader.hh index 0656f7934b..6a7be3ef9b 100644 --- a/db/size_estimates_virtual_reader.hh +++ b/db/size_estimates_virtual_reader.hh @@ -51,8 +51,10 @@ public: virtual future<> next_partition() override; virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override; virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override; + virtual future<> close() noexcept override; private: future<> get_next_partition(); + future<> close_partition_reader() noexcept; std::vector estimates_for_current_keyspace(std::vector local_ranges) const;