diff --git a/repair/row_level.cc b/repair/row_level.cc index d986fd51a8..b135ed7cfa 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -452,6 +452,7 @@ class repair_writer { // written. std::vector _partition_opened; streaming::stream_reason _reason; + named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}}; public: repair_writer( schema_ptr schema, @@ -563,11 +564,13 @@ public: future<> write_end_of_stream(unsigned node_idx) { if (_mq[node_idx]) { + return with_semaphore(_sem, 1, [this, node_idx] { // Partition_end is never sent on wire, so we have to write one ourselves. return write_partition_end(node_idx).then([this, node_idx] () mutable { // Empty mutation_fragment_opt means no more data, so the writer can seal the sstables. return _mq[node_idx]->push_eventually(mutation_fragment_opt()); }); + }); } else { return make_ready_future<>(); } @@ -590,6 +593,10 @@ public: return make_exception_future<>(std::move(ep)); }); } + + named_semaphore& sem() { + return _sem; + } }; class repair_meta { @@ -1195,6 +1202,23 @@ private: } } + future<> do_apply_rows(std::list& row_diff, unsigned node_idx, update_working_row_buf update_buf) { + return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] { + _repair_writer.create_writer(_db, node_idx); + return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) { + if (update_buf) { + _working_row_buf_combined_hash.add(r.hash()); + } + // The repair_row here is supposed to have + // mutation_fragment attached because we have stored it in + // to_repair_rows_list above where the repair_row is created. + mutation_fragment mf = std::move(r.get_mutation_fragment()); + auto dk_with_hash = r.get_dk_with_hash(); + return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)); + }); + }); + } + // Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested // Must run inside a seastar thread void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf, @@ -1220,18 +1244,7 @@ private: _peer_row_hash_sets[node_idx] = boost::copy_range>(row_diff | boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); })); } - _repair_writer.create_writer(_db, node_idx); - for (auto& r : row_diff) { - if (update_buf) { - _working_row_buf_combined_hash.add(r.hash()); - } - // The repair_row here is supposed to have - // mutation_fragment attached because we have stored it in - // to_repair_rows_list above where the repair_row is created. - mutation_fragment mf = std::move(r.get_mutation_fragment()); - auto dk_with_hash = r.get_dk_with_hash(); - _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).get(); - } + do_apply_rows(row_diff, node_idx, update_buf).get(); } future<> @@ -1242,15 +1255,7 @@ private: return to_repair_rows_list(rows).then([this] (std::list row_diff) { return do_with(std::move(row_diff), [this] (std::list& row_diff) { unsigned node_idx = 0; - _repair_writer.create_writer(_db, node_idx); - return do_for_each(row_diff, [this, node_idx] (repair_row& r) { - // The repair_row here is supposed to have - // mutation_fragment attached because we have stored it in - // to_repair_rows_list above where the repair_row is created. - mutation_fragment mf = std::move(r.get_mutation_fragment()); - auto dk_with_hash = r.get_dk_with_hash(); - return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)); - }); + return do_apply_rows(row_diff, node_idx, update_working_row_buf::no); }); }); }