query_mutations_on_all_shards(): simplify the state-machine

The `read_context` which handles creating, saving and looking-up the
shard readers has to deal with its `destroy_reader()` method called any
time, even before some other method finished its work. For example it is
valid for a reader to be requested to be destroyed, even before the
contexts finishes creating it.
This means that state transitions that take time can be interleaved with
another state transition request. To deal with this the read context
uses `future_` states, states that mark an ongoing state transitions.
This allows for state transition request that arrive in the middle of
another state transition to be attached as a continuation to the ongoing
transition, and to be executed after that finishes. This however
resulted in complex code, that has to handle readers being in all sorts
of different states, when the `save_readers()` method is called.
To avoid all this complexity, exploit the fact that `destroy_reader()`
receives a future<> as its argument, which resolves when all previous
state transitions have finished. Use a gate to wait on all these futures
to resolve. This way we don't need all those transitional states,
instead in `save_readers()` we only need to wait on the gate to close.
Thus the number of states `save_readers()` has to consider drops
drastically.

This has the theoretical drawback of the process of saving the readers
having to wait on each of the readers to stop, but in practice the
process finishes when the last reader is saved anyway, so I don't expect
this to result in any slowdown.
This commit is contained in:
Botond Dénes
2018-10-24 16:18:25 +03:00
parent 007619de4c
commit aa6083a75b

View File

@@ -94,7 +94,7 @@ class read_context : public reader_lifecycle_policy {
struct dismantling_state {
foreign_unique_ptr<reader_params> params;
foreign_unique_ptr<utils::phased_barrier::operation> read_operation;
future<stopped_reader> reader_fut;
foreign_unique_ptr<flat_mutation_reader> reader;
circular_buffer<mutation_fragment> buffer;
};
struct ready_to_save_state {
@@ -103,52 +103,37 @@ class read_context : public reader_lifecycle_policy {
foreign_unique_ptr<flat_mutation_reader> reader;
circular_buffer<mutation_fragment> buffer;
};
struct future_used_state {
future<used_state> fut;
};
struct future_dismantling_state {
future<dismantling_state> fut;
};
// ( )
// |
// +------ inexistent_state -----+
// | |
// (1) | (6) |
// | |
// successful_lookup_state future_used_state
// | | | |
// (2) | (3) | (7) | (8) |
// | | | |
// | used_state <---------+ future_dismantling_state
// | | |
// | (4) | (9) |
// | | |
// | dismantling_state <-----------------+
// | |
// | (5) |
// | |
// +----> ready_to_save_state
// ( )
// |
// +--- inexistent_state ---+
// | |
// (1) | (3) |
// | (4) |
// successful_lookup_state -----> used_state
// | |
// (2) | (5) |
// | |
// | dismantling_state
// | |
// | (6) |
// | |
// +----> ready_to_save_state <----+
// |
// (O)
//
// 1) lookup_readers()
// 2) save_readers()
// 3) make_remote_reader()
// 4) dismantle_reader()
// 5) prepare_reader_for_saving()
// 6) do_make_remote_reader()
// 7) reader is created
// 8) dismantle_reader()
// 9) reader is created
// 3) do_make_remote_reader()
// 4) make_remote_reader()
// 5) dismantle_reader()
// 6) save_readers()
using reader_state = std::variant<
inexistent_state,
successful_lookup_state,
used_state,
dismantling_state,
ready_to_save_state,
future_used_state,
future_dismantling_state>;
ready_to_save_state>;
struct dismantle_buffer_stats {
size_t partitions = 0;
@@ -184,6 +169,8 @@ class read_context : public reader_lifecycle_policy {
// One for each shard. Index is shard id.
std::vector<reader_state> _readers;
gate _dismantling_gate;
static future<bundled_remote_reader> do_make_remote_reader(
distributed<database>& db,
shard_id shard,
@@ -204,11 +191,6 @@ class read_context : public reader_lifecycle_policy {
void dismantle_reader(shard_id shard, future<stopped_reader>&& stopped_reader_fut);
ready_to_save_state* prepare_reader_for_saving(
dismantling_state& current_state,
future<stopped_reader>&& stopped_reader_fut,
const dht::decorated_key& last_pkey,
const std::optional<clustering_key_prefix>& last_ckey);
dismantle_buffer_stats dismantle_combined_buffer(circular_buffer<mutation_fragment> combined_buffer, const dht::decorated_key& pkey);
dismantle_buffer_stats dismantle_compaction_state(detached_compaction_state compaction_state);
future<> save_reader(ready_to_save_state& current_state, const dht::decorated_key& last_pkey,
@@ -302,100 +284,60 @@ future<foreign_unique_ptr<flat_mutation_reader>> read_context::make_remote_reade
return make_ready_future<foreign_unique_ptr<flat_mutation_reader>>(std::move(reader));
}
auto created = promise<used_state>();
rs = future_used_state{created.get_future()};
return do_make_remote_reader(_db, shard, std::move(schema), pr, ps, pc, std::move(trace_state)).then_wrapped([this, &rs,
created = std::move(created)] (future<bundled_remote_reader>&& bundled_reader_fut) mutable {
if (bundled_reader_fut.failed()) {
auto ex = bundled_reader_fut.get_exception();
if (!std::holds_alternative<future_used_state>(rs)) {
created.set_exception(ex);
}
return make_exception_future<foreign_unique_ptr<flat_mutation_reader>>(std::move(ex));
}
auto bundled_reader = bundled_reader_fut.get0();
auto new_state = used_state{std::move(bundled_reader.params), std::move(bundled_reader.read_operation)};
if (std::holds_alternative<future_used_state>(rs)) {
rs = std::move(new_state);
} else {
created.set_value(std::move(new_state));
}
return do_make_remote_reader(_db, shard, std::move(schema), pr, ps, pc, std::move(trace_state)).then(
[this, &rs] (bundled_remote_reader&& bundled_reader) mutable {
rs = used_state{std::move(bundled_reader.params), std::move(bundled_reader.read_operation)};
return make_ready_future<foreign_unique_ptr<flat_mutation_reader>>(std::move(bundled_reader.reader));
});
}
void read_context::dismantle_reader(shard_id shard, future<stopped_reader>&& stopped_reader_fut) {
auto& rs = _readers[shard];
void read_context::dismantle_reader(shard_id shard, future<stopped_reader>&& reader_fut) {
with_gate(_dismantling_gate, [this, shard, reader_fut = std::move(reader_fut)] () mutable {
return reader_fut.then_wrapped([this, shard] (future<stopped_reader>&& reader_fut) {
if (reader_fut.failed()) {
mmq_log.debug("Failed to stop reader on shard {}: {}", shard, reader_fut.get_exception());
++_db.local().get_stats().multishard_query_failed_reader_stops;
return;
}
if (auto* maybe_used_state = std::get_if<used_state>(&rs)) {
auto read_operation = std::move(maybe_used_state->read_operation);
auto params = std::move(maybe_used_state->params);
rs = dismantling_state{std::move(params), std::move(read_operation), std::move(stopped_reader_fut), circular_buffer<mutation_fragment>{}};
} else if (auto* maybe_future_used_state = std::get_if<future_used_state>(&rs)) {
auto f = maybe_future_used_state->fut.then([stopped_reader_fut = std::move(stopped_reader_fut)] (used_state&& current_state) mutable {
auto read_operation = std::move(current_state.read_operation);
auto params = std::move(current_state.params);
return dismantling_state{std::move(params), std::move(read_operation), std::move(stopped_reader_fut),
circular_buffer<mutation_fragment>{}};
auto reader = reader_fut.get0();
auto& rs = _readers[shard];
if (auto* maybe_used_state = std::get_if<used_state>(&rs)) {
auto read_operation = std::move(maybe_used_state->read_operation);
auto params = std::move(maybe_used_state->params);
rs = dismantling_state{std::move(params), std::move(read_operation), std::move(reader.remote_reader),
std::move(reader.unconsumed_fragments)};
} else {
mmq_log.warn(
"Unexpected request to dismantle reader in state {} for shard {}."
" Reader was not created nor is in the process of being created.",
rs.index(),
shard);
}
});
rs = future_dismantling_state{std::move(f)};
} else {
mmq_log.warn("Unexpected request to dismantle reader for shard {}. Reader was not created nor is in the process of being created.", shard);
}
});
}
future<> read_context::stop() {
auto cleanup = [db = &_db.local()] (shard_id shard, dismantling_state state) {
return state.reader_fut.then_wrapped([db, shard, params = std::move(state.params),
read_operation = std::move(state.read_operation)] (future<stopped_reader>&& fut) mutable {
if (fut.failed()) {
mmq_log.debug("Failed to stop reader on shard {}: {}", shard, fut.get_exception());
++db->get_stats().multishard_query_failed_reader_stops;
} else {
smp::submit_to(shard, [reader = fut.get0().remote_reader, params = std::move(params),
read_operation = std::move(read_operation)] () mutable {
auto pr = promise<>();
auto fut = pr.get_future();
auto gate_fut = _dismantling_gate.is_closed() ? make_ready_future<>() : _dismantling_gate.close();
gate_fut.then([this] {
for (shard_id shard = 0; shard != smp::count; ++shard) {
if (auto* maybe_dismantling_state = std::get_if<dismantling_state>(&_readers[shard])) {
smp::submit_to(shard, [reader = std::move(maybe_dismantling_state->reader),
params = std::move(maybe_dismantling_state->params),
read_operation = std::move(maybe_dismantling_state->read_operation)] () mutable {
reader.release();
params.release();
read_operation.release();
});
}
});
};
std::vector<future<>> futures;
auto immediate_cleanup = size_t(0);
auto future_cleanup = size_t(0);
// Wait for pending read-aheads in the background.
for (shard_id shard = 0; shard != smp::count; ++shard) {
auto& rs = _readers[shard];
if (auto maybe_dismantling_state = std::get_if<dismantling_state>(&rs)) {
++immediate_cleanup;
cleanup(shard, std::move(*maybe_dismantling_state));
} else if (auto maybe_future_dismantling_state = std::get_if<future_dismantling_state>(&rs)) {
++future_cleanup;
futures.emplace_back(maybe_future_dismantling_state->fut.then_wrapped([=] (future<dismantling_state>&& current_state_fut) {
if (current_state_fut.failed()) {
mmq_log.debug("Failed to stop reader on shard {}: {}", shard, current_state_fut.get_exception());
++_db.local().get_stats().multishard_query_failed_reader_stops;
} else {
cleanup(shard, current_state_fut.get0());
}
}));
}
}
if (const auto total = immediate_cleanup + future_cleanup) {
tracing::trace(_trace_state,
"Stopping {} shard readers, {} ready for immediate cleanup, {} will be cleaned up after finishes read-ahead",
total,
immediate_cleanup,
future_cleanup);
}
return when_all(futures.begin(), futures.end()).discard_result();
}).finally([pr = std::move(pr)] () mutable {
pr.set_value();
});
return fut;
}
read_context::dismantle_buffer_stats read_context::dismantle_combined_buffer(circular_buffer<mutation_fragment> combined_buffer,
@@ -455,40 +397,6 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de
return stats;
}
read_context::ready_to_save_state* read_context::prepare_reader_for_saving(
dismantling_state& current_state,
future<stopped_reader>&& stopped_reader_fut,
const dht::decorated_key& last_pkey,
const std::optional<clustering_key_prefix>& last_ckey) {
const auto shard = current_state.params.get_owner_shard();
auto& rs = _readers[shard];
if (stopped_reader_fut.failed()) {
mmq_log.debug("Failed to stop reader on shard {}: {}", shard, stopped_reader_fut.get_exception());
++_db.local().get_stats().multishard_query_failed_reader_stops;
// We don't want to leave the reader in dismantling state, lest stop()
// will try to wait on the reader_fut again and crash the application.
rs = {};
return nullptr;
}
auto stopped_reader = stopped_reader_fut.get0();
// If the buffer is empty just overwrite it.
// If it has some data in it append the fragments to the back.
// The unconsumed fragments appended here come from the
// foreign_reader which is at the lowest layer, hence its
// fragments need to be at the back of the buffer.
if (current_state.buffer.empty()) {
current_state.buffer = std::move(stopped_reader.unconsumed_fragments);
} else {
std::move(stopped_reader.unconsumed_fragments.begin(), stopped_reader.unconsumed_fragments.end(), std::back_inserter(current_state.buffer));
}
rs = ready_to_save_state{std::move(current_state.params), std::move(current_state.read_operation), std::move(stopped_reader.remote_reader),
std::move(current_state.buffer)};
return &std::get<ready_to_save_state>(rs);
}
future<> read_context::save_reader(ready_to_save_state& current_state, const dht::decorated_key& last_pkey,
const std::optional<clustering_key_prefix>& last_ckey) {
const auto shard = current_state.reader.get_owner_shard();
@@ -573,6 +481,8 @@ future<> read_context::save_readers(circular_buffer<mutation_fragment> unconsume
return make_ready_future<>();
}
return _dismantling_gate.close().then([this, unconsumed_buffer = std::move(unconsumed_buffer), compaction_state = std::move(compaction_state),
last_ckey = std::move(last_ckey)] () mutable {
auto last_pkey = compaction_state.partition_start.key();
const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey);
@@ -593,39 +503,17 @@ future<> read_context::save_readers(circular_buffer<mutation_fragment> unconsume
return save_reader(std::get<ready_to_save_state>(rs), last_pkey, last_ckey);
}
auto finish_saving = [this, &last_pkey, &last_ckey] (dismantling_state& current_state) {
return current_state.reader_fut.then_wrapped([this, &current_state, &last_pkey, &last_ckey] (
future<stopped_reader>&& stopped_reader_fut) mutable {
if (auto* ready_state = prepare_reader_for_saving(current_state, std::move(stopped_reader_fut), last_pkey, last_ckey)) {
return save_reader(*ready_state, last_pkey, last_ckey);
}
return make_ready_future<>();
});
};
if (auto* maybe_dismantling_state = std::get_if<dismantling_state>(&rs)) {
return finish_saving(*maybe_dismantling_state);
}
if (auto* maybe_future_dismantling_state = std::get_if<future_dismantling_state>(&rs)) {
return maybe_future_dismantling_state->fut.then_wrapped([this, &rs,
finish_saving = std::move(finish_saving)] (future<dismantling_state>&& next_state_fut) mutable {
if (next_state_fut.failed()) {
mmq_log.debug("Failed to stop reader: {}", next_state_fut.get_exception());
++_db.local().get_stats().multishard_query_failed_reader_stops;
// We don't want to leave the reader in future dismantling state, lest
// stop() will try to wait on the fut again and crash the application.
rs = {};
return make_ready_future<>();
}
rs = next_state_fut.get0();
return finish_saving(std::get<dismantling_state>(rs));
});
auto& current_state = *maybe_dismantling_state;
rs = ready_to_save_state{std::move(current_state.params), std::move(current_state.read_operation),
std::move(current_state.reader), std::move(current_state.buffer)};
return save_reader(std::get<ready_to_save_state>(rs), last_pkey, last_ckey);
}
return make_ready_future<>();
});
});
});
}
static future<reconcilable_result> do_query_mutations(