diff --git a/repair/row_level.cc b/repair/row_level.cc index d979e51758..2d26c27b60 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -492,9 +492,11 @@ public: }); } - void on_end_of_stream() { + future<> on_end_of_stream() noexcept { + return _reader.close().then([this] { _reader = make_empty_flat_reader(_schema, _permit); _reader_handle.reset(); + }); } lw_shared_ptr& get_current_dk() { @@ -1192,15 +1194,17 @@ private: _gate.check(); return _repair_reader.read_mutation_fragment().then([this, &cur_size, &new_rows_size, &cur_rows] (mutation_fragment_opt mfopt) mutable { if (!mfopt) { - _repair_reader.on_end_of_stream(); + return _repair_reader.on_end_of_stream().then([] { return stop_iteration::yes; + }); } - return handle_mutation_fragment(*mfopt, cur_size, new_rows_size, cur_rows); + return make_ready_future(handle_mutation_fragment(*mfopt, cur_size, new_rows_size, cur_rows)); }); }).then_wrapped([this, &cur_rows, &new_rows_size] (future<> fut) mutable { if (fut.failed()) { - _repair_reader.on_end_of_stream(); - return make_exception_future(fut.get_exception()); + return make_exception_future(fut.get_exception()).finally([this] { + return _repair_reader.on_end_of_stream(); + }); } _repair_reader.pause(); return make_ready_future(value_type(std::move(cur_rows), new_rows_size));