forward_service: uncoroutinize dispatch method

Done to mitigate potential misscompilations.
This commit is contained in:
Michał Sala
2022-04-04 15:18:52 +02:00
parent edc32a7118
commit 28970389bc

View File

@@ -27,6 +27,7 @@
#include "replica/database.hh"
#include "schema.hh"
#include "schema_registry.hh"
#include "seastar/core/do_with.hh"
#include "service/pager/query_pagers.hh"
#include "tracing/trace_state.hh"
#include "tracing/tracing.hh"
@@ -320,10 +321,7 @@ future<> forward_service::uninit_messaging_service() {
return ser::forward_request_rpc_verbs::unregister(&_messaging);
}
future<query::forward_result> forward_service::dispatch(query::forward_request req_, tracing::trace_state_ptr tr_state_) {
query::forward_request req = std::move(req_);
tracing::trace_state_ptr tr_state = std::move(tr_state_);
future<query::forward_result> forward_service::dispatch(query::forward_request req, tracing::trace_state_ptr tr_state) {
schema_ptr schema = local_schema_registry().get(req.cmd.schema_version);
replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name());
// next_vnode is used to iterate through all vnodes produced by
@@ -357,49 +355,63 @@ future<query::forward_result> forward_service::dispatch(query::forward_request r
tracing::trace(tr_state, "Dispatching forward_request to {} endpoints", vnodes_per_addr.size());
retrying_dispatcher dispatcher(*this, tr_state);
std::optional<query::forward_result> result;
// Forward request to each endpoint and merge results.
co_await parallel_for_each(vnodes_per_addr.begin(), vnodes_per_addr.end(),
[this, &req, &result, &tr_state, &dispatcher] (std::pair<netw::messaging_service::msg_addr, dht::partition_range_vector> vnodes_with_addr) -> future<> {
netw::messaging_service::msg_addr addr = vnodes_with_addr.first;
std::optional<query::forward_result>& result_ = result;
tracing::trace_state_ptr& tr_state_ = tr_state;
query::forward_request req_with_modified_pr = req;
req_with_modified_pr.pr = std::move(vnodes_with_addr.second);
return do_with(std::move(dispatcher), std::move(result), std::move(vnodes_per_addr), std::move(req), std::move(tr_state),
[this] (
retrying_dispatcher& dispatcher,
std::optional<query::forward_result>& result,
std::map<netw::messaging_service::msg_addr, dht::partition_range_vector>& vnodes_per_addr,
query::forward_request& req,
tracing::trace_state_ptr& tr_state
)-> future<query::forward_result> {
return parallel_for_each(vnodes_per_addr.begin(), vnodes_per_addr.end(),
[this, &req, &result, &tr_state, &dispatcher] (
std::pair<netw::messaging_service::msg_addr, dht::partition_range_vector> vnodes_with_addr
) -> future<> {
netw::messaging_service::msg_addr addr = vnodes_with_addr.first;
std::optional<query::forward_result>& result_ = result;
tracing::trace_state_ptr& tr_state_ = tr_state;
retrying_dispatcher& dispatcher_ = dispatcher;
tracing::trace(tr_state_, "Sending forward_request to {}", addr);
flogger.debug("dispatching forward_request={} to address={}", req_with_modified_pr, addr);
query::forward_request req_with_modified_pr = req;
req_with_modified_pr.pr = std::move(vnodes_with_addr.second);
query::forward_result partial_result = co_await dispatcher.dispatch_to_node(
addr,
req_with_modified_pr
tracing::trace(tr_state_, "Sending forward_request to {}", addr);
flogger.debug("dispatching forward_request={} to address={}", req_with_modified_pr, addr);
query::forward_result partial_result = co_await dispatcher_.dispatch_to_node(
addr,
req_with_modified_pr
);
query::forward_result::printer partial_result_printer{
.types = req_with_modified_pr.reduction_types,
.res = partial_result
};
tracing::trace(tr_state_, "Received forward_result={} from {}", partial_result_printer, addr);
flogger.debug("received forward_result={} from {}", partial_result_printer, addr);
if (result_) {
result_->merge(partial_result, req_with_modified_pr.reduction_types);
} else {
result_ = partial_result;
}
}
).then(
[&result, &req, &tr_state] () -> query::forward_result {
query::forward_result::printer result_printer{
.types = req.reduction_types,
.res = *result
};
tracing::trace(tr_state, "Merged result is {}", result_printer);
flogger.debug("merged result is {}", result_printer);
return *result;
}
);
query::forward_result::printer partial_result_printer{
.types = req_with_modified_pr.reduction_types,
.res = partial_result
};
tracing::trace(tr_state_, "Received forward_result={} from {}", partial_result_printer, addr);
flogger.debug("received forward_result={} from {}", partial_result_printer, addr);
if (result_) {
result_->merge(partial_result, req_with_modified_pr.reduction_types);
} else {
result_ = partial_result;
}
}
);
query::forward_result::printer result_printer{
.types = req.reduction_types,
.res = *result
};
tracing::trace(tr_state, "Merged result is {}", result_printer);
flogger.debug("merged result is {}", result_printer);
co_return *result;
}
void forward_service::register_metrics() {