table: try_flush_memtable_to_sstable: consume: close reader on error

If an exception is throws in `consume` before
write_memtable_to_sstable is called or if the latter fails,
we must close the reader passed to it.

Fixes #11075

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-07-18 14:38:25 +03:00
parent d40106d3a9
commit f60ff44fdf

View File

@@ -654,6 +654,8 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count());
auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, metadata, estimated_partitions] (flat_mutation_reader_v2 reader) mutable -> future<> {
std::exception_ptr ex;
try {
auto&& priority = service::get_local_memtable_flush_priority();
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable");
cfg.backup = incremental_backups_enabled();
@@ -666,6 +668,11 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
old->get_max_timestamp());
co_return co_await write_memtable_to_sstable(std::move(reader), *old, newtab, estimated_partitions, monitor, cfg, priority);
} catch (...) {
ex = std::current_exception();
}
co_await reader.close();
co_await coroutine::return_exception_ptr(std::move(ex));
});
auto reader = old->make_flush_reader(