storage_proxy.cc: extract handle_read

We continue the refactoring by introducing
the common implementation for all read methods.
This commit is contained in:
Petr Gusev
2023-05-12 17:54:09 +04:00
parent 2d791a5ed4
commit 4004ce1f44

View File

@@ -596,102 +596,110 @@ private:
});
}
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, replica::exception_variant>>
handle_read_data(
const rpc::client_info& cinfo, rpc::opt_time_point t,
query::read_command cmd1, ::compat::wrapping_partition_range pr,
rpc::optional<query::digest_algorithm> oda, rpc::optional<db::per_partition_rate_limit::info> rate_limit_info_opt) {
enum class read_verb {
read_data,
read_mutation_data,
read_digest
};
friend std::ostream& operator<<(std::ostream& os, const read_verb& verb) {
switch (verb) {
case read_verb::read_data:
os << "read_data";
break;
case read_verb::read_mutation_data:
os << "read_mutation_data";
break;
case read_verb::read_digest:
os << "read_digest";
break;
}
return os;
}
template<typename Result, read_verb verb>
future<Result> handle_read(const rpc::client_info& cinfo, rpc::opt_time_point t,
query::read_command cmd1, ::compat::wrapping_partition_range pr,
rpc::optional<query::digest_algorithm> oda,
rpc::optional<db::per_partition_rate_limit::info> rate_limit_info_opt)
{
tracing::trace_state_ptr trace_state_ptr;
auto src_addr = netw::messaging_service::get_source(cinfo);
if (cmd1.trace_info) {
trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(*cmd1.trace_info);
tracing::begin(trace_state_ptr);
tracing::trace(trace_state_ptr, "read_data: message received from /{}", src_addr.addr);
tracing::trace(trace_state_ptr, "{}: message received from /{}", verb, src_addr.addr);
}
auto da = oda.value_or(query::digest_algorithm::MD5);
auto rate_limit_info = rate_limit_info_opt.value_or(std::monostate());
if (!cmd1.max_result_size) {
auto& cfg = _sp.local_db().get_config();
cmd1.max_result_size.emplace(cfg.max_memory_for_unlimited_query_soft_limit(), cfg.max_memory_for_unlimited_query_hard_limit());
if constexpr (verb == read_verb::read_data) {
auto& cfg = _sp.local_db().get_config();
cmd1.max_result_size.emplace(cfg.max_memory_for_unlimited_query_soft_limit(), cfg.max_memory_for_unlimited_query_hard_limit());
} else {
cmd1.max_result_size.emplace(cinfo.retrieve_auxiliary<uint64_t>("max_result_size"));
}
}
shared_ptr<storage_proxy> p = _sp.shared_from_this();
auto cmd = make_lw_shared<query::read_command>(std::move(cmd1));
p->get_stats().replica_data_reads++;
auto src_ip = src_addr.addr;
auto timeout = t ? *t : db::no_timeout;
schema_ptr s = co_await get_schema_for_read(cmd->schema_version, std::move(src_addr), timeout);
auto pr2 = ::compat::unwrap(std::move(pr), *s);
if (pr2.second) {
// this function assumes singular queries but doesn't validate
throw std::runtime_error("READ_DATA called with wrapping range");
}
query::result_options opts;
opts.digest_algo = da;
opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest;
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> f = co_await coroutine::as_future(p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout, rate_limit_info));
tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip);
co_return co_await encode_replica_exception_for_rpc<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, replica::exception_variant>>(p->features(), std::move(f));
auto f = co_await coroutine::as_future(std::invoke([&]() {
if constexpr (verb == read_verb::read_data) {
if (pr2.second) {
// this function assumes singular queries but doesn't validate
throw std::runtime_error("READ_DATA called with wrapping range");
}
p->get_stats().replica_data_reads++;
auto da = oda.value_or(query::digest_algorithm::MD5);
query::result_options opts;
opts.digest_algo = da;
opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest;
return p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout, rate_limit_info);
} else if constexpr (verb == read_verb::read_mutation_data) {
p->get_stats().replica_mutation_data_reads++;
return p->query_mutations_locally(std::move(s), std::move(cmd), pr2, timeout, trace_state_ptr);
} else if constexpr (verb == read_verb::read_digest) {
if (pr2.second) {
// this function assumes singular queries but doesn't validate
throw std::runtime_error("READ_DIGEST called with wrapping range");
}
p->get_stats().replica_digest_reads++;
auto da = oda.value_or(query::digest_algorithm::MD5);
return p->query_result_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr, timeout, da, rate_limit_info);
} else {
static_assert(verb == static_cast<read_verb>(-1), "Unsupported verb");
}
}));
tracing::trace(trace_state_ptr, "{} handling is done, sending a response to /{}", verb, src_ip);
co_return co_await encode_replica_exception_for_rpc<Result>(p->features(), std::move(f));
}
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature, replica::exception_variant>>
handle_read_mutation_data(
using read_data_result_t = rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, replica::exception_variant>;
future<read_data_result_t> handle_read_data(
const rpc::client_info& cinfo, rpc::opt_time_point t,
query::read_command cmd1, ::compat::wrapping_partition_range pr,
rpc::optional<query::digest_algorithm> oda,
rpc::optional<db::per_partition_rate_limit::info> rate_limit_info_opt) {
return handle_read<read_data_result_t, read_verb::read_data>(cinfo, t, std::move(cmd1),
std::move(pr), oda, rate_limit_info_opt);
}
using read_mutation_data_result_t = rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature, replica::exception_variant>;
future<read_mutation_data_result_t> handle_read_mutation_data(
const rpc::client_info& cinfo, rpc::opt_time_point t,
query::read_command cmd1, ::compat::wrapping_partition_range pr) {
tracing::trace_state_ptr trace_state_ptr;
auto src_addr = netw::messaging_service::get_source(cinfo);
if (cmd1.trace_info) {
trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(*cmd1.trace_info);
tracing::begin(trace_state_ptr);
tracing::trace(trace_state_ptr, "read_mutation_data: message received from /{}", src_addr.addr);
}
if (!cmd1.max_result_size) {
cmd1.max_result_size.emplace(cinfo.retrieve_auxiliary<uint64_t>("max_result_size"));
}
shared_ptr<storage_proxy> p = _sp.shared_from_this();
::compat::one_or_two_partition_ranges unwrapped({});
auto cmd = make_lw_shared<query::read_command>(std::move(cmd1));
p->get_stats().replica_mutation_data_reads++;
auto src_ip = src_addr.addr;
auto timeout = t ? *t : db::no_timeout;
auto s = co_await get_schema_for_read(cmd->schema_version, std::move(src_addr), timeout);
unwrapped = ::compat::unwrap(std::move(pr), *s);
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> f = co_await coroutine::as_future(p->query_mutations_locally(std::move(s), std::move(cmd), unwrapped, timeout, trace_state_ptr));
tracing::trace(trace_state_ptr, "read_mutation_data handling is done, sending a response to /{}", src_ip);
co_return co_await encode_replica_exception_for_rpc<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature, replica::exception_variant>>(p->features(), std::move(f));
return handle_read<read_mutation_data_result_t, read_verb::read_mutation_data>(cinfo, t, std::move(cmd1),
std::move(pr), std::nullopt, std::nullopt);
}
future<rpc::tuple<query::result_digest, long, cache_temperature, replica::exception_variant, std::optional<full_position>>>
handle_read_digest(
using read_digest_result_t = rpc::tuple<query::result_digest, long, cache_temperature, replica::exception_variant, std::optional<full_position>>;
future<read_digest_result_t> handle_read_digest(
const rpc::client_info& cinfo, rpc::opt_time_point t,
query::read_command cmd1, ::compat::wrapping_partition_range pr,
rpc::optional<query::digest_algorithm> oda, rpc::optional<db::per_partition_rate_limit::info> rate_limit_info_opt) {
tracing::trace_state_ptr trace_state_ptr;
auto src_addr = netw::messaging_service::get_source(cinfo);
if (cmd1.trace_info) {
trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(*cmd1.trace_info);
tracing::begin(trace_state_ptr);
tracing::trace(trace_state_ptr, "read_digest: message received from /{}", src_addr.addr);
}
auto da = oda.value_or(query::digest_algorithm::MD5);
auto rate_limit_info = rate_limit_info_opt.value_or(std::monostate());
if (!cmd1.max_result_size) {
cmd1.max_result_size.emplace(cinfo.retrieve_auxiliary<uint64_t>("max_result_size"));
}
shared_ptr<storage_proxy> p = _sp.shared_from_this();
auto cmd = make_lw_shared<query::read_command>(std::move(cmd1));
p->get_stats().replica_digest_reads++;
auto src_ip = src_addr.addr;
auto timeout = t ? *t : db::no_timeout;
schema_ptr s = co_await get_schema_for_read(cmd->schema_version, std::move(src_addr), timeout);
auto pr2 = ::compat::unwrap(std::move(pr), *s);
if (pr2.second) {
// this function assumes singular queries but doesn't validate
throw std::runtime_error("READ_DIGEST called with wrapping range");
}
future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature, std::optional<full_position>>> f = co_await coroutine::as_future(p->query_result_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr, timeout, da, rate_limit_info));
tracing::trace(trace_state_ptr, "read_digest handling is done, sending a response to /{}", src_ip);
co_return co_await encode_replica_exception_for_rpc<rpc::tuple<query::result_digest, long, cache_temperature, replica::exception_variant, std::optional<full_position>>>(p->features(), std::move(f));
rpc::optional<query::digest_algorithm> oda,
rpc::optional<db::per_partition_rate_limit::info> rate_limit_info_opt) {
return handle_read<read_digest_result_t, read_verb::read_digest>(cinfo, t, std::move(cmd1),
std::move(pr), oda, rate_limit_info_opt);
}
future<> handle_truncate(rpc::opt_time_point timeout, sstring ksname, sstring cfname) {