repair: Do not explicitly switch sched group

When registering callbacks for row-level repair verbs, the
scheduling group is assigned automatically with the help of
messaging_service::scheduling_group_for_verb. Thus the
lambda will be called in the needed scheduling group, with
no need for a manual switch.

This removes the last occurrence of global storage_service
usage from row-level repair.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2020-02-10 22:10:33 +03:00
parent ccc102affa
commit ac998e9576

View File

@@ -30,7 +30,6 @@
#include "dht/i_partitioner.hh"
#include "utils/UUID.hh"
#include "utils/hash.hh"
#include "service/storage_service.hh"
#include "service/priority_manager.hh"
#include "db/view/view_update_checks.hh"
#include "database.hh"
@@ -2011,44 +2010,35 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
ms.register_repair_get_row_diff_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_hash_with_cmd> source) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
[&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
auto sink = ms.make_sink_for_repair_get_row_diff_with_rpc_stream(source);
// Start a new fiber.
(void)repair_get_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.info("Failed to process get_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_row_on_wire_with_cmd>>(sink);
auto sink = ms.make_sink_for_repair_get_row_diff_with_rpc_stream(source);
// Start a new fiber.
(void)repair_get_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.info("Failed to process get_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_row_on_wire_with_cmd>>(sink);
});
ms.register_repair_put_row_diff_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_row_on_wire_with_cmd> source) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
[&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
auto sink = ms.make_sink_for_repair_put_row_diff_with_rpc_stream(source);
// Start a new fiber.
(void)repair_put_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.info("Failed to process put_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_stream_cmd>>(sink);
auto sink = ms.make_sink_for_repair_put_row_diff_with_rpc_stream(source);
// Start a new fiber.
(void)repair_put_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.info("Failed to process put_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_stream_cmd>>(sink);
});
ms.register_repair_get_full_row_hashes_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_stream_cmd> source) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
[&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
auto sink = ms.make_sink_for_repair_get_full_row_hashes_with_rpc_stream(source);
// Start a new fiber.
(void)repair_get_full_row_hashes_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.info("Failed to process get_full_row_hashes_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_hash_with_cmd>>(sink);
auto sink = ms.make_sink_for_repair_get_full_row_hashes_with_rpc_stream(source);
// Start a new fiber.
(void)repair_get_full_row_hashes_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.info("Failed to process get_full_row_hashes_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_hash_with_cmd>>(sink);
});
ms.register_repair_get_full_row_hashes([] (const rpc::client_info& cinfo, uint32_t repair_meta_id) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");