streaming: Reduce memory usage when sending mutations
Limit disk bandwidth to 5MB/s to emulate a slow disk:

    echo "8:0 5000000" > /cgroup/blkio/limit/blkio.throttle.write_bps_device
    echo "8:0 5000000" > /cgroup/blkio/limit/blkio.throttle.read_bps_device

Start scylla node 1 with low memory:

    scylla -c 1 -m 128M --auto-bootstrap false

Run c-s:

    taskset -c 7 cassandra-stress write duration=5m cl=ONE -schema 'replication(factor=1)' -pop seq=1..100000 -rate threads=20 limit=2000/s -node 127.0.0.1

Start scylla node 2 with low memory:

    scylla -c 1 -m 128M --auto-bootstrap true

Without this patch, I saw std::bad_alloc during streaming:

    ERROR 2016-06-01 14:31:00,196 [shard 0] storage_proxy - exception during mutation write to 127.0.0.1: std::bad_alloc (std::bad_alloc)
    ...
    ERROR 2016-06-01 14:31:10,172 [shard 0] database - failed to move memtable to cache: std::bad_alloc (std::bad_alloc)
    ...

To fix:

1. Apply the streaming mutation limiter before we read the mutation into
   memory, to avoid wasting memory holding a mutation which we cannot yet
   send.

2. Reduce the parallelism of sending streaming mutations. Before, we sent
   each range in parallel; after, we send the ranges one by one.

   before: nr_vnode * nr_shard * (send_info + cf.make_reader memory usage)
   after:  nr_shard * (send_info + cf.make_reader memory usage)

   We can at least save memory usage by a factor of nr_vnode, 256 by
   default.

In my setup, fix 1) alone is not enough; with both fix 1) and 2), I saw no
std::bad_alloc. Also, I did not see a streaming bandwidth drop due to 2).

In addition, I tested grow_cluster_test.py:GrowClusterTest.test_grow_3_to_4, as described:
https://github.com/scylladb/scylla/issues/1270#issuecomment-222585375
With this patch, I saw no std::bad_alloc any more.

Fixes: #1270
Message-Id: <7703cf7a9db40e53a87f0f7b5acbb03fff2daf43.1464785542.git.asias@scylladb.com>
This commit is contained in:
Notes:
Pekka Enberg
2016-06-02 11:19:13 +03:00
backport: 1.1 backport: 1.2
@@ -85,7 +85,6 @@ struct send_info {
|
||||
};
|
||||
|
||||
future<stop_iteration> do_send_mutations(auto si, auto fm) {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
|
||||
auto fm_size = fm.representation().size();
|
||||
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
|
||||
@@ -100,26 +99,27 @@ future<stop_iteration> do_send_mutations(auto si, auto fm) {
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
|
||||
}
|
||||
si->mutations_done.broken();
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
return stop_iteration::no;
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
|
||||
future<> send_mutations(auto si) {
|
||||
auto& cf = si->db.find_column_family(si->cf_id);
|
||||
auto& priority = service::get_local_streaming_read_priority();
|
||||
return do_with(cf.make_reader(cf.schema(), si->pr, query::no_clustering_key_filtering, priority), [si] (auto& reader) {
|
||||
return repeat([si, &reader] () {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return repeat([si, &reader] {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
});
|
||||
}).then([si] {
|
||||
@@ -132,7 +132,7 @@ void stream_transfer_task::start() {
|
||||
auto cf_id = this->cf_id;
|
||||
auto id = net::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
|
||||
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
|
||||
parallel_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
do_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0;
|
||||
unsigned shard_end = range.end() ? dht::shard_of(range.end()->value()) + 1 : smp::count;
|
||||
auto cf_id = this->cf_id;
|
||||
|
||||
Reference in New Issue
Block a user