/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * Modified by Cloudius Systems
 * Copyright 2015 Cloudius Systems
 */

// NOTE(review): in this copy of the file the contents of angle brackets are
// missing (header names of the system includes below, and template argument
// lists such as the one of `template ... process(T t)`). Restore them from
// the original source before building.
#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include
#include "commitlog.hh"
#include "db/config.hh"
#include "utils/data_input.hh"
#include "utils/crc.hh"
#include "utils/runtime.hh"
#include "log.hh"

static logging::logger logger("commitlog");

/**
 * Thin wrapper around crc32. Values passed to process() are converted with
 * net::hton() before being fed to the checksum; raw byte ranges are passed
 * through unchanged via process_bytes().
 */
class crc32_nbo {
    crc32 _c;
public:
    // Feed a single value, converted with net::hton() first.
    template void process(T t) {
        _c.process(net::hton(t));
    }
    // Current checksum value.
    uint32_t checksum() const {
        return _c.get();
    }
    // Feed `size` raw bytes starting at `data`.
    void process_bytes(const uint8_t* data, size_t size) {
        return _c.process(data, size);
    }
    void process_bytes(const int8_t* data, size_t size) {
        return _c.process(reinterpret_cast(data), size);
    }
    void process_bytes(const char* data, size_t size) {
        return _c.process(reinterpret_cast(data), size);
    }
};

/**
 * Build a commitlog configuration from the database configuration.
 * The sync mode is BATCH when commitlog_sync() equals "batch", otherwise PERIODIC.
 */
db::commitlog::config::config(const db::config& cfg)
    : commit_log_location(cfg.commitlog_directory())
    , commitlog_total_space_in_mb(cfg.commitlog_total_space_in_mb())
    , commitlog_segment_size_in_mb(cfg.commitlog_segment_size_in_mb())
    // NOTE(review): the sync *period* is initialised from the *batch window*
    // setting - confirm this is intended.
    , commitlog_sync_period_in_ms(cfg.commitlog_sync_batch_window_in_ms())
    , mode(cfg.commitlog_sync() == "batch" ? sync_mode::BATCH : sync_mode::PERIODIC)
{}

// Descriptor from an explicit segment id and version.
db::commitlog::descriptor::descriptor(segment_id_type i, uint32_t v)
        : id(i), ver(v) {
}

// Descriptor for the segment id contained in a replay position.
db::commitlog::descriptor::descriptor(replay_position p)
        : descriptor(p.id) {
}

// Descriptor from an (id, version) pair.
db::commitlog::descriptor::descriptor(std::pair p)
        : descriptor(p.first, p.second) {
}

/**
 * Parse a descriptor from a segment file name (optionally with a leading
 * directory part).
 *
 * Throws std::domain_error if the name does not match the expected pattern,
 * or if it matches the legacy form that carries only one number.
 */
db::commitlog::descriptor::descriptor(sstring filename)
        : descriptor([filename]() {
            std::smatch m;
            // match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log.
            std::regex rx("(?:.*/)?" + FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
            std::string sfilename = filename;
            if (!std::regex_match(sfilename, m, rx)) {
                throw std::domain_error("Cannot parse the version of the file: " + filename);
            }
            // Group 3 is the optional "<separator><id>" part; it is empty for legacy names.
            if (m[3].length() == 0) {
                // CMH. Can most likely ignore this
                throw std::domain_error("Commitlog segment is too old to open; upgrade to 1.2.5+ first");
            }

            // Drop the leading separator from group 3 to get the id; group 2 is the version.
            segment_id_type id = std::stoull(m[3].str().substr(1));
            uint32_t ver = std::stoul(m[2].str());

            return std::make_pair(id, ver);
        }()) {
}

// File name (without directory) of the form <prefix><ver><separator><id><extension>.
sstring db::commitlog::descriptor::filename() const {
    return FILENAME_PREFIX + std::to_string(ver) + SEPARATOR
            + std::to_string(id) + FILENAME_EXTENSION;
}

// Replay position built from this descriptor's segment id only.
db::commitlog::descriptor::operator db::replay_position() const {
    return replay_position(id);
}

// Building blocks of segment file names. FILENAME_PREFIX depends on SEPARATOR,
// so SEPARATOR is defined first.
const std::string db::commitlog::descriptor::SEPARATOR("-");
const std::string db::commitlog::descriptor::FILENAME_PREFIX(
        "CommitLog" + SEPARATOR);
const std::string db::commitlog::descriptor::FILENAME_EXTENSION(".log");

/**
 * Owner of the segments, the temporary write buffers, the flush handlers and
 * the statistics counters of one commitlog instance.
 */
class db::commitlog::segment_manager {
public:
    config cfg;
    // Maximum size of a single segment, in bytes.
    const uint64_t max_size;
    // Largest entry (including overhead) accepted by segment::allocate().
    const uint64_t max_mutation_size;
    // Divide the size-on-disk threshold by #cpus used, since we assume
    // we distribute stuff more or less equally across shards.
const uint64_t max_disk_size; // per-shard bool _shutdown = false; semaphore _new_segment_semaphore; scollectd::registrations _regs; // TODO: verify that we're ok with not-so-great granularity using clock_type = lowres_clock; using time_point = clock_type::time_point; using sseg_ptr = lw_shared_ptr; struct stats { uint64_t cycle_count = 0; uint64_t flush_count = 0; uint64_t allocation_count = 0; uint64_t bytes_written = 0; uint64_t bytes_slack = 0; uint64_t segments_created = 0; uint64_t segments_destroyed = 0; uint64_t pending_operations = 0; uint64_t total_size = 0; uint64_t buffer_list_bytes = 0; uint64_t total_size_on_disk = 0; }; stats totals; segment_manager(config c) : cfg(c), max_size( std::min(std::numeric_limits::max(), std::max(cfg.commitlog_segment_size_in_mb, 1) * 1024 * 1024)), max_mutation_size( max_size >> 1), max_disk_size( size_t( std::ceil( cfg.commitlog_total_space_in_mb / double(smp::count))) * 1024 * 1024) { assert(max_size > 0); if (cfg.commit_log_location.empty()) { cfg.commit_log_location = "/var/lib/scylla/commitlog"; } logger.trace("Commitlog maximum disk size: {} MB / cpu ({} cpus)", max_disk_size / (1024*1024)); _regs = create_counters(); } uint64_t next_id() { return ++_ids; } future<> init(); future new_segment(); future active_segment(); future<> clear(); future<> sync_all_segments(); future<> shutdown(); scollectd::registrations create_counters(); void discard_unused_segments(); void discard_completed_segments(const cf_id_type& id, const replay_position& pos); void sync(); void arm() { _timer.arm(std::chrono::milliseconds(cfg.commitlog_sync_period_in_ms)); } std::vector get_active_names() const; using buffer_type = temporary_buffer; buffer_type acquire_buffer(size_t s); void release_buffer(buffer_type&&); future> list_descriptors(sstring dir); flush_handler_id add_flush_handler(flush_handler h) { auto id = ++_flush_ids; _flush_handlers[id] = std::move(h); return id; } void remove_flush_handler(flush_handler_id id) { 
_flush_handlers.erase(id); } void flush_segments(bool = false); private: segment_id_type _ids = 0; std::vector _segments; std::vector _temp_buffers; std::unordered_map _flush_handlers; flush_handler_id _flush_ids = 0; replay_position _flush_position; timer _timer; }; /* * A single commit log file on disk. Manages creation of the file and writing mutations to disk, * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment * files are initially allocated to a fixed size and can grow to accomidate a larger value if necessary. */ class db::commitlog::segment: public enable_lw_shared_from_this { segment_manager* _segment_manager; descriptor _desc; file _file; uint64_t _file_pos = 0; uint64_t _flush_pos = 0; uint64_t _buf_pos = 0; bool _closed = false; using buffer_type = segment_manager::buffer_type; using sseg_ptr = segment_manager::sseg_ptr; using clock_type = segment_manager::clock_type; using time_point = segment_manager::time_point; buffer_type _buffer; rwlock _dwrite; // used as a barrier between write & flush std::unordered_map _cf_dirty; time_point _sync_time; seastar::gate _gate; friend std::ostream& operator<<(std::ostream&, const segment&); friend class segment_manager; public: struct cf_mark { const segment& s; }; friend std::ostream& operator<<(std::ostream&, const cf_mark&); // The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum) static constexpr size_t entry_overhead_size = 3 * sizeof(uint32_t); static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t); static constexpr size_t descriptor_header_size = 4 * sizeof(uint32_t); // The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position]) static constexpr size_t sync_marker_size = 2 * sizeof(uint32_t); static constexpr size_t alignment = 4096; // TODO : tune initial / default size static constexpr size_t default_size = align_up(128 * 1024, alignment); 
segment(segment_manager* m, const descriptor& d, file && f)
            : _segment_manager(m), _desc(std::move(d)), _file(std::move(f)), _sync_time(
                    clock_type::now())
    {
        // Creation is counted in the manager's statistics.
        ++_segment_manager->totals.segments_created;
        logger.debug("Created new segment {}", *this);
    }

    /**
     * A clean segment (no dirty CF entries) removes its file from disk and
     * adjusts the manager's size counters; a dirty one is left on disk.
     */
    ~segment() {
        if (is_clean()) {
            logger.debug("Segment {} is no longer active and will be deleted now", *this);
            ++_segment_manager->totals.segments_destroyed;
            _segment_manager->totals.total_size_on_disk -= size_on_disk();
            _segment_manager->totals.total_size -= (size_on_disk() + _buffer.size());
            ::unlink(
                    (_segment_manager->cfg.commit_log_location + "/" + _desc.filename()).c_str());
        } else {
            logger.warn("Segment {} is dirty and is left on disk.", *this);
        }
    }

    /**
     * True in BATCH mode, or when more than twice the configured sync period
     * has elapsed since the last sync was initiated.
     */
    bool must_sync() {
        if (_segment_manager->cfg.mode == sync_mode::BATCH) {
            return true;
        }
        auto now = clock_type::now();
        auto ms = std::chrono::duration_cast(
                now - _sync_time).count();
        if ((_segment_manager->cfg.commitlog_sync_period_in_ms * 2) < uint64_t(ms)) {
            logger.debug("Need sync. {} ms elapsed", ms);
            return true;
        }
        return false;
    }
    /**
     * Finalize this segment and get a new one
     */
    future finish_and_get_new() {
        _closed = true;
        // The future returned by sync() is not waited for here.
        sync();
        return _segment_manager->active_segment();
    }
    /**
     * Write out buffered data (cycle) and flush the file, unless everything
     * up to the current position is already flushed.
     */
    future sync() {
        // Note: this is not a marker for when sync was finished.
        // It is when it was initiated
        _sync_time = clock_type::now();
        if (position() <= _flush_pos) {
            logger.trace("Sync not needed : ({} / {})", position(), _flush_pos);
            return make_ready_future(shared_from_this());
        }
        return cycle().then([](auto seg) {
            return seg->flush();
        });
    }
    // Closes the gate that allocate() enters and leaves.
    future<> shutdown() {
        return _gate.close();
    }
    /**
     * Flush the file so that at least `pos` is covered; pos == 0 means
     * "up to the current file position". Updates _flush_pos and the flush counter.
     */
    future flush(uint64_t pos = 0) {
        auto me = shared_from_this();
        if (pos == 0) {
            pos = _file_pos;
        }
        if (pos != 0 && pos <= _flush_pos) {
            logger.trace("Already synced! ({} < {})", pos, _flush_pos);
            return make_ready_future(std::move(me));
        }
        logger.trace("Syncing {} -> {}", _flush_pos, pos);
        // Make sure all disk writes are done.
        // This is not 100% necessary, we really only need the ones below our flush pos,
        // but since we pretty much assume that task ordering will make this the case anyway...

        return _dwrite.write_lock().then(
                [this, me = std::move(me), pos]() mutable {
                    _dwrite.write_unlock(); // release it already.
                    // Cover whatever was written while waiting for the lock.
                    pos = std::max(pos, _file_pos);
                    if (pos <= _flush_pos) {
                        logger.trace("Already synced! ({} < {})", pos, _flush_pos);
                        return make_ready_future(std::move(me));
                    }
                    ++_segment_manager->totals.pending_operations;
                    return _file.flush().handle_exception([](auto ex) {
                                // Log the failure and propagate it unchanged.
                                try {
                                    std::rethrow_exception(ex);
                                    // TODO: retry/ignore/fail/stop - optional behaviour in origin.
                                    // we fast-fail the whole commit.
                                } catch (std::exception& e) {
                                    logger.error("Failed to flush commits to disk: {}", e.what());
                                    throw;
                                } catch (...) {
                                    logger.error("Failed to flush commits to disk.");
                                    throw;
                                }
                            }).then([this, pos, me = std::move(me)]() {
                                _flush_pos = std::max(pos, _flush_pos);
                                ++_segment_manager->totals.flush_count;
                                logger.trace("Synced to {}", _flush_pos);
                                return make_ready_future(std::move(me));
                            }).finally([this] {
                                --_segment_manager->totals.pending_operations;
                            });
                });
    }
    /**
     * Send any buffer contents to disk and get a new tmp buffer
     *
     * If s > 0 a new buffer able to hold s bytes plus the chunk (and, for the
     * first chunk, file) header is acquired before the old one is written.
     */
    future cycle(size_t s = 0) {
        auto size = clear_buffer_slack();
        auto buf = std::move(_buffer);
        auto off = _file_pos;

        // Positions are advanced before the write is issued.
        _file_pos += size;
        _buf_pos = 0;

        // if we need new buffer, get one.
        // TODO: keep a queue of available buffers?
        if (s > 0) {
            auto overhead = segment_overhead_size;
            if (_file_pos == 0) {
                overhead += descriptor_header_size;
            }

            // a: minimum aligned size needed, k: size actually requested.
            auto a = align_up(s + overhead, alignment);
            auto k = std::max(a, default_size);

            // On allocation failure retry with half the size, but never below `a`.
            for (;;) {
                try {
                    _buffer = _segment_manager->acquire_buffer(k);
                    break;
                } catch (std::bad_alloc&) {
                    logger.warn("Could not allocate {} k bytes output buffer ({} k required)", k / 1024, a / 1024);
                    if (k > a) {
                        k = std::max(a, k / 2);
                        logger.debug("Trying reduced size: {} k", k / 1024);
                        continue;
                    }
                    throw;
                }
            }
            // Reserve and zero the header area; it is filled in when this buffer is cycled.
            _buf_pos = overhead;
            auto * p = reinterpret_cast(_buffer.get_write());
            std::fill(p, p + overhead, 0);
            _segment_manager->totals.total_size += k;
        }
        auto me = shared_from_this();
        if (size == 0) {
            return make_ready_future(std::move(me));
        }

        auto * p = buf.get_write();
        // The header area must still be zeroed at this point.
        assert(std::count(p, p + 2 * sizeof(uint32_t), 0) == 2 * sizeof(uint32_t));

        data_output out(p, p + buf.size());

        auto header_size = 0;

        if (off == 0) {
            // first block. write file header.
            out.write(_desc.ver);
            out.write(_desc.id);
            crc32_nbo crc;
            crc.process(_desc.ver);
            crc.process(_desc.id & 0xffffffff);
            crc.process(_desc.id >> 32);
            out.write(crc.checksum());
            header_size = descriptor_header_size;
        }

        // write chunk header
        // (checksum over segment id and chunk offset, then the new file position)
        crc32_nbo crc;
        crc.process(_desc.id & 0xffffffff);
        crc.process(_desc.id >> 32);
        crc.process(uint32_t(off + header_size));

        out.write(uint32_t(_file_pos));
        out.write(crc.checksum());

        // acquire read lock
        return _dwrite.read_lock().then([this, size, off, buf = std::move(buf), me]() mutable {
            ++_segment_manager->totals.pending_operations;
            auto written = make_lw_shared(0);
            auto p = buf.get();
            // Repeat dma_write until the whole buffer has been written.
            return repeat([this, size, off, written, p]() mutable {
                return _file.dma_write(off + *written, p + *written, size - *written).then_wrapped([this, size, written](auto&& f) {
                    try {
                        auto bytes = std::get<0>(f.get());
                        *written += bytes;
                        _segment_manager->totals.bytes_written += bytes;
                        _segment_manager->totals.total_size_on_disk += bytes;
                        ++_segment_manager->totals.cycle_count;
                        if (*written == size) {
                            return make_ready_future(stop_iteration::yes);
                        }
                        // gah, partial write. should always get here with dma chunk sized
                        // "bytes", but lets make sure...
                        logger.debug("Partial write: {}/{} bytes", *written, size);
                        *written = align_down(*written, alignment);
                        return make_ready_future(stop_iteration::no);
                        // TODO: retry/ignore/fail/stop - optional behaviour in origin.
                        // we fast-fail the whole commit.
                    } catch (std::exception& e) {
                        logger.error("Failed to persist commits to disk: {}", e.what());
                        throw;
                    } catch (...) {
                        logger.error("Failed to persist commits to disk.");
                        throw;
                    }
                });
            }).finally([this, buf = std::move(buf)]() mutable {
                // Hand the written buffer back to the manager for reuse.
                _segment_manager->release_buffer(std::move(buf));
            });
        }).then([me] {
            return make_ready_future(std::move(me));
        }).finally([me, this]() {
            --_segment_manager->totals.pending_operations;
            _dwrite.read_unlock(); // release
        });
    }

    /**
     * Add a "mutation" to the segment.
     *
     * Writes <size><head checksum><data written by func><tail checksum> into
     * the buffer and records the replay position as dirty for `id`.
     * Fails with std::invalid_argument if the entry exceeds max_mutation_size;
     * moves on to a new segment if the entry would not fit into this one.
     */
    future allocate(const cf_id_type& id, size_t size,
            serializer_func func) {
        const auto s = size + entry_overhead_size; // total size
        if (s > _segment_manager->max_mutation_size) {
            return make_exception_future(
                    std::invalid_argument(
                            "Mutation of " + std::to_string(s)
                                    + " bytes is too large for the maxiumum size of "
                                    + std::to_string(_segment_manager->max_mutation_size)));
        }
        // would we make the file too big?
        if (position() + s > _segment_manager->max_size) {
            // do this in next segment instead.
            return finish_and_get_new().then(
                    [id, size, func = std::move(func)](auto new_seg) {
                        return new_seg->allocate(id, size, func);
                    });
        }
        // enough data?
        if (s > (_buffer.size() - _buf_pos)) {
            // TODO: iff we have to many writes running, maybe we should
            // wait for this?
            // (the future returned by cycle() is not waited for)
            cycle(s);
        }

        _gate.enter(); // this might throw. I guess we accept this?

        replay_position rp(_desc.id, position());
        auto pos = _buf_pos;
        _buf_pos += s;
        _cf_dirty[id] = rp.pos;

        // p: start of the entry, e: location of the trailing checksum.
        auto * p = _buffer.get_write() + pos;
        auto * e = _buffer.get_write() + pos + s - sizeof(uint32_t);

        data_output out(p, e);
        crc32_nbo crc;

        // Head: entry size followed by a checksum of the size.
        out.write(uint32_t(s));
        crc.process(uint32_t(s));
        out.write(crc.checksum());

        // actual data
        func(out);

        // Tail checksum continues the same crc over the payload bytes.
        crc.process_bytes(p + 2 * sizeof(uint32_t), size);

        out = data_output(e, sizeof(uint32_t));
        out.write(crc.checksum());

        ++_segment_manager->totals.allocation_count;
        _gate.leave();

        // finally, check if we're required to sync.
        if (must_sync()) {
            return sync().then([rp](auto seg) {
                return make_ready_future(rp);
            });
        }

        return make_ready_future(rp);
    }

    // Current write position: bytes handed to the file plus bytes in the buffer.
    position_type position() const {
        return position_type(_file_pos + _buf_pos);
    }

    size_t size_on_disk() const {
        return _file_pos;
    }

    // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
    // a.k.a. zero the tail.
    // Returns the buffer position rounded up to the alignment.
    size_t clear_buffer_slack() {
        auto size = align_up(_buf_pos, alignment);
        std::fill(_buffer.get_write() + _buf_pos, _buffer.get_write() + size,
                0);
        _segment_manager->totals.bytes_slack += (size - _buf_pos);
        return size;
    }
    // Drop the dirty mark of `id` if its recorded position is <= pos.
    void mark_clean(const cf_id_type& id, position_type pos) {
        auto i = _cf_dirty.find(id);
        if (i != _cf_dirty.end() && i->second <= pos) {
            _cf_dirty.erase(i);
        }
    }
    // As above for a replay position: a position in a segment with a higher id
    // cleans `id` unconditionally, one with a lower id has no effect.
    void mark_clean(const cf_id_type& id, const replay_position& pos) {
        if (pos.id == _desc.id) {
            mark_clean(id, pos.pos);
        } else if (pos.id > _desc.id) {
            mark_clean(id, std::numeric_limits::max());
        }
    }
    // Drop all dirty marks.
    void mark_clean() {
        _cf_dirty.clear();
    }
    // Not closed and still below the maximum segment size.
    bool is_still_allocating() const {
        return !_closed && position() < _segment_manager->max_size;
    }
    bool is_clean() {
        return _cf_dirty.empty();
    }
    bool is_unused() {
        return !is_still_allocating() && is_clean();
    }
    bool contains(const replay_position& pos) {
        return pos.id == _desc.id;
    }
    sstring get_segment_name() const {
        return _desc.filename();
    }
};

// Out-of-class definition for the static constexpr member.
const size_t db::commitlog::segment::default_size;

future>
db::commitlog::segment_manager::list_descriptors(sstring dirname) { struct helper { sstring _dirname; file _file; subscription _list; std::vector _result; helper(helper&&) = default; helper(sstring n, file && f) : _dirname(std::move(n)), _file(std::move(f)), _list( _file.list_directory( std::bind(&helper::process, this, std::placeholders::_1))) { } future<> process(directory_entry de) { auto entry_type = [this](const directory_entry & de) { if (!de.type && !de.name.empty()) { return engine().file_type(_dirname + "/" + de.name); } return make_ready_future>(de.type); }; return entry_type(de).then([this, de](auto type) { if (type == directory_entry_type::regular && de.name[0] != '.') { try { _result.emplace_back(de.name); } catch (std::domain_error& e) { logger.warn(e.what()); } } return make_ready_future<>(); }); } future<> done() { return _list.done(); } }; return engine().open_directory(dirname).then([this, dirname](auto dir) { auto h = make_lw_shared(std::move(dirname), std::move(dir)); return h->done().then([h]() { return make_ready_future>(std::move(h->_result)); }).finally([h] {}); }); } future<> db::commitlog::segment_manager::init() { return list_descriptors(cfg.commit_log_location).then([this](auto descs) { segment_id_type id = std::chrono::duration_cast(runtime::get_boot_time().time_since_epoch()).count() + 1; for (auto& d : descs) { id = std::max(id, replay_position(d.id).base_id()); } // base id counter is [ | ] _ids = replay_position(engine().cpu_id(), id).id; if (cfg.mode != sync_mode::BATCH) { _timer.set_callback(std::bind(&segment_manager::sync, this)); this->arm(); } }); } scollectd::registrations db::commitlog::segment_manager::create_counters() { using scollectd::add_polled_metric; using scollectd::make_typed; using scollectd::type_instance_id; using scollectd::per_cpu_plugin_instance; using scollectd::data_type; return { add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "queue_length", "segments") , 
make_typed(data_type::GAUGE , std::bind(&decltype(_segments)::size, &_segments)) ), add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "queue_length", "allocating_segments") , make_typed(data_type::GAUGE , [this]() { return std::count_if(_segments.begin(), _segments.end(), [](const sseg_ptr & s) { return s->is_still_allocating(); }); }) ), add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "queue_length", "unused_segments") , make_typed(data_type::GAUGE , [this]() { return std::count_if(_segments.begin(), _segments.end(), [](const sseg_ptr & s) { return s->is_unused(); }); }) ), add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "total_operations", "alloc") , make_typed(data_type::DERIVE, totals.allocation_count) ), add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "total_operations", "cycle") , make_typed(data_type::DERIVE, totals.cycle_count) ), add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "total_operations", "flush") , make_typed(data_type::DERIVE, totals.flush_count) ), add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "total_bytes", "written") , make_typed(data_type::DERIVE, totals.bytes_written) ), add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "total_bytes", "slack") , make_typed(data_type::DERIVE, totals.bytes_slack) ), add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "queue_length", "pending_operations") , make_typed(data_type::GAUGE, totals.pending_operations) ), add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "memory", "total_size") , make_typed(data_type::GAUGE, totals.total_size) ), add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "memory", "buffer_list_bytes") , make_typed(data_type::GAUGE, totals.buffer_list_bytes) ), }; } void db::commitlog::segment_manager::flush_segments(bool force) { if 
(_segments.empty()) { return; } // defensive copy. auto callbacks = boost::copy_range>(_flush_handlers | boost::adaptors::map_values); auto& active = _segments.back(); // RP at "start" of segment we leave untouched. replay_position high(active->_desc.id, 0); // But if all segments are closed or we force-flush, // include all. if (force || !active->is_still_allocating()) { high = replay_position(high.id + 1, 0); } // Now get a set of used CF ids: std::unordered_set ids; std::for_each(_segments.begin(), _segments.end() - 1, [&ids](sseg_ptr& s) { for (auto& id : s->_cf_dirty | boost::adaptors::map_keys) { ids.insert(id); } }); logger.debug("Flushing ({}) to {}", force, high); // For each CF id: for each callback c: call c(id, high) for (auto& f : callbacks) { for (auto& id : ids) { try { f(id, high); } catch (...) { logger.error("Exception during flush request {}/{}: {}", id, high, std::current_exception()); } } } } future db::commitlog::segment_manager::new_segment() { if (_shutdown) { throw std::runtime_error("Commitlog has been shut down. 
Cannot add data"); } descriptor d(next_id()); return engine().open_file_dma(cfg.commit_log_location + "/" + d.filename(), open_flags::wo | open_flags::create).then([this, d](file f) { _segments.emplace_back(make_lw_shared(this, d, std::move(f))); auto max = max_disk_size; auto cur = totals.total_size_on_disk; if (max != 0 && cur >= max) { logger.debug("Size on disk {} MB exceeds local maximum {} MB", cur / (1024 * 1024), max / (1024 * 1024)); flush_segments(); } }).then([this] { return make_ready_future(_segments.back()); }); } future db::commitlog::segment_manager::active_segment() { if (_segments.empty() || !_segments.back()->is_still_allocating()) { return _new_segment_semaphore.wait().then([this]() { if (_segments.empty() || !_segments.back()->is_still_allocating()) { return new_segment(); } return make_ready_future(_segments.back()); }).finally([this]() { _new_segment_semaphore.signal(); }); } return make_ready_future(_segments.back()); } /** * go through all segments, clear id up to pos. if segment becomes clean and unused by this, * it is discarded. 
*/ void db::commitlog::segment_manager::discard_completed_segments( const cf_id_type& id, const replay_position& pos) { logger.debug("discard completed log segments for {}, table {}", pos, id); for (auto&s : _segments) { s->mark_clean(id, pos); } discard_unused_segments(); } std::ostream& db::operator<<(std::ostream& out, const db::commitlog::segment& s) { return out << "commit log segment (" << s._desc.filename() << ")"; } std::ostream& db::operator<<(std::ostream& out, const db::commitlog::segment::cf_mark& m) { return out << (m.s._cf_dirty | boost::adaptors::map_keys); } std::ostream& db::operator<<(std::ostream& out, const db::replay_position& p) { return out << "{" << p.shard_id() << ", " << p.base_id() << ", " << p.pos << "}"; } void db::commitlog::segment_manager::discard_unused_segments() { auto i = std::remove_if(_segments.begin(), _segments.end(), [=](auto& s) { if (s->is_unused()) { logger.debug("{} is unused", *s); return true; } logger.debug("Not safe to delete {}; dirty is {}", s, segment::cf_mark {*s}); return false; }); if (i != _segments.end()) { _segments.erase(i, _segments.end()); } } future<> db::commitlog::segment_manager::sync_all_segments() { logger.debug("Issuing sync for all segments"); return parallel_for_each(_segments, [this](sseg_ptr s) { return s->sync().then([](sseg_ptr s) { logger.debug("Synced {}", *s); }); }); } future<> db::commitlog::segment_manager::shutdown() { _shutdown = true; return parallel_for_each(_segments, [this](sseg_ptr s) { return s->shutdown(); }); } /* * Sync all segments, then clear them out. To ensure all ops are done. * (Assumes you have barriered adding ops!) * Only use from tests. */ future<> db::commitlog::segment_manager::clear() { logger.debug("Clearing all segments"); flush_segments(true); return sync_all_segments().then([this] { for (auto& s : _segments) { s->mark_clean(); } _segments.clear(); }); } /** * Called by timer in periodic mode. 
*/ void db::commitlog::segment_manager::sync() { for (auto& s : _segments) { s->sync(); // we do not care about waiting... } arm(); } std::vector db::commitlog::segment_manager::get_active_names() const { std::vector res; for (auto i: _segments) { if (!i->is_unused()) { // Each shared is located in its own directory res.push_back(cfg.commit_log_location + "/" + i->get_segment_name()); } } return res; } db::commitlog::segment_manager::buffer_type db::commitlog::segment_manager::acquire_buffer(size_t s) { auto i = _temp_buffers.begin(); auto e = _temp_buffers.end(); while (i != e) { if (i->size() >= s) { auto r = std::move(*i); _temp_buffers.erase(i); totals.buffer_list_bytes -= r.size(); return r; } ++i; } auto a = ::memalign(segment::alignment, s); if (a == nullptr) { throw std::bad_alloc(); } return buffer_type(reinterpret_cast(a), s, make_free_deleter(a)); } void db::commitlog::segment_manager::release_buffer(buffer_type&& b) { _temp_buffers.emplace_back(std::move(b)); std::sort(_temp_buffers.begin(), _temp_buffers.end(), [](const buffer_type& b1, const buffer_type& b2) { return b1.size() < b2.size(); }); constexpr const size_t max_temp_buffers = 4; if (_temp_buffers.size() > max_temp_buffers) { _temp_buffers.erase(_temp_buffers.begin() + max_temp_buffers, _temp_buffers.end()); } totals.buffer_list_bytes = std::accumulate(_temp_buffers.begin(), _temp_buffers.end(), size_t(0), std::plus()); } /** * Add mutation. 
*/
future db::commitlog::add(const cf_id_type& id,
        size_t size, serializer_func func) {
    // Obtain the active segment and allocate the entry in it.
    return _segment_manager->active_segment().then([=](auto s) {
        return s->allocate(id, size, std::move(func));
    });
}

db::commitlog::commitlog(config cfg)
        : _segment_manager(new segment_manager(std::move(cfg))) {
}

db::commitlog::commitlog(commitlog&& v)
        : _segment_manager(std::move(v._segment_manager)) {
}

db::commitlog::~commitlog() {
}

/**
 * Create a commitlog and run the segment manager's init(); the returned
 * future yields the commitlog once initialisation has completed.
 */
future db::commitlog::create_commitlog(config cfg) {
    commitlog c(std::move(cfg));
    auto f = c._segment_manager->init();
    return f.then([c = std::move(c)]() mutable {
        return make_ready_future(std::move(c));
    });
}

// Moving an anchor transfers the handler id; the source is left with id 0
// so that it no longer unregisters anything.
db::commitlog::flush_handler_anchor::flush_handler_anchor(flush_handler_anchor&& f)
    : _cl(f._cl), _id(f._id)
{
    f._id = 0;
}

db::commitlog::flush_handler_anchor::flush_handler_anchor(commitlog& cl, flush_handler_id id)
    : _cl(cl), _id(id)
{}

// Unregisters the handler unless it was released or moved from.
db::commitlog::flush_handler_anchor::~flush_handler_anchor() {
    unregister();
}

// Give up responsibility for the handler: returns its id and resets the stored id to 0.
db::commitlog::flush_handler_id db::commitlog::flush_handler_anchor::release() {
    flush_handler_id id = 0;
    std::swap(_id, id);
    return id;
}

// Remove the handler from the commitlog, if this anchor still holds one (id != 0).
void db::commitlog::flush_handler_anchor::unregister() {
    auto id = release();
    if (id != 0) {
        _cl.remove_flush_handler(id);
    }
}

// Register a flush handler; the returned anchor unregisters it on destruction.
db::commitlog::flush_handler_anchor db::commitlog::add_flush_handler(flush_handler h) {
    return flush_handler_anchor(*this, _segment_manager->add_flush_handler(std::move(h)));
}

void db::commitlog::remove_flush_handler(flush_handler_id id) {
    _segment_manager->remove_flush_handler(id);
}

// The following members forward to the segment manager.
void db::commitlog::discard_completed_segments(const cf_id_type& id,
        const replay_position& pos) {
    _segment_manager->discard_completed_segments(id, pos);
}

future<> db::commitlog::sync_all_segments() {
    return _segment_manager->sync_all_segments();
}

future<> db::commitlog::shutdown() {
    return _segment_manager->shutdown();
}

// Largest payload accepted by add(): the maximum entry size minus the per-entry overhead.
size_t db::commitlog::max_record_size() const {
    return _segment_manager->max_mutation_size - segment::entry_overhead_size;
}

future<> db::commitlog::clear() {
    return _segment_manager->clear();
}

const db::commitlog::config& db::commitlog::active_config() const {
    return _segment_manager->cfg;
}

/**
 * Open `filename` read-only and read it with the overload below.
 */
future, db::replay_position>> db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func next, position_type off) {
    return engine().open_file_dma(filename, open_flags::ro).then([next = std::move(next), off](file f) {
        return read_log_file(std::move(f), std::move(next), off);
    });
}

/**
 * Read a segment file and pass every entry (payload buffer and replay
 * position) to `next`. Data before offset `off` is skipped after the file
 * header has been read. Checksum mismatches and invalid entry sizes raise
 * std::runtime_error.
 */
subscription, db::replay_position> db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type off) {
    // Reader state; kept alive by a shared pointer for the duration of the read.
    struct work {
        file f;
        stream, replay_position> s;
        input_stream fin;
        input_stream r;
        // Segment id taken from the file header.
        uint64_t id = 0;
        // Number of bytes consumed so far.
        size_t pos = 0;
        // End position of the current chunk, taken from the chunk header.
        size_t next = 0;
        size_t start_off = 0;
        size_t skip_to = 0;
        bool eof = false;
        bool header = true;

        work(file f, position_type o = 0)
                : f(f), fin(make_file_input_stream(f)), start_off(o) {
        }
        work(work&&) = default;

        // Account for a buffer just read; an empty buffer marks end of file.
        // Returns false once end of file has been reached.
        bool advance(const temporary_buffer& buf) {
            pos += buf.size();
            if (buf.size() == 0) {
                eof = true;
            }
            return !eof;
        }
        bool end_of_file() const {
            return eof;
        }
        bool end_of_chunk() const {
            return eof || next == pos;
        }
        // Consume `bytes` bytes (in reads of at most 4096) or stop at end of file.
        future<> skip(size_t bytes) {
            skip_to = pos + bytes;
            return do_until([this] { return pos == skip_to || eof; }, [this, bytes] {
                auto s = std::min(4096, skip_to - pos);
                // should eof be an error here?
                return fin.read_exactly(s).then([this](auto buf) {
                    this->advance(buf);
                });
            });
        }
        // Read and verify the file header (version, id, checksum), then skip
        // forward to start_off if that lies beyond the header.
        future<> read_header() {
            return fin.read_exactly(segment::descriptor_header_size).then([this](temporary_buffer buf) {
                advance(buf);
                // Will throw if we got eof
                data_input in(buf);
                auto ver = in.read();
                auto id = in.read();
                auto checksum = in.read();

                crc32_nbo crc;
                crc.process(ver);
                crc.process(id & 0xffffffff);
                crc.process(id >> 32);

                auto cs = crc.checksum();
                if (cs != checksum) {
                    throw std::runtime_error("Checksum error in file header");
                }

                this->id = id;
                this->next = 0;

                if (start_off > pos) {
                    return skip(start_off - pos);
                }
                return make_ready_future<>();
            });
        }
        // Read and verify a chunk header, then read entries until the chunk ends.
        future<> read_chunk() {
            return fin.read_exactly(segment::segment_overhead_size).then([this](temporary_buffer buf) {
                // Position of the chunk header; it is part of the header checksum.
                auto start = pos;

                if (!advance(buf)) {
                    return make_ready_future<>();
                }

                data_input in(buf);
                auto next = in.read();
                auto checksum = in.read();

                crc32_nbo crc;
                crc.process(id & 0xffffffff);
                crc.process(id >> 32);
                crc.process(start);

                auto cs = crc.checksum();
                if (cs != checksum) {
                    throw std::runtime_error("Checksum error in chunk header");
                }

                this->next = next;

                return do_until(std::bind(&work::end_of_chunk, this), std::bind(&work::read_entry, this));
            });
        }
        // Read one entry: header (size, head checksum), payload and tail checksum.
        future<> read_entry() {
            static constexpr size_t entry_header_size = segment::entry_overhead_size - sizeof(uint32_t);
            return fin.read_exactly(entry_header_size).then([this](temporary_buffer buf) {
                replay_position rp(id, position_type(pos));

                if (!advance(buf)) {
                    return make_ready_future<>();
                }

                data_input in(buf);

                auto size = in.read();
                // NOTE(review): this head checksum is read and captured below, but it is
                // never compared against anything - the inner lambda declares its own
                // `checksum` for the tail value. Confirm whether a check is intended.
                auto checksum = in.read();

                if (size == 0) {
                    // special urchin case: zero padding due to dma blocks
                    auto slack = next - pos;
                    return skip(slack);
                }

                if (size < 3 * sizeof(uint32_t)) {
                    throw std::runtime_error("Invalid entry size");
                }

                return fin.read_exactly(size - entry_header_size).then([this, size, checksum, rp](temporary_buffer buf) {
                    advance(buf);

                    data_input in(buf);

                    auto data_size = size - segment::entry_overhead_size;
                    in.skip(data_size);
                    // Tail checksum stored after the payload.
                    auto checksum = in.read();

                    // Recompute the checksum over the entry size and the payload bytes.
                    crc32_nbo crc;
                    crc.process(size);
                    crc.process_bytes(buf.get(), data_size);

                    if (crc.checksum() != checksum) {
                        throw std::runtime_error("Checksum error in data entry");
                    }

                    return s.produce(buf.share(0, data_size), rp);
                });
            });
        }
        // Read the header, then chunks until end of file.
        future<> read_file() {
            return read_header().then(
                    [this] {
                        return do_until(std::bind(&work::end_of_file, this), std::bind(&work::read_chunk, this));
                    });
        }
    };

    auto w = make_lw_shared(std::move(f), off);
    auto ret = w->s.listen(std::move(next));

    // Start reading once the stream has started; the capture in finally()
    // keeps the reader state alive until the stream is closed.
    w->s.started().then(std::bind(&work::read_file, w.get())).finally([w] {
        w->s.close();
    });

    return ret;
}

std::vector db::commitlog::get_active_segment_names() const {
    return _segment_manager->get_active_names();
}

uint64_t db::commitlog::get_total_size() const {
    return _segment_manager->totals.total_size;
}

// Reported as the number of allocations performed.
uint64_t db::commitlog::get_completed_tasks() const {
    return _segment_manager->totals.allocation_count;
}

uint64_t db::commitlog::get_pending_tasks() const {
    return _segment_manager->totals.pending_operations;
}

// Descriptors of the segment files in the configured commitlog directory.
future> db::commitlog::list_existing_descriptors() const {
    return list_existing_descriptors(active_config().commit_log_location);
}

// Descriptors of the segment files in `dir`.
future> db::commitlog::list_existing_descriptors(const sstring& dir) const {
    return _segment_manager->list_descriptors(dir);
}

// Paths of the segment files in the configured commitlog directory.
future> db::commitlog::list_existing_segments() const {
    return list_existing_segments(active_config().commit_log_location);
}

// Paths ("<dir>/<file name>") of the segment files in `dir`.
future> db::commitlog::list_existing_segments(const sstring& dir) const {
    return list_existing_descriptors(dir).then([dir](auto descs) {
        std::vector paths;
        std::transform(descs.begin(), descs.end(), std::back_inserter(paths), [&](auto& d) {
           return dir + "/" + d.filename();
        });
        return make_ready_future>(std::move(paths));
    });
}