From 0c31f3e626ceefd9004d5200ff758837e8e14da2 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 20 Apr 2016 19:09:21 -0400 Subject: [PATCH 1/5] database: move memtable throttler to the LSA throttler The LSA infrastructure, through the use of its region groups, now have a throttler mechanism built-in. This patch converts the current throttlers so that the LSA throttler is used instead. Signed-off-by: Glauber Costa --- database.cc | 83 ++++++++++++----------------------------------------- database.hh | 36 +++-------------------- 2 files changed, 23 insertions(+), 96 deletions(-) diff --git a/database.cc b/database.cc index fbc176ac31..0ffc30cf76 100644 --- a/database.cc +++ b/database.cc @@ -1329,14 +1329,12 @@ database::database(const db::config& cfg) return memtable_total_space; }()) , _streaming_memtable_total_space(_memtable_total_space / 4) - , _streaming_dirty_memory_region_group(&_dirty_memory_region_group) + , _dirty_memory_region_group_reclaimer(_memtable_total_space) + , _streaming_dirty_memory_region_group_reclaimer(_streaming_memtable_total_space) + , _dirty_memory_region_group(_dirty_memory_region_group_reclaimer) + , _streaming_dirty_memory_region_group(&_dirty_memory_region_group, _streaming_dirty_memory_region_group_reclaimer) , _version(empty_version) , _enable_incremental_backups(cfg.incremental_backups()) - , _memtables_throttler(_memtable_total_space, _dirty_memory_region_group) - , _streaming_throttler(_streaming_memtable_total_space, - _streaming_dirty_memory_region_group, - &_memtables_throttler - ) { _compaction_manager.start(); setup_collectd(); @@ -2109,14 +2107,15 @@ column_family::check_valid_rp(const db::replay_position& rp) const { } } -future<> database::apply_in_memory(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& rp) { - try { - auto& cf = find_column_family(m.column_family_id()); - cf.apply(m, m_schema, rp); - } catch (no_such_column_family&) { - dblog.error("Attempting to mutate non-existent table {}", m.column_family_id()); - } - return make_ready_future<>(); +future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) { + return _dirty_memory_region_group.run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] { + try { + auto& cf = find_column_family(m.column_family_id()); + cf.apply(m, m_schema, rp); + } catch (no_such_column_family&) { + dblog.error("Attempting to mutate non-existent table {}", m.column_family_id()); + } + }); } future<> database::do_apply(schema_ptr s, const frozen_mutation& m) { @@ -2148,43 +2147,11 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m) { return apply_in_memory(m, s, db::replay_position()); } -future<> throttle_state::throttle() { - if (!should_throttle() && _throttled_requests.empty()) { - // All is well, go ahead - return make_ready_future<>(); - } - // We must throttle, wait a bit - if (_throttled_requests.empty()) { - _throttling_timer.arm_periodic(10ms); - } - _throttled_requests.emplace_back(); - return _throttled_requests.back().get_future(); -} - -void throttle_state::unthrottle() { - // Release one request per free 1MB we have - // FIXME: improve this - if (should_throttle()) { - return; - } - size_t avail = std::max((_max_space - _region_group.memory_used()) >> 20, size_t(1)); - avail = std::min(_throttled_requests.size(), avail); - for (size_t i = 0; i < avail; ++i) { - _throttled_requests.front().set_value(); - _throttled_requests.pop_front(); - } - if (_throttled_requests.empty()) { - _throttling_timer.cancel(); - } -} - future<> database::apply(schema_ptr s, const frozen_mutation& m) { if (dblog.is_enabled(logging::log_level::trace)) { dblog.trace("apply {}", m.pretty_printer(s)); } - return _memtables_throttler.throttle().then([this, &m, s = std::move(s)] { - return do_apply(std::move(s), m); - }).then([this, s = _stats] { + return do_apply(std::move(s), m).then([this, s = _stats] { ++s->total_writes; }); } @@ -2194,23 +2161,7 @@ future<> database::apply_streaming_mutation(schema_ptr s, const frozen_mutation& throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } - - // TODO (maybe): This will use the same memory region group as memtables, so when - // one of them throttles, both will. - // - // It would be possible to provide further QoS for CQL originated memtables - // by keeping the streaming memtables into a different region group, with its own - // separate limit. - // - // Because, however, there are many other limits in play that may kick in, - // I am not convinced that this will ever be a problem. - // - // If we do find ourselves in the situation that we are throttling incoming - // writes due to high level of streaming writes, and we are sure that this - // is the best solution, we can just change the memtable creation method so - // that each kind of memtable creates from a different region group - and then - // update the throttle conditions accordingly. - return _streaming_throttler.throttle().then([this, &m, s = std::move(s)] { + return _streaming_dirty_memory_region_group.run_when_memory_available([this, &m, s = std::move(s)] { auto uuid = m.column_family_id(); auto& cf = find_column_family(uuid); cf.apply_streaming_mutation(s, std::move(m)); @@ -2316,6 +2267,10 @@ database::stop() { return parallel_for_each(_column_families, [this] (auto& val_pair) { return val_pair.second->stop(); }); + }).then([this] { + return _dirty_memory_region_group.shutdown(); + }).then([this] { + return _streaming_dirty_memory_region_group.shutdown(); }); } diff --git a/database.hh b/database.hh index 5c113fe08a..b437a8c197 100644 --- a/database.hh +++ b/database.hh @@ -99,34 +99,6 @@ void make(database& db, bool durable, bool volatile_testing_only); } } -class throttle_state { - size_t _max_space; - logalloc::region_group& _region_group; - throttle_state* _parent; - - circular_buffer> _throttled_requests; - timer<> _throttling_timer{[this] { unthrottle(); }}; - void unthrottle(); - bool should_throttle() const { - if (_region_group.memory_used() > _max_space) { - return true; - } - if (_parent) { - return _parent->should_throttle(); - } - return false; - } -public: - throttle_state(size_t max_space, logalloc::region_group& region, throttle_state* parent = nullptr) - : _max_space(max_space) - , _region_group(region) - , _parent(parent) - {} - - future<> throttle(); -}; - - class replay_position_reordered_exception : public std::exception {}; // We could just add all memtables, regardless of types, to a single list, and @@ -888,6 +860,9 @@ class database { std::unique_ptr _cfg; size_t _memtable_total_space = 500 << 20; size_t _streaming_memtable_total_space = 500 << 20; + logalloc::region_group_reclaimer _dirty_memory_region_group_reclaimer; + logalloc::region_group_reclaimer _streaming_dirty_memory_region_group_reclaimer; + logalloc::region_group _dirty_memory_region_group; logalloc::region_group _streaming_dirty_memory_region_group; semaphore _read_concurrency_sem{max_concurrent_reads()}; @@ -906,7 +881,7 @@ class database { bool _enable_incremental_backups = false; future<> init_commitlog(); - future<> apply_in_memory(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position&); + future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position); future<> populate(sstring datadir); future<> populate_keyspace(sstring datadir, sstring ks_name); @@ -918,9 +893,6 @@ private: friend void db::system_keyspace::make(database& db, bool durable, bool volatile_testing_only); void setup_collectd(); - throttle_state _memtables_throttler; - throttle_state _streaming_throttler; - future<> do_apply(schema_ptr, const frozen_mutation&); public: static utils::UUID empty_version; From d41fcd45d12ce4db5da81a7707767b86b9825325 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Mon, 20 Jun 2016 20:22:23 -0400 Subject: [PATCH 2/5] memtables: make memtable inherit from region The LSA memory pressure mechanism will let us know which region is the best candidate for eviction when under pressure. We need to somehow then translate region -> memtable -> column family. The easiest way to convert from region to memtable, is having memtable inherit from region. Despite the fact that this requires multiple inheritance, which always raise a flag a bit, the other class we inherit from is enable_shared_from_this, which has a very simple and well defined interface. So I think it is worthy for us to do it. Once we have the memtable, grabing the column family is easy provided we have a database object. We can grab it from the schema. Signed-off-by: Glauber Costa --- memtable.cc | 32 ++++++++++++++++---------------- memtable.hh | 10 +++++++--- row_cache.cc | 2 +- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/memtable.cc b/memtable.cc index c00cbf0053..c22e4e41b3 100644 --- a/memtable.cc +++ b/memtable.cc @@ -26,20 +26,20 @@ namespace stdx = std::experimental; memtable::memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group) - : _schema(std::move(schema)) - , _region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region()) + : logalloc::region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region()) + , _schema(std::move(schema)) , partitions(memtable_entry::compare(_schema)) { } memtable::~memtable() { - with_allocator(_region.allocator(), [this] { + with_allocator(allocator(), [this] { partitions.clear_and_dispose(current_deleter()); }); } partition_entry& memtable::find_or_create_partition_slow(partition_key_view key) { - assert(!_region.reclaiming_enabled()); + assert(!reclaiming_enabled()); // FIXME: Perform lookup using std::pair // to avoid unconditional copy of the partition key. @@ -59,7 +59,7 @@ memtable::find_or_create_partition_slow(partition_key_view key) { partition_entry& memtable::find_or_create_partition(const dht::decorated_key& key) { - assert(!_region.reclaiming_enabled()); + assert(!reclaiming_enabled()); // call lower_bound so we have a hint for the insert, just in case. auto i = partitions.lower_bound(key, memtable_entry::compare(_schema)); @@ -127,7 +127,7 @@ private: } void update_iterators() { // We must be prepared that iterators may get invalidated during compaction. - auto current_reclaim_counter = _memtable->_region.reclaim_counter(); + auto current_reclaim_counter = _memtable->reclaim_counter(); auto cmp = memtable_entry::compare(_memtable->_schema); if (_last) { if (current_reclaim_counter != _last_reclaim_counter || @@ -177,7 +177,7 @@ public: return _delegate(); } - logalloc::reclaim_lock _(_memtable->_region); + logalloc::reclaim_lock _(*_memtable); managed_bytes::linearization_context_guard lcg; update_iterators(); if (_i == _end) { @@ -202,7 +202,7 @@ memtable::make_reader(schema_ptr s, if (query::is_single_partition(range)) { const query::ring_position& pos = range.start()->value(); - return _read_section(_region, [&] { + return _read_section(*this, [&] { managed_bytes::linearization_context_guard lcg; auto i = partitions.find(pos, memtable_entry::compare(_schema)); if (i != partitions.end()) { @@ -236,8 +236,8 @@ memtable::apply(memtable& mt) { void memtable::apply(const mutation& m, const db::replay_position& rp) { - with_allocator(_region.allocator(), [this, &m] { - _allocating_section(_region, [&, this] { + with_allocator(allocator(), [this, &m] { + _allocating_section(*this, [&, this] { with_linearized_managed_bytes([&] { auto& p = find_or_create_partition(m.decorated_key()); p.apply(*_schema, m.partition(), *m.schema()); @@ -249,8 +249,8 @@ memtable::apply(const mutation& m, const db::replay_position& rp) { void memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& rp) { - with_allocator(_region.allocator(), [this, &m, &m_schema] { - _allocating_section(_region, [&, this] { + with_allocator(allocator(), [this, &m, &m_schema] { + _allocating_section(*this, [&, this] { with_linearized_managed_bytes([&] { auto& p = find_or_create_partition_slow(m.key(*_schema)); p.apply(*_schema, m.partition(), *m_schema); @@ -261,7 +261,7 @@ memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, const db:: } logalloc::occupancy_stats memtable::occupancy() const { - return _region.occupancy(); + return logalloc::region::occupancy(); } mutation_source memtable::as_data_source() { @@ -308,13 +308,13 @@ memtable_entry::read(lw_shared_ptr mtbl, const schema_ptr& target_sche } auto& cr = ck_filtering.get_ranges(_key.key()); auto snp = _pe.read(_schema); - return make_partition_snapshot_reader(_schema, _key, ck_filtering, cr, snp, mtbl->_region, mtbl->_read_section, mtbl); + return make_partition_snapshot_reader(_schema, _key, ck_filtering, cr, snp, *mtbl, mtbl->_read_section, mtbl); } void memtable::upgrade_entry(memtable_entry& e) { if (e._schema != _schema) { - assert(!_region.reclaiming_enabled()); - with_allocator(_region.allocator(), [this, &e] { + assert(!reclaiming_enabled()); + with_allocator(allocator(), [this, &e] { with_linearized_managed_bytes([&] { e.partition().upgrade(e._schema, _schema); e._schema = _schema; diff --git a/memtable.hh b/memtable.hh index 93d486e283..8f4cb6ae39 100644 --- a/memtable.hh +++ b/memtable.hh @@ -91,7 +91,7 @@ public: }; // Managed by lw_shared_ptr<>. -class memtable final : public enable_lw_shared_from_this { +class memtable final : public enable_lw_shared_from_this, private logalloc::region { public: using partitions_type = bi::set, &memtable_entry::_link>, @@ -99,7 +99,6 @@ public: private: schema_ptr _schema; logalloc::allocating_section _read_section; - mutable logalloc::region _region; logalloc::allocating_section _allocating_section; partitions_type partitions; db::replay_position _replay_position; @@ -123,8 +122,13 @@ public: void apply(const mutation& m, const db::replay_position& = db::replay_position()); // The mutation is upgraded to current schema. void apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& = db::replay_position()); + + static memtable& from_region(logalloc::region& r) { + return static_cast(r); + } + const logalloc::region& region() const { - return _region; + return *this; } public: size_t partition_count() const; diff --git a/row_cache.cc b/row_cache.cc index b5dce8c6a3..b90a7ebeee 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -698,7 +698,7 @@ future<> row_cache::clear() { } future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) { - _tracker.region().merge(m._region); // Now all data in memtable belongs to cache + _tracker.region().merge(m); // Now all data in memtable belongs to cache auto attr = seastar::thread_attributes(); attr.scheduling_group = &_update_thread_scheduling_group; auto t = seastar::thread(attr, [this, &m, presence_checker = std::move(presence_checker)] { From c358947284f55c0299353cac2024d64b643fac74 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 5 Jul 2016 15:29:04 -0400 Subject: [PATCH 3/5] database: wrap semaphore and region group into a new dirty memory manager We currently have a semaphore in the column family level that protects us against multiple concurrent sstable flushes. However, storing that semaphore into the CF, not the database, was a (implementation, not design) mistake. One comment in particular makes it quite clear: // Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind // would take care of the rest. But that still has issues, so we'll limit parallelism to some // number (4), that we will hopefully reduce to 1 when write behind works. So I aimed for the shard, but ended up coding it into the CF because that's closer to the flush point - my bad. This patch fixes this while paving the way for active reclaim to take place. It wraps the semaphore and the region group in a new structure, the dirty_memory_manager. The immediate benefit is that we don't need to be passing both the semaphore and the region group downwards in the DB -> CF path. The long term benefit is that we now have a one unified structure that can hold shared flush data in all of the CFs. Signed-off-by: Glauber Costa --- database.cc | 36 +++++++++------- database.hh | 119 ++++++++++++++++++++++++++++++++++------------------ 2 files changed, 99 insertions(+), 56 deletions(-) diff --git a/database.cc b/database.cc index 0ffc30cf76..fb5761ba24 100644 --- a/database.cc +++ b/database.cc @@ -92,21 +92,21 @@ lw_shared_ptr column_family::make_memory_only_memtable_list() { auto seal = [this] (memtable_list::flush_behavior ignored) { return make_ready_future<>(); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager); } lw_shared_ptr column_family::make_memtable_list() { auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_memtable(behavior); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager); } lw_shared_ptr column_family::make_streaming_memtable_list() { auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_streaming_memtable(behavior); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_region_group, _streaming_serializer); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager); } column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager) @@ -1329,10 +1329,8 @@ database::database(const db::config& cfg) return memtable_total_space; }()) , _streaming_memtable_total_space(_memtable_total_space / 4) - , _dirty_memory_region_group_reclaimer(_memtable_total_space) - , _streaming_dirty_memory_region_group_reclaimer(_streaming_memtable_total_space) - , _dirty_memory_region_group(_dirty_memory_region_group_reclaimer) - , _streaming_dirty_memory_region_group(&_dirty_memory_region_group, _streaming_dirty_memory_region_group_reclaimer) + , _dirty_memory_manager(_memtable_total_space) + , _streaming_dirty_memory_manager(&_dirty_memory_manager, _streaming_memtable_total_space) , _version(empty_version) , _enable_incremental_backups(cfg.incremental_backups()) { @@ -1349,7 +1347,7 @@ database::setup_collectd() { , scollectd::per_cpu_plugin_instance , "bytes", "dirty") , scollectd::make_typed(scollectd::data_type::GAUGE, [this] { - return _dirty_memory_region_group.memory_used(); + return dirty_memory_region_group().memory_used(); }))); _collectd.push_back( @@ -1778,8 +1776,8 @@ keyspace::make_column_family_config(const schema& s) const { cfg.enable_cache = _config.enable_cache; cfg.max_memtable_size = _config.max_memtable_size; cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size; - cfg.dirty_memory_region_group = _config.dirty_memory_region_group; - cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group; + cfg.dirty_memory_manager = _config.dirty_memory_manager; + cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager; cfg.read_concurrency_config = _config.read_concurrency_config; cfg.cf_stats = _config.cf_stats; cfg.enable_incremental_backups = _config.enable_incremental_backups; @@ -2107,8 +2105,14 @@ column_family::check_valid_rp(const db::replay_position& rp) const { } } +future<> dirty_memory_manager::shutdown() { + return _waiting_flush_gate.close().then([this] { + return _region_group.shutdown(); + }); +} + future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) { - return _dirty_memory_region_group.run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] { + return _dirty_memory_manager.region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] { try { auto& cf = find_column_family(m.column_family_id()); cf.apply(m, m_schema, rp); @@ -2161,7 +2165,7 @@ future<> database::apply_streaming_mutation(schema_ptr s, const frozen_mutation& throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } - return _streaming_dirty_memory_region_group.run_when_memory_available([this, &m, s = std::move(s)] { + return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, s = std::move(s)] { auto uuid = m.column_family_id(); auto& cf = find_column_family(uuid); cf.apply_streaming_mutation(s, std::move(m)); @@ -2193,8 +2197,8 @@ database::make_keyspace_config(const keyspace_metadata& ksm) { // All writes should go to the main memtable list if we're not durable cfg.max_streaming_memtable_size = 0; } - cfg.dirty_memory_region_group = &_dirty_memory_region_group; - cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group; + cfg.dirty_memory_manager = &_dirty_memory_manager; + cfg.streaming_dirty_memory_manager = &_streaming_dirty_memory_manager; cfg.read_concurrency_config.sem = &_read_concurrency_sem; cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms; // Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads. @@ -2268,9 +2272,9 @@ database::stop() { return val_pair.second->stop(); }); }).then([this] { - return _dirty_memory_region_group.shutdown(); + return _dirty_memory_manager.shutdown(); }).then([this] { - return _streaming_dirty_memory_region_group.shutdown(); + return _streaming_dirty_memory_manager.shutdown(); }); } diff --git a/database.hh b/database.hh index b437a8c197..cf554bd4d2 100644 --- a/database.hh +++ b/database.hh @@ -101,6 +101,73 @@ void make(database& db, bool durable, bool volatile_testing_only); class replay_position_reordered_exception : public std::exception {}; +using shared_memtable = lw_shared_ptr; + +class dirty_memory_manager: private logalloc::region_group_reclaimer { + logalloc::region_group _region_group; + + // We would like to serialize the flushing of memtables. While flushing many memtables + // simultaneously can sustain high levels of throughput, the memory is not freed until the + // memtable is totally gone. That means that if we have throttled requests, they will stay + // throttled for a long time. Even when we have virtual dirty, that only provides a rough + // estimate, and we can't release requests that early. + // + // Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind + // would take care of the rest. But that still has issues, so we'll limit parallelism to some + // number (4), that we will hopefully reduce to 1 when write behind works. + // + // When streaming is going on, we'll separate half of that for the streaming code, which + // effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O + // Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have + // to do for the moment. Hopefully we can set both to 1 soon (with write behind) + // + // FIXME: enable write behind and set both to 1. Right now we will take advantage of the fact + // that memtables and streaming will use different specialized classes here and set them as + // default values here. + size_t _concurrency; + semaphore _flush_serializer; + + seastar::gate _waiting_flush_gate; + std::vector _pending_flushes; +public: + future<> shutdown(); + dirty_memory_manager(size_t threshold, size_t concurrency) + : logalloc::region_group_reclaimer(threshold) + , _region_group(*this) + , _concurrency(concurrency) + , _flush_serializer(concurrency) {} + + dirty_memory_manager(dirty_memory_manager *parent, size_t threshold, size_t concurrency) + : logalloc::region_group_reclaimer(threshold) + , _region_group(&parent->_region_group, *this) + , _concurrency(concurrency) + , _flush_serializer(concurrency) {} + logalloc::region_group& region_group() { + return _region_group; + } + + const logalloc::region_group& region_group() const { + return _region_group; + } + + template + future<> serialize_flush(Func&& func) { + return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable { + return with_semaphore(_flush_serializer, 1, func); + }); + } +}; + +class streaming_dirty_memory_manager: public dirty_memory_manager { +public: + streaming_dirty_memory_manager(dirty_memory_manager *parent, size_t threshold) : dirty_memory_manager(parent, threshold, 2) {} +}; + +class memtable_dirty_memory_manager: public dirty_memory_manager { +public: + memtable_dirty_memory_manager(size_t threshold) : dirty_memory_manager(threshold, 4) {} +}; + // We could just add all memtables, regardless of types, to a single list, and // then filter them out when we read them. Here's why I have chosen not to do // it: @@ -123,21 +190,18 @@ class memtable_list { public: enum class flush_behavior { delayed, immediate }; private: - using shared_memtable = lw_shared_ptr; std::vector _memtables; std::function (flush_behavior)> _seal_fn; std::function _current_schema; size_t _max_memtable_size; - logalloc::region_group* _dirty_memory_region_group; - semaphore& _region_group_serializer; + dirty_memory_manager* _dirty_memory_manager; public: - memtable_list(std::function (flush_behavior)> seal_fn, std::function cs, size_t max_memtable_size, logalloc::region_group* region_group, semaphore& sem) + memtable_list(std::function (flush_behavior)> seal_fn, std::function cs, size_t max_memtable_size, dirty_memory_manager* dirty_memory_manager) : _memtables({}) , _seal_fn(seal_fn) , _current_schema(cs) , _max_memtable_size(max_memtable_size) - , _dirty_memory_region_group(region_group) - , _region_group_serializer(sem) { + , _dirty_memory_manager(dirty_memory_manager) { add_memtable(); } @@ -163,11 +227,7 @@ public: if (behavior == flush_behavior::delayed) { return _seal_fn(behavior); } - return _region_group_serializer.wait().then([this] { - return _seal_fn(flush_behavior::immediate); - }).finally([this] { - _region_group_serializer.signal(); - }); + return _dirty_memory_manager->serialize_flush([this] { return _seal_fn(flush_behavior::immediate); }); } auto begin() noexcept { @@ -207,7 +267,7 @@ public: } private: lw_shared_ptr new_memtable() { - return make_lw_shared(_current_schema(), _dirty_memory_region_group); + return make_lw_shared(_current_schema(), &(_dirty_memory_manager->region_group())); } }; @@ -234,8 +294,8 @@ public: bool enable_incremental_backups = false; size_t max_memtable_size = 5'000'000; size_t max_streaming_memtable_size = 5'000'000; - logalloc::region_group* dirty_memory_region_group = nullptr; - logalloc::region_group* streaming_dirty_memory_region_group = nullptr; + ::dirty_memory_manager* dirty_memory_manager = nullptr; + ::dirty_memory_manager* streaming_dirty_memory_manager = nullptr; restricted_mutation_reader_config read_concurrency_config; ::cf_stats* cf_stats = nullptr; }; @@ -268,24 +328,6 @@ private: config _config; stats _stats; - // We would like to serialize the flushing of memtables. While flushing many memtables - // simultaneously can sustain high levels of throughput, the memory is not freed until the - // memtable is totally gone. That means that if we have throttled requests, they will stay - // throttled for a long time. Even when we have virtual dirty, that only provides a rough - // estimate, and we can't release requests that early. - // - // Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind - // would take care of the rest. But that still has issues, so we'll limit parallelism to some - // number (4), that we will hopefully reduce to 1 when write behind works. - // - // When streaming is going on, we'll separate half of that for the streaming code, which - // effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O - // Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have - // to do for the moment. Hopefully we can set both to 1 soon (with write behind) - // - // FIXME: enable write behind and set both to 1. - semaphore _memtables_serializer = { 4 }; - semaphore _streaming_serializer = { 2 }; lw_shared_ptr _memtables; // In older incarnations, we simply commited the mutations to memtables. @@ -765,8 +807,8 @@ public: bool enable_incremental_backups = false; size_t max_memtable_size = 5'000'000; size_t max_streaming_memtable_size = 5'000'000; - logalloc::region_group* dirty_memory_region_group = nullptr; - logalloc::region_group* streaming_dirty_memory_region_group = nullptr; + ::dirty_memory_manager* dirty_memory_manager = nullptr; + ::dirty_memory_manager* streaming_dirty_memory_manager = nullptr; restricted_mutation_reader_config read_concurrency_config; ::cf_stats* cf_stats = nullptr; }; @@ -860,11 +902,8 @@ class database { std::unique_ptr _cfg; size_t _memtable_total_space = 500 << 20; size_t _streaming_memtable_total_space = 500 << 20; - logalloc::region_group_reclaimer _dirty_memory_region_group_reclaimer; - logalloc::region_group_reclaimer _streaming_dirty_memory_region_group_reclaimer; - - logalloc::region_group _dirty_memory_region_group; - logalloc::region_group _streaming_dirty_memory_region_group; + memtable_dirty_memory_manager _dirty_memory_manager; + streaming_dirty_memory_manager _streaming_dirty_memory_manager; semaphore _read_concurrency_sem{max_concurrent_reads()}; restricted_mutation_reader_config _read_concurrency_config; semaphore _system_read_concurrency_sem{max_system_concurrent_reads()}; @@ -1005,7 +1044,7 @@ public: future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func); const logalloc::region_group& dirty_memory_region_group() const { - return _dirty_memory_region_group; + return _dirty_memory_manager.region_group(); } std::unordered_set get_initial_tokens(); From 7169b727ea7576edf7e724d52f941b9ec4b336b3 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 5 Jul 2016 16:52:53 -0400 Subject: [PATCH 4/5] move system tables to its own region In the spirit of what we are doing for the read semaphore, this patch moves system writes to its own dirty memory manager. Not only will it make sure that system tables will not be serialized by its own semaphore, but it will also put system tables in its own region group. Moving system tables to its own region group has the advantage that system requests won't be waiting during throttle behind a potentially big queue of user requests, since requests are tended to in FIFO order within the same region group. However, system tables being more controlled and predictable, we can actually go a step further and give them some extra reservation so they may not necessarily block even if under pressure (up to 10 MB more). Signed-off-by: Glauber Costa --- database.cc | 12 +++++++++++- database.hh | 6 +++++- db/system_keyspace.cc | 2 ++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/database.cc b/database.cc index fb5761ba24..0213625298 100644 --- a/database.cc +++ b/database.cc @@ -1329,7 +1329,15 @@ database::database(const db::config& cfg) return memtable_total_space; }()) , _streaming_memtable_total_space(_memtable_total_space / 4) - , _dirty_memory_manager(_memtable_total_space) + // Allow system tables a pool of 10 MB extra memory to write over the threshold. Under normal + // circumnstances it won't matter, but when we throttle, some system requests will be able to + // keep being serviced even if user requests are not. + // + // Note that even if we didn't allow extra memory, we would still want to keep system requests + // in a different region group. This is because throttled requests are serviced in FIFO order, + // and we don't want system requests to be waiting for a long time behind user requests. + , _system_dirty_memory_manager(_memtable_total_space + (10 << 20)) + , _dirty_memory_manager(&_system_dirty_memory_manager, _memtable_total_space) , _streaming_dirty_memory_manager(&_dirty_memory_manager, _streaming_memtable_total_space) , _version(empty_version) , _enable_incremental_backups(cfg.incremental_backups()) @@ -2271,6 +2279,8 @@ database::stop() { return parallel_for_each(_column_families, [this] (auto& val_pair) { return val_pair.second->stop(); }); + }).then([this] { + return _system_dirty_memory_manager.shutdown(); }).then([this] { return _dirty_memory_manager.shutdown(); }).then([this] { diff --git a/database.hh b/database.hh index cf554bd4d2..592e0a975b 100644 --- a/database.hh +++ b/database.hh @@ -165,7 +165,10 @@ public: class memtable_dirty_memory_manager: public dirty_memory_manager { public: - memtable_dirty_memory_manager(size_t threshold) : dirty_memory_manager(threshold, 4) {} + memtable_dirty_memory_manager(dirty_memory_manager* parent, size_t threshold) : dirty_memory_manager(parent, threshold, 4) {} + // This constructor will be called for the system tables (no parent). Its flushes are usually drive by us + // and not the user, and tend to be small in size. So we'll allow only two slots. + memtable_dirty_memory_manager(size_t threshold) : dirty_memory_manager(threshold, 2) {} }; // We could just add all memtables, regardless of types, to a single list, and @@ -902,6 +905,7 @@ class database { std::unique_ptr _cfg; size_t _memtable_total_space = 500 << 20; size_t _streaming_memtable_total_space = 500 << 20; + memtable_dirty_memory_manager _system_dirty_memory_manager; memtable_dirty_memory_manager _dirty_memory_manager; streaming_dirty_memory_manager _streaming_dirty_memory_manager; semaphore _read_concurrency_sem{max_concurrent_reads()}; diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 2333dfad96..82d8c19d5a 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1035,6 +1035,8 @@ void make(database& db, bool durable, bool volatile_testing_only) { kscfg.read_concurrency_config.sem = &db.system_keyspace_read_concurrency_sem(); kscfg.read_concurrency_config.timeout = {}; kscfg.read_concurrency_config.max_queue_length = std::numeric_limits::max(); + // don't make system keyspace writes wait for user writes (if under pressure) + kscfg.dirty_memory_manager = &db._system_dirty_memory_manager; keyspace _ks{ksm, std::move(kscfg)}; auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options())); _ks.set_replication_strategy(std::move(rs)); From b0932ceb0476e8b8a9bbe1aa273c1d8bcf059ffc Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 5 Jul 2016 17:45:27 -0400 Subject: [PATCH 5/5] database: act on LSA pressure notification Issue 1195 describes a scenario with a fairly easy reproducer in which we can freeze the database. That involves writing simultaneously to multiple CFs, such that the sum of all the memory they are using is larger than the dirty memory limit, without not any of them individually being larger than the memtable size. Because we will never reach the individual memtable seal size for any of them, none of them will initiate a flush leading the database to a halt. The LSA has now gained infrastructure that allow us to be notified when pressure conditions mount. What we will do in this case is initiate a flush ourselves. Fixes #1195 Signed-off-by: Glauber Costa --- database.cc | 49 ++++++++++++++++++++++++++++++++++++++++++++++--- database.hh | 33 ++++++++++++++++++++++++++------- 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/database.cc b/database.cc index 0213625298..a5e3e56d9d 100644 --- a/database.cc +++ b/database.cc @@ -1336,9 +1336,9 @@ database::database(const db::config& cfg) // Note that even if we didn't allow extra memory, we would still want to keep system requests // in a different region group. This is because throttled requests are serviced in FIFO order, // and we don't want system requests to be waiting for a long time behind user requests. - , _system_dirty_memory_manager(_memtable_total_space + (10 << 20)) - , _dirty_memory_manager(&_system_dirty_memory_manager, _memtable_total_space) - , _streaming_dirty_memory_manager(&_dirty_memory_manager, _streaming_memtable_total_space) + , _system_dirty_memory_manager(*this, _memtable_total_space + (10 << 20)) + , _dirty_memory_manager(*this, &_system_dirty_memory_manager, _memtable_total_space) + , _streaming_dirty_memory_manager(*this, &_dirty_memory_manager, _streaming_memtable_total_space) , _version(empty_version) , _enable_incremental_backups(cfg.incremental_backups()) { @@ -2114,11 +2114,54 @@ column_family::check_valid_rp(const db::replay_position& rp) const { } future<> dirty_memory_manager::shutdown() { + _db_shutdown_requested = true; return _waiting_flush_gate.close().then([this] { return _region_group.shutdown(); }); } +void dirty_memory_manager::maybe_do_active_flush() { + if (!under_pressure() || _db_shutdown_requested) { + return; + } + + // Flush already ongoing. We don't need to initiate an active flush at this moment. + if (_flush_serializer.current() != _concurrency) { + return; + } + + // There are many criteria that can be used to select what is the best memtable to + // flush. Most of the time we want some coordination with the commitlog to allow us to + // release commitlog segments as early as we can. + // + // But during pressure condition, we'll just pick the CF that holds the largest + // memtable. The advantage of doing this is that this is objectively the one that will + // release the biggest amount of memory and is less likely to be generating tiny + // SSTables. The disadvantage is that right now, because we only release memory when the + // SSTable is fully written, that may take a bit of time to happen. + // + // However, since we'll very soon have a mechanism in place to account for the memory + // that was already written in one form or another, that disadvantage is mitigated. + memtable& biggest_memtable = memtable::from_region(*_region_group.get_largest_region()); + auto& biggest_cf = _db.find_column_family(biggest_memtable.schema()); + memtable_list& mtlist = get_memtable_list(biggest_cf); + // Please note that this will eventually take the semaphore and prevent two concurrent flushes. + // We don't need any other extra protection. + mtlist.seal_active_memtable(memtable_list::flush_behavior::immediate); +} + +memtable_list& memtable_dirty_memory_manager::get_memtable_list(column_family& cf) { + return *(cf._memtables); +} + +memtable_list& streaming_dirty_memory_manager::get_memtable_list(column_family& cf) { + return *(cf._streaming_memtables); +} + +void dirty_memory_manager::start_reclaiming() { + maybe_do_active_flush(); +} + future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) { return _dirty_memory_manager.region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] { try { diff --git a/database.hh b/database.hh index 592e0a975b..68e61159c6 100644 --- a/database.hh +++ b/database.hh @@ -102,8 +102,14 @@ void make(database& db, bool durable, bool volatile_testing_only); class replay_position_reordered_exception : public std::exception {}; using shared_memtable = lw_shared_ptr; +class memtable_list; -class dirty_memory_manager: private logalloc::region_group_reclaimer { +class dirty_memory_manager: public logalloc::region_group_reclaimer { + // We need a separate boolean, because from the LSA point of view, pressure may still be + // mounting, in which case the pressure flag could be set back on if we force it off. + bool _db_shutdown_requested = false; + + database& _db; logalloc::region_group _region_group; // We would like to serialize the flushing of memtables. While flushing many memtables @@ -129,16 +135,22 @@ class dirty_memory_manager: private logalloc::region_group_reclaimer { seastar::gate _waiting_flush_gate; std::vector _pending_flushes; + void maybe_do_active_flush(); +protected: + virtual memtable_list& get_memtable_list(column_family& cf) = 0; + virtual void start_reclaiming() override; public: future<> shutdown(); - dirty_memory_manager(size_t threshold, size_t concurrency) + dirty_memory_manager(database& db, size_t threshold, size_t concurrency) : logalloc::region_group_reclaimer(threshold) + , _db(db) , _region_group(*this) , _concurrency(concurrency) , _flush_serializer(concurrency) {} - dirty_memory_manager(dirty_memory_manager *parent, size_t threshold, size_t concurrency) + dirty_memory_manager(database& db, dirty_memory_manager *parent, size_t threshold, size_t concurrency) : logalloc::region_group_reclaimer(threshold) + , _db(db) , _region_group(&parent->_region_group, *this) , _concurrency(concurrency) , _flush_serializer(concurrency) {} @@ -153,22 +165,26 @@ public: template future<> serialize_flush(Func&& func) { return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable { - return with_semaphore(_flush_serializer, 1, func); + return with_semaphore(_flush_serializer, 1, func).finally([this] { + maybe_do_active_flush(); + }); }); } }; class streaming_dirty_memory_manager: public dirty_memory_manager { + virtual memtable_list& get_memtable_list(column_family& cf) override; public: - streaming_dirty_memory_manager(dirty_memory_manager *parent, size_t threshold) : dirty_memory_manager(parent, threshold, 2) {} + streaming_dirty_memory_manager(database& db, dirty_memory_manager *parent, size_t threshold) : dirty_memory_manager(db, parent, threshold, 2) {} }; class memtable_dirty_memory_manager: public dirty_memory_manager { + virtual memtable_list& get_memtable_list(column_family& cf) override; public: - memtable_dirty_memory_manager(dirty_memory_manager* parent, size_t threshold) : dirty_memory_manager(parent, threshold, 4) {} + memtable_dirty_memory_manager(database& db, dirty_memory_manager* parent, size_t threshold) : dirty_memory_manager(db, parent, threshold, 4) {} // This constructor will be called for the system tables (no parent). Its flushes are usually drive by us // and not the user, and tend to be small in size. So we'll allow only two slots. - memtable_dirty_memory_manager(size_t threshold) : dirty_memory_manager(threshold, 2) {} + memtable_dirty_memory_manager(database& db, size_t threshold) : dirty_memory_manager(db, threshold, 2) {} }; // We could just add all memtables, regardless of types, to a single list, and @@ -352,6 +368,9 @@ private: // server. lw_shared_ptr _streaming_memtables; + friend class memtable_dirty_memory_manager; + friend class streaming_dirty_memory_manager; + lw_shared_ptr make_memory_only_memtable_list(); lw_shared_ptr make_memtable_list(); lw_shared_ptr make_streaming_memtable_list();