diff --git a/database.cc b/database.cc index 7535720a7d..63ee2dac2d 100644 --- a/database.cc +++ b/database.cc @@ -91,21 +91,21 @@ lw_shared_ptr column_family::make_memory_only_memtable_list() { auto seal = [this] (memtable_list::flush_behavior ignored) { return make_ready_future<>(); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer); } lw_shared_ptr column_family::make_memtable_list() { auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_memtable(behavior); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer); } lw_shared_ptr column_family::make_streaming_memtable_list() { auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_streaming_memtable(behavior); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_region_group); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_region_group, _streaming_serializer); } column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager) diff --git a/database.hh b/database.hh index 8695d4e433..488a206801 100644 --- a/database.hh +++ b/database.hh @@ -156,13 +156,15 @@ private: std::function _current_schema; size_t _max_memtable_size; logalloc::region_group* _dirty_memory_region_group; + semaphore& _region_group_serializer; public: - memtable_list(std::function (flush_behavior)> seal_fn, std::function cs, size_t max_memtable_size, logalloc::region_group* region_group) + memtable_list(std::function (flush_behavior)> seal_fn, std::function cs, size_t max_memtable_size, logalloc::region_group* region_group, semaphore& sem) : _memtables({}) , _seal_fn(seal_fn) , _current_schema(cs) , _max_memtable_size(max_memtable_size) - , _dirty_memory_region_group(region_group) { + , _dirty_memory_region_group(region_group) + , _region_group_serializer(sem) { add_memtable(); } @@ -183,7 +185,16 @@ public: } future<> seal_active_memtable(flush_behavior behavior) { - return _seal_fn(behavior); + // We may need a lot of delayed flushes to get to the point where we force an immediate + // flush. So just let it go immediately. + if (behavior == flush_behavior::delayed) { + return _seal_fn(behavior); + } + return _region_group_serializer.wait().then([this] { + return _seal_fn(flush_behavior::immediate); + }).finally([this] { + _region_group_serializer.signal(); + }); } auto begin() noexcept { @@ -282,6 +293,25 @@ private: schema_ptr _schema; config _config; stats _stats; + + // We would like to serialize the flushing of memtables. While flushing many memtables + // simultaneously can sustain high levels of throughput, the memory is not freed until the + // memtable is totally gone. That means that if we have throttled requests, they will stay + // throttled for a long time. Even when we have virtual dirty, that only provides a rough + // estimate, and we can't release requests that early. + // + // Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind + // would take care of the rest. But that still has issues, so we'll limit parallelism to some + // number (4), that we will hopefully reduce to 1 when write behind works. + // + // When streaming is going on, we'll separate half of that for the streaming code, which + // effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O + // Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have + // to do for the moment. Hopefully we can set both to 1 soon (with write behind) + // + // FIXME: enable write behind and set both to 1. + semaphore _memtables_serializer = { 4 }; + semaphore _streaming_serializer = { 2 }; lw_shared_ptr _memtables; // In older incarnations, we simply commited the mutations to memtables.