Merge 'repair: row_level: coroutinize some slow-path functions' from Avi Kivity
This series coroutinizes some functions in repair/row_level.cc. This enhances
readability and reduces bloat:
```
size build/release/repair/row_level.o.{before,after}
text data bss dec hex filename
1650619 48 524 1651191 1931f7 build/release/repair/row_level.o.before
1604610 48 524 1605182 187e3e build/release/repair/row_level.o.after
```
46kB of text were saved.
Functions that only touch a single mutation fragment were not coroutinized to avoid
adding an allocation in a fast path. In one case a function was split into a fast path and a
slow path.
Clean-up series, backport not needed.
Closes scylladb/scylladb#20283
* github.com:scylladb/scylladb:
repair: row_level: restore indentation
repair: row_level: coroutinize repair_meta::get_full_row_hashes_sink_op()
repair: row_level: coroutinize repair_meta::get_full_row_hashes_source_op()
repair: row_level: coroutinize repair_get_full_row_hashes_with_rpc_stream_handler()
repair: row_level: coroutinize repair_put_row_diff_with_rpc_stream_handler()
repair: row_level: coroutinize repair_get_row_diff_with_rpc_stream_handler()
repair: row_level: coroutinize repair_get_full_row_hashes_with_rpc_stream_process()
repair: row_level: coroutinize repair_get_row_diff_with_rpc_stream_process_op_slow_path()
repair: row_level: split repair_get_row_diff_with_rpc_stream_process_op() into fast and slow paths
repair: row_level: coroutinize repair_meta::put_row_diff_handler()
repair: row_level: coroutinize repair_meta::put_row_diff_sink_op()
repair: row_level: coroutinize repair_meta::put_row_diff_source_op()
repair: row_level: coroutinize repair_meta::put_row_diff()
repair: row_level: coroutinize repair_meta::get_row_diff_handler()
repair: row_level: coroutinize repair_meta::get_row_diff_sink_op()
repair: row_level: coroutinize repair_meta::to_repair_rows_on_wire()
repair: row_level: coroutinize repair_meta::do_apply_rows()
repair: row_level: coroutinize repair_meta::copy_rows_from_working_row_buf_within_set_diff()
repair: row_level: coroutinize repair_meta::copy_rows_from_working_row_buf()
repair: row_level: coroutinize repair_meta::row_buf_csum()
repair: row_level: coroutinize repair_meta::get_repairs_row_size()
repair: row_level: coroutinize repair_meta::set_estimated_partitions()
repair: row_level: coroutinize repair_meta::get_estimated_partitions()
repair: row_level: coroutinize repair_meta::do_estimate_partitions_on_local_shard()
repair: row_level: coroutinize repair_reader::close()
repair: row_level: coroutinize repair_reader::end_of_stream()
repair: row_level: coroutinize sink_source_for_repair::close()
repair: row_level: coroutinize sink_source_for_repair::get_sink_source()
This commit is contained in:
@@ -45,6 +45,7 @@
|
||||
#include "streaming/consumer.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/all.hh>
|
||||
#include <seastar/coroutine/as_future.hh>
|
||||
#include "db/config.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
@@ -145,39 +146,32 @@ public:
|
||||
future<std::tuple<sink_type, source_type>> get_sink_source(gms::inet_address remote_node, unsigned node_idx, std::optional<shard_id> dst_cpu_id) {
|
||||
using value_type = std::tuple<sink_type, source_type>;
|
||||
if (_sinks[node_idx] && _sources[node_idx]) {
|
||||
return make_ready_future<value_type>(value_type(_sinks[node_idx].value(), _sources[node_idx].value()));
|
||||
co_return value_type(_sinks[node_idx].value(), _sources[node_idx].value());
|
||||
}
|
||||
if (_sinks[node_idx] || _sources[node_idx]) {
|
||||
return make_exception_future<value_type>(std::runtime_error(format("sink or source is missing for node {}", remote_node)));
|
||||
throw std::runtime_error(format("sink or source is missing for node {}", remote_node));
|
||||
}
|
||||
return _fn(_repair_meta_id, dst_cpu_id, netw::messaging_service::msg_addr(remote_node)).then_unpack([this, node_idx] (rpc::sink<SinkType> sink, rpc::source<SourceType> source) mutable {
|
||||
_sinks[node_idx].emplace(std::move(sink));
|
||||
_sources[node_idx].emplace(std::move(source));
|
||||
return make_ready_future<value_type>(value_type(_sinks[node_idx].value(), _sources[node_idx].value()));
|
||||
});
|
||||
auto [sink, source] = co_await _fn(_repair_meta_id, dst_cpu_id, netw::messaging_service::msg_addr(remote_node));
|
||||
_sinks[node_idx].emplace(std::move(sink));
|
||||
_sources[node_idx].emplace(std::move(source));
|
||||
co_return value_type(_sinks[node_idx].value(), _sources[node_idx].value());
|
||||
}
|
||||
future<> close() {
|
||||
return parallel_for_each(boost::irange(unsigned(0), unsigned(_sources.size())), [this] (unsigned node_idx) mutable {
|
||||
co_await coroutine::parallel_for_each(boost::irange(unsigned(0), unsigned(_sources.size())), [this] (unsigned node_idx) -> future<> {
|
||||
std::optional<rpc::sink<SinkType>>& sink_opt = _sinks[node_idx];
|
||||
auto f = sink_opt ? sink_opt->close() : make_ready_future<>();
|
||||
return f.finally([this, node_idx] {
|
||||
f = co_await coroutine::as_future(std::move(f));
|
||||
try {
|
||||
std::optional<rpc::source<SourceType>>& source_opt = _sources[node_idx];
|
||||
if (source_opt && !_sources_closed[node_idx]) {
|
||||
return repeat([&source_opt] () mutable {
|
||||
while (co_await (*source_opt)()) {
|
||||
// Keep reading source until end of stream
|
||||
return (*source_opt)().then([] (std::optional<std::tuple<SourceType>> opt) mutable {
|
||||
if (opt) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
}).handle_exception([] (std::exception_ptr ep) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
} catch (...) {
|
||||
// Ignore the exception as we're just draining *source_opt
|
||||
}
|
||||
f.get();
|
||||
});
|
||||
}
|
||||
};
|
||||
@@ -378,18 +372,16 @@ repair_reader::read_mutation_fragment() {
|
||||
}
|
||||
|
||||
future<> repair_reader::on_end_of_stream() noexcept {
|
||||
return _reader.close().then([this] {
|
||||
_permit.release_base_resources();
|
||||
_reader = mutation_fragment_v1_stream(make_empty_flat_reader_v2(_schema, _permit));
|
||||
_reader_handle.reset();
|
||||
});
|
||||
co_await _reader.close();
|
||||
_permit.release_base_resources();
|
||||
_reader = mutation_fragment_v1_stream(make_empty_flat_reader_v2(_schema, _permit));
|
||||
_reader_handle.reset();
|
||||
}
|
||||
|
||||
future<> repair_reader::close() noexcept {
|
||||
return _reader.close().then([this] {
|
||||
_permit.release_base_resources();
|
||||
_reader_handle.reset();
|
||||
});
|
||||
co_await _reader.close();
|
||||
_permit.release_base_resources();
|
||||
_reader_handle.reset();
|
||||
}
|
||||
|
||||
void repair_reader::set_current_dk(const dht::decorated_key& key) {
|
||||
@@ -1023,49 +1015,41 @@ private:
|
||||
|
||||
future<uint64_t> do_estimate_partitions_on_local_shard() {
|
||||
auto& cf = _db.local().find_column_family(_schema->id());
|
||||
return do_with(cf.get_sstables(), uint64_t(0), [this] (lw_shared_ptr<const sstable_list>& sstables, uint64_t& partition_count) {
|
||||
return do_for_each(*sstables, [this, &partition_count] (const sstables::shared_sstable& sst) mutable {
|
||||
partition_count += sst->estimated_keys_for_range(_range);
|
||||
}).then([&partition_count] {
|
||||
return partition_count;
|
||||
});
|
||||
});
|
||||
lw_shared_ptr<const sstable_list> sstables = cf.get_sstables();
|
||||
uint64_t partition_count = 0;
|
||||
for (const sstables::shared_sstable& sst : *sstables) {
|
||||
partition_count += sst->estimated_keys_for_range(_range);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return partition_count;
|
||||
}
|
||||
|
||||
future<uint64_t> get_estimated_partitions() {
|
||||
return with_gate(_gate, [this] {
|
||||
auto gate_held = _gate.hold();
|
||||
if (_repair_master || _same_sharding_config || _is_tablet) {
|
||||
return do_estimate_partitions_on_local_shard();
|
||||
co_return co_await do_estimate_partitions_on_local_shard();
|
||||
} else {
|
||||
return do_with(dht::selective_token_range_sharder(_remote_sharder, _range, _master_node_shard_config.shard), uint64_t(0), uint64_t(0), [this] (auto& sharder, auto& partitions_sum, auto& subranges) mutable {
|
||||
return repeat([this, &sharder, &partitions_sum, &subranges] () mutable {
|
||||
auto shard_range = sharder.next();
|
||||
if (shard_range) {
|
||||
++subranges;
|
||||
return do_estimate_partitions_on_all_shards(*shard_range).then([&partitions_sum] (uint64_t partitions) mutable {
|
||||
partitions_sum += partitions;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
}).then([this, &partitions_sum, &subranges] {
|
||||
_local_range_estimation = local_range_estimation {
|
||||
.master_subranges_count = subranges,
|
||||
.partitions_count = partitions_sum
|
||||
};
|
||||
return partitions_sum;
|
||||
});
|
||||
});
|
||||
auto sharder = dht::selective_token_range_sharder(_remote_sharder, _range, _master_node_shard_config.shard);
|
||||
auto partitions_sum = uint64_t(0);
|
||||
auto subranges = uint64_t(0);
|
||||
for (auto shard_range = sharder.next(); shard_range; shard_range = sharder.next()) {
|
||||
++subranges;
|
||||
auto partitions = co_await do_estimate_partitions_on_all_shards(*shard_range);
|
||||
partitions_sum += partitions;
|
||||
}
|
||||
_local_range_estimation = local_range_estimation {
|
||||
.master_subranges_count = subranges,
|
||||
.partitions_count = partitions_sum
|
||||
};
|
||||
co_return partitions_sum;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> set_estimated_partitions(uint64_t estimated_partitions) {
|
||||
return with_gate(_gate, [this, estimated_partitions] {
|
||||
_estimated_partitions = estimated_partitions;
|
||||
_repair_writer->set_estimated_partitions(_estimated_partitions);
|
||||
});
|
||||
auto gate_held = _gate.hold();
|
||||
_estimated_partitions = estimated_partitions;
|
||||
_repair_writer->set_estimated_partitions(_estimated_partitions);
|
||||
co_return;
|
||||
}
|
||||
|
||||
dht::static_sharder make_remote_sharder() {
|
||||
@@ -1082,13 +1066,12 @@ private:
|
||||
}
|
||||
|
||||
future<size_t> get_repair_rows_size(const std::list<repair_row>& rows) const {
|
||||
return do_with(size_t(0), [&rows] (size_t& sz) {
|
||||
return do_for_each(rows, [&sz] (const repair_row& r) mutable {
|
||||
sz += r.size();
|
||||
}).then([&sz] {
|
||||
return sz;
|
||||
});
|
||||
});
|
||||
size_t sz = 0;
|
||||
for (const repair_row& r : rows) {
|
||||
sz += r.size();
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return sz;
|
||||
}
|
||||
|
||||
// Get the size of rows in _row_buf
|
||||
@@ -1098,13 +1081,12 @@ private:
|
||||
|
||||
// return the combined checksum of rows in _row_buf
|
||||
future<repair_hash> row_buf_csum() {
|
||||
return do_with(repair_hash(), [this] (repair_hash& combined) {
|
||||
return do_for_each(_row_buf, [&combined] (repair_row& r) mutable {
|
||||
combined.add(r.hash());
|
||||
}).then([&combined] {
|
||||
return combined;
|
||||
});
|
||||
});
|
||||
repair_hash combined;
|
||||
for (repair_row& r : _row_buf) {
|
||||
combined.add(r.hash());
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return combined;
|
||||
}
|
||||
|
||||
void handle_mutation_fragment(mutation_fragment& mf, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
|
||||
@@ -1318,27 +1300,24 @@ private:
|
||||
|
||||
future<std::list<repair_row>>
|
||||
copy_rows_from_working_row_buf() {
|
||||
return do_with(std::list<repair_row>(), [this] (std::list<repair_row>& rows) {
|
||||
return do_for_each(_working_row_buf, [&rows] (const repair_row& r) {
|
||||
rows.push_back(r);
|
||||
}).then([&rows] {
|
||||
return std::move(rows);
|
||||
});
|
||||
});
|
||||
std::list<repair_row> rows;
|
||||
for (const repair_row& r : _working_row_buf) {
|
||||
rows.push_back(r);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return rows;
|
||||
}
|
||||
|
||||
future<std::list<repair_row>>
|
||||
copy_rows_from_working_row_buf_within_set_diff(repair_hash_set set_diff) {
|
||||
return do_with(std::list<repair_row>(), std::move(set_diff),
|
||||
[this] (std::list<repair_row>& rows, repair_hash_set& set_diff) {
|
||||
return do_for_each(_working_row_buf, [&set_diff, &rows] (const repair_row& r) {
|
||||
if (set_diff.contains(r.hash())) {
|
||||
rows.push_back(r);
|
||||
}
|
||||
}).then([&rows] {
|
||||
return std::move(rows);
|
||||
});
|
||||
});
|
||||
std::list<repair_row> rows;
|
||||
for (const repair_row& r : _working_row_buf) {
|
||||
if (set_diff.contains(r.hash())) {
|
||||
rows.push_back(r);
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return rows;
|
||||
}
|
||||
|
||||
// Return rows in the _working_row_buf with hash within the given sef_diff
|
||||
@@ -1356,31 +1335,24 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_apply_rows(std::list<repair_row>&& row_diff, update_working_row_buf update_buf) {
|
||||
return do_with(std::move(row_diff), [this, update_buf] (std::list<repair_row>& row_diff) {
|
||||
return with_semaphore(_repair_writer->sem(), 1, [this, update_buf, &row_diff] {
|
||||
_repair_writer->create_writer();
|
||||
return repeat([this, update_buf, &row_diff] () mutable {
|
||||
if (row_diff.empty()) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
repair_row& r = row_diff.front();
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
r.reset_mutation_fragment();
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer->do_write(std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
|
||||
row_diff.pop_front();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
future<> do_apply_rows(std::list<repair_row> row_diff, update_working_row_buf update_buf) {
|
||||
auto sem_units = co_await get_units(_repair_writer->sem(), 1);
|
||||
_repair_writer->create_writer();
|
||||
while (!row_diff.empty()) {
|
||||
repair_row& r = row_diff.front();
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
r.reset_mutation_fragment();
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
co_await _repair_writer->do_write(std::move(dk_with_hash), std::move(mf));
|
||||
row_diff.pop_front();
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
|
||||
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
|
||||
@@ -1440,33 +1412,30 @@ private:
|
||||
|
||||
future<repair_rows_on_wire> to_repair_rows_on_wire(std::list<repair_row> row_list) {
|
||||
lw_shared_ptr<const decorated_key_with_hash> last_dk_with_hash;
|
||||
return do_with(repair_rows_on_wire(), std::move(row_list), std::move(last_dk_with_hash),
|
||||
[this] (repair_rows_on_wire& rows, std::list<repair_row>& row_list, lw_shared_ptr<const decorated_key_with_hash>& last_dk_with_hash) {
|
||||
return get_repair_rows_size(row_list).then([this, &rows, &row_list, &last_dk_with_hash] (size_t row_bytes) {
|
||||
_metrics.tx_row_nr += row_list.size();
|
||||
_metrics.tx_row_bytes += row_bytes;
|
||||
return do_for_each(row_list, [this, &rows, &last_dk_with_hash] (repair_row& r) {
|
||||
const auto& dk_with_hash = r.get_dk_with_hash();
|
||||
// No need to search from the beginning of the rows. Look at the end of repair_rows_on_wire is enough.
|
||||
if (rows.empty()) {
|
||||
auto pk = dk_with_hash->dk.key();
|
||||
last_dk_with_hash = dk_with_hash;
|
||||
rows.push_back(repair_row_on_wire(std::move(pk), {std::move(r.get_frozen_mutation())}));
|
||||
} else {
|
||||
auto& row = rows.back();
|
||||
if (last_dk_with_hash && dk_with_hash->dk.tri_compare(*_schema, last_dk_with_hash->dk) == 0) {
|
||||
row.push_mutation_fragment(std::move(r.get_frozen_mutation()));
|
||||
} else {
|
||||
auto pk = dk_with_hash->dk.key();
|
||||
last_dk_with_hash = dk_with_hash;
|
||||
rows.push_back(repair_row_on_wire(std::move(pk), {std::move(r.get_frozen_mutation())}));
|
||||
}
|
||||
}
|
||||
}).then([&rows] {
|
||||
return std::move(rows);
|
||||
});
|
||||
});
|
||||
});
|
||||
repair_rows_on_wire rows;
|
||||
size_t row_bytes = co_await get_repair_rows_size(row_list);
|
||||
_metrics.tx_row_nr += row_list.size();
|
||||
_metrics.tx_row_bytes += row_bytes;
|
||||
for (repair_row& r : row_list) {
|
||||
const auto& dk_with_hash = r.get_dk_with_hash();
|
||||
// No need to search from the beginning of the rows. Look at the end of repair_rows_on_wire is enough.
|
||||
if (rows.empty()) {
|
||||
auto pk = dk_with_hash->dk.key();
|
||||
last_dk_with_hash = dk_with_hash;
|
||||
rows.push_back(repair_row_on_wire(std::move(pk), {std::move(r.get_frozen_mutation())}));
|
||||
} else {
|
||||
auto& row = rows.back();
|
||||
if (last_dk_with_hash && dk_with_hash->dk.tri_compare(*_schema, last_dk_with_hash->dk) == 0) {
|
||||
row.push_mutation_fragment(std::move(r.get_frozen_mutation()));
|
||||
} else {
|
||||
auto pk = dk_with_hash->dk.key();
|
||||
last_dk_with_hash = dk_with_hash;
|
||||
rows.push_back(repair_row_on_wire(std::move(pk), {std::move(r.get_frozen_mutation())}));
|
||||
}
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return rows;
|
||||
};
|
||||
|
||||
public:
|
||||
@@ -1493,37 +1462,35 @@ private:
|
||||
gms::inet_address remote_node,
|
||||
unsigned node_idx,
|
||||
rpc::source<repair_hash_with_cmd>& source) {
|
||||
return repeat([this, current_hashes, remote_node, node_idx, &source] () mutable {
|
||||
return source().then([this, current_hashes, remote_node, node_idx] (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) mutable {
|
||||
if (hash_cmd_opt) {
|
||||
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
|
||||
rlogger.trace("get_full_row_hashes: Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", remote_node, hash_cmd.hash, int(hash_cmd.cmd));
|
||||
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
|
||||
current_hashes->insert(hash_cmd.hash);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
} else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
} else if (hash_cmd.cmd == repair_stream_cmd::error) {
|
||||
throw std::runtime_error("get_full_row_hashes: Peer failed to process");
|
||||
} else {
|
||||
throw std::runtime_error("get_full_row_hashes: Got unexpected repair_stream_cmd");
|
||||
}
|
||||
} else {
|
||||
_sink_source_for_get_full_row_hashes.mark_source_closed(node_idx);
|
||||
throw std::runtime_error("get_full_row_hashes: Got unexpected end of stream");
|
||||
}
|
||||
});
|
||||
});
|
||||
while (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt = co_await source()) {
|
||||
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
|
||||
rlogger.trace("get_full_row_hashes: Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", remote_node, hash_cmd.hash, int(hash_cmd.cmd));
|
||||
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
|
||||
current_hashes->insert(hash_cmd.hash);
|
||||
} else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set) {
|
||||
co_return;
|
||||
} else if (hash_cmd.cmd == repair_stream_cmd::error) {
|
||||
throw std::runtime_error("get_full_row_hashes: Peer failed to process");
|
||||
} else {
|
||||
throw std::runtime_error("get_full_row_hashes: Got unexpected repair_stream_cmd");
|
||||
}
|
||||
}
|
||||
_sink_source_for_get_full_row_hashes.mark_source_closed(node_idx);
|
||||
throw std::runtime_error("get_full_row_hashes: Got unexpected end of stream");
|
||||
}
|
||||
|
||||
future<> get_full_row_hashes_sink_op(rpc::sink<repair_stream_cmd>& sink) {
|
||||
return sink(repair_stream_cmd::get_full_row_hashes).then([&sink] {
|
||||
return sink.flush();
|
||||
}).handle_exception([&sink] (std::exception_ptr ep) {
|
||||
return sink.close().then([ep = std::move(ep)] () mutable {
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
std::exception_ptr ep;
|
||||
try {
|
||||
co_await sink(repair_stream_cmd::get_full_row_hashes);
|
||||
co_await sink.flush();
|
||||
} catch (...) {
|
||||
ep = std::current_exception();
|
||||
}
|
||||
if (ep) {
|
||||
co_await sink.close();
|
||||
std::rethrow_exception(ep);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
@@ -1781,28 +1748,29 @@ private:
|
||||
needs_all_rows_t needs_all_rows,
|
||||
rpc::sink<repair_hash_with_cmd>& sink,
|
||||
gms::inet_address remote_node) {
|
||||
return do_with(std::move(set_diff), [needs_all_rows, &sink] (repair_hash_set& set_diff) mutable {
|
||||
std::exception_ptr ep;
|
||||
try {
|
||||
if (inject_rpc_stream_error) {
|
||||
return make_exception_future<>(std::runtime_error("get_row_diff: Inject sender error in sink loop"));
|
||||
throw std::runtime_error("get_row_diff: Inject sender error in sink loop");
|
||||
}
|
||||
if (needs_all_rows) {
|
||||
rlogger.trace("get_row_diff: request with repair_stream_cmd::needs_all_rows");
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::needs_all_rows, repair_hash()}).then([&sink] () mutable {
|
||||
return sink.flush();
|
||||
});
|
||||
co_await sink(repair_hash_with_cmd{repair_stream_cmd::needs_all_rows, repair_hash()});
|
||||
co_await sink.flush();
|
||||
co_return;
|
||||
}
|
||||
return do_for_each(set_diff, [&sink] (const repair_hash& hash) mutable {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
|
||||
}).then([&sink] () mutable {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
|
||||
}).then([&sink] () mutable {
|
||||
return sink.flush();
|
||||
});
|
||||
}).handle_exception([&sink] (std::exception_ptr ep) {
|
||||
return sink.close().then([ep = std::move(ep)] () mutable {
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
for (const repair_hash& hash : set_diff) {
|
||||
co_await sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
|
||||
}
|
||||
co_await sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
|
||||
co_await sink.flush();
|
||||
} catch (...) {
|
||||
ep = std::current_exception();
|
||||
}
|
||||
if (ep) {
|
||||
co_await sink.close();
|
||||
std::rethrow_exception(ep);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
@@ -1836,11 +1804,9 @@ public:
|
||||
|
||||
// RPC handler
|
||||
future<repair_rows_on_wire> get_row_diff_handler(repair_hash_set set_diff, needs_all_rows_t needs_all_rows) {
|
||||
return with_gate(_gate, [this, set_diff = std::move(set_diff), needs_all_rows] () mutable {
|
||||
return get_row_diff(std::move(set_diff), needs_all_rows).then([this] (std::list<repair_row> row_diff) {
|
||||
return to_repair_rows_on_wire(std::move(row_diff));
|
||||
});
|
||||
});
|
||||
auto gate_held = _gate.hold();
|
||||
std::list<repair_row> row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows);
|
||||
co_return co_await to_repair_rows_on_wire(std::move(row_diff));
|
||||
}
|
||||
|
||||
// RPC API
|
||||
@@ -1848,28 +1814,22 @@ public:
|
||||
future<> put_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node, shard_id dst_cpu_id) {
|
||||
if (!set_diff.empty()) {
|
||||
if (remote_node == myip()) {
|
||||
return make_ready_future<>();
|
||||
co_return;
|
||||
}
|
||||
size_t sz = set_diff.size();
|
||||
return get_row_diff(std::move(set_diff), needs_all_rows).then([this, remote_node, dst_cpu_id, sz] (std::list<repair_row> row_diff) {
|
||||
if (row_diff.size() != sz) {
|
||||
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
|
||||
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
|
||||
}
|
||||
return do_with(std::move(row_diff), [this, remote_node, dst_cpu_id] (std::list<repair_row>& row_diff) {
|
||||
return get_repair_rows_size(row_diff).then([this, remote_node, dst_cpu_id, &row_diff] (size_t row_bytes) mutable {
|
||||
stats().tx_row_nr += row_diff.size();
|
||||
stats().tx_row_nr_peer[remote_node] += row_diff.size();
|
||||
stats().tx_row_bytes += row_bytes;
|
||||
stats().rpc_call_nr++;
|
||||
return to_repair_rows_on_wire(std::move(row_diff)).then([this, remote_node, dst_cpu_id] (repair_rows_on_wire rows) {
|
||||
return _messaging.send_repair_put_row_diff(msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
std::list<repair_row> row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows);
|
||||
if (row_diff.size() != sz) {
|
||||
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
|
||||
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
|
||||
}
|
||||
size_t row_bytes = co_await get_repair_rows_size(row_diff);
|
||||
stats().tx_row_nr += row_diff.size();
|
||||
stats().tx_row_nr_peer[remote_node] += row_diff.size();
|
||||
stats().tx_row_bytes += row_bytes;
|
||||
stats().rpc_call_nr++;
|
||||
repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff));
|
||||
co_await _messaging.send_repair_put_row_diff(msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
private:
|
||||
@@ -1877,46 +1837,42 @@ private:
|
||||
gms::inet_address remote_node,
|
||||
unsigned node_idx,
|
||||
rpc::source<repair_stream_cmd>& source) {
|
||||
return repeat([this, remote_node, node_idx, &source] () mutable {
|
||||
return source().then([this, remote_node, node_idx] (std::optional<std::tuple<repair_stream_cmd>> status_opt) mutable {
|
||||
if (status_opt) {
|
||||
repair_stream_cmd status = std::move(std::get<0>(status_opt.value()));
|
||||
rlogger.trace("put_row_diff: Got status code from follower={} for put_row_diff, status={}", remote_node, int(status));
|
||||
if (status == repair_stream_cmd::put_rows_done) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
} else if (status == repair_stream_cmd::error) {
|
||||
throw std::runtime_error(format("put_row_diff: Repair follower={} failed in put_row_diff handler, status={}", remote_node, int(status)));
|
||||
} else {
|
||||
throw std::runtime_error("put_row_diff: Got unexpected repair_stream_cmd");
|
||||
}
|
||||
} else {
|
||||
_sink_source_for_put_row_diff.mark_source_closed(node_idx);
|
||||
throw std::runtime_error("put_row_diff: Got unexpected end of stream");
|
||||
}
|
||||
});
|
||||
});
|
||||
while (std::optional<std::tuple<repair_stream_cmd>> status_opt = co_await source()) {
|
||||
repair_stream_cmd status = std::move(std::get<0>(status_opt.value()));
|
||||
rlogger.trace("put_row_diff: Got status code from follower={} for put_row_diff, status={}", remote_node, int(status));
|
||||
if (status == repair_stream_cmd::put_rows_done) {
|
||||
co_return;
|
||||
} else if (status == repair_stream_cmd::error) {
|
||||
throw std::runtime_error(format("put_row_diff: Repair follower={} failed in put_row_diff handler, status={}", remote_node, int(status)));
|
||||
} else {
|
||||
throw std::runtime_error("put_row_diff: Got unexpected repair_stream_cmd");
|
||||
}
|
||||
}
|
||||
_sink_source_for_put_row_diff.mark_source_closed(node_idx);
|
||||
throw std::runtime_error("put_row_diff: Got unexpected end of stream");
|
||||
}
|
||||
|
||||
future<> put_row_diff_sink_op(
|
||||
repair_rows_on_wire rows,
|
||||
rpc::sink<repair_row_on_wire_with_cmd>& sink,
|
||||
gms::inet_address remote_node) {
|
||||
return do_with(std::move(rows), [&sink] (repair_rows_on_wire& rows) mutable {
|
||||
return do_for_each(rows, [&sink] (repair_row_on_wire& row) mutable {
|
||||
std::exception_ptr ep;
|
||||
try {
|
||||
for (repair_row_on_wire& row : rows) {
|
||||
rlogger.trace("put_row_diff: send row");
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
|
||||
}).then([&sink] () mutable {
|
||||
rlogger.trace("put_row_diff: send empty row");
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()}).then([&sink] () mutable {
|
||||
rlogger.trace("put_row_diff: send done");
|
||||
return sink.flush();
|
||||
});
|
||||
});
|
||||
}).handle_exception([&sink] (std::exception_ptr ep) {
|
||||
return sink.close().then([ep = std::move(ep)] () mutable {
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
|
||||
}
|
||||
rlogger.trace("put_row_diff: send empty row");
|
||||
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
|
||||
rlogger.trace("put_row_diff: send done");
|
||||
co_await sink.flush();
|
||||
} catch (...) {
|
||||
ep = std::current_exception();
|
||||
}
|
||||
if (ep) {
|
||||
co_await sink.close();
|
||||
std::rethrow_exception(ep);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
@@ -1977,11 +1933,10 @@ public:
|
||||
|
||||
// RPC handler
|
||||
future<> put_row_diff_handler(repair_rows_on_wire rows, gms::inet_address from) {
|
||||
return with_gate(_gate, [this, rows = std::move(rows)] () mutable {
|
||||
auto& cf = _db.local().find_column_family(_schema->id());
|
||||
cf.update_off_strategy_trigger();
|
||||
return apply_rows_on_follower(std::move(rows));
|
||||
});
|
||||
auto gate_held = _gate.hold();
|
||||
auto& cf = _db.local().find_column_family(_schema->id());
|
||||
cf.update_off_strategy_trigger();
|
||||
co_await apply_rows_on_follower(std::move(rows));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1995,6 +1950,18 @@ get_set_diff(const repair_hash_set& x, const repair_hash_set& y) {
|
||||
return set_diff;
|
||||
}
|
||||
|
||||
static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
|
||||
sharded<repair_service>& repair,
|
||||
gms::inet_address from,
|
||||
uint32_t src_cpu_id,
|
||||
uint32_t dst_cpu_id,
|
||||
uint32_t repair_meta_id,
|
||||
rpc::sink<repair_row_on_wire_with_cmd> sink,
|
||||
rpc::source<repair_hash_with_cmd> source,
|
||||
bool &error,
|
||||
repair_hash_set& current_set_diff,
|
||||
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt);
|
||||
|
||||
static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
|
||||
sharded<repair_service>& repair,
|
||||
gms::inet_address from,
|
||||
@@ -2011,45 +1978,55 @@ static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
|
||||
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
|
||||
current_set_diff.insert(hash_cmd.hash);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
} else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set || hash_cmd.cmd == repair_stream_cmd::needs_all_rows) {
|
||||
} else {
|
||||
return repair_get_row_diff_with_rpc_stream_process_op_slow_path(repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, std::move(sink), std::move(source), error, current_set_diff, std::move(hash_cmd_opt));
|
||||
}
|
||||
}
|
||||
|
||||
static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
|
||||
sharded<repair_service>& repair,
|
||||
gms::inet_address from,
|
||||
uint32_t src_cpu_id,
|
||||
uint32_t dst_cpu_id,
|
||||
uint32_t repair_meta_id,
|
||||
rpc::sink<repair_row_on_wire_with_cmd> sink,
|
||||
rpc::source<repair_hash_with_cmd> source,
|
||||
bool &error,
|
||||
repair_hash_set& current_set_diff,
|
||||
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) {
|
||||
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
|
||||
if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set || hash_cmd.cmd == repair_stream_cmd::needs_all_rows) {
|
||||
if (inject_rpc_stream_error) {
|
||||
return make_exception_future<stop_iteration>(std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop"));
|
||||
throw std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop");
|
||||
}
|
||||
bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
|
||||
_metrics.rx_hashes_nr += current_set_diff.size();
|
||||
auto fp = make_foreign(std::make_unique<repair_hash_set>(std::move(current_set_diff)));
|
||||
return repair.invoke_on(dst_cpu_id, [from, repair_meta_id, needs_all_rows, fp = std::move(fp)] (repair_service& local_repair) {
|
||||
repair_rows_on_wire rows_on_wire = co_await repair.invoke_on(dst_cpu_id, [&] (repair_service& local_repair) -> future<repair_rows_on_wire> {
|
||||
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
|
||||
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_started);
|
||||
if (fp.get_owner_shard() == this_shard_id()) {
|
||||
return rm->get_row_diff_handler(std::move(*fp), repair_meta::needs_all_rows_t(needs_all_rows)).then([rm] (repair_rows_on_wire rows) {
|
||||
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
|
||||
return rows;
|
||||
});
|
||||
repair_rows_on_wire rows = co_await rm->get_row_diff_handler(std::move(*fp), repair_meta::needs_all_rows_t(needs_all_rows));
|
||||
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
|
||||
co_return rows;
|
||||
} else {
|
||||
return rm->get_row_diff_handler(*fp, repair_meta::needs_all_rows_t(needs_all_rows)).then([rm] (repair_rows_on_wire rows) {
|
||||
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
|
||||
return rows;
|
||||
});
|
||||
repair_rows_on_wire rows = co_await rm->get_row_diff_handler(*fp, repair_meta::needs_all_rows_t(needs_all_rows));
|
||||
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
|
||||
co_return rows;
|
||||
}
|
||||
}).then([sink] (repair_rows_on_wire rows_on_wire) mutable {
|
||||
if (rows_on_wire.empty()) {
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
|
||||
}
|
||||
return do_with(std::move(rows_on_wire), [sink] (repair_rows_on_wire& rows_on_wire) mutable {
|
||||
return do_for_each(rows_on_wire, [sink] (repair_row_on_wire& row) mutable {
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
|
||||
}).then([sink] () mutable {
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
|
||||
});
|
||||
});
|
||||
}).then([sink] () mutable {
|
||||
return sink.flush();
|
||||
}).then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
if (rows_on_wire.empty()) {
|
||||
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
|
||||
} else {
|
||||
for (repair_row_on_wire& row : rows_on_wire) {
|
||||
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
|
||||
}
|
||||
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
|
||||
}
|
||||
co_await sink.flush();
|
||||
co_return stop_iteration::no;
|
||||
} else {
|
||||
return make_exception_future<stop_iteration>(std::runtime_error("Got unexpected repair_stream_cmd"));
|
||||
throw std::runtime_error("Got unexpected repair_stream_cmd");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2109,29 +2086,22 @@ static future<stop_iteration> repair_get_full_row_hashes_with_rpc_stream_process
|
||||
repair_stream_cmd status = std::get<0>(status_opt.value());
|
||||
rlogger.trace("Got register_repair_get_full_row_hashes_with_rpc_stream from peer={}, status={}", from, int(status));
|
||||
if (status == repair_stream_cmd::get_full_row_hashes) {
|
||||
return repair.invoke_on(dst_cpu_id, [from, repair_meta_id] (repair_service& local_repair) {
|
||||
repair_hash_set hashes = co_await repair.invoke_on(dst_cpu_id, [from, repair_meta_id] (repair_service& local_repair) -> future<repair_hash_set> {
|
||||
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
|
||||
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
|
||||
return rm->get_full_row_hashes_handler().then([rm] (repair_hash_set hashes) {
|
||||
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
|
||||
_metrics.tx_hashes_nr += hashes.size();
|
||||
return hashes;
|
||||
});
|
||||
}).then([sink] (repair_hash_set hashes) mutable {
|
||||
return do_with(std::move(hashes), [sink] (repair_hash_set& hashes) mutable {
|
||||
return do_for_each(hashes, [sink] (const repair_hash& hash) mutable {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
|
||||
}).then([sink] () mutable {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
|
||||
});
|
||||
});
|
||||
}).then([sink] () mutable {
|
||||
return sink.flush();
|
||||
}).then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
repair_hash_set hashes = co_await rm->get_full_row_hashes_handler();
|
||||
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
|
||||
_metrics.tx_hashes_nr += hashes.size();
|
||||
co_return hashes;
|
||||
});
|
||||
for (const repair_hash& hash : hashes) {
|
||||
co_await sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
|
||||
}
|
||||
co_await sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
|
||||
co_await sink.flush();
|
||||
co_return stop_iteration::no;
|
||||
} else {
|
||||
return make_exception_future<stop_iteration>(std::runtime_error("Got unexpected repair_stream_cmd"));
|
||||
throw std::runtime_error("Got unexpected repair_stream_cmd");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2143,36 +2113,41 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
|
||||
uint32_t repair_meta_id,
|
||||
rpc::sink<repair_row_on_wire_with_cmd> sink,
|
||||
rpc::source<repair_hash_with_cmd> source) {
|
||||
return do_with(false, repair_hash_set(), [&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source] (bool& error, repair_hash_set& current_set_diff) mutable {
|
||||
return repeat([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source, &error, ¤t_set_diff] () mutable {
|
||||
return source().then([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source, &error, ¤t_set_diff] (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) mutable {
|
||||
if (hash_cmd_opt) {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
return repair_get_row_diff_with_rpc_stream_process_op(repair, from,
|
||||
src_cpu_id,
|
||||
dst_cpu_id,
|
||||
repair_meta_id,
|
||||
sink,
|
||||
source,
|
||||
error,
|
||||
current_set_diff,
|
||||
std::move(hash_cmd_opt)).handle_exception([from, repair_meta_id, sink, &error] (std::exception_ptr ep) mutable {
|
||||
rlogger.warn("repair_get_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
|
||||
error = true;
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
bool error = false;
|
||||
repair_hash_set current_set_diff = repair_hash_set();
|
||||
std::exception_ptr outer_exception;
|
||||
try {
|
||||
while (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt = co_await source()) {
|
||||
std::exception_ptr ep;
|
||||
try {
|
||||
if (error) {
|
||||
continue;
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
co_await repair_get_row_diff_with_rpc_stream_process_op(repair, from,
|
||||
src_cpu_id,
|
||||
dst_cpu_id,
|
||||
repair_meta_id,
|
||||
sink,
|
||||
source,
|
||||
error,
|
||||
current_set_diff,
|
||||
std::move(hash_cmd_opt));
|
||||
} catch (...) {
|
||||
ep = std::current_exception();
|
||||
rlogger.warn("repair_get_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
|
||||
error = true;
|
||||
}
|
||||
if (ep) {
|
||||
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()});
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
outer_exception = std::current_exception();
|
||||
}
|
||||
co_await sink.close();
|
||||
if (outer_exception) {
|
||||
std::rethrow_exception(outer_exception);
|
||||
}
|
||||
}
|
||||
|
||||
static future<> repair_put_row_diff_with_rpc_stream_handler(
|
||||
@@ -2183,36 +2158,41 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
|
||||
uint32_t repair_meta_id,
|
||||
rpc::sink<repair_stream_cmd> sink,
|
||||
rpc::source<repair_row_on_wire_with_cmd> source) {
|
||||
return do_with(false, repair_rows_on_wire(), [&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source] (bool& error, repair_rows_on_wire& current_rows) mutable {
|
||||
return repeat([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source, ¤t_rows, &error] () mutable {
|
||||
return source().then([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source, ¤t_rows, &error] (std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt) mutable {
|
||||
if (row_opt) {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
return repair_put_row_diff_with_rpc_stream_process_op(repair, from,
|
||||
src_cpu_id,
|
||||
dst_cpu_id,
|
||||
repair_meta_id,
|
||||
sink,
|
||||
source,
|
||||
error,
|
||||
current_rows,
|
||||
std::move(row_opt)).handle_exception([from, repair_meta_id, sink, &error] (std::exception_ptr ep) mutable {
|
||||
rlogger.warn("repair_put_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
|
||||
error = true;
|
||||
return sink(repair_stream_cmd::error).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
std::exception_ptr outer_exception;
|
||||
bool error = false;
|
||||
repair_rows_on_wire current_rows = repair_rows_on_wire();
|
||||
try {
|
||||
while (std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt = co_await source()) {
|
||||
std::exception_ptr ep;
|
||||
try {
|
||||
if (error) {
|
||||
continue;
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
co_await repair_put_row_diff_with_rpc_stream_process_op(repair, from,
|
||||
src_cpu_id,
|
||||
dst_cpu_id,
|
||||
repair_meta_id,
|
||||
sink,
|
||||
source,
|
||||
error,
|
||||
current_rows,
|
||||
std::move(row_opt));
|
||||
} catch (...) {
|
||||
ep = std::current_exception();
|
||||
rlogger.warn("repair_put_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
|
||||
error = true;
|
||||
}
|
||||
if (ep) {
|
||||
co_await sink(repair_stream_cmd::error);
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
outer_exception = std::current_exception();
|
||||
}
|
||||
co_await sink.close();
|
||||
if (outer_exception) {
|
||||
std::rethrow_exception(outer_exception);
|
||||
}
|
||||
}
|
||||
|
||||
static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
|
||||
@@ -2223,35 +2203,42 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
|
||||
uint32_t repair_meta_id,
|
||||
rpc::sink<repair_hash_with_cmd> sink,
|
||||
rpc::source<repair_stream_cmd> source) {
|
||||
return repeat([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source] () mutable {
|
||||
return do_with(false, [&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source] (bool& error) mutable {
|
||||
return source().then([&repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, sink, source, &error] (std::optional<std::tuple<repair_stream_cmd>> status_opt) mutable {
|
||||
if (status_opt) {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
return repair_get_full_row_hashes_with_rpc_stream_process_op(repair, from,
|
||||
src_cpu_id,
|
||||
dst_cpu_id,
|
||||
repair_meta_id,
|
||||
sink,
|
||||
source,
|
||||
error,
|
||||
std::move(status_opt)).handle_exception([from, repair_meta_id, sink, &error] (std::exception_ptr ep) mutable {
|
||||
rlogger.warn("repair_get_full_row_hashes_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
|
||||
error = true;
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
std::exception_ptr outer_exception;
|
||||
try {
|
||||
while (std::optional<std::tuple<repair_stream_cmd>> status_opt = co_await source()) {
|
||||
bool error = false;
|
||||
std::exception_ptr ep;
|
||||
try {
|
||||
if (error) {
|
||||
continue;
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
auto stop = co_await repair_get_full_row_hashes_with_rpc_stream_process_op(repair, from,
|
||||
src_cpu_id,
|
||||
dst_cpu_id,
|
||||
repair_meta_id,
|
||||
sink,
|
||||
source,
|
||||
error,
|
||||
std::move(status_opt));
|
||||
if (stop) {
|
||||
break;
|
||||
}
|
||||
} catch (...) {
|
||||
ep = std::current_exception();
|
||||
rlogger.warn("repair_get_full_row_hashes_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
|
||||
error = true;
|
||||
}
|
||||
if (ep) {
|
||||
co_await sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()});
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
outer_exception = std::current_exception();
|
||||
}
|
||||
co_await sink.close();
|
||||
if (outer_exception) {
|
||||
std::rethrow_exception(outer_exception);
|
||||
}
|
||||
}
|
||||
|
||||
future<repair_update_system_table_response> repair_service::repair_update_system_table_handler(gms::inet_address from, repair_update_system_table_request req) {
|
||||
|
||||
Reference in New Issue
Block a user