diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 704cacc4c9..6a3148d4a4 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -202,6 +202,7 @@ public: // Divide the size-on-disk threshold by #cpus used, since we assume // we distribute stuff more or less equally across shards. const uint64_t max_disk_size; // per-shard + const uint64_t disk_usage_threshold; bool _shutdown = false; std::optional> _shutdown_promise = {}; @@ -258,8 +259,10 @@ public: uint64_t segments_destroyed = 0; uint64_t pending_flushes = 0; uint64_t flush_limit_exceeded = 0; - uint64_t total_size = 0; uint64_t buffer_list_bytes = 0; + // size on disk, actually used - i.e. containing data (allocate+cycle) + uint64_t active_size_on_disk = 0; + // size allocated on disk - i.e. files created (new, reserve, recycled) uint64_t total_size_on_disk = 0; uint64_t requests_blocked_memory = 0; }; @@ -326,6 +329,7 @@ public: future<> do_pending_deletes(); future<> delete_segments(std::vector); + future<> delete_file(const sstring&); void discard_unused_segments(); void discard_completed_segments(const cf_id_type&); @@ -444,6 +448,7 @@ class db::commitlog::segment : public enable_shared_from_this, public c uint64_t _file_pos = 0; uint64_t _flush_pos = 0; + uint64_t _size_on_disk = 0; bool _closed = false; // Not the same as _closed since files can be reused @@ -503,10 +508,11 @@ public: // TODO : tune initial / default size static constexpr size_t default_size = align_up(128 * 1024, alignment); - segment(::shared_ptr m, descriptor d, file && f) + segment(::shared_ptr m, descriptor&& d, file&& f, uint64_t initial_disk_size) : _segment_manager(std::move(m)), _desc(std::move(d)), _file(std::move(f)), - _file_name(_segment_manager->cfg.commit_log_location + "/" + _desc.filename()), _sync_time( - clock_type::now()), _pending_ops(true) // want exception propagation + _file_name(_segment_manager->cfg.commit_log_location + "/" + _desc.filename()), + 
_size_on_disk(initial_disk_size), + _sync_time(clock_type::now()), _pending_ops(true) // want exception propagation { ++_segment_manager->totals.segments_created; clogger.debug("Created new segment {}", *this); @@ -515,11 +521,13 @@ public: if (!_closed_file) { _segment_manager->add_file_to_close(std::move(_file)); } + + _segment_manager->totals.buffer_list_bytes -= _buffer.size_bytes(); + if (is_clean()) { clogger.debug("Segment {} is no longer active and will submitted for delete now", *this); ++_segment_manager->totals.segments_destroyed; - _segment_manager->totals.total_size_on_disk -= size_on_disk(); - _segment_manager->totals.total_size -= (size_on_disk() + _buffer.size_bytes()); + _segment_manager->totals.active_size_on_disk -= file_position(); _segment_manager->add_file_to_delete(_file_name, _desc); } else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) { clogger.warn("Segment {} is dirty and is left on disk.", *this); @@ -619,7 +627,7 @@ public: // block before actual file end. // we should only get here when all actual data is // already flushed (see below, close()). 
- if (size_on_disk() < _segment_manager->max_size) { + if (file_position() < _segment_manager->max_size) { clogger.trace("{} is closed but not terminated.", *this); if (_buffer.empty()) { new_buffer(0); @@ -678,7 +686,7 @@ public: _buffer_ostream = _buffer.get_ostream(); auto out = _buffer_ostream.write_substream(overhead); out.fill('\0', overhead); - _segment_manager->totals.total_size += k; + _segment_manager->totals.buffer_list_bytes += _buffer.size_bytes(); } bool buffer_is_empty() const { @@ -763,10 +771,10 @@ public: try { auto bytes = std::get<0>(f.get()); _segment_manager->totals.bytes_written += bytes; - _segment_manager->totals.total_size_on_disk += bytes; + _segment_manager->totals.active_size_on_disk += bytes; ++_segment_manager->totals.cycle_count; if (bytes == view.size_bytes()) { - clogger.debug("Final write of {} to {}: {}/{} bytes at {}", bytes, *this, size, size, off); + clogger.trace("Final write of {} to {}: {}/{} bytes at {}", bytes, *this, size, size, off); return make_ready_future(stop_iteration::yes); } // gah, partial write. should always get here with dma chunk sized @@ -774,7 +782,7 @@ public: bytes = align_down(bytes, alignment); off += bytes; view.remove_prefix(bytes); - clogger.debug("Partial write of {} to {}: {}/{} bytes at at {}", bytes, *this, size - view.size_bytes(), size, off - bytes); + clogger.trace("Partial write of {} to {}: {}/{} bytes at at {}", bytes, *this, size - view.size_bytes(), size, off - bytes); return make_ready_future(stop_iteration::no); // TODO: retry/ignore/fail/stop - optional behaviour in origin. // we fast-fail the whole commit. 
@@ -785,7 +793,12 @@ public: }); }); }).finally([this, buf = std::move(buf), size] { - _segment_manager->notify_memory_written(size); + _segment_manager->notify_memory_written(size); + _segment_manager->totals.buffer_list_bytes -= buf.size_bytes(); + if (_size_on_disk < _file_pos) { + _segment_manager->totals.total_size_on_disk += (_file_pos - _size_on_disk); + _size_on_disk = _file_pos; + } }); }, [me, flush_after, top, rp] { // lambda instead of bind, so we keep "me" alive. assert(me->_pending_ops.has_operation(rp)); @@ -934,7 +947,7 @@ public: return position_type(_file_pos + buffer_position()); } - size_t size_on_disk() const { + size_t file_position() const { return _file_pos; } @@ -1036,6 +1049,8 @@ db::commitlog::segment_manager::segment_manager(config c) , max_size(std::min(std::numeric_limits::max(), std::max(cfg.commitlog_segment_size_in_mb, 1) * 1024 * 1024)) , max_mutation_size(max_size >> 1) , max_disk_size(size_t(std::ceil(cfg.commitlog_total_space_in_mb / double(smp::count))) * 1024 * 1024) + // our threshold for trying to force a flush. needs heuristics, for now max - segment_size/2. + , disk_usage_threshold(max_disk_size - (max_disk_size > (max_size/2) ? (max_size/2) : 0)) , _flush_semaphore(cfg.max_active_flushes) // That is enough concurrency to allow for our largest mutation (max_mutation_size), plus // an existing in-flight buffer. Since we'll force the cycling() of any buffer that is bigger @@ -1067,6 +1082,11 @@ future<> db::commitlog::segment_manager::replenish_reserve() { return make_ready_future<>(); } return with_gate(_gate, [this] { + // note: if we were strict with disk size, we would refuse to do this + // unless disk footprint is lower than threshold. but we cannot (yet?) + // trust that flush logic will absolutely free up an existing + // segment (because colocation stuff etc), so always allow a new + // file if needed. That and performance stuff...
return allocate_segment().then([this](sseg_ptr s) { auto ret = _reserve_segments.push(std::move(s)); if (!ret) { @@ -1211,7 +1231,11 @@ void db::commitlog::segment_manager::create_counters(const sstring& metrics_cate "A non-zero value indicates that there are too many pending flush operations (see pending_flushes) and some of " "them will be blocked till the total amount of pending flush operations drops below {}.", cfg.max_active_flushes))), - sm::make_gauge("disk_total_bytes", totals.total_size, + sm::make_gauge("disk_total_bytes", totals.total_size_on_disk, + sm::description("Holds a size of disk space in bytes reserved for data so far. " + "A too high value indicates that we have some bottleneck in the writing to sstables path.")), + + sm::make_gauge("disk_active_bytes", totals.active_size_on_disk, sm::description("Holds a size of disk space in bytes used for data so far. " "A too high value indicates that we have some bottleneck in the writing to sstables path.")), @@ -1297,6 +1321,9 @@ future db::commitlog::segment_manager: if (existing_size >= max_size) { return f.truncate(max_size); } + + totals.total_size_on_disk += (max_size - existing_size); + clogger.trace("Pre-writing {} of {} KB to segment {}", (max_size - existing_size)/1024, max_size/1024, filename); return f.allocate(existing_size, max_size - existing_size).then([this, existing_size, f]() mutable { static constexpr size_t buf_size = 4 * segment::alignment; @@ -1328,8 +1355,8 @@ future db::commitlog::segment_manager: } else { fut = f.truncate(max_size); } - return fut.then([this, d = std::move(d), f = std::move(f)] () mutable { - auto s = make_shared(shared_from_this(), std::move(d), std::move(f)); + return fut.then([this, d, f, filename] () mutable { + auto s = make_shared(shared_from_this(), std::move(d), std::move(f), max_size); return make_ready_future(s); }); }); @@ -1374,7 +1401,8 @@ future db::commitlog::segment_manager: ++_new_counter; - if (_reserve_segments.empty() && 
(_reserve_segments.max_size() < cfg.max_reserve_segments)) { + // don't increase reserve count if we are at max, or we would go over disk limit. + if (_reserve_segments.empty() && (_reserve_segments.max_size() < cfg.max_reserve_segments) && (totals.total_size_on_disk + max_size) <= max_disk_size) { _reserve_segments.set_max_size(_reserve_segments.max_size() + 1); clogger.debug("Increased segment reserve count to {}", _reserve_segments.max_size()); } @@ -1496,7 +1524,7 @@ future<> db::commitlog::segment_manager::clear_reserve_segments() { return parallel_for_each(i, e, [this](const sstring& filename) { clogger.debug("Deleting recycled segment file {}", filename); - return commit_io_check(&seastar::remove_file, filename); + return delete_file(filename); }).finally([this, re = std::move(re)] { return do_pending_deletes(); }); @@ -1562,6 +1590,15 @@ void db::commitlog::segment_manager::add_file_to_close(file f) { _files_to_close.emplace_back(std::move(f)); } +future<> db::commitlog::segment_manager::delete_file(const sstring& filename) { + return seastar::file_size(filename).then([this, filename](uint64_t size) { + clogger.debug("Deleting segment file {}", filename); + return commit_io_check(&seastar::remove_file, filename).then([this, size] { + totals.total_size_on_disk -= size; + }); + }); +} + future<> db::commitlog::segment_manager::delete_segments(std::vector files) { auto i = files.begin(); auto e = files.end(); @@ -1576,10 +1613,8 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi } return f.finally([&] { // We allow reuse of the segment if the current disk size is less than shard max. - // We however don't know the exact size of this file (or the others on the recycle - // list, so assume they are max_size large. 
- auto usage = totals.total_size_on_disk + (1 + _recycled_segments.size()) * max_size; - if (!_shutdown && cfg.reuse_segments && usage < max_disk_size) { + auto usage = totals.total_size_on_disk; + if (!_shutdown && cfg.reuse_segments && usage <= max_disk_size) { descriptor d(next_id(), "Recycled-" + cfg.fname_prefix); auto dst = this->filename(d); @@ -1591,11 +1626,10 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi _recycled_segments.emplace_back(dst); return make_ready_future<>(); }).handle_exception([this, filename](auto&&) { - return commit_io_check(&seastar::remove_file, filename); + return delete_file(filename); }); } - clogger.debug("Deleting segment file {}", filename); - return commit_io_check(&seastar::remove_file, filename); + return delete_file(filename); }).handle_exception([&filename](auto ep) { clogger.error("Could not delete segment {}: {}", filename, ep); }); @@ -1654,11 +1688,11 @@ void db::commitlog::segment_manager::on_timer() { // IFF a new segment was put in use since last we checked, and we're // above threshold, request flush. if (_new_counter > 0) { - auto max = max_disk_size; - auto cur = totals.total_size_on_disk; + auto max = disk_usage_threshold; + auto cur = totals.active_size_on_disk; if (max != 0 && cur >= max) { _new_counter = 0; - clogger.debug("Size on disk {} MB exceeds local maximum {} MB", cur / (1024 * 1024), max / (1024 * 1024)); + clogger.debug("Used size on disk {} MB exceeds local maximum {} MB", cur / (1024 * 1024), max / (1024 * 1024)); flush_segments(); } } @@ -2117,7 +2151,7 @@ std::vector db::commitlog::get_active_segment_names() const { } uint64_t db::commitlog::get_total_size() const { - return _segment_manager->totals.total_size; + return _segment_manager->totals.active_size_on_disk + _segment_manager->totals.buffer_list_bytes; } uint64_t db::commitlog::get_completed_tasks() const {