diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9f16d23e2d..7729b5240e 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -596,102 +596,110 @@ private: }); } - future>, cache_temperature, replica::exception_variant>> - handle_read_data( - const rpc::client_info& cinfo, rpc::opt_time_point t, - query::read_command cmd1, ::compat::wrapping_partition_range pr, - rpc::optional oda, rpc::optional rate_limit_info_opt) { + enum class read_verb { + read_data, + read_mutation_data, + read_digest + }; + friend std::ostream& operator<<(std::ostream& os, const read_verb& verb) { + switch (verb) { + case read_verb::read_data: + os << "read_data"; + break; + case read_verb::read_mutation_data: + os << "read_mutation_data"; + break; + case read_verb::read_digest: + os << "read_digest"; + break; + } + return os; + } + template + future handle_read(const rpc::client_info& cinfo, rpc::opt_time_point t, + query::read_command cmd1, ::compat::wrapping_partition_range pr, + rpc::optional oda, + rpc::optional rate_limit_info_opt) + { tracing::trace_state_ptr trace_state_ptr; auto src_addr = netw::messaging_service::get_source(cinfo); if (cmd1.trace_info) { trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(*cmd1.trace_info); tracing::begin(trace_state_ptr); - tracing::trace(trace_state_ptr, "read_data: message received from /{}", src_addr.addr); + tracing::trace(trace_state_ptr, "{}: message received from /{}", verb, src_addr.addr); } - auto da = oda.value_or(query::digest_algorithm::MD5); auto rate_limit_info = rate_limit_info_opt.value_or(std::monostate()); if (!cmd1.max_result_size) { - auto& cfg = _sp.local_db().get_config(); - cmd1.max_result_size.emplace(cfg.max_memory_for_unlimited_query_soft_limit(), cfg.max_memory_for_unlimited_query_hard_limit()); + if constexpr (verb == read_verb::read_data) { + auto& cfg = _sp.local_db().get_config(); + cmd1.max_result_size.emplace(cfg.max_memory_for_unlimited_query_soft_limit(), cfg.max_memory_for_unlimited_query_hard_limit()); + } else { + cmd1.max_result_size.emplace(cinfo.retrieve_auxiliary("max_result_size")); + } } - shared_ptr p = _sp.shared_from_this(); auto cmd = make_lw_shared(std::move(cmd1)); - p->get_stats().replica_data_reads++; auto src_ip = src_addr.addr; auto timeout = t ? *t : db::no_timeout; schema_ptr s = co_await get_schema_for_read(cmd->schema_version, std::move(src_addr), timeout); auto pr2 = ::compat::unwrap(std::move(pr), *s); - if (pr2.second) { - // this function assumes singular queries but doesn't validate - throw std::runtime_error("READ_DATA called with wrapping range"); - } - query::result_options opts; - opts.digest_algo = da; - opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest; - future>, cache_temperature>> f = co_await coroutine::as_future(p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout, rate_limit_info)); - tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip); - co_return co_await encode_replica_exception_for_rpc>, cache_temperature, replica::exception_variant>>(p->features(), std::move(f)); + auto f = co_await coroutine::as_future(std::invoke([&]() { + if constexpr (verb == read_verb::read_data) { + if (pr2.second) { + // this function assumes singular queries but doesn't validate + throw std::runtime_error("READ_DATA called with wrapping range"); + } + p->get_stats().replica_data_reads++; + auto da = oda.value_or(query::digest_algorithm::MD5); + query::result_options opts; + opts.digest_algo = da; + opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest; + return p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout, rate_limit_info); + } else if constexpr (verb == read_verb::read_mutation_data) { + p->get_stats().replica_mutation_data_reads++; + return p->query_mutations_locally(std::move(s), std::move(cmd), pr2, timeout, trace_state_ptr); + } else if constexpr (verb == read_verb::read_digest) { + if (pr2.second) { + // this function assumes singular queries but doesn't validate + throw std::runtime_error("READ_DIGEST called with wrapping range"); + } + p->get_stats().replica_digest_reads++; + auto da = oda.value_or(query::digest_algorithm::MD5); + return p->query_result_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr, timeout, da, rate_limit_info); + } else { + static_assert(verb == static_cast(-1), "Unsupported verb"); + } + })); + tracing::trace(trace_state_ptr, "{} handling is done, sending a response to /{}", verb, src_ip); + co_return co_await encode_replica_exception_for_rpc(p->features(), std::move(f)); } - future>, cache_temperature, replica::exception_variant>> - handle_read_mutation_data( + using read_data_result_t = rpc::tuple>, cache_temperature, replica::exception_variant>; + future handle_read_data( + const rpc::client_info& cinfo, rpc::opt_time_point t, + query::read_command cmd1, ::compat::wrapping_partition_range pr, + rpc::optional oda, + rpc::optional rate_limit_info_opt) { + return handle_read(cinfo, t, std::move(cmd1), + std::move(pr), oda, rate_limit_info_opt); + } + + using read_mutation_data_result_t = rpc::tuple>, cache_temperature, replica::exception_variant>; + future handle_read_mutation_data( const rpc::client_info& cinfo, rpc::opt_time_point t, query::read_command cmd1, ::compat::wrapping_partition_range pr) { - tracing::trace_state_ptr trace_state_ptr; - auto src_addr = netw::messaging_service::get_source(cinfo); - if (cmd1.trace_info) { - trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(*cmd1.trace_info); - tracing::begin(trace_state_ptr); - tracing::trace(trace_state_ptr, "read_mutation_data: message received from /{}", src_addr.addr); - } - if (!cmd1.max_result_size) { - cmd1.max_result_size.emplace(cinfo.retrieve_auxiliary("max_result_size")); - } - shared_ptr p = _sp.shared_from_this(); - ::compat::one_or_two_partition_ranges unwrapped({}); - auto cmd = make_lw_shared(std::move(cmd1)); - p->get_stats().replica_mutation_data_reads++; - auto src_ip = src_addr.addr; - auto timeout = t ? *t : db::no_timeout; - auto s = co_await get_schema_for_read(cmd->schema_version, std::move(src_addr), timeout); - unwrapped = ::compat::unwrap(std::move(pr), *s); - future>, cache_temperature>> f = co_await coroutine::as_future(p->query_mutations_locally(std::move(s), std::move(cmd), unwrapped, timeout, trace_state_ptr)); - tracing::trace(trace_state_ptr, "read_mutation_data handling is done, sending a response to /{}", src_ip); - co_return co_await encode_replica_exception_for_rpc>, cache_temperature, replica::exception_variant>>(p->features(), std::move(f)); + return handle_read(cinfo, t, std::move(cmd1), + std::move(pr), std::nullopt, std::nullopt); } - future>> - handle_read_digest( + using read_digest_result_t = rpc::tuple>; + future handle_read_digest( const rpc::client_info& cinfo, rpc::opt_time_point t, query::read_command cmd1, ::compat::wrapping_partition_range pr, - rpc::optional oda, rpc::optional rate_limit_info_opt) { - tracing::trace_state_ptr trace_state_ptr; - auto src_addr = netw::messaging_service::get_source(cinfo); - if (cmd1.trace_info) { - trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(*cmd1.trace_info); - tracing::begin(trace_state_ptr); - tracing::trace(trace_state_ptr, "read_digest: message received from /{}", src_addr.addr); - } - auto da = oda.value_or(query::digest_algorithm::MD5); - auto rate_limit_info = rate_limit_info_opt.value_or(std::monostate()); - if (!cmd1.max_result_size) { - cmd1.max_result_size.emplace(cinfo.retrieve_auxiliary("max_result_size")); - } - shared_ptr p = _sp.shared_from_this(); - auto cmd = make_lw_shared(std::move(cmd1)); - p->get_stats().replica_digest_reads++; - auto src_ip = src_addr.addr; - auto timeout = t ? *t : db::no_timeout; - schema_ptr s = co_await get_schema_for_read(cmd->schema_version, std::move(src_addr), timeout); - auto pr2 = ::compat::unwrap(std::move(pr), *s); - if (pr2.second) { - // this function assumes singular queries but doesn't validate - throw std::runtime_error("READ_DIGEST called with wrapping range"); - } - future>> f = co_await coroutine::as_future(p->query_result_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr, timeout, da, rate_limit_info)); - tracing::trace(trace_state_ptr, "read_digest handling is done, sending a response to /{}", src_ip); - co_return co_await encode_replica_exception_for_rpc>>(p->features(), std::move(f)); + rpc::optional oda, + rpc::optional rate_limit_info_opt) { + return handle_read(cinfo, t, std::move(cmd1), + std::move(pr), oda, rate_limit_info_opt); } future<> handle_truncate(rpc::opt_time_point timeout, sstring ksname, sstring cfname) {