From c19f5edefefe561aa161f4e08c218e34f4d5f2ca Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 16 Jul 2015 08:53:48 +0800 Subject: [PATCH] messaging_service: Add wrapper for READ_MUTATION_DATA verb --- message/messaging_service.cc | 7 +++++++ message/messaging_service.hh | 4 ++++ service/storage_proxy.cc | 4 ++-- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 7aedd12e59..a824263162 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -293,4 +293,11 @@ future messaging_service::send_read_data(shard_id id, query::read return send_message(messaging_verb::READ_DATA, std::move(id), cmd, pr); } +void messaging_service::register_read_mutation_data(std::function>> (query::read_command cmd, query::partition_range pr)>&& func) { + register_handler(net::messaging_verb::READ_MUTATION_DATA, std::move(func)); +} +future messaging_service::send_read_mutation_data(shard_id id, query::read_command& cmd, query::partition_range& pr) { + return send_message(messaging_verb::READ_MUTATION_DATA, std::move(id), cmd, pr); +} + } // namespace net diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 9b3748be5f..0a47417b1c 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -423,6 +423,10 @@ public: void register_read_data(std::function>> (query::read_command cmd, query::partition_range pr)>&& func); future send_read_data(shard_id id, query::read_command& cmd, query::partition_range& pr); + // Wrapper for READ_MUTATION_DATA + void register_read_mutation_data(std::function>> (query::read_command cmd, query::partition_range pr)>&& func); + future send_read_mutation_data(shard_id id, query::read_command& cmd, query::partition_range& pr); + private: // Return rpc::protocol::client for a shard which is a ip + cpuid pair. rpc_protocol_client_wrapper& get_rpc_client(shard_id id); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 135cddf71d..6eab643208 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1470,7 +1470,7 @@ protected: return _proxy.query_mutations_locally(_cmd, _partition_range); } else { auto& ms = net::get_local_messaging_service(); - return ms.send_message(net::messaging_verb::READ_MUTATION_DATA, net::messaging_service::shard_id{ep, 0}, *_cmd, _partition_range).then([this](reconcilable_result&& result) { + return ms.send_read_mutation_data(net::messaging_service::shard_id{ep, 0}, *_cmd, _partition_range).then([this](reconcilable_result&& result) { return make_foreign(::make_lw_shared(std::move(result))); }); } @@ -2829,7 +2829,7 @@ void storage_proxy::init_messaging_service() { return query_singular_local(cmd, pr); }); }); - ms.register_handler(net::messaging_verb::READ_MUTATION_DATA, [this] (query::read_command cmd, query::partition_range pr) { + ms.register_read_mutation_data([this] (query::read_command cmd, query::partition_range pr) { return do_with(std::move(pr), [this, cmd = make_lw_shared(std::move(cmd))] (const query::partition_range& pr) { return query_mutations_locally(cmd, pr); });