size_estimates_reader: close partition_reader

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2021-01-26 22:08:43 +02:00
parent 13dfc41d8c
commit 38e48bb462
2 changed files with 19 additions and 5 deletions

View File

@@ -255,10 +255,18 @@ future<> size_estimates_mutation_reader::get_next_partition() {
++_current_partition;
std::vector<mutation> ms;
ms.emplace_back(std::move(mutations));
_partition_reader = flat_mutation_reader_from_mutations(_permit, std::move(ms), _fwd);
auto reader = flat_mutation_reader_from_mutations(_permit, std::move(ms), _fwd);
auto close_partition_reader = _partition_reader ? _partition_reader->close() : make_ready_future<>();
return close_partition_reader.then([this, reader = std::move(reader)] () mutable {
_partition_reader = std::move(reader);
});
});
}
future<> size_estimates_mutation_reader::close_partition_reader() noexcept {
return _partition_reader ? _partition_reader->close() : make_ready_future<>();
}
future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_point timeout) {
return do_until([this, timeout] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] {
if (!_partition_reader) {
@@ -269,8 +277,9 @@ future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_poi
return stop_iteration(is_buffer_full());
}, timeout).then([this] {
if (_partition_reader->is_end_of_stream() && _partition_reader->is_buffer_empty()) {
_partition_reader = std::nullopt;
return _partition_reader->close();
}
return make_ready_future<>();
});
});
}
@@ -278,7 +287,7 @@ future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_poi
future<> size_estimates_mutation_reader::next_partition() {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_partition_reader = std::nullopt;
return close_partition_reader();
}
return make_ready_future<>();
}
@@ -287,9 +296,8 @@ future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_ra
clear_buffer();
_prange = &pr;
_keyspaces = std::nullopt;
_partition_reader = std::nullopt;
_end_of_stream = false;
return make_ready_future<>();
return close_partition_reader();
}
future<> size_estimates_mutation_reader::fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) {
@@ -301,6 +309,10 @@ future<> size_estimates_mutation_reader::fast_forward_to(position_range pr, db::
return make_ready_future<>();
}
future<> size_estimates_mutation_reader::close() noexcept {
return close_partition_reader();
}
std::vector<db::system_keyspace::range_estimates>
size_estimates_mutation_reader::estimates_for_current_keyspace(std::vector<token_range> local_ranges) const {
// For each specified range, estimate (crudely) mean partition size and partitions count.

View File

@@ -51,8 +51,10 @@ public:
virtual future<> next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override;
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override;
virtual future<> close() noexcept override;
private:
future<> get_next_partition();
future<> close_partition_reader() noexcept;
std::vector<db::system_keyspace::range_estimates>
estimates_for_current_keyspace(std::vector<token_range> local_ranges) const;