From d20fae96a2da5d0fe381c91c42be6253503eb795 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 27 Aug 2015 16:57:50 +0200 Subject: [PATCH] lsa: Make reclaimer run synchronously with allocations The goal is to make allocation less likely to fail. With async reclaimer there is an implicit bound on the amount of memory that can be allocated between deferring points. This bound is difficult to enforce though. Sync reclaimer lifts this limitation off. Also, allocations which could not be satisfied before because of fragmentation now will have higher chances of succeeding, although depending on how much memory is fragmented, that could involve evicting a lot of segments from cache, so we should still avoid them. Downside of sync reclaiming is that now references into regions may be invalidated not only across deferring points but at any allocation site. compaction_lock can be used to pin data, preferably just temporarily. --- memtable.cc | 8 ++++ memtable.hh | 2 +- row_cache.cc | 3 ++ utils/allocation_strategy.hh | 8 ++-- utils/logalloc.cc | 71 ++++++++++++++++++++++++++++++++---- utils/logalloc.hh | 2 +- 6 files changed, 80 insertions(+), 14 deletions(-) diff --git a/memtable.cc b/memtable.cc index f9d0dc57aa..4fae971747 100644 --- a/memtable.cc +++ b/memtable.cc @@ -21,6 +21,8 @@ memtable::~memtable() { mutation_partition& memtable::find_or_create_partition_slow(partition_key_view key) { + assert(!_region.compaction_enabled()); + // FIXME: Perform lookup using std::pair // to avoid unconditional copy of the partition key. // We can't do it right now because std::map<> which holds @@ -37,6 +39,8 @@ memtable::find_or_create_partition_slow(partition_key_view key) { mutation_partition& memtable::find_or_create_partition(const dht::decorated_key& key) { + assert(!_region.compaction_enabled()); + // call lower_bound so we have a hint for the insert, just in case. auto i = partitions.lower_bound(key, partition_entry::compare(_schema)); if (i == partitions.end() || !key.equal(*_schema, i->key())) { @@ -119,6 +123,7 @@ public: { } virtual future operator()() override { + logalloc::compaction_lock _(_memtable->_region); update_iterators(); if (_i == _end) { return make_ready_future(stdx::nullopt); @@ -140,6 +145,7 @@ memtable::make_reader(const query::partition_range& range) const { const query::ring_position& pos = range.start()->value(); auto i = partitions.find(pos, partition_entry::compare(_schema)); if (i != partitions.end()) { + logalloc::compaction_lock _(_region); return make_reader_returning(mutation(_schema, i->key(), i->partition())); } else { return make_empty_reader(); @@ -159,6 +165,7 @@ memtable::update(const db::replay_position& rp) { void memtable::apply(const mutation& m, const db::replay_position& rp) { with_allocator(_region.allocator(), [this, &m] { + logalloc::compaction_lock _(_region); mutation_partition& p = find_or_create_partition(m.decorated_key()); p.apply(*_schema, m.partition()); }); @@ -168,6 +175,7 @@ memtable::apply(const mutation& m, const db::replay_position& rp) { void memtable::apply(const frozen_mutation& m, const db::replay_position& rp) { with_allocator(_region.allocator(), [this, &m] { + logalloc::compaction_lock _(_region); mutation_partition& p = find_or_create_partition_slow(m.key(*_schema)); p.apply(*_schema, m.partition()); }); diff --git a/memtable.hh b/memtable.hh index 9f78315ec2..3d4b0fa52a 100644 --- a/memtable.hh +++ b/memtable.hh @@ -74,7 +74,7 @@ public: bi::compare>; private: schema_ptr _schema; - logalloc::region _region; + mutable logalloc::region _region; partitions_type partitions; db::replay_position _replay_position; void update(const db::replay_position&); diff --git a/row_cache.cc b/row_cache.cc index 2f912dd20d..d5171e594e 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -122,6 +122,7 @@ row_cache::make_reader(const query::partition_range& range) { cache_entry& e = *i; _tracker.touch(e); ++_stats.hits; + logalloc::compaction_lock lock(_tracker.region()); return make_reader_returning(mutation(_schema, dk, e.partition())); } else { ++_stats.misses; @@ -141,6 +142,7 @@ row_cache::~row_cache() { void row_cache::populate(const mutation& m) { with_allocator(_tracker.allocator(), [this, &m] { + logalloc::compaction_lock _(_tracker.region()); auto i = _partitions.lower_bound(m.decorated_key(), cache_entry::compare(_schema)); if (i == _partitions.end() || !i->key().equal(*_schema, m.decorated_key())) { cache_entry* entry = current_allocator().construct(m.decorated_key(), m.partition()); @@ -158,6 +160,7 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec _tracker.region().merge(m._region); // Now all data in memtable belongs to cache return repeat([this, &m, presence_checker = std::move(presence_checker)] () mutable { return with_allocator(_tracker.allocator(), [this, &m, &presence_checker] () { + logalloc::compaction_lock _(_tracker.region()); unsigned quota = 30; auto i = m.partitions.begin(); const schema& s = *m.schema(); diff --git a/utils/allocation_strategy.hh b/utils/allocation_strategy.hh index cbb0845743..1953ff7cd3 100644 --- a/utils/allocation_strategy.hh +++ b/utils/allocation_strategy.hh @@ -23,8 +23,8 @@ void standard_migrator(void* src, void* dst, size_t) noexcept { // // Managed objects may be moved by the allocator during compaction, which // invalidates any references to those objects. Compaction may be started -// asynchronously by the reclaimer or invoked explicitly from code. Compaction -// never happens synchronously with allocation/deallocation. +// synchronously with allocations. To ensure that references remain valid, use +// logalloc::compaction_lock. // // Because references may get invalidated, managing allocators can't be used // with standard containers, because they assume the reference is valid until freed. @@ -58,12 +58,12 @@ public: // // Throws std::bad_alloc on allocation failure. // - // Doesn't invalidate references to allocated objects. + // Doesn't invalidate references to objects allocated with this strategy. // virtual void* alloc(migrate_fn, size_t size, size_t alignment) = 0; // Releases storage for the object. Doesn't invoke object's destructor. - // Doesn't invalidate references to allocated objects. + // Doesn't invalidate references to objects allocated with this strategy. virtual void free(void*) = 0; // Like alloc() but also constructs the object with a migrator using diff --git a/utils/logalloc.cc b/utils/logalloc.cc index f31fc3888e..d9c5b17f22 100644 --- a/utils/logalloc.cc +++ b/utils/logalloc.cc @@ -26,7 +26,24 @@ static thread_local tracker tracker_instance; class tracker::impl { std::vector _regions; scollectd::registrations _collectd_registrations; + bool _reclaiming_enabled = true; private: + // Prevents tracker's reclaimer from running while live. Reclaimer may be + // invoked synchronously with allocator. This guard ensures that this + // object is not re-entered while inside one of the tracker's methods. + struct reclaiming_lock { + impl& _ref; + bool _prev; + reclaiming_lock(impl& ref) + : _ref(ref) + , _prev(ref._reclaiming_enabled) + { + _ref._reclaiming_enabled = false; + } + ~reclaiming_lock() { + _ref._reclaiming_enabled = _prev; + } + }; void register_collectd_metrics(); public: impl() { @@ -39,16 +56,16 @@ public: void unregister_region(region::impl*); size_t reclaim(size_t bytes); void full_compaction(); - occupancy_stats occupancy() const; + occupancy_stats occupancy(); }; tracker::tracker() : _impl(std::make_unique()) - , _reclaimer([this] { - reclaim(10*1024*1024); - // FIXME: be accurate - return memory::reclaiming_result::reclaimed_something; - }) + , _reclaimer([this] () { + return reclaim(10*1024*1024) + ? memory::reclaiming_result::reclaimed_something + : memory::reclaiming_result::reclaimed_nothing; + }, memory::reclaimer_scope::sync) { } tracker::~tracker() { @@ -58,7 +75,7 @@ size_t tracker::reclaim(size_t bytes) { return _impl->reclaim(bytes); } -occupancy_stats tracker::occupancy() const { +occupancy_stats tracker::occupancy() { return _impl->occupancy(); } @@ -508,6 +525,19 @@ private: uint64_t _compaction_counter = 0; eviction_fn _eviction_fn; private: + struct compaction_lock { + region_impl& _region; + bool _prev; + compaction_lock(region_impl& r) + : _region(r) + , _prev(r._compaction_enabled) + { + _region._compaction_enabled = false; + } + ~compaction_lock() { + _region._compaction_enabled = _prev; + } + }; void* alloc_small(allocation_strategy::migrate_fn migrator, segment::size_type size, size_t alignment) { assert(alignment < obj_flags::max_alignment); @@ -678,6 +708,7 @@ public: } virtual void* alloc(allocation_strategy::migrate_fn migrator, size_t size, size_t alignment) override { + compaction_lock _(*this); if (size > max_managed_object_size) { return standard_allocator().alloc(migrator, size, alignment); } else { @@ -686,6 +717,7 @@ public: } virtual void free(void* obj) override { + compaction_lock _(*this); segment* seg = shard_segment_pool.containing_segment(obj); if (!seg) { @@ -718,6 +750,8 @@ public: // Merges another region into this region. The other region is left empty. // Doesn't invalidate references to allocated objects. void merge(region_impl& other) { + compaction_lock dct1(*this); + compaction_lock dct2(other); degroup_temporarily dgt1(this); degroup_temporarily dgt2(&other); @@ -753,6 +787,8 @@ public: return; } + compaction_lock _(*this); + auto in_use = shard_segment_pool.segments_in_use(); while (shard_segment_pool.segments_in_use() >= in_use) { @@ -767,6 +803,7 @@ public: // Compacts everything. Mainly for testing. // Invalidates references to allocated objects. void full_compaction() { + compaction_lock _(*this); logger.debug("Full compaction, {}", occupancy()); close_and_open(); segment_heap all; @@ -870,7 +907,8 @@ std::ostream& operator<<(std::ostream& out, const occupancy_stats& stats) { stats.used_fraction() * 100, stats.used_space(), stats.total_space()); } -occupancy_stats tracker::impl::occupancy() const { +occupancy_stats tracker::impl::occupancy() { + reclaiming_lock _(*this); occupancy_stats total{}; for (auto&& r: _regions) { total += r->occupancy(); @@ -879,6 +917,8 @@ occupancy_stats tracker::impl::occupancy() const { } void tracker::impl::full_compaction() { + reclaiming_lock _(*this); + logger.debug("Full compaction on all regions, {}", occupancy()); for (region_impl* r : _regions) { @@ -937,6 +977,19 @@ size_t tracker::impl::reclaim(size_t bytes) { // When compaction is not sufficient to reclaim space, we evict data from // evictable regions. // + + // This may run synchronously with allocation, so we should not allocate + // memory, otherwise we may get std::bad_alloc. Currently we only allocate + // in the logger when debug level is enabled. It's disabled during normal + // operation. Having it is still valuable during testing and in most cases + // should work just fine even if allocates. + + if (!_reclaiming_enabled) { + return 0; + } + + reclaiming_lock _(*this); + size_t in_use = shard_segment_pool.segments_in_use(); constexpr auto max_bytes = std::numeric_limits::max() - segment::size; @@ -1002,11 +1055,13 @@ size_t tracker::impl::reclaim(size_t bytes) { } void tracker::impl::register_region(region::impl* r) { + reclaiming_lock _(*this); _regions.push_back(r); logger.debug("Registered region @{} with id={}", r, r->id()); } void tracker::impl::unregister_region(region::impl* r) { + reclaiming_lock _(*this); logger.debug("Unregistering region, id={}", r->id()); _regions.erase(std::remove(_regions.begin(), _regions.end(), r)); } diff --git a/utils/logalloc.hh b/utils/logalloc.hh index 2214fdf649..729510a2ff 100644 --- a/utils/logalloc.hh +++ b/utils/logalloc.hh @@ -92,7 +92,7 @@ public: void full_compaction(); // Returns aggregate statistics for all pools. - occupancy_stats occupancy() const; + occupancy_stats occupancy(); }; tracker& shard_tracker();