diff --git a/database.cc b/database.cc
index da2217edc8..1796e3359b 100644
--- a/database.cc
+++ b/database.cc
@@ -2714,7 +2714,7 @@ future<> dirty_memory_manager::flush_when_needed() {
     });
 }
 
-void dirty_memory_manager::start_reclaiming() {
+void dirty_memory_manager::start_reclaiming() noexcept {
     _should_flush.signal();
 }
 
diff --git a/database.hh b/database.hh
index 05d9ff39be..f925e67b2c 100644
--- a/database.hh
+++ b/database.hh
@@ -144,7 +144,7 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
     std::unordered_map _flush_manager;
     future<> _waiting_flush;
 
-    virtual void start_reclaiming() override;
+    virtual void start_reclaiming() noexcept override;
 
     bool has_pressure() const {
         return over_soft_limit();
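The database.cc/database.hh hunks are the template for what the rest of the patch demands of reclaimers: dirty_memory_manager::start_reclaiming() already does nothing but signal a condition variable, which cannot throw, so marking it noexcept is honest. The general shape, a notification hook that only wakes a worker fiber while the fiber does any allocating work, looks roughly like this (a minimal sketch under stated assumptions, not the actual dirty_memory_manager; flushing_reclaimer, _should_flush, _done and work_when_needed are illustrative names, and Seastar's condition_variable and do_until are assumed):

    // Sketch: signal-only reclaiming hook plus a worker fiber.
    class flushing_reclaimer : public logalloc::region_group_reclaimer {
        seastar::condition_variable _should_flush;
        bool _done = false;
    protected:
        // Called on the free path: must not allocate and must not throw.
        virtual void start_reclaiming() noexcept override {
            _should_flush.signal();
        }
    public:
        explicit flushing_reclaimer(size_t threshold)
            : region_group_reclaimer(threshold) {}
        // The worker fiber runs in a normal context and may allocate.
        future<> work_when_needed() {
            return do_until([this] { return _done; }, [this] {
                return _should_flush.wait().then([this] {
                    // ... flush or evict something here ...
                });
            });
        }
        void stop() {
            _done = true;
            _should_flush.signal();
        }
    };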
diff --git a/tests/logalloc_test.cc b/tests/logalloc_test.cc
index d3e5ae4846..ee9309dcad 100644
--- a/tests/logalloc_test.cc
+++ b/tests/logalloc_test.cc
@@ -29,7 +29,9 @@
 #include
 #include
 #include
+#include
 #include
+#include "utils/phased_barrier.hh"
 #include "utils/logalloc.hh"
 #include "utils/managed_ref.hh"
@@ -529,11 +531,7 @@ inline void quiesce(FutureType&& fut) {
     // a request may be broken into many continuations. While we could just yield many times, the
     // exact amount needed to guarantee execution would be dependent on the internals of the
     // implementation, we want to avoid that.
-    timer<> tmr;
-    tmr.set_callback([] { BOOST_FAIL("The future we were waiting for took too long to get ready"); });
-    tmr.arm(2s);
-    fut.get();
-    tmr.cancel();
+    with_timeout(lowres_clock::now() + 2s, std::move(fut)).get();
 }
 
 // Simple RAII structure that wraps around a region_group
@@ -859,15 +857,22 @@ class test_reclaimer: public region_group_reclaimer {
     region_group _rg;
     std::vector<size_t> _reclaim_sizes;
     bool _shutdown = false;
+    shared_promise<> _unleash_reclaimer;
+    seastar::gate _reclaimers_done;
 public:
-    virtual void start_reclaiming() override {
-        while (this->under_pressure()) {
-            size_t reclaimed = test_async_reclaim_region::from_region(_rg.get_largest_region()).evict();
-            _result_accumulator->_reclaim_sizes.push_back(reclaimed);
-        }
+    virtual void start_reclaiming() noexcept override {
+        with_gate(_reclaimers_done, [this] {
+            return _unleash_reclaimer.get_shared_future().then([this] {
+                while (this->under_pressure()) {
+                    size_t reclaimed = test_async_reclaim_region::from_region(_rg.get_largest_region()).evict();
+                    _result_accumulator->_reclaim_sizes.push_back(reclaimed);
+                }
+            });
+        });
     }
 
     ~test_reclaimer() {
+        _reclaimers_done.close().get();
         _rg.shutdown().get();
     }
@@ -881,6 +886,10 @@ public:
     test_reclaimer(size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(this), _rg(*this) {}
     test_reclaimer(test_reclaimer& parent, size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(&parent), _rg(&parent._rg, *this) {}
+
+    void unleash() {
+        _unleash_reclaimer.set_value();
+    }
 };
 
 SEASTAR_TEST_CASE(test_region_groups_basic_throttling_simple_active_reclaim) {
     return seastar::async([] {
         // allocate a single region to exhaustion, and make sure active reclaim is activated.
         test_reclaimer simple(logalloc::segment_size);
         test_async_reclaim_region simple_region(simple.rg(), logalloc::segment_size);
+        simple.unleash();
 
         // Can't run this function until we have reclaimed something
         auto fut = simple.rg().run_when_memory_available([] {});
@@ -912,6 +922,7 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_worst_offen
         test_async_reclaim_region small_region(simple.rg(), logalloc::segment_size);
         test_async_reclaim_region medium_region(simple.rg(), 2 * logalloc::segment_size);
         test_async_reclaim_region big_region(simple.rg(), 3 * logalloc::segment_size);
+        simple.unleash();
 
         // Can't run this function until we have reclaimed
         auto fut = simple.rg().run_when_memory_available([&simple] {
@@ -941,6 +952,9 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_leaf_offend
         test_async_reclaim_region small_region(small_leaf.rg(), logalloc::segment_size);
         test_async_reclaim_region medium_region(root.rg(), 2 * logalloc::segment_size);
         test_async_reclaim_region big_region(large_leaf.rg(), 3 * logalloc::segment_size);
+        root.unleash();
+        large_leaf.unleash();
+        small_leaf.unleash();
 
         // Can't run this function until we have reclaimed. Try at the root, and we'll make sure
         // that the leaves are forced correctly.
@@ -967,6 +981,8 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_ancestor_bl
         test_reclaimer leaf(root, logalloc::segment_size);
 
         test_async_reclaim_region root_region(root.rg(), logalloc::segment_size);
+        root.unleash();
+        leaf.unleash();
 
         // Can't run this function until we have reclaimed. Try at the leaf, and we'll make sure
         // that the root reclaims
@@ -992,6 +1008,8 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_big_region_
         test_async_reclaim_region root_region(root.rg(), 4 * logalloc::segment_size);
         test_async_reclaim_region big_leaf_region(leaf.rg(), 3 * logalloc::segment_size);
         test_async_reclaim_region small_leaf_region(leaf.rg(), 2 * logalloc::segment_size);
+        root.unleash();
+        leaf.unleash();
 
         auto fut = root.rg().run_when_memory_available([&root] {
             BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 3);
@@ -1018,6 +1036,8 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_no_double_r
         test_reclaimer leaf(root, logalloc::segment_size);
 
         test_async_reclaim_region leaf_region(leaf.rg(), logalloc::segment_size);
+        root.unleash();
+        leaf.unleash();
 
         auto fut_root = root.rg().run_when_memory_available([&root] {
             BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
@@ -1037,3 +1057,117 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_no_double_r
         BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], logalloc::segment_size);
     });
 }
+
+// Reproduces issue #2021
+SEASTAR_TEST_CASE(test_no_crash_when_a_lot_of_requests_released_which_change_region_group_size) {
+    return seastar::async([] {
+#ifndef DEFAULT_ALLOCATOR // Because we need memory::stats().free_memory();
+        logging::logger_registry().set_logger_level("lsa", seastar::log_level::debug);
+
+        auto free_space = memory::stats().free_memory();
+        size_t threshold = size_t(0.75 * free_space);
+        region_group_reclaimer recl(threshold, threshold);
+        region_group gr(recl);
+        auto close_gr = defer([&gr] { gr.shutdown().get(); });
+        region r(gr);
+
+        with_allocator(r.allocator(), [&] {
+            std::vector<managed_bytes> objs;
+
+            r.make_evictable([&] {
+                if (objs.empty()) {
+                    return memory::reclaiming_result::reclaimed_nothing;
+                }
+                with_allocator(r.allocator(), [&] {
+                    objs.pop_back();
+                });
+                return memory::reclaiming_result::reclaimed_something;
+            });
+
+            auto fill_to_pressure = [&] {
+                while (!recl.under_pressure()) {
+                    objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), 1024));
+                }
+            };
+
+            utils::phased_barrier request_barrier;
+            auto wait_for_requests = defer([&] { request_barrier.advance_and_await().get(); });
+
+            for (int i = 0; i < 1000000; ++i) {
+                fill_to_pressure();
+                future<> f = gr.run_when_memory_available([&, op = request_barrier.start()] {
+                    // Trigger group size change (Refs issue #2021)
+                    gr.update(-10);
+                    gr.update(+10);
+                });
+                BOOST_REQUIRE(!f.available());
+            }
+
+            // Release
+            while (recl.under_pressure()) {
+                objs.pop_back();
+            }
+        });
+#endif
+    });
+}
+
+SEASTAR_TEST_CASE(test_reclaiming_runs_as_long_as_there_is_soft_pressure) {
+    return seastar::async([] {
+        size_t hard_threshold = logalloc::segment_size * 8;
+        size_t soft_threshold = hard_threshold / 2;
+
+        class reclaimer : public region_group_reclaimer {
+            bool _reclaim = false;
+        protected:
+            void start_reclaiming() noexcept override {
+                _reclaim = true;
+            }
+
+            void stop_reclaiming() noexcept override {
+                _reclaim = false;
+            }
+        public:
+            reclaimer(size_t hard_threshold, size_t soft_threshold)
+                : region_group_reclaimer(hard_threshold, soft_threshold)
+            { }
+            bool reclaiming() const { return _reclaim; };
+        };
+
+        reclaimer recl(hard_threshold, soft_threshold);
+        region_group gr(recl);
+        auto close_gr = defer([&gr] { gr.shutdown().get(); });
+        region r(gr);
+
+        with_allocator(r.allocator(), [&] {
+            std::vector<managed_bytes> objs;
+
+            BOOST_REQUIRE(!recl.reclaiming());
+
+            while (!recl.over_soft_limit()) {
+                objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), logalloc::segment_size));
+            }
+
+            BOOST_REQUIRE(recl.reclaiming());
+
+            while (!recl.under_pressure()) {
+                objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), logalloc::segment_size));
+            }
+
+            BOOST_REQUIRE(recl.reclaiming());
+
+            while (recl.under_pressure()) {
+                objs.pop_back();
+            }
+
+            BOOST_REQUIRE(recl.over_soft_limit());
+            BOOST_REQUIRE(recl.reclaiming());
+
+            while (recl.over_soft_limit()) {
+                objs.pop_back();
+            }
+
+            BOOST_REQUIRE(!recl.reclaiming());
+        });
+    });
+}
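Every pre-existing test gains an unleash() call because start_reclaiming() may now fire synchronously from inside an allocation, where the eviction loop must not run; the shared_promise lets each test choose the moment the reclaimer may really start evicting, and the gate keeps ~test_reclaimer() from racing with a reclaimer run still in flight. In usage terms, the API these tests exercise behaves as below (a hedged sketch; group and f are illustrative names, the optional timeout argument is omitted, and a seastar::thread context is assumed, as in the tests):

    logalloc::region_group_reclaimer recl(logalloc::segment_size);  // soft == hard here
    logalloc::region_group group(recl);

    // Queued FIFO if the group or any ancestor is under pressure, or if
    // requests are already queued; otherwise runs immediately.
    future<> f = group.run_when_memory_available([] {
        // runs synchronously once the group is allowed to execute requests
    });

    // ... evict/free memory in the group's regions ...
    f.get();                  // in a seastar::thread
    group.shutdown().get();   // mandatory for throttled groups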
diff --git a/utils/logalloc.cc b/utils/logalloc.cc
index cbfbac6a72..b8c3aecaad 100644
--- a/utils/logalloc.cc
+++ b/utils/logalloc.cc
@@ -2076,56 +2076,6 @@ uint64_t region_group::top_region_evictable_space() const {
     return _regions.empty() ? 0 : _regions.top()->evictable_occupancy().total_space();
 }
 
-void region_group::release_requests() noexcept {
-    // The later() statement is here to avoid executing the function in update() context. But
-    // also guarantees that we won't dominate the CPU if we have many requests to release.
-    //
-    // However, both with_gate() and later() can ultimately call to schedule() and consequently
-    // allocate memory, which (if that allocation triggers a compaction - that frees memory) would
-    // defeat the very purpose of not executing this on update() context. Allocations should be rare
-    // on those but can happen, so we need to at least make sure they will not reclaim.
-    //
-    // Whatever comes after later() is already in a safe context, so we don't need to keep the lock
-    // alive until we are done with the whole execution - only until later is successfully executed.
-    tracker_reclaimer_lock rl;
-
-    _reclaimer.notify_relief();
-    if (_descendant_blocked_requests) {
-        _descendant_blocked_requests->set_value();
-    }
-    _descendant_blocked_requests = {};
-
-    if (_blocked_requests.empty()) {
-        return;
-    }
-
-    with_gate(_asynchronous_gate, [this, rl = std::move(rl)] () mutable {
-        return later().then([this] {
-            // Check again, we may have executed release_requests() in this mean time from another entry
-            // point (for instance, a descendant notification)
-            if (_blocked_requests.empty()) {
-                return;
-            }
-
-            auto blocked_at = do_for_each_parent(this, [] (auto rg) {
-                return rg->execution_permitted() ? stop_iteration::no : stop_iteration::yes;
-            });
-
-            if (!blocked_at) {
-                auto req = std::move(_blocked_requests.front());
-                _blocked_requests.pop_front();
-                req->allocate();
-                release_requests();
-            } else {
-                // If someone blocked us in the mean time then we can't execute. We need to make
-                // sure that we are listening to notifications, though. It could be that we used to
-                // be blocked on ourselves and now we are blocking on an ancestor
-                subscribe_for_ancestor_available_memory_notification(blocked_at);
-            }
-        });
-    });
-}
-
 region* region_group::get_largest_region() {
     if (!_maximal_rg || _maximal_rg->_regions.empty()) {
         return nullptr;
@@ -2159,6 +2109,88 @@ region_group::del(region_impl* child) {
     update(-child->occupancy().total_space());
 }
 
+bool
+region_group::execution_permitted() noexcept {
+    return do_for_each_parent(this, [] (auto rg) {
+        return rg->under_pressure() ? stop_iteration::yes : stop_iteration::no;
+    }) == nullptr;
+}
+
+future<>
+region_group::start_releaser() {
+    return later().then([this] {
+        return repeat([this] () noexcept {
+            if (_shutdown_requested) {
+                return make_ready_future<stop_iteration>(stop_iteration::yes);
+            }
+
+            if (!_blocked_requests.empty() && execution_permitted()) {
+                auto req = std::move(_blocked_requests.front());
+                _blocked_requests.pop_front();
+                req->allocate();
+                return make_ready_future<stop_iteration>(stop_iteration::no);
+            } else {
+                // Block reclaiming to prevent signal() from being called by reclaimer inside wait()
+                // FIXME: handle allocation failures (not very likely) like allocating_section does
+                tracker_reclaimer_lock rl;
+                return _relief.wait().then([] {
+                    return stop_iteration::no;
+                });
+            }
+        });
+    });
+}
+
+region_group::region_group(region_group *parent, region_group_reclaimer& reclaimer)
+    : _parent(parent)
+    , _reclaimer(reclaimer)
+    , _releaser(reclaimer_can_block() ? start_releaser() : make_ready_future<>())
+{
+    if (_parent) {
+        _parent->add(this);
+    }
+}
+
+bool region_group::reclaimer_can_block() const {
+    return _reclaimer.throttle_threshold() != std::numeric_limits<size_t>::max();
+}
+
+void region_group::notify_relief() {
+    _relief.signal();
+    for (region_group* child : _subgroups) {
+        child->notify_relief();
+    }
+}
+
+void region_group::update(ssize_t delta) {
+    // Most-enclosing group which was relieved.
+    region_group* top_relief = nullptr;
+
+    do_for_each_parent(this, [&top_relief, delta] (region_group* rg) mutable {
+        rg->update_maximal_rg();
+        rg->_total_memory += delta;
+
+        if (rg->_total_memory >= rg->_reclaimer.soft_limit_threshold()) {
+            rg->_reclaimer.notify_soft_pressure();
+        } else {
+            rg->_reclaimer.notify_soft_relief();
+        }
+
+        if (rg->_total_memory > rg->_reclaimer.throttle_threshold()) {
+            rg->_reclaimer.notify_pressure();
+        } else if (rg->_reclaimer.under_pressure()) {
+            rg->_reclaimer.notify_relief();
+            top_relief = rg;
+        }
+
+        return stop_iteration::no;
+    });
+
+    if (top_relief) {
+        top_relief->notify_relief();
+    }
+}
+
 allocating_section::guard::guard()
     : _prev(shard_segment_pool.emergency_reserve_max()) {
 }
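This is the heart of the rework: release_requests(), with its gate, later() hops, recursion and ancestor subscriptions, is replaced by one long-lived releaser fiber per blockable group, started by the constructor and joined by shutdown(). The fiber sleeps on the _relief condition variable and pops at most one request per iteration while execution is permitted, so nothing ever runs in update() context and the reentrancy worries in the deleted comment disappear. Stripped of region_group specifics, the skeleton is (a sketch, not the code above; releaser, _queue and start are illustrative names, and Seastar's repeat and condition_variable plus <deque>/<functional> are assumed):

    struct releaser {
        seastar::condition_variable _relief;
        std::deque<std::function<void()>> _queue;   // stand-in for _blocked_requests
        bool _shutdown = false;
        future<> _fiber = make_ready_future<>();

        void start() {
            _fiber = repeat([this] {
                if (_shutdown) {
                    return make_ready_future<stop_iteration>(stop_iteration::yes);
                }
                if (!_queue.empty() /* && permitted, in the real code */) {
                    auto task = std::move(_queue.front());
                    _queue.pop_front();
                    task();
                    return make_ready_future<stop_iteration>(stop_iteration::no);
                }
                // Nothing runnable: sleep until someone signals relief.
                return _relief.wait().then([] { return stop_iteration::no; });
            });
        }

        future<> shutdown() {           // once, like region_group::shutdown()
            _shutdown = true;
            _relief.signal();
            return std::move(_fiber);   // join the fiber
        }
    };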
diff --git a/utils/logalloc.hh b/utils/logalloc.hh
index ad7245ddbc..d26798876b 100644
--- a/utils/logalloc.hh
+++ b/utils/logalloc.hh
@@ -65,8 +65,20 @@ protected:
     size_t _soft_limit;
     bool _under_pressure = false;
     bool _under_soft_pressure = false;
-    virtual void start_reclaiming() {}
-    virtual void stop_reclaiming() {}
+    // The following restrictions apply to implementations of start_reclaiming() and stop_reclaiming():
+    //
+    //  - must not use any region or region_group objects, because they're invoked synchronously
+    //    with operations on those.
+    //
+    //  - must be noexcept, because they're called on the free path.
+    //
+    //  - the implementation may be called synchronously with any operation
+    //    which allocates memory, because these are called by memory reclaimer.
+    //    In particular, the implementation should not depend on memory allocation
+    //    because that may fail when in reclaiming context.
+    //
+    virtual void start_reclaiming() noexcept {}
+    virtual void stop_reclaiming() noexcept {}
 public:
     bool under_pressure() const {
         return _under_pressure;
@@ -76,32 +88,26 @@ public:
         return _under_soft_pressure;
     }
 
-    void notify_soft_pressure() {
+    void notify_soft_pressure() noexcept {
         if (!_under_soft_pressure) {
             _under_soft_pressure = true;
             start_reclaiming();
         }
     }
 
-    void notify_soft_relief() {
+    void notify_soft_relief() noexcept {
         if (_under_soft_pressure) {
             _under_soft_pressure = false;
             stop_reclaiming();
         }
     }
 
-    void notify_pressure() {
-        if (!_under_pressure) {
-            _under_pressure = true;
-            start_reclaiming();
-        }
+    void notify_pressure() noexcept {
+        _under_pressure = true;
     }
 
-    void notify_relief() {
-        if (_under_pressure) {
-            _under_pressure = false;
-            stop_reclaiming();
-        }
+    void notify_relief() noexcept {
+        _under_pressure = false;
     }
 
     region_group_reclaimer()
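Note the split in semantics here: notify_pressure() and notify_relief() are now plain state flips consumed by the releaser fiber, while start_reclaiming()/stop_reclaiming() are driven only by the soft-pressure transitions. Because the single-argument constructor sets the soft limit equal to the hard threshold (and the constructor hunk just below asserts soft <= hard), crossing the hard limit still starts reclaiming, just via the soft path; test_reclaiming_runs_as_long_as_there_is_soft_pressure above pins this down. Spelled out as a sequence (illustrative only; these methods are normally invoked by region_group::update(), not by user code):

    logalloc::region_group_reclaimer r(8 * logalloc::segment_size,   // hard threshold
                                       4 * logalloc::segment_size);  // soft limit

    r.notify_soft_pressure();  // false -> true: calls start_reclaiming()
    r.notify_pressure();       // sets _under_pressure; no start_reclaiming() call anymore
    r.notify_relief();         // clears _under_pressure; no stop_reclaiming() call
    r.notify_soft_relief();    // true -> false: calls stop_reclaiming()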
@@ -109,7 +115,9 @@
     region_group_reclaimer(size_t threshold)
         : _threshold(threshold), _soft_limit(threshold) {}
     region_group_reclaimer(size_t threshold, size_t soft)
-        : _threshold(threshold), _soft_limit(soft) {}
+        : _threshold(threshold), _soft_limit(soft) {
+        assert(_soft_limit <= _threshold);
+    }
 
     virtual ~region_group_reclaimer() {}
 
@@ -230,9 +238,13 @@ class region_group {
     //  a different ancestor)
     std::experimental::optional<shared_promise<>> _descendant_blocked_requests = {};
 
-    region_group* _waiting_on_ancestor = nullptr;
-    seastar::gate _asynchronous_gate;
+    condition_variable _relief;
+    future<> _releaser;
     bool _shutdown_requested = false;
+
+    bool reclaimer_can_block() const;
+    future<> start_releaser();
+    void notify_relief();
 public:
     // When creating a region_group, one can specify an optional throttle_threshold parameter. This
     // parameter won't affect normal allocations, but an API is provided, through the region_group's
@@ -240,17 +252,13 @@
     // the total memory for the region group (and all of its parents) is lower or equal to the
     // region_group's throttle_treshold (and respectively for its parents).
     region_group(region_group_reclaimer& reclaimer = no_reclaimer) : region_group(nullptr, reclaimer) {}
-    region_group(region_group* parent, region_group_reclaimer& reclaimer = no_reclaimer) : _parent(parent), _reclaimer(reclaimer) {
-        if (_parent) {
-            _parent->add(this);
-        }
-    }
+    region_group(region_group* parent, region_group_reclaimer& reclaimer = no_reclaimer);
     region_group(region_group&& o) = delete;
     region_group(const region_group&) = delete;
     ~region_group() {
         // If we set a throttle threshold, we'd be postponing many operations. So shutdown must be
         // called.
-        if (_reclaimer.throttle_threshold() != std::numeric_limits<size_t>::max()) {
+        if (reclaimer_can_block()) {
             assert(_shutdown_requested);
         }
         if (_parent) {
@@ -262,24 +270,7 @@ public:
     size_t memory_used() const {
         return _total_memory;
     }
-    void update(ssize_t delta) {
-        do_for_each_parent(this, [delta] (auto rg) mutable {
-            rg->update_maximal_rg();
-            rg->_total_memory += delta;
-            // It is okay to call release_requests for a region_group that can't allow execution.
-            // But that can generate various spurious messages to groups waiting on us that will be
-            // then woken up just so they can go to wait again. So let's filter that.
-            if (rg->execution_permitted()) {
-                rg->release_requests();
-            }
-            if (rg->_total_memory >= rg->_reclaimer.soft_limit_threshold()) {
-                rg->_reclaimer.notify_soft_pressure();
-            } else if (rg->_total_memory < rg->_reclaimer.soft_limit_threshold()) {
-                rg->_reclaimer.notify_soft_relief();
-            }
-            return stop_iteration::no;
-        });
-    }
+    void update(ssize_t delta);
 
     // It would be easier to call update, but it is unfortunately broken in boost versions up to at
     // least 1.59.
@@ -325,36 +316,18 @@ public:
         using futurator = futurize<std::result_of_t<Func()>>;
 
         auto blocked_at = do_for_each_parent(this, [] (auto rg) {
-            return (rg->_blocked_requests.empty() && rg->execution_permitted()) ? stop_iteration::no : stop_iteration::yes;
+            return (rg->_blocked_requests.empty() && !rg->under_pressure()) ? stop_iteration::no : stop_iteration::yes;
         });
 
         if (!blocked_at) {
             return futurator::apply(func);
         }
-        subscribe_for_ancestor_available_memory_notification(blocked_at);
 
         auto fn = std::make_unique<concrete_allocating_function<Func>>(std::forward<Func>(func));
         auto fut = fn->get_future();
         _blocked_requests.push_back(std::move(fn), timeout);
         ++_blocked_requests_counter;
-        // This is called here, and not at update(), for two reasons: the first, is that things that
-        // are done during the free() path should be done carefuly, in the sense that they can
-        // trigger another update call and put us in a loop. Not to mention we would like to keep
-        // those from having exceptions. We solve that for release_requests by using later(), but in
-        // here we can do away with that need altogether.
-        //
-        // Second and most important, until we actually block a request, the pressure condition may
-        // very well be transient. There are opportunities for compactions, the condition can go
-        // away on its own, etc.
-        //
-        // The reason we check execution permitted(), is that we'll still block requests if we have
-        // free memory but existing requests in the queue. That is so we can keep our FIFO ordering
-        // guarantee. So we need to distinguish here the case in which we're blocking merely to
-        // serialize requests, so that the caller does not evict more than it should.
-        if (!blocked_at->execution_permitted()) {
-            blocked_at->_reclaimer.notify_pressure();
-        }
 
         return fut;
     }
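The queueing condition above changed form but not meaning: a request is queued not only when some group in the parent chain is under pressure, but also whenever one of them already holds queued requests, even if memory has been freed since. That is what preserves the FIFO guarantee the deleted comment used to explain. Concretely (work1, work2 and group are hypothetical):

    auto f1 = group.run_when_memory_available(work1);  // under pressure: queued
    // ... memory is freed; pressure is gone, but f1 has not been released yet ...
    auto f2 = group.run_when_memory_available(work2);  // queue non-empty: still queued
    // The releaser fiber pops work1 first, then work2; the order is never inverted.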
@@ -364,9 +337,11 @@ public:
     region* get_largest_region();
 
     // Shutdown is mandatory for every user who has set a threshold
+    // Can be called at most once.
     future<> shutdown() {
         _shutdown_requested = true;
-        return _asynchronous_gate.close();
+        _relief.signal();
+        return std::move(_releaser);
     }
 
     size_t blocked_requests() {
@@ -377,43 +352,9 @@
         return _blocked_requests_counter;
     }
 private:
-    // Make sure we get a notification and can call release_requests when one of our ancestors that
-    // used to block us is no longer under memory pressure.
-    void subscribe_for_ancestor_available_memory_notification(region_group *ancestor) {
-        if ((this == ancestor) || (_waiting_on_ancestor)) {
-            return; // already subscribed, or no need to
-        }
-
-        _waiting_on_ancestor = ancestor;
-
-        with_gate(_asynchronous_gate, [this] {
-            // We reevaluate _waiting_on_ancestor here so we make sure there is no deferring point
-            // between determining the ancestor and registering with it for a notification. We start
-            // with _waiting_on_ancestor set to the initial value, and after we are notified, we
-            // will set _waiting_on_ancestor to nullptr to force this lambda to reevaluate it.
-            auto evaluate_ancestor_and_stop = [this] {
-                if (!_waiting_on_ancestor) {
-                    auto new_blocking_point = do_for_each_parent(this, [] (auto rg) {
-                        return (rg->execution_permitted()) ? stop_iteration::no : stop_iteration::yes;
-                    });
-                    if (!new_blocking_point) {
-                        release_requests();
-                    }
-                    _waiting_on_ancestor = (new_blocking_point == this) ? nullptr : new_blocking_point;
-                }
-                return _waiting_on_ancestor == nullptr;
-            };
-
-            return do_until(evaluate_ancestor_and_stop, [this] {
-                if (!_waiting_on_ancestor->_descendant_blocked_requests) {
-                    _waiting_on_ancestor->_descendant_blocked_requests = shared_promise<>();
-                }
-                return _waiting_on_ancestor->_descendant_blocked_requests->get_shared_future().then([this] {
-                    _waiting_on_ancestor = nullptr;
-                });
-            });
-        });
-    }
+    // Returns true if and only if constraints of this group are not violated.
+    // That's taking into account any constraints imposed by enclosing (parent) groups.
+    bool execution_permitted() noexcept;
 
     // Executes the function func for each region_group upwards in the hierarchy, starting with the
     // parameter node. The function func may return stop_iteration::no, in which case it proceeds to
@@ -433,11 +374,10 @@
         }
         return nullptr;
     }
-    inline bool execution_permitted() const {
-        return _total_memory <= _reclaimer.throttle_threshold();
-    }
 
-    void release_requests() noexcept;
+    inline bool under_pressure() const {
+        return _reclaimer.under_pressure();
+    }
 
     uint64_t top_region_evictable_space() const;
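Finally, the lifetime contract: a region_group whose reclaimer can block now owns a running fiber, so shutdown() must be called exactly once before destruction. It raises _shutdown_requested, wakes the fiber, and hands back the fiber's future so the caller joins it. In use (a sketch mirroring the defer() pattern of the new tests; close_group is an illustrative name):

    logalloc::region_group_reclaimer recl(logalloc::segment_size);
    {
        logalloc::region_group group(recl);   // constructor starts the releaser fiber
        auto close_group = defer([&group] { group.shutdown().get(); });
        // ... use the group; shutdown() runs once on scope exit. Destroying a
        // throttled group without it trips assert(_shutdown_requested) in ~region_group().
    }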