Files
scylladb/db/commitlog/commitlog.cc
Calle Wilund 9ba84e458a Commitlog: Handle partial writes in segment::cycle
* Fixes #247
* Re-introduce test_allocation_failure, but allow for the "failure" to not
  happen. I.e. if run with low memory settings, the test will check that
  allocation failure is graceful. With lots of memory it will check partial
  write.
2015-08-31 20:02:05 +03:00

1072 lines
38 KiB
C++

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Modified by Cloudius Systems
* Copyright 2015 Cloudius Systems
*/
#include <stdexcept>
#include <string>
#include <sys/stat.h>
#include <malloc.h>
#include <regex>
#include <boost/range/adaptor/map.hpp>
#include "core/align.hh"
#include "core/reactor.hh"
#include "core/scollectd.hh"
#include "core/future-util.hh"
#include "core/file.hh"
#include "core/rwlock.hh"
#include "core/fstream.hh"
#include "net/byteorder.hh"
#include "commitlog.hh"
#include "db/config.hh"
#include "utils/data_input.hh"
#include "utils/crc.hh"
#include "utils/runtime.hh"
#include "log.hh"
static logging::logger logger("commitlog");
// Thin wrapper around utils/crc.hh's crc32 that feeds integral values in
// network byte order, so checksums match the big-endian on-disk commitlog
// format (and Origin's Java implementation).
class crc32_nbo {
    crc32 _c;
public:
    // Process one integral value, converted to network byte order first.
    template <typename T>
    void process(T t) {
        _c.process(net::hton(t));
    }
    // Current checksum over everything processed so far.
    uint32_t checksum() const {
        return _c.get();
    }
    // Raw bytes are fed through untouched (no byte-order conversion).
    void process_bytes(const uint8_t* data, size_t size) {
        _c.process(data, size);
    }
    void process_bytes(const int8_t* data, size_t size) {
        process_bytes(reinterpret_cast<const uint8_t*>(data), size);
    }
    void process_bytes(const char* data, size_t size) {
        process_bytes(reinterpret_cast<const uint8_t*>(data), size);
    }
};
// Translate the yaml-backed db::config values into the commitlog's own
// config struct.
// NOTE(review): the sync period is sourced from
// commitlog_sync_batch_window_in_ms even for PERIODIC mode — confirm this
// is intended rather than commitlog_sync_period_in_ms.
db::commitlog::config::config(const db::config& cfg)
    : commit_log_location(cfg.commitlog_directory())
    , commitlog_total_space_in_mb(cfg.commitlog_total_space_in_mb())
    , commitlog_segment_size_in_mb(cfg.commitlog_segment_size_in_mb())
    , commitlog_sync_period_in_ms(cfg.commitlog_sync_batch_window_in_ms())
    , mode(cfg.commitlog_sync() == "batch" ? sync_mode::BATCH : sync_mode::PERIODIC)
{}
// Primary constructor: explicit segment id and on-disk format version.
db::commitlog::descriptor::descriptor(segment_id_type i, uint32_t v)
    : id(i), ver(v) {
}
// A replay position carries the segment id; version defaults.
db::commitlog::descriptor::descriptor(replay_position p)
    : descriptor(p.id) {
}
// Adapter for the (id, ver) pair produced by the filename-parsing ctor below.
db::commitlog::descriptor::descriptor(std::pair<uint64_t, uint32_t> p)
    : descriptor(p.first, p.second) {
}
// Parse a segment file name ("CommitLog-<ver>-<id>.log") into (id, ver).
// Legacy names without a version ("CommitLog-<id>.log") are recognized but
// rejected as too old to replay.
db::commitlog::descriptor::descriptor(sstring filename)
    : descriptor([filename]() {
        std::smatch m;
        // match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log.
        // Group layout: m[1] = "<ver>-<id>" (or legacy "<id>"),
        //               m[2] = first number, m[3] = "-<id>" (empty for legacy).
        std::regex rx("(?:.*/)?" + FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
        std::string sfilename = filename;
        if (!std::regex_match(sfilename, m, rx)) {
            throw std::runtime_error("Cannot parse the version of the file: " + filename);
        }
        if (m[3].length() == 0) {
            // CMH. Can most likely ignore this
            throw std::domain_error("Commitlog segment is too old to open; upgrade to 1.2.5+ first");
        }
        // m[3] starts with the separator; strip it before converting.
        segment_id_type id = std::stoull(m[3].str().substr(1));
        uint32_t ver = std::stoul(m[2].str());
        return std::make_pair(id, ver);
    }()) {
}
// Render the canonical on-disk name: "CommitLog-<ver>-<id>.log".
sstring db::commitlog::descriptor::filename() const {
    auto name = FILENAME_PREFIX + std::to_string(ver);
    name += SEPARATOR + std::to_string(id);
    return name + FILENAME_EXTENSION;
}
// A descriptor maps to a replay position at offset 0 of its segment.
db::commitlog::descriptor::operator db::replay_position() const {
    return replay_position(id);
}
// Filename components. Note: FILENAME_PREFIX's initializer uses SEPARATOR;
// they are defined in order within this TU, so initialization order is safe.
const std::string db::commitlog::descriptor::SEPARATOR("-");
const std::string db::commitlog::descriptor::FILENAME_PREFIX(
        "CommitLog" + SEPARATOR);
const std::string db::commitlog::descriptor::FILENAME_EXTENSION(".log");
// Owns the set of live segments for this shard: creates them, recycles dma
// buffers between them, drives the periodic sync timer, and exports
// collectd metrics.
class db::commitlog::segment_manager {
public:
    config cfg;
    const uint64_t max_size;          // per-segment size cap, in bytes
    const uint64_t max_mutation_size; // max_size / 2; larger entries are rejected
    semaphore _new_segment_semaphore; // serializes segment creation (see active_segment)
    scollectd::registrations _regs;   // keeps the metrics below registered
    // TODO: verify that we're ok with not-so-great granularity
    using clock_type = lowres_clock;
    using time_point = clock_type::time_point;
    using sseg_ptr = lw_shared_ptr<segment>;
    // Running totals, polled by the collectd counters in create_counters().
    struct stats {
        uint64_t cycle_count = 0;
        uint64_t flush_count = 0;
        uint64_t allocation_count = 0;
        uint64_t bytes_written = 0;
        uint64_t bytes_slack = 0;     // zero padding written for dma alignment
        uint64_t segments_created = 0;
        uint64_t segments_destroyed = 0;
    };
    stats totals;
    // Clamps segment size to at least 1 MB and at most what position_type
    // can address; falls back to a default directory when none is set.
    segment_manager(config cfg)
        : cfg(cfg), max_size(
                std::min<size_t>(std::numeric_limits<position_type>::max(),
                        std::max<size_t>(cfg.commitlog_segment_size_in_mb,
                                1) * 1024 * 1024)), max_mutation_size(
                max_size >> 1)
    {
        assert(max_size > 0);
        if (cfg.commit_log_location.empty()) {
            cfg.commit_log_location = "/var/lib/scylla/commitlog";
        }
        _regs = create_counters();
    }
    uint64_t next_id() {
        return ++_ids;
    }
    // Directory-scan callback: bumps _ids past any descriptor already on
    // disk so freshly created segments never collide with existing files.
    future<> process(directory_entry de) {
        auto entry_type = [this](const directory_entry & de) {
            if (!de.type && !de.name.empty()) {
                // Filesystem did not report a type; stat the entry explicitly.
                return engine().file_type(cfg.commit_log_location + "/" + de.name);
            }
            return make_ready_future<std::experimental::optional<directory_entry_type>>(de.type);
        };
        return entry_type(de).then([de, this](auto type) {
            if (type == directory_entry_type::regular) {
                descriptor d(de.name);
                _ids = std::max(_ids, d.id);
            }
            return make_ready_future<>();
        });
    }
    future<> init();
    future<sseg_ptr> new_segment();
    future<sseg_ptr> active_segment();
    future<> clear();
    scollectd::registrations create_counters();
    void discard_unused_segments();
    void discard_completed_segments(const cf_id_type& id,
            const replay_position& pos);
    void sync();
    // (Re)arm the periodic-mode sync timer for one period.
    void arm() {
        _timer.arm(std::chrono::milliseconds(cfg.commitlog_sync_period_in_ms));
    }
    std::vector<sstring> get_active_names() const;
    using buffer_type = temporary_buffer<char>;
    buffer_type acquire_buffer(size_t s);
    void release_buffer(buffer_type&&);
    future<std::vector<descriptor>> list_descriptors(sstring dir);
private:
    segment_id_type _ids = 0;               // monotonically increasing id source
    std::vector<sseg_ptr> _segments;        // live segments, oldest first
    std::vector<buffer_type> _temp_buffers; // recycled dma buffers (size-sorted)
    timer<clock_type> _timer;               // periodic sync timer
};
/*
* A single commit log file on disk. Manages creation of the file and writing mutations to disk,
* as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment
* files are initially allocated to a fixed size and can grow to accommodate a larger value if necessary.
*/
class db::commitlog::segment: public enable_lw_shared_from_this<segment> {
    segment_manager* _segment_manager; // owning manager; outlives all segments
    descriptor _desc;                  // id/version; determines the file name
    file _file;                        // the open segment file (dma I/O)
    uint64_t _file_pos = 0;            // bytes handed off to dma_write so far
    uint64_t _flush_pos = 0;           // bytes known durable (past flush())
    uint64_t _buf_pos = 0;             // write offset inside _buffer
    bool _closed = false;              // set by finish_and_get_new(); no more allocations
    using buffer_type = segment_manager::buffer_type;
    using sseg_ptr = segment_manager::sseg_ptr;
    using clock_type = segment_manager::clock_type;
    using time_point = segment_manager::time_point;
    buffer_type _buffer;
    rwlock _dwrite; // used as a barrier between write & flush
    // Last (highest) un-flushed write position per column family.
    std::unordered_map<cf_id_type, position_type> _cf_dirty;
    time_point _sync_time; // time of last flush, for periodic must_sync()
    friend std::ostream& operator<<(std::ostream&, const segment&);
public:
    // Log-formatting helper: prints the set of dirty column families.
    struct cf_mark {
        const segment& s;
    };
    friend std::ostream& operator<<(std::ostream&, const cf_mark&);
    // The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum)
    static constexpr size_t entry_overhead_size = 3 * sizeof(uint32_t);
    static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t);
    static constexpr size_t descriptor_header_size = 4 * sizeof(uint32_t);
    // The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position])
    static constexpr size_t sync_marker_size = 2 * sizeof(uint32_t);
    static constexpr size_t alignment = 4096;
    // TODO : tune initial / default size
    static constexpr size_t default_size = align_up<size_t>(128 * 1024, alignment);
    segment(segment_manager* m, const descriptor& d, file && f)
            : _segment_manager(m), _desc(std::move(d)), _file(std::move(f))
    {
        ++_segment_manager->totals.segments_created;
    }
    // Destruction means the segment is fully reclaimed: the backing file is
    // unlinked from disk.
    ~segment() {
        logger.debug("Segment {} is no longer active and will be deleted now", *this);
        ++_segment_manager->totals.segments_destroyed;
        ::unlink(
                (_segment_manager->cfg.commit_log_location + "/" + _desc.filename()).c_str());
    }
    // True when a sync is due: always in BATCH mode, otherwise once the
    // configured period has elapsed since the last flush.
    bool must_sync() {
        if (_segment_manager->cfg.mode == sync_mode::BATCH) {
            return true;
        }
        auto now = clock_type::now();
        auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                now - _sync_time).count();
        return _segment_manager->cfg.commitlog_sync_period_in_ms < uint64_t(ms);
    }
    /**
     * Finalize this segment and get a new one
     */
    future<sseg_ptr> finish_and_get_new() {
        _closed = true;
        return sync().then([](auto seg) {
            return seg->_segment_manager->active_segment();
        });
    }
    // Write out buffered data (cycle) and fsync, unless everything up to the
    // current position is already flushed.
    future<sseg_ptr> sync() {
        if (position() <= _flush_pos) {
            return make_ready_future<sseg_ptr>(shared_from_this());
        }
        return cycle().then([](auto seg) {
            return seg->flush();
        });
    }
    // fsync the file up to `pos` (default: everything written so far).
    future<sseg_ptr> flush(uint64_t pos = 0) {
        auto me = shared_from_this();
        if (pos == 0) {
            pos = _file_pos;
        }
        if (pos != 0 && pos <= _flush_pos) {
            return make_ready_future<sseg_ptr>(std::move(me));
        }
        // Make sure all disk writes are done.
        // This is not 100% necessary, we really only need the ones below our flush pos,
        // but since we pretty much assume that task ordering will make this the case anyway...
        // (cycle() holds the read side of _dwrite for the duration of each
        // dma_write; taking the write side here waits for them all.)
        return _dwrite.write_lock().then(
                [this, me = std::move(me), pos]() mutable {
            _dwrite.write_unlock(); // release it already.
            pos = std::max(pos, _file_pos);
            if (pos <= _flush_pos) {
                return make_ready_future<sseg_ptr>(std::move(me));
            }
            _sync_time = clock_type::now();
            return _file.flush().handle_exception([](auto ex) {
                try {
                    std::rethrow_exception(ex);
                    // TODO: retry/ignore/fail/stop - optional behaviour in origin.
                    // we fast-fail the whole commit.
                } catch (std::exception& e) {
                    logger.error("Failed to flush commits to disk: {}", e.what());
                    throw;
                } catch (...) {
                    logger.error("Failed to flush commits to disk.");
                    throw;
                }
            }).then([this, pos, me = std::move(me)]() {
                _flush_pos = std::max(pos, _flush_pos);
                ++_segment_manager->totals.flush_count;
                return make_ready_future<sseg_ptr>(std::move(me));
            });
        });
    }
    /**
     * Send any buffer contents to disk and get a new tmp buffer
     */
    // If s > 0, a fresh buffer with room for at least s bytes (plus chunk /
    // file header overhead) is acquired before the old one is written out,
    // shrinking the request on allocation failure down to the minimum.
    future<sseg_ptr> cycle(size_t s = 0) {
        auto size = clear_buffer_slack();
        auto buf = std::move(_buffer);
        auto off = _file_pos;
        _file_pos += size;
        _buf_pos = 0;
        // if we need new buffer, get one.
        // TODO: keep a queue of available buffers?
        if (s > 0) {
            auto overhead = segment_overhead_size;
            if (_file_pos == 0) {
                overhead += descriptor_header_size;
            }
            auto a = align_up(s + overhead, alignment);
            auto k = std::max(a, default_size);
            // Try progressively smaller buffers on bad_alloc, but never
            // below the aligned minimum `a`.
            for (;;) {
                try {
                    _buffer = _segment_manager->acquire_buffer(k);
                    break;
                } catch (std::bad_alloc&) {
                    logger.warn("Could not allocate {} k bytes output buffer ({} k required)", k / 1024, a / 1024);
                    if (k > a) {
                        k = std::max(a, k / 2);
                        logger.debug("Trying reduced size: {} k", k / 1024);
                        continue;
                    }
                    throw;
                }
            }
            _buf_pos = overhead;
            // NOTE(review): p is uint32_t*, so this zeroes overhead * 4
            // bytes, not `overhead` bytes. Harmless since the buffer is at
            // least default_size and the extra bytes are data area written
            // later, but possibly unintended — confirm.
            auto * p = reinterpret_cast<uint32_t *>(_buffer.get_write());
            std::fill(p, p + overhead, 0);
        }
        auto me = shared_from_this();
        if (size == 0) {
            // Nothing buffered; no disk write needed.
            return make_ready_future<sseg_ptr>(std::move(me));
        }
        auto * p = buf.get_write();
        // Chunk-header words must still be zero (allocate() never writes them).
        assert(std::count(p, p + 2 * sizeof(uint32_t), 0) == 2 * sizeof(uint32_t));
        data_output out(p, p + buf.size());
        auto header_size = 0;
        if (off == 0) {
            // first block. write file header.
            out.write(_desc.ver);
            out.write(_desc.id);
            crc32_nbo crc;
            crc.process(_desc.ver);
            crc.process<int32_t>(_desc.id & 0xffffffff);
            crc.process<int32_t>(_desc.id >> 32);
            out.write(crc.checksum());
            header_size = descriptor_header_size;
        }
        // write chunk header
        crc32_nbo crc;
        crc.process<int32_t>(_desc.id & 0xffffffff);
        crc.process<int32_t>(_desc.id >> 32);
        crc.process(uint32_t(off + header_size));
        out.write(uint32_t(_file_pos)); // offset of the *next* chunk
        out.write(crc.checksum());
        // acquire read lock
        return _dwrite.read_lock().then([this, size, off, buf = std::move(buf), me]() mutable {
            auto written = make_lw_shared<size_t>(0);
            auto p = buf.get();
            // Loop until the whole buffer hits disk, restarting a partial
            // write from the last dma-aligned offset.
            return repeat([this, size, off, written, p]() mutable {
                return _file.dma_write(off + *written, p + *written, size - *written).then_wrapped([this, size, written](auto&& f) {
                    try {
                        auto bytes = std::get<0>(f.get());
                        *written += bytes;
                        _segment_manager->totals.bytes_written += bytes;
                        ++_segment_manager->totals.cycle_count;
                        if (*written == size) {
                            return make_ready_future<stop_iteration>(stop_iteration::yes);
                        }
                        // gah, partial write. should always get here with dma chunk sized
                        // "bytes", but lets make sure...
                        logger.debug("Partial write: {}/{} bytes", *written, size);
                        *written = align_down(*written, alignment);
                        return make_ready_future<stop_iteration>(stop_iteration::no);
                        // TODO: retry/ignore/fail/stop - optional behaviour in origin.
                        // we fast-fail the whole commit.
                    } catch (std::exception& e) {
                        logger.error("Failed to persist commits to disk: {}", e.what());
                        throw;
                    } catch (...) {
                        logger.error("Failed to persist commits to disk.");
                        throw;
                    }
                });
            }).finally([this, buf = std::move(buf)]() mutable {
                // Return the buffer to the manager's recycle pool.
                _segment_manager->release_buffer(std::move(buf));
            });
        }).then([me] {
            return make_ready_future<sseg_ptr>(std::move(me));
        }).finally([me, this]() {
            _dwrite.read_unlock(); // release
        });
    }
    /**
     * Add a "mutation" to the segment.
     */
    // Reserves space in the buffer, writes [size][size-crc][data][data-crc]
    // via the serializer callback, and returns the assigned replay position.
    // May roll over to a new segment or trigger a background cycle first.
    future<replay_position> allocate(const cf_id_type& id, size_t size,
            serializer_func func) {
        const auto s = size + entry_overhead_size; // total size
        if (s > _segment_manager->max_mutation_size) {
            // NOTE(review): "maxiumum" typo is part of the user-visible
            // message; deliberately left untouched here.
            return make_exception_future<replay_position>(
                    std::invalid_argument(
                            "Mutation of " + std::to_string(s)
                                    + " bytes is too large for the maxiumum size of "
                                    + std::to_string(_segment_manager->max_mutation_size)));
        }
        // would we make the file too big?
        if (position() + s > _segment_manager->max_size) {
            // do this in next segment instead.
            return finish_and_get_new().then(
                    [id, size, func = std::move(func)](auto new_seg) {
                return new_seg->allocate(id, size, func);
            });
        }
        // enough data?
        if (s > (_buffer.size() - _buf_pos)) {
            // TODO: iff we have to many writes running, maybe we should
            // wait for this?
            // The returned future is intentionally dropped: the old buffer
            // is written out in the background while we continue into the
            // freshly acquired one.
            cycle(s);
        }
        replay_position rp(_desc.id, position());
        auto pos = _buf_pos;
        _buf_pos += s;
        _cf_dirty[id] = rp.pos; // remember last write position for this cf
        auto * p = _buffer.get_write() + pos;
        auto * e = _buffer.get_write() + pos + s - sizeof(uint32_t);
        data_output out(p, e);
        crc32_nbo crc;
        out.write(uint32_t(s));
        crc.process(uint32_t(s));
        out.write(crc.checksum());
        // actual data
        func(out);
        crc.process_bytes(p + 2 * sizeof(uint32_t), size);
        out = data_output(e, sizeof(uint32_t));
        out.write(crc.checksum());
        ++_segment_manager->totals.allocation_count;
        // finally, check if we're required to sync.
        if (must_sync()) {
            return sync().then([rp](auto seg) {
                return make_ready_future<replay_position>(rp);
            });
        }
        return make_ready_future<replay_position>(rp);
    }
    // Logical write position: durable + buffered bytes.
    position_type position() const {
        return position_type(_file_pos + _buf_pos);
    }
    size_t size_on_disk() const {
        return _file_pos;
    }
    // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
    // a.k.a. zero the tail.
    size_t clear_buffer_slack() {
        auto size = align_up(_buf_pos, alignment);
        std::fill(_buffer.get_write() + _buf_pos, _buffer.get_write() + size,
                0);
        _segment_manager->totals.bytes_slack += (size - _buf_pos);
        return size;
    }
    // Forget the dirty mark for `id` if its last write is at or below `pos`.
    void mark_clean(const cf_id_type& id, position_type pos) {
        auto i = _cf_dirty.find(id);
        if (i != _cf_dirty.end() && i->second <= pos) {
            _cf_dirty.erase(i);
        }
    }
    // A position in a *later* segment implies everything here is covered.
    void mark_clean(const cf_id_type& id, const replay_position& pos) {
        if (pos.id == _desc.id) {
            mark_clean(id, pos.pos);
        } else if (pos.id > _desc.id) {
            mark_clean(id, std::numeric_limits<position_type>::max());
        }
    }
    void mark_clean() {
        _cf_dirty.clear();
    }
    bool is_still_allocating() const {
        return !_closed && position() < _segment_manager->max_size;
    }
    bool is_clean() {
        return _cf_dirty.empty();
    }
    // Unused = closed/full AND no dirty column families; safe to delete.
    bool is_unused() {
        return !is_still_allocating() && is_clean();
    }
    bool contains(const replay_position& pos) {
        return pos.id == _desc.id;
    }
    sstring get_segment_name() const {
        return _desc.filename();
    }
};
// Out-of-class definition required (pre-C++17) because default_size is odr-used.
const size_t db::commitlog::segment::default_size;
// List all parsable commitlog descriptors in `dirname`. Entries whose names
// match but are too old (legacy format) are silently skipped.
future<std::vector<db::commitlog::descriptor>>
db::commitlog::segment_manager::list_descriptors(sstring dirname) {
    // Bundles the directory-listing subscription with its accumulated
    // result; kept alive via lw_shared_ptr until the listing completes.
    struct helper {
        sstring _dirname;
        file _file;
        subscription<directory_entry> _list;
        std::vector<db::commitlog::descriptor> _result;
        helper(helper&&) = default;
        helper(sstring n, file && f)
                : _dirname(std::move(n)), _file(std::move(f)), _list(
                        _file.list_directory(
                                std::bind(&helper::process, this,
                                        std::placeholders::_1))) {
        }
        // Per-entry callback: collect descriptors for regular files.
        future<> process(directory_entry de) {
            auto entry_type = [this](const directory_entry & de) {
                if (!de.type && !de.name.empty()) {
                    // Filesystem did not report a type; stat explicitly.
                    return engine().file_type(_dirname + "/" + de.name);
                }
                return make_ready_future<std::experimental::optional<directory_entry_type>>(de.type);
            };
            return entry_type(de).then([this, de](auto type) {
                if (type == directory_entry_type::regular) {
                    try {
                        _result.emplace_back(de.name);
                    } catch (std::domain_error &) {
                        // Legacy (pre-1.2.5) segment name: skip it.
                    }
                }
                return make_ready_future<>();
            });
        }
        future<> done() {
            return _list.done();
        }
    };
    return engine().open_directory(dirname).then([this, dirname](auto dir) {
        auto h = make_lw_shared<helper>(std::move(dirname), std::move(dir));
        return h->done().then([h]() {
            return make_ready_future<std::vector<db::commitlog::descriptor>>(std::move(h->_result));
        }).finally([h] {});
    });
}
// Scan the commitlog directory to seed the segment id counter above both
// the boot time and any id already on disk, then start the periodic sync
// timer (unless in batch mode, where every write syncs).
future<> db::commitlog::segment_manager::init() {
    return list_descriptors(cfg.commit_log_location).then([this](auto descs) {
        segment_id_type id = std::chrono::duration_cast<std::chrono::milliseconds>(runtime::get_boot_time().time_since_epoch()).count() + 1;
        for (auto& d : descs) {
            id = std::max(id, replay_position(d.id).base_id());
        }
        // base id counter is [ <shard> | <base> ]
        _ids = replay_position(engine().cpu_id(), id).id;
        if (cfg.mode != sync_mode::BATCH) {
            _timer.set_callback(std::bind(&segment_manager::sync, this));
            this->arm();
        }
    });
}
// Register the commitlog's collectd metrics: segment-queue gauges, operation
// counters (alloc/cycle/flush) and byte totals. The returned registrations
// object unregisters them on destruction.
scollectd::registrations db::commitlog::segment_manager::create_counters() {
    using scollectd::add_polled_metric;
    using scollectd::make_typed;
    using scollectd::type_instance_id;
    using scollectd::per_cpu_plugin_instance;
    using scollectd::data_type;
    return {
        // total number of live segments
        add_polled_metric(type_instance_id("commitlog"
                , per_cpu_plugin_instance, "queue_length", "segments")
                , make_typed(data_type::GAUGE
                        , std::bind(&decltype(_segments)::size, &_segments))
        ),
        // segments still accepting writes
        add_polled_metric(type_instance_id("commitlog"
                , per_cpu_plugin_instance, "queue_length", "allocating_segments")
                , make_typed(data_type::GAUGE
                        , [this]() {
                            return std::count_if(_segments.begin(), _segments.end(), [](const sseg_ptr & s) {
                                return s->is_still_allocating();
                            });
                        })
        ),
        // segments closed, clean and awaiting reclaim
        add_polled_metric(type_instance_id("commitlog"
                , per_cpu_plugin_instance, "queue_length", "unused_segments")
                , make_typed(data_type::GAUGE
                        , [this]() {
                            return std::count_if(_segments.begin(), _segments.end(), [](const sseg_ptr & s) {
                                return s->is_unused();
                            });
                        })
        ),
        add_polled_metric(type_instance_id("commitlog"
                , per_cpu_plugin_instance, "total_operations", "alloc")
                , make_typed(data_type::DERIVE, totals.allocation_count)
        ),
        add_polled_metric(type_instance_id("commitlog"
                , per_cpu_plugin_instance, "total_operations", "cycle")
                , make_typed(data_type::DERIVE, totals.cycle_count)
        ),
        add_polled_metric(type_instance_id("commitlog"
                , per_cpu_plugin_instance, "total_operations", "flush")
                , make_typed(data_type::DERIVE, totals.flush_count)
        ),
        add_polled_metric(type_instance_id("commitlog"
                , per_cpu_plugin_instance, "total_bytes", "written")
                , make_typed(data_type::GAUGE, totals.bytes_written)
        ),
        // bytes of zero padding written for dma alignment
        add_polled_metric(type_instance_id("commitlog"
                , per_cpu_plugin_instance, "total_bytes", "slack")
                , make_typed(data_type::GAUGE, totals.bytes_slack)
        ),
    };
}
// Create a new segment file and append it to _segments. Before doing so, if
// a total-space budget is configured, walk existing segments newest-to-
// oldest and force-mark older, no-longer-written segments clean once the
// cumulative on-disk size exceeds the budget, so they can be reclaimed.
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::new_segment() {
    descriptor d(next_id());
    return engine().open_file_dma(cfg.commit_log_location + "/" + d.filename(), open_flags::wo|open_flags::create).then(
            [this, d](file f) {
                if (cfg.commitlog_total_space_in_mb != 0) {
                    // BUG FIX: size_on_disk() is in bytes, but the config
                    // limit is in megabytes. Convert to bytes before
                    // comparing; previously the budget tripped after only
                    // commitlog_total_space_in_mb *bytes*.
                    const uint64_t max_disk_size = uint64_t(cfg.commitlog_total_space_in_mb) * 1024 * 1024;
                    auto i = _segments.rbegin();
                    auto e = _segments.rend();
                    size_t s = 0, n = 0;
                    while (i != e) {
                        auto& seg = *i;
                        s += seg->size_on_disk();
                        if (!seg->is_still_allocating() && s >= max_disk_size) {
                            seg->mark_clean();
                            ++n;
                        }
                        ++i;
                    }
                    if (n > 0) {
                        discard_unused_segments();
                    }
                }
                _segments.emplace_back(make_lw_shared<segment>(this, d, std::move(f)));
                return make_ready_future<sseg_ptr>(_segments.back());
            });
}
// Return the segment currently accepting writes, creating one if needed.
// The semaphore plus the re-check after wait() ensure that concurrent
// callers do not each create a segment (double-checked pattern).
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::active_segment() {
    if (_segments.empty() || !_segments.back()->is_still_allocating()) {
        return _new_segment_semaphore.wait().then([this]() {
            // Re-check: another waiter may have created the segment already.
            if (_segments.empty() || !_segments.back()->is_still_allocating()) {
                return new_segment();
            }
            return make_ready_future<sseg_ptr>(_segments.back());
        }).finally([this]() {
            _new_segment_semaphore.signal();
        });
    }
    return make_ready_future<sseg_ptr>(_segments.back());
}
/**
* go through all segments, clear id up to pos. if segment becomes clean and unused by this,
* it is discarded.
*/
// Everything for column family `id` up to `pos` is durable elsewhere: drop
// the corresponding dirty marks in every segment, then reap any segment
// that became fully clean and is no longer being written.
void db::commitlog::segment_manager::discard_completed_segments(
        const cf_id_type& id, const replay_position& pos) {
    logger.debug("discard completed log segments for {}, table {}", pos, id);
    for (auto& seg : _segments) {
        seg->mark_clean(id, pos);
    }
    discard_unused_segments();
}
// Log formatting: a segment prints as its file name.
std::ostream& db::operator<<(std::ostream& out, const db::commitlog::segment& s) {
    return out << "commit log segment (" << s._desc.filename() << ")";
}
// Log formatting: the ids of the segment's dirty column families.
std::ostream& db::operator<<(std::ostream& out, const db::commitlog::segment::cf_mark& m) {
    return out << (m.s._cf_dirty | boost::adaptors::map_keys);
}
// Log formatting: replay position as {shard, base id, offset}.
std::ostream& db::operator<<(std::ostream& out, const db::replay_position& p) {
    return out << "{" << p.shard_id() << ", " << p.base_id() << ", " << p.pos << "}";
}
void db::commitlog::segment_manager::discard_unused_segments() {
auto i = std::remove_if(_segments.begin(), _segments.end(), [=](auto& s) {
if (s->is_unused()) {
logger.debug("{} is unused", *s);
return true;
}
logger.debug("Not safe to delete {}; dirty is {}", s, segment::cf_mark {*s});
return false;
});
if (i != _segments.end()) {
_segments.erase(i, _segments.end());
}
}
/*
* Sync all segments, then clear them out. To ensure all ops are done.
* (Assumes you have barriered adding ops!)
*/
// Sync each segment in turn (oldest first), removing it from the list
// before syncing so the set drains to empty.
future<> db::commitlog::segment_manager::clear() {
    return do_until([this]() {return _segments.empty();}, [this]() {
        auto s = _segments.front();
        _segments.erase(_segments.begin());
        return s->sync().then([](sseg_ptr) {
        });
    });
}
/**
* Called by timer in periodic mode.
*/
/**
 * Called by timer in periodic mode.
 */
// Kicks off (fire-and-forget) a sync on every segment that is due, then
// re-arms the timer for the next period.
void db::commitlog::segment_manager::sync() {
    for (auto& s : _segments) {
        if (s->must_sync()) {
            s->sync(); // we do not care about waiting...
        }
    }
    arm();
}
std::vector<sstring> db::commitlog::segment_manager::get_active_names() const {
std::vector<sstring> res;
for (auto i: _segments) {
if (!i->is_unused()) {
// Each shared is located in its own directory
res.push_back(cfg.commit_log_location + "/" + i->get_segment_name());
}
}
return res;
}
// Hand out a buffer of at least `s` bytes, preferring a recycled one from
// the pool; otherwise allocate fresh, alignment-suitable storage for dma.
db::commitlog::segment_manager::buffer_type db::commitlog::segment_manager::acquire_buffer(size_t s) {
    // The pool is kept sorted smallest-first (see release_buffer), so the
    // first buffer that fits is also the tightest fit.
    for (auto it = _temp_buffers.begin(); it != _temp_buffers.end(); ++it) {
        if (it->size() >= s) {
            auto recycled = std::move(*it);
            _temp_buffers.erase(it);
            return recycled;
        }
    }
    auto a = ::memalign(segment::alignment, s);
    if (a == nullptr) {
        throw std::bad_alloc();
    }
    return buffer_type(reinterpret_cast<char *>(a), s, make_free_deleter(a));
}
// Return a buffer to the recycle pool: keep the pool sorted by ascending
// size (so acquire_buffer finds the tightest fit first) and cap it at a
// small fixed number of entries, discarding the largest overflow.
void db::commitlog::segment_manager::release_buffer(buffer_type&& b) {
    constexpr size_t max_temp_buffers = 4;
    _temp_buffers.emplace_back(std::move(b));
    std::sort(_temp_buffers.begin(), _temp_buffers.end(), [](const buffer_type& lhs, const buffer_type& rhs) {
        return lhs.size() < rhs.size();
    });
    if (_temp_buffers.size() > max_temp_buffers) {
        _temp_buffers.erase(_temp_buffers.begin() + max_temp_buffers, _temp_buffers.end());
    }
}
/**
* Add mutation.
*/
/**
 * Add mutation.
 */
// Public entry point: route the write to the currently active segment.
future<db::replay_position> db::commitlog::add(const cf_id_type& id,
        size_t size, serializer_func func) {
    return _segment_manager->active_segment().then([=](auto s) {
        return s->allocate(id, size, std::move(func));
    });
}
// The commitlog is a thin pimpl over segment_manager.
db::commitlog::commitlog(config cfg)
        : _segment_manager(new segment_manager(std::move(cfg))) {
}
db::commitlog::commitlog(commitlog&& v)
        : _segment_manager(std::move(v._segment_manager)) {
}
db::commitlog::~commitlog() {
}
// Factory: construct a commitlog and run segment_manager::init() (directory
// scan + timer setup) before handing it to the caller.
future<db::commitlog> db::commitlog::create_commitlog(config cfg) {
    commitlog c(std::move(cfg));
    auto f = c._segment_manager->init();
    return f.then([c = std::move(c)]() mutable {
        return make_ready_future<commitlog>(std::move(c));
    });
}
// Forwarders to segment_manager.
void db::commitlog::discard_completed_segments(const cf_id_type& id,
        const replay_position& pos) {
    _segment_manager->discard_completed_segments(id, pos);
}
// Largest payload accepted by add(): the mutation cap minus entry framing.
size_t db::commitlog::max_record_size() const {
    return _segment_manager->max_mutation_size - segment::entry_overhead_size;
}
future<> db::commitlog::clear() {
    return _segment_manager->clear();
}
const db::commitlog::config& db::commitlog::active_config() const {
    return _segment_manager->cfg;
}
// Convenience overload: open `filename` read-only, then delegate to the
// file-based reader below.
future<subscription<temporary_buffer<char>, db::replay_position>>
db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func next, position_type off) {
    return engine().open_file_dma(filename, open_flags::ro).then([next = std::move(next), off](file f) {
        return read_log_file(std::move(f), std::move(next), off);
    });
}
// Replay reader: scans a segment file and delivers each entry's payload
// (plus its replay position) to `next` through a subscription.
// Flow: read_file -> read_header (file header) -> read_chunk* -> read_entry*.
subscription<temporary_buffer<char>, db::replay_position>
db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type off) {
    // Scanner state, kept alive via lw_shared_ptr until the read loop ends.
    struct work {
        file f;
        stream<temporary_buffer<char>, replay_position> s;
        input_stream<char> fin;
        input_stream<char> r;  // NOTE(review): appears unused in this function
        uint64_t id = 0;       // segment id, from the file header
        size_t pos = 0;        // current byte offset in the file
        size_t next = 0;       // file offset where the next chunk starts
        size_t start_off = 0;  // caller-requested offset to skip to
        size_t skip_to = 0;    // target offset for skip()
        bool eof = false;
        bool header = true;
        work(file f, position_type o = 0)
                : f(f), fin(make_file_input_stream(f)), start_off(o) {
        }
        work(work&&) = default;
        // Account for a read; an empty buffer marks end-of-file.
        bool advance(const temporary_buffer<char>& buf) {
            pos += buf.size();
            if (buf.size() == 0) {
                eof = true;
            }
            return !eof;
        }
        bool end_of_file() const {
            return eof;
        }
        bool end_of_chunk() const {
            return eof || next == pos;
        }
        // Consume and discard `bytes` bytes in reads of at most 4k.
        future<> skip(size_t bytes) {
            skip_to = pos + bytes;
            return do_until([this] { return pos == skip_to || eof; }, [this, bytes] {
                auto s = std::min<size_t>(4096, skip_to - pos);
                // should eof be an error here?
                return fin.read_exactly(s).then([this](auto buf) {
                    this->advance(buf);
                });
            });
        }
        // Parse and crc-verify the file header [ver][id][crc], then skip
        // forward to start_off if requested.
        future<> read_header() {
            return fin.read_exactly(segment::descriptor_header_size).then([this](temporary_buffer<char> buf) {
                advance(buf);
                // Will throw if we got eof
                data_input in(buf);
                auto ver = in.read<uint32_t>();
                auto id = in.read<uint64_t>();
                auto checksum = in.read<uint32_t>();
                crc32_nbo crc;
                crc.process(ver);
                crc.process<int32_t>(id & 0xffffffff);
                crc.process<int32_t>(id >> 32);
                auto cs = crc.checksum();
                if (cs != checksum) {
                    throw std::runtime_error("Checksum error in file header");
                }
                this->id = id;
                this->next = 0;
                if (start_off > pos) {
                    return skip(start_off - pos);
                }
                return make_ready_future<>();
            });
        }
        // Parse one chunk header [next][crc], verify it, then read entries
        // until the chunk is exhausted.
        future<> read_chunk() {
            return fin.read_exactly(segment::segment_overhead_size).then([this](temporary_buffer<char> buf) {
                auto start = pos;
                if (!advance(buf)) {
                    return make_ready_future<>();
                }
                data_input in(buf);
                auto next = in.read<uint32_t>();
                auto checksum = in.read<uint32_t>();
                crc32_nbo crc;
                crc.process<int32_t>(id & 0xffffffff);
                crc.process<int32_t>(id >> 32);
                crc.process<uint32_t>(start);
                auto cs = crc.checksum();
                if (cs != checksum) {
                    throw std::runtime_error("Checksum error in chunk header");
                }
                this->next = next;
                return do_until(std::bind(&work::end_of_chunk, this), std::bind(&work::read_entry, this));
            });
        }
        // Parse one entry: [size][head crc][data...][tail crc].
        // NOTE(review): the head checksum read below is captured but never
        // compared — it is shadowed by the tail checksum in the inner
        // continuation. Only the tail (size + data) crc is verified.
        future<> read_entry() {
            static constexpr size_t entry_header_size = segment::entry_overhead_size - sizeof(uint32_t);
            return fin.read_exactly(entry_header_size).then([this](temporary_buffer<char> buf) {
                replay_position rp(id, position_type(pos));
                if (!advance(buf)) {
                    return make_ready_future<>();
                }
                data_input in(buf);
                auto size = in.read<uint32_t>();
                auto checksum = in.read<uint32_t>();
                if (size == 0) {
                    // Zero size marks dma-block zero padding at the tail of
                    // the chunk (see clear_buffer_slack); skip to next chunk.
                    auto slack = next - pos;
                    return skip(slack);
                }
                if (size < 3 * sizeof(uint32_t)) {
                    throw std::runtime_error("Invalid entry size");
                }
                return fin.read_exactly(size - entry_header_size).then([this, size, checksum, rp](temporary_buffer<char> buf) {
                    advance(buf);
                    data_input in(buf);
                    auto data_size = size - segment::entry_overhead_size;
                    in.skip(data_size);
                    auto checksum = in.read<uint32_t>();
                    crc32_nbo crc;
                    crc.process(size);
                    crc.process_bytes(buf.get(), data_size);
                    if (crc.checksum() != checksum) {
                        throw std::runtime_error("Checksum error in data entry");
                    }
                    return s.produce(buf.share(0, data_size), rp);
                });
            });
        }
        future<> read_file() {
            return read_header().then(
                    [this] {
                return do_until(std::bind(&work::end_of_file, this), std::bind(&work::read_chunk, this));
            });
        }
    };
    auto w = make_lw_shared<work>(std::move(f), off);
    auto ret = w->s.listen(std::move(next));
    // Keep `w` alive until the read loop finishes, then close the stream.
    w->s.started().then(std::bind(&work::read_file, w.get())).finally([w] {
        w->s.close();
    });
    return ret;
}
// Forwarder: paths of segments still holding data of interest.
std::vector<sstring> db::commitlog::get_active_segment_names() const {
    return _segment_manager->get_active_names();
}
// List descriptors in the configured commitlog directory.
future<std::vector<db::commitlog::descriptor>> db::commitlog::list_existing_descriptors() const {
    return list_existing_descriptors(active_config().commit_log_location);
}
future<std::vector<db::commitlog::descriptor>> db::commitlog::list_existing_descriptors(const sstring& dir) const {
    return _segment_manager->list_descriptors(dir);
}
// Same as above, but as full file paths.
future<std::vector<sstring>> db::commitlog::list_existing_segments() const {
    return list_existing_segments(active_config().commit_log_location);
}
future<std::vector<sstring>> db::commitlog::list_existing_segments(const sstring& dir) const {
    return list_existing_descriptors(dir).then([dir](auto descs) {
        std::vector<sstring> paths;
        std::transform(descs.begin(), descs.end(), std::back_inserter(paths), [&](auto& d) {
            return dir + "/" + d.filename();
        });
        return make_ready_future<std::vector<sstring>>(std::move(paths));
    });
}