Merge "Repair switch to rpc stream" from Asias
"
The put_row_diff, get_row_diff and get_full_row_hashes verbs are switched
to use rpc stream instead of rpc verb. These are the verbs that can
send big rpc messages. The rpc stream sink and source are created per
repair follower for each of the above 3 verbs. The sink and source are
shared by multiple requests during the entire repair operation for a
given range, so there is no rpc stream setup overhead per request.

The row buffer is now increased to 32MiB from 256KiB, giving better
bandwidth on high latency links. The downside of a bigger row buffer is
a reduced probability that all the rows inside a row buffer are identical,
which causes more full hashes to be exchanged. To address this issue, the
plan is to add a better set reconciliation algorithm in addition to the
current send-full-hashes approach.

I compared rebuild using a regular stream plan with repair using rpc
stream. With 2 nodes, 1 smp and 8M rows, all data was deleted on one of
the nodes before repair or rebuild:

1) repair using seastar rpc verb
   Time to complete: 82.17s
2) rebuild using regular streaming, which uses seastar rpc stream
   Time to complete: 63.87s
3) repair using seastar rpc stream
   Time to complete: 68.48s

For 1) and 3), the improvement is 16.6% (repair using rpc verb vs. repair using rpc stream).
For 2) and 3), the difference is 7.2% (repair vs. stream).

The result is promising for the future repair-based bootstrap/replace node operations.

NOTE: We do not actually enable rpc stream in row level repair for now. We
will enable it after we fix the stall issues caused by handling
bigger row buffers.

Fixes #4581
"
* 'repair_switch_to_rpc_stream_v9' of https://github.com/asias/scylla: (45 commits)
docs: Add RPC stream doc for row level repair
repair: Mark some of the helper functions static
repair: Increase max row buf size
repair: Hook rpc stream version of verbs in row level repair
repair: Add use_rpc_stream to repair_meta
repair: Add is_rpc_stream_supported
repair: Add needs_all_rows flag to put_row_diff
repair: Optimize get_row_diff
repair: Register repair_get_full_row_hashes_with_rpc_stream
repair: Register repair_put_row_diff_with_rpc_stream
repair: Register repair_get_row_diff_with_rpc_stream
repair: Add repair_get_full_row_hashes_with_rpc_stream_handler
repair: Add repair_put_row_diff_with_rpc_stream_handler
repair: Add repair_get_row_diff_with_rpc_stream_handler
repair: Add repair_get_full_row_hashes_with_rpc_stream_process_op
repair: Add repair_put_row_diff_with_rpc_stream_process_op
repair: Add repair_get_row_diff_with_rpc_stream_process_op
repair: Add put_row_diff_with_rpc_stream
repair: Add put_row_diff_sink_op
repair: Add put_row_diff_source_op
...
This commit is contained in:
@@ -162,3 +162,122 @@ In the result, we saw the rows on wire were as expected.

tx_row_nr = 1000505 + 999619 + 1001257 + 998619 (4 shards, the numbers are for each shard) = 4'000'000
rx_row_nr = 500233 + 500235 + 499559 + 499973 (4 shards, the numbers are for each shard) = 2'000'000

## RPC stream usage in repair

Some of the RPC verbs used by row level repair can transmit large amounts of data,
namely REPAIR_GET_FULL_ROW_HASHES, REPAIR_GET_ROW_DIFF and REPAIR_PUT_ROW_DIFF.
To get better repair bandwidth over high latency links, we want to increase the
row buffer size. It is more efficient to send large amounts of data with the RPC
stream interface instead of the RPC verb interface, so we want to switch such
RPC verbs to RPC stream. Three functions, get_full_row_hashes(),
get_row_diff() and put_row_diff(), are converted to use the new RPC stream
interface.

### 1) get_full_row_hashes()

A new REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM rpc verb is introduced.

#### The repair master sends: repair_stream_cmd

The repair_stream_cmd can be:

- repair_stream_cmd::get_full_row_hashes

  Asks the repair follower to send all the hashes in the working row buffer.

#### The repair follower replies: repair_hash_with_cmd

```
struct repair_hash_with_cmd {
    repair_stream_cmd cmd;
    repair_hash hash;
};
```

The repair_stream_cmd in repair_hash_with_cmd can be:

- repair_stream_cmd::hash_data

  One of the hashes in the working row buffer. The hash is stored in repair_hash_with_cmd.hash.

- repair_stream_cmd::end_of_current_hash_set

  Notifies the repair master that the repair follower has sent all the hashes in the working row buffer.

- repair_stream_cmd::error

  Notifies that an error has happened on the follower.

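The exchange above can be illustrated with a small, self-contained sketch. This is not the actual Scylla code: the real loop runs over a seastar `rpc::source` and uses the real `repair_hash` type, and the helper `collect_full_row_hashes()` below is a hypothetical, seastar-free model of the master-side receive loop.

```cpp
#include <cstdint>
#include <stdexcept>
#include <unordered_set>
#include <vector>

// Simplified stand-ins for the types described above (hypothetical sketch).
enum class repair_stream_cmd : uint8_t {
    error, hash_data, row_data, end_of_current_hash_set,
    needs_all_rows, end_of_current_rows, get_full_row_hashes, put_rows_done,
};

struct repair_hash_with_cmd {
    repair_stream_cmd cmd;
    uint64_t hash;  // the real repair_hash is a struct wrapping the hash value
};

// Master-side receive loop: collect hashes streamed by the follower until it
// signals end_of_current_hash_set; error or unknown commands abort the repair.
std::unordered_set<uint64_t>
collect_full_row_hashes(const std::vector<repair_hash_with_cmd>& stream) {
    std::unordered_set<uint64_t> hashes;
    for (const auto& msg : stream) {
        switch (msg.cmd) {
        case repair_stream_cmd::hash_data:
            hashes.insert(msg.hash);
            break;
        case repair_stream_cmd::end_of_current_hash_set:
            return hashes;  // follower finished its working row buffer
        case repair_stream_cmd::error:
            throw std::runtime_error("get_full_row_hashes: peer failed to process");
        default:
            throw std::runtime_error("get_full_row_hashes: unexpected repair_stream_cmd");
        }
    }
    // A source that closes without end_of_current_hash_set is treated as an error.
    throw std::runtime_error("get_full_row_hashes: unexpected end of stream");
}
```

Duplicate hashes collapse into the set, which mirrors why a bigger row buffer with fewer identical rows means more hashes on the wire.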
### 2) get_row_diff()

A new REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM rpc verb is introduced.

```
struct repair_hash_with_cmd {
    repair_stream_cmd cmd;
    repair_hash hash;
};

struct repair_row_on_wire_with_cmd {
    repair_stream_cmd cmd;
    repair_row_on_wire row;
};
```

#### The repair master sends: repair_hash_with_cmd

The repair_stream_cmd in repair_hash_with_cmd can be:

- repair_stream_cmd::needs_all_rows

  Asks the repair follower to send all the rows in the working row buffer.

- repair_stream_cmd::hash_data

  Contains the hash of a row in the working row buffer that the repair master
  needs. The hash is stored in repair_hash_with_cmd.hash.

- repair_stream_cmd::end_of_current_hash_set

  Notifies the repair follower that the repair master has sent all the hashes of the rows it needs.

#### The repair follower replies: repair_row_on_wire_with_cmd

The repair_stream_cmd in repair_row_on_wire_with_cmd can be:

- repair_stream_cmd::row_data

  One of the rows the repair master requested. The row data is stored in
  repair_row_on_wire_with_cmd.row.

- repair_stream_cmd::end_of_current_rows

  Notifies the repair master that the repair follower has sent all the rows the repair master requested.

- repair_stream_cmd::error

  Notifies that an error has happened on the follower.

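The master-side half of this exchange amounts to encoding one of two request shapes on the sink. The helper `make_get_row_diff_request()` below is a hypothetical, seastar-free sketch of that encoding, not the actual Scylla code; it only shows the message sequence (a single needs_all_rows, or hash_data messages terminated by end_of_current_hash_set).

```cpp
#include <cstdint>
#include <unordered_set>
#include <vector>

// Simplified stand-ins for the types described above (hypothetical sketch).
enum class repair_stream_cmd : uint8_t {
    error, hash_data, row_data, end_of_current_hash_set,
    needs_all_rows, end_of_current_rows, get_full_row_hashes, put_rows_done,
};

struct repair_hash_with_cmd {
    repair_stream_cmd cmd;
    uint64_t hash;  // stands in for repair_hash
};

// Master-side request encoding for get_row_diff: either a single
// needs_all_rows message, or one hash_data message per missing row hash,
// terminated by end_of_current_hash_set.
std::vector<repair_hash_with_cmd>
make_get_row_diff_request(const std::unordered_set<uint64_t>& set_diff,
                          bool needs_all_rows) {
    std::vector<repair_hash_with_cmd> msgs;
    if (needs_all_rows) {
        msgs.push_back({repair_stream_cmd::needs_all_rows, 0});
        return msgs;
    }
    for (uint64_t h : set_diff) {
        msgs.push_back({repair_stream_cmd::hash_data, h});
    }
    msgs.push_back({repair_stream_cmd::end_of_current_hash_set, 0});
    return msgs;
}
```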
### 3) put_row_diff()

A new REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM rpc verb is introduced.

```
struct repair_row_on_wire_with_cmd {
    repair_stream_cmd cmd;
    repair_row_on_wire row;
};
```

#### The repair master sends: repair_row_on_wire_with_cmd

The repair_stream_cmd in repair_row_on_wire_with_cmd can be:

- repair_stream_cmd::row_data

  Contains one of the rows that the repair master wants to send to the repair follower.
  The row data is stored in repair_row_on_wire_with_cmd.row.

- repair_stream_cmd::end_of_current_rows

  Notifies the repair follower that the repair master has sent all the rows it wants to send.

#### The repair follower replies: repair_stream_cmd

- repair_stream_cmd::put_rows_done

  Notifies the repair master that the repair follower has received and applied all the
  rows the repair master sent.

- repair_stream_cmd::error

  Notifies that an error has happened on the follower.

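The follower side of put_row_diff can likewise be sketched as a plain loop. This is a hypothetical, seastar-free model, not the actual Scylla code: `repair_row_on_wire` is modeled as a string instead of partition_key_and_mutation_fragments, and `apply_put_row_diff()` is an illustrative helper.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Simplified stand-ins for the types described above (hypothetical sketch).
enum class repair_stream_cmd : uint8_t {
    error, hash_data, row_data, end_of_current_hash_set,
    needs_all_rows, end_of_current_rows, get_full_row_hashes, put_rows_done,
};

struct repair_row_on_wire_with_cmd {
    repair_stream_cmd cmd;
    std::string row;  // stands in for repair_row_on_wire
};

// Follower-side loop for put_row_diff: buffer row_data messages, apply them
// when the master signals end_of_current_rows, then reply with put_rows_done.
repair_stream_cmd
apply_put_row_diff(const std::vector<repair_row_on_wire_with_cmd>& stream,
                   std::vector<std::string>& applied_rows) {
    std::vector<std::string> buffered;
    for (const auto& msg : stream) {
        switch (msg.cmd) {
        case repair_stream_cmd::row_data:
            buffered.push_back(msg.row);
            break;
        case repair_stream_cmd::end_of_current_rows:
            // In the real code this is where the received rows are written out.
            applied_rows.insert(applied_rows.end(), buffered.begin(), buffered.end());
            return repair_stream_cmd::put_rows_done;
        default:
            throw std::runtime_error("put_row_diff: unexpected repair_stream_cmd");
        }
    }
    throw std::runtime_error("put_row_diff: unexpected end of stream");
}
```

The put_rows_done reply is what lets the master treat the whole batch as durably applied before moving on.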
@@ -76,4 +76,26 @@ struct get_combined_row_hash_response {

enum class row_level_diff_detect_algorithm : uint8_t {
    send_full_set,
    send_full_set_rpc_stream,
};

enum class repair_stream_cmd : uint8_t {
    error,
    hash_data,
    row_data,
    end_of_current_hash_set,
    needs_all_rows,
    end_of_current_rows,
    get_full_row_hashes,
    put_rows_done,
};

struct repair_hash_with_cmd {
    repair_stream_cmd cmd;
    repair_hash hash;
};

struct repair_row_on_wire_with_cmd {
    repair_stream_cmd cmd;
    partition_key_and_mutation_fragments row;
};

@@ -464,6 +464,9 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
    case messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS:
    case messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS:
    case messaging_verb::REPAIR_GET_DIFF_ALGORITHMS:
    case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM:
    case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM:
    case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM:
        return 2;
    case messaging_verb::MUTATION_DONE:
    case messaging_verb::MUTATION_FAILED:

@@ -691,6 +694,67 @@ void messaging_service::register_stream_mutation_fragments(std::function<future<
    register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func));
}

template<class SinkType, class SourceType>
future<rpc::sink<SinkType>, rpc::source<SourceType>>
do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr<messaging_service::rpc_protocol_client_wrapper> rpc_client, std::unique_ptr<messaging_service::rpc_protocol_wrapper>& rpc) {
    return rpc_client->make_stream_sink<netw::serializer, SinkType>().then([&rpc, verb, repair_meta_id, rpc_client] (rpc::sink<SinkType> sink) mutable {
        auto rpc_handler = rpc->make_client<rpc::source<SourceType> (uint32_t, rpc::sink<SinkType>)>(verb);
        return rpc_handler(*rpc_client, repair_meta_id, sink).then_wrapped([sink, rpc_client] (future<rpc::source<SourceType>> source) mutable {
            return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable {
                return make_ready_future<rpc::sink<SinkType>, rpc::source<SourceType>>(std::move(sink), std::move(source.get0()));
            });
        });
    });
}

// Wrapper for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM
future<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>
messaging_service::make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
    auto verb = messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM;
    auto rpc_client = get_rpc_client(verb, id);
    return do_make_sink_source<repair_hash_with_cmd, repair_row_on_wire_with_cmd>(verb, repair_meta_id, std::move(rpc_client), rpc());
}

rpc::sink<repair_row_on_wire_with_cmd> messaging_service::make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source<repair_hash_with_cmd>& source) {
    return source.make_sink<netw::serializer, repair_row_on_wire_with_cmd>();
}

void messaging_service::register_repair_get_row_diff_with_rpc_stream(std::function<future<rpc::sink<repair_row_on_wire_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_hash_with_cmd> source)>&& func) {
    register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM, std::move(func));
}

// Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM
future<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>>
messaging_service::make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
    auto verb = messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM;
    auto rpc_client = get_rpc_client(verb, id);
    return do_make_sink_source<repair_row_on_wire_with_cmd, repair_stream_cmd>(verb, repair_meta_id, std::move(rpc_client), rpc());
}

rpc::sink<repair_stream_cmd> messaging_service::make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source<repair_row_on_wire_with_cmd>& source) {
    return source.make_sink<netw::serializer, repair_stream_cmd>();
}

void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::function<future<rpc::sink<repair_stream_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_row_on_wire_with_cmd> source)>&& func) {
    register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM, std::move(func));
}

// Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM
future<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>>
messaging_service::make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
    auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM;
    auto rpc_client = get_rpc_client(verb, id);
    return do_make_sink_source<repair_stream_cmd, repair_hash_with_cmd>(verb, repair_meta_id, std::move(rpc_client), rpc());
}

rpc::sink<repair_hash_with_cmd> messaging_service::make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source<repair_stream_cmd>& source) {
    return source.make_sink<netw::serializer, repair_hash_with_cmd>();
}

void messaging_service::register_repair_get_full_row_hashes_with_rpc_stream(std::function<future<rpc::sink<repair_hash_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_stream_cmd> source)>&& func) {
    register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM, std::move(func));
}

// Send a message for verb
template <typename MsgIn, typename... MsgOut>
auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {

@@ -130,7 +130,10 @@ enum class messaging_verb : int32_t {
     REPAIR_GET_ESTIMATED_PARTITIONS= 33,
     REPAIR_SET_ESTIMATED_PARTITIONS= 34,
     REPAIR_GET_DIFF_ALGORITHMS = 35,
-    LAST = 36,
+    REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM = 36,
+    REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM = 37,
+    REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM = 38,
+    LAST = 39,
 };

 } // namespace netw

@@ -274,6 +277,21 @@ public:
    rpc::sink<int32_t> make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment>& source);
    future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id);

    // Wrapper for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM
    future<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>> make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id);
    rpc::sink<repair_row_on_wire_with_cmd> make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source<repair_hash_with_cmd>& source);
    void register_repair_get_row_diff_with_rpc_stream(std::function<future<rpc::sink<repair_row_on_wire_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_hash_with_cmd> source)>&& func);

    // Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM
    future<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>> make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id);
    rpc::sink<repair_stream_cmd> make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source<repair_row_on_wire_with_cmd>& source);
    void register_repair_put_row_diff_with_rpc_stream(std::function<future<rpc::sink<repair_stream_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_row_on_wire_with_cmd> source)>&& func);

    // Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM
    future<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>> make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id);
    rpc::sink<repair_hash_with_cmd> make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source<repair_stream_cmd>& source);
    void register_repair_get_full_row_hashes_with_rpc_stream(std::function<future<rpc::sink<repair_hash_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_stream_cmd> source)>&& func);

    void register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func);
    future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id);

@@ -69,6 +69,8 @@ std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo
    switch (algo) {
    case row_level_diff_detect_algorithm::send_full_set:
        return out << "send_full_set";
    case row_level_diff_detect_algorithm::send_full_set_rpc_stream:
        return out << "send_full_set_rpc_stream";
    };
    return out << "unknown";
}

@@ -332,6 +332,9 @@ class partition_key_and_mutation_fragments {
    partition_key _key;
    std::list<frozen_mutation_fragment> _mfs;
public:
    partition_key_and_mutation_fragments()
        : _key(std::vector<bytes>()) {
    }
    partition_key_and_mutation_fragments(partition_key key, std::list<frozen_mutation_fragment> mfs)
        : _key(std::move(key))
        , _mfs(std::move(mfs)) {

@@ -346,8 +349,30 @@ public:
using repair_row_on_wire = partition_key_and_mutation_fragments;
using repair_rows_on_wire = std::list<partition_key_and_mutation_fragments>;

enum class repair_stream_cmd : uint8_t {
    error,
    hash_data,
    row_data,
    end_of_current_hash_set,
    needs_all_rows,
    end_of_current_rows,
    get_full_row_hashes,
    put_rows_done,
};

struct repair_hash_with_cmd {
    repair_stream_cmd cmd;
    repair_hash hash;
};

struct repair_row_on_wire_with_cmd {
    repair_stream_cmd cmd;
    repair_row_on_wire row;
};

enum class row_level_diff_detect_algorithm : uint8_t {
    send_full_set,
    send_full_set_rpc_stream,
};

std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo);

@@ -57,9 +57,79 @@ struct shard_config {
    sstring partitioner_name;
};

static bool inject_rpc_stream_error = false;

distributed<db::system_distributed_keyspace>* _sys_dist_ks;
distributed<db::view::view_update_generator>* _view_update_generator;

// Wraps sink and source objects for repair master or repair follower nodes.
// For repair master, it stores sink and source pair for each of the followers.
// For repair follower, it stores one sink and source pair for repair master.
template<class SinkType, class SourceType>
class sink_source_for_repair {
    uint32_t _repair_meta_id;
    using get_sink_source_fn_type = std::function<future<rpc::sink<SinkType>, rpc::source<SourceType>> (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr)>;
    using sink_type = std::reference_wrapper<rpc::sink<SinkType>>;
    using source_type = std::reference_wrapper<rpc::source<SourceType>>;
    // The vectors below store sink and source object for peer nodes.
    std::vector<std::optional<rpc::sink<SinkType>>> _sinks;
    std::vector<std::optional<rpc::source<SourceType>>> _sources;
    std::vector<bool> _sources_closed;
    get_sink_source_fn_type _fn;
public:
    sink_source_for_repair(uint32_t repair_meta_id, size_t nr_peer_nodes, get_sink_source_fn_type fn)
        : _repair_meta_id(repair_meta_id)
        , _sinks(nr_peer_nodes)
        , _sources(nr_peer_nodes)
        , _sources_closed(nr_peer_nodes, false)
        , _fn(std::move(fn)) {
    }
    void mark_source_closed(unsigned node_idx) {
        _sources_closed[node_idx] = true;
    }
    future<sink_type, source_type> get_sink_source(gms::inet_address remote_node, unsigned node_idx) {
        if (_sinks[node_idx] && _sources[node_idx]) {
            return make_ready_future<sink_type, source_type>(_sinks[node_idx].value(), _sources[node_idx].value());
        }
        if (_sinks[node_idx] || _sources[node_idx]) {
            return make_exception_future<sink_type, source_type>(std::runtime_error(format("sink or source is missing for node {}", remote_node)));
        }
        return _fn(_repair_meta_id, netw::messaging_service::msg_addr(remote_node)).then([this, node_idx] (rpc::sink<SinkType> sink, rpc::source<SourceType> source) mutable {
            _sinks[node_idx].emplace(std::move(sink));
            _sources[node_idx].emplace(std::move(source));
            return make_ready_future<sink_type, source_type>(_sinks[node_idx].value(), _sources[node_idx].value());
        });
    }
    future<> close() {
        return parallel_for_each(boost::irange(unsigned(0), unsigned(_sources.size())), [this] (unsigned node_idx) mutable {
            std::optional<rpc::sink<SinkType>>& sink_opt = _sinks[node_idx];
            auto f = sink_opt ? sink_opt->close() : make_ready_future<>();
            return f.finally([this, node_idx] {
                std::optional<rpc::source<SourceType>>& source_opt = _sources[node_idx];
                if (source_opt && !_sources_closed[node_idx]) {
                    return repeat([&source_opt] () mutable {
                        // Keep reading source until end of stream
                        return (*source_opt)().then([] (std::optional<std::tuple<SourceType>> opt) mutable {
                            if (opt) {
                                return make_ready_future<stop_iteration>(stop_iteration::no);
                            } else {
                                return make_ready_future<stop_iteration>(stop_iteration::yes);
                            }
                        }).handle_exception([] (std::exception_ptr ep) {
                            return make_ready_future<stop_iteration>(stop_iteration::yes);
                        });
                    });
                }
                return make_ready_future<>();
            });
        });
    }
};

using sink_source_for_get_full_row_hashes = sink_source_for_repair<repair_stream_cmd, repair_hash_with_cmd>;
using sink_source_for_get_row_diff = sink_source_for_repair<repair_hash_with_cmd, repair_row_on_wire_with_cmd>;
using sink_source_for_put_row_diff = sink_source_for_repair<repair_row_on_wire_with_cmd, repair_stream_cmd>;

struct row_level_repair_metrics {
    seastar::metrics::metric_groups _metrics;
    uint64_t tx_row_nr{0};

@@ -130,6 +200,11 @@ static row_level_diff_detect_algorithm get_common_diff_detect_algorithm(const st
    return common_algorithms.back();
}

static bool is_rpc_stream_supported(row_level_diff_detect_algorithm algo) {
    // send_full_set is the only algorithm that does not support rpc stream
    return algo != row_level_diff_detect_algorithm::send_full_set;
}

static uint64_t get_random_seed() {
    static thread_local std::default_random_engine random_engine{std::random_device{}()};
    static thread_local std::uniform_int_distribution<uint64_t> random_dist{};

@@ -524,6 +599,9 @@ private:
    std::vector<std::unordered_set<repair_hash>> _peer_row_hash_sets;
    // Gate used to make sure pending operation of meta data is done
    seastar::gate _gate;
    sink_source_for_get_full_row_hashes _sink_source_for_get_full_row_hashes;
    sink_source_for_get_row_diff _sink_source_for_get_row_diff;
    sink_source_for_put_row_diff _sink_source_for_put_row_diff;
public:
    repair_stats& stats() {
        return _stats;

@@ -543,6 +621,9 @@ public:
    const repair_hash& working_row_buf_combined_hash() const {
        return _working_row_buf_combined_hash;
    }
    bool use_rpc_stream() const {
        return is_rpc_stream_supported(_algo);
    }

public:
    repair_meta(

@@ -583,14 +664,30 @@ public:
            _seed,
            repair_reader::is_local_reader(_repair_master || _same_sharding_config)
        )
-    , _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes) {
+    , _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes)
+    , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
+            [] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
+                return netw::get_local_messaging_service().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr);
+            })
+    , _sink_source_for_get_row_diff(_repair_meta_id, _nr_peer_nodes,
+            [] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
+                return netw::get_local_messaging_service().make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(repair_meta_id, addr);
+            })
+    , _sink_source_for_put_row_diff(_repair_meta_id, _nr_peer_nodes,
+            [] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
+                return netw::get_local_messaging_service().make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(repair_meta_id, addr);
+            })
+    {
    }

public:
    future<> stop() {
        auto gate_future = _gate.close();
        auto writer_future = _repair_writer.wait_for_writer_done();
-        return when_all_succeed(std::move(gate_future), std::move(writer_future));
+        auto f1 = _sink_source_for_get_full_row_hashes.close();
+        auto f2 = _sink_source_for_get_row_diff.close();
+        auto f3 = _sink_source_for_put_row_diff.close();
+        return when_all_succeed(std::move(gate_future), std::move(writer_future), std::move(f1), std::move(f2), std::move(f3));
    }

    static std::unordered_map<node_repair_meta_id, lw_shared_ptr<repair_meta>>& repair_meta_map() {

@@ -980,7 +1077,11 @@ private:
    get_row_diff(const std::unordered_set<repair_hash>& set_diff, needs_all_rows_t needs_all_rows = needs_all_rows_t::no) {
        std::list<repair_row> rows;
        if (needs_all_rows) {
-            rows = _working_row_buf;
+            if (!_repair_master || _nr_peer_nodes == 1) {
+                rows = std::move(_working_row_buf);
+            } else {
+                rows = _working_row_buf;
+            }
        } else {
            rows = boost::copy_range<std::list<repair_row>>(_working_row_buf |
                boost::adaptors::filtered([&set_diff] (repair_row& r) { return set_diff.count(r.hash()) > 0; }));

@@ -1120,6 +1221,63 @@ public:
        });
    }

private:
    future<> get_full_row_hashes_source_op(
            lw_shared_ptr<std::unordered_set<repair_hash>> current_hashes,
            gms::inet_address remote_node,
            unsigned node_idx,
            rpc::source<repair_hash_with_cmd>& source) {
        return repeat([this, current_hashes, remote_node, node_idx, &source] () mutable {
            return source().then([this, current_hashes, remote_node, node_idx] (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) mutable {
                if (hash_cmd_opt) {
                    repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
                    rlogger.trace("get_full_row_hashes: Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", remote_node, hash_cmd.hash, int(hash_cmd.cmd));
                    if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
                        current_hashes->insert(hash_cmd.hash);
                        return make_ready_future<stop_iteration>(stop_iteration::no);
                    } else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set) {
                        return make_ready_future<stop_iteration>(stop_iteration::yes);
                    } else if (hash_cmd.cmd == repair_stream_cmd::error) {
                        throw std::runtime_error("get_full_row_hashes: Peer failed to process");
                    } else {
                        throw std::runtime_error("get_full_row_hashes: Got unexpected repair_stream_cmd");
                    }
                } else {
                    _sink_source_for_get_full_row_hashes.mark_source_closed(node_idx);
                    throw std::runtime_error("get_full_row_hashes: Got unexpected end of stream");
                }
            });
        });
    }

    future<> get_full_row_hashes_sink_op(rpc::sink<repair_stream_cmd>& sink) {
        return sink(repair_stream_cmd::get_full_row_hashes).then([&sink] {
            return sink.flush();
        }).handle_exception([&sink] (std::exception_ptr ep) {
            return sink.close().then([ep = std::move(ep)] () mutable {
                return make_exception_future<>(std::move(ep));
            });
        });
    }

public:
    future<std::unordered_set<repair_hash>>
    get_full_row_hashes_with_rpc_stream(gms::inet_address remote_node, unsigned node_idx) {
        if (remote_node == _myip) {
            return get_full_row_hashes_handler();
        }
        auto current_hashes = make_lw_shared<std::unordered_set<repair_hash>>();
        return _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx).then(
                [this, current_hashes, remote_node, node_idx]
                (rpc::sink<repair_stream_cmd>& sink, rpc::source<repair_hash_with_cmd>& source) mutable {
            auto source_op = get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source);
            auto sink_op = get_full_row_hashes_sink_op(sink);
            return when_all_succeed(std::move(source_op), std::move(sink_op));
        }).then([current_hashes] () mutable {
            return std::move(*current_hashes);
        });
    }

    // RPC handler
    future<std::unordered_set<repair_hash>>
    get_full_row_hashes_handler() {

@@ -1176,8 +1334,8 @@ public:
        return make_exception_future<>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later",
            utils::fb_utilities::get_broadcast_address())));
    }
-    rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}",
-        utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range);
+    rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}",
+        utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size);
    return insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version));
}

@@ -1291,6 +1449,102 @@ public:
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
private:
    future<> get_row_diff_source_op(
            update_peer_row_hash_sets update_hash_set,
            gms::inet_address remote_node,
            unsigned node_idx,
            rpc::sink<repair_hash_with_cmd>& sink,
            rpc::source<repair_row_on_wire_with_cmd>& source) {
        auto current_rows = make_lw_shared<repair_rows_on_wire>();
        return repeat([this, current_rows, update_hash_set, remote_node, node_idx, &sink, &source] () mutable {
            return source().then([this, current_rows, update_hash_set, remote_node, node_idx] (std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt) mutable {
                if (row_opt) {
                    if (inject_rpc_stream_error) {
                        throw std::runtime_error("get_row_diff: Inject sender error in source loop");
                    }
                    auto row = std::move(std::get<0>(row_opt.value()));
                    if (row.cmd == repair_stream_cmd::row_data) {
                        rlogger.trace("get_row_diff: Got repair_row_on_wire with data");
                        current_rows->push_back(std::move(row.row));
                        return make_ready_future<stop_iteration>(stop_iteration::no);
                    } else if (row.cmd == repair_stream_cmd::end_of_current_rows) {
                        rlogger.trace("get_row_diff: Got repair_row_on_wire with nullopt");
                        return apply_rows(std::move(*current_rows), remote_node, update_working_row_buf::yes, update_hash_set, node_idx).then([current_rows] {
                            current_rows->clear();
                            return make_ready_future<stop_iteration>(stop_iteration::yes);
                        });
                    } else if (row.cmd == repair_stream_cmd::error) {
                        throw std::runtime_error("get_row_diff: Peer failed to process");
                    } else {
                        throw std::runtime_error("get_row_diff: Got unexpected repair_stream_cmd");
                    }
                } else {
                    _sink_source_for_get_row_diff.mark_source_closed(node_idx);
                    throw std::runtime_error("get_row_diff: Got unexpected end of stream");
                }
            });
        });
    }

    future<> get_row_diff_sink_op(
            std::unordered_set<repair_hash> set_diff,
            needs_all_rows_t needs_all_rows,
            rpc::sink<repair_hash_with_cmd>& sink,
            gms::inet_address remote_node) {
        return do_with(std::move(set_diff), [needs_all_rows, remote_node, &sink] (std::unordered_set<repair_hash>& set_diff) mutable {
            if (inject_rpc_stream_error) {
                return make_exception_future<>(std::runtime_error("get_row_diff: Inject sender error in sink loop"));
            }
            if (needs_all_rows) {
                rlogger.trace("get_row_diff: request with repair_stream_cmd::needs_all_rows");
                return sink(repair_hash_with_cmd{repair_stream_cmd::needs_all_rows, repair_hash()}).then([&sink] () mutable {
                    return sink.flush();
                });
            }
            return do_for_each(set_diff, [&sink] (const repair_hash& hash) mutable {
                return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
            }).then([&sink] () mutable {
                return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
            }).then([&sink] () mutable {
                return sink.flush();
            });
        }).handle_exception([&sink] (std::exception_ptr ep) {
            return sink.close().then([ep = std::move(ep)] () mutable {
                return make_exception_future<>(std::move(ep));
            });
        });
    }

public:
    future<> get_row_diff_with_rpc_stream(
            std::unordered_set<repair_hash> set_diff,
            needs_all_rows_t needs_all_rows,
            update_peer_row_hash_sets update_hash_set,
            gms::inet_address remote_node,
            unsigned node_idx) {
        if (needs_all_rows || !set_diff.empty()) {
            if (remote_node == _myip) {
                return make_ready_future<>();
            }
            if (needs_all_rows) {
                set_diff.clear();
            } else {
                stats().tx_hashes_nr += set_diff.size();
                _metrics.tx_hashes_nr += set_diff.size();
            }
            stats().rpc_call_nr++;
            return _sink_source_for_get_row_diff.get_sink_source(remote_node, node_idx).then(
                    [this, set_diff = std::move(set_diff), needs_all_rows, update_hash_set, remote_node, node_idx]
                    (rpc::sink<repair_hash_with_cmd>& sink, rpc::source<repair_row_on_wire_with_cmd>& source) mutable {
                auto source_op = get_row_diff_source_op(update_hash_set, remote_node, node_idx, sink, source);
                auto sink_op = get_row_diff_sink_op(std::move(set_diff), needs_all_rows, sink, remote_node);
                return when_all_succeed(std::move(source_op), std::move(sink_op));
            });
        }
        return make_ready_future<>();
    }

    // RPC handler
    future<repair_rows_on_wire> get_row_diff_handler(const std::unordered_set<repair_hash>& set_diff, needs_all_rows_t needs_all_rows) {
        return with_gate(_gate, [this, &set_diff, needs_all_rows] {
@@ -1301,12 +1555,12 @@ public:

    // RPC API
    // Send rows in the _working_row_buf with hashes within the given set_diff
    future<> put_row_diff(const std::unordered_set<repair_hash>& set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node) {
        if (!set_diff.empty()) {
            if (remote_node == _myip) {
                return make_ready_future<>();
            }
            std::list<repair_row> row_diff = get_row_diff(set_diff, needs_all_rows);
            if (row_diff.size() != set_diff.size()) {
                throw std::runtime_error("row_diff.size() != set_diff.size()");
            }
@@ -1321,6 +1575,83 @@ public:
        return make_ready_future<>();
    }

private:
    future<> put_row_diff_source_op(
            gms::inet_address remote_node,
            unsigned node_idx,
            rpc::source<repair_stream_cmd>& source) {
        return repeat([this, remote_node, node_idx, &source] () mutable {
            return source().then([this, remote_node, node_idx] (std::optional<std::tuple<repair_stream_cmd>> status_opt) mutable {
                if (status_opt) {
                    repair_stream_cmd status = std::move(std::get<0>(status_opt.value()));
                    rlogger.trace("put_row_diff: Got status code from follower={} for put_row_diff, status={}", remote_node, int(status));
                    if (status == repair_stream_cmd::put_rows_done) {
                        return make_ready_future<stop_iteration>(stop_iteration::yes);
                    } else if (status == repair_stream_cmd::error) {
                        throw std::runtime_error(format("put_row_diff: Repair follower={} failed in put_row_diff handler, status={}", remote_node, int(status)));
                    } else {
                        throw std::runtime_error("put_row_diff: Got unexpected repair_stream_cmd");
                    }
                } else {
                    _sink_source_for_put_row_diff.mark_source_closed(node_idx);
                    throw std::runtime_error("put_row_diff: Got unexpected end of stream");
                }
            });
        });
    }

    future<> put_row_diff_sink_op(
            repair_rows_on_wire rows,
            rpc::sink<repair_row_on_wire_with_cmd>& sink,
            gms::inet_address remote_node) {
        return do_with(std::move(rows), [&sink, remote_node] (repair_rows_on_wire& rows) mutable {
            return do_for_each(rows, [&sink] (repair_row_on_wire& row) mutable {
                rlogger.trace("put_row_diff: send row");
                return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
            }).then([&sink] () mutable {
                rlogger.trace("put_row_diff: send empty row");
                return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()}).then([&sink] () mutable {
                    rlogger.trace("put_row_diff: send done");
                    return sink.flush();
                });
            });
        }).handle_exception([&sink] (std::exception_ptr ep) {
            return sink.close().then([ep = std::move(ep)] () mutable {
                return make_exception_future<>(std::move(ep));
            });
        });
    }

public:
    future<> put_row_diff_with_rpc_stream(
            const std::unordered_set<repair_hash>& set_diff,
            needs_all_rows_t needs_all_rows,
            gms::inet_address remote_node, unsigned node_idx) {
        if (!set_diff.empty()) {
            if (remote_node == _myip) {
                return make_ready_future<>();
            }
            std::list<repair_row> row_diff = get_row_diff(set_diff, needs_all_rows);
            if (row_diff.size() != set_diff.size()) {
                throw std::runtime_error("row_diff.size() != set_diff.size()");
            }
            stats().tx_row_nr += row_diff.size();
            stats().tx_row_nr_peer[remote_node] += row_diff.size();
            stats().tx_row_bytes += get_repair_rows_size(row_diff);
            stats().rpc_call_nr++;
            return to_repair_rows_on_wire(std::move(row_diff)).then([this, remote_node, node_idx] (repair_rows_on_wire rows) {
                return _sink_source_for_put_row_diff.get_sink_source(remote_node, node_idx).then(
                        [this, rows = std::move(rows), remote_node, node_idx]
                        (rpc::sink<repair_row_on_wire_with_cmd>& sink, rpc::source<repair_stream_cmd>& source) mutable {
                    auto source_op = put_row_diff_source_op(remote_node, node_idx, source);
                    auto sink_op = put_row_diff_sink_op(std::move(rows), sink, remote_node);
                    return when_all_succeed(std::move(source_op), std::move(sink_op));
                });
            });
        }
        return make_ready_future<>();
    }

    // RPC handler
    future<> put_row_diff_handler(repair_rows_on_wire rows, gms::inet_address from) {
        return with_gate(_gate, [this, rows = std::move(rows), from] () mutable {
@@ -1329,10 +1660,280 @@ public:
    }
};

static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
        gms::inet_address from,
        uint32_t src_cpu_id,
        uint32_t repair_meta_id,
        rpc::sink<repair_row_on_wire_with_cmd> sink,
        rpc::source<repair_hash_with_cmd> source,
        bool& error,
        std::unordered_set<repair_hash>& current_set_diff,
        std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) {
    repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
    rlogger.trace("Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", from, hash_cmd.hash, int(hash_cmd.cmd));
    if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
        current_set_diff.insert(hash_cmd.hash);
        return make_ready_future<stop_iteration>(stop_iteration::no);
    } else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set || hash_cmd.cmd == repair_stream_cmd::needs_all_rows) {
        if (inject_rpc_stream_error) {
            return make_exception_future<stop_iteration>(std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop"));
        }
        bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
        return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, needs_all_rows, set_diff = std::move(current_set_diff)] {
            auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
            return rm->get_row_diff_handler(set_diff, repair_meta::needs_all_rows_t(needs_all_rows));
        }).then([sink] (repair_rows_on_wire rows_on_wire) mutable {
            if (rows_on_wire.empty()) {
                return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
            }
            return do_with(std::move(rows_on_wire), [sink] (repair_rows_on_wire& rows_on_wire) mutable {
                return do_for_each(rows_on_wire, [sink] (repair_row_on_wire& row) mutable {
                    return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
                }).then([sink] () mutable {
                    return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
                });
            });
        }).then([sink] () mutable {
            return sink.flush();
        }).then([sink] {
            return make_ready_future<stop_iteration>(stop_iteration::no);
        });
    } else {
        return make_exception_future<stop_iteration>(std::runtime_error("Got unexpected repair_stream_cmd"));
    }
}

static future<stop_iteration> repair_put_row_diff_with_rpc_stream_process_op(
        gms::inet_address from,
        uint32_t src_cpu_id,
        uint32_t repair_meta_id,
        rpc::sink<repair_stream_cmd> sink,
        rpc::source<repair_row_on_wire_with_cmd> source,
        bool& error,
        repair_rows_on_wire& current_rows,
        std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt) {
    auto row = std::move(std::get<0>(row_opt.value()));
    if (row.cmd == repair_stream_cmd::row_data) {
        rlogger.trace("Got repair_rows_on_wire from peer={}, got row_data", from);
        current_rows.push_back(std::move(row.row));
        return make_ready_future<stop_iteration>(stop_iteration::no);
    } else if (row.cmd == repair_stream_cmd::end_of_current_rows) {
        rlogger.trace("Got repair_rows_on_wire from peer={}, got end_of_current_rows", from);
        return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, rows = std::move(current_rows)] () mutable {
            auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
            return rm->put_row_diff_handler(std::move(rows), from);
        }).then([sink] () mutable {
            return sink(repair_stream_cmd::put_rows_done);
        }).then([sink] () mutable {
            return sink.flush();
        }).then([sink] {
            return make_ready_future<stop_iteration>(stop_iteration::no);
        });
    } else {
        return make_exception_future<stop_iteration>(std::runtime_error("Got unexpected repair_stream_cmd"));
    }
}

static future<stop_iteration> repair_get_full_row_hashes_with_rpc_stream_process_op(
        gms::inet_address from,
        uint32_t src_cpu_id,
        uint32_t repair_meta_id,
        rpc::sink<repair_hash_with_cmd> sink,
        rpc::source<repair_stream_cmd> source,
        bool& error,
        std::optional<std::tuple<repair_stream_cmd>> status_opt) {
    repair_stream_cmd status = std::get<0>(status_opt.value());
    rlogger.trace("Got register_repair_get_full_row_hashes_with_rpc_stream from peer={}, status={}", from, int(status));
    if (status == repair_stream_cmd::get_full_row_hashes) {
        return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id] {
            auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
            return rm->get_full_row_hashes_handler().then([] (std::unordered_set<repair_hash> hashes) {
                _metrics.tx_hashes_nr += hashes.size();
                return hashes;
            });
        }).then([sink] (std::unordered_set<repair_hash> hashes) mutable {
            return do_with(std::move(hashes), [sink] (std::unordered_set<repair_hash>& hashes) mutable {
                return do_for_each(hashes, [sink] (const repair_hash& hash) mutable {
                    return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
                }).then([sink] () mutable {
                    return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
                });
            });
        }).then([sink] () mutable {
            return sink.flush();
        }).then([sink] {
            return make_ready_future<stop_iteration>(stop_iteration::no);
        });
    } else {
        return make_exception_future<stop_iteration>(std::runtime_error("Got unexpected repair_stream_cmd"));
    }
}

static future<> repair_get_row_diff_with_rpc_stream_handler(
        gms::inet_address from,
        uint32_t src_cpu_id,
        uint32_t repair_meta_id,
        rpc::sink<repair_row_on_wire_with_cmd> sink,
        rpc::source<repair_hash_with_cmd> source) {
    return do_with(false, std::unordered_set<repair_hash>(), [from, src_cpu_id, repair_meta_id, sink, source] (bool& error, std::unordered_set<repair_hash>& current_set_diff) mutable {
        return repeat([from, src_cpu_id, repair_meta_id, sink, source, &error, &current_set_diff] () mutable {
            return source().then([from, src_cpu_id, repair_meta_id, sink, source, &error, &current_set_diff] (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) mutable {
                if (hash_cmd_opt) {
                    if (error) {
                        return make_ready_future<stop_iteration>(stop_iteration::no);
                    }
                    return repair_get_row_diff_with_rpc_stream_process_op(from,
                            src_cpu_id,
                            repair_meta_id,
                            sink,
                            source,
                            error,
                            current_set_diff,
                            std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
                        error = true;
                        return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable {
                            return sink.close();
                        }).then([sink] {
                            return make_ready_future<stop_iteration>(stop_iteration::no);
                        });
                    });
                } else {
                    if (error) {
                        return make_ready_future<stop_iteration>(stop_iteration::yes);
                    }
                    return sink.close().then([sink] {
                        return make_ready_future<stop_iteration>(stop_iteration::yes);
                    });
                }
            });
        });
    });
}

static future<> repair_put_row_diff_with_rpc_stream_handler(
        gms::inet_address from,
        uint32_t src_cpu_id,
        uint32_t repair_meta_id,
        rpc::sink<repair_stream_cmd> sink,
        rpc::source<repair_row_on_wire_with_cmd> source) {
    return do_with(false, repair_rows_on_wire(), [from, src_cpu_id, repair_meta_id, sink, source] (bool& error, repair_rows_on_wire& current_rows) mutable {
        return repeat([from, src_cpu_id, repair_meta_id, sink, source, &current_rows, &error] () mutable {
            return source().then([from, src_cpu_id, repair_meta_id, sink, source, &current_rows, &error] (std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt) mutable {
                if (row_opt) {
                    if (error) {
                        return make_ready_future<stop_iteration>(stop_iteration::no);
                    }
                    return repair_put_row_diff_with_rpc_stream_process_op(from,
                            src_cpu_id,
                            repair_meta_id,
                            sink,
                            source,
                            error,
                            current_rows,
                            std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
                        error = true;
                        return sink(repair_stream_cmd::error).then([sink] () mutable {
                            return sink.close();
                        }).then([sink] {
                            return make_ready_future<stop_iteration>(stop_iteration::no);
                        });
                    });
                } else {
                    if (error) {
                        return make_ready_future<stop_iteration>(stop_iteration::yes);
                    }
                    return sink.close().then([sink] {
                        return make_ready_future<stop_iteration>(stop_iteration::yes);
                    });
                }
            });
        });
    });
}

static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
        gms::inet_address from,
        uint32_t src_cpu_id,
        uint32_t repair_meta_id,
        rpc::sink<repair_hash_with_cmd> sink,
        rpc::source<repair_stream_cmd> source) {
    return repeat([from, src_cpu_id, repair_meta_id, sink, source] () mutable {
        return do_with(false, [from, src_cpu_id, repair_meta_id, sink, source] (bool& error) mutable {
            return source().then([from, src_cpu_id, repair_meta_id, sink, source, &error] (std::optional<std::tuple<repair_stream_cmd>> status_opt) mutable {
                if (status_opt) {
                    if (error) {
                        return make_ready_future<stop_iteration>(stop_iteration::no);
                    }
                    return repair_get_full_row_hashes_with_rpc_stream_process_op(from,
                            src_cpu_id,
                            repair_meta_id,
                            sink,
                            source,
                            error,
                            std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
                        error = true;
                        return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable {
                            return sink.close();
                        }).then([sink] {
                            return make_ready_future<stop_iteration>(stop_iteration::no);
                        });
                    });
                } else {
                    if (error) {
                        return make_ready_future<stop_iteration>(stop_iteration::yes);
                    }
                    return sink.close().then([sink] {
                        return make_ready_future<stop_iteration>(stop_iteration::yes);
                    });
                }
            });
        });
    });
}

future<> repair_init_messaging_service_handler(repair_service& rs, distributed<db::system_distributed_keyspace>& sys_dist_ks, distributed<db::view::view_update_generator>& view_update_generator) {
    _sys_dist_ks = &sys_dist_ks;
    _view_update_generator = &view_update_generator;
    return netw::get_messaging_service().invoke_on_all([] (auto& ms) {
        ms.register_repair_get_row_diff_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_hash_with_cmd> source) {
            auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
            auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
            return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
                    [&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
                auto sink = ms.make_sink_for_repair_get_row_diff_with_rpc_stream(source);
                repair_get_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
                        [from, repair_meta_id, sink, source] (std::exception_ptr ep) {
                    rlogger.info("Failed to process get_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
                });
                return make_ready_future<rpc::sink<repair_row_on_wire_with_cmd>>(sink);
            });
        });
        ms.register_repair_put_row_diff_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_row_on_wire_with_cmd> source) {
            auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
            auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
            return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
                    [&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
                auto sink = ms.make_sink_for_repair_put_row_diff_with_rpc_stream(source);
                repair_put_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
                        [from, repair_meta_id, sink, source] (std::exception_ptr ep) {
                    rlogger.info("Failed to process put_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
                });
                return make_ready_future<rpc::sink<repair_stream_cmd>>(sink);
            });
        });
        ms.register_repair_get_full_row_hashes_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_stream_cmd> source) {
            auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
            auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
            return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
                    [&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
                auto sink = ms.make_sink_for_repair_get_full_row_hashes_with_rpc_stream(source);
                repair_get_full_row_hashes_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
                        [from, repair_meta_id, sink, source] (std::exception_ptr ep) {
                    rlogger.info("Failed to process get_full_row_hashes_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
                });
                return make_ready_future<rpc::sink<repair_hash_with_cmd>>(sink);
            });
        });
        ms.register_repair_get_full_row_hashes([] (const rpc::client_info& cinfo, uint32_t repair_meta_id) {
            auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
            auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");

@@ -1470,9 +2071,6 @@ class row_level_repair {
    // A flag that indicates any error during the repair
    bool _failed = false;

    // Seed for the repair row hashing. If we ever had a hash conflict for a row
    // and we are not using stable hash, there is a chance we will fix the row in
    // the next repair.
@@ -1499,6 +2097,11 @@ private:
        all_done,
    };

    size_t get_max_row_buf_size(row_level_diff_detect_algorithm algo) {
        // Max buffer size per repair round
        return is_rpc_stream_supported(algo) ? 32 * 1024 * 1024 : 256 * 1024;
    }

    // Step A: Negotiate sync boundary to use
    op_status negotiate_sync_boundary(repair_meta& master) {
        check_in_shutdown();
@@ -1624,13 +2227,26 @@ private:

            // Fast path: if local has zero rows and the remote has rows, request them all.
            if (master.working_row_buf_combined_hash() == repair_hash() && combined_hashes[node_idx + 1] != repair_hash()) {
                if (master.use_rpc_stream()) {
                    rlogger.debug("FastPath: get_row_diff with needs_all_rows_t::yes rpc stream");
                    master.get_row_diff_with_rpc_stream({}, repair_meta::needs_all_rows_t::yes, repair_meta::update_peer_row_hash_sets::yes, node, node_idx).get();
                } else {
                    rlogger.debug("FastPath: get_row_diff with needs_all_rows_t::yes rpc verb");
                    master.get_row_diff_and_update_peer_row_hash_sets(node, node_idx).get();
                }
                continue;
            }

            rlogger.debug("Before master.get_full_row_hashes for node {}, hash_sets={}",
                    node, master.peer_row_hash_sets(node_idx).size());
            // Ask the peer to send the full list of hashes in the working row buf.
            if (master.use_rpc_stream()) {
                master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes_with_rpc_stream(node, node_idx).get0();
            } else {
                master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes(node).get0();
            }
            rlogger.debug("After master.get_full_row_hashes for node {}, hash_sets={}",
                    node, master.peer_row_hash_sets(node_idx).size());

            // With hashes of rows from peer node, we can figure out
@@ -1642,12 +2258,16 @@ private:
            // between repair master and repair follower 2.
            std::unordered_set<repair_hash> set_diff = repair_meta::get_set_diff(master.peer_row_hash_sets(node_idx), master.working_row_hashes());
            // Request missing sets from peer node
            rlogger.debug("Before get_row_diff to node {}, local={}, peer={}, set_diff={}",
                    node, master.working_row_hashes().size(), master.peer_row_hash_sets(node_idx).size(), set_diff.size());
            // If we need to pull all rows from the peer, we can avoid
            // sending the row hashes on the wire by setting the needs_all_rows flag.
            auto needs_all_rows = repair_meta::needs_all_rows_t(set_diff.size() == master.peer_row_hash_sets(node_idx).size());
            if (master.use_rpc_stream()) {
                master.get_row_diff_with_rpc_stream(std::move(set_diff), needs_all_rows, repair_meta::update_peer_row_hash_sets::no, node, node_idx).get();
            } else {
                master.get_row_diff(std::move(set_diff), needs_all_rows, node, node_idx).get();
            }
            rlogger.debug("After get_row_diff node {}, hash_sets={}", master.myip(), master.working_row_hashes().size());
        }
        return op_status::next_step;
@@ -1662,8 +2282,13 @@ private:
        std::unordered_set<repair_hash> local_row_hash_sets = master.working_row_hashes();
        parallel_for_each(boost::irange(size_t(0), _all_live_peer_nodes.size()), [&, this] (size_t idx) {
            auto set_diff = repair_meta::get_set_diff(local_row_hash_sets, master.peer_row_hash_sets(idx));
            auto needs_all_rows = repair_meta::needs_all_rows_t(master.peer_row_hash_sets(idx).empty());
            rlogger.debug("Calling master.put_row_diff to node {}, set_diff={}, needs_all_rows={}", _all_live_peer_nodes[idx], set_diff.size(), needs_all_rows);
            if (master.use_rpc_stream()) {
                return master.put_row_diff_with_rpc_stream(set_diff, needs_all_rows, _all_live_peer_nodes[idx], idx);
            } else {
                return master.put_row_diff(set_diff, needs_all_rows, _all_live_peer_nodes[idx]);
            }
        }).get();
        master.stats().round_nr_slow_path++;
    }
@@ -1675,6 +2300,7 @@ public:
        _ri.check_in_abort();
        auto repair_meta_id = repair_meta::get_next_repair_meta_id().get0();
        auto algorithm = get_common_diff_detect_algorithm(_all_live_peer_nodes);
        auto max_row_buf_size = get_max_row_buf_size(algorithm);
        auto master_node_shard_config = shard_config {
                engine().cpu_id(),
                dht::global_partitioner().shard_count(),
@@ -1689,7 +2315,7 @@ public:
                s,
                _range,
                algorithm,
                max_row_buf_size,
                _seed,
                repair_meta::repair_master::yes,
                repair_meta_id,
@@ -1699,8 +2325,8 @@ public:
        // All nodes including the node itself.
        _all_nodes.insert(_all_nodes.begin(), master.myip());

        rlogger.debug(">>> Started Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_size={}",
                master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, schema_version, _range, _seed, max_row_buf_size);

        try {