diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index d2c605d51c..becec9a35f 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -176,7 +176,7 @@ public: cfg.commit_log_location = "/var/lib/scylla/commitlog"; } logger.trace("Commitlog maximum disk size: {} MB / cpu ({} cpus)", - max_disk_size / (1024*1024), smp::count); + max_disk_size / (1024*1024)); _regs = create_counters(); } @@ -187,8 +187,6 @@ public: future<> init(); future new_segment(); future active_segment(); - future allocate_segment(bool active); - future<> clear(); future<> sync_all_segments(); future<> shutdown(); @@ -198,7 +196,6 @@ public: void discard_unused_segments(); void discard_completed_segments(const cf_id_type& id, const replay_position& pos); - void on_timer(); void sync(); void arm() { _timer.arm(std::chrono::milliseconds(cfg.commitlog_sync_period_in_ms)); @@ -227,21 +224,11 @@ public: private: segment_id_type _ids = 0; std::vector _segments; - std::deque _reserve_segments; std::vector _temp_buffers; std::unordered_map _flush_handlers; flush_handler_id _flush_ids = 0; replay_position _flush_position; timer _timer; - size_t _reserve_allocating = 0; - // # segments to try to keep available in reserve - // i.e. the amount of segments we expect to consume inbetween timer - // callbacks. - // The idea is that since the files are 0 len at start, and thus cost little, - // it is easier to adapt this value compared to timer freq. - size_t _num_reserve_segments = 0; - seastar::gate _gate; - uint64_t _new_counter = 0; }; /* @@ -292,12 +279,12 @@ public: // TODO : tune initial / default size static constexpr size_t default_size = align_up(128 * 1024, alignment); - segment(segment_manager* m, const descriptor& d, file && f, bool active) + segment(segment_manager* m, const descriptor& d, file && f) : _segment_manager(m), _desc(std::move(d)), _file(std::move(f)), _sync_time( clock_type::now()) { ++_segment_manager->totals.segments_created; - logger.debug("Created new {} segment {}", active ? "active" : "reserve", *this); + logger.debug("Created new segment {}", *this); } ~segment() { if (is_clean()) { @@ -320,7 +307,7 @@ public: auto ms = std::chrono::duration_cast( now - _sync_time).count(); if ((_segment_manager->cfg.commitlog_sync_period_in_ms * 2) < uint64_t(ms)) { - logger.debug("{} needs sync. {} ms elapsed", *this, ms); + logger.debug("Need sync. {} ms elapsed", ms); return true; } return false; @@ -338,7 +325,7 @@ public: // It is when it was initiated _sync_time = clock_type::now(); if (position() <= _flush_pos) { - logger.trace("Sync not needed {}: ({} / {})", *this, position(), _flush_pos); + logger.trace("Sync not needed : ({} / {})", position(), _flush_pos); return make_ready_future(shared_from_this()); } return cycle().then([](auto seg) { @@ -354,7 +341,7 @@ public: pos = _file_pos; } if (pos != 0 && pos <= _flush_pos) { - logger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos); + logger.trace("Already synced! ({} < {})", pos, _flush_pos); return make_ready_future(std::move(me)); } logger.trace("Syncing {} -> {}", _flush_pos, pos); @@ -366,7 +353,7 @@ public: _dwrite.write_unlock(); // release it already. pos = std::max(pos, _file_pos); if (pos <= _flush_pos) { - logger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos); + logger.trace("Already synced! ({} < {})", pos, _flush_pos); return make_ready_future(std::move(me)); } ++_segment_manager->totals.pending_operations; @@ -385,7 +372,7 @@ public: }).then([this, pos, me = std::move(me)]() { _flush_pos = std::max(pos, _flush_pos); ++_segment_manager->totals.flush_count; - logger.trace("{} synced to {}", *this, _flush_pos); + logger.trace("Synced to {}", _flush_pos); return make_ready_future(std::move(me)); }).finally([this] { --_segment_manager->totals.pending_operations; @@ -484,13 +471,16 @@ public: } // gah, partial write. should always get here with dma chunk sized // "bytes", but lets make sure... - logger.debug("Partial write {}: {}/{} bytes", *this, *written, size); + logger.debug("Partial write: {}/{} bytes", *written, size); *written = align_down(*written, alignment); return make_ready_future(stop_iteration::no); // TODO: retry/ignore/fail/stop - optional behaviour in origin. // we fast-fail the whole commit. + } catch (std::exception& e) { + logger.error("Failed to persist commits to disk: {}", e.what()); + throw; } catch (...) { - logger.error("Failed to persist commits to disk for {}: {}", *this, std::current_exception()); + logger.error("Failed to persist commits to disk."); throw; } }); @@ -681,9 +671,11 @@ future<> db::commitlog::segment_manager::init() { // base id counter is [ | ] _ids = replay_position(engine().cpu_id(), id).id; - // always run the timer now, since we need to handle segment pre-alloc etc as well. - _timer.set_callback(std::bind(&segment_manager::on_timer, this)); - this->arm(); + + if (cfg.mode != sync_mode::BATCH) { + _timer.set_callback(std::bind(&segment_manager::sync, this)); + this->arm(); + } }); } @@ -794,36 +786,22 @@ void db::commitlog::segment_manager::flush_segments(bool force) { } } -future db::commitlog::segment_manager::allocate_segment(bool active) { - descriptor d(next_id()); - return engine().open_file_dma(cfg.commit_log_location + "/" + d.filename(), open_flags::wo | open_flags::create).then([this, d, active](file f) { - auto s = make_lw_shared(this, d, std::move(f), active); - return make_ready_future(s); - }); -} - future db::commitlog::segment_manager::new_segment() { if (_shutdown) { throw std::runtime_error("Commitlog has been shut down. Cannot add data"); } - - ++_new_counter; - - if (_reserve_segments.empty()) { - if (_num_reserve_segments < cfg.max_reserve_segments) { - ++_num_reserve_segments; - logger.trace("Increased segment reserve count to {}", _num_reserve_segments); + descriptor d(next_id()); + return engine().open_file_dma(cfg.commit_log_location + "/" + d.filename(), open_flags::wo | open_flags::create).then([this, d](file f) { + _segments.emplace_back(make_lw_shared(this, d, std::move(f))); + auto max = max_disk_size; + auto cur = totals.total_size_on_disk; + if (max != 0 && cur >= max) { + logger.debug("Size on disk {} MB exceeds local maximum {} MB", cur / (1024 * 1024), max / (1024 * 1024)); + flush_segments(); } - return allocate_segment(true).then([this](sseg_ptr s) { - _segments.push_back(s); - return make_ready_future(s); - }); - } - - _segments.push_back(_reserve_segments.front()); - _reserve_segments.pop_front(); - logger.trace("Acquired segment {} from reserve", _segments.back()); - return make_ready_future(_segments.back()); + }).then([this] { + return make_ready_future(_segments.back()); + }); } future db::commitlog::segment_manager::active_segment() { @@ -846,7 +824,7 @@ future db::commitlog::segment_manager: */ void db::commitlog::segment_manager::discard_completed_segments( const cf_id_type& id, const replay_position& pos) { - logger.debug("Discard completed segments for {}, table {}", pos, id); + logger.debug("discard completed log segments for {}, table {}", pos, id); for (auto&s : _segments) { s->mark_clean(id, pos); } @@ -854,7 +832,7 @@ void db::commitlog::segment_manager::discard_completed_segments( } std::ostream& db::operator<<(std::ostream& out, const db::commitlog::segment& s) { - return out << s._desc.filename(); + return out << "commit log segment (" << s._desc.filename() << ")"; } std::ostream& db::operator<<(std::ostream& out, const db::commitlog::segment::cf_mark& m) { @@ -868,10 +846,10 @@ std::ostream& db::operator<<(std::ostream& out, const db::replay_position& p) { void db::commitlog::segment_manager::discard_unused_segments() { auto i = std::remove_if(_segments.begin(), _segments.end(), [=](auto& s) { if (s->is_unused()) { - logger.debug("Segment {} is unused", *s); + logger.debug("{} is unused", *s); return true; } - logger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s}); + logger.debug("Not safe to delete {}; dirty is {}", s, segment::cf_mark {*s}); return false; }); if (i != _segments.end()) { @@ -883,22 +861,16 @@ future<> db::commitlog::segment_manager::sync_all_segments() { logger.debug("Issuing sync for all segments"); return parallel_for_each(_segments, [this](sseg_ptr s) { return s->sync().then([](sseg_ptr s) { - logger.debug("Synced segment {}", *s); + logger.debug("Synced {}", *s); }); }); } future<> db::commitlog::segment_manager::shutdown() { - if (!_shutdown) { - _shutdown = true; - _timer.cancel(); - return _gate.close().then([this] { - return parallel_for_each(_segments, [this](sseg_ptr s) { - return s->shutdown(); - }); - }); - } - return make_ready_future<>(); + _shutdown = true; + return parallel_for_each(_segments, [this](sseg_ptr s) { + return s->shutdown(); + }); } @@ -909,8 +881,6 @@ future<> db::commitlog::segment_manager::shutdown() { */ future<> db::commitlog::segment_manager::clear() { logger.debug("Clearing all segments"); - _shutdown = true; - _timer.cancel(); flush_segments(true); return sync_all_segments().then([this] { for (auto& s : _segments) { @@ -926,51 +896,6 @@ void db::commitlog::segment_manager::sync() { for (auto& s : _segments) { s->sync(); // we do not care about waiting... } -} - -void db::commitlog::segment_manager::on_timer() { - if (cfg.mode != sync_mode::BATCH) { - sync(); - } - // IFF a new segment was put in use since last we checked, and we're - // above threshold, request flush. - if (_new_counter > 0) { - auto max = max_disk_size; - auto cur = totals.total_size_on_disk; - if (max != 0 && cur >= max) { - _new_counter = 0; - logger.debug("Size on disk {} MB exceeds local maximum {} MB", cur / (1024 * 1024), max / (1024 * 1024)); - flush_segments(); - } - } - // Gate, because we are starting potentially blocking ops - // without waiting for them, so segement_manager could be shut down - // while they are running. - seastar::with_gate(_gate, [this] { - // take outstanding allocations into regard. This is paranoid, - // but if for some reason the file::open takes longer than timer period, - // we could flood the reserve list with new segments - auto n = _reserve_segments.size() + _reserve_allocating; - return parallel_for_each(boost::irange(n, _num_reserve_segments), [this, n](auto i) { - ++_reserve_allocating; - return this->allocate_segment(false).then([this](sseg_ptr s) { - if (!_shutdown) { - // insertion sort. - auto i = std::upper_bound(_reserve_segments.begin(), _reserve_segments.end(), s, [](auto s1, auto s2) { - const descriptor& d1 = s1->_desc; - const descriptor& d2 = s2->_desc; - return d1.id < d2.id; - }); - i = _reserve_segments.emplace(i, std::move(s)); - logger.trace("Added reserve segment {}", *i); - } - }).finally([this] { - --_reserve_allocating; - }); - }); - }).handle_exception([](auto ep) { - logger.warn("Exception in segment reservation: {}", ep); - }); arm(); } @@ -1294,14 +1219,6 @@ uint64_t db::commitlog::get_pending_tasks() const { return _segment_manager->totals.pending_operations; } -uint64_t db::commitlog::get_num_segments_created() const { - return _segment_manager->totals.segments_created; -} - -uint64_t db::commitlog::get_num_segments_destroyed() const { - return _segment_manager->totals.segments_destroyed; -} - future> db::commitlog::list_existing_descriptors() const { return list_existing_descriptors(active_config().commit_log_location); } diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 17cde0f379..1ad7e69f70 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -94,9 +94,6 @@ public: uint64_t commitlog_total_space_in_mb = 0; uint64_t commitlog_segment_size_in_mb = 32; uint64_t commitlog_sync_period_in_ms = 10 * 1000; //TODO: verify default! - // Max number of segments to keep in pre-alloc reserve. - // Not (yet) configurable from scylla.conf. - uint64_t max_reserve_segments = 12; sync_mode mode = sync_mode::PERIODIC; }; @@ -215,8 +212,6 @@ public: uint64_t get_total_size() const; uint64_t get_completed_tasks() const; uint64_t get_pending_tasks() const; - uint64_t get_num_segments_created() const; - uint64_t get_num_segments_destroyed() const; /** * Returns the largest amount of data that can be written in a single "mutation". diff --git a/tests/commitlog_test.cc b/tests/commitlog_test.cc index 5caaa7cdf3..698d107262 100644 --- a/tests/commitlog_test.cc +++ b/tests/commitlog_test.cc @@ -293,15 +293,13 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit){ cfg.commitlog_segment_size_in_mb = 2; cfg.commitlog_total_space_in_mb = 1; return make_commitlog(cfg).then([](tmplog_ptr log) { - auto sem = make_lw_shared(0); // add a flush handler that simply says we're done with the range. - auto r = log->second.add_flush_handler([log, sem](cf_id_type id, replay_position pos) { + auto r = log->second.add_flush_handler([log](cf_id_type id, replay_position pos) { log->second.discard_completed_segments(id, pos); - sem->signal(); }); auto set = make_lw_shared>(); auto uuid = utils::UUID_gen::get_time_UUID(); - return do_until([set, sem]() {return set->size() > 1 && sem->try_wait();}, + return do_until([set]() {return set->size() > 1;}, [log, set, uuid]() { sstring tmp = "hej bubba cow"; return log->second.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) { @@ -312,9 +310,8 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit){ }); }).then([log]() { auto n = log->second.get_active_segment_names().size(); - auto d = log->second.get_num_segments_destroyed(); BOOST_REQUIRE(n > 0); - BOOST_REQUIRE(d > 0); + BOOST_REQUIRE(n < 2); }).finally([log, r = std::move(r)]() { return log->second.clear().then([log] {}); });