From c65ad02fcd898c3d16daeba37d3f3b11c43cd7b7 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 11 Aug 2020 10:10:20 +0800 Subject: [PATCH] repair: Fix stall in apply_rows_on_master_in_thread and apply_rows_on_follower The row_diff list in apply_rows_on_master_in_thread and apply_rows_on_follower can be large. Modify do_apply_rows to remove the row from the list when the row is consumed to avoid stall when the list is destroyed. Fixes #6975 --- repair/row_level.cc | 43 +++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index 371d9839da..46d1fd8f49 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1242,19 +1242,28 @@ private: } } - future<> do_apply_rows(std::list& row_diff, unsigned node_idx, update_working_row_buf update_buf) { - return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] { - _repair_writer.create_writer(_db, node_idx); - return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) { - if (update_buf) { - _working_row_buf_combined_hash.add(r.hash()); - } - // The repair_row here is supposed to have - // mutation_fragment attached because we have stored it in - // to_repair_rows_list above where the repair_row is created. - mutation_fragment mf = std::move(r.get_mutation_fragment()); - auto dk_with_hash = r.get_dk_with_hash(); - return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)); + future<> do_apply_rows(std::list&& row_diff, unsigned node_idx, update_working_row_buf update_buf) { + return do_with(std::move(row_diff), [this, node_idx, update_buf] (std::list& row_diff) { + return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] { + _repair_writer.create_writer(_db, node_idx); + return repeat([this, node_idx, update_buf, &row_diff] () mutable { + if (row_diff.empty()) { + return make_ready_future(stop_iteration::yes); + } + repair_row& r = row_diff.front(); + if (update_buf) { + _working_row_buf_combined_hash.add(r.hash()); + } + // The repair_row here is supposed to have + // mutation_fragment attached because we have stored it in + // to_repair_rows_list above where the repair_row is created. + mutation_fragment mf = std::move(r.get_mutation_fragment()); + auto dk_with_hash = r.get_dk_with_hash(); + return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).then([&row_diff] { + row_diff.pop_front(); + return make_ready_future(stop_iteration::no); + }); + }); }); }); } @@ -1282,7 +1291,7 @@ private: _peer_row_hash_sets[node_idx] = boost::copy_range(row_diff | boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); })); } - do_apply_rows(row_diff, node_idx, update_buf).get(); + do_apply_rows(std::move(row_diff), node_idx, update_buf).get(); } future<> @@ -1291,10 +1300,8 @@ private: return make_ready_future<>(); } return to_repair_rows_list(rows).then([this] (std::list row_diff) { - return do_with(std::move(row_diff), [this] (std::list& row_diff) { - unsigned node_idx = 0; - return do_apply_rows(row_diff, node_idx, update_working_row_buf::no); - }); + unsigned node_idx = 0; + return do_apply_rows(std::move(row_diff), node_idx, update_working_row_buf::no); }); }