repair: Do not explicitly switch sched group
When registering callbacks for row-level repair verbs the sched groups is assigned automatically with the help of messaging_service::scheduling_group_for_verb. Thus the the lambda will be called in the needed sched group, no need for manual switch. This removes the last occurence of global storage_service usage from row-level repair. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -30,7 +30,6 @@
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
#include "db/view/view_update_checks.hh"
|
||||
#include "database.hh"
|
||||
@@ -2011,44 +2010,35 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
ms.register_repair_get_row_diff_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_hash_with_cmd> source) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
|
||||
[&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
|
||||
auto sink = ms.make_sink_for_repair_get_row_diff_with_rpc_stream(source);
|
||||
// Start a new fiber.
|
||||
(void)repair_get_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
|
||||
rlogger.info("Failed to process get_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
|
||||
});
|
||||
return make_ready_future<rpc::sink<repair_row_on_wire_with_cmd>>(sink);
|
||||
auto sink = ms.make_sink_for_repair_get_row_diff_with_rpc_stream(source);
|
||||
// Start a new fiber.
|
||||
(void)repair_get_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
|
||||
rlogger.info("Failed to process get_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
|
||||
});
|
||||
return make_ready_future<rpc::sink<repair_row_on_wire_with_cmd>>(sink);
|
||||
});
|
||||
ms.register_repair_put_row_diff_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_row_on_wire_with_cmd> source) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
|
||||
[&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
|
||||
auto sink = ms.make_sink_for_repair_put_row_diff_with_rpc_stream(source);
|
||||
// Start a new fiber.
|
||||
(void)repair_put_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
|
||||
rlogger.info("Failed to process put_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
|
||||
});
|
||||
return make_ready_future<rpc::sink<repair_stream_cmd>>(sink);
|
||||
auto sink = ms.make_sink_for_repair_put_row_diff_with_rpc_stream(source);
|
||||
// Start a new fiber.
|
||||
(void)repair_put_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
|
||||
rlogger.info("Failed to process put_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
|
||||
});
|
||||
return make_ready_future<rpc::sink<repair_stream_cmd>>(sink);
|
||||
});
|
||||
ms.register_repair_get_full_row_hashes_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_stream_cmd> source) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
|
||||
[&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
|
||||
auto sink = ms.make_sink_for_repair_get_full_row_hashes_with_rpc_stream(source);
|
||||
// Start a new fiber.
|
||||
(void)repair_get_full_row_hashes_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
|
||||
rlogger.info("Failed to process get_full_row_hashes_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
|
||||
});
|
||||
return make_ready_future<rpc::sink<repair_hash_with_cmd>>(sink);
|
||||
auto sink = ms.make_sink_for_repair_get_full_row_hashes_with_rpc_stream(source);
|
||||
// Start a new fiber.
|
||||
(void)repair_get_full_row_hashes_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
|
||||
rlogger.info("Failed to process get_full_row_hashes_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
|
||||
});
|
||||
return make_ready_future<rpc::sink<repair_hash_with_cmd>>(sink);
|
||||
});
|
||||
ms.register_repair_get_full_row_hashes([] (const rpc::client_info& cinfo, uint32_t repair_meta_id) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
|
||||
Reference in New Issue
Block a user