diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index a2aa8d9805..3229f47e05 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -94,7 +94,7 @@ class read_context : public reader_lifecycle_policy { struct dismantling_state { foreign_unique_ptr params; foreign_unique_ptr read_operation; - future reader_fut; + foreign_unique_ptr reader; circular_buffer buffer; }; struct ready_to_save_state { @@ -103,52 +103,37 @@ class read_context : public reader_lifecycle_policy { foreign_unique_ptr reader; circular_buffer buffer; }; - struct future_used_state { - future fut; - }; - struct future_dismantling_state { - future fut; - }; - // ( ) - // | - // +------ inexistent_state -----+ - // | | - // (1) | (6) | - // | | - // successful_lookup_state future_used_state - // | | | | - // (2) | (3) | (7) | (8) | - // | | | | - // | used_state <---------+ future_dismantling_state - // | | | - // | (4) | (9) | - // | | | - // | dismantling_state <-----------------+ - // | | - // | (5) | - // | | - // +----> ready_to_save_state + // ( ) + // | + // +--- inexistent_state ---+ + // | | + // (1) | (3) | + // | (4) | + // successful_lookup_state -----> used_state + // | | + // (2) | (5) | + // | | + // | dismantling_state + // | | + // | (6) | + // | | + // +----> ready_to_save_state <----+ // | // (O) // // 1) lookup_readers() // 2) save_readers() - // 3) make_remote_reader() - // 4) dismantle_reader() - // 5) prepare_reader_for_saving() - // 6) do_make_remote_reader() - // 7) reader is created - // 8) dismantle_reader() - // 9) reader is created + // 3) do_make_remote_reader() + // 4) make_remote_reader() + // 5) dismantle_reader() + // 6) save_readers() using reader_state = std::variant< inexistent_state, successful_lookup_state, used_state, dismantling_state, - ready_to_save_state, - future_used_state, - future_dismantling_state>; + ready_to_save_state>; struct dismantle_buffer_stats { size_t partitions = 0; @@ -184,6 +169,8 @@ class read_context : public reader_lifecycle_policy { // One for each shard. Index is shard id. std::vector _readers; + gate _dismantling_gate; + static future do_make_remote_reader( distributed& db, shard_id shard, @@ -204,11 +191,6 @@ class read_context : public reader_lifecycle_policy { void dismantle_reader(shard_id shard, future&& stopped_reader_fut); - ready_to_save_state* prepare_reader_for_saving( - dismantling_state& current_state, - future&& stopped_reader_fut, - const dht::decorated_key& last_pkey, - const std::optional& last_ckey); dismantle_buffer_stats dismantle_combined_buffer(circular_buffer combined_buffer, const dht::decorated_key& pkey); dismantle_buffer_stats dismantle_compaction_state(detached_compaction_state compaction_state); future<> save_reader(ready_to_save_state& current_state, const dht::decorated_key& last_pkey, @@ -302,100 +284,60 @@ future> read_context::make_remote_reade return make_ready_future>(std::move(reader)); } - auto created = promise(); - rs = future_used_state{created.get_future()}; - return do_make_remote_reader(_db, shard, std::move(schema), pr, ps, pc, std::move(trace_state)).then_wrapped([this, &rs, - created = std::move(created)] (future&& bundled_reader_fut) mutable { - if (bundled_reader_fut.failed()) { - auto ex = bundled_reader_fut.get_exception(); - if (!std::holds_alternative(rs)) { - created.set_exception(ex); - } - return make_exception_future>(std::move(ex)); - } - - auto bundled_reader = bundled_reader_fut.get0(); - auto new_state = used_state{std::move(bundled_reader.params), std::move(bundled_reader.read_operation)}; - if (std::holds_alternative(rs)) { - rs = std::move(new_state); - } else { - created.set_value(std::move(new_state)); - } + return do_make_remote_reader(_db, shard, std::move(schema), pr, ps, pc, std::move(trace_state)).then( + [this, &rs] (bundled_remote_reader&& bundled_reader) mutable { + rs = used_state{std::move(bundled_reader.params), std::move(bundled_reader.read_operation)}; return make_ready_future>(std::move(bundled_reader.reader)); }); } -void read_context::dismantle_reader(shard_id shard, future&& stopped_reader_fut) { - auto& rs = _readers[shard]; +void read_context::dismantle_reader(shard_id shard, future&& reader_fut) { + with_gate(_dismantling_gate, [this, shard, reader_fut = std::move(reader_fut)] () mutable { + return reader_fut.then_wrapped([this, shard] (future&& reader_fut) { + if (reader_fut.failed()) { + mmq_log.debug("Failed to stop reader on shard {}: {}", shard, reader_fut.get_exception()); + ++_db.local().get_stats().multishard_query_failed_reader_stops; + return; + } - if (auto* maybe_used_state = std::get_if(&rs)) { - auto read_operation = std::move(maybe_used_state->read_operation); - auto params = std::move(maybe_used_state->params); - rs = dismantling_state{std::move(params), std::move(read_operation), std::move(stopped_reader_fut), circular_buffer{}}; - } else if (auto* maybe_future_used_state = std::get_if(&rs)) { - auto f = maybe_future_used_state->fut.then([stopped_reader_fut = std::move(stopped_reader_fut)] (used_state&& current_state) mutable { - auto read_operation = std::move(current_state.read_operation); - auto params = std::move(current_state.params); - return dismantling_state{std::move(params), std::move(read_operation), std::move(stopped_reader_fut), - circular_buffer{}}; + auto reader = reader_fut.get0(); + auto& rs = _readers[shard]; + if (auto* maybe_used_state = std::get_if(&rs)) { + auto read_operation = std::move(maybe_used_state->read_operation); + auto params = std::move(maybe_used_state->params); + rs = dismantling_state{std::move(params), std::move(read_operation), std::move(reader.remote_reader), + std::move(reader.unconsumed_fragments)}; + } else { + mmq_log.warn( + "Unexpected request to dismantle reader in state {} for shard {}." + " Reader was not created nor is in the process of being created.", + rs.index(), + shard); + } }); - rs = future_dismantling_state{std::move(f)}; - } else { - mmq_log.warn("Unexpected request to dismantle reader for shard {}. Reader was not created nor is in the process of being created.", shard); - } + }); } future<> read_context::stop() { - auto cleanup = [db = &_db.local()] (shard_id shard, dismantling_state state) { - return state.reader_fut.then_wrapped([db, shard, params = std::move(state.params), - read_operation = std::move(state.read_operation)] (future&& fut) mutable { - if (fut.failed()) { - mmq_log.debug("Failed to stop reader on shard {}: {}", shard, fut.get_exception()); - ++db->get_stats().multishard_query_failed_reader_stops; - } else { - smp::submit_to(shard, [reader = fut.get0().remote_reader, params = std::move(params), - read_operation = std::move(read_operation)] () mutable { + auto pr = promise<>(); + auto fut = pr.get_future(); + auto gate_fut = _dismantling_gate.is_closed() ? make_ready_future<>() : _dismantling_gate.close(); + gate_fut.then([this] { + for (shard_id shard = 0; shard != smp::count; ++shard) { + if (auto* maybe_dismantling_state = std::get_if(&_readers[shard])) { + smp::submit_to(shard, [reader = std::move(maybe_dismantling_state->reader), + params = std::move(maybe_dismantling_state->params), + read_operation = std::move(maybe_dismantling_state->read_operation)] () mutable { reader.release(); params.release(); read_operation.release(); }); } - }); - }; - - std::vector> futures; - auto immediate_cleanup = size_t(0); - auto future_cleanup = size_t(0); - - // Wait for pending read-aheads in the background. - for (shard_id shard = 0; shard != smp::count; ++shard) { - auto& rs = _readers[shard]; - - if (auto maybe_dismantling_state = std::get_if(&rs)) { - ++immediate_cleanup; - cleanup(shard, std::move(*maybe_dismantling_state)); - } else if (auto maybe_future_dismantling_state = std::get_if(&rs)) { - ++future_cleanup; - futures.emplace_back(maybe_future_dismantling_state->fut.then_wrapped([=] (future&& current_state_fut) { - if (current_state_fut.failed()) { - mmq_log.debug("Failed to stop reader on shard {}: {}", shard, current_state_fut.get_exception()); - ++_db.local().get_stats().multishard_query_failed_reader_stops; - } else { - cleanup(shard, current_state_fut.get0()); - } - })); } - } - - if (const auto total = immediate_cleanup + future_cleanup) { - tracing::trace(_trace_state, - "Stopping {} shard readers, {} ready for immediate cleanup, {} will be cleaned up after finishes read-ahead", - total, - immediate_cleanup, - future_cleanup); - } - - return when_all(futures.begin(), futures.end()).discard_result(); + }).finally([pr = std::move(pr)] () mutable { + pr.set_value(); + }); + return fut; } read_context::dismantle_buffer_stats read_context::dismantle_combined_buffer(circular_buffer combined_buffer, @@ -455,40 +397,6 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de return stats; } -read_context::ready_to_save_state* read_context::prepare_reader_for_saving( - dismantling_state& current_state, - future&& stopped_reader_fut, - const dht::decorated_key& last_pkey, - const std::optional& last_ckey) { - const auto shard = current_state.params.get_owner_shard(); - auto& rs = _readers[shard]; - - if (stopped_reader_fut.failed()) { - mmq_log.debug("Failed to stop reader on shard {}: {}", shard, stopped_reader_fut.get_exception()); - ++_db.local().get_stats().multishard_query_failed_reader_stops; - // We don't want to leave the reader in dismantling state, lest stop() - // will try to wait on the reader_fut again and crash the application. - rs = {}; - return nullptr; - } - - auto stopped_reader = stopped_reader_fut.get0(); - - // If the buffer is empty just overwrite it. - // If it has some data in it append the fragments to the back. - // The unconsumed fragments appended here come from the - // foreign_reader which is at the lowest layer, hence its - // fragments need to be at the back of the buffer. - if (current_state.buffer.empty()) { - current_state.buffer = std::move(stopped_reader.unconsumed_fragments); - } else { - std::move(stopped_reader.unconsumed_fragments.begin(), stopped_reader.unconsumed_fragments.end(), std::back_inserter(current_state.buffer)); - } - rs = ready_to_save_state{std::move(current_state.params), std::move(current_state.read_operation), std::move(stopped_reader.remote_reader), - std::move(current_state.buffer)}; - return &std::get(rs); -} - future<> read_context::save_reader(ready_to_save_state& current_state, const dht::decorated_key& last_pkey, const std::optional& last_ckey) { const auto shard = current_state.reader.get_owner_shard(); @@ -573,6 +481,8 @@ future<> read_context::save_readers(circular_buffer unconsume return make_ready_future<>(); } + return _dismantling_gate.close().then([this, unconsumed_buffer = std::move(unconsumed_buffer), compaction_state = std::move(compaction_state), + last_ckey = std::move(last_ckey)] () mutable { auto last_pkey = compaction_state.partition_start.key(); const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey); @@ -593,39 +503,17 @@ future<> read_context::save_readers(circular_buffer unconsume return save_reader(std::get(rs), last_pkey, last_ckey); } - auto finish_saving = [this, &last_pkey, &last_ckey] (dismantling_state& current_state) { - return current_state.reader_fut.then_wrapped([this, ¤t_state, &last_pkey, &last_ckey] ( - future&& stopped_reader_fut) mutable { - if (auto* ready_state = prepare_reader_for_saving(current_state, std::move(stopped_reader_fut), last_pkey, last_ckey)) { - return save_reader(*ready_state, last_pkey, last_ckey); - } - return make_ready_future<>(); - }); - }; - if (auto* maybe_dismantling_state = std::get_if(&rs)) { - return finish_saving(*maybe_dismantling_state); - } - - if (auto* maybe_future_dismantling_state = std::get_if(&rs)) { - return maybe_future_dismantling_state->fut.then_wrapped([this, &rs, - finish_saving = std::move(finish_saving)] (future&& next_state_fut) mutable { - if (next_state_fut.failed()) { - mmq_log.debug("Failed to stop reader: {}", next_state_fut.get_exception()); - ++_db.local().get_stats().multishard_query_failed_reader_stops; - // We don't want to leave the reader in future dismantling state, lest - // stop() will try to wait on the fut again and crash the application. - rs = {}; - return make_ready_future<>(); - } - rs = next_state_fut.get0(); - return finish_saving(std::get(rs)); - }); + auto& current_state = *maybe_dismantling_state; + rs = ready_to_save_state{std::move(current_state.params), std::move(current_state.read_operation), + std::move(current_state.reader), std::move(current_state.buffer)}; + return save_reader(std::get(rs), last_pkey, last_ckey); } return make_ready_future<>(); }); }); + }); } static future do_query_mutations(