Merge "Commitlog shutdown" from Calle

"Refs #293

* Add a commitlog::sync_all_segments, that explicitly forces all pending
  disk writes
* Only delete segments from disk IFF they are marked clean. Thus on partial
  shutdown or whatnot, even if CL is destroyed (destructor runs) disk files
  not yet clean visavi sstables are preserved and replayable
* Do a sync_all_segments first of all in database::stop.

Exactly what to not stop in main I leave up to others discretion, or at least
another patch."
This commit is contained in:
Avi Kivity
2015-09-08 11:11:18 +03:00
3 changed files with 41 additions and 11 deletions

View File

@@ -1417,6 +1417,12 @@ operator<<(std::ostream& os, const atomic_cell& ac) {
future<>
database::stop() {
return _compaction_manager.stop().then([this] {
// try to ensure that CL has done disk flushing
if (_commitlog != nullptr) {
return _commitlog->sync_all_segments();
}
return make_ready_future<>();
}).then([this] {
return parallel_for_each(_column_families, [this] (auto& val_pair) {
return val_pair.second->stop();
});

View File

@@ -184,6 +184,7 @@ public:
future<sseg_ptr> new_segment();
future<sseg_ptr> active_segment();
future<> clear();
future<> sync_all_segments();
scollectd::registrations create_counters();
@@ -278,13 +279,16 @@ public:
++_segment_manager->totals.segments_created;
}
~segment() {
logger.debug("Segment {} is no longer active and will be deleted now", *this);
++_segment_manager->totals.segments_destroyed;
_segment_manager->totals.total_size_on_disk -= size_on_disk();
_segment_manager->totals.total_size -=
(size_on_disk() + _buffer.size());
::unlink(
(_segment_manager->cfg.commit_log_location + "/" + _desc.filename()).c_str());
if (is_clean()) {
logger.debug("Segment {} is no longer active and will be deleted now", *this);
++_segment_manager->totals.segments_destroyed;
_segment_manager->totals.total_size_on_disk -= size_on_disk();
_segment_manager->totals.total_size -= (size_on_disk() + _buffer.size());
::unlink(
(_segment_manager->cfg.commit_log_location + "/" + _desc.filename()).c_str());
} else {
logger.warn("Segment {} is dirty and is left on disk.", *this);
}
}
bool must_sync() {
@@ -824,17 +828,25 @@ void db::commitlog::segment_manager::discard_unused_segments() {
}
}
future<> db::commitlog::segment_manager::sync_all_segments() {
return parallel_for_each(_segments, [this](sseg_ptr s) {
return s->sync().then([](sseg_ptr) {});
});
}
/*
* Sync all segments, then clear them out. To ensure all ops are done.
* (Assumes you have barriered adding ops!)
* Only use from tests.
*/
future<> db::commitlog::segment_manager::clear() {
logger.debug("Clearing all segments");
flush_segments(true);
return do_until([this]() {return _segments.empty();}, [this]() {
auto s = _segments.front();
_segments.erase(_segments.begin());
return s->sync().then([](sseg_ptr) {});
return sync_all_segments().then([this] {
for (auto& s : _segments) {
s->mark_clean();
}
_segments.clear();
});
}
/**
@@ -964,6 +976,10 @@ void db::commitlog::discard_completed_segments(const cf_id_type& id,
_segment_manager->discard_completed_segments(id, pos);
}
future<> db::commitlog::sync_all_segments() {
return _segment_manager->sync_all_segments();
}
size_t db::commitlog::max_record_size() const {
return _segment_manager->max_mutation_size - segment::entry_overhead_size;
}

View File

@@ -222,6 +222,14 @@ public:
const config& active_config() const;
/**
* Issues disk sync on all (allocating) segments. I.e. ensures that
* all data written up until this call is indeed on disk.
* _However_, if you issue new "add" ops while this is executing,
* those can/will be missed.
*/
future<> sync_all_segments();
future<std::vector<descriptor>> list_existing_descriptors() const;
future<std::vector<descriptor>> list_existing_descriptors(const sstring& dir) const;