Merge 'repair: row_level: coroutinize some slow-path functions' from Avi Kivity

This series coroutinizes some functions in repair/row_level.cc. This enhances
readability and reduces bloat:

```
size  build/release/repair/row_level.o.{before,after}
   text	   data	    bss	    dec	    hex	filename
1650619	     48	    524	1651191	 1931f7	build/release/repair/row_level.o.before
1604610	     48	    524	1605182	 187e3e	build/release/repair/row_level.o.after
```

46 kB of text was saved.
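
To illustrate the transformation, here is a hypothetical helper written in both styles. It mirrors the shape of get_repair_rows_size() in the diff below, but it is a sketch, not code from the series:

```
#include <cstddef>
#include <list>

#include <seastar/core/coroutine.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
#include <seastar/coroutine/maybe_yield.hh>

// Continuation style: do_with() keeps `sz` alive across continuations, and
// do_for_each() builds a continuation chain behind the scenes.
seastar::future<size_t> total_size_then(const std::list<std::list<int>>& rows) {
    return seastar::do_with(size_t(0), [&rows] (size_t& sz) {
        return seastar::do_for_each(rows, [&sz] (const std::list<int>& r) {
            sz += r.size();
        }).then([&sz] {
            return sz;
        });
    });
}

// Coroutine style: locals live in the coroutine frame, and maybe_yield()
// adds the preemption point that do_for_each() provided implicitly.
seastar::future<size_t> total_size_coro(const std::list<std::list<int>>& rows) {
    size_t sz = 0;
    for (const std::list<int>& r : rows) {
        sz += r.size();
        co_await seastar::coroutine::maybe_yield();
    }
    co_return sz;
}
```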

Functions that only touch a single mutation fragment were not coroutinized, to avoid
adding an allocation in a fast path. In one case, a function was split into a fast path
and a slow path (sketched below).
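
The split works roughly like this; process() and process_slow_path() are hypothetical stand-ins, not the actual repair functions:

```
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

seastar::future<> process_slow_path(int cmd);

// The fast path stays a plain function: the common case neither suspends
// nor allocates a coroutine frame.
seastar::future<> process(int cmd) {
    if (cmd == 0) {
        return seastar::make_ready_future<>();
    }
    return process_slow_path(cmd);
}

// The slow path is a coroutine; the frame allocation happens only here.
seastar::future<> process_slow_path(int cmd) {
    co_await seastar::make_ready_future<>(); // stands in for real async work
    co_return;
}
```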

Clean-up series, backport not needed.

Closes scylladb/scylladb#20283

* github.com:scylladb/scylladb:
  repair: row_level: restore indentation
  repair: row_level: coroutinize repair_meta::get_full_row_hashes_sink_op()
  repair: row_level: coroutinize repair_meta::get_full_row_hashes_source_op()
  repair: row_level: coroutinize repair_get_full_row_hashes_with_rpc_stream_handler()
  repair: row_level: coroutinize repair_put_row_diff_with_rpc_stream_handler()
  repair: row_level: coroutinize repair_get_row_diff_with_rpc_stream_handler()
  repair: row_level: coroutinize repair_get_full_row_hashes_with_rpc_stream_process()
  repair: row_level: coroutinize repair_get_row_diff_with_rpc_stream_process_op_slow_path()
  repair: row_level: split repair_get_row_diff_with_rpc_stream_process_op() into fast and slow paths
  repair: row_level: coroutinize repair_meta::put_row_diff_handler()
  repair: row_level: coroutinize repair_meta::put_row_diff_sink_op()
  repair: row_level: coroutinize repair_meta::put_row_diff_source_op()
  repair: row_level: coroutinize repair_meta::put_row_diff()
  repair: row_level: coroutinize repair_meta::get_row_diff_handler()
  repair: row_level: coroutinize repair_meta::get_row_diff_sink_op()
  repair: row_level: coroutinize repair_meta::to_repair_rows_on_wire()
  repair: row_level: coroutinize repair_meta::do_apply_rows()
  repair: row_level: coroutinize repair_meta::copy_rows_from_working_row_buf_within_set_diff()
  repair: row_level: coroutinize repair_meta::copy_rows_from_working_row_buf()
  repair: row_level: coroutinize repair_meta::row_buf_csum()
  repair: row_level: coroutinize repair_meta::get_repairs_row_size()
  repair: row_level: coroutinize repair_meta::set_estimated_partitions()
  repair: row_level: coroutinize repair_meta::get_estimated_partitions()
  repair: row_level: coroutinize repair_meta::do_estimate_partitions_on_local_shard()
  repair: row_level: coroutinize repair_reader::close()
  repair: row_level: coroutinize repair_reader::end_of_stream()
  repair: row_level: coroutinize sink_source_for_repair::close()
  repair: row_level: coroutinize sink_source_for_repair::get_sink_source()
Committed by Pavel Emelyanov, 2024-09-03 14:41:21 +03:00

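One pattern recurs throughout the diff below: a coroutine cannot chain .finally() or .handle_exception(), so the rewritten functions capture the exception into a std::exception_ptr, run cleanup such as sink.close(), and then rethrow. A minimal sketch with a hypothetical sink type (some functions below close the sink only on the error path; the stream handlers close it unconditionally, as here):

```
#include <exception>

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

// Hypothetical sink, standing in for rpc::sink<...> in the diff below.
struct sink_t {
    seastar::future<> send() { return seastar::make_ready_future<>(); }
    seastar::future<> close() { return seastar::make_ready_future<>(); }
};

// Coroutine equivalent of .handle_exception() + .finally(): capture the
// exception, close the sink regardless, then surface the failure.
seastar::future<> send_and_close(sink_t& sink) {
    std::exception_ptr ep;
    try {
        co_await sink.send();
    } catch (...) {
        ep = std::current_exception();
    }
    co_await sink.close(); // runs on both the success and the failure path
    if (ep) {
        std::rethrow_exception(ep);
    }
}
```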

@@ -45,6 +45,7 @@
#include "streaming/consumer.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/as_future.hh>
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
@@ -145,39 +146,32 @@ public:
future<std::tuple<sink_type, source_type>> get_sink_source(gms::inet_address remote_node, unsigned node_idx, std::optional<shard_id> dst_cpu_id) {
using value_type = std::tuple<sink_type, source_type>;
if (_sinks[node_idx] && _sources[node_idx]) {
return make_ready_future<value_type>(value_type(_sinks[node_idx].value(), _sources[node_idx].value()));
co_return value_type(_sinks[node_idx].value(), _sources[node_idx].value());
}
if (_sinks[node_idx] || _sources[node_idx]) {
return make_exception_future<value_type>(std::runtime_error(format("sink or source is missing for node {}", remote_node)));
throw std::runtime_error(format("sink or source is missing for node {}", remote_node));
}
return _fn(_repair_meta_id, dst_cpu_id, netw::messaging_service::msg_addr(remote_node)).then_unpack([this, node_idx] (rpc::sink<SinkType> sink, rpc::source<SourceType> source) mutable {
_sinks[node_idx].emplace(std::move(sink));
_sources[node_idx].emplace(std::move(source));
return make_ready_future<value_type>(value_type(_sinks[node_idx].value(), _sources[node_idx].value()));
});
auto [sink, source] = co_await _fn(_repair_meta_id, dst_cpu_id, netw::messaging_service::msg_addr(remote_node));
_sinks[node_idx].emplace(std::move(sink));
_sources[node_idx].emplace(std::move(source));
co_return value_type(_sinks[node_idx].value(), _sources[node_idx].value());
}
future<> close() {
return parallel_for_each(boost::irange(unsigned(0), unsigned(_sources.size())), [this] (unsigned node_idx) mutable {
co_await coroutine::parallel_for_each(boost::irange(unsigned(0), unsigned(_sources.size())), [this] (unsigned node_idx) -> future<> {
std::optional<rpc::sink<SinkType>>& sink_opt = _sinks[node_idx];
auto f = sink_opt ? sink_opt->close() : make_ready_future<>();
return f.finally([this, node_idx] {
f = co_await coroutine::as_future(std::move(f));
try {
std::optional<rpc::source<SourceType>>& source_opt = _sources[node_idx];
if (source_opt && !_sources_closed[node_idx]) {
return repeat([&source_opt] () mutable {
while (co_await (*source_opt)()) {
// Keep reading source until end of stream
return (*source_opt)().then([] (std::optional<std::tuple<SourceType>> opt) mutable {
if (opt) {
return make_ready_future<stop_iteration>(stop_iteration::no);
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
}).handle_exception([] (std::exception_ptr ep) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
});
}
}
return make_ready_future<>();
});
} catch (...) {
// Ignore the exception as we're just draining *source_opt
}
f.get();
});
}
};
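
The close() above relies on seastar::coroutine::as_future(), newly included at the top of the file: co_await on a failed future rethrows immediately, while awaiting as_future(f) yields the now-ready future itself, so draining can happen before the error surfaces. A hypothetical sketch:

```
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/coroutine/as_future.hh>

seastar::future<> might_fail(); // hypothetical async operation

seastar::future<> close_carefully() {
    // as_future() turns "await and maybe throw" into "await and inspect":
    // f is ready afterwards, holding either a value or an exception.
    auto f = co_await seastar::coroutine::as_future(might_fail());
    // ... drain sources or run other cleanup here ...
    f.get(); // surface the result; rethrows if might_fail() failed
}

seastar::future<> might_fail() {
    co_return;
}
```
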
@@ -378,18 +372,16 @@ repair_reader::read_mutation_fragment() {
}
future<> repair_reader::on_end_of_stream() noexcept {
return _reader.close().then([this] {
_permit.release_base_resources();
_reader = mutation_fragment_v1_stream(make_empty_flat_reader_v2(_schema, _permit));
_reader_handle.reset();
});
co_await _reader.close();
_permit.release_base_resources();
_reader = mutation_fragment_v1_stream(make_empty_flat_reader_v2(_schema, _permit));
_reader_handle.reset();
}
future<> repair_reader::close() noexcept {
return _reader.close().then([this] {
_permit.release_base_resources();
_reader_handle.reset();
});
co_await _reader.close();
_permit.release_base_resources();
_reader_handle.reset();
}
void repair_reader::set_current_dk(const dht::decorated_key& key) {
@@ -1023,49 +1015,41 @@ private:
future<uint64_t> do_estimate_partitions_on_local_shard() {
auto& cf = _db.local().find_column_family(_schema->id());
return do_with(cf.get_sstables(), uint64_t(0), [this] (lw_shared_ptr<const sstable_list>& sstables, uint64_t& partition_count) {
return do_for_each(*sstables, [this, &partition_count] (const sstables::shared_sstable& sst) mutable {
partition_count += sst->estimated_keys_for_range(_range);
}).then([&partition_count] {
return partition_count;
});
});
lw_shared_ptr<const sstable_list> sstables = cf.get_sstables();
uint64_t partition_count = 0;
for (const sstables::shared_sstable& sst : *sstables) {
partition_count += sst->estimated_keys_for_range(_range);
co_await coroutine::maybe_yield();
}
co_return partition_count;
}
future<uint64_t> get_estimated_partitions() {
return with_gate(_gate, [this] {
auto gate_held = _gate.hold();
if (_repair_master || _same_sharding_config || _is_tablet) {
return do_estimate_partitions_on_local_shard();
co_return co_await do_estimate_partitions_on_local_shard();
} else {
return do_with(dht::selective_token_range_sharder(_remote_sharder, _range, _master_node_shard_config.shard), uint64_t(0), uint64_t(0), [this] (auto& sharder, auto& partitions_sum, auto& subranges) mutable {
return repeat([this, &sharder, &partitions_sum, &subranges] () mutable {
auto shard_range = sharder.next();
if (shard_range) {
++subranges;
return do_estimate_partitions_on_all_shards(*shard_range).then([&partitions_sum] (uint64_t partitions) mutable {
partitions_sum += partitions;
return make_ready_future<stop_iteration>(stop_iteration::no);
});
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
}).then([this, &partitions_sum, &subranges] {
_local_range_estimation = local_range_estimation {
.master_subranges_count = subranges,
.partitions_count = partitions_sum
};
return partitions_sum;
});
});
auto sharder = dht::selective_token_range_sharder(_remote_sharder, _range, _master_node_shard_config.shard);
auto partitions_sum = uint64_t(0);
auto subranges = uint64_t(0);
for (auto shard_range = sharder.next(); shard_range; shard_range = sharder.next()) {
++subranges;
auto partitions = co_await do_estimate_partitions_on_all_shards(*shard_range);
partitions_sum += partitions;
}
_local_range_estimation = local_range_estimation {
.master_subranges_count = subranges,
.partitions_count = partitions_sum
};
co_return partitions_sum;
}
});
}
future<> set_estimated_partitions(uint64_t estimated_partitions) {
return with_gate(_gate, [this, estimated_partitions] {
_estimated_partitions = estimated_partitions;
_repair_writer->set_estimated_partitions(_estimated_partitions);
});
auto gate_held = _gate.hold();
_estimated_partitions = estimated_partitions;
_repair_writer->set_estimated_partitions(_estimated_partitions);
co_return;
}
dht::static_sharder make_remote_sharder() {
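
Note the gate handling above: with_gate(_gate, lambda) becomes a plain local, auto gate_held = _gate.hold(), whose holder lives in the coroutine frame and keeps the gate entered until the coroutine completes. A hypothetical sketch:

```
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>

seastar::future<> do_work(); // hypothetical async operation

seastar::future<> guarded(seastar::gate& g) {
    auto gate_held = g.hold(); // gate stays entered while the frame lives
    co_await do_work();
} // gate_held destroyed here, leaving the gate

seastar::future<> do_work() {
    co_return;
}
```
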
@@ -1082,13 +1066,12 @@ private:
}
future<size_t> get_repair_rows_size(const std::list<repair_row>& rows) const {
return do_with(size_t(0), [&rows] (size_t& sz) {
return do_for_each(rows, [&sz] (const repair_row& r) mutable {
sz += r.size();
}).then([&sz] {
return sz;
});
});
size_t sz = 0;
for (const repair_row& r : rows) {
sz += r.size();
co_await coroutine::maybe_yield();
}
co_return sz;
}
// Get the size of rows in _row_buf
@@ -1098,13 +1081,12 @@ private:
// return the combined checksum of rows in _row_buf
future<repair_hash> row_buf_csum() {
return do_with(repair_hash(), [this] (repair_hash& combined) {
return do_for_each(_row_buf, [&combined] (repair_row& r) mutable {
combined.add(r.hash());
}).then([&combined] {
return combined;
});
});
repair_hash combined;
for (repair_row& r : _row_buf) {
combined.add(r.hash());
co_await coroutine::maybe_yield();
}
co_return combined;
}
void handle_mutation_fragment(mutation_fragment& mf, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
@@ -1318,27 +1300,24 @@ private:
future<std::list<repair_row>>
copy_rows_from_working_row_buf() {
return do_with(std::list<repair_row>(), [this] (std::list<repair_row>& rows) {
return do_for_each(_working_row_buf, [&rows] (const repair_row& r) {
rows.push_back(r);
}).then([&rows] {
return std::move(rows);
});
});
std::list<repair_row> rows;
for (const repair_row& r : _working_row_buf) {
rows.push_back(r);
co_await coroutine::maybe_yield();
}
co_return rows;
}
future<std::list<repair_row>>
copy_rows_from_working_row_buf_within_set_diff(repair_hash_set set_diff) {
return do_with(std::list<repair_row>(), std::move(set_diff),
[this] (std::list<repair_row>& rows, repair_hash_set& set_diff) {
return do_for_each(_working_row_buf, [&set_diff, &rows] (const repair_row& r) {
if (set_diff.contains(r.hash())) {
rows.push_back(r);
}
}).then([&rows] {
return std::move(rows);
});
});
std::list<repair_row> rows;
for (const repair_row& r : _working_row_buf) {
if (set_diff.contains(r.hash())) {
rows.push_back(r);
}
co_await coroutine::maybe_yield();
}
co_return rows;
}
// Return rows in the _working_row_buf with hash within the given set_diff
@@ -1356,31 +1335,24 @@ private:
}
}
future<> do_apply_rows(std::list<repair_row>&& row_diff, update_working_row_buf update_buf) {
return do_with(std::move(row_diff), [this, update_buf] (std::list<repair_row>& row_diff) {
return with_semaphore(_repair_writer->sem(), 1, [this, update_buf, &row_diff] {
_repair_writer->create_writer();
return repeat([this, update_buf, &row_diff] () mutable {
if (row_diff.empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
repair_row& r = row_diff.front();
if (update_buf) {
_working_row_buf_combined_hash.add(r.hash());
}
// The repair_row here is supposed to have
// mutation_fragment attached because we have stored it in
// to_repair_rows_list above where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
r.reset_mutation_fragment();
auto dk_with_hash = r.get_dk_with_hash();
return _repair_writer->do_write(std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
row_diff.pop_front();
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
});
});
future<> do_apply_rows(std::list<repair_row> row_diff, update_working_row_buf update_buf) {
auto sem_units = co_await get_units(_repair_writer->sem(), 1);
_repair_writer->create_writer();
while (!row_diff.empty()) {
repair_row& r = row_diff.front();
if (update_buf) {
_working_row_buf_combined_hash.add(r.hash());
}
// The repair_row here is supposed to have
// mutation_fragment attached because we have stored it in
// to_repair_rows_list above where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
r.reset_mutation_fragment();
auto dk_with_hash = r.get_dk_with_hash();
co_await _repair_writer->do_write(std::move(dk_with_hash), std::move(mf));
row_diff.pop_front();
co_await coroutine::maybe_yield();
}
}
// Given a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
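
The same frame-owns-the-resource idea applies to the semaphore in do_apply_rows() above: with_semaphore(sem, 1, lambda) becomes co_await get_units(sem, 1), with the units held by a local in the coroutine frame. A hypothetical sketch:

```
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>

seastar::future<> write_one(); // hypothetical async operation

seastar::future<> guarded_write(seastar::semaphore& sem) {
    auto units = co_await seastar::get_units(sem, 1);
    co_await write_one();
} // units released here

seastar::future<> write_one() {
    co_return;
}
```
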
@@ -1440,33 +1412,30 @@ private:
future<repair_rows_on_wire> to_repair_rows_on_wire(std::list<repair_row> row_list) {
lw_shared_ptr<const decorated_key_with_hash> last_dk_with_hash;
return do_with(repair_rows_on_wire(), std::move(row_list), std::move(last_dk_with_hash),
[this] (repair_rows_on_wire& rows, std::list<repair_row>& row_list, lw_shared_ptr<const decorated_key_with_hash>& last_dk_with_hash) {
return get_repair_rows_size(row_list).then([this, &rows, &row_list, &last_dk_with_hash] (size_t row_bytes) {
_metrics.tx_row_nr += row_list.size();
_metrics.tx_row_bytes += row_bytes;
return do_for_each(row_list, [this, &rows, &last_dk_with_hash] (repair_row& r) {
const auto& dk_with_hash = r.get_dk_with_hash();
// No need to search from the beginning of the rows. Looking at the end of repair_rows_on_wire is enough.
if (rows.empty()) {
auto pk = dk_with_hash->dk.key();
last_dk_with_hash = dk_with_hash;
rows.push_back(repair_row_on_wire(std::move(pk), {std::move(r.get_frozen_mutation())}));
} else {
auto& row = rows.back();
if (last_dk_with_hash && dk_with_hash->dk.tri_compare(*_schema, last_dk_with_hash->dk) == 0) {
row.push_mutation_fragment(std::move(r.get_frozen_mutation()));
} else {
auto pk = dk_with_hash->dk.key();
last_dk_with_hash = dk_with_hash;
rows.push_back(repair_row_on_wire(std::move(pk), {std::move(r.get_frozen_mutation())}));
}
}
}).then([&rows] {
return std::move(rows);
});
});
});
repair_rows_on_wire rows;
size_t row_bytes = co_await get_repair_rows_size(row_list);
_metrics.tx_row_nr += row_list.size();
_metrics.tx_row_bytes += row_bytes;
for (repair_row& r : row_list) {
const auto& dk_with_hash = r.get_dk_with_hash();
// No need to search from the beginning of the rows. Looking at the end of repair_rows_on_wire is enough.
if (rows.empty()) {
auto pk = dk_with_hash->dk.key();
last_dk_with_hash = dk_with_hash;
rows.push_back(repair_row_on_wire(std::move(pk), {std::move(r.get_frozen_mutation())}));
} else {
auto& row = rows.back();
if (last_dk_with_hash && dk_with_hash->dk.tri_compare(*_schema, last_dk_with_hash->dk) == 0) {
row.push_mutation_fragment(std::move(r.get_frozen_mutation()));
} else {
auto pk = dk_with_hash->dk.key();
last_dk_with_hash = dk_with_hash;
rows.push_back(repair_row_on_wire(std::move(pk), {std::move(r.get_frozen_mutation())}));
}
}
co_await coroutine::maybe_yield();
}
co_return rows;
};
public:
@@ -1493,37 +1462,35 @@ private:
gms::inet_address remote_node,
unsigned node_idx,
rpc::source<repair_hash_with_cmd>& source) {
return repeat([this, current_hashes, remote_node, node_idx, &source] () mutable {
return source().then([this, current_hashes, remote_node, node_idx] (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) mutable {
if (hash_cmd_opt) {
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
rlogger.trace("get_full_row_hashes: Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", remote_node, hash_cmd.hash, int(hash_cmd.cmd));
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
current_hashes->insert(hash_cmd.hash);
return make_ready_future<stop_iteration>(stop_iteration::no);
} else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
} else if (hash_cmd.cmd == repair_stream_cmd::error) {
throw std::runtime_error("get_full_row_hashes: Peer failed to process");
} else {
throw std::runtime_error("get_full_row_hashes: Got unexpected repair_stream_cmd");
}
} else {
_sink_source_for_get_full_row_hashes.mark_source_closed(node_idx);
throw std::runtime_error("get_full_row_hashes: Got unexpected end of stream");
}
});
});
while (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt = co_await source()) {
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
rlogger.trace("get_full_row_hashes: Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", remote_node, hash_cmd.hash, int(hash_cmd.cmd));
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
current_hashes->insert(hash_cmd.hash);
} else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set) {
co_return;
} else if (hash_cmd.cmd == repair_stream_cmd::error) {
throw std::runtime_error("get_full_row_hashes: Peer failed to process");
} else {
throw std::runtime_error("get_full_row_hashes: Got unexpected repair_stream_cmd");
}
}
_sink_source_for_get_full_row_hashes.mark_source_closed(node_idx);
throw std::runtime_error("get_full_row_hashes: Got unexpected end of stream");
}
future<> get_full_row_hashes_sink_op(rpc::sink<repair_stream_cmd>& sink) {
return sink(repair_stream_cmd::get_full_row_hashes).then([&sink] {
return sink.flush();
}).handle_exception([&sink] (std::exception_ptr ep) {
return sink.close().then([ep = std::move(ep)] () mutable {
return make_exception_future<>(std::move(ep));
});
});
std::exception_ptr ep;
try {
co_await sink(repair_stream_cmd::get_full_row_hashes);
co_await sink.flush();
} catch (...) {
ep = std::current_exception();
}
if (ep) {
co_await sink.close();
std::rethrow_exception(ep);
}
}
public:
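
Another recurring rewrite, visible above: repeat() with stop_iteration becomes a plain while loop whose condition reads one item per iteration from the rpc::source and ends at end-of-stream. A hypothetical sketch:

```
#include <optional>

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

seastar::future<std::optional<int>> next_item(); // hypothetical stream read

// repeat() + stop_iteration becomes a while loop: the condition declares
// `item`, reading one element per iteration, and ends at end-of-stream.
seastar::future<int> consume_all() {
    int count = 0;
    while (std::optional<int> item = co_await next_item()) {
        ++count; // process *item here
    }
    co_return count;
}

seastar::future<std::optional<int>> next_item() {
    co_return std::nullopt; // empty optional ends the loop
}
```
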
@@ -1781,28 +1748,29 @@ private:
needs_all_rows_t needs_all_rows,
rpc::sink<repair_hash_with_cmd>& sink,
gms::inet_address remote_node) {
return do_with(std::move(set_diff), [needs_all_rows, &sink] (repair_hash_set& set_diff) mutable {
std::exception_ptr ep;
try {
if (inject_rpc_stream_error) {
return make_exception_future<>(std::runtime_error("get_row_diff: Inject sender error in sink loop"));
throw std::runtime_error("get_row_diff: Inject sender error in sink loop");
}
if (needs_all_rows) {
rlogger.trace("get_row_diff: request with repair_stream_cmd::needs_all_rows");
return sink(repair_hash_with_cmd{repair_stream_cmd::needs_all_rows, repair_hash()}).then([&sink] () mutable {
return sink.flush();
});
co_await sink(repair_hash_with_cmd{repair_stream_cmd::needs_all_rows, repair_hash()});
co_await sink.flush();
co_return;
}
return do_for_each(set_diff, [&sink] (const repair_hash& hash) mutable {
return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
}).then([&sink] () mutable {
return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
}).then([&sink] () mutable {
return sink.flush();
});
}).handle_exception([&sink] (std::exception_ptr ep) {
return sink.close().then([ep = std::move(ep)] () mutable {
return make_exception_future<>(std::move(ep));
});
});
for (const repair_hash& hash : set_diff) {
co_await sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
}
co_await sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
co_await sink.flush();
} catch (...) {
ep = std::current_exception();
}
if (ep) {
co_await sink.close();
std::rethrow_exception(ep);
}
}
public:
@@ -1836,11 +1804,9 @@ public:
// RPC handler
future<repair_rows_on_wire> get_row_diff_handler(repair_hash_set set_diff, needs_all_rows_t needs_all_rows) {
return with_gate(_gate, [this, set_diff = std::move(set_diff), needs_all_rows] () mutable {
return get_row_diff(std::move(set_diff), needs_all_rows).then([this] (std::list<repair_row> row_diff) {
return to_repair_rows_on_wire(std::move(row_diff));
});
});
auto gate_held = _gate.hold();
std::list<repair_row> row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows);
co_return co_await to_repair_rows_on_wire(std::move(row_diff));
}
// RPC API
@@ -1848,28 +1814,22 @@ public:
future<> put_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node, shard_id dst_cpu_id) {
if (!set_diff.empty()) {
if (remote_node == myip()) {
return make_ready_future<>();
co_return;
}
size_t sz = set_diff.size();
return get_row_diff(std::move(set_diff), needs_all_rows).then([this, remote_node, dst_cpu_id, sz] (std::list<repair_row> row_diff) {
if (row_diff.size() != sz) {
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
}
return do_with(std::move(row_diff), [this, remote_node, dst_cpu_id] (std::list<repair_row>& row_diff) {
return get_repair_rows_size(row_diff).then([this, remote_node, dst_cpu_id, &row_diff] (size_t row_bytes) mutable {
stats().tx_row_nr += row_diff.size();
stats().tx_row_nr_peer[remote_node] += row_diff.size();
stats().tx_row_bytes += row_bytes;
stats().rpc_call_nr++;
return to_repair_rows_on_wire(std::move(row_diff)).then([this, remote_node, dst_cpu_id] (repair_rows_on_wire rows) {
return _messaging.send_repair_put_row_diff(msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id);
});
});
});
});
std::list<repair_row> row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows);
if (row_diff.size() != sz) {
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
}
size_t row_bytes = co_await get_repair_rows_size(row_diff);
stats().tx_row_nr += row_diff.size();
stats().tx_row_nr_peer[remote_node] += row_diff.size();
stats().tx_row_bytes += row_bytes;
stats().rpc_call_nr++;
repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff));
co_await _messaging.send_repair_put_row_diff(msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id);
}
return make_ready_future<>();
}
private:
@@ -1877,46 +1837,42 @@ private:
gms::inet_address remote_node,
unsigned node_idx,
rpc::source<repair_stream_cmd>& source) {
return repeat([this, remote_node, node_idx, &source] () mutable {
return source().then([this, remote_node, node_idx] (std::optional<std::tuple<repair_stream_cmd>> status_opt) mutable {
if (status_opt) {
repair_stream_cmd status = std::move(std::get<0>(status_opt.value()));
rlogger.trace("put_row_diff: Got status code from follower={} for put_row_diff, status={}", remote_node, int(status));
if (status == repair_stream_cmd::put_rows_done) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
} else if (status == repair_stream_cmd::error) {
throw std::runtime_error(format("put_row_diff: Repair follower={} failed in put_row_diff handler, status={}", remote_node, int(status)));
} else {
throw std::runtime_error("put_row_diff: Got unexpected repair_stream_cmd");
}
} else {
_sink_source_for_put_row_diff.mark_source_closed(node_idx);
throw std::runtime_error("put_row_diff: Got unexpected end of stream");
}
});
});
while (std::optional<std::tuple<repair_stream_cmd>> status_opt = co_await source()) {
repair_stream_cmd status = std::move(std::get<0>(status_opt.value()));
rlogger.trace("put_row_diff: Got status code from follower={} for put_row_diff, status={}", remote_node, int(status));
if (status == repair_stream_cmd::put_rows_done) {
co_return;
} else if (status == repair_stream_cmd::error) {
throw std::runtime_error(format("put_row_diff: Repair follower={} failed in put_row_diff handler, status={}", remote_node, int(status)));
} else {
throw std::runtime_error("put_row_diff: Got unexpected repair_stream_cmd");
}
}
_sink_source_for_put_row_diff.mark_source_closed(node_idx);
throw std::runtime_error("put_row_diff: Got unexpected end of stream");
}
future<> put_row_diff_sink_op(
repair_rows_on_wire rows,
rpc::sink<repair_row_on_wire_with_cmd>& sink,
gms::inet_address remote_node) {
return do_with(std::move(rows), [&sink] (repair_rows_on_wire& rows) mutable {
return do_for_each(rows, [&sink] (repair_row_on_wire& row) mutable {
std::exception_ptr ep;
try {
for (repair_row_on_wire& row : rows) {
rlogger.trace("put_row_diff: send row");
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
}).then([&sink] () mutable {
rlogger.trace("put_row_diff: send empty row");
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()}).then([&sink] () mutable {
rlogger.trace("put_row_diff: send done");
return sink.flush();
});
});
}).handle_exception([&sink] (std::exception_ptr ep) {
return sink.close().then([ep = std::move(ep)] () mutable {
return make_exception_future<>(std::move(ep));
});
});
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
}
rlogger.trace("put_row_diff: send empty row");
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
rlogger.trace("put_row_diff: send done");
co_await sink.flush();
} catch (...) {
ep = std::current_exception();
}
if (ep) {
co_await sink.close();
std::rethrow_exception(ep);
}
}
public:
@@ -1977,11 +1933,10 @@ public:
// RPC handler
future<> put_row_diff_handler(repair_rows_on_wire rows, gms::inet_address from) {
return with_gate(_gate, [this, rows = std::move(rows)] () mutable {
auto& cf = _db.local().find_column_family(_schema->id());
cf.update_off_strategy_trigger();
return apply_rows_on_follower(std::move(rows));
});
auto gate_held = _gate.hold();
auto& cf = _db.local().find_column_family(_schema->id());
cf.update_off_strategy_trigger();
co_await apply_rows_on_follower(std::move(rows));
}
};
@@ -1995,6 +1950,18 @@ get_set_diff(const repair_hash_set& x, const repair_hash_set& y) {
return set_diff;
}
static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
sharded<repair_service>& repair,
gms::inet_address from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source,
bool &error,
repair_hash_set& current_set_diff,
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt);
static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
sharded<repair_service>& repair,
gms::inet_address from,
@@ -2011,45 +1978,55 @@ static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
current_set_diff.insert(hash_cmd.hash);
return make_ready_future<stop_iteration>(stop_iteration::no);
} else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set || hash_cmd.cmd == repair_stream_cmd::needs_all_rows) {
} else {
return repair_get_row_diff_with_rpc_stream_process_op_slow_path(repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, std::move(sink), std::move(source), error, current_set_diff, std::move(hash_cmd_opt));
}
}
static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
sharded<repair_service>& repair,
gms::inet_address from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source,
bool &error,
repair_hash_set& current_set_diff,
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) {
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set || hash_cmd.cmd == repair_stream_cmd::needs_all_rows) {
if (inject_rpc_stream_error) {
return make_exception_future<stop_iteration>(std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop"));
throw std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop");
}
bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
_metrics.rx_hashes_nr += current_set_diff.size();
auto fp = make_foreign(std::make_unique<repair_hash_set>(std::move(current_set_diff)));
return repair.invoke_on(dst_cpu_id, [from, repair_meta_id, needs_all_rows, fp = std::move(fp)] (repair_service& local_repair) {
repair_rows_on_wire rows_on_wire = co_await repair.invoke_on(dst_cpu_id, [&] (repair_service& local_repair) -> future<repair_rows_on_wire> {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_started);
if (fp.get_owner_shard() == this_shard_id()) {
return rm->get_row_diff_handler(std::move(*fp), repair_meta::needs_all_rows_t(needs_all_rows)).then([rm] (repair_rows_on_wire rows) {
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
return rows;
});
repair_rows_on_wire rows = co_await rm->get_row_diff_handler(std::move(*fp), repair_meta::needs_all_rows_t(needs_all_rows));
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
co_return rows;
} else {
return rm->get_row_diff_handler(*fp, repair_meta::needs_all_rows_t(needs_all_rows)).then([rm] (repair_rows_on_wire rows) {
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
return rows;
});
repair_rows_on_wire rows = co_await rm->get_row_diff_handler(*fp, repair_meta::needs_all_rows_t(needs_all_rows));
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
co_return rows;
}
}).then([sink] (repair_rows_on_wire rows_on_wire) mutable {
if (rows_on_wire.empty()) {
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
}
return do_with(std::move(rows_on_wire), [sink] (repair_rows_on_wire& rows_on_wire) mutable {
return do_for_each(rows_on_wire, [sink] (repair_row_on_wire& row) mutable {
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
}).then([sink] () mutable {
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
});
});
}).then([sink] () mutable {
return sink.flush();
}).then([sink] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
if (rows_on_wire.empty()) {
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
} else {
for (repair_row_on_wire& row : rows_on_wire) {
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
}
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
}
co_await sink.flush();
co_return stop_iteration::no;
} else {
return make_exception_future<stop_iteration>(std::runtime_error("Got unexpected repair_stream_cmd"));
throw std::runtime_error("Got unexpected repair_stream_cmd");
}
}
@@ -2109,29 +2086,22 @@ static future<stop_iteration> repair_get_full_row_hashes_with_rpc_stream_process
repair_stream_cmd status = std::get<0>(status_opt.value());
rlogger.trace("Got register_repair_get_full_row_hashes_with_rpc_stream from peer={}, status={}", from, int(status));
if (status == repair_stream_cmd::get_full_row_hashes) {
return repair.invoke_on(dst_cpu_id, [from, repair_meta_id] (repair_service& local_repair) {
repair_hash_set hashes = co_await repair.invoke_on(dst_cpu_id, [from, repair_meta_id] (repair_service& local_repair) -> future<repair_hash_set> {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
return rm->get_full_row_hashes_handler().then([rm] (repair_hash_set hashes) {
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
_metrics.tx_hashes_nr += hashes.size();
return hashes;
});
}).then([sink] (repair_hash_set hashes) mutable {
return do_with(std::move(hashes), [sink] (repair_hash_set& hashes) mutable {
return do_for_each(hashes, [sink] (const repair_hash& hash) mutable {
return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
}).then([sink] () mutable {
return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
});
});
}).then([sink] () mutable {
return sink.flush();
}).then([sink] {
return make_ready_future<stop_iteration>(stop_iteration::no);
repair_hash_set hashes = co_await rm->get_full_row_hashes_handler();
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
_metrics.tx_hashes_nr += hashes.size();
co_return hashes;
});
for (const repair_hash& hash : hashes) {
co_await sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
}
co_await sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
co_await sink.flush();
co_return stop_iteration::no;
} else {
return make_exception_future<stop_iteration>(std::runtime_error("Got unexpected repair_stream_cmd"));
throw std::runtime_error("Got unexpected repair_stream_cmd");
}
}
@@ -2143,36 +2113,41 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source) {
return do_with(false, repair_hash_set(), [&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source] (bool& error, repair_hash_set& current_set_diff) mutable {
return repeat([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source, &error, &current_set_diff] () mutable {
return source().then([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source, &error, &current_set_diff] (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) mutable {
if (hash_cmd_opt) {
if (error) {
return make_ready_future<stop_iteration>(stop_iteration::no);
}
return repair_get_row_diff_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
current_set_diff,
std::move(hash_cmd_opt)).handle_exception([from, repair_meta_id, sink, &error] (std::exception_ptr ep) mutable {
rlogger.warn("repair_get_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
bool error = false;
repair_hash_set current_set_diff = repair_hash_set();
std::exception_ptr outer_exception;
try {
while (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt = co_await source()) {
std::exception_ptr ep;
try {
if (error) {
continue;
}
});
});
}).finally([sink] () mutable {
return sink.close().finally([sink] { });
});
co_await repair_get_row_diff_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
current_set_diff,
std::move(hash_cmd_opt));
} catch (...) {
ep = std::current_exception();
rlogger.warn("repair_get_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
}
if (ep) {
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()});
}
}
} catch (...) {
outer_exception = std::current_exception();
}
co_await sink.close();
if (outer_exception) {
std::rethrow_exception(outer_exception);
}
}
static future<> repair_put_row_diff_with_rpc_stream_handler(
@@ -2183,36 +2158,41 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
uint32_t repair_meta_id,
rpc::sink<repair_stream_cmd> sink,
rpc::source<repair_row_on_wire_with_cmd> source) {
return do_with(false, repair_rows_on_wire(), [&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source] (bool& error, repair_rows_on_wire& current_rows) mutable {
return repeat([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source, &current_rows, &error] () mutable {
return source().then([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source, &current_rows, &error] (std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt) mutable {
if (row_opt) {
if (error) {
return make_ready_future<stop_iteration>(stop_iteration::no);
}
return repair_put_row_diff_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
current_rows,
std::move(row_opt)).handle_exception([from, repair_meta_id, sink, &error] (std::exception_ptr ep) mutable {
rlogger.warn("repair_put_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
return sink(repair_stream_cmd::error).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
std::exception_ptr outer_exception;
bool error = false;
repair_rows_on_wire current_rows = repair_rows_on_wire();
try {
while (std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt = co_await source()) {
std::exception_ptr ep;
try {
if (error) {
continue;
}
});
});
}).finally([sink] () mutable {
return sink.close().finally([sink] { });
});
co_await repair_put_row_diff_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
current_rows,
std::move(row_opt));
} catch (...) {
ep = std::current_exception();
rlogger.warn("repair_put_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
}
if (ep) {
co_await sink(repair_stream_cmd::error);
}
}
} catch (...) {
outer_exception = std::current_exception();
}
co_await sink.close();
if (outer_exception) {
std::rethrow_exception(outer_exception);
}
}
static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
@@ -2223,35 +2203,42 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
uint32_t repair_meta_id,
rpc::sink<repair_hash_with_cmd> sink,
rpc::source<repair_stream_cmd> source) {
return repeat([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source] () mutable {
return do_with(false, [&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source] (bool& error) mutable {
return source().then([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source, &error] (std::optional<std::tuple<repair_stream_cmd>> status_opt) mutable {
if (status_opt) {
if (error) {
return make_ready_future<stop_iteration>(stop_iteration::no);
}
return repair_get_full_row_hashes_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
std::move(status_opt)).handle_exception([from, repair_meta_id, sink, &error] (std::exception_ptr ep) mutable {
rlogger.warn("repair_get_full_row_hashes_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
std::exception_ptr outer_exception;
try {
while (std::optional<std::tuple<repair_stream_cmd>> status_opt = co_await source()) {
bool error = false;
std::exception_ptr ep;
try {
if (error) {
continue;
}
});
});
}).finally([sink] () mutable {
return sink.close().finally([sink] { });
});
auto stop = co_await repair_get_full_row_hashes_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
std::move(status_opt));
if (stop) {
break;
}
} catch (...) {
ep = std::current_exception();
rlogger.warn("repair_get_full_row_hashes_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
}
if (ep) {
co_await sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()});
}
}
} catch (...) {
outer_exception = std::current_exception();
}
co_await sink.close();
if (outer_exception) {
std::rethrow_exception(outer_exception);
}
}
future<repair_update_system_table_response> repair_service::repair_update_system_table_handler(gms::inet_address from, repair_update_system_table_request req) {