scylladb/message/messaging_service.hh
Avi Kivity c96fc1d585 Merge "Introduce row level repair" from Asias
"
=== How the partition level repair works

- The repair master decides which ranges to work on.
- The repair master splits the ranges into sub ranges, each of which contains
around 100 partitions.
- The repair master computes the checksum of the 100 partitions and asks the
related peers to compute the checksum of the 100 partitions.
- If the checksums match, the data in this sub range is synced.
- If the checksums mismatch, the repair master fetches the data from all the peers
and sends the merged data back to the peers.

=== Major problems with partition level repair

- A mismatch of a single row in any of the 100 partitions causes all 100
partitions to be transferred. A single partition can be very large, let
alone 100 partitions.

- Checksum (find the mismatch) and streaming (fix the mismatch) read the
same data twice.

=== Row level repair

Row level checksum and synchronization: detect row level mismatches and transfer
only the mismatched rows.

=== How the row level repair works

- To solve the problem of reading data twice

Read the data only once for both checksum and synchronization between nodes.

We work on a small range which contains only a few megabytes of rows.
We read all the rows within the small range into memory, find the
mismatch, and send the mismatched rows between peers.

We need to find a sync boundary among the nodes which contains only N bytes of
rows.

- To solve the problem of sending unnecessary data.

We need to find the mismatched rows between nodes and only send the delta.
This is known as the set reconciliation problem, which is a common problem in
distributed systems.

For example:
Node1 has set1 = {row1, row2, row3}
Node2 has set2 = {      row2, row3}
Node3 has set3 = {row1, row2, row4}

To repair:
Node1 fetches nothing from Node2 (set2 - set1), fetches row4 (set3 - set1) from Node3.
Node1 sends row1 and row4 (set1 + set2 + set3 - set2) to Node2
Node1 sends row3 (set1 + set2 + set3 - set3) to Node3.
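
The set arithmetic in this example can be written down directly. The following
is only a minimal sketch under stated assumptions: rows are modelled as plain
strings, and the names row_set, difference, union_of, repair_plan and
plan_repair are hypothetical helpers invented for illustration. They are not
part of the repair code.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model: a row is identified by a string.
using row_set = std::set<std::string>;

// Rows present in `a` but absent from `b` (a - b).
row_set difference(const row_set& a, const row_set& b) {
    row_set out;
    std::set_difference(a.begin(), a.end(), b.begin(), b.end(),
                        std::inserter(out, out.end()));
    return out;
}

// Union of all the given sets (set1 + set2 + set3 in the example).
row_set union_of(const std::vector<row_set>& sets) {
    row_set out;
    for (const auto& s : sets) {
        out.insert(s.begin(), s.end());
    }
    return out;
}

struct repair_plan {
    std::vector<row_set> fetch_from_peer; // rows the master pulls from peer i
    std::vector<row_set> send_to_peer;    // rows the master pushes to peer i
};

// Compute what the master fetches from and sends to each peer.
repair_plan plan_repair(const row_set& master, const std::vector<row_set>& peers) {
    repair_plan plan;
    std::vector<row_set> all_sets = peers;
    all_sets.push_back(master);
    const row_set all_rows = union_of(all_sets);
    for (const auto& peer : peers) {
        plan.fetch_from_peer.push_back(difference(peer, master));  // peer - master
        plan.send_to_peer.push_back(difference(all_rows, peer));   // union - peer
    }
    return plan;
}
```

Calling plan_repair with set1 as the master and {set2, set3} as the peers
yields the fetch and send sets listed in the example above.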

=== How to implement repair with set reconciliation

- Step A: Negotiate sync boundary

class repair_sync_boundary {
    dht::decorated_key pk;
    position_in_partition position;
};

Each node reads rows from disk into row buffers until the buffered size is
larger than N bytes, and returns the repair_sync_boundary of the last
mutation_fragment it read from disk. The smallest repair_sync_boundary of all
nodes is set as the current_sync_boundary.
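
Step A can be illustrated with a simplified model. This is a sketch only,
assuming that a boundary can be represented by two integers instead of
dht::decorated_key and position_in_partition; the names sync_boundary, row,
fill_row_buffer and pick_current_sync_boundary are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <tuple>
#include <vector>

// Simplified stand-in for repair_sync_boundary (assumption: integer keys).
struct sync_boundary {
    std::uint64_t pk;
    std::uint64_t position;
    bool operator<(const sync_boundary& o) const {
        return std::tie(pk, position) < std::tie(o.pk, o.position);
    }
};

// A row on disk: where it sits in the ring and how many bytes it occupies.
struct row {
    sync_boundary where;
    std::size_t size_bytes;
};

// Buffer rows (already sorted) starting at index `from` until the buffered
// size is larger than `max_bytes`. Returns the boundary of the last row
// read, or nullopt when there is nothing left to read.
std::optional<sync_boundary> fill_row_buffer(const std::vector<row>& rows,
                                             std::size_t from,
                                             std::size_t max_bytes) {
    std::optional<sync_boundary> last;
    std::size_t buffered = 0;
    for (std::size_t i = from; i < rows.size() && buffered <= max_bytes; ++i) {
        buffered += rows[i].size_bytes;
        last = rows[i].where;
    }
    return last;
}

// The smallest boundary proposed by any node becomes the current one.
sync_boundary pick_current_sync_boundary(const std::vector<sync_boundary>& proposals) {
    assert(!proposals.empty());
    return *std::min_element(proposals.begin(), proposals.end());
}
```

Picking the smallest boundary ensures that every node has already buffered all
of its rows up to that point, so the hashes computed in Step B cover the same
range on every node.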

- Step B: Get missing rows from peer nodes so that repair master contains all the rows

Request combined hashes from all nodes between last_sync_boundary and
current_sync_boundary. If the combined hashes from all nodes are identical,
the data is synced; go to Step A. If not, request the full hashes from the peers.

At this point, the repair master knows exactly what rows are missing. Request the
missing rows from peer nodes.

Now the local node (the repair master) contains all the rows.
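
The two-level hash check of Step B might look roughly like the sketch below.
It assumes that a row hash is a 64-bit integer and that the combined hash is
the XOR of the row hashes; XOR is chosen here only because it is independent
of order, and it is an assumption of this sketch. The names row_hash,
hash_set, combined_hash, in_sync and missing_on_master are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_set>
#include <vector>

using row_hash = std::uint64_t;                 // assumption: 64-bit row hash
using hash_set = std::unordered_set<row_hash>;  // full hashes of one node

// Order-independent combination of all row hashes (XOR is an assumption).
row_hash combined_hash(const hash_set& hashes) {
    row_hash acc = 0;
    for (row_hash h : hashes) {
        acc ^= h;
    }
    return acc;
}

// Cheap check: one combined hash per node instead of one hash per row.
bool in_sync(const hash_set& master, const std::vector<hash_set>& peers) {
    const row_hash expected = combined_hash(master);
    for (const auto& peer : peers) {
        if (combined_hash(peer) != expected) {
            return false;
        }
    }
    return true;
}

// Expensive check, used only after a mismatch: the hashes the peer has and
// the master lacks identify exactly the rows to request from that peer.
hash_set missing_on_master(const hash_set& master, const hash_set& peer) {
    hash_set out;
    for (row_hash h : peer) {
        if (master.count(h) == 0) {
            out.insert(h);
        }
    }
    return out;
}
```

The combined hash keeps the common, already-synced case cheap; the full hash
sets are exchanged only when the combined hashes differ.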

- Step C: Send missing rows to the peer nodes

Since the local node also knows which rows each peer node has, it sends the
missing rows to the peer nodes.

=== What the RPC API looks like

- repair_range_start()

Step A:
- request_sync_boundary()

Step B:
- request_combined_row_hashes()
- request_full_row_hashes()
- request_row_diff()

Step C:
- send_row_diff()

- repair_range_stop()
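
To make the call order concrete, the sketch below lists the RPCs issued for one
range, given whether each round's combined hashes matched. It only records
names; it performs no I/O. The function rpc_trace is a hypothetical helper, and
the ordering is inferred from Steps A to C above rather than taken from the
implementation.

```cpp
#include <cassert>
#include <string>
#include <vector>

// For each round (one sync boundary), `round_in_sync[i]` says whether the
// combined hashes of all nodes matched. Returns the RPC names in call order.
std::vector<std::string> rpc_trace(const std::vector<bool>& round_in_sync) {
    std::vector<std::string> trace{"repair_range_start"};
    for (bool synced : round_in_sync) {
        trace.push_back("request_sync_boundary");        // Step A
        trace.push_back("request_combined_row_hashes");  // Step B, cheap check
        if (!synced) {
            trace.push_back("request_full_row_hashes");  // Step B, find the delta
            trace.push_back("request_row_diff");         // Step B, pull missing rows
            trace.push_back("send_row_diff");            // Step C, push missing rows
        }
    }
    trace.push_back("repair_range_stop");
    return trace;
}
```

A round whose combined hashes match costs only two RPCs per peer, which is
consistent with the large gain seen in the 100% synced test case below.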

=== Performance evaluation

We created a cluster of 3 Scylla nodes on AWS using i3.xlarge instances. We
created a keyspace with a replication factor of 3 and inserted 1 billion
rows into each of the 3 nodes. Each node has 241 GiB of data.
We tested the 3 cases below.

1) 0% synced: one of the nodes has zero data. The other two nodes have 1 billion identical rows.

Time to repair:
   old = 87 min
   new = 70 min (rebuild took 50 minutes)
   improvement = 19.54%

2) 100% synced: all of the 3 nodes have 1 billion identical rows.
Time to repair:
   old = 43 min
   new = 24 min
   improvement = 44.18%

3) 99.9% synced: each node has 1 billion identical rows and 1 billion * 0.1% distinct rows.

Time to repair:
   old: 211 min
   new: 44 min
   improvement: 79.15%

Bytes sent on wire for repair:
   old: tx= 162 GiB,  rx = 90 GiB
   new: tx= 1.15 GiB, rx = 0.57 GiB
   improvement: tx = 99.29%, rx = 99.36%

It is worth noting that row level repair sends and receives exactly the
number of rows needed in theory.

In this test case, the repair master needs to receive 2 million rows and
send 4 million rows. Here are the details: each node has 1 billion *
0.1% distinct rows, that is, 1 million rows. So the repair master receives 1
million rows from repair slave 1 and 1 million rows from repair slave 2.
The repair master sends its own 1 million rows plus the 1 million rows
received from repair slave 1 to repair slave 2, and sends its own
1 million rows plus the 1 million rows received from repair slave 2 to
repair slave 1.

In the result, we saw the rows on wire were as expected.

tx_row_nr  = 1000505 + 999619 + 1001257 + 998619 (4 shards, the numbers are for each shard) = 4'000'000
rx_row_nr  =  500233 + 500235 +  499559 + 499973 (4 shards, the numbers are for each shard) = 2'000'000
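
The per-shard sums and the improvement percentages quoted above can be
re-derived with a few lines of arithmetic. This is only a checking aid; the
helper names improvement_percent and total_rows are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

// Relative reduction from old_value to new_value, in percent.
double improvement_percent(double old_value, double new_value) {
    assert(old_value > 0);
    return (old_value - new_value) / old_value * 100.0;
}

// Sum of the per-shard row counters.
std::uint64_t total_rows(const std::vector<std::uint64_t>& per_shard) {
    return std::accumulate(per_shard.begin(), per_shard.end(), std::uint64_t{0});
}
```

For example, total_rows over the four tx_row_nr shard values gives 4'000'000,
and improvement_percent(211, 44) gives roughly 79.15.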

Fixes: #3033

Tests: dtests/repair_additional_test.py
"

* 'asias/row_level_repair_v7' of github.com:cloudius-systems/seastar-dev: (51 commits)
  repair: Enable row level repair
  repair: Add row_level_repair
  repair: Add docs for row level repair
  repair: Add repair_init_messaging_service_handler
  repair: Add repair_meta
  repair: Add repair_writer
  repair: Add repair_reader
  repair: Add repair_row
  repair: Add fragment_hasher
  repair: Add decorated_key_with_hash
  repair: Add get_random_seed
  repair: Add get_common_diff_detect_algorithm
  repair: Add shard_config
  repair: Add suportted_diff_detect_algorithms
  repair: Add repair_stats to repair_info
  repair: Introduce repair_stats
  flat_mutation_reader:  Add make_generating_reader
  storage_service: Introduce ROW_LEVEL_REPAIR feature
  messaging_service: Add RPC verbs for row level repair
  repair: Export the repair logger
  ...
2018-12-25 13:13:00 +02:00


/*
* Copyright (C) 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "messaging_service_fwd.hh"
#include "msg_addr.hh"
#include <seastar/core/reactor.hh>
#include <seastar/core/distributed.hh>
#include <seastar/core/sstring.hh>
#include "gms/inet_address.hh"
#include <seastar/rpc/rpc_types.hh>
#include <unordered_map>
#include "query-request.hh"
#include "mutation_query.hh"
#include "range.hh"
#include "repair/repair.hh"
#include "tracing/tracing.hh"
#include "digest_algorithm.hh"
#include "streaming/stream_reason.hh"
#include "cache_temperature.hh"
#include <list>
#include <vector>
#include <optional>
#include <seastar/net/tls.hh>
// forward declarations
namespace streaming {
class prepare_message;
}
namespace gms {
class gossip_digest_syn;
class gossip_digest_ack;
class gossip_digest_ack2;
}
namespace utils {
class UUID;
}
namespace db {
class seed_provider_type;
}
namespace db::view {
class update_backlog;
}
class frozen_mutation;
class frozen_schema;
class partition_checksum;
namespace dht {
class token;
}
namespace query {
using partition_range = dht::partition_range;
class read_command;
class result;
}
namespace compat {
using wrapping_partition_range = wrapping_range<dht::ring_position>;
}
namespace netw {
/* All verb handler identifiers */
enum class messaging_verb : int32_t {
    CLIENT_ID = 0,
    MUTATION = 1,
    MUTATION_DONE = 2,
    READ_DATA = 3,
    READ_MUTATION_DATA = 4,
    READ_DIGEST = 5,
    // Used by gossip
    GOSSIP_DIGEST_SYN = 6,
    GOSSIP_DIGEST_ACK = 7,
    GOSSIP_DIGEST_ACK2 = 8,
    GOSSIP_ECHO = 9,
    GOSSIP_SHUTDOWN = 10,
    // end of gossip verb
    DEFINITIONS_UPDATE = 11,
    TRUNCATE = 12,
    REPLICATION_FINISHED = 13,
    MIGRATION_REQUEST = 14,
    // Used by streaming
    PREPARE_MESSAGE = 15,
    PREPARE_DONE_MESSAGE = 16,
    STREAM_MUTATION = 17,
    STREAM_MUTATION_DONE = 18,
    COMPLETE_MESSAGE = 19,
    // end of streaming verbs
    REPAIR_CHECKSUM_RANGE = 20,
    GET_SCHEMA_VERSION = 21,
    SCHEMA_CHECK = 22,
    COUNTER_MUTATION = 23,
    MUTATION_FAILED = 24,
    STREAM_MUTATION_FRAGMENTS = 25,
    REPAIR_ROW_LEVEL_START = 26,
    REPAIR_ROW_LEVEL_STOP = 27,
    REPAIR_GET_FULL_ROW_HASHES = 28,
    REPAIR_GET_COMBINED_ROW_HASH = 29,
    REPAIR_GET_SYNC_BOUNDARY = 30,
    REPAIR_GET_ROW_DIFF = 31,
    REPAIR_PUT_ROW_DIFF = 32,
    REPAIR_GET_ESTIMATED_PARTITIONS = 33,
    REPAIR_SET_ESTIMATED_PARTITIONS = 34,
    REPAIR_GET_DIFF_ALGORITHMS = 35,
    LAST = 36,
};
} // namespace netw
namespace std {
template <>
class hash<netw::messaging_verb> {
public:
    size_t operator()(const netw::messaging_verb& x) const {
        return hash<int32_t>()(int32_t(x));
    }
};
} // namespace std
namespace netw {
struct serializer {};
class messaging_service : public seastar::async_sharded_service<messaging_service> {
public:
struct rpc_protocol_wrapper;
struct rpc_protocol_client_wrapper;
struct rpc_protocol_server_wrapper;
struct shard_info;
using msg_addr = netw::msg_addr;
using inet_address = gms::inet_address;
using UUID = utils::UUID;
using clients_map = std::unordered_map<msg_addr, shard_info, msg_addr::hash>;
// This should change only if serialization format changes
static constexpr int32_t current_version = 0;
struct shard_info {
shard_info(shared_ptr<rpc_protocol_client_wrapper>&& client);
shared_ptr<rpc_protocol_client_wrapper> rpc_client;
rpc::stats get_stats() const;
};
void foreach_client(std::function<void(const msg_addr& id, const shard_info& info)> f) const;
void increment_dropped_messages(messaging_verb verb);
uint64_t get_dropped_messages(messaging_verb verb) const;
const uint64_t* get_dropped_messages() const;
int32_t get_raw_version(const gms::inet_address& endpoint) const;
bool knows_version(const gms::inet_address& endpoint) const;
enum class encrypt_what {
none,
rack,
dc,
all,
};
enum class compress_what {
none,
dc,
all,
};
enum class tcp_nodelay_what {
local,
all,
};
struct memory_config {
size_t rpc_memory_limit = 1'000'000;
};
struct scheduling_config {
scheduling_group statement;
scheduling_group streaming;
scheduling_group gossip;
};
private:
gms::inet_address _listen_address;
uint16_t _port;
uint16_t _ssl_port;
encrypt_what _encrypt_what;
compress_what _compress_what;
tcp_nodelay_what _tcp_nodelay_what;
bool _should_listen_to_broadcast_address;
// map: Node broadcast address -> Node internal IP for communication within the same data center
std::unordered_map<gms::inet_address, gms::inet_address> _preferred_ip_cache;
std::unique_ptr<rpc_protocol_wrapper> _rpc;
std::array<std::unique_ptr<rpc_protocol_server_wrapper>, 2> _server;
::shared_ptr<seastar::tls::server_credentials> _credentials;
std::array<std::unique_ptr<rpc_protocol_server_wrapper>, 2> _server_tls;
std::array<clients_map, 4> _clients;
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::LAST)] = {};
bool _stopping = false;
std::list<std::function<void(gms::inet_address ep)>> _connection_drop_notifiers;
memory_config _mcfg;
scheduling_config _scheduling_config;
public:
using clock_type = lowres_clock;
public:
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"),
uint16_t port = 7000, bool listen_now = true);
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what, compress_what, tcp_nodelay_what,
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>,
memory_config mcfg, scheduling_config scfg, bool sltba = false, bool listen_now = true);
~messaging_service();
public:
void start_listen();
uint16_t port();
gms::inet_address listen_address();
future<> stop_tls_server();
future<> stop_nontls_server();
future<> stop_client();
future<> stop();
static rpc::no_wait_type no_wait();
bool is_stopping() { return _stopping; }
public:
gms::inet_address get_preferred_ip(gms::inet_address ep);
future<> init_local_preferred_ip_cache();
void cache_preferred_ip(gms::inet_address ep, gms::inet_address ip);
// Wrapper for PREPARE_MESSAGE verb
void register_prepare_message(std::function<future<streaming::prepare_message> (const rpc::client_info& cinfo,
streaming::prepare_message msg, UUID plan_id, sstring description, rpc::optional<streaming::stream_reason> reason)>&& func);
future<streaming::prepare_message> send_prepare_message(msg_addr id, streaming::prepare_message msg, UUID plan_id,
sstring description, streaming::stream_reason);
// Wrapper for PREPARE_DONE_MESSAGE verb
void register_prepare_done_message(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func);
future<> send_prepare_done_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id);
// Wrapper for STREAM_MUTATION verb
void register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional<bool>, rpc::optional<streaming::stream_reason>)>&& func);
future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented, streaming::stream_reason reason);
// Wrapper for STREAM_MUTATION_FRAGMENTS
// The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, other status code value are reserved for future use.
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::source<frozen_mutation_fragment> source)>&& func);
rpc::sink<int32_t> make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment>& source);
future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id);
void register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id);
void register_complete_message(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id, rpc::optional<bool> failed)>&& func);
future<> send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id, bool failed = false);
// Wrapper for REPAIR_CHECKSUM_RANGE verb
void register_repair_checksum_range(std::function<future<partition_checksum> (sstring keyspace, sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version)>&& func);
void unregister_repair_checksum_range();
future<partition_checksum> send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, dht::token_range range, repair_checksum hash_version);
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
void register_repair_get_full_row_hashes(std::function<future<std::unordered_set<repair_hash>> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func);
void unregister_repair_get_full_row_hashes();
future<std::unordered_set<repair_hash>> send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id);
// Wrapper for REPAIR_GET_COMBINED_ROW_HASH
void register_repair_get_combined_row_hash(std::function<future<repair_hash> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary)>&& func);
void unregister_repair_get_combined_row_hash();
future<repair_hash> send_repair_get_combined_row_hash(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary);
// Wrapper for REPAIR_GET_SYNC_BOUNDARY
void register_repair_get_sync_boundary(std::function<future<get_sync_boundary_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary)>&& func);
void unregister_repair_get_sync_boundary();
future<get_sync_boundary_response> send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary);
// Wrapper for REPAIR_GET_ROW_DIFF
void register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows)>&& func);
void unregister_repair_get_row_diff();
future<repair_rows_on_wire> send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows);
// Wrapper for REPAIR_PUT_ROW_DIFF
void register_repair_put_row_diff(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff)>&& func);
void unregister_repair_put_row_diff();
future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff);
// Wrapper for REPAIR_ROW_LEVEL_START
void register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name)>&& func);
void unregister_repair_row_level_start();
future<> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name);
// Wrapper for REPAIR_ROW_LEVEL_STOP
void register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range)>&& func);
void unregister_repair_row_level_stop();
future<> send_repair_row_level_stop(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range);
// Wrapper for REPAIR_GET_ESTIMATED_PARTITIONS
void register_repair_get_estimated_partitions(std::function<future<uint64_t> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func);
void unregister_repair_get_estimated_partitions();
future<uint64_t> send_repair_get_estimated_partitions(msg_addr id, uint32_t repair_meta_id);
// Wrapper for REPAIR_SET_ESTIMATED_PARTITIONS
void register_repair_set_estimated_partitions(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions)>&& func);
void unregister_repair_set_estimated_partitions();
future<> send_repair_set_estimated_partitions(msg_addr id, uint32_t repair_meta_id, uint64_t estimated_partitions);
// Wrapper for REPAIR_GET_DIFF_ALGORITHMS
void register_repair_get_diff_algorithms(std::function<future<std::vector<row_level_diff_detect_algorithm>> (const rpc::client_info& cinfo)>&& func);
void unregister_repair_get_diff_algorithms();
future<std::vector<row_level_diff_detect_algorithm>> send_repair_get_diff_algorithms(msg_addr id);
// Wrapper for GOSSIP_ECHO verb
void register_gossip_echo(std::function<future<> ()>&& func);
void unregister_gossip_echo();
future<> send_gossip_echo(msg_addr id);
// Wrapper for GOSSIP_SHUTDOWN
void register_gossip_shutdown(std::function<rpc::no_wait_type (inet_address from)>&& func);
void unregister_gossip_shutdown();
future<> send_gossip_shutdown(msg_addr id, inet_address from);
// Wrapper for GOSSIP_DIGEST_SYN
void register_gossip_digest_syn(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gms::gossip_digest_syn)>&& func);
void unregister_gossip_digest_syn();
future<> send_gossip_digest_syn(msg_addr id, gms::gossip_digest_syn msg);
// Wrapper for GOSSIP_DIGEST_ACK
void register_gossip_digest_ack(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gms::gossip_digest_ack)>&& func);
void unregister_gossip_digest_ack();
future<> send_gossip_digest_ack(msg_addr id, gms::gossip_digest_ack msg);
// Wrapper for GOSSIP_DIGEST_ACK2
void register_gossip_digest_ack2(std::function<rpc::no_wait_type (gms::gossip_digest_ack2)>&& func);
void unregister_gossip_digest_ack2();
future<> send_gossip_digest_ack2(msg_addr id, gms::gossip_digest_ack2 msg);
// Wrapper for DEFINITIONS_UPDATE
void register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm)>&& func);
void unregister_definitions_update();
future<> send_definitions_update(msg_addr id, std::vector<frozen_mutation> fm);
// Wrapper for MIGRATION_REQUEST
void register_migration_request(std::function<future<std::vector<frozen_mutation>> (const rpc::client_info&)>&& func);
void unregister_migration_request();
future<std::vector<frozen_mutation>> send_migration_request(msg_addr id);
// FIXME: response_id_type is an alias in service::storage_proxy::response_id_type
using response_id_type = uint64_t;
// Wrapper for MUTATION
void register_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::experimental::optional<tracing::trace_info>> trace_info)>&& func);
void unregister_mutation();
future<> send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector<inet_address> forward,
inet_address reply_to, unsigned shard, response_id_type response_id, std::experimental::optional<tracing::trace_info> trace_info = std::experimental::nullopt);
// Wrapper for COUNTER_MUTATION
void register_counter_mutation(std::function<future<> (const rpc::client_info&, rpc::opt_time_point, std::vector<frozen_mutation> fms, db::consistency_level cl, stdx::optional<tracing::trace_info> trace_info)>&& func);
void unregister_counter_mutation();
future<> send_counter_mutation(msg_addr id, clock_type::time_point timeout, std::vector<frozen_mutation> fms, db::consistency_level cl, stdx::optional<tracing::trace_info> trace_info = std::experimental::nullopt);
// Wrapper for MUTATION_DONE
void register_mutation_done(std::function<future<rpc::no_wait_type> (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id, rpc::optional<db::view::update_backlog> backlog)>&& func);
void unregister_mutation_done();
future<> send_mutation_done(msg_addr id, unsigned shard, response_id_type response_id, db::view::update_backlog backlog);
// Wrapper for MUTATION_FAILED
void register_mutation_failed(std::function<future<rpc::no_wait_type> (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id, size_t num_failed, rpc::optional<db::view::update_backlog> backlog)>&& func);
void unregister_mutation_failed();
future<> send_mutation_failed(msg_addr id, unsigned shard, response_id_type response_id, size_t num_failed, db::view::update_backlog backlog);
// Wrapper for READ_DATA
// Note: WTH is future<foreign_ptr<lw_shared_ptr<query::result>>
void register_read_data(std::function<future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> (const rpc::client_info&, rpc::opt_time_point timeout, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> digest)>&& func);
void unregister_read_data();
future<query::result, rpc::optional<cache_temperature>> send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da);
// Wrapper for GET_SCHEMA_VERSION
void register_get_schema_version(std::function<future<frozen_schema>(unsigned, table_schema_version)>&& func);
void unregister_get_schema_version();
future<frozen_schema> send_get_schema_version(msg_addr, table_schema_version);
// Wrapper for SCHEMA_CHECK
void register_schema_check(std::function<future<utils::UUID>()>&& func);
void unregister_schema_check();
future<utils::UUID> send_schema_check(msg_addr);
// Wrapper for READ_MUTATION_DATA
void register_read_mutation_data(std::function<future<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature> (const rpc::client_info&, rpc::opt_time_point timeout, query::read_command cmd, ::compat::wrapping_partition_range pr)>&& func);
void unregister_read_mutation_data();
future<reconcilable_result, rpc::optional<cache_temperature>> send_read_mutation_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr);
// Wrapper for READ_DIGEST
void register_read_digest(std::function<future<query::result_digest, api::timestamp_type, cache_temperature> (const rpc::client_info&, rpc::opt_time_point timeout, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> digest)>&& func);
void unregister_read_digest();
future<query::result_digest, rpc::optional<api::timestamp_type>, rpc::optional<cache_temperature>> send_read_digest(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da);
// Wrapper for TRUNCATE
void register_truncate(std::function<future<>(sstring, sstring)>&& func);
void unregister_truncate();
future<> send_truncate(msg_addr, std::chrono::milliseconds, sstring, sstring);
// Wrapper for REPLICATION_FINISHED verb
void register_replication_finished(std::function<future<> (inet_address from)>&& func);
void unregister_replication_finished();
future<> send_replication_finished(msg_addr id, inet_address from);
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
private:
bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only);
public:
// Return rpc::protocol::client for a shard, which is an ip + cpuid pair.
shared_ptr<rpc_protocol_client_wrapper> get_rpc_client(messaging_verb verb, msg_addr id);
void remove_error_rpc_client(messaging_verb verb, msg_addr id);
void remove_rpc_client(msg_addr id);
using drop_notifier_handler = decltype(_connection_drop_notifiers)::iterator;
drop_notifier_handler register_connection_drop_notifier(std::function<void(gms::inet_address ep)> cb);
void unregister_connection_drop_notifier(drop_notifier_handler h);
std::unique_ptr<rpc_protocol_wrapper>& rpc();
static msg_addr get_source(const rpc::client_info& client);
scheduling_group scheduling_group_for_verb(messaging_verb verb) const;
};
extern distributed<messaging_service> _the_messaging_service;
inline distributed<messaging_service>& get_messaging_service() {
return _the_messaging_service;
}
inline messaging_service& get_local_messaging_service() {
return _the_messaging_service.local();
}
} // namespace netw