result_memory_limiter: split new_read() to new_{data, mutation}_read()

For data queries it is very important that all replicas get limited in
the same place (this includes replicas returning only digest). That's
why they shouldn't be affected by per-shard result memory limit.
Moreover, we should make sure that individual memory limits are the
same, making the coordinator provide it for replicas which allow to
safely change it in the future.

Mutation queries are not as sensitive but it is still beneficial to make
sure that all replicas use the same individual limit.
This commit is contained in:
Paweł Dziepak
2016-12-19 13:26:24 +00:00
parent b8e29cc99c
commit aa083d3d85
3 changed files with 47 additions and 12 deletions

View File

@@ -2493,7 +2493,7 @@ column_family::query(schema_ptr s, const query::read_command& cmd, query::result
utils::latency_counter lc;
_stats.reads.set_latency(lc);
auto f = request == query::result_request::only_digest
? make_ready_future<query::result_memory_accounter>() : memory_limiter.new_read();
? make_ready_future<query::result_memory_accounter>() : memory_limiter.new_data_read(query::result_memory_limiter::maximum_result_size);
return f.then([this, lc, s = std::move(s), &cmd, request, &partition_ranges, trace_state = std::move(trace_state)] (query::result_memory_accounter accounter) mutable {
auto qs_ptr = std::make_unique<query_state>(std::move(s), cmd, request, partition_ranges, std::move(accounter));
auto& qs = *qs_ptr;

View File

@@ -67,8 +67,16 @@ public:
return _maximum_total_result_memory - _memory_limiter.available_units();
}
// Reserves minimum_result_size and creates new memory accounter.
future<result_memory_accounter> new_read();
// Reserves minimum_result_size and creates new memory accounter for
// mutation query. Uses the specified maximum result size and may be
// stopped before reaching it due to memory pressure on shard.
future<result_memory_accounter> new_mutation_read(size_t max_result_size);
// Reserves minimum_result_size and creates new memory accounter for
// data query. Uses the specified maximum result size, result will *not*
// be stopped due to on shard memory pressure in order to avoid digest
// mismatches.
future<result_memory_accounter> new_data_read(size_t max_result_size);
// Checks whether the result can grow any more, takes into account only
// the per shard limit.
@@ -108,11 +116,28 @@ class result_memory_accounter {
size_t _blocked_bytes = 0;
size_t _used_memory = 0;
size_t _total_used_memory = 0;
size_t _maximum_result_size;
stop_iteration _stop_on_global_limit;
private:
explicit result_memory_accounter(result_memory_limiter& limiter) noexcept
// Mutation query accounter. Uses provided individual result size limit and
// will stop when shard memory pressure grows too high.
struct mutation_query_tag { };
explicit result_memory_accounter(mutation_query_tag, result_memory_limiter& limiter, size_t max_size) noexcept
: _limiter(&limiter)
, _blocked_bytes(result_memory_limiter::minimum_result_size)
, _maximum_result_size(max_size)
, _stop_on_global_limit(true)
{ }
// Data query accounter. Uses provided individual result size limit and
// will *not* stop even though shard memory pressure grows too high.
struct data_query_tag { };
explicit result_memory_accounter(data_query_tag, result_memory_limiter& limiter, size_t max_size) noexcept
: _limiter(&limiter)
, _blocked_bytes(result_memory_limiter::minimum_result_size)
, _maximum_result_size(max_size)
{ }
friend class result_memory_limiter;
public:
result_memory_accounter() = default;
@@ -133,6 +158,8 @@ public:
, _blocked_bytes(other._blocked_bytes)
, _used_memory(other._used_memory)
, _total_used_memory(other._total_used_memory)
, _maximum_result_size(other._maximum_result_size)
, _stop_on_global_limit(other._stop_on_global_limit)
{ }
result_memory_accounter& operator=(result_memory_accounter&& other) noexcept {
@@ -157,11 +184,11 @@ public:
stop_iteration update_and_check(size_t n) {
_used_memory += n;
_total_used_memory += n;
auto stop = stop_iteration(_total_used_memory > result_memory_limiter::maximum_result_size);
auto stop = stop_iteration(_total_used_memory > _maximum_result_size);
if (_limiter && _used_memory > _blocked_bytes) {
auto to_block = std::min(_used_memory - _blocked_bytes, n);
_blocked_bytes += to_block;
stop = _limiter->update_and_check(to_block) || stop;
stop = (_limiter->update_and_check(to_block) && _stop_on_global_limit) || stop;
}
return stop;
}
@@ -170,7 +197,7 @@ public:
stop_iteration check() const {
stop_iteration stop { _total_used_memory > result_memory_limiter::maximum_result_size };
if (!stop && _used_memory >= _blocked_bytes && _limiter) {
return _limiter->check();
return _limiter->check() && _stop_on_global_limit;
}
return stop;
}
@@ -189,9 +216,15 @@ public:
}
};
inline future<result_memory_accounter> result_memory_limiter::new_read() {
return _memory_limiter.wait(minimum_result_size).then([this] {
return result_memory_accounter(*this);
inline future<result_memory_accounter> result_memory_limiter::new_mutation_read(size_t max_size) {
return _memory_limiter.wait(minimum_result_size).then([this, max_size] {
return result_memory_accounter(result_memory_accounter::mutation_query_tag(), *this, max_size);
});
}
inline future<result_memory_accounter> result_memory_limiter::new_data_read(size_t max_size) {
return _memory_limiter.wait(minimum_result_size).then([this, max_size] {
return result_memory_accounter(result_memory_accounter::data_query_tag(), *this, max_size);
});
}

View File

@@ -3664,7 +3664,8 @@ storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_c
if (pr.is_singular()) {
unsigned shard = _db.local().shard_of(pr.start()->value().token());
return _db.invoke_on(shard, [cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
return db.get_result_memory_limiter().new_read().then([&] (query::result_memory_accounter ma) {
auto max_size = query::result_memory_limiter::maximum_result_size;
return db.get_result_memory_limiter().new_mutation_read(max_size).then([&] (query::result_memory_accounter ma) {
return db.query_mutations(gs, *cmd, pr, std::move(ma), gt).then([] (reconcilable_result&& result) {
return make_foreign(make_lw_shared(std::move(result)));
});
@@ -3738,7 +3739,8 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<q
dht::ring_position_range_vector_sharder& rprs,
global_schema_ptr& gs,
tracing::global_trace_state_ptr& gt) {
return _db.local().get_result_memory_limiter().new_read().then([&, s] (query::result_memory_accounter ma) {
auto max_size = query::result_memory_limiter::maximum_result_size;
return _db.local().get_result_memory_limiter().new_mutation_read(max_size).then([&, s] (query::result_memory_accounter ma) {
mrm.memory() = std::move(ma);
return repeat_until_value([&, s] () -> future<stdx::optional<reconcilable_result>> {
// We don't want to query a sparsely populated table sequentially, because the latency