streaming: Make the mutation readers when streaming starts
Currenlty we make the mutation readers for streaming at different
time point, i.e.,
do_for_each(_ranges.begin(), _ranges.end(), [] (auto range) {
make a mutation reader for this range
read mutations from the reader and send
})
If there are write workload in the background, we will stream extra
data, since the later the reader is made the more data we need to send.
Fix it by making all the readers before starting to stream.
Fixes #1815
Message-Id: <1479341474-1364-2-git-send-email-asias@scylladb.com>
This commit is contained in:
@@ -73,6 +73,7 @@ struct send_info {
|
||||
size_t mutations_nr{0};
|
||||
semaphore mutations_done{0};
|
||||
bool error_logged = false;
|
||||
mutation_reader reader;
|
||||
send_info(database& db_, utils::UUID plan_id_, utils::UUID cf_id_,
|
||||
query::partition_range pr_, net::messaging_service::msg_addr id_,
|
||||
uint32_t dst_cpu_id_)
|
||||
@@ -82,18 +83,21 @@ struct send_info {
|
||||
, pr(pr_)
|
||||
, id(id_)
|
||||
, dst_cpu_id(dst_cpu_id_) {
|
||||
auto& cf = db.find_column_family(this->cf_id);
|
||||
// reader = std::move(cf.make_streaming_reader(cf.schema(), this->pr));
|
||||
reader = cf.make_streaming_reader(cf.schema(), this->pr);
|
||||
}
|
||||
};
|
||||
|
||||
future<> do_send_mutations(auto si, auto fm, bool fragmented) {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fragmented, fm = std::move(fm)] () mutable {
|
||||
future<> do_send_mutations(auto& si, auto fm, bool fragmented) {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([&si, fragmented, fm = std::move(fm)] () mutable {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
|
||||
auto fm_size = fm.representation().size();
|
||||
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented).then([si, fm_size] {
|
||||
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented).then([&si, fm_size] {
|
||||
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
|
||||
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
|
||||
si->mutations_done.signal();
|
||||
}).handle_exception([si] (auto ep) {
|
||||
}).handle_exception([&si] (auto ep) {
|
||||
// There might be larger number of STREAM_MUTATION inflight.
|
||||
// Log one error per column_family per range
|
||||
if (!si->error_logged) {
|
||||
@@ -108,17 +112,16 @@ future<> do_send_mutations(auto si, auto fm, bool fragmented) {
|
||||
}
|
||||
|
||||
future<> send_mutations(auto si) {
|
||||
auto& cf = si->db.find_column_family(si->cf_id);
|
||||
return do_with(cf.make_streaming_reader(cf.schema(), si->pr), [si] (auto& reader) {
|
||||
return repeat([si, &reader] () {
|
||||
return reader().then([si] (auto smopt) {
|
||||
return do_with(std::move(si), [] (auto& si) {
|
||||
return repeat([&si] () {
|
||||
return si->reader().then([&si] (auto smopt) {
|
||||
if (smopt && si->db.column_family_exists(si->cf_id)) {
|
||||
size_t fragment_size = default_frozen_fragment_size;
|
||||
// Mutations cannot be sent fragmented if the receiving side doesn't support that.
|
||||
if (!service::get_local_storage_service().cluster_supports_large_partitions()) {
|
||||
fragment_size = std::numeric_limits<size_t>::max();
|
||||
}
|
||||
return fragment_and_freeze(std::move(*smopt), [si] (auto fm, bool fragmented) {
|
||||
return fragment_and_freeze(std::move(*smopt), [&si] (auto fm, bool fragmented) {
|
||||
si->mutations_nr++;
|
||||
return do_send_mutations(si, std::move(fm), fragmented);
|
||||
}, fragment_size).then([] { return stop_iteration::no; });
|
||||
@@ -126,9 +129,9 @@ future<> send_mutations(auto si) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
}).then([&si] {
|
||||
return si->mutations_done.wait(si->mutations_nr);
|
||||
});
|
||||
}).then([si] {
|
||||
return si->mutations_done.wait(si->mutations_nr);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -149,9 +152,23 @@ void stream_transfer_task::start() {
|
||||
[this, plan_id, cf_id, id, dst_cpu_id, pr] (unsigned shard) {
|
||||
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}, invoke_on shard={}", plan_id, cf_id, shard);
|
||||
return this->session->get_db().invoke_on(shard, [plan_id, cf_id, id, dst_cpu_id, pr] (database& db) {
|
||||
auto si = make_foreign(make_shared<send_info>(db, plan_id, cf_id, pr, id, dst_cpu_id));
|
||||
return make_ready_future<foreign_ptr<shared_ptr<send_info>>>(std::move(si));
|
||||
}).then([this, shard] (auto si) {
|
||||
this->_send_infos.emplace(shard, std::move(si));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}).then([this, plan_id, cf_id, id] {
|
||||
auto&cf = this->session->get_local_db().find_column_family(cf_id);
|
||||
sslog.info("[Stream #{}] stream_transfer_task: cf_id={}, ks={}, cf={}, send_info.size={} to peer {}",
|
||||
plan_id, cf_id, cf.schema()->ks_name(), cf.schema()->cf_name(), this->_send_infos.size(), id);
|
||||
return do_for_each(this->_send_infos, [this] (auto& item) mutable {
|
||||
auto shard = item.first;
|
||||
auto si = std::move(item.second);
|
||||
return this->session->get_db().invoke_on(shard, [si = std::move(si)] (database& db) mutable {
|
||||
// Send mutations on related shards, do not capture this
|
||||
auto si = make_lw_shared<send_info>(db, plan_id, cf_id, pr, id, dst_cpu_id);
|
||||
return send_mutations(si);
|
||||
return send_mutations(std::move(si));
|
||||
});
|
||||
});
|
||||
}).then([this, plan_id, cf_id, id] {
|
||||
|
||||
@@ -48,6 +48,7 @@
|
||||
namespace streaming {
|
||||
|
||||
class stream_session;
|
||||
class send_info;
|
||||
|
||||
/**
|
||||
* StreamTransferTask sends sections of SSTable files in certain ColumnFamily.
|
||||
@@ -59,6 +60,7 @@ private:
|
||||
// A stream_transfer_task always contains the same range to stream
|
||||
std::vector<nonwrapping_range<dht::token>> _ranges;
|
||||
long _total_size;
|
||||
std::unordered_multimap<unsigned, foreign_ptr<shared_ptr<send_info>>> _send_infos;
|
||||
public:
|
||||
using UUID = utils::UUID;
|
||||
stream_transfer_task(stream_transfer_task&&) = default;
|
||||
|
||||
Reference in New Issue
Block a user