diff --git a/repair/row_level.cc b/repair/row_level.cc index 2d26c27b60..44f831f487 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -499,6 +499,12 @@ public: }); } + future<> close() noexcept { + return _reader.close().then([this] { + _reader_handle.reset(); + }); + } + lw_shared_ptr& get_current_dk() { return _current_dk; } @@ -873,8 +879,11 @@ public: auto f1 = _sink_source_for_get_full_row_hashes.close(); auto f2 = _sink_source_for_get_row_diff.close(); auto f3 = _sink_source_for_put_row_diff.close(); + rlogger.info("repair_meta::stop"); return when_all_succeed(std::move(gate_future), std::move(f1), std::move(f2), std::move(f3)).discard_result().finally([this] { - return _repair_writer->wait_for_writer_done(); + return _repair_writer->wait_for_writer_done().finally([this] { + return close(); + }); }); } @@ -1066,6 +1075,10 @@ public: return std::pair, bool>(sync_boundary_min, already_synced); } + future<> close() noexcept { + return _repair_reader.close(); + } + private: future do_estimate_partitions_on_all_shards() { return estimate_partitions(_db, _schema->ks_name(), _schema->cf_name(), _range); @@ -2817,6 +2830,11 @@ public: _all_live_peer_nodes, _all_live_peer_nodes.size(), this); + auto auto_close_master = defer([&master] { + master.close().handle_exception([] (std::exception_ptr ep) { + rlogger.warn("Failed auto-closing Row Level Repair (Master): {}. Ignored.", ep); + }).get(); + }); rlogger.debug(">>> Started Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_size={}", master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, schema_version, _range, _seed, max_row_buf_size);