diff --git a/database.cc b/database.cc index fbc176ac31..a5e3e56d9d 100644 --- a/database.cc +++ b/database.cc @@ -92,21 +92,21 @@ lw_shared_ptr column_family::make_memory_only_memtable_list() { auto seal = [this] (memtable_list::flush_behavior ignored) { return make_ready_future<>(); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager); } lw_shared_ptr column_family::make_memtable_list() { auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_memtable(behavior); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager); } lw_shared_ptr column_family::make_streaming_memtable_list() { auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_streaming_memtable(behavior); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_region_group, _streaming_serializer); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager); } column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager) @@ -1329,14 +1329,18 @@ database::database(const db::config& cfg) return memtable_total_space; }()) , _streaming_memtable_total_space(_memtable_total_space / 4) - , _streaming_dirty_memory_region_group(&_dirty_memory_region_group) + // Allow system tables a pool of 10 MB extra memory to write over the threshold. Under normal + // circumnstances it won't matter, but when we throttle, some system requests will be able to + // keep being serviced even if user requests are not. + // + // Note that even if we didn't allow extra memory, we would still want to keep system requests + // in a different region group. This is because throttled requests are serviced in FIFO order, + // and we don't want system requests to be waiting for a long time behind user requests. + , _system_dirty_memory_manager(*this, _memtable_total_space + (10 << 20)) + , _dirty_memory_manager(*this, &_system_dirty_memory_manager, _memtable_total_space) + , _streaming_dirty_memory_manager(*this, &_dirty_memory_manager, _streaming_memtable_total_space) , _version(empty_version) , _enable_incremental_backups(cfg.incremental_backups()) - , _memtables_throttler(_memtable_total_space, _dirty_memory_region_group) - , _streaming_throttler(_streaming_memtable_total_space, - _streaming_dirty_memory_region_group, - &_memtables_throttler - ) { _compaction_manager.start(); setup_collectd(); @@ -1351,7 +1355,7 @@ database::setup_collectd() { , scollectd::per_cpu_plugin_instance , "bytes", "dirty") , scollectd::make_typed(scollectd::data_type::GAUGE, [this] { - return _dirty_memory_region_group.memory_used(); + return dirty_memory_region_group().memory_used(); }))); _collectd.push_back( @@ -1780,8 +1784,8 @@ keyspace::make_column_family_config(const schema& s) const { cfg.enable_cache = _config.enable_cache; cfg.max_memtable_size = _config.max_memtable_size; cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size; - cfg.dirty_memory_region_group = _config.dirty_memory_region_group; - cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group; + cfg.dirty_memory_manager = _config.dirty_memory_manager; + cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager; cfg.read_concurrency_config = _config.read_concurrency_config; cfg.cf_stats = _config.cf_stats; cfg.enable_incremental_backups = _config.enable_incremental_backups; @@ -2109,14 +2113,64 @@ column_family::check_valid_rp(const db::replay_position& rp) const { } } -future<> database::apply_in_memory(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& rp) { - try { - auto& cf = find_column_family(m.column_family_id()); - cf.apply(m, m_schema, rp); - } catch (no_such_column_family&) { - dblog.error("Attempting to mutate non-existent table {}", m.column_family_id()); +future<> dirty_memory_manager::shutdown() { + _db_shutdown_requested = true; + return _waiting_flush_gate.close().then([this] { + return _region_group.shutdown(); + }); +} + +void dirty_memory_manager::maybe_do_active_flush() { + if (!under_pressure() || _db_shutdown_requested) { + return; } - return make_ready_future<>(); + + // Flush already ongoing. We don't need to initiate an active flush at this moment. + if (_flush_serializer.current() != _concurrency) { + return; + } + + // There are many criteria that can be used to select what is the best memtable to + // flush. Most of the time we want some coordination with the commitlog to allow us to + // release commitlog segments as early as we can. + // + // But during pressure condition, we'll just pick the CF that holds the largest + // memtable. The advantage of doing this is that this is objectively the one that will + // release the biggest amount of memory and is less likely to be generating tiny + // SSTables. The disadvantage is that right now, because we only release memory when the + // SSTable is fully written, that may take a bit of time to happen. + // + // However, since we'll very soon have a mechanism in place to account for the memory + // that was already written in one form or another, that disadvantage is mitigated. + memtable& biggest_memtable = memtable::from_region(*_region_group.get_largest_region()); + auto& biggest_cf = _db.find_column_family(biggest_memtable.schema()); + memtable_list& mtlist = get_memtable_list(biggest_cf); + // Please note that this will eventually take the semaphore and prevent two concurrent flushes. + // We don't need any other extra protection. + mtlist.seal_active_memtable(memtable_list::flush_behavior::immediate); +} + +memtable_list& memtable_dirty_memory_manager::get_memtable_list(column_family& cf) { + return *(cf._memtables); +} + +memtable_list& streaming_dirty_memory_manager::get_memtable_list(column_family& cf) { + return *(cf._streaming_memtables); +} + +void dirty_memory_manager::start_reclaiming() { + maybe_do_active_flush(); +} + +future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) { + return _dirty_memory_manager.region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] { + try { + auto& cf = find_column_family(m.column_family_id()); + cf.apply(m, m_schema, rp); + } catch (no_such_column_family&) { + dblog.error("Attempting to mutate non-existent table {}", m.column_family_id()); + } + }); } future<> database::do_apply(schema_ptr s, const frozen_mutation& m) { @@ -2148,43 +2202,11 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m) { return apply_in_memory(m, s, db::replay_position()); } -future<> throttle_state::throttle() { - if (!should_throttle() && _throttled_requests.empty()) { - // All is well, go ahead - return make_ready_future<>(); - } - // We must throttle, wait a bit - if (_throttled_requests.empty()) { - _throttling_timer.arm_periodic(10ms); - } - _throttled_requests.emplace_back(); - return _throttled_requests.back().get_future(); -} - -void throttle_state::unthrottle() { - // Release one request per free 1MB we have - // FIXME: improve this - if (should_throttle()) { - return; - } - size_t avail = std::max((_max_space - _region_group.memory_used()) >> 20, size_t(1)); - avail = std::min(_throttled_requests.size(), avail); - for (size_t i = 0; i < avail; ++i) { - _throttled_requests.front().set_value(); - _throttled_requests.pop_front(); - } - if (_throttled_requests.empty()) { - _throttling_timer.cancel(); - } -} - future<> database::apply(schema_ptr s, const frozen_mutation& m) { if (dblog.is_enabled(logging::log_level::trace)) { dblog.trace("apply {}", m.pretty_printer(s)); } - return _memtables_throttler.throttle().then([this, &m, s = std::move(s)] { - return do_apply(std::move(s), m); - }).then([this, s = _stats] { + return do_apply(std::move(s), m).then([this, s = _stats] { ++s->total_writes; }); } @@ -2194,23 +2216,7 @@ future<> database::apply_streaming_mutation(schema_ptr s, const frozen_mutation& throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } - - // TODO (maybe): This will use the same memory region group as memtables, so when - // one of them throttles, both will. - // - // It would be possible to provide further QoS for CQL originated memtables - // by keeping the streaming memtables into a different region group, with its own - // separate limit. - // - // Because, however, there are many other limits in play that may kick in, - // I am not convinced that this will ever be a problem. - // - // If we do find ourselves in the situation that we are throttling incoming - // writes due to high level of streaming writes, and we are sure that this - // is the best solution, we can just change the memtable creation method so - // that each kind of memtable creates from a different region group - and then - // update the throttle conditions accordingly. - return _streaming_throttler.throttle().then([this, &m, s = std::move(s)] { + return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, s = std::move(s)] { auto uuid = m.column_family_id(); auto& cf = find_column_family(uuid); cf.apply_streaming_mutation(s, std::move(m)); @@ -2242,8 +2248,8 @@ database::make_keyspace_config(const keyspace_metadata& ksm) { // All writes should go to the main memtable list if we're not durable cfg.max_streaming_memtable_size = 0; } - cfg.dirty_memory_region_group = &_dirty_memory_region_group; - cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group; + cfg.dirty_memory_manager = &_dirty_memory_manager; + cfg.streaming_dirty_memory_manager = &_streaming_dirty_memory_manager; cfg.read_concurrency_config.sem = &_read_concurrency_sem; cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms; // Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads. @@ -2316,6 +2322,12 @@ database::stop() { return parallel_for_each(_column_families, [this] (auto& val_pair) { return val_pair.second->stop(); }); + }).then([this] { + return _system_dirty_memory_manager.shutdown(); + }).then([this] { + return _dirty_memory_manager.shutdown(); + }).then([this] { + return _streaming_dirty_memory_manager.shutdown(); }); } diff --git a/database.hh b/database.hh index 5c113fe08a..68e61159c6 100644 --- a/database.hh +++ b/database.hh @@ -99,35 +99,93 @@ void make(database& db, bool durable, bool volatile_testing_only); } } -class throttle_state { - size_t _max_space; - logalloc::region_group& _region_group; - throttle_state* _parent; +class replay_position_reordered_exception : public std::exception {}; - circular_buffer> _throttled_requests; - timer<> _throttling_timer{[this] { unthrottle(); }}; - void unthrottle(); - bool should_throttle() const { - if (_region_group.memory_used() > _max_space) { - return true; - } - if (_parent) { - return _parent->should_throttle(); - } - return false; - } +using shared_memtable = lw_shared_ptr; +class memtable_list; + +class dirty_memory_manager: public logalloc::region_group_reclaimer { + // We need a separate boolean, because from the LSA point of view, pressure may still be + // mounting, in which case the pressure flag could be set back on if we force it off. + bool _db_shutdown_requested = false; + + database& _db; + logalloc::region_group _region_group; + + // We would like to serialize the flushing of memtables. While flushing many memtables + // simultaneously can sustain high levels of throughput, the memory is not freed until the + // memtable is totally gone. That means that if we have throttled requests, they will stay + // throttled for a long time. Even when we have virtual dirty, that only provides a rough + // estimate, and we can't release requests that early. + // + // Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind + // would take care of the rest. But that still has issues, so we'll limit parallelism to some + // number (4), that we will hopefully reduce to 1 when write behind works. + // + // When streaming is going on, we'll separate half of that for the streaming code, which + // effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O + // Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have + // to do for the moment. Hopefully we can set both to 1 soon (with write behind) + // + // FIXME: enable write behind and set both to 1. Right now we will take advantage of the fact + // that memtables and streaming will use different specialized classes here and set them as + // default values here. + size_t _concurrency; + semaphore _flush_serializer; + + seastar::gate _waiting_flush_gate; + std::vector _pending_flushes; + void maybe_do_active_flush(); +protected: + virtual memtable_list& get_memtable_list(column_family& cf) = 0; + virtual void start_reclaiming() override; public: - throttle_state(size_t max_space, logalloc::region_group& region, throttle_state* parent = nullptr) - : _max_space(max_space) - , _region_group(region) - , _parent(parent) - {} + future<> shutdown(); + dirty_memory_manager(database& db, size_t threshold, size_t concurrency) + : logalloc::region_group_reclaimer(threshold) + , _db(db) + , _region_group(*this) + , _concurrency(concurrency) + , _flush_serializer(concurrency) {} - future<> throttle(); + dirty_memory_manager(database& db, dirty_memory_manager *parent, size_t threshold, size_t concurrency) + : logalloc::region_group_reclaimer(threshold) + , _db(db) + , _region_group(&parent->_region_group, *this) + , _concurrency(concurrency) + , _flush_serializer(concurrency) {} + logalloc::region_group& region_group() { + return _region_group; + } + + const logalloc::region_group& region_group() const { + return _region_group; + } + + template + future<> serialize_flush(Func&& func) { + return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable { + return with_semaphore(_flush_serializer, 1, func).finally([this] { + maybe_do_active_flush(); + }); + }); + } }; +class streaming_dirty_memory_manager: public dirty_memory_manager { + virtual memtable_list& get_memtable_list(column_family& cf) override; +public: + streaming_dirty_memory_manager(database& db, dirty_memory_manager *parent, size_t threshold) : dirty_memory_manager(db, parent, threshold, 2) {} +}; -class replay_position_reordered_exception : public std::exception {}; +class memtable_dirty_memory_manager: public dirty_memory_manager { + virtual memtable_list& get_memtable_list(column_family& cf) override; +public: + memtable_dirty_memory_manager(database& db, dirty_memory_manager* parent, size_t threshold) : dirty_memory_manager(db, parent, threshold, 4) {} + // This constructor will be called for the system tables (no parent). Its flushes are usually drive by us + // and not the user, and tend to be small in size. So we'll allow only two slots. + memtable_dirty_memory_manager(database& db, size_t threshold) : dirty_memory_manager(db, threshold, 2) {} +}; // We could just add all memtables, regardless of types, to a single list, and // then filter them out when we read them. Here's why I have chosen not to do @@ -151,21 +209,18 @@ class memtable_list { public: enum class flush_behavior { delayed, immediate }; private: - using shared_memtable = lw_shared_ptr; std::vector _memtables; std::function (flush_behavior)> _seal_fn; std::function _current_schema; size_t _max_memtable_size; - logalloc::region_group* _dirty_memory_region_group; - semaphore& _region_group_serializer; + dirty_memory_manager* _dirty_memory_manager; public: - memtable_list(std::function (flush_behavior)> seal_fn, std::function cs, size_t max_memtable_size, logalloc::region_group* region_group, semaphore& sem) + memtable_list(std::function (flush_behavior)> seal_fn, std::function cs, size_t max_memtable_size, dirty_memory_manager* dirty_memory_manager) : _memtables({}) , _seal_fn(seal_fn) , _current_schema(cs) , _max_memtable_size(max_memtable_size) - , _dirty_memory_region_group(region_group) - , _region_group_serializer(sem) { + , _dirty_memory_manager(dirty_memory_manager) { add_memtable(); } @@ -191,11 +246,7 @@ public: if (behavior == flush_behavior::delayed) { return _seal_fn(behavior); } - return _region_group_serializer.wait().then([this] { - return _seal_fn(flush_behavior::immediate); - }).finally([this] { - _region_group_serializer.signal(); - }); + return _dirty_memory_manager->serialize_flush([this] { return _seal_fn(flush_behavior::immediate); }); } auto begin() noexcept { @@ -235,7 +286,7 @@ public: } private: lw_shared_ptr new_memtable() { - return make_lw_shared(_current_schema(), _dirty_memory_region_group); + return make_lw_shared(_current_schema(), &(_dirty_memory_manager->region_group())); } }; @@ -262,8 +313,8 @@ public: bool enable_incremental_backups = false; size_t max_memtable_size = 5'000'000; size_t max_streaming_memtable_size = 5'000'000; - logalloc::region_group* dirty_memory_region_group = nullptr; - logalloc::region_group* streaming_dirty_memory_region_group = nullptr; + ::dirty_memory_manager* dirty_memory_manager = nullptr; + ::dirty_memory_manager* streaming_dirty_memory_manager = nullptr; restricted_mutation_reader_config read_concurrency_config; ::cf_stats* cf_stats = nullptr; }; @@ -296,24 +347,6 @@ private: config _config; stats _stats; - // We would like to serialize the flushing of memtables. While flushing many memtables - // simultaneously can sustain high levels of throughput, the memory is not freed until the - // memtable is totally gone. That means that if we have throttled requests, they will stay - // throttled for a long time. Even when we have virtual dirty, that only provides a rough - // estimate, and we can't release requests that early. - // - // Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind - // would take care of the rest. But that still has issues, so we'll limit parallelism to some - // number (4), that we will hopefully reduce to 1 when write behind works. - // - // When streaming is going on, we'll separate half of that for the streaming code, which - // effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O - // Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have - // to do for the moment. Hopefully we can set both to 1 soon (with write behind) - // - // FIXME: enable write behind and set both to 1. - semaphore _memtables_serializer = { 4 }; - semaphore _streaming_serializer = { 2 }; lw_shared_ptr _memtables; // In older incarnations, we simply commited the mutations to memtables. @@ -335,6 +368,9 @@ private: // server. lw_shared_ptr _streaming_memtables; + friend class memtable_dirty_memory_manager; + friend class streaming_dirty_memory_manager; + lw_shared_ptr make_memory_only_memtable_list(); lw_shared_ptr make_memtable_list(); lw_shared_ptr make_streaming_memtable_list(); @@ -793,8 +829,8 @@ public: bool enable_incremental_backups = false; size_t max_memtable_size = 5'000'000; size_t max_streaming_memtable_size = 5'000'000; - logalloc::region_group* dirty_memory_region_group = nullptr; - logalloc::region_group* streaming_dirty_memory_region_group = nullptr; + ::dirty_memory_manager* dirty_memory_manager = nullptr; + ::dirty_memory_manager* streaming_dirty_memory_manager = nullptr; restricted_mutation_reader_config read_concurrency_config; ::cf_stats* cf_stats = nullptr; }; @@ -888,8 +924,9 @@ class database { std::unique_ptr _cfg; size_t _memtable_total_space = 500 << 20; size_t _streaming_memtable_total_space = 500 << 20; - logalloc::region_group _dirty_memory_region_group; - logalloc::region_group _streaming_dirty_memory_region_group; + memtable_dirty_memory_manager _system_dirty_memory_manager; + memtable_dirty_memory_manager _dirty_memory_manager; + streaming_dirty_memory_manager _streaming_dirty_memory_manager; semaphore _read_concurrency_sem{max_concurrent_reads()}; restricted_mutation_reader_config _read_concurrency_config; semaphore _system_read_concurrency_sem{max_system_concurrent_reads()}; @@ -906,7 +943,7 @@ class database { bool _enable_incremental_backups = false; future<> init_commitlog(); - future<> apply_in_memory(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position&); + future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position); future<> populate(sstring datadir); future<> populate_keyspace(sstring datadir, sstring ks_name); @@ -918,9 +955,6 @@ private: friend void db::system_keyspace::make(database& db, bool durable, bool volatile_testing_only); void setup_collectd(); - throttle_state _memtables_throttler; - throttle_state _streaming_throttler; - future<> do_apply(schema_ptr, const frozen_mutation&); public: static utils::UUID empty_version; @@ -1033,7 +1067,7 @@ public: future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func); const logalloc::region_group& dirty_memory_region_group() const { - return _dirty_memory_region_group; + return _dirty_memory_manager.region_group(); } std::unordered_set get_initial_tokens(); diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 2333dfad96..82d8c19d5a 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1035,6 +1035,8 @@ void make(database& db, bool durable, bool volatile_testing_only) { kscfg.read_concurrency_config.sem = &db.system_keyspace_read_concurrency_sem(); kscfg.read_concurrency_config.timeout = {}; kscfg.read_concurrency_config.max_queue_length = std::numeric_limits::max(); + // don't make system keyspace writes wait for user writes (if under pressure) + kscfg.dirty_memory_manager = &db._system_dirty_memory_manager; keyspace _ks{ksm, std::move(kscfg)}; auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options())); _ks.set_replication_strategy(std::move(rs)); diff --git a/memtable.cc b/memtable.cc index c00cbf0053..c22e4e41b3 100644 --- a/memtable.cc +++ b/memtable.cc @@ -26,20 +26,20 @@ namespace stdx = std::experimental; memtable::memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group) - : _schema(std::move(schema)) - , _region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region()) + : logalloc::region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region()) + , _schema(std::move(schema)) , partitions(memtable_entry::compare(_schema)) { } memtable::~memtable() { - with_allocator(_region.allocator(), [this] { + with_allocator(allocator(), [this] { partitions.clear_and_dispose(current_deleter()); }); } partition_entry& memtable::find_or_create_partition_slow(partition_key_view key) { - assert(!_region.reclaiming_enabled()); + assert(!reclaiming_enabled()); // FIXME: Perform lookup using std::pair // to avoid unconditional copy of the partition key. @@ -59,7 +59,7 @@ memtable::find_or_create_partition_slow(partition_key_view key) { partition_entry& memtable::find_or_create_partition(const dht::decorated_key& key) { - assert(!_region.reclaiming_enabled()); + assert(!reclaiming_enabled()); // call lower_bound so we have a hint for the insert, just in case. auto i = partitions.lower_bound(key, memtable_entry::compare(_schema)); @@ -127,7 +127,7 @@ private: } void update_iterators() { // We must be prepared that iterators may get invalidated during compaction. - auto current_reclaim_counter = _memtable->_region.reclaim_counter(); + auto current_reclaim_counter = _memtable->reclaim_counter(); auto cmp = memtable_entry::compare(_memtable->_schema); if (_last) { if (current_reclaim_counter != _last_reclaim_counter || @@ -177,7 +177,7 @@ public: return _delegate(); } - logalloc::reclaim_lock _(_memtable->_region); + logalloc::reclaim_lock _(*_memtable); managed_bytes::linearization_context_guard lcg; update_iterators(); if (_i == _end) { @@ -202,7 +202,7 @@ memtable::make_reader(schema_ptr s, if (query::is_single_partition(range)) { const query::ring_position& pos = range.start()->value(); - return _read_section(_region, [&] { + return _read_section(*this, [&] { managed_bytes::linearization_context_guard lcg; auto i = partitions.find(pos, memtable_entry::compare(_schema)); if (i != partitions.end()) { @@ -236,8 +236,8 @@ memtable::apply(memtable& mt) { void memtable::apply(const mutation& m, const db::replay_position& rp) { - with_allocator(_region.allocator(), [this, &m] { - _allocating_section(_region, [&, this] { + with_allocator(allocator(), [this, &m] { + _allocating_section(*this, [&, this] { with_linearized_managed_bytes([&] { auto& p = find_or_create_partition(m.decorated_key()); p.apply(*_schema, m.partition(), *m.schema()); @@ -249,8 +249,8 @@ memtable::apply(const mutation& m, const db::replay_position& rp) { void memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& rp) { - with_allocator(_region.allocator(), [this, &m, &m_schema] { - _allocating_section(_region, [&, this] { + with_allocator(allocator(), [this, &m, &m_schema] { + _allocating_section(*this, [&, this] { with_linearized_managed_bytes([&] { auto& p = find_or_create_partition_slow(m.key(*_schema)); p.apply(*_schema, m.partition(), *m_schema); @@ -261,7 +261,7 @@ memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, const db:: } logalloc::occupancy_stats memtable::occupancy() const { - return _region.occupancy(); + return logalloc::region::occupancy(); } mutation_source memtable::as_data_source() { @@ -308,13 +308,13 @@ memtable_entry::read(lw_shared_ptr mtbl, const schema_ptr& target_sche } auto& cr = ck_filtering.get_ranges(_key.key()); auto snp = _pe.read(_schema); - return make_partition_snapshot_reader(_schema, _key, ck_filtering, cr, snp, mtbl->_region, mtbl->_read_section, mtbl); + return make_partition_snapshot_reader(_schema, _key, ck_filtering, cr, snp, *mtbl, mtbl->_read_section, mtbl); } void memtable::upgrade_entry(memtable_entry& e) { if (e._schema != _schema) { - assert(!_region.reclaiming_enabled()); - with_allocator(_region.allocator(), [this, &e] { + assert(!reclaiming_enabled()); + with_allocator(allocator(), [this, &e] { with_linearized_managed_bytes([&] { e.partition().upgrade(e._schema, _schema); e._schema = _schema; diff --git a/memtable.hh b/memtable.hh index 93d486e283..8f4cb6ae39 100644 --- a/memtable.hh +++ b/memtable.hh @@ -91,7 +91,7 @@ public: }; // Managed by lw_shared_ptr<>. -class memtable final : public enable_lw_shared_from_this { +class memtable final : public enable_lw_shared_from_this, private logalloc::region { public: using partitions_type = bi::set, &memtable_entry::_link>, @@ -99,7 +99,6 @@ public: private: schema_ptr _schema; logalloc::allocating_section _read_section; - mutable logalloc::region _region; logalloc::allocating_section _allocating_section; partitions_type partitions; db::replay_position _replay_position; @@ -123,8 +122,13 @@ public: void apply(const mutation& m, const db::replay_position& = db::replay_position()); // The mutation is upgraded to current schema. void apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& = db::replay_position()); + + static memtable& from_region(logalloc::region& r) { + return static_cast(r); + } + const logalloc::region& region() const { - return _region; + return *this; } public: size_t partition_count() const; diff --git a/row_cache.cc b/row_cache.cc index b5dce8c6a3..b90a7ebeee 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -698,7 +698,7 @@ future<> row_cache::clear() { } future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) { - _tracker.region().merge(m._region); // Now all data in memtable belongs to cache + _tracker.region().merge(m); // Now all data in memtable belongs to cache auto attr = seastar::thread_attributes(); attr.scheduling_group = &_update_thread_scheduling_group; auto t = seastar::thread(attr, [this, &m, presence_checker = std::move(presence_checker)] {