diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 5b3de64987..a12a11805d 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -157,6 +157,9 @@ public: bool _shutdown = false; semaphore _new_segment_semaphore; + semaphore _write_semaphore; + semaphore _flush_semaphore; + scollectd::registrations _regs; // TODO: verify that we're ok with not-so-great granularity @@ -174,6 +177,9 @@ public: uint64_t segments_destroyed = 0; uint64_t pending_writes = 0; uint64_t pending_flushes = 0; + uint64_t pending_allocations = 0; + uint64_t write_limit_exceeded = 0; + uint64_t flush_limit_exceeded = 0; uint64_t total_size = 0; uint64_t buffer_list_bytes = 0; uint64_t total_size_on_disk = 0; @@ -181,54 +187,73 @@ public: stats totals; - void begin_write() { + future<> begin_write() { _gate.enter(); - ++totals.pending_writes; + ++totals.pending_writes; // redundant, given semaphore. but easier to read + if (totals.pending_writes >= cfg.max_active_writes) { + ++totals.write_limit_exceeded; + logger.trace("Write ops overflow: {}. Will block.", totals.pending_writes); + } + return _write_semaphore.wait(); } void end_write() { + _write_semaphore.signal(); --totals.pending_writes; _gate.leave(); } - void begin_flush() { + future<> begin_flush() { _gate.enter(); ++totals.pending_flushes; + if (totals.pending_flushes >= cfg.max_active_flushes) { + ++totals.flush_limit_exceeded; + logger.trace("Flush ops overflow: {}. Will block.", totals.pending_flushes); + } + return _flush_semaphore.wait(); } void end_flush() { + _flush_semaphore.signal(); --totals.pending_flushes; _gate.leave(); } + bool should_wait_for_write() const { + return _write_semaphore.waiters() > 0 || _flush_semaphore.waiters() > 0; + } + segment_manager(config c) - : cfg(c), max_size( - std::min(std::numeric_limits::max(), - std::max(cfg.commitlog_segment_size_in_mb, - 1) * 1024 * 1024)), max_mutation_size( - max_size >> 1), max_disk_size( - size_t( - std::ceil( - cfg.commitlog_total_space_in_mb - / double(smp::count))) * 1024 * 1024) + : cfg([&c] { + config cfg(c); + + if (cfg.commit_log_location.empty()) { + cfg.commit_log_location = "/var/lib/scylla/commitlog"; + } + + if (cfg.max_active_writes == 0) { + cfg.max_active_writes = // TODO: call someone to get an idea... + 25 * smp::count; + } + cfg.max_active_writes = std::max(uint64_t(1), cfg.max_active_writes / smp::count); + if (cfg.max_active_flushes == 0) { + cfg.max_active_flushes = // TODO: call someone to get an idea... + 5 * smp::count; + } + cfg.max_active_flushes = std::max(uint64_t(1), cfg.max_active_flushes / smp::count); + + return cfg; + }()) + , max_size(std::min(std::numeric_limits::max(), std::max(cfg.commitlog_segment_size_in_mb, 1) * 1024 * 1024)) + , max_mutation_size(max_size >> 1) + , max_disk_size(size_t(std::ceil(cfg.commitlog_total_space_in_mb / double(smp::count))) * 1024 * 1024) + , _write_semaphore(cfg.max_active_writes) + , _flush_semaphore(cfg.max_active_flushes) { assert(max_size > 0); - if (cfg.commit_log_location.empty()) { - cfg.commit_log_location = "/var/lib/scylla/commitlog"; - } - - if (cfg.max_active_writes == 0) { - cfg.max_active_writes = // TODO: call someone to get an idea... - 25 * smp::count; - } - cfg.max_active_writes = std::max(uint64_t(1), cfg.max_active_writes / smp::count); - if (cfg.max_active_flushes == 0) { - cfg.max_active_flushes = // TODO: call someone to get an idea... - 5 * smp::count; - } - cfg.max_active_flushes = std::max(uint64_t(1), cfg.max_active_flushes / smp::count); logger.trace("Commitlog {} maximum disk size: {} MB / cpu ({} cpus)", cfg.commit_log_location, max_disk_size / (1024 * 1024), smp::count); + _regs = create_counters(); } ~segment_manager() { @@ -367,11 +392,39 @@ class db::commitlog::segment: public enable_lw_shared_from_this { std::unordered_map _cf_dirty; time_point _sync_time; seastar::gate _gate; + uint64_t _write_waiters = 0; + semaphore _queue; std::unordered_set _known_schema_versions; friend std::ostream& operator<<(std::ostream&, const segment&); friend class segment_manager; + + future<> begin_flush() { + // This is maintaining the semantica of only using the write-lock + // as a gate for flushing, i.e. once we've begun a flush for position X + // we are ok with writes to positions > X + return _dwrite.write_lock().then(std::bind(&segment_manager::begin_flush, _segment_manager)).finally([this] { + _dwrite.write_unlock(); + }); + } + + void end_flush() { + _segment_manager->end_flush(); + } + + future<> begin_write() { + // This is maintaining the semantica of only using the write-lock + // as a gate for flushing, i.e. once we've begun a flush for position X + // we are ok with writes to positions > X + return _dwrite.read_lock().then(std::bind(&segment_manager::begin_write, _segment_manager)); + } + + void end_write() { + _segment_manager->end_write(); + _dwrite.read_unlock(); + } + public: struct cf_mark { const segment& s; @@ -393,7 +446,7 @@ public: segment(segment_manager* m, const descriptor& d, file && f, bool active) : _segment_manager(m), _desc(std::move(d)), _file(std::move(f)), _sync_time( - clock_type::now()) + clock_type::now()), _queue(0) { ++_segment_manager->totals.segments_created; logger.debug("Created new {} segment {}", active ? "active" : "reserve", *this); @@ -478,15 +531,13 @@ public: // This is not 100% neccesary, we really only need the ones below our flush pos, // but since we pretty much assume that task ordering will make this the case anyway... - return _dwrite.write_lock().then( + return begin_flush().then( [this, me, pos]() mutable { - _dwrite.write_unlock(); // release it already. pos = std::max(pos, _file_pos); if (pos <= _flush_pos) { logger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos); return make_ready_future(std::move(me)); } - _segment_manager->begin_flush(); return _file.flush().then_wrapped([this, pos, me](future<> f) { try { f.get(); @@ -500,16 +551,50 @@ public: logger.error("Failed to flush commits to disk: {}", std::current_exception()); throw; } - }).finally([this, me] { - _segment_manager->end_flush(); }); - }); + }).finally([this] { + end_flush(); + }); } + /** + * Allocate a new buffer + */ + void new_buffer(size_t s) { + assert(_buffer.empty()); + + auto overhead = segment_overhead_size; + if (_file_pos == 0) { + overhead += descriptor_header_size; + } + + auto a = align_up(s + overhead, alignment); + auto k = std::max(a, default_size); + + for (;;) { + try { + _buffer = _segment_manager->acquire_buffer(k); + break; + } catch (std::bad_alloc&) { + logger.warn("Could not allocate {} k bytes output buffer ({} k required)", k / 1024, a / 1024); + if (k > a) { + k = std::max(a, k / 2); + logger.debug("Trying reduced size: {} k", k / 1024); + continue; + } + throw; + } + } + _buf_pos = overhead; + auto * p = reinterpret_cast(_buffer.get_write()); + std::fill(p, p + overhead, 0); + _segment_manager->totals.total_size += k; + } + /** * Send any buffer contents to disk and get a new tmp buffer */ // See class comment for info - future cycle(size_t s = 0) { + future cycle() { auto size = clear_buffer_slack(); auto buf = std::move(_buffer); auto off = _file_pos; @@ -517,36 +602,6 @@ public: _file_pos += size; _buf_pos = 0; - // if we need new buffer, get one. - // TODO: keep a queue of available buffers? - if (s > 0) { - auto overhead = segment_overhead_size; - if (_file_pos == 0) { - overhead += descriptor_header_size; - } - - auto a = align_up(s + overhead, alignment); - auto k = std::max(a, default_size); - - for (;;) { - try { - _buffer = _segment_manager->acquire_buffer(k); - break; - } catch (std::bad_alloc&) { - logger.warn("Could not allocate {} k bytes output buffer ({} k required)", k / 1024, a / 1024); - if (k > a) { - k = std::max(a, k / 2); - logger.debug("Trying reduced size: {} k", k / 1024); - continue; - } - throw; - } - } - _buf_pos = overhead; - auto * p = reinterpret_cast(_buffer.get_write()); - std::fill(p, p + overhead, 0); - _segment_manager->totals.total_size += k; - } auto me = shared_from_this(); assert(!me.owned()); @@ -586,10 +641,9 @@ public: forget_schema_versions(); // acquire read lock - return _dwrite.read_lock().then([this, size, off, buf = std::move(buf), me]() mutable { + return begin_write().then([this, size, off, buf = std::move(buf), me]() mutable { auto written = make_lw_shared(0); auto p = buf.get(); - _segment_manager->begin_write(); return repeat([this, size, off, written, p]() mutable { auto& priority_class = service::get_local_commitlog_priority(); return _file.dma_write(off + *written, p + *written, size - *written, priority_class).then_wrapped([this, size, written](future&& f) { @@ -616,12 +670,51 @@ public: }); }).finally([this, buf = std::move(buf)]() mutable { _segment_manager->release_buffer(std::move(buf)); - _segment_manager->end_write(); }); }).then([me] { return make_ready_future(std::move(me)); }).finally([me, this]() { - _dwrite.read_unlock(); // release + end_write(); // release + }); + } + + future maybe_wait_for_write(future f) { + if (_segment_manager->should_wait_for_write()) { + ++_write_waiters; + logger.trace("Too many pending writes. Must wait."); + return f.finally([this] { + if (--_write_waiters == 0) { + _queue.signal(_queue.waiters()); + } + }); + } + return make_ready_future(shared_from_this()); + } + + /** + * If an allocation causes a write, and the write causes a block, + * any allocations post that need to wait for this to finish, + * other wise we will just continue building up more write queue + * eventually (+ loose more ordering) + * + * Some caution here, since maybe_wait_for_write actually + * releases _all_ queued up ops when finishing, we could get + * "bursts" of alloc->write, causing build-ups anyway. + * This should be measured properly. For now I am hoping this + * will work out as these should "block as a group". However, + * buffer memory usage might grow... + */ + bool must_wait_for_alloc() { + return _write_waiters > 0; + } + + future wait_for_alloc() { + auto me = shared_from_this(); + ++_segment_manager->totals.pending_allocations; + logger.trace("Previous allocation is blocking. Must wait."); + return _queue.wait().then([me] { // TODO: do we need a finally? + --me->_segment_manager->totals.pending_allocations; + return make_ready_future(me); }); } @@ -638,23 +731,26 @@ public: + " bytes is too large for the maxiumum size of " + std::to_string(_segment_manager->max_mutation_size))); } - // would we make the file too big? - for (;;) { - if (position() + s > _segment_manager->max_size) { - // do this in next segment instead. - return finish_and_get_new().then( - [id, writer = std::move(writer)] (sseg_ptr new_seg) mutable { - return new_seg->allocate(id, std::move(writer)); - }); - } - // enough data? - if (s > (_buffer.size() - _buf_pos)) { - // TODO: iff we have to many writes running, maybe we should - // wait for this? - cycle(s); - continue; // re-check file size overflow - } - break; + + std::experimental::optional> op; + + if (must_sync()) { + op = sync(); + } else if (must_wait_for_alloc()) { + op = wait_for_alloc(); + } else if (position() + s > _segment_manager->max_size) { // would we make the file too big? + // do this in next segment instead. + op = finish_and_get_new(); + } else if (_buffer.empty()) { + new_buffer(s); + } else if (s > (_buffer.size() - _buf_pos)) { // enough data? + op = maybe_wait_for_write(cycle()); + } + + if (op) { + return op->then([id, writer = std::move(writer)] (sseg_ptr new_seg) mutable { + return new_seg->allocate(id, std::move(writer)); + }); } _gate.enter(); // this might throw. I guess we accept this? @@ -686,13 +782,6 @@ public: _gate.leave(); - // finally, check if we're required to sync. - if (must_sync()) { - return sync().then([rp](sseg_ptr seg) { - return make_ready_future(rp); - }); - } - return make_ready_future(rp); } @@ -880,6 +969,16 @@ scollectd::registrations db::commitlog::segment_manager::create_counters() { , per_cpu_plugin_instance, "queue_length", "pending_flushes") , make_typed(data_type::GAUGE, totals.pending_flushes) ), + + add_polled_metric(type_instance_id("commitlog" + , per_cpu_plugin_instance, "total_operations", "write_limit_exceeded") + , make_typed(data_type::DERIVE, totals.write_limit_exceeded) + ), + add_polled_metric(type_instance_id("commitlog" + , per_cpu_plugin_instance, "total_operations", "flush_limit_exceeded") + , make_typed(data_type::DERIVE, totals.flush_limit_exceeded) + ), + add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "memory", "total_size") , make_typed(data_type::GAUGE, totals.total_size) @@ -1568,6 +1667,18 @@ uint64_t db::commitlog::get_pending_flushes() const { return _segment_manager->totals.pending_flushes; } +uint64_t db::commitlog::get_pending_allocations() const { + return _segment_manager->totals.pending_allocations; +} + +uint64_t db::commitlog::get_write_limit_exceeded_count() const { + return _segment_manager->totals.write_limit_exceeded; +} + +uint64_t db::commitlog::get_flush_limit_exceeded_count() const { + return _segment_manager->totals.flush_limit_exceeded; +} + uint64_t db::commitlog::get_num_segments_created() const { return _segment_manager->totals.segments_created; } diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 6e090546ec..a6b2fb4f86 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -247,6 +247,9 @@ public: uint64_t get_pending_tasks() const; uint64_t get_pending_writes() const; uint64_t get_pending_flushes() const; + uint64_t get_pending_allocations() const; + uint64_t get_write_limit_exceeded_count() const; + uint64_t get_flush_limit_exceeded_count() const; uint64_t get_num_segments_created() const; uint64_t get_num_segments_destroyed() const; /** @@ -268,19 +271,10 @@ public: * Return max allowed pending writes (per this shard) */ uint64_t max_active_writes() const; - /** - * Set max allowed pending writes (per this shard) - */ - void max_active_writes(uint64_t); - /** * Return max allowed pending flushes (per this shard) */ uint64_t max_active_flushes() const; - /** - * Set max allowed pending flushes (per this shard) - */ - void max_active_flushes(uint64_t); future<> clear();