diff --git a/memtable.cc b/memtable.cc
index f9d0dc57aa..4fae971747 100644
--- a/memtable.cc
+++ b/memtable.cc
@@ -21,6 +21,8 @@ memtable::~memtable() {
 
 mutation_partition&
 memtable::find_or_create_partition_slow(partition_key_view key) {
+    assert(!_region.compaction_enabled());
+
     // FIXME: Perform lookup using std::pair
     // to avoid unconditional copy of the partition key.
     // We can't do it right now because std::map<> which holds
@@ -37,6 +39,8 @@ memtable::find_or_create_partition_slow(partition_key_view key) {
 
 mutation_partition&
 memtable::find_or_create_partition(const dht::decorated_key& key) {
+    assert(!_region.compaction_enabled());
+
     // call lower_bound so we have a hint for the insert, just in case.
     auto i = partitions.lower_bound(key, partition_entry::compare(_schema));
     if (i == partitions.end() || !key.equal(*_schema, i->key())) {
@@ -119,6 +123,7 @@ public:
     { }
     virtual future<mutation_opt> operator()() override {
+        logalloc::compaction_lock _(_memtable->_region);
         update_iterators();
         if (_i == _end) {
             return make_ready_future<mutation_opt>(stdx::nullopt);
         }
@@ -140,6 +145,7 @@ memtable::make_reader(const query::partition_range& range) const {
         const query::ring_position& pos = range.start()->value();
         auto i = partitions.find(pos, partition_entry::compare(_schema));
         if (i != partitions.end()) {
+            logalloc::compaction_lock _(_region);
             return make_reader_returning(mutation(_schema, i->key(), i->partition()));
         } else {
             return make_empty_reader();
@@ -159,6 +165,7 @@ memtable::update(const db::replay_position& rp) {
 void
 memtable::apply(const mutation& m, const db::replay_position& rp) {
     with_allocator(_region.allocator(), [this, &m] {
+        logalloc::compaction_lock _(_region);
         mutation_partition& p = find_or_create_partition(m.decorated_key());
         p.apply(*_schema, m.partition());
     });
@@ -168,6 +175,7 @@ memtable::apply(const mutation& m, const db::replay_position& rp) {
 void
 memtable::apply(const frozen_mutation& m, const db::replay_position& rp) {
     with_allocator(_region.allocator(), [this, &m] {
+        logalloc::compaction_lock _(_region);
         mutation_partition& p = find_or_create_partition_slow(m.key(*_schema));
         p.apply(*_schema, m.partition());
     });
diff --git a/memtable.hh b/memtable.hh
index 9f78315ec2..3d4b0fa52a 100644
--- a/memtable.hh
+++ b/memtable.hh
@@ -74,7 +74,7 @@ public:
         bi::compare<partition_entry::compare>>;
 private:
     schema_ptr _schema;
-    logalloc::region _region;
+    mutable logalloc::region _region;
     partitions_type partitions;
     db::replay_position _replay_position;
     void update(const db::replay_position&);
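Every memtable hunk above follows the same pattern: hold an RAII guard on the region for as long as a raw reference into LSA-managed memory (a mutation_partition, an iterator) is alive, because compaction may now run synchronously with allocation and move the referenced objects. Below is a minimal, self-contained sketch of that guard pattern; it assumes only that the real logalloc::compaction_lock saves and restores a per-region flag, and the region type is a toy stand-in, not the real one.

    // Toy model of the pinning pattern used above; not the real logalloc types.
    #include <cassert>

    struct region {
        bool compaction_enabled = true;
    };

    class compaction_lock {
        region& _r;
        bool _prev;
    public:
        explicit compaction_lock(region& r)
            : _r(r)
            , _prev(r.compaction_enabled) {
            _r.compaction_enabled = false;   // pin: no compaction while alive
        }
        ~compaction_lock() {
            _r.compaction_enabled = _prev;   // restore saved state, not `true`
        }
    };

    int main() {
        region r;
        {
            compaction_lock outer(r);
            {
                compaction_lock inner(r);    // nests, like dct1/dct2 in merge()
            }
            assert(!r.compaction_enabled);   // inner exit didn't unpin early
        }
        assert(r.compaction_enabled);        // fully restored
    }

Restoring the saved value instead of unconditionally re-enabling is what makes the nested dct1/dct2 pair in region_impl::merge() below safe.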
diff --git a/row_cache.cc b/row_cache.cc
index 2f912dd20d..d5171e594e 100644
--- a/row_cache.cc
+++ b/row_cache.cc
@@ -122,6 +122,7 @@ row_cache::make_reader(const query::partition_range& range) {
             cache_entry& e = *i;
             _tracker.touch(e);
             ++_stats.hits;
+            logalloc::compaction_lock lock(_tracker.region());
             return make_reader_returning(mutation(_schema, dk, e.partition()));
         } else {
             ++_stats.misses;
@@ -141,6 +142,7 @@ row_cache::~row_cache() {
 
 void row_cache::populate(const mutation& m) {
     with_allocator(_tracker.allocator(), [this, &m] {
+        logalloc::compaction_lock _(_tracker.region());
         auto i = _partitions.lower_bound(m.decorated_key(), cache_entry::compare(_schema));
         if (i == _partitions.end() || !i->key().equal(*_schema, m.decorated_key())) {
             cache_entry* entry = current_allocator().construct<cache_entry>(m.decorated_key(), m.partition());
@@ -158,6 +160,7 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) {
     _tracker.region().merge(m._region); // Now all data in memtable belongs to cache
     return repeat([this, &m, presence_checker = std::move(presence_checker)] () mutable {
         return with_allocator(_tracker.allocator(), [this, &m, &presence_checker] () {
+            logalloc::compaction_lock _(_tracker.region());
             unsigned quota = 30;
             auto i = m.partitions.begin();
             const schema& s = *m.schema();
diff --git a/utils/allocation_strategy.hh b/utils/allocation_strategy.hh
index cbb0845743..1953ff7cd3 100644
--- a/utils/allocation_strategy.hh
+++ b/utils/allocation_strategy.hh
@@ -23,8 +23,8 @@ void standard_migrator(void* src, void* dst, size_t) noexcept {
 //
 // Managed objects may be moved by the allocator during compaction, which
 // invalidates any references to those objects. Compaction may be started
-// asynchronously by the reclaimer or invoked explicitly from code. Compaction
-// never happens synchronously with allocation/deallocation.
+// synchronously with allocations. To ensure that references remain valid, use
+// logalloc::compaction_lock.
 //
 // Because references may get invalidated, managing allocators can't be used
 // with standard containers, because they assume the reference is valid until freed.
@@ -58,12 +58,12 @@ public:
     //
     // Throws std::bad_alloc on allocation failure.
     //
-    // Doesn't invalidate references to allocated objects.
+    // Doesn't invalidate references to objects allocated with this strategy.
     //
     virtual void* alloc(migrate_fn, size_t size, size_t alignment) = 0;
 
     // Releases storage for the object. Doesn't invoke object's destructor.
-    // Doesn't invalidate references to allocated objects.
+    // Doesn't invalidate references to objects allocated with this strategy.
     virtual void free(void*) = 0;
 
     // Like alloc() but also constructs the object with a migrator using
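The reworded contract is easy to misread: alloc() still leaves objects in its own region in place, but it may now synchronously compact other regions, so a reference held across any allocation must be pinned or re-resolved. Here is a toy illustration of the hazard, with std::vector reallocation standing in for compaction; none of these names are the real API.

    // Toy region: std::vector reallocation plays the role of compaction.
    #include <cassert>
    #include <vector>

    struct toy_region {
        std::vector<int> objects;

        int* alloc(int v) {
            objects.push_back(v);   // may relocate every existing object
            return &objects.back();
        }
    };

    int main() {
        toy_region r;
        int* p = r.alloc(1);
        r.alloc(2);                 // p may now dangle, like an unpinned
                                    // reference held across an LSA allocation
        p = &r.objects.front();     // re-resolve (with the real allocator,
                                    // hold logalloc::compaction_lock instead)
        assert(*p == 1);
    }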
diff --git a/utils/logalloc.cc b/utils/logalloc.cc
index f31fc3888e..d9c5b17f22 100644
--- a/utils/logalloc.cc
+++ b/utils/logalloc.cc
@@ -26,7 +26,24 @@ static thread_local tracker tracker_instance;
 class tracker::impl {
     std::vector<region::impl*> _regions;
     scollectd::registrations _collectd_registrations;
+    bool _reclaiming_enabled = true;
 private:
+    // Prevents the tracker's reclaimer from running while it is live. The
+    // reclaimer may be invoked synchronously by the allocator, so this guard
+    // ensures that the tracker is not re-entered from inside its own methods.
+    struct reclaiming_lock {
+        impl& _ref;
+        bool _prev;
+        reclaiming_lock(impl& ref)
+            : _ref(ref)
+            , _prev(ref._reclaiming_enabled)
+        {
+            _ref._reclaiming_enabled = false;
+        }
+        ~reclaiming_lock() {
+            _ref._reclaiming_enabled = _prev;
+        }
+    };
     void register_collectd_metrics();
 public:
     impl() {
@@ -39,16 +56,16 @@ public:
     void unregister_region(region::impl*);
     size_t reclaim(size_t bytes);
     void full_compaction();
-    occupancy_stats occupancy() const;
+    occupancy_stats occupancy();
 };
 
 tracker::tracker()
     : _impl(std::make_unique<impl>())
-    , _reclaimer([this] {
-        reclaim(10*1024*1024);
-        // FIXME: be accurate
-        return memory::reclaiming_result::reclaimed_something;
-    })
+    , _reclaimer([this] () {
+        return reclaim(10*1024*1024)
+               ? memory::reclaiming_result::reclaimed_something
+               : memory::reclaiming_result::reclaimed_nothing;
+    }, memory::reclaimer_scope::sync)
 { }
 
 tracker::~tracker() {
@@ -58,7 +75,7 @@ size_t tracker::reclaim(size_t bytes) {
     return _impl->reclaim(bytes);
 }
 
-occupancy_stats tracker::occupancy() const {
+occupancy_stats tracker::occupancy() {
     return _impl->occupancy();
 }
 
@@ -508,6 +525,19 @@ private:
     uint64_t _compaction_counter = 0;
     eviction_fn _eviction_fn;
 private:
+    struct compaction_lock {
+        region_impl& _region;
+        bool _prev;
+        compaction_lock(region_impl& r)
+            : _region(r)
+            , _prev(r._compaction_enabled)
+        {
+            _region._compaction_enabled = false;
+        }
+        ~compaction_lock() {
+            _region._compaction_enabled = _prev;
+        }
+    };
     void* alloc_small(allocation_strategy::migrate_fn migrator, segment::size_type size, size_t alignment) {
         assert(alignment < obj_flags::max_alignment);
@@ -678,6 +708,7 @@ public:
     }
 
     virtual void* alloc(allocation_strategy::migrate_fn migrator, size_t size, size_t alignment) override {
+        compaction_lock _(*this);
         if (size > max_managed_object_size) {
             return standard_allocator().alloc(migrator, size, alignment);
         } else {
@@ -686,6 +717,7 @@ public:
     }
 
     virtual void free(void* obj) override {
+        compaction_lock _(*this);
         segment* seg = shard_segment_pool.containing_segment(obj);
 
         if (!seg) {
@@ -718,6 +750,8 @@ public:
     // Merges another region into this region. The other region is left empty.
     // Doesn't invalidate references to allocated objects.
     void merge(region_impl& other) {
+        compaction_lock dct1(*this);
+        compaction_lock dct2(other);
         degroup_temporarily dgt1(this);
         degroup_temporarily dgt2(&other);
 
@@ -753,6 +787,8 @@ public:
             return;
         }
 
+        compaction_lock _(*this);
+
         auto in_use = shard_segment_pool.segments_in_use();
 
         while (shard_segment_pool.segments_in_use() >= in_use) {
@@ -767,6 +803,7 @@ public:
     // Compacts everything. Mainly for testing.
     // Invalidates references to allocated objects.
     void full_compaction() {
+        compaction_lock _(*this);
         logger.debug("Full compaction, {}", occupancy());
         close_and_open();
         segment_heap all;
@@ -870,7 +907,8 @@ std::ostream& operator<<(std::ostream& out, const occupancy_stats& stats) {
         stats.used_fraction() * 100, stats.used_space(), stats.total_space());
 }
 
-occupancy_stats tracker::impl::occupancy() const {
+occupancy_stats tracker::impl::occupancy() {
+    reclaiming_lock _(*this);
     occupancy_stats total{};
     for (auto&& r: _regions) {
         total += r->occupancy();
@@ -879,6 +917,8 @@
 }
 
 void tracker::impl::full_compaction() {
+    reclaiming_lock _(*this);
+
     logger.debug("Full compaction on all regions, {}", occupancy());
 
     for (region_impl* r : _regions) {
@@ -937,6 +977,19 @@ size_t tracker::impl::reclaim(size_t bytes) {
     // When compaction is not sufficient to reclaim space, we evict data from
     // evictable regions.
     //
+
+    // This may run synchronously with allocation, so we should not allocate
+    // memory here, otherwise we may get std::bad_alloc. Currently we allocate
+    // only in the logger, and only when the debug level is enabled, which it
+    // is not during normal operation. The logging is still valuable during
+    // testing and in most cases works fine even when it does allocate.
+
+    if (!_reclaiming_enabled) {
+        return 0;
+    }
+
+    reclaiming_lock _(*this);
+
     size_t in_use = shard_segment_pool.segments_in_use();
 
     constexpr auto max_bytes = std::numeric_limits<size_t>::max() - segment::size;
@@ -1002,11 +1055,13 @@ size_t tracker::impl::reclaim(size_t bytes) {
 }
 
 void tracker::impl::register_region(region::impl* r) {
+    reclaiming_lock _(*this);
     _regions.push_back(r);
     logger.debug("Registered region @{} with id={}", r, r->id());
 }
 
 void tracker::impl::unregister_region(region::impl* r) {
+    reclaiming_lock _(*this);
     logger.debug("Unregistering region, id={}", r->id());
     _regions.erase(std::remove(_regions.begin(), _regions.end(), r));
 }
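The two guards in logalloc.cc cooperate: the reclaimer is registered with memory::reclaimer_scope::sync, so it can fire in the middle of any allocation, including allocations made while the tracker itself is working, and the `if (!_reclaiming_enabled) return 0;` check breaks that cycle by reporting no progress on re-entry. A runnable sketch of just that mechanism, with all names as stand-ins for tracker::impl:

    #include <cassert>
    #include <cstddef>

    // Stand-in for tracker::impl, reduced to the re-entrancy machinery.
    struct toy_tracker {
        bool reclaiming_enabled = true;

        struct reclaiming_lock {
            toy_tracker& _t;
            bool _prev;
            explicit reclaiming_lock(toy_tracker& t)
                : _t(t)
                , _prev(t.reclaiming_enabled) {
                _t.reclaiming_enabled = false;
            }
            ~reclaiming_lock() {
                _t.reclaiming_enabled = _prev;
            }
        };

        size_t reclaim(size_t bytes) {
            if (!reclaiming_enabled) {
                return 0;                  // re-entered: report no progress
            }
            reclaiming_lock _(*this);
            // Suppose compaction below allocates and the allocator fires the
            // synchronous reclaimer again; modeled as direct recursion:
            size_t nested = reclaim(bytes);
            assert(nested == 0);           // the guard stopped the recursion
            (void)nested;
            return bytes;                  // pretend we freed what was asked
        }
    };

    int main() {
        toy_tracker t;
        assert(t.reclaim(1024) == 1024);
        assert(t.reclaiming_enabled);      // state restored on exit
    }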
diff --git a/utils/logalloc.hh b/utils/logalloc.hh
index 2214fdf649..729510a2ff 100644
--- a/utils/logalloc.hh
+++ b/utils/logalloc.hh
@@ -92,7 +92,7 @@ public:
     void full_compaction();
 
     // Returns aggregate statistics for all pools.
-    occupancy_stats occupancy() const;
+    occupancy_stats occupancy();
 };
 
 tracker& shard_tracker();
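The tracker::tracker() change also resolves the old "FIXME: be accurate": the hook now tells seastar whether anything was actually freed, so the allocator can stop retrying a hopeless allocation instead of looping on a reclaimer that always claims success. A sketch of that mapping, with reclaim() stubbed out and an enum that mirrors the shape of memory::reclaiming_result:

    #include <cassert>
    #include <cstddef>

    // Mirrors the shape of seastar's memory::reclaiming_result (assumed).
    enum class reclaiming_result { reclaimed_nothing, reclaimed_something };

    // Stub for tracker::reclaim(): returns the number of bytes actually freed.
    size_t reclaim(size_t /*goal*/) {
        return 0;                   // nothing left to compact or evict
    }

    reclaiming_result reclaim_hook() {
        // Only claim progress when bytes were actually freed.
        return reclaim(10 * 1024 * 1024)
               ? reclaiming_result::reclaimed_something
               : reclaiming_result::reclaimed_nothing;
    }

    int main() {
        assert(reclaim_hook() == reclaiming_result::reclaimed_nothing);
    }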