streaming: Make the mutation readers when streaming starts

Currently we make the mutation readers for streaming at different
points in time, one per range as we iterate over it, i.e.,

do_for_each(_ranges.begin(), _ranges.end(), [] (auto range) {
    // make a mutation reader for this range
    // read mutations from the reader and send
});

If there is write workload in the background, we will stream extra
data, since the later a reader is made, the more data we need to send.

Fix it by making all the readers before starting to stream.
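
In outline, the per-range loop is split into two phases: create every
reader first, then stream from them. Below is a minimal stand-alone
sketch of that structure in plain C++; mutation, mutation_reader,
make_reader_for and send are simplified stand-ins for illustration,
not the Seastar/Scylla types touched by this diff.

#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Stand-in types (hypothetical, for illustration only): a "reader"
// here is a callable that yields the mutations visible at the time
// the reader was created.
using mutation = std::string;
using mutation_reader = std::function<std::vector<mutation>()>;

struct range { int begin; int end; };

// Hypothetical stand-in for cf.make_streaming_reader(): it snapshots
// the data for the range at creation time, so writes that happen
// afterwards are not included.
mutation_reader make_reader_for(const range& r) {
    std::vector<mutation> snapshot;
    for (int i = r.begin; i < r.end; ++i) {
        snapshot.push_back("mutation-" + std::to_string(i));
    }
    return [snapshot] { return snapshot; };
}

void send(const mutation& m) {
    std::cout << "sending " << m << "\n";
}

int main() {
    std::vector<range> ranges = {{0, 2}, {2, 4}};

    // Phase 1: make all readers before any data is sent, so a write
    // workload running concurrently from this point on does not grow
    // the amount of data we stream.
    std::vector<mutation_reader> readers;
    for (const auto& r : ranges) {
        readers.push_back(make_reader_for(r));
    }

    // Phase 2: stream from the readers that were created up front.
    for (auto& reader : readers) {
        for (const auto& m : reader()) {
            send(m);
        }
    }
    return 0;
}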

Fixes #1815
Message-Id: <1479341474-1364-2-git-send-email-asias@scylladb.com>
Asias He
2016-11-17 00:11:13 +00:00
committed by Pekka Enberg
parent ae0a2935b4
commit dc50ce0ce5
2 changed files with 32 additions and 13 deletions


@@ -73,6 +73,7 @@ struct send_info {
size_t mutations_nr{0};
semaphore mutations_done{0};
bool error_logged = false;
mutation_reader reader;
send_info(database& db_, utils::UUID plan_id_, utils::UUID cf_id_,
query::partition_range pr_, net::messaging_service::msg_addr id_,
uint32_t dst_cpu_id_)
@@ -82,18 +83,21 @@ struct send_info {
, pr(pr_)
, id(id_)
, dst_cpu_id(dst_cpu_id_) {
auto& cf = db.find_column_family(this->cf_id);
// reader = std::move(cf.make_streaming_reader(cf.schema(), this->pr));
reader = cf.make_streaming_reader(cf.schema(), this->pr);
}
};
future<> do_send_mutations(auto si, auto fm, bool fragmented) {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fragmented, fm = std::move(fm)] () mutable {
future<> do_send_mutations(auto& si, auto fm, bool fragmented) {
return get_local_stream_manager().mutation_send_limiter().wait().then([&si, fragmented, fm = std::move(fm)] () mutable {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
auto fm_size = fm.representation().size();
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented).then([si, fm_size] {
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented).then([&si, fm_size] {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
si->mutations_done.signal();
}).handle_exception([si] (auto ep) {
}).handle_exception([&si] (auto ep) {
// There might be larger number of STREAM_MUTATION inflight.
// Log one error per column_family per range
if (!si->error_logged) {
@@ -108,17 +112,16 @@ future<> do_send_mutations(auto si, auto fm, bool fragmented) {
}
future<> send_mutations(auto si) {
auto& cf = si->db.find_column_family(si->cf_id);
return do_with(cf.make_streaming_reader(cf.schema(), si->pr), [si] (auto& reader) {
return repeat([si, &reader] () {
return reader().then([si] (auto smopt) {
return do_with(std::move(si), [] (auto& si) {
return repeat([&si] () {
return si->reader().then([&si] (auto smopt) {
if (smopt && si->db.column_family_exists(si->cf_id)) {
size_t fragment_size = default_frozen_fragment_size;
// Mutations cannot be sent fragmented if the receiving side doesn't support that.
if (!service::get_local_storage_service().cluster_supports_large_partitions()) {
fragment_size = std::numeric_limits<size_t>::max();
}
return fragment_and_freeze(std::move(*smopt), [si] (auto fm, bool fragmented) {
return fragment_and_freeze(std::move(*smopt), [&si] (auto fm, bool fragmented) {
si->mutations_nr++;
return do_send_mutations(si, std::move(fm), fragmented);
}, fragment_size).then([] { return stop_iteration::no; });
@@ -126,9 +129,9 @@ future<> send_mutations(auto si) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
}).then([&si] {
return si->mutations_done.wait(si->mutations_nr);
});
}).then([si] {
return si->mutations_done.wait(si->mutations_nr);
});
}
@@ -149,9 +152,23 @@ void stream_transfer_task::start() {
[this, plan_id, cf_id, id, dst_cpu_id, pr] (unsigned shard) {
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}, invoke_on shard={}", plan_id, cf_id, shard);
return this->session->get_db().invoke_on(shard, [plan_id, cf_id, id, dst_cpu_id, pr] (database& db) {
auto si = make_foreign(make_shared<send_info>(db, plan_id, cf_id, pr, id, dst_cpu_id));
return make_ready_future<foreign_ptr<shared_ptr<send_info>>>(std::move(si));
}).then([this, shard] (auto si) {
this->_send_infos.emplace(shard, std::move(si));
return make_ready_future<>();
});
});
}).then([this, plan_id, cf_id, id] {
auto& cf = this->session->get_local_db().find_column_family(cf_id);
sslog.info("[Stream #{}] stream_transfer_task: cf_id={}, ks={}, cf={}, send_info.size={} to peer {}",
plan_id, cf_id, cf.schema()->ks_name(), cf.schema()->cf_name(), this->_send_infos.size(), id);
return do_for_each(this->_send_infos, [this] (auto& item) mutable {
auto shard = item.first;
auto si = std::move(item.second);
return this->session->get_db().invoke_on(shard, [si = std::move(si)] (database& db) mutable {
// Send mutations on related shards, do not capture this
auto si = make_lw_shared<send_info>(db, plan_id, cf_id, pr, id, dst_cpu_id);
return send_mutations(si);
return send_mutations(std::move(si));
});
});
}).then([this, plan_id, cf_id, id] {


@@ -48,6 +48,7 @@
namespace streaming {
class stream_session;
class send_info;
/**
* StreamTransferTask sends sections of SSTable files in certain ColumnFamily.
@@ -59,6 +60,7 @@ private:
// A stream_transfer_task always contains the same range to stream
std::vector<nonwrapping_range<dht::token>> _ranges;
long _total_size;
std::unordered_multimap<unsigned, foreign_ptr<shared_ptr<send_info>>> _send_infos;
public:
using UUID = utils::UUID;
stream_transfer_task(stream_transfer_task&&) = default;