mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge "Avoid avalanche of tasks after memtable flush" from Tomasz
"Before, the logic for releasing writes blocked on dirty memory worked like this:
1) When region group size changes and it is not under pressure and there
are some requests blocked, then schedule request releasing task
2) request releasing task, if no pressure, runs one request and if there are
still blocked requests, schedules next request releasing task
If requests don't change the size of the region group, then either some request
executes or there is a request releasing task scheduled. The number of scheduled
tasks is at most one; effectively there is a single releasing thread.
However, if requests themselves would change the size of the group, then each
such change would schedule yet another request releasing thread, growing the task
queue size by one.
The group size can also change when memory is reclaimed from the groups (e.g.
when a group contains sparse segments). Compaction may therefore start many
request releasing threads due to the group size updates.
Such behavior is detrimental for performance and stability if there are a lot
of blocked requests. This can happen on 1.5 even with modest concurrency
because timed out requests stay in the queue. This is less likely on 1.6 where
they are dropped from the queue.
The request releasing tasks may start to dominate other work in the
system. When the number of scheduled tasks reaches 1000, polling stops and the
server becomes unresponsive until all of the released requests are done, which
happens either when they start to block on dirty memory again or when the queue
runs out of blocked requests. It may take a while to reach the pressure condition
after a memtable flush if the flush brings virtual dirty much below the threshold,
which is currently the case for workloads with overwrites producing sparse regions.
I saw this happening in a write workload from issue #2021, where the number of
request releasing threads grew into the thousands.
Fix by ensuring there is at most one request releasing thread at a time. There
will be one releasing fiber per region group, which is woken up when pressure is
lifted. It executes blocked requests until pressure occurs again."
* tag 'tgrabiec/lsa-single-threaded-releasing-v2' of github.com:cloudius-systems/seastar-dev:
tests: lsa: Add test for reclaimer starting and stopping
tests: lsa: Add request releasing stress test
lsa: Avoid avalanche releasing of requests
lsa: Move definitions to .cc
lsa: Simplify hard pressure notification management
lsa: Do not start or stop reclaiming on hard pressure
tests: lsa: Adjust to take into account that reclaimers are run synchronously
lsa: Document and annotate reclaimer notification callbacks
tests: lsa: Use with_timeout() in quiesce()
This commit is contained in:
@@ -2854,7 +2854,7 @@ future<> dirty_memory_manager::flush_when_needed() {
|
||||
});
|
||||
}
|
||||
|
||||
void dirty_memory_manager::start_reclaiming() {
|
||||
void dirty_memory_manager::start_reclaiming() noexcept {
|
||||
_should_flush.signal();
|
||||
}
|
||||
|
||||
|
||||
@@ -149,7 +149,7 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
|
||||
std::unordered_map<const logalloc::region*, flush_permit> _flush_manager;
|
||||
|
||||
future<> _waiting_flush;
|
||||
virtual void start_reclaiming() override;
|
||||
virtual void start_reclaiming() noexcept override;
|
||||
|
||||
bool has_pressure() const {
|
||||
return over_soft_limit();
|
||||
|
||||
@@ -29,7 +29,9 @@
|
||||
#include <seastar/core/timer.hh>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/tests/test-utils.hh>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include <deque>
|
||||
#include "utils/phased_barrier.hh"
|
||||
|
||||
#include "utils/logalloc.hh"
|
||||
#include "utils/managed_ref.hh"
|
||||
@@ -529,11 +531,7 @@ inline void quiesce(FutureType&& fut) {
|
||||
// a request may be broken into many continuations. While we could just yield many times, the
|
||||
// exact amount needed to guarantee execution would be dependent on the internals of the
|
||||
// implementation, we want to avoid that.
|
||||
timer<> tmr;
|
||||
tmr.set_callback([] { BOOST_FAIL("The future we were waiting for took too long to get ready"); });
|
||||
tmr.arm(2s);
|
||||
fut.get();
|
||||
tmr.cancel();
|
||||
with_timeout(lowres_clock::now() + 2s, std::move(fut)).get();
|
||||
}
|
||||
|
||||
// Simple RAII structure that wraps around a region_group
|
||||
@@ -859,15 +857,22 @@ class test_reclaimer: public region_group_reclaimer {
|
||||
region_group _rg;
|
||||
std::vector<size_t> _reclaim_sizes;
|
||||
bool _shutdown = false;
|
||||
shared_promise<> _unleash_reclaimer;
|
||||
seastar::gate _reclaimers_done;
|
||||
public:
|
||||
virtual void start_reclaiming() override {
|
||||
while (this->under_pressure()) {
|
||||
size_t reclaimed = test_async_reclaim_region::from_region(_rg.get_largest_region()).evict();
|
||||
_result_accumulator->_reclaim_sizes.push_back(reclaimed);
|
||||
}
|
||||
virtual void start_reclaiming() noexcept override {
|
||||
with_gate(_reclaimers_done, [this] {
|
||||
return _unleash_reclaimer.get_shared_future().then([this] {
|
||||
while (this->under_pressure()) {
|
||||
size_t reclaimed = test_async_reclaim_region::from_region(_rg.get_largest_region()).evict();
|
||||
_result_accumulator->_reclaim_sizes.push_back(reclaimed);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
~test_reclaimer() {
|
||||
_reclaimers_done.close().get();
|
||||
_rg.shutdown().get();
|
||||
}
|
||||
|
||||
@@ -881,6 +886,10 @@ public:
|
||||
|
||||
test_reclaimer(size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(this), _rg(*this) {}
|
||||
test_reclaimer(test_reclaimer& parent, size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(&parent), _rg(&parent._rg, *this) {}
|
||||
|
||||
void unleash() {
|
||||
_unleash_reclaimer.set_value();
|
||||
}
|
||||
};
|
||||
|
||||
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_simple_active_reclaim) {
|
||||
@@ -888,6 +897,7 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_simple_active_reclaim) {
|
||||
// allocate a single region to exhaustion, and make sure active reclaim is activated.
|
||||
test_reclaimer simple(logalloc::segment_size);
|
||||
test_async_reclaim_region simple_region(simple.rg(), logalloc::segment_size);
|
||||
simple.unleash();
|
||||
|
||||
// Can't run this function until we have reclaimed something
|
||||
auto fut = simple.rg().run_when_memory_available([] {});
|
||||
@@ -912,6 +922,7 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_worst_offen
|
||||
test_async_reclaim_region small_region(simple.rg(), logalloc::segment_size);
|
||||
test_async_reclaim_region medium_region(simple.rg(), 2 * logalloc::segment_size);
|
||||
test_async_reclaim_region big_region(simple.rg(), 3 * logalloc::segment_size);
|
||||
simple.unleash();
|
||||
|
||||
// Can't run this function until we have reclaimed
|
||||
auto fut = simple.rg().run_when_memory_available([&simple] {
|
||||
@@ -941,6 +952,9 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_leaf_offend
|
||||
test_async_reclaim_region small_region(small_leaf.rg(), logalloc::segment_size);
|
||||
test_async_reclaim_region medium_region(root.rg(), 2 * logalloc::segment_size);
|
||||
test_async_reclaim_region big_region(large_leaf.rg(), 3 * logalloc::segment_size);
|
||||
root.unleash();
|
||||
large_leaf.unleash();
|
||||
small_leaf.unleash();
|
||||
|
||||
// Can't run this function until we have reclaimed. Try at the root, and we'll make sure
|
||||
// that the leaves are forced correctly.
|
||||
@@ -967,6 +981,8 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_ancestor_bl
|
||||
test_reclaimer leaf(root, logalloc::segment_size);
|
||||
|
||||
test_async_reclaim_region root_region(root.rg(), logalloc::segment_size);
|
||||
root.unleash();
|
||||
leaf.unleash();
|
||||
|
||||
// Can't run this function until we have reclaimed. Try at the leaf, and we'll make sure
|
||||
// that the root reclaims
|
||||
@@ -992,6 +1008,8 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_big_region_
|
||||
test_async_reclaim_region root_region(root.rg(), 4 * logalloc::segment_size);
|
||||
test_async_reclaim_region big_leaf_region(leaf.rg(), 3 * logalloc::segment_size);
|
||||
test_async_reclaim_region small_leaf_region(leaf.rg(), 2 * logalloc::segment_size);
|
||||
root.unleash();
|
||||
leaf.unleash();
|
||||
|
||||
auto fut = root.rg().run_when_memory_available([&root] {
|
||||
BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 3);
|
||||
@@ -1018,6 +1036,8 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_no_double_r
|
||||
test_reclaimer leaf(root, logalloc::segment_size);
|
||||
|
||||
test_async_reclaim_region leaf_region(leaf.rg(), logalloc::segment_size);
|
||||
root.unleash();
|
||||
leaf.unleash();
|
||||
|
||||
auto fut_root = root.rg().run_when_memory_available([&root] {
|
||||
BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
|
||||
@@ -1037,3 +1057,117 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_no_double_r
|
||||
BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], logalloc::segment_size);
|
||||
});
|
||||
}
|
||||
|
||||
// Reproduces issue #2021
|
||||
SEASTAR_TEST_CASE(test_no_crash_when_a_lot_of_requests_released_which_change_region_group_size) {
|
||||
return seastar::async([] {
|
||||
#ifndef DEFAULT_ALLOCATOR // Because we need memory::stats().free_memory();
|
||||
logging::logger_registry().set_logger_level("lsa", seastar::log_level::debug);
|
||||
|
||||
auto free_space = memory::stats().free_memory();
|
||||
size_t threshold = size_t(0.75 * free_space);
|
||||
region_group_reclaimer recl(threshold, threshold);
|
||||
region_group gr(recl);
|
||||
auto close_gr = defer([&gr] { gr.shutdown().get(); });
|
||||
region r(gr);
|
||||
|
||||
with_allocator(r.allocator(), [&] {
|
||||
std::vector<managed_bytes> objs;
|
||||
|
||||
r.make_evictable([&] {
|
||||
if (objs.empty()) {
|
||||
return memory::reclaiming_result::reclaimed_nothing;
|
||||
}
|
||||
with_allocator(r.allocator(), [&] {
|
||||
objs.pop_back();
|
||||
});
|
||||
return memory::reclaiming_result::reclaimed_something;
|
||||
});
|
||||
|
||||
auto fill_to_pressure = [&] {
|
||||
while (!recl.under_pressure()) {
|
||||
objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), 1024));
|
||||
}
|
||||
};
|
||||
|
||||
utils::phased_barrier request_barrier;
|
||||
auto wait_for_requests = defer([&] { request_barrier.advance_and_await().get(); });
|
||||
|
||||
for (int i = 0; i < 1000000; ++i) {
|
||||
fill_to_pressure();
|
||||
future<> f = gr.run_when_memory_available([&, op = request_barrier.start()] {
|
||||
// Trigger group size change (Refs issue #2021)
|
||||
gr.update(-10);
|
||||
gr.update(+10);
|
||||
});
|
||||
BOOST_REQUIRE(!f.available());
|
||||
}
|
||||
|
||||
// Release
|
||||
while (recl.under_pressure()) {
|
||||
objs.pop_back();
|
||||
}
|
||||
});
|
||||
#endif
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_reclaiming_runs_as_long_as_there_is_soft_pressure) {
|
||||
return seastar::async([] {
|
||||
size_t hard_threshold = logalloc::segment_size * 8;
|
||||
size_t soft_threshold = hard_threshold / 2;
|
||||
|
||||
class reclaimer : public region_group_reclaimer {
|
||||
bool _reclaim = false;
|
||||
protected:
|
||||
void start_reclaiming() noexcept override {
|
||||
_reclaim = true;
|
||||
}
|
||||
|
||||
void stop_reclaiming() noexcept override {
|
||||
_reclaim = false;
|
||||
}
|
||||
public:
|
||||
reclaimer(size_t hard_threshold, size_t soft_threshold)
|
||||
: region_group_reclaimer(hard_threshold, soft_threshold)
|
||||
{ }
|
||||
bool reclaiming() const { return _reclaim; };
|
||||
};
|
||||
|
||||
reclaimer recl(hard_threshold, soft_threshold);
|
||||
region_group gr(recl);
|
||||
auto close_gr = defer([&gr] { gr.shutdown().get(); });
|
||||
region r(gr);
|
||||
|
||||
with_allocator(r.allocator(), [&] {
|
||||
std::vector<managed_bytes> objs;
|
||||
|
||||
BOOST_REQUIRE(!recl.reclaiming());
|
||||
|
||||
while (!recl.over_soft_limit()) {
|
||||
objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), logalloc::segment_size));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(recl.reclaiming());
|
||||
|
||||
while (!recl.under_pressure()) {
|
||||
objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), logalloc::segment_size));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(recl.reclaiming());
|
||||
|
||||
while (recl.under_pressure()) {
|
||||
objs.pop_back();
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(recl.over_soft_limit());
|
||||
BOOST_REQUIRE(recl.reclaiming());
|
||||
|
||||
while (recl.over_soft_limit()) {
|
||||
objs.pop_back();
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(!recl.reclaiming());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2065,56 +2065,6 @@ uint64_t region_group::top_region_evictable_space() const {
|
||||
return _regions.empty() ? 0 : _regions.top()->evictable_occupancy().total_space();
|
||||
}
|
||||
|
||||
void region_group::release_requests() noexcept {
|
||||
// The later() statement is here to avoid executing the function in update() context. But
|
||||
// also guarantees that we won't dominate the CPU if we have many requests to release.
|
||||
//
|
||||
// However, both with_gate() and later() can ultimately call to schedule() and consequently
|
||||
// allocate memory, which (if that allocation triggers a compaction - that frees memory) would
|
||||
// defeat the very purpose of not executing this on update() context. Allocations should be rare
|
||||
// on those but can happen, so we need to at least make sure they will not reclaim.
|
||||
//
|
||||
// Whatever comes after later() is already in a safe context, so we don't need to keep the lock
|
||||
// alive until we are done with the whole execution - only until later is successfully executed.
|
||||
tracker_reclaimer_lock rl;
|
||||
|
||||
_reclaimer.notify_relief();
|
||||
if (_descendant_blocked_requests) {
|
||||
_descendant_blocked_requests->set_value();
|
||||
}
|
||||
_descendant_blocked_requests = {};
|
||||
|
||||
if (_blocked_requests.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
with_gate(_asynchronous_gate, [this, rl = std::move(rl)] () mutable {
|
||||
return later().then([this] {
|
||||
// Check again, we may have executed release_requests() in this mean time from another entry
|
||||
// point (for instance, a descendant notification)
|
||||
if (_blocked_requests.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto blocked_at = do_for_each_parent(this, [] (auto rg) {
|
||||
return rg->execution_permitted() ? stop_iteration::no : stop_iteration::yes;
|
||||
});
|
||||
|
||||
if (!blocked_at) {
|
||||
auto req = std::move(_blocked_requests.front());
|
||||
_blocked_requests.pop_front();
|
||||
req->allocate();
|
||||
release_requests();
|
||||
} else {
|
||||
// If someone blocked us in the mean time then we can't execute. We need to make
|
||||
// sure that we are listening to notifications, though. It could be that we used to
|
||||
// be blocked on ourselves and now we are blocking on an ancestor
|
||||
subscribe_for_ancestor_available_memory_notification(blocked_at);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
region* region_group::get_largest_region() {
|
||||
if (!_maximal_rg || _maximal_rg->_regions.empty()) {
|
||||
return nullptr;
|
||||
@@ -2148,6 +2098,88 @@ region_group::del(region_impl* child) {
|
||||
update(-child->occupancy().total_space());
|
||||
}
|
||||
|
||||
bool
|
||||
region_group::execution_permitted() noexcept {
|
||||
return do_for_each_parent(this, [] (auto rg) {
|
||||
return rg->under_pressure() ? stop_iteration::yes : stop_iteration::no;
|
||||
}) == nullptr;
|
||||
}
|
||||
|
||||
future<>
|
||||
region_group::start_releaser() {
|
||||
return later().then([this] {
|
||||
return repeat([this] () noexcept {
|
||||
if (_shutdown_requested) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
|
||||
if (!_blocked_requests.empty() && execution_permitted()) {
|
||||
auto req = std::move(_blocked_requests.front());
|
||||
_blocked_requests.pop_front();
|
||||
req->allocate();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
} else {
|
||||
// Block reclaiming to prevent signal() from being called by reclaimer inside wait()
|
||||
// FIXME: handle allocation failures (not very likely) like allocating_section does
|
||||
tracker_reclaimer_lock rl;
|
||||
return _relief.wait().then([] {
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
region_group::region_group(region_group *parent, region_group_reclaimer& reclaimer)
|
||||
: _parent(parent)
|
||||
, _reclaimer(reclaimer)
|
||||
, _releaser(reclaimer_can_block() ? start_releaser() : make_ready_future<>())
|
||||
{
|
||||
if (_parent) {
|
||||
_parent->add(this);
|
||||
}
|
||||
}
|
||||
|
||||
bool region_group::reclaimer_can_block() const {
|
||||
return _reclaimer.throttle_threshold() != std::numeric_limits<size_t>::max();
|
||||
}
|
||||
|
||||
void region_group::notify_relief() {
|
||||
_relief.signal();
|
||||
for (region_group* child : _subgroups) {
|
||||
child->notify_relief();
|
||||
}
|
||||
}
|
||||
|
||||
void region_group::update(ssize_t delta) {
|
||||
// Most-enclosing group which was relieved.
|
||||
region_group* top_relief = nullptr;
|
||||
|
||||
do_for_each_parent(this, [&top_relief, delta] (region_group* rg) mutable {
|
||||
rg->update_maximal_rg();
|
||||
rg->_total_memory += delta;
|
||||
|
||||
if (rg->_total_memory >= rg->_reclaimer.soft_limit_threshold()) {
|
||||
rg->_reclaimer.notify_soft_pressure();
|
||||
} else {
|
||||
rg->_reclaimer.notify_soft_relief();
|
||||
}
|
||||
|
||||
if (rg->_total_memory > rg->_reclaimer.throttle_threshold()) {
|
||||
rg->_reclaimer.notify_pressure();
|
||||
} else if (rg->_reclaimer.under_pressure()) {
|
||||
rg->_reclaimer.notify_relief();
|
||||
top_relief = rg;
|
||||
}
|
||||
|
||||
return stop_iteration::no;
|
||||
});
|
||||
|
||||
if (top_relief) {
|
||||
top_relief->notify_relief();
|
||||
}
|
||||
}
|
||||
|
||||
allocating_section::guard::guard()
|
||||
: _prev(shard_segment_pool.emergency_reserve_max())
|
||||
{ }
|
||||
|
||||
@@ -64,8 +64,20 @@ protected:
|
||||
size_t _soft_limit;
|
||||
bool _under_pressure = false;
|
||||
bool _under_soft_pressure = false;
|
||||
virtual void start_reclaiming() {}
|
||||
virtual void stop_reclaiming() {}
|
||||
// The following restrictions apply to implementations of start_reclaiming() and stop_reclaiming():
|
||||
//
|
||||
// - must not use any region or region_group objects, because they're invoked synchronously
|
||||
// with operations on those.
|
||||
//
|
||||
// - must be noexcept, because they're called on the free path.
|
||||
//
|
||||
// - the implementation may be called synchronously with any operation
|
||||
// which allocates memory, because these are called by memory reclaimer.
|
||||
// In particular, the implementation should not depend on memory allocation
|
||||
// because that may fail when in reclaiming context.
|
||||
//
|
||||
virtual void start_reclaiming() noexcept {}
|
||||
virtual void stop_reclaiming() noexcept {}
|
||||
public:
|
||||
bool under_pressure() const {
|
||||
return _under_pressure;
|
||||
@@ -75,32 +87,26 @@ public:
|
||||
return _under_soft_pressure;
|
||||
}
|
||||
|
||||
void notify_soft_pressure() {
|
||||
void notify_soft_pressure() noexcept {
|
||||
if (!_under_soft_pressure) {
|
||||
_under_soft_pressure = true;
|
||||
start_reclaiming();
|
||||
}
|
||||
}
|
||||
|
||||
void notify_soft_relief() {
|
||||
void notify_soft_relief() noexcept {
|
||||
if (_under_soft_pressure) {
|
||||
_under_soft_pressure = false;
|
||||
stop_reclaiming();
|
||||
}
|
||||
}
|
||||
|
||||
void notify_pressure() {
|
||||
if (!_under_pressure) {
|
||||
_under_pressure = true;
|
||||
start_reclaiming();
|
||||
}
|
||||
void notify_pressure() noexcept {
|
||||
_under_pressure = true;
|
||||
}
|
||||
|
||||
void notify_relief() {
|
||||
if (_under_pressure) {
|
||||
_under_pressure = false;
|
||||
stop_reclaiming();
|
||||
}
|
||||
void notify_relief() noexcept {
|
||||
_under_pressure = false;
|
||||
}
|
||||
|
||||
region_group_reclaimer()
|
||||
@@ -108,7 +114,9 @@ public:
|
||||
region_group_reclaimer(size_t threshold)
|
||||
: _threshold(threshold), _soft_limit(threshold) {}
|
||||
region_group_reclaimer(size_t threshold, size_t soft)
|
||||
: _threshold(threshold), _soft_limit(soft) {}
|
||||
: _threshold(threshold), _soft_limit(soft) {
|
||||
assert(_soft_limit <= _threshold);
|
||||
}
|
||||
|
||||
virtual ~region_group_reclaimer() {}
|
||||
|
||||
@@ -229,9 +237,13 @@ class region_group {
|
||||
// a different ancestor)
|
||||
std::experimental::optional<shared_promise<>> _descendant_blocked_requests = {};
|
||||
|
||||
region_group* _waiting_on_ancestor = nullptr;
|
||||
seastar::gate _asynchronous_gate;
|
||||
condition_variable _relief;
|
||||
future<> _releaser;
|
||||
bool _shutdown_requested = false;
|
||||
|
||||
bool reclaimer_can_block() const;
|
||||
future<> start_releaser();
|
||||
void notify_relief();
|
||||
public:
|
||||
// When creating a region_group, one can specify an optional throttle_threshold parameter. This
|
||||
// parameter won't affect normal allocations, but an API is provided, through the region_group's
|
||||
@@ -239,17 +251,13 @@ public:
|
||||
// the total memory for the region group (and all of its parents) is lower or equal to the
|
||||
// region_group's throttle_treshold (and respectively for its parents).
|
||||
region_group(region_group_reclaimer& reclaimer = no_reclaimer) : region_group(nullptr, reclaimer) {}
|
||||
region_group(region_group* parent, region_group_reclaimer& reclaimer = no_reclaimer) : _parent(parent), _reclaimer(reclaimer) {
|
||||
if (_parent) {
|
||||
_parent->add(this);
|
||||
}
|
||||
}
|
||||
region_group(region_group* parent, region_group_reclaimer& reclaimer = no_reclaimer);
|
||||
region_group(region_group&& o) = delete;
|
||||
region_group(const region_group&) = delete;
|
||||
~region_group() {
|
||||
// If we set a throttle threshold, we'd be postponing many operations. So shutdown must be
|
||||
// called.
|
||||
if (_reclaimer.throttle_threshold() != std::numeric_limits<size_t>::max()) {
|
||||
if (reclaimer_can_block()) {
|
||||
assert(_shutdown_requested);
|
||||
}
|
||||
if (_parent) {
|
||||
@@ -261,24 +269,7 @@ public:
|
||||
size_t memory_used() const {
|
||||
return _total_memory;
|
||||
}
|
||||
void update(ssize_t delta) {
|
||||
do_for_each_parent(this, [delta] (auto rg) mutable {
|
||||
rg->update_maximal_rg();
|
||||
rg->_total_memory += delta;
|
||||
// It is okay to call release_requests for a region_group that can't allow execution.
|
||||
// But that can generate various spurious messages to groups waiting on us that will be
|
||||
// then woken up just so they can go to wait again. So let's filter that.
|
||||
if (rg->execution_permitted()) {
|
||||
rg->release_requests();
|
||||
}
|
||||
if (rg->_total_memory >= rg->_reclaimer.soft_limit_threshold()) {
|
||||
rg->_reclaimer.notify_soft_pressure();
|
||||
} else if (rg->_total_memory < rg->_reclaimer.soft_limit_threshold()) {
|
||||
rg->_reclaimer.notify_soft_relief();
|
||||
}
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
void update(ssize_t delta);
|
||||
|
||||
// It would be easier to call update, but it is unfortunately broken in boost versions up to at
|
||||
// least 1.59.
|
||||
@@ -324,36 +315,18 @@ public:
|
||||
using futurator = futurize<std::result_of_t<Func()>>;
|
||||
|
||||
auto blocked_at = do_for_each_parent(this, [] (auto rg) {
|
||||
return (rg->_blocked_requests.empty() && rg->execution_permitted()) ? stop_iteration::no : stop_iteration::yes;
|
||||
return (rg->_blocked_requests.empty() && !rg->under_pressure()) ? stop_iteration::no : stop_iteration::yes;
|
||||
});
|
||||
|
||||
if (!blocked_at) {
|
||||
return futurator::apply(func);
|
||||
}
|
||||
subscribe_for_ancestor_available_memory_notification(blocked_at);
|
||||
|
||||
auto fn = std::make_unique<concrete_allocating_function<Func>>(std::forward<Func>(func));
|
||||
auto fut = fn->get_future();
|
||||
_blocked_requests.push_back(std::move(fn), timeout);
|
||||
++_blocked_requests_counter;
|
||||
|
||||
// This is called here, and not at update(), for two reasons: the first, is that things that
|
||||
// are done during the free() path should be done carefuly, in the sense that they can
|
||||
// trigger another update call and put us in a loop. Not to mention we would like to keep
|
||||
// those from having exceptions. We solve that for release_requests by using later(), but in
|
||||
// here we can do away with that need altogether.
|
||||
//
|
||||
// Second and most important, until we actually block a request, the pressure condition may
|
||||
// very well be transient. There are opportunities for compactions, the condition can go
|
||||
// away on its own, etc.
|
||||
//
|
||||
// The reason we check execution permitted(), is that we'll still block requests if we have
|
||||
// free memory but existing requests in the queue. That is so we can keep our FIFO ordering
|
||||
// guarantee. So we need to distinguish here the case in which we're blocking merely to
|
||||
// serialize requests, so that the caller does not evict more than it should.
|
||||
if (!blocked_at->execution_permitted()) {
|
||||
blocked_at->_reclaimer.notify_pressure();
|
||||
}
|
||||
return fut;
|
||||
}
|
||||
|
||||
@@ -363,9 +336,11 @@ public:
|
||||
region* get_largest_region();
|
||||
|
||||
// Shutdown is mandatory for every user who has set a threshold
|
||||
// Can be called at most once.
|
||||
future<> shutdown() {
|
||||
_shutdown_requested = true;
|
||||
return _asynchronous_gate.close();
|
||||
_relief.signal();
|
||||
return std::move(_releaser);
|
||||
}
|
||||
|
||||
size_t blocked_requests() {
|
||||
@@ -376,43 +351,9 @@ public:
|
||||
return _blocked_requests_counter;
|
||||
}
|
||||
private:
|
||||
// Make sure we get a notification and can call release_requests when one of our ancestors that
|
||||
// used to block us is no longer under memory pressure.
|
||||
void subscribe_for_ancestor_available_memory_notification(region_group *ancestor) {
|
||||
if ((this == ancestor) || (_waiting_on_ancestor)) {
|
||||
return; // already subscribed, or no need to
|
||||
}
|
||||
|
||||
_waiting_on_ancestor = ancestor;
|
||||
|
||||
with_gate(_asynchronous_gate, [this] {
|
||||
// We reevaluate _waiting_on_ancestor here so we make sure there is no deferring point
|
||||
// between determining the ancestor and registering with it for a notification. We start
|
||||
// with _waiting_on_ancestor set to the initial value, and after we are notified, we
|
||||
// will set _waiting_on_ancestor to nullptr to force this lambda to reevaluate it.
|
||||
auto evaluate_ancestor_and_stop = [this] {
|
||||
if (!_waiting_on_ancestor) {
|
||||
auto new_blocking_point = do_for_each_parent(this, [] (auto rg) {
|
||||
return (rg->execution_permitted()) ? stop_iteration::no : stop_iteration::yes;
|
||||
});
|
||||
if (!new_blocking_point) {
|
||||
release_requests();
|
||||
}
|
||||
_waiting_on_ancestor = (new_blocking_point == this) ? nullptr : new_blocking_point;
|
||||
}
|
||||
return _waiting_on_ancestor == nullptr;
|
||||
};
|
||||
|
||||
return do_until(evaluate_ancestor_and_stop, [this] {
|
||||
if (!_waiting_on_ancestor->_descendant_blocked_requests) {
|
||||
_waiting_on_ancestor->_descendant_blocked_requests = shared_promise<>();
|
||||
}
|
||||
return _waiting_on_ancestor->_descendant_blocked_requests->get_shared_future().then([this] {
|
||||
_waiting_on_ancestor = nullptr;
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
// Returns true if and only if constraints of this group are not violated.
|
||||
// That's taking into account any constraints imposed by enclosing (parent) groups.
|
||||
bool execution_permitted() noexcept;
|
||||
|
||||
// Executes the function func for each region_group upwards in the hierarchy, starting with the
|
||||
// parameter node. The function func may return stop_iteration::no, in which case it proceeds to
|
||||
@@ -432,11 +373,10 @@ private:
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
inline bool execution_permitted() const {
|
||||
return _total_memory <= _reclaimer.throttle_threshold();
|
||||
}
|
||||
|
||||
void release_requests() noexcept;
|
||||
inline bool under_pressure() const {
|
||||
return _reclaimer.under_pressure();
|
||||
}
|
||||
|
||||
uint64_t top_region_evictable_space() const;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user