repair: Reduce unnecessary streaming traffic
If all the remote peers report the same checksum, we only need to fetch the data from one of them instead of from every one, since they all hold the same data anyway.

On top of that optimization, if the local node has no data for the range, we can also skip sending data back to the remote peers: because all the remote peers have the same checksum and the local node contributes nothing, every remote peer already holds the complete data, so there is nothing to merge with the local data and nothing to stream back.

Refs: #1617
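The reasoning above is easier to see outside the diff, so here is a minimal standalone sketch of the decision logic, using hypothetical stand-in names (peer_addr, checksum_t, stream_plan_sketch, plan_streams) in place of the real gms::inet_address, partition_checksum, and streaming::stream_plan types. It mirrors the logic this commit adds to repair_cf_range(), but it is an illustration, not the committed code:

    #include <algorithm>
    #include <cstdint>
    #include <string>
    #include <utility>
    #include <vector>

    // Hypothetical stand-ins for the types used in the tree; not the
    // real ScyllaDB API.
    using peer_addr = std::string;          // stands in for gms::inet_address
    using checksum_t = std::uint64_t;       // stands in for partition_checksum;
                                            // checksum_t{} means "no data"

    struct stream_plan_sketch {
        std::vector<peer_addr> in;          // peers to fetch the range from
        std::vector<peer_addr> out;         // peers to send merged data to
    };

    // Sketch of the decision this commit adds to repair_cf_range():
    // returns an empty plan when no replica differs, otherwise the
    // reduced in/out peer lists.
    stream_plan_sketch plan_streams(checksum_t local,
                                    const std::vector<peer_addr>& live_peers,
                                    const std::vector<checksum_t>& peer_checksums) {
        stream_plan_sketch plan;
        if (live_peers.empty() || peer_checksums.empty()) {
            return plan;                    // nothing to repair against
        }
        std::vector<peer_addr> in(live_peers);
        std::vector<peer_addr> out(live_peers);
        bool peers_agree = std::all_of(peer_checksums.begin() + 1,
                peer_checksums.end(),
                [&](checksum_t c) { return c == peer_checksums.front(); });
        if (peers_agree) {
            // All peers hold identical data, so fetching from one is enough.
            in.resize(1);
            if (local == checksum_t{}) {
                // The local node contributes nothing, so every peer already
                // holds the merged result: skip sending data back entirely.
                out.clear();
            }
        }
        for (checksum_t c : peer_checksums) {
            if (c != local) {               // at least one replica differs
                plan.in = std::move(in);
                plan.out = std::move(out);
                break;
            }
        }
        return plan;
    }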
@@ -420,11 +420,14 @@ future<partition_checksum> checksum_range(seastar::sharded<database> &db,
 static void sync_range(seastar::sharded<database>& db,
         const sstring& keyspace, const sstring& cf,
         const ::range<dht::token>& range,
-        std::vector<gms::inet_address>& neighbors,
+        const std::vector<gms::inet_address>& neighbors_in,
+        const std::vector<gms::inet_address>& neighbors_out,
         streaming::stream_plan& sp_in,
         streaming::stream_plan& sp_out) {
-    for (const auto& peer : neighbors) {
+    for (const auto& peer : neighbors_in) {
         sp_in.request_ranges(peer, keyspace, {range}, {cf});
+    }
+    for (const auto& peer : neighbors_out) {
         sp_out.transfer_ranges(peer, keyspace, {range}, {cf});
     }
 }
@@ -541,6 +544,7 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
         // we set success=false so repair will fail, but we can
         // still do our best to repair available replicas.
         std::vector<gms::inet_address> live_neighbors;
+        std::vector<partition_checksum> live_neighbors_checksum;
         for (unsigned i = 0; i < checksums.size(); i++) {
             if (checksums[i].failed()) {
                 logger.warn(
@@ -555,18 +559,43 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
                 // (and discard) all the exceptions.
             } else if (i > 0) {
                 live_neighbors.push_back(neighbors[i - 1]);
+                live_neighbors_checksum.push_back(checksums[i].get0());
             }
         }
-        if (!checksums[0].available() || live_neighbors.empty()) {
+        if (!checksums[0].available() || live_neighbors.empty() || live_neighbors_checksum.empty()) {
             return make_ready_future<>();
         }
-        // If one of the available checksums is different, repair
-        // all the neighbors which returned a checksum.
-        auto checksum0 = checksums[0].get();
-        for (unsigned i = 1; i < checksums.size(); i++) {
-            if (checksums[i].available() && checksum0 != checksums[i].get()) {
-                logger.info("Found differing range {} on nodes {}", range, live_neighbors);
-                sync_range(db, keyspace, cf, range, live_neighbors, sp_in, sp_out);
+        auto checksum0 = checksums[0].get0();
+        auto all_live_neighbors_have_same_checksum = std::all_of(live_neighbors_checksum.begin() + 1,
+                live_neighbors_checksum.end(), [&live_neighbors_checksum] (const auto& checksum) {
+            return checksum == live_neighbors_checksum.front();
+        });
+        std::vector<gms::inet_address> live_neighbors_in(live_neighbors);
+        std::vector<gms::inet_address> live_neighbors_out(live_neighbors);
+        if (all_live_neighbors_have_same_checksum) {
+            // Since all the live neighbors have the same checksum,
+            // we can fetch data from one of them instead of all of them.
+            // TODO: Choose the best node from live_neighbors, not the first one
+            logger.debug("Reduce live_neighbors_in {} to one node, range = {}", live_neighbors_in, range);
+            live_neighbors_in.resize(1);
+            // - If the local node has no data and all the peer nodes
+            //   have the same data, we can skip sending data to the
+            //   peer nodes.
+            // - If the local node has data and all the peer nodes have
+            //   the same data, we need to fetch data from one of the
+            //   peer nodes, merge it with the local data, and send the
+            //   merged data back to *all* the peer nodes.
+            if (checksum0 == partition_checksum()) {
+                logger.debug("Reduce live_neighbors_out {} to zero node, range = {}", live_neighbors_out, range);
+                live_neighbors_out.clear();
+            }
+        }
+        for (const auto& checksum : live_neighbors_checksum) {
+            if (checksum0 != checksum) {
+                logger.info("Found differing range {} on nodes {}, in = {}, out = {}", range,
+                        live_neighbors, live_neighbors_in, live_neighbors_out);
+                sync_range(db, keyspace, cf, range, live_neighbors_in, live_neighbors_out, sp_in, sp_out);
                 return make_ready_future<>();
             }
         }
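As a quick check of the two cases described in the commit message, this is how the hypothetical plan_streams() sketch above behaves (again an illustration, not part of the commit):

    // Local node differs from three agreeing peers:
    // fetch from one peer, send the merged data to all three.
    auto p1 = plan_streams(42, {"n1", "n2", "n3"}, {7, 7, 7});
    // p1.in == {"n1"}, p1.out == {"n1", "n2", "n3"}

    // Local node has no data (checksum_t{}) and the peers agree:
    // fetch from one peer, send nothing back.
    auto p2 = plan_streams(checksum_t{}, {"n1", "n2", "n3"}, {7, 7, 7});
    // p2.in == {"n1"}, p2.out is empty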