Files
scylladb/db/commitlog/commitlog.cc
Michał Chojnowski 5e88421360 commitlog: fix total_size_on_disk accounting after segment file removal
Currently, segment file removal first calls `f.remove_file()` and
does `total_size_on_disk -= f.known_size()` later.
However, `remove_file()` resets `known_size` to 0, so in effect
the freed space is never accounted for.

`total_size_on_disk` is not just a metric. It is also responsible
for deciding whether a segment should be recycled -- it is recycled
only if `total_size_on_disk - known_size < max_disk_size`.
Therefore this bug has dire performance consequences:
if `total_size_on_disk - known_size` ever exceeds `max_disk_size`,
the recycling of commitlog segments will stop permanently, because
`total_size_on_disk - known_size` will never go back below
`max_disk_size` due to the accounting bug. All new segments from this
point will be allocated from scratch.
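
In code terms, a minimal sketch of the broken vs. fixed ordering (using
the `named_file` API from this file; variable names are illustrative):

    // Broken: remove_file() resets known_size() to 0 before we read it.
    co_await f.remove_file();
    totals.total_size_on_disk -= f.known_size(); // subtracts 0

    // Fixed: capture the size before awaiting the removal.
    auto freed = f.known_size();
    co_await f.remove_file();
    totals.total_size_on_disk -= freed;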

The bug was uncovered by a QA performance test. It isn't easy to trigger --
it took the test 7 hours of constant high load to run into it.
However, the fact that the effect is permanent, and degrades the
performance of the cluster silently, makes the bug potentially quite severe.

The bug can be easily spotted with Prometheus as an ever-rising
`commitlog_total_size_on_disk` on the affected shards.

Fixes #12645

Closes #12646

(cherry picked from commit fa7e904cd6)
2023-02-01 21:54:52 +02:00


/*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#include <stdexcept>
#include <string>
#include <sys/stat.h>
#include <malloc.h>
#include <regex>
#include <filesystem>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/reversed.hpp>
#include <unordered_map>
#include <unordered_set>
#include <exception>
#include <filesystem>
#include <seastar/core/align.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/file.hh>
#include <seastar/core/rwlock.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/memory.hh>
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/queue.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/net/byteorder.hh>
#include <seastar/util/defer.hh>
#include "seastarx.hh"
#include "commitlog.hh"
#include "rp_set.hh"
#include "db/config.hh"
#include "db/extensions.hh"
#include "utils/data_input.hh"
#include "utils/crc.hh"
#include "utils/runtime.hh"
#include "utils/flush_queue.hh"
#include "log.hh"
#include "commitlog_entry.hh"
#include "commitlog_extensions.hh"
#include "service/priority_manager.hh"
#include "serializer.hh"
#include <boost/range/numeric.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include "checked-file-impl.hh"
#include "utils/disk-error-handler.hh"
static logging::logger clogger("commitlog");
using namespace std::chrono_literals;
class crc32_nbo {
utils::crc32 _c;
public:
template <typename T>
void process(T t) {
_c.process_be(t);
}
uint32_t checksum() const {
return _c.get();
}
void process_bytes(const uint8_t* data, size_t size) {
return _c.process(data, size);
}
void process_bytes(const int8_t* data, size_t size) {
return _c.process(reinterpret_cast<const uint8_t*>(data), size);
}
void process_bytes(const char* data, size_t size) {
return _c.process(reinterpret_cast<const uint8_t*>(data), size);
}
template<typename FragmentedBuffer>
requires FragmentRange<FragmentedBuffer>
void process_fragmented(const FragmentedBuffer& buffer) {
return _c.process_fragmented(buffer);
}
};
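// Usage sketch for crc32_nbo (illustrative): fixed-size fields are folded
// in big-endian order, raw bytes as-is:
//   crc32_nbo crc;
//   crc.process<uint32_t>(42);      // big-endian via process_be
//   crc.process_bytes(data, len);   // raw byte range
//   uint32_t sum = crc.checksum();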
class db::cf_holder {
public:
virtual ~cf_holder() {};
virtual void release_cf_count(const cf_id_type&) = 0;
};
db::commitlog::config db::commitlog::config::from_db_config(const db::config& cfg, size_t shard_available_memory) {
config c;
c.commit_log_location = cfg.commitlog_directory();
c.metrics_category_name = "commitlog";
c.commitlog_total_space_in_mb = cfg.commitlog_total_space_in_mb() >= 0 ? cfg.commitlog_total_space_in_mb() : (shard_available_memory * smp::count) >> 20;
c.commitlog_segment_size_in_mb = cfg.commitlog_segment_size_in_mb();
c.commitlog_sync_period_in_ms = cfg.commitlog_sync_period_in_ms();
c.mode = cfg.commitlog_sync() == "batch" ? sync_mode::BATCH : sync_mode::PERIODIC;
c.extensions = &cfg.extensions();
c.use_o_dsync = cfg.commitlog_use_o_dsync();
c.allow_going_over_size_limit = !cfg.commitlog_use_hard_size_limit();
if (cfg.commitlog_flush_threshold_in_mb() >= 0) {
c.commitlog_flush_threshold_in_mb = cfg.commitlog_flush_threshold_in_mb();
}
return c;
}
db::commitlog::descriptor::descriptor(segment_id_type i, const std::string& fname_prefix, uint32_t v, sstring fname)
: _filename(std::move(fname)), id(i), ver(v), filename_prefix(fname_prefix) {
}
db::commitlog::descriptor::descriptor(replay_position p, const std::string& fname_prefix)
: descriptor(p.id, fname_prefix) {
}
db::commitlog::descriptor::descriptor(const sstring& filename, const std::string& fname_prefix)
: descriptor([&filename, &fname_prefix]() {
std::smatch m;
// match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log.
std::regex rx("(?:Recycled-)?" + fname_prefix + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
std::string sfilename = filename;
auto cbegin = sfilename.cbegin();
// skip the leading path
// Note: we're using rfind rather than the regex above
// since it may run out of stack in debug builds.
// See https://github.com/scylladb/scylla/issues/4464
auto pos = std::string(filename).rfind('/');
if (pos != std::string::npos) {
cbegin += pos + 1;
}
if (!std::regex_match(cbegin, sfilename.cend(), m, rx)) {
throw std::domain_error("Cannot parse the version of the file: " + filename);
}
if (m[3].length() == 0) {
// CMH. Can most likely ignore this
throw std::domain_error("Commitlog segment is too old to open; upgrade to 1.2.5+ first");
}
segment_id_type id = std::stoull(m[3].str().substr(1));
uint32_t ver = std::stoul(m[2].str());
return descriptor(id, fname_prefix, ver, filename);
}()) {
}
sstring db::commitlog::descriptor::filename() const {
if (!_filename.empty()) {
return _filename;
}
return filename_prefix + std::to_string(ver) + SEPARATOR
+ std::to_string(id) + FILENAME_EXTENSION;
}
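// Example (illustrative values): for ver=4, id=12345, filename() yields
// "CommitLog-4-12345.log". The parsing constructor above accepts this form
// (optionally prefixed with "Recycled-") and rejects the legacy
// "CommitLog-12345.log" layout as too old to open.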
db::commitlog::descriptor::operator db::replay_position() const {
return replay_position(id);
}
/**
* virtual dispatch for actually inputting data.
* purposely de/un-templated
*
* Writes N entries to a single segment,
* where each entry has its own header+crc,
* i.e. will be deserialized separately.
*/
struct db::commitlog::entry_writer {
force_sync sync;
size_t num_entries;
explicit entry_writer(force_sync fs, size_t ne = 1)
: sync(fs)
, num_entries(ne)
{}
virtual ~entry_writer() = default;
/** return the CF id for n:th entry */
virtual const cf_id_type& id(size_t) const = 0;
/**
* Returns the segment-independent size of all entries combined. Must be >= the segment-dependent total size.
* This is always called first, and should return the "worst case"
* for the complete set of entries
*/
virtual size_t size() const = 0;
/**
* Return the total size of all entries in this given segment
* Called after size(void), once a segment has been chosen.
* Should return the total, exact, size for all entries + overhead (i.e. schema)
* for this segment.
*
* Can be called more than once, if a segment switch is necessary (because of a race)
*/
virtual size_t size(segment&) = 0;
/**
* return the size of the n:th entry in this given segment
* Only called IFF num_entries > 1, and if so, after size(void)/size(segment&)
* and before write(...)
*/
virtual size_t size(segment&, size_t) = 0;
/* write nth entry */
virtual void write(segment&, output&, size_t) const = 0;
/** the resulting rp_handle for writing a given entry */
virtual void result(size_t, rp_handle) = 0;
};
const std::string db::commitlog::descriptor::SEPARATOR("-");
const std::string db::commitlog::descriptor::FILENAME_PREFIX(
"CommitLog" + SEPARATOR);
const std::string db::commitlog::descriptor::FILENAME_EXTENSION(".log");
class db::commitlog::segment_manager : public ::enable_shared_from_this<segment_manager> {
public:
config cfg;
std::vector<sstring> _segments_to_replay;
const uint64_t max_size;
const uint64_t max_mutation_size;
// Divide the size-on-disk threshold by #cpus used, since we assume
// we distribute stuff more or less equally across shards.
const uint64_t max_disk_size; // per-shard
const uint64_t disk_usage_threshold;
bool _shutdown = false;
std::optional<shared_promise<>> _shutdown_promise = {};
struct request_controller_timeout_exception_factory {
class request_controller_timed_out_error : public timed_out_error {
public:
virtual const char* what() const noexcept override {
return "commitlog: timed out";
}
};
static auto timeout() {
return request_controller_timed_out_error();
}
};
// Allocation must throw timed_out_error by contract.
using timeout_exception_factory = request_controller_timeout_exception_factory;
basic_semaphore<timeout_exception_factory> _flush_semaphore;
seastar::metrics::metric_groups _metrics;
// TODO: verify that we're ok with not-so-great granularity
using clock_type = lowres_clock;
using time_point = clock_type::time_point;
using sseg_ptr = ::shared_ptr<segment>;
using request_controller_type = basic_semaphore<timeout_exception_factory, db::timeout_clock>;
using request_controller_units = semaphore_units<timeout_exception_factory, db::timeout_clock>;
request_controller_type _request_controller;
class named_file : public seastar::file {
sstring _name;
uint64_t _known_size = 0;
template<typename Func, typename... Args>
auto make_awaiter(future<Args...>, Func func);
template<typename Func, typename... Args>
struct myawait;
public:
named_file(std::string_view name)
: _name(name)
{}
named_file(named_file&&) = default;
future<> open(open_flags, file_open_options, std::optional<uint64_t> size_in = std::nullopt) noexcept;
future<> rename(std::string_view to);
// NOT overrides. Nothing virtual. Will rely on exact type
template <typename CharType>
auto dma_write(uint64_t pos, const CharType* buffer, size_t len, const io_priority_class& pc = default_priority_class(), io_intent* intent = nullptr) noexcept;
auto dma_write(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc = default_priority_class(), io_intent* intent = nullptr) noexcept;
auto truncate(uint64_t length) noexcept;
auto allocate(uint64_t position, uint64_t length) noexcept;
auto remove_file() noexcept;
void assign(file&& f, uint64_t size) {
this->file::operator=(std::move(f));
this->_known_size = size;
}
uint64_t known_size() const {
return _known_size;
}
const sstring& name() const {
return _name;
}
void maybe_update_size(uint64_t pos) {
_known_size = std::max(pos, _known_size);
}
};
// Segments dropped while not clean may not be
// deleted. Marker enum to keep track of this.
enum class dispose_mode : char {
Delete, ForceDelete, Keep,
};
std::optional<shared_future<with_clock<db::timeout_clock>>> _segment_allocating;
std::vector<std::pair<named_file, dispose_mode>> _files_to_dispose;
void account_memory_usage(size_t size) noexcept {
_request_controller.consume(size);
}
void notify_memory_written(size_t size) noexcept {
_request_controller.signal(size);
}
template<typename T, typename R = typename T::result_type>
requires std::derived_from<T, db::commitlog::entry_writer> && std::same_as<R, decltype(std::declval<T>().result())>
future<R> allocate_when_possible(T writer, db::timeout_clock::time_point timeout);
template<typename T>
struct byte_flow {
T bytes_written = 0;
T bytes_released = 0;
T bytes_flush_requested = 0;
byte_flow operator+(const byte_flow& rhs) const {
return byte_flow{
.bytes_written = bytes_written + rhs.bytes_written,
.bytes_released = bytes_released + rhs.bytes_released,
.bytes_flush_requested = bytes_flush_requested + rhs.bytes_flush_requested,
};
}
byte_flow operator-(const byte_flow& rhs) const {
return byte_flow{
.bytes_written = bytes_written - rhs.bytes_written,
.bytes_released = bytes_released - rhs.bytes_released,
.bytes_flush_requested = bytes_flush_requested - rhs.bytes_flush_requested,
};
}
byte_flow<double> operator/(double d) const {
return byte_flow<double>{
.bytes_written = bytes_written / d,
.bytes_released = bytes_released / d,
.bytes_flush_requested = bytes_flush_requested / d,
};
}
friend std::ostream& operator<<(std::ostream& os, const byte_flow& r) {
return os << std::fixed
<< "[written=" << r.bytes_written
<< ", released=" << r.bytes_released
<< ", flush_req=" << r.bytes_flush_requested
<< "]";
}
};
struct stats : public byte_flow<uint64_t> {
uint64_t cycle_count = 0;
uint64_t flush_count = 0;
uint64_t allocation_count = 0;
uint64_t bytes_slack = 0;
uint64_t segments_created = 0;
uint64_t segments_destroyed = 0;
uint64_t pending_flushes = 0;
uint64_t flush_limit_exceeded = 0;
uint64_t buffer_list_bytes = 0;
// size on disk, actually used - i.e. containing data (allocate+cycle)
uint64_t active_size_on_disk = 0;
uint64_t wasted_size_on_disk = 0;
// size allocated on disk - i.e. files created (new, reserve, recycled)
uint64_t total_size_on_disk = 0;
uint64_t requests_blocked_memory = 0;
uint64_t blocked_on_new_segment = 0;
uint64_t active_allocations = 0;
};
class scope_increment_counter {
uint64_t& _dst;
public:
scope_increment_counter(uint64_t& dst)
: _dst(dst)
{
++_dst;
}
~scope_increment_counter() {
--_dst;
}
};
stats totals;
byte_flow<uint64_t> last_bytes;
byte_flow<double> bytes_rate;
typename std::chrono::high_resolution_clock::time_point last_time;
size_t pending_allocations() const {
return _request_controller.waiters();
}
future<> begin_flush() {
++totals.pending_flushes;
if (totals.pending_flushes >= cfg.max_active_flushes) {
++totals.flush_limit_exceeded;
clogger.trace("Flush ops overflow: {}. Will block.", totals.pending_flushes);
}
return _flush_semaphore.wait();
}
void end_flush() noexcept {
_flush_semaphore.signal();
--totals.pending_flushes;
}
segment_manager(config c);
~segment_manager() {
clogger.trace("Commitlog {} disposed", cfg.commit_log_location);
}
uint64_t next_id() {
return ++_ids;
}
void sanity_check_size(size_t size) {
if (size > max_mutation_size) {
throw std::invalid_argument(
"Mutation of " + std::to_string(size)
+ " bytes is too large for the maximum size of "
+ std::to_string(max_mutation_size));
}
}
future<> init();
future<sseg_ptr> new_segment();
future<sseg_ptr> active_segment(db::timeout_clock::time_point timeout);
future<sseg_ptr> allocate_segment();
future<sseg_ptr> allocate_segment_ex(descriptor, named_file, open_flags);
sstring filename(const descriptor& d) const {
return cfg.commit_log_location + "/" + d.filename();
}
future<> clear();
future<> sync_all_segments();
future<> shutdown_all_segments();
future<> shutdown();
void create_counters(const sstring& metrics_category_name);
future<> orphan_all();
void add_file_to_dispose(named_file, dispose_mode);
future<> do_pending_deletes();
future<> delete_segments(std::vector<sstring>);
void discard_unused_segments() noexcept;
void discard_completed_segments(const cf_id_type&) noexcept;
void discard_completed_segments(const cf_id_type&, const rp_set&) noexcept;
void on_timer();
void sync();
void arm(uint32_t extra = 0) {
if (!_shutdown) {
_timer.arm(std::chrono::milliseconds(cfg.commitlog_sync_period_in_ms + extra));
}
}
std::vector<sstring> get_active_names() const;
uint64_t get_num_dirty_segments() const;
uint64_t get_num_active_segments() const;
using buffer_type = fragmented_temporary_buffer;
buffer_type acquire_buffer(size_t s, size_t align);
temporary_buffer<char> allocate_single_buffer(size_t, size_t);
future<std::vector<descriptor>> list_descriptors(sstring dir);
flush_handler_id add_flush_handler(flush_handler h) {
auto id = ++_flush_ids;
_flush_handlers[id] = std::move(h);
return id;
}
void remove_flush_handler(flush_handler_id id) {
_flush_handlers.erase(id);
}
void flush_segments(uint64_t size_to_remove);
private:
class shutdown_marker{};
future<> clear_reserve_segments();
void abort_recycled_list(std::exception_ptr);
size_t max_request_controller_units() const;
segment_id_type _ids = 0;
std::vector<sseg_ptr> _segments;
queue<sseg_ptr> _reserve_segments;
queue<named_file> _recycled_segments;
std::unordered_map<flush_handler_id, flush_handler> _flush_handlers;
flush_handler_id _flush_ids = 0;
replay_position _flush_position;
timer<clock_type> _timer;
future<> replenish_reserve();
future<> _reserve_replenisher;
future<> _background_sync;
seastar::gate _gate;
uint64_t _new_counter = 0;
std::optional<size_t> _disk_write_alignment;
};
future<> db::commitlog::segment_manager::named_file::open(open_flags flags, file_open_options opt, std::optional<uint64_t> size_in) noexcept {
assert(!*this);
auto f = co_await open_file_dma(_name, flags, opt);
// bypass roundtrip to disk if the caller knows the size, or the open flags truncate the file
auto existing_size = size_in
? *size_in
: (flags & open_flags::truncate) == open_flags{}
? co_await f.size()
: 0
;
assign(std::move(f), existing_size);
}
future<> db::commitlog::segment_manager::named_file::rename(std::string_view to) {
assert(!*this);
try {
auto s = sstring(to);
auto dir = std::filesystem::path(to).parent_path();
co_await seastar::rename_file(_name, s);
_name = std::move(s);
co_await seastar::sync_directory(dir.string());
} catch (...) {
commit_error_handler(std::current_exception());
throw;
}
}
/**
* Special awaiter for chaining a callee-continuation (essentially "then") into the calling
* co-routine frame, by storing the continuation not as a chained task, but as a func
* in the returned awaiter. The function is only executed if the base call
* does not throw.
* This is a very explicit way to optimize away a then or coroutine frame, the latter
* of which the compiler really should be able to coalesce, but...
*/
template<typename Func, typename... Args>
struct db::commitlog::segment_manager::named_file::myawait : public seastar::internal::awaiter<true, Args...> {
using mybase = seastar::internal::awaiter<true, Args...>;
using resume_type = decltype(std::declval<mybase>().await_resume());
Func _func;
myawait(future<Args...> f, Func func)
: mybase(std::move(f))
, _func(std::move(func))
{}
resume_type await_resume() {
if constexpr (std::is_same_v<resume_type, void>) {
mybase::await_resume();
_func();
} else {
return _func(mybase::await_resume());
}
}
};
template<typename Func, typename... Args>
auto db::commitlog::segment_manager::named_file::make_awaiter(future<Args...> f, Func func) {
return myawait<Func, Args...>(std::move(f), std::move(func));
}
template <typename CharType>
auto db::commitlog::segment_manager::named_file::dma_write(uint64_t pos, const CharType* buffer, size_t len, const io_priority_class& pc, io_intent* intent) noexcept {
return make_awaiter(file::dma_write(pos, buffer, len, pc, intent), [this, pos](size_t res) {
maybe_update_size(pos + res);
return res;
});
}
auto db::commitlog::segment_manager::named_file::dma_write(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc, io_intent* intent) noexcept {
return make_awaiter(file::dma_write(pos, std::move(iov), pc, intent), [this, pos](size_t res) {
maybe_update_size(pos + res);
return res;
});
}
auto db::commitlog::segment_manager::named_file::truncate(uint64_t length) noexcept {
return make_awaiter(file::truncate(length), [this, length] {
_known_size = length;
});
}
auto db::commitlog::segment_manager::named_file::allocate(uint64_t position, uint64_t length) noexcept {
return make_awaiter(file::allocate(position, length), [this, position, length] {
_known_size = position + length;
});
}
auto db::commitlog::segment_manager::named_file::remove_file() noexcept {
return make_awaiter(seastar::remove_file(name()), [this] {
_known_size = 0;
});
}
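// Note (see the commit message above): remove_file() zeroes _known_size on
// completion, so a caller accounting freed disk space must read
// known_size() *before* awaiting the removal.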
template<typename T>
static void write(fragmented_temporary_buffer::ostream& out, T value) {
auto v = net::hton(value);
out.write(reinterpret_cast<const char*>(&v), sizeof(v));
}
template<typename T, typename Input>
std::enable_if_t<std::is_fundamental<T>::value, T> read(Input& in) {
return net::ntoh(in.template read<T>());
}
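// Byte-order example (illustrative): write<uint32_t>(out, 0x01020304) emits
// the bytes 01 02 03 04 (network/big-endian order); the matching
// read<uint32_t>(in) applies ntoh to recover 0x01020304.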
/*
* A single commit log file on disk. Manages creation of the file and writing mutations to disk,
* as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment
* files are initially allocated to a fixed size and can grow to accommodate a larger value if necessary.
*
* The IO flow is somewhat convoluted and goes something like this:
*
* Mutation path:
* - Adding data to the segment usually writes into the internal buffer
* - On EOB or overflow we issue a write to disk ("cycle").
* - A cycle call will acquire the segment read lock and send the
* buffer to the corresponding position in the file
* - If we are periodic and crossed a timing threshold, or running "batch" mode
* we might be forced to issue a flush ("sync") after adding data
* - A sync call acquires the write lock, thus locking out writes
* and waiting for pending writes to finish. It then checks the
* high data mark, and issues the actual file flush.
* Note that the write lock is released prior to issuing the
* actual file flush, thus we are allowed to write data
* after a flush point concurrently with a pending flush.
*
* Sync timer:
* - In periodic mode, we try to primarily issue sync calls in
* a timer task issued every N seconds. The timer does the same
* operation as the above described sync, and resets the timeout
* so that mutation path will not trigger syncs and delay.
*
* Note that we do not care which order segment chunks finish writing
* to disk, other than all below a flush point must finish before flushing.
*
* We currently do not wait for flushes to finish before issuing the next
* cycle call ("after" flush point in the file). This might not be optimal.
*
* To close and finish a segment, we first close the gate object that guards
* writing data to it, then flush it fully (including waiting for futures created
* by the timer to run their course), and finally wait for it to
* become "clean", i.e. get notified that all mutations it holds have been
* persisted to sstables elsewhere. Once this is done, we can delete the
* segment. If a segment (object) is deleted without being fully clean, we
* do not remove the file on disk.
*
*/
class db::commitlog::segment : public enable_shared_from_this<segment>, public cf_holder {
friend class rp_handle;
using named_file = segment_manager::named_file;
using dispose_mode = segment_manager::dispose_mode;
::shared_ptr<segment_manager> _segment_manager;
descriptor _desc;
named_file _file;
uint64_t _file_pos = 0;
uint64_t _flush_pos = 0;
uint64_t _waste = 0;
size_t _alignment;
bool _closed = false;
bool _terminated = false;
using buffer_type = segment_manager::buffer_type;
using sseg_ptr = segment_manager::sseg_ptr;
using clock_type = segment_manager::clock_type;
using time_point = segment_manager::time_point;
buffer_type _buffer;
fragmented_temporary_buffer::ostream _buffer_ostream;
std::unordered_map<cf_id_type, uint64_t> _cf_dirty;
time_point _sync_time;
utils::flush_queue<replay_position, std::less<replay_position>, clock_type> _pending_ops;
uint64_t _num_allocs = 0;
std::unordered_set<table_schema_version> _known_schema_versions;
friend std::ostream& operator<<(std::ostream&, const segment&);
friend class segment_manager;
size_t buffer_position() const {
return _buffer.size_bytes() - _buffer_ostream.size();
}
future<> begin_flush() {
// This is maintaining the semantics of only using the write-lock
// as a gate for flushing, i.e. once we've begun a flush for position X
// we are ok with writes to positions > X
return _segment_manager->begin_flush();
}
void end_flush() {
_segment_manager->end_flush();
if (can_delete()) {
_segment_manager->discard_unused_segments();
}
}
public:
struct cf_mark {
const segment& s;
};
friend std::ostream& operator<<(std::ostream&, const cf_mark&);
// The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum)
static constexpr size_t entry_overhead_size = 3 * sizeof(uint32_t);
static constexpr size_t multi_entry_overhead_size = entry_overhead_size + sizeof(uint32_t);
static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t);
static constexpr size_t descriptor_header_size = 5 * sizeof(uint32_t);
static constexpr uint32_t segment_magic = ('S'<<24) |('C'<< 16) | ('L' << 8) | 'C';
static constexpr uint32_t multi_entry_size_magic = 0xffffffff;
// The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position])
static constexpr size_t sync_marker_size = 2 * sizeof(uint32_t);
// TODO : tune initial / default size
static constexpr size_t default_size = 128 * 1024;
segment(::shared_ptr<segment_manager> m, descriptor&& d, named_file&& f, size_t alignment)
: _segment_manager(std::move(m)), _desc(std::move(d)), _file(std::move(f)),
_alignment(alignment),
_sync_time(clock_type::now()), _pending_ops(true) // want exception propagation
{
++_segment_manager->totals.segments_created;
clogger.debug("Created new segment {}", *this);
}
~segment() {
dispose_mode mode = dispose_mode::Keep;
if (is_clean()) {
clogger.debug("Segment {} is no longer active and will submitted for delete now", *this);
++_segment_manager->totals.segments_destroyed;
_segment_manager->totals.active_size_on_disk -= file_position();
_segment_manager->totals.bytes_released += file_position();
_segment_manager->totals.wasted_size_on_disk -= _waste;
mode = dispose_mode::Delete;
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
clogger.warn("Segment {} is dirty and is left on disk.", *this);
}
_segment_manager->totals.buffer_list_bytes -= _buffer.size_bytes();
if (mode != dispose_mode::Keep || _file) {
_segment_manager->add_file_to_dispose(std::move(_file), mode);
}
}
uint64_t size_on_disk() const noexcept {
return _file.known_size();
}
bool is_schema_version_known(schema_ptr s) {
return _known_schema_versions.contains(s->version());
}
void add_schema_version(schema_ptr s) {
_known_schema_versions.emplace(s->version());
}
void forget_schema_versions() {
_known_schema_versions.clear();
}
void release_cf_count(const cf_id_type& cf) override {
mark_clean(cf, 1);
if (can_delete()) {
_segment_manager->discard_unused_segments();
}
}
bool must_sync() {
if (_segment_manager->cfg.mode == sync_mode::BATCH) {
return false;
}
auto now = clock_type::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now - _sync_time).count();
if ((_segment_manager->cfg.commitlog_sync_period_in_ms * 2) < uint64_t(ms)) {
clogger.debug("{} needs sync. {} ms elapsed", *this, ms);
return true;
}
return false;
}
/**
* Finalize this segment and get a new one
*/
future<sseg_ptr> finish_and_get_new(db::timeout_clock::time_point timeout) {
//FIXME: discarded future.
(void)close();
return _segment_manager->active_segment(timeout);
}
void reset_sync_time() {
_sync_time = clock_type::now();
}
future<sseg_ptr> shutdown() {
/**
* When we are shutting down, we first
* close the segment, thus no new
* data can be appended. Then we just issue a
* flush, which will wait for any queued ops
* to complete as well. Then we close the ops
* queue, just to be sure.
*/
auto me = shared_from_this();
// could have kept the "finally" continuations
// here, but this would potentially miss immediate
// exceptions thrown in close/p_o.close.
std::exception_ptr p;
try {
co_await close();
} catch (...) {
p = std::current_exception();
}
co_await _pending_ops.close();
co_await _file.truncate(_flush_pos);
co_await _file.close();
if (p) {
co_return coroutine::exception(std::move(p));
}
co_return me;
}
// See class comment for info
future<sseg_ptr> sync() {
// Note: this is not a marker for when sync was finished.
// It is when it was initiated
reset_sync_time();
return cycle(true);
}
// See class comment for info
future<sseg_ptr> flush() {
auto me = shared_from_this();
assert(me.use_count() > 1);
uint64_t pos = _file_pos;
clogger.trace("Syncing {} {} -> {}", *this, _flush_pos, pos);
// Only run the flush when all write ops at lower rp:s
// have completed.
replay_position rp(_desc.id, position_type(pos));
// Run like this to ensure flush ordering, and making flushes "waitable"
co_await _pending_ops.run_with_ordered_post_op(rp, [] {}, [&] {
assert(_pending_ops.has_operation(rp));
return do_flush(pos);
});
co_return me;
}
future<sseg_ptr> terminate() {
assert(_closed);
if (!std::exchange(_terminated, true)) {
// write a terminating zero block iff we are ending a (reused)
// block before the actual file end.
// we should only get here when all actual data is
// already flushed (see below, close()).
if (file_position() < _segment_manager->max_size) {
clogger.trace("{} is closed but not terminated.", *this);
if (_buffer.empty()) {
new_buffer(0);
}
return cycle(true, true);
}
}
return make_ready_future<sseg_ptr>(shared_from_this());
}
future<sseg_ptr> close() {
_closed = true;
auto s = co_await sync();
co_await flush();
co_await terminate();
_waste = _file.known_size() - file_position();
_segment_manager->totals.wasted_size_on_disk += _waste;
co_return s;
}
future<sseg_ptr> do_flush(uint64_t pos) {
auto me = shared_from_this();
co_await begin_flush();
auto finally = defer([&] () noexcept {
end_flush();
});
if (pos <= _flush_pos) {
clogger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos);
co_return me;
}
try {
co_await _file.flush();
// TODO: retry/ignore/fail/stop - optional behaviour in origin.
// we fast-fail the whole commit.
_flush_pos = std::max(pos, _flush_pos);
++_segment_manager->totals.flush_count;
clogger.trace("{} synced to {}", *this, _flush_pos);
} catch (...) {
clogger.error("Failed to flush commits to disk: {}", std::current_exception());
throw;
}
co_return me;
}
/**
* Allocate a new buffer
*/
void new_buffer(size_t s) {
assert(_buffer.empty());
auto overhead = segment_overhead_size;
if (_file_pos == 0) {
overhead += descriptor_header_size;
}
auto a = align_up(s + overhead, _alignment);
auto k = std::max(a, default_size);
_buffer = _segment_manager->acquire_buffer(k, _alignment);
_buffer_ostream = _buffer.get_ostream();
auto out = _buffer_ostream.write_substream(overhead);
out.fill('\0', overhead);
_segment_manager->totals.buffer_list_bytes += _buffer.size_bytes();
}
bool buffer_is_empty() const {
return buffer_position() <= segment_overhead_size
|| (_file_pos == 0 && buffer_position() <= (segment_overhead_size + descriptor_header_size));
}
/**
* Send any buffer contents to disk and get a new tmp buffer
*/
// See class comment for info
future<sseg_ptr> cycle(bool flush_after = false, bool termination = false) {
auto me = shared_from_this();
if (_buffer.empty() && !termination) {
if (flush_after) {
co_await flush();
}
co_return me;
}
auto size = clear_buffer_slack();
auto buf = std::exchange(_buffer, { });
auto off = _file_pos;
auto top = off + size;
auto num = _num_allocs;
_file_pos = top;
_buffer_ostream = { };
_num_allocs = 0;
assert(me.use_count() > 1);
auto out = buf.get_ostream();
auto header_size = 0;
if (off == 0) {
// first block. write file header.
write(out, segment_magic);
write(out, _desc.ver);
write(out, _desc.id);
crc32_nbo crc;
crc.process(_desc.ver);
crc.process<int32_t>(_desc.id & 0xffffffff);
crc.process<int32_t>(_desc.id >> 32);
write(out, crc.checksum());
header_size = descriptor_header_size;
}
if (!termination) {
// write chunk header
crc32_nbo crc;
crc.process<int32_t>(_desc.id & 0xffffffff);
crc.process<int32_t>(_desc.id >> 32);
crc.process(uint32_t(off + header_size));
write(out, uint32_t(_file_pos));
write(out, crc.checksum());
forget_schema_versions();
clogger.trace("Writing {} entries, {} k in {} -> {}", num, size, off, off + size);
} else {
assert(num == 0);
assert(_closed);
clogger.trace("Terminating {} at pos {}", *this, _file_pos);
write(out, uint64_t(0));
}
replay_position rp(_desc.id, position_type(off));
// The write will be allowed to start now, but flush (below) must wait for not only this,
// but all previous write/flush pairs.
co_await _pending_ops.run_with_ordered_post_op(rp, [&]() -> future<> {
auto view = fragmented_temporary_buffer::view(buf);
view.remove_suffix(buf.size_bytes() - size);
assert(size == view.size_bytes());
if (view.empty()) {
co_return;
}
auto&& priority_class = service::get_local_commitlog_priority();
auto finally = defer([&] () noexcept {
_segment_manager->notify_memory_written(size);
_segment_manager->totals.buffer_list_bytes -= buf.size_bytes();
if (_file.known_size() < _file_pos) {
_segment_manager->totals.total_size_on_disk += (_file_pos - _file.known_size());
}
});
for (;;) {
auto current = *view.begin();
try {
auto bytes = co_await _file.dma_write(off, current.data(), current.size(), priority_class);
_segment_manager->totals.bytes_written += bytes;
_segment_manager->totals.active_size_on_disk += bytes;
++_segment_manager->totals.cycle_count;
if (bytes == view.size_bytes()) {
clogger.trace("Final write of {} to {}: {}/{} bytes at {}", bytes, *this, size, size, off);
break;
}
// gah, partial write. should always get here with dma chunk sized
// "bytes", but let's make sure...
bytes = align_down(bytes, _alignment);
off += bytes;
view.remove_prefix(bytes);
clogger.trace("Partial write of {} to {}: {}/{} bytes at at {}", bytes, *this, size - view.size_bytes(), size, off - bytes);
continue;
// TODO: retry/ignore/fail/stop - optional behaviour in origin.
// we fast-fail the whole commit.
} catch (...) {
clogger.error("Failed to persist commits to disk for {}: {}", *this, std::current_exception());
throw;
}
}
}, [&]() -> future<> {
assert(_pending_ops.has_operation(rp));
if (flush_after) {
co_await do_flush(top);
}
});
co_return me;
}
future<sseg_ptr> batch_cycle(timeout_clock::time_point timeout) {
/**
* For batch mode we force a write "immediately".
* However, we first wait for all previous writes/flushes
* to complete.
*
* This has the benefit of allowing several allocations to
* queue up in a single buffer.
*/
auto me = shared_from_this();
auto fp = _file_pos;
try {
co_await _pending_ops.wait_for_pending(timeout);
if (fp != _file_pos) {
// some other request already wrote this buffer.
// If so, wait for the operation at our intended file offset
// to finish, then we know the flush is complete and we
// are in accord.
// (Note: wait_for_pending(rp) waits for the operation _at_ rp (and before).)
replay_position rp(_desc.id, position_type(fp));
co_await _pending_ops.wait_for_pending(rp, timeout);
assert(_segment_manager->cfg.mode != sync_mode::BATCH || _flush_pos > fp);
if (_flush_pos <= fp) {
// previous op we were waiting for was not sync one, so it did not flush
// force flush here
co_await do_flush(fp);
}
} else {
// It is ok to leave the sync behind on timeout because there will be at most one
// such sync, all later allocations will block on _pending_ops until it is done.
co_await with_timeout(timeout, sync());
}
} catch (...) {
// If we get an IO exception (which we assume this is)
// we should close the segment.
// TODO: should we also truncate away any partial write
// we did?
me->_closed = true; // just mark segment as closed, no writes will be done.
throw;
};
co_return me;
}
void background_cycle() {
//FIXME: discarded future
(void)cycle().discard_result().handle_exception([] (auto ex) {
clogger.error("Failed to flush commits to disk: {}", ex);
});
}
enum class write_result {
ok,
must_sync,
no_space,
ok_need_batch_sync,
};
/**
* Add a "mutation" to the segment.
* Should only be called from "allocate_when_possible". "this" must be secure in a shared_ptr that will not
* die. We don't keep ourselves alive (anymore)
*/
write_result allocate(entry_writer& writer, segment_manager::request_controller_units& permit, db::timeout_clock::time_point timeout) {
if (must_sync()) {
return write_result::must_sync;
}
const auto size = writer.size(*this);
const auto s = size + writer.num_entries * entry_overhead_size + (writer.num_entries > 1 ? multi_entry_overhead_size : 0u); // total size
_segment_manager->sanity_check_size(s);
if (!is_still_allocating() || position() + s > _segment_manager->max_size) { // would we make the file too big?
return write_result::no_space;
} else if (!_buffer.empty() && (s > _buffer_ostream.size())) { // not enough room left in the current buffer?
if (_segment_manager->cfg.mode == sync_mode::BATCH || writer.sync) {
// TODO: this could cause starvation if we're really unlucky.
// If we run batch mode and find ourselves not fit in a non-empty
// buffer, we must force a cycle and wait for it (to keep flush order)
// This will most likely cause parallel writes, and consecutive flushes.
return write_result::must_sync;
}
background_cycle();
}
size_t buf_memory = s;
if (_buffer.empty()) {
new_buffer(s);
buf_memory += buffer_position();
}
if (_closed) {
throw std::runtime_error("commitlog: Cannot add data to a closed segment");
}
buf_memory -= permit.release();
_segment_manager->account_memory_usage(buf_memory);
auto& out = _buffer_ostream;
std::optional<crc32_nbo> mecrc;
// if this is multi-entry write, we need to add an extra header + crc
// the header and crc formula is:
// header:
// magic : uint32_t
// size : uint32_t
// crc1 : uint32_t - crc of magic, size
// -> entries[]
// post:
// crc2 : uint32_t - crc1 + each entry crc.
if (writer.num_entries > 1) {
mecrc.emplace();
write<uint32_t>(out, multi_entry_size_magic);
write<uint32_t>(out, s);
mecrc->process(multi_entry_size_magic);
mecrc->process(uint32_t(s));
write<uint32_t>(out, mecrc->checksum());
}
for (size_t entry = 0; entry < writer.num_entries; ++entry) {
replay_position rp(_desc.id, position());
auto id = writer.id(entry);
auto entry_size = writer.num_entries == 1 ? size : writer.size(*this, entry);
auto es = entry_size + entry_overhead_size;
_cf_dirty[id]++; // increase use count for cf.
rp_handle h(static_pointer_cast<cf_holder>(shared_from_this()), std::move(id), rp);
crc32_nbo crc;
write<uint32_t>(out, es);
crc.process(uint32_t(es));
write<uint32_t>(out, crc.checksum());
// actual data
auto entry_out = out.write_substream(entry_size);
auto entry_data = entry_out.to_input_stream();
writer.write(*this, entry_out, entry);
entry_data.with_stream([&] (auto data_str) {
crc.process_fragmented(ser::buffer_view<typename std::vector<temporary_buffer<char>>::iterator>(data_str));
});
auto checksum = crc.checksum();
write<uint32_t>(out, checksum);
if (mecrc) {
mecrc->process(checksum);
}
writer.result(entry, std::move(h));
}
if (mecrc) {
// write the crc of header + all sub-entry crc
write<uint32_t>(out, mecrc->checksum());
}
++_segment_manager->totals.allocation_count;
++_num_allocs;
if (_segment_manager->cfg.mode == sync_mode::BATCH || writer.sync) {
return write_result::ok_need_batch_sync;
} else {
// If this buffer alone is too big, potentially bigger than the maximum allowed size,
// then no other request will be allowed in to force the cycle()ing of this buffer. We
// have to do it ourselves.
if ((buffer_position() >= (db::commitlog::segment::default_size))) {
background_cycle();
}
}
return write_result::ok;
}
position_type position() const {
return position_type(_file_pos + buffer_position());
}
size_t file_position() const {
return _file_pos;
}
// ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
// a.k.a. zero the tail.
size_t clear_buffer_slack() {
auto buf_pos = buffer_position();
auto size = align_up(buf_pos, _alignment);
auto fill_size = size - buf_pos;
_buffer_ostream.fill('\0', fill_size);
_segment_manager->totals.bytes_slack += fill_size;
_segment_manager->account_memory_usage(fill_size);
return size;
}
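// Worked example for clear_buffer_slack() above (illustrative numbers):
// buf_pos = 5000, _alignment = 4096 -> size = align_up(5000, 4096) = 8192,
// so 3192 zero bytes of slack are written and accounted.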
void mark_clean(const cf_id_type& id, uint64_t count) noexcept {
auto i = _cf_dirty.find(id);
if (i != _cf_dirty.end()) {
assert(i->second >= count);
i->second -= count;
if (i->second == 0) {
_cf_dirty.erase(i);
}
}
}
void mark_clean(const cf_id_type& id) noexcept {
_cf_dirty.erase(id);
}
void mark_clean() noexcept {
_cf_dirty.clear();
}
bool is_still_allocating() const noexcept {
return !_closed && position() < _segment_manager->max_size;
}
bool is_clean() const noexcept {
return _cf_dirty.empty();
}
bool is_unused() const noexcept {
return !is_still_allocating() && is_clean();
}
bool is_flushed() const noexcept {
return position() <= _flush_pos;
}
bool can_delete() const noexcept {
return is_unused() && is_flushed();
}
bool contains(const replay_position& pos) const noexcept {
return pos.id == _desc.id;
}
sstring get_segment_name() const {
return _desc.filename();
}
};
template<typename T, typename R>
requires std::derived_from<T, db::commitlog::entry_writer> && std::same_as<R, decltype(std::declval<T>().result())>
future<R> db::commitlog::segment_manager::allocate_when_possible(T writer, db::timeout_clock::time_point timeout) {
auto size = writer.size();
// If this is already too big now, we should throw early. It's also a correctness issue, since
// if we are too big at this moment we'll never reach allocate() to actually throw at that
// point.
sanity_check_size(size);
auto fut = get_units(_request_controller, size, timeout);
if (_request_controller.waiters()) {
totals.requests_blocked_memory++;
}
scope_increment_counter allocating(totals.active_allocations);
auto permit = co_await std::move(fut);
sseg_ptr s;
if (!_segments.empty() && _segments.back()->is_still_allocating()) {
s = _segments.back();
} else {
s = co_await active_segment(timeout);
}
for (;;) {
using write_result = segment::write_result;
switch (s->allocate(writer, permit, timeout)) {
case write_result::ok:
co_return writer.result();
case write_result::must_sync:
s = co_await with_timeout(timeout, s->sync());
continue;
case write_result::no_space:
s = co_await s->finish_and_get_new(timeout);
continue;
case write_result::ok_need_batch_sync:
s = co_await s->batch_cycle(timeout);
co_return writer.result();
}
}
}
const size_t db::commitlog::segment::default_size;
db::commitlog::segment_manager::segment_manager(config c)
: cfg([&c] {
config cfg(c);
if (cfg.commit_log_location.empty()) {
cfg.commit_log_location = "/var/lib/scylla/commitlog";
}
if (cfg.max_active_writes == 0) {
cfg.max_active_writes = // TODO: call someone to get an idea...
25 * smp::count;
}
cfg.max_active_writes = std::max(uint64_t(1), cfg.max_active_writes / smp::count);
if (cfg.max_active_flushes == 0) {
cfg.max_active_flushes = // TODO: call someone to get an idea...
5 * smp::count;
}
cfg.max_active_flushes = std::max(uint64_t(1), cfg.max_active_flushes / smp::count);
if (!cfg.base_segment_id) {
cfg.base_segment_id = std::chrono::duration_cast<std::chrono::milliseconds>(runtime::get_boot_time().time_since_epoch()).count() + 1;
}
return cfg;
}())
, max_size(std::min<size_t>(std::numeric_limits<position_type>::max() / (1024 * 1024), std::max<size_t>(cfg.commitlog_segment_size_in_mb, 1)) * 1024 * 1024)
, max_mutation_size(max_size >> 1)
, max_disk_size(size_t(std::ceil(cfg.commitlog_total_space_in_mb / double(smp::count))) * 1024 * 1024)
// our threshold for trying to force a flush. needs heuristics; for now max - segment_size/2.
, disk_usage_threshold(cfg.commitlog_flush_threshold_in_mb.has_value()
? size_t(std::ceil(*cfg.commitlog_flush_threshold_in_mb / double(smp::count))) * 1024 * 1024
: (max_disk_size -
(max_disk_size >= (max_size*2) ? max_size
: (max_disk_size > (max_size/2) ? (max_size/2) : max_disk_size/3))))
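// Worked example (illustrative, per shard): max_disk_size = 1 GB and
// max_size = 32 MB gives a default threshold of 1 GB - 32 MB, i.e.
// flushing is forced roughly one segment before the hard cap.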
, _flush_semaphore(cfg.max_active_flushes)
// That is enough concurrency to allow for our largest mutation (max_mutation_size), plus
// an existing in-flight buffer. Since we'll force the cycling() of any buffer that is bigger
// than default_size at the end of the allocation, that allows for every valid mutation to
// always be admitted for processing.
, _request_controller(max_request_controller_units(), request_controller_timeout_exception_factory{})
, _reserve_segments(1)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
, _background_sync(make_ready_future<>())
{
assert(max_size > 0);
assert(max_mutation_size < segment::multi_entry_size_magic);
clogger.trace("Commitlog {} maximum disk size: {} MB / cpu ({} cpus)",
cfg.commit_log_location, max_disk_size / (1024 * 1024),
smp::count);
if (!cfg.metrics_category_name.empty()) {
create_counters(cfg.metrics_category_name);
}
}
size_t db::commitlog::segment_manager::max_request_controller_units() const {
return max_mutation_size + db::commitlog::segment::default_size;
}
future<> db::commitlog::segment_manager::replenish_reserve() {
while (!_shutdown) {
co_await _reserve_segments.not_full();
if (_shutdown) {
break;
}
try {
gate::holder g(_gate);
// note: if we were strict with disk size, we would refuse to do this
// unless disk footprint is lower than threshold. but we cannot (yet?)
// trust that flush logic will absolutely free up an existing
// segment (because colocation stuff etc), so always allow a new
// file if needed. That and performance stuff...
auto s = co_await allocate_segment();
auto ret = _reserve_segments.push(std::move(s));
if (!ret) {
clogger.error("Segment reserve is full! Ignoring and trying to continue, but shouldn't happen");
}
continue;
} catch (shutdown_marker&) {
break;
} catch (...) {
clogger.warn("Exception in segment reservation: {}", std::current_exception());
}
co_await sleep(100ms);
}
}
future<std::vector<db::commitlog::descriptor>>
db::commitlog::segment_manager::list_descriptors(sstring dirname) {
auto dir = co_await open_checked_directory(commit_error_handler, dirname);
std::vector<db::commitlog::descriptor> result;
auto is_cassandra_segment = [](sstring name) {
// We want to ignore commitlog segments generated by Cassandra-derived tools (#1112)
auto c = sstring("Cassandra");
if (name.size() < c.size()) {
return false;
}
return name.substr(0, c.size()) == c;
};
auto h = dir.list_directory([&](directory_entry de) -> future<> {
auto type = de.type;
if (!type && !de.name.empty()) {
type = co_await file_type(dirname + "/" + de.name);
}
if (type == directory_entry_type::regular && de.name[0] != '.' && !is_cassandra_segment(de.name)) {
try {
result.emplace_back(de.name, cfg.fname_prefix);
} catch (std::domain_error& e) {
clogger.warn(e.what());
}
}
});
co_await h.done();
co_return result;
}
future<> db::commitlog::segment_manager::init() {
auto descs = co_await list_descriptors(cfg.commit_log_location);
assert(_reserve_segments.empty()); // _segments_to_replay must not pick them up
segment_id_type id = *cfg.base_segment_id;
for (auto& d : descs) {
id = std::max(id, replay_position(d.id).base_id());
_segments_to_replay.push_back(cfg.commit_log_location + "/" + d.filename());
}
// base id counter is [ <shard> | <base> ]
_ids = replay_position(this_shard_id(), id).id;
// always run the timer now, since we need to handle segment pre-alloc etc as well.
_timer.set_callback(std::bind(&segment_manager::on_timer, this));
auto delay = this_shard_id() * std::ceil(double(cfg.commitlog_sync_period_in_ms) / smp::count);
clogger.trace("Delaying timer loop {} ms", delay);
// We need to wait until we have scanned all other segments to actually start serving new
// segments. We are ready now
_reserve_replenisher = replenish_reserve();
arm(delay);
}
void db::commitlog::segment_manager::create_counters(const sstring& metrics_category_name) {
namespace sm = seastar::metrics;
_metrics.add_group(metrics_category_name, {
sm::make_gauge("segments", [this] { return _segments.size(); },
sm::description("Holds the current number of segments.")),
sm::make_gauge("allocating_segments", [this] { return std::count_if(_segments.begin(), _segments.end(), [] (const sseg_ptr & s) { return s->is_still_allocating(); }); },
sm::description("Holds the number of not closed segments that still have some free space. "
"This value should not get too high.")),
sm::make_gauge("unused_segments", [this] { return std::count_if(_segments.begin(), _segments.end(), [] (const sseg_ptr & s) { return s->is_unused(); }); },
sm::description("Holds the current number of unused segments. "
"A non-zero value indicates that the disk write path became temporary slow.")),
sm::make_counter("alloc", totals.allocation_count,
sm::description("Counts number of times a new mutation has been added to a segment. "
"Divide bytes_written by this value to get the average number of bytes per mutation written to the disk.")),
sm::make_counter("cycle", totals.cycle_count,
sm::description("Counts number of commitlog write cycles - when the data is written from the internal memory buffer to the disk.")),
sm::make_counter("flush", totals.flush_count,
sm::description("Counts number of times the flush() method was called for a file.")),
sm::make_counter("bytes_written", totals.bytes_written,
sm::description("Counts number of bytes written to the disk. "
"Divide this value by \"alloc\" to get the average number of bytes per mutation written to the disk.")),
sm::make_counter("bytes_released", totals.bytes_released,
sm::description("Counts number of bytes released from disk. (Deleted/recycled)")),
sm::make_counter("bytes_flush_requested", totals.bytes_flush_requested,
sm::description("Counts number of bytes requested to be flushed (persisted).")),
sm::make_counter("slack", totals.bytes_slack,
sm::description("Counts number of unused bytes written to the disk due to disk segment alignment.")),
sm::make_gauge("pending_flushes", totals.pending_flushes,
sm::description("Holds number of currently pending flushes. See the related flush_limit_exceeded metric.")),
sm::make_gauge("pending_allocations", [this] { return pending_allocations(); },
sm::description("Holds number of currently pending allocations. "
"A non-zero value indicates that we have a bottleneck in the disk write flow.")),
sm::make_counter("requests_blocked_memory", totals.requests_blocked_memory,
sm::description("Counts number of requests blocked due to memory pressure. "
"A non-zero value indicates that the commitlog memory quota is not enough to serve the required amount of requests.")),
sm::make_counter("flush_limit_exceeded", totals.flush_limit_exceeded,
sm::description(
seastar::format("Counts number of times a flush limit was exceeded. "
"A non-zero value indicates that there are too many pending flush operations (see pending_flushes) and some of "
"them will be blocked till the total amount of pending flush operations drops below {}.", cfg.max_active_flushes))),
sm::make_gauge("disk_total_bytes", totals.total_size_on_disk,
sm::description("Holds size of disk space in bytes reserved for data so far. "
"A too high value indicates that we have some bottleneck in the writing to sstables path.")),
sm::make_gauge("disk_active_bytes", totals.active_size_on_disk,
sm::description("Holds size of disk space in bytes used for data so far. "
"A too high value indicates that we have some bottleneck in the writing to sstables path.")),
sm::make_gauge("disk_slack_end_bytes", totals.wasted_size_on_disk,
sm::description("Holds size of disk space in bytes unused because of segment switching (end slack). "
"A too high value indicates that we do not write enough data to each segment.")),
sm::make_gauge("memory_buffer_bytes", totals.buffer_list_bytes,
sm::description("Holds the total number of bytes in internal memory buffers.")),
sm::make_gauge("blocked_on_new_segment", totals.blocked_on_new_segment,
sm::description("Number of allocations blocked on acquiring new segment.")),
sm::make_gauge("active_allocations", totals.active_allocations,
sm::description("Current number of active allocations.")),
});
}
void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) {
if (_segments.empty()) {
return;
}
// defensive copy.
auto callbacks = boost::copy_range<std::vector<flush_handler>>(_flush_handlers | boost::adaptors::map_values);
auto& active = _segments.back();
// RP at "start" of segment we leave untouched.
replay_position high(active->_desc.id, 0);
// But if all segments are closed or we force-flush,
// include all.
if (!active->is_still_allocating()) {
high = replay_position(high.id + 1, 0);
}
// Now get a set of used CF ids:
std::unordered_set<cf_id_type> ids;
uint64_t n = size_to_remove;
uint64_t flushing = 0;
for (auto& s : _segments) {
// if a segment is allocating, it should not be included in the flush request,
// because we cannot free anything there anyway. And if a segment is allocating,
// it is the last one, so just break.
if (s->is_still_allocating()) {
break;
}
auto rp = replay_position(s->_desc.id, db::position_type(s->size_on_disk()));
if (rp <= _flush_position) {
// already requested.
continue;
}
auto size = s->size_on_disk();
auto waste = s->_waste;
flushing += size - waste;
for (auto& id : s->_cf_dirty | boost::adaptors::map_keys) {
ids.insert(id);
}
if (size_to_remove != 0) {
if (n <= size) {
high = rp;
break;
}
n -= s->size_on_disk();
}
}
clogger.debug("Flushing ({} MB) to {}", flushing/(1024*1024), high);
// For each CF id: for each callback c: call c(id, high)
for (auto& f : callbacks) {
for (auto& id : ids) {
try {
f(id, high);
} catch (...) {
clogger.error("Exception during flush request {}/{}: {}", id, high, std::current_exception());
}
}
}
_flush_position = high;
totals.bytes_flush_requested += flushing;
}
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment_ex(descriptor d, named_file f, open_flags flags) {
file_open_options opt;
opt.extent_allocation_size_hint = max_size;
opt.append_is_unlikely = true;
size_t align;
std::exception_ptr ep;
auto filename = f.name();
try {
// when we get here, f.known_size() is either size of recycled segment we open,
// or zero in case we create brand new segment
co_await f.open(flags, opt, f.known_size());
align = f.disk_write_dma_alignment();
auto is_overwrite = false;
if ((flags & open_flags::dsync) != open_flags{}) {
auto existing_size = f.known_size();
is_overwrite = true;
// would be super nice if we just could mmap(/dev/zero) and do sendto
// instead of this, but for now we must do explicit buffer writes.
// if recycled (or from last run), we might have either truncated smaller or written it
// (slightly) larger due to final zeroing of file
if (existing_size > max_size) {
co_await f.truncate(max_size);
} else if (existing_size < max_size) {
totals.total_size_on_disk += (max_size - existing_size);
clogger.trace("Pre-writing {} of {} KB to segment {}", (max_size - existing_size)/1024, max_size/1024, filename);
// re-open without o_dsync for pre-alloc. The reason/rationale
// being that we want automatic (meta)data sync from O_DSYNC for when
// we do actual CL flushes, but here it would just result in
// data being committed before we've reached eof/finished writing.
// Again an argument for sendfile-like constructs I guess...
co_await f.close();
co_await f.open(flags & open_flags(~int(open_flags::dsync)), opt, existing_size);
co_await f.allocate(existing_size, max_size - existing_size);
size_t buf_size = align_up<size_t>(16 * 1024, size_t(align));
size_t zerofill_size = max_size - align_down(existing_size, align);
auto rem = zerofill_size;
auto buf = allocate_single_buffer(buf_size, align);
while (rem != 0) {
static constexpr size_t max_write = 128 * 1024;
auto n = std::min(max_write / buf_size, 1 + rem / buf_size);
std::vector<iovec> v;
v.reserve(n);
size_t m = 0;
while (m < rem && m < max_write) {
auto s = std::min(rem - m, buf_size);
v.emplace_back(iovec{ buf.get_write(), s});
m += s;
}
auto s = co_await f.dma_write(max_size - rem, std::move(v), service::get_local_commitlog_priority());
if (!s) [[unlikely]] {
on_internal_error(clogger, format("dma_write returned 0: max_size={} rem={} iovec.n={}", max_size, rem, n));
}
rem -= s;
}
// sync metadata (size/written)
co_await f.flush();
co_await f.close();
co_await f.open(flags, opt, max_size);
// we will never add blocks (scouts honour). I can haz smaller align?
align = f.disk_overwrite_dma_alignment();
}
} else {
co_await f.truncate(max_size);
}
if (cfg.extensions && !cfg.extensions->commitlog_file_extensions().empty()) {
for (auto * ext : cfg.extensions->commitlog_file_extensions()) {
auto nf = co_await ext->wrap_file(filename, f, flags);
if (nf) {
f.assign(std::move(nf), f.known_size());
align = is_overwrite ? f.disk_overwrite_dma_alignment() : f.disk_write_dma_alignment();
}
}
}
f.assign(make_checked_file(commit_error_handler, std::move(f)), f.known_size());
} catch (...) {
ep = std::current_exception();
}
if (ep) {
// do this early, so if we are to fast-fail the server,
// we do it before anything else can go wrong.
try {
commit_error_handler(ep);
} catch (...) {
ep = std::current_exception();
}
}
if (ep && f) {
co_await f.close();
}
if (ep) {
add_file_to_dispose(std::move(f), dispose_mode::Delete);
co_return coroutine::exception(std::move(ep));
}
co_return make_shared<segment>(shared_from_this(), std::move(d), std::move(f), align);
}
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment() {
for (;;) {
descriptor d(next_id(), cfg.fname_prefix);
auto dst = filename(d);
auto flags = open_flags::wo;
if (cfg.use_o_dsync) {
flags |= open_flags::dsync;
}
if (!_recycled_segments.empty()) {
auto f = _recycled_segments.pop();
// Note: we have to do the rename here to ensure
// proper descriptor id order. If we renamed in the delete call
// that recycled the file we could potentially have
// out-of-order files. (Sort does not help).
clogger.debug("Using recycled segment file {} -> {}", f.name(), dst);
co_await f.rename(dst);
co_return co_await allocate_segment_ex(std::move(d), std::move(f), flags);
}
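// no recycled segment available, so we would have to create a brand new one.
// If that would take us over the disk size limit, wait for a segment to be
// recycled instead, forcing a memtable flush if none is already on its way.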
if (!cfg.allow_going_over_size_limit && max_disk_size != 0 && totals.total_size_on_disk >= max_disk_size) {
clogger.debug("Disk usage ({} MB) exceeds maximum ({} MB) - allocation will wait...", totals.total_size_on_disk/(1024*1024), max_disk_size/(1024*1024));
auto f = _recycled_segments.not_empty();
if (!f.available()) {
_new_counter = 0; // zero this so timer task does not duplicate the below flush
flush_segments(0); // force memtable flush already
}
try {
co_await std::move(f);
} catch (shutdown_marker&) {
throw;
} catch (...) {
clogger.warn("Exception while waiting for segments {}. Will retry allocation...", std::current_exception());
}
continue;
}
named_file f(dst);
co_return co_await allocate_segment_ex(std::move(d), std::move(f), flags|open_flags::create);
}
}
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::new_segment() {
gate::holder g(_gate);
if (_shutdown) {
co_await coroutine::return_exception(std::runtime_error("Commitlog has been shut down. Cannot add data"));
}
++_new_counter;
if (_reserve_segments.empty()) {
// don't increase reserve count if we are at max, or we would go over disk limit.
if (_reserve_segments.max_size() < cfg.max_reserve_segments && (totals.total_size_on_disk + max_size) <= max_disk_size) {
_reserve_segments.set_max_size(_reserve_segments.max_size() + 1);
clogger.debug("Increased segment reserve count to {}", _reserve_segments.max_size());
}
// if we have no reserve and we're above/at limits, make background task a little more eager.
auto cur = totals.active_size_on_disk + totals.wasted_size_on_disk;
if (!_shutdown && cur >= disk_usage_threshold) {
_timer.cancel();
_timer.arm(std::chrono::milliseconds(0));
}
}
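// take a segment from the reserve; the background replenisher task
// (_reserve_replenisher) is what keeps this queue topped up.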
auto s = co_await _reserve_segments.pop_eventually();
_segments.push_back(s);
_segments.back()->reset_sync_time();
co_return s;
}
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::active_segment(db::timeout_clock::time_point timeout) {
// If there is no active segment, try to allocate one using new_segment(). If we time out,
// make sure later invocations can still pick that segment up once it's ready.
for (;;) {
if (!_segments.empty() && _segments.back()->is_still_allocating()) {
co_return _segments.back();
}
scope_increment_counter blocked_on_new(totals.blocked_on_new_segment);
// #9896 - we don't want to issue a new_segment call until
// the old one has terminated with either result or exception.
// Do all waiting through the shared_future
if (!_segment_allocating) {
auto f = new_segment();
// must check that we are not already done.
if (f.available()) {
f.get(); // maybe force exception
continue;
}
_segment_allocating.emplace(f.discard_result().finally([this] {
// clear the shared_future _before_ resolving its contents
// (i.e. with result of this finally)
_segment_allocating = std::nullopt;
}));
}
co_await _segment_allocating->get_future(timeout);
}
}
/**
 * Go through all segments, clearing the given id up to the positions used.
 * If a segment becomes clean and unused by this, it is discarded.
 */
void db::commitlog::segment_manager::discard_completed_segments(const cf_id_type& id, const rp_set& used) noexcept {
auto& usage = used.usage();
clogger.debug("Discarding {}: {}", id, usage);
for (auto&s : _segments) {
auto i = usage.find(s->_desc.id);
if (i != usage.end()) {
s->mark_clean(id, i->second);
}
}
discard_unused_segments();
}
void db::commitlog::segment_manager::discard_completed_segments(const cf_id_type& id) noexcept {
clogger.debug("Discard all data for {}", id);
for (auto&s : _segments) {
s->mark_clean(id);
}
discard_unused_segments();
}
namespace db {
std::ostream& operator<<(std::ostream& out, const db::commitlog::segment& s) {
return out << s._desc.filename();
}
std::ostream& operator<<(std::ostream& out, const db::commitlog::segment::cf_mark& m) {
return out << (m.s._cf_dirty | boost::adaptors::map_keys);
}
std::ostream& operator<<(std::ostream& out, const db::replay_position& p) {
return out << "{" << p.shard_id() << ", " << p.base_id() << ", " << p.pos << "}";
}
}
void db::commitlog::segment_manager::discard_unused_segments() noexcept {
clogger.trace("Checking for unused segments ({} active)", _segments.size());
std::erase_if(_segments, [=](sseg_ptr s) {
if (s->can_delete()) {
clogger.debug("Segment {} is unused", *s);
return true;
}
if (s->is_still_allocating()) {
clogger.debug("Not safe to delete segment {}; still allocating.", s);
} else if (!s->is_clean()) {
clogger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s});
} else {
clogger.debug("Not safe to delete segment {}; disk ops pending", s);
}
return false;
});
// launch in background, but guard with the gate so this deletion is
// sure to finish during shutdown, because at least through this path,
// the deletion queue could be non-empty, and we don't want
// those files accidentally left around for replay.
if (!_shutdown) {
(void)with_gate(_gate, [this] {
return do_pending_deletes();
});
}
}
future<> db::commitlog::segment_manager::clear_reserve_segments() {
while (!_reserve_segments.empty()) {
_reserve_segments.pop();
}
for (auto& [f, mode] : _files_to_dispose) {
if (mode == dispose_mode::Delete) {
mode = dispose_mode::ForceDelete;
}
}
_recycled_segments.consume([&](named_file f) {
_files_to_dispose.emplace_back(std::move(f), dispose_mode::ForceDelete);
return true;
});
return do_pending_deletes();
}
future<> db::commitlog::segment_manager::sync_all_segments() {
clogger.debug("Issuing sync for all segments");
// #8952 - calls that do sync/cycle can end up altering
// _segments (end_flush()->discard_unused())
auto def_copy = _segments;
co_await coroutine::parallel_for_each(def_copy, [] (sseg_ptr s) -> future<> {
co_await s->sync();
clogger.debug("Synced segment {}", *s);
});
}
future<> db::commitlog::segment_manager::shutdown_all_segments() {
clogger.debug("Issuing shutdown for all segments");
// #8952 - calls that do sync/cycle can end up altering
// _segments (end_flush()->discard_unused())
auto def_copy = _segments;
co_await coroutine::parallel_for_each(def_copy, [] (sseg_ptr s) -> future<> {
co_await s->shutdown();
clogger.debug("Shutdown segment {}", *s);
});
}
future<> db::commitlog::segment_manager::shutdown() {
if (!_shutdown_promise) {
_shutdown_promise = shared_promise<>();
// Wait for all pending requests to finish. Need to sync first because segments that are
// alive may be holding semaphore permits.
auto block_new_requests = get_units(_request_controller, max_request_controller_units());
try {
co_await sync_all_segments();
} catch (...) {
clogger.error("Syncing all segments failed during shutdown: {}. Aborting.", std::current_exception());
abort();
}
std::exception_ptr p;
try {
co_await std::move(block_new_requests);
_timer.cancel(); // no more timer calls
_shutdown = true; // no re-arm, no create new segments.
// do a discard + delete sweep to force
// gate holder (i.e. replenish) to wake up
discard_unused_segments();
auto f = _gate.close();
co_await do_pending_deletes();
auto ep = std::make_exception_ptr(shutdown_marker{});
if (_recycled_segments.empty()) {
abort_recycled_list(ep);
}
auto f2 = std::exchange(_background_sync, make_ready_future<>());
co_await std::move(f);
co_await std::move(f2);
try {
co_await shutdown_all_segments();
} catch (...) {
clogger.error("Shutting down all segments failed during shutdown: {}. Aborting.", std::current_exception());
abort();
}
} catch (...) {
p = std::current_exception();
}
discard_unused_segments();
try {
co_await clear_reserve_segments();
} catch (...) {
p = std::current_exception();
}
try {
co_await std::move(_reserve_replenisher);
} catch (...) {
p = std::current_exception();
}
// slight functional change from non-coroutine version: we propagate all/any
// exceptions, not just the replenish one.
if (p) {
_shutdown_promise->set_exception(p);
} else {
_shutdown_promise->set_value();
}
}
co_await _shutdown_promise->get_shared_future();
clogger.debug("Commitlog shutdown complete");
}
void db::commitlog::segment_manager::add_file_to_dispose(named_file f, dispose_mode mode) {
_files_to_dispose.emplace_back(std::move(f), mode);
}
future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> files) {
for (auto& s : files) {
// Note: this is only for replay files. We can decide to
// recycle these, but they don't count towards the disk footprint,
// thus unopened named_files are what we want (known_size == 0)
_files_to_dispose.emplace_back(s, dispose_mode::Delete);
}
return do_pending_deletes();
}
void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) {
// must not be called with elements still in the list; that would leak files.
assert(_recycled_segments.empty());
_recycled_segments.abort(ep);
// and ensure next lap(s) still has a queue
_recycled_segments = queue<named_file>(std::numeric_limits<size_t>::max());
}
namespace db {
std::ostream& operator<<(std::ostream& os, const commitlog::segment_manager::named_file& f) {
return os << f.name() << " (" << f.known_size() << ")";
}
std::ostream& operator<<(std::ostream& os, commitlog::segment_manager::dispose_mode mode) {
using dispose_mode = db::commitlog::segment_manager::dispose_mode;
switch (mode) {
case dispose_mode::Delete: os << "Delete"; break;
case dispose_mode::ForceDelete: os << "Force Delete"; break;
case dispose_mode::Keep: os << "Keep"; break;
default: break;
}
return os;
}
std::ostream& operator<<(std::ostream& os, const std::pair<commitlog::segment_manager::named_file, db::commitlog::segment_manager::dispose_mode>& p) {
return os << p.first << " (" << p.second << ")";
}
}
future<> db::commitlog::segment_manager::do_pending_deletes() {
auto ftd = std::exchange(_files_to_dispose, {});
if (ftd.empty()) {
co_return;
}
std::exception_ptr recycle_error;
size_t num_deleted = 0;
auto exts = cfg.extensions;
clogger.debug("Discarding segments {}", ftd);
for (auto& [f, mode] : ftd) {
// `f.remove_file()` resets known_size to 0, so remember the size here,
// in order to subtract it from total_size_on_disk accurately.
size_t size = f.known_size();
try {
if (f) {
co_await f.close();
}
// retain the file (replay...)
if (mode == dispose_mode::Keep) {
continue;
}
if (exts && !exts->commitlog_file_extensions().empty()) {
for (auto& ext : exts->commitlog_file_extensions()) {
co_await ext->before_delete(f.name());
}
}
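// decide between recycling and outright deletion: recycle only if the
// footprint remaining after removing this file would still fit within
// max_disk_size; otherwise delete, to actually reclaim disk space.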
auto usage = totals.total_size_on_disk;
auto next_usage = usage - size;
if (next_usage <= max_disk_size && mode != dispose_mode::ForceDelete) {
descriptor d(next_id(), "Recycled-" + cfg.fname_prefix);
auto dst = this->filename(d);
clogger.debug("Recycling segment file {}", f.name());
// must rename the file since we must ensure the
// data is not replayed. Changing the name makes the
// ID in the file header mismatch the filename -> ignored on replay
try {
co_await f.rename(dst);
auto b = _recycled_segments.push(std::move(f));
assert(b); // we set this to max_size_t so...
continue;
} catch (...) {
clogger.error("Could not recycle segment {}: {}", f.name(), std::current_exception());
recycle_error = std::current_exception();
// fallthrough
}
}
// last resort.
co_await f.remove_file();
++num_deleted;
} catch (...) {
clogger.error("Could not delete segment {}: {}", f.name(), std::current_exception());
}
// if we get here, we either successfully deleted the file,
// or had such an exception that we consider the file dead
// anyway. In either case we _remove_ the file size from
// footprint, because it is no longer our problem.
totals.total_size_on_disk -= size;
}
// #8376 - if we had an error in recycling (disk rename?), and no elements
// are available, we could have waiters hoping they will get segments.
// abort the queue (wakes up any existing waiters - futures), and let them
// retry. Since we did deletions instead, disk footprint should allow
// for new allocs at least. Or more likely, everything is broken, but
// we will at least make more noise.
if (recycle_error && _recycled_segments.empty()) {
abort_recycled_list(recycle_error);
}
}
future<> db::commitlog::segment_manager::orphan_all() {
_segments.clear();
return clear_reserve_segments();
}
/*
 * Sync all segments, then clear them out, to ensure all ops are done.
 * (Assumes you have barriered the adding of new ops!)
 * Only use from tests.
 */
future<> db::commitlog::segment_manager::clear() {
clogger.debug("Clearing commitlog");
co_await shutdown();
clogger.debug("Clearing all segments");
for (auto& s : _segments) {
s->mark_clean();
}
co_await orphan_all();
}
/**
* Called by timer in periodic mode.
*/
void db::commitlog::segment_manager::sync() {
auto f = std::exchange(_background_sync, make_ready_future<>());
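// chain this round of syncs onto any previous, still-running round, so
// _background_sync always covers all outstanding background sync work.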
// #8952 - calls that do sync/cycle can end up altering
// _segments (end_flush()->discard_unused())
auto def_copy = _segments;
_background_sync = parallel_for_each(def_copy, [](sseg_ptr s) {
return s->sync().discard_result();
}).then([f = std::move(f)]() mutable {
return std::move(f);
});
}
void db::commitlog::segment_manager::on_timer() {
// Gate, because we are starting potentially blocking ops
// without waiting for them, so segment_manager could be shut down
// while they are running.
(void)seastar::with_gate(_gate, [this] {
if (cfg.mode != sync_mode::BATCH) {
sync();
}
byte_flow<uint64_t> curr = totals;
auto diff = curr - std::exchange(last_bytes, curr);
auto now = std::chrono::high_resolution_clock::now();
auto seconds = std::chrono::duration_cast<std::chrono::duration<double>>(now - last_time).count();
auto rate = diff / seconds;
// not used yet, but maybe should be, adjusted for time windows etc.
// for now, use simple "timer frequency" based diffs (i.e. rate per 10s)
// to try to predict where the disk footprint will be by the next timer call.
bytes_rate = rate;
last_time = now;
clogger.debug("Rate: {} / s ({} s)", rate, seconds);
// IFF a new segment was put in use since last we checked, and we're
// above threshold, request flush.
if (_new_counter > 0) {
auto max = disk_usage_threshold;
auto cur = totals.active_size_on_disk + totals.wasted_size_on_disk;
uint64_t extra = 0;
// TODO: better heuristics? Take a semi-pessimistic approach: guess that half
// of the requested flushes will manage to finish by the next lap, so count them as half.
auto returned = diff.bytes_released + diff.bytes_flush_requested/2;
if (diff.bytes_written > returned) {
// we are guessing we are gonna add at least this.
extra = (diff.bytes_written - returned);
}
// do not just measure current footprint, but maybe include expected
// footprint that will be added.
if (max != 0 && (cur + extra) >= max) {
clogger.debug("Used size on disk {} MB ({} MB projected) exceeds local threshold {} MB"
, (cur) / (1024 * 1024)
, (cur+extra) / (1024 * 1024)
, max / (1024 * 1024)
);
_new_counter = 0;
flush_segments(cur + extra - max);
}
}
return do_pending_deletes();
});
arm();
}
std::vector<sstring> db::commitlog::segment_manager::get_active_names() const {
std::vector<sstring> res;
for (auto i: _segments) {
if (!i->is_unused()) {
// Each shard is located in its own directory
res.push_back(cfg.commit_log_location + "/" + i->get_segment_name());
}
}
return res;
}
uint64_t db::commitlog::segment_manager::get_num_dirty_segments() const {
return std::count_if(_segments.begin(), _segments.end(), [](sseg_ptr s) {
return !s->is_still_allocating() && !s->is_clean();
});
}
uint64_t db::commitlog::segment_manager::get_num_active_segments() const {
return std::count_if(_segments.begin(), _segments.end(), [](sseg_ptr s) {
return s->is_still_allocating();
});
}
temporary_buffer<char> db::commitlog::segment_manager::allocate_single_buffer(size_t s, size_t alignment) {
return temporary_buffer<char>::aligned(alignment, s);
}
db::commitlog::segment_manager::buffer_type db::commitlog::segment_manager::acquire_buffer(size_t s, size_t alignment) {
s = align_up(s, segment::default_size);
auto fragment_count = s / segment::default_size;
std::vector<temporary_buffer<char>> buffers;
buffers.reserve(fragment_count);
while (buffers.size() < fragment_count) {
buffers.emplace_back(allocate_single_buffer(segment::default_size, alignment));
}
clogger.trace("Allocated {} k buffer", s / 1024);
return fragmented_temporary_buffer(std::move(buffers), s);
}
/**
* Add mutation.
*/
future<db::rp_handle> db::commitlog::add(const cf_id_type& id,
size_t size, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, serializer_func func) {
class serializer_func_entry_writer final : public entry_writer {
cf_id_type _id;
serializer_func _func;
size_t _size;
public:
db::rp_handle res;
serializer_func_entry_writer(const cf_id_type& id, size_t sz, serializer_func func, db::commitlog::force_sync sync)
: entry_writer(sync), _id(id), _func(std::move(func)), _size(sz)
{}
const cf_id_type& id(size_t) const override { return _id; }
size_t size(segment&, size_t) override { return _size; }
size_t size(segment&) override { return _size; }
size_t size() const override { return _size; }
void write(segment&, output& out, size_t) const override {
_func(out);
}
void result(size_t, rp_handle h) override {
res = std::move(h);
}
using result_type = db::rp_handle;
result_type result() {
return std::move(res);
}
};
return _segment_manager->allocate_when_possible(serializer_func_entry_writer(id, size, std::move(func), sync), timeout);
}
future<db::rp_handle> db::commitlog::add_entry(const cf_id_type& id, const commitlog_entry_writer& cew, timeout_clock::time_point timeout)
{
assert(id == cew.schema()->id());
class cl_entry_writer final : public entry_writer {
commitlog_entry_writer _writer;
public:
rp_handle res;
cl_entry_writer(const commitlog_entry_writer& wr)
: entry_writer(wr.sync()), _writer(wr)
{}
const cf_id_type& id(size_t) const override {
return _writer.schema()->id();
}
size_t size(segment& seg) override {
_writer.set_with_schema(!seg.is_schema_version_known(_writer.schema()));
return _writer.size();
}
size_t size(segment& seg, size_t) override {
return size(seg);
}
size_t size() const override {
return _writer.mutation_size();
}
void write(segment& seg, output& out, size_t) const override {
if (_writer.with_schema()) {
seg.add_schema_version(_writer.schema());
}
_writer.write(out);
}
void result(size_t, rp_handle h) override {
res = std::move(h);
}
using result_type = db::rp_handle;
result_type result() {
return std::move(res);
}
};
return _segment_manager->allocate_when_possible(cl_entry_writer(cew), timeout);
}
future<std::vector<db::rp_handle>>
db::commitlog::add_entries(std::vector<commitlog_entry_writer> entry_writers, db::timeout_clock::time_point timeout) {
class cl_entries_writer final : public entry_writer {
std::vector<commitlog_entry_writer> _writers;
std::unordered_set<table_schema_version> _known;
public:
std::vector<rp_handle> res;
cl_entries_writer(force_sync sync, std::vector<commitlog_entry_writer> entry_writers)
: entry_writer(sync, entry_writers.size()), _writers(std::move(entry_writers))
{
res.reserve(_writers.size());
}
const cf_id_type& id(size_t i) const override {
return _writers.at(i).schema()->id();
}
size_t size(segment& seg) override {
size_t res = 0;
for (auto i = _writers.begin(), e = _writers.end(); i != e; ++i) {
auto known = seg.is_schema_version_known(i->schema());
if (!known) {
known = _known.contains(i->schema()->version());
}
if (!known) {
_known.emplace(i->schema()->version());
}
i->set_with_schema(!known);
res += i->size();
}
return res;
}
size_t size(segment& seg, size_t i) override {
return _writers.at(i).size(); // we have already set schema known/unknown
}
size_t size() const override {
return std::accumulate(_writers.begin(), _writers.end(), size_t(0), [](size_t acc, const commitlog_entry_writer& w) {
return w.mutation_size() + acc;
});
}
void write(segment& seg, output& out, size_t i) const override {
auto& w = _writers.at(i);
if (w.with_schema()) {
seg.add_schema_version(w.schema());
}
w.write(out);
}
void result(size_t i, rp_handle h) override {
assert(i == res.size());
res.emplace_back(std::move(h));
}
using result_type = std::vector<db::rp_handle>;
result_type result() {
return std::move(res);
}
};
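// the whole batch must be written with force_sync if any single entry requires it.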
force_sync sync(std::any_of(entry_writers.begin(), entry_writers.end(), [](auto& w) { return bool(w.sync()); }));
return _segment_manager->allocate_when_possible(cl_entries_writer(sync, std::move(entry_writers)), timeout);
}
db::commitlog::commitlog(config cfg)
: _segment_manager(::make_shared<segment_manager>(std::move(cfg))) {
}
db::commitlog::commitlog(commitlog&& v) noexcept
: _segment_manager(std::move(v._segment_manager)) {
}
db::commitlog::~commitlog()
{}
future<db::commitlog> db::commitlog::create_commitlog(config cfg) {
commitlog c(std::move(cfg));
co_await c._segment_manager->init();
co_return c;
}
db::commitlog::flush_handler_anchor::flush_handler_anchor(flush_handler_anchor&& f)
: _cl(f._cl), _id(f._id)
{
f._id = 0;
}
db::commitlog::flush_handler_anchor::flush_handler_anchor(commitlog& cl, flush_handler_id id)
: _cl(cl), _id(id)
{}
db::commitlog::flush_handler_anchor::~flush_handler_anchor() {
unregister();
}
db::commitlog::flush_handler_id db::commitlog::flush_handler_anchor::release() {
flush_handler_id id = 0;
std::swap(_id, id);
return id;
}
void db::commitlog::flush_handler_anchor::unregister() {
auto id = release();
if (id != 0) {
_cl.remove_flush_handler(id);
}
}
db::commitlog::flush_handler_anchor db::commitlog::add_flush_handler(flush_handler h) {
return flush_handler_anchor(*this, _segment_manager->add_flush_handler(std::move(h)));
}
void db::commitlog::remove_flush_handler(flush_handler_id id) {
_segment_manager->remove_flush_handler(id);
}
void db::commitlog::discard_completed_segments(const cf_id_type& id, const rp_set& used) {
_segment_manager->discard_completed_segments(id, used);
}
void db::commitlog::discard_completed_segments(const cf_id_type& id) {
_segment_manager->discard_completed_segments(id);
}
future<> db::commitlog::sync_all_segments() {
return _segment_manager->sync_all_segments();
}
future<> db::commitlog::shutdown() {
return _segment_manager->shutdown();
}
future<> db::commitlog::release() {
return _segment_manager->orphan_all();
}
size_t db::commitlog::max_record_size() const {
return _segment_manager->max_mutation_size - segment::entry_overhead_size;
}
uint64_t db::commitlog::max_active_writes() const {
return _segment_manager->cfg.max_active_writes;
}
uint64_t db::commitlog::max_active_flushes() const {
return _segment_manager->cfg.max_active_flushes;
}
future<> db::commitlog::clear() {
return _segment_manager->clear();
}
const db::commitlog::config& db::commitlog::active_config() const {
return _segment_manager->cfg;
}
// No commit_io_check needed in the log reader since the database will fail
// on error at startup if required
future<>
db::commitlog::read_log_file(sstring filename, sstring pfx, seastar::io_priority_class read_io_prio_class, commit_load_reader_func next, position_type off, const db::extensions* exts) {
struct work {
private:
file_input_stream_options make_file_input_stream_options(seastar::io_priority_class read_io_prio_class) {
file_input_stream_options fo;
fo.buffer_size = db::commitlog::segment::default_size;
fo.read_ahead = 10;
fo.io_priority_class = read_io_prio_class;
return fo;
}
public:
file f;
descriptor d;
commit_load_reader_func func;
input_stream<char> fin;
input_stream<char> r;
uint64_t id = 0;
size_t pos = 0;
size_t next = 0;
size_t start_off = 0;
size_t file_size = 0;
size_t corrupt_size = 0;
bool eof = false;
bool header = true;
bool failed = false;
fragmented_temporary_buffer::reader frag_reader;
work(file f, descriptor din, commit_load_reader_func fn, seastar::io_priority_class read_io_prio_class, position_type o = 0)
: f(f), d(din), func(std::move(fn)), fin(make_file_input_stream(f, 0, make_file_input_stream_options(read_io_prio_class))), start_off(o) {
}
work(work&&) = default;
bool advance(const fragmented_temporary_buffer& buf) {
pos += buf.size_bytes();
if (buf.size_bytes() == 0) {
eof = true;
}
return !eof;
}
bool end_of_file() const {
return eof;
}
bool end_of_chunk() const {
return eof || next == pos;
}
future<> skip(size_t bytes) {
pos += bytes;
if (pos > file_size) {
eof = true;
pos = file_size;
}
return fin.skip(bytes);
}
void stop() {
eof = true;
}
void fail() {
failed = true;
stop();
}
future<> read_header() {
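// On-disk segment header layout (descriptor_header_size bytes):
// [uint32 magic][uint32 version][uint64 segment id][uint32 crc(version, id)]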
fragmented_temporary_buffer buf = co_await frag_reader.read_exactly(fin, segment::descriptor_header_size);
if (!advance(buf)) {
// zero length file. accept it just to be nice.
co_return;
}
// Will throw if we got eof
auto in = buf.get_istream();
auto magic = read<uint32_t>(in);
auto ver = read<uint32_t>(in);
auto id = read<uint64_t>(in);
auto checksum = read<uint32_t>(in);
if (magic == 0 && ver == 0 && id == 0 && checksum == 0) {
// let's assume this was an empty (pre-allocated)
// file. just skip it.
co_return stop();
}
if (id != d.id) {
// filename and the id in the file do not match.
// assume the file is invalid/recycled.
stop();
co_return;
}
if (magic != segment::segment_magic) {
throw invalid_segment_format();
}
crc32_nbo crc;
crc.process(ver);
crc.process<int32_t>(id & 0xffffffff);
crc.process<int32_t>(id >> 32);
auto cs = crc.checksum();
if (cs != checksum) {
throw header_checksum_error();
}
this->id = id;
this->next = 0;
}
future<> read_chunk() {
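// On-disk chunk header layout (segment_overhead_size bytes):
// [uint32 file offset of next chunk][uint32 crc(segment id, this chunk's start offset)]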
fragmented_temporary_buffer buf = co_await frag_reader.read_exactly(fin, segment::segment_overhead_size);
auto start = pos;
if (!advance(buf)) {
co_return;
}
auto in = buf.get_istream();
auto next = read<uint32_t>(in);
auto checksum = read<uint32_t>(in);
if (next == 0 && checksum == 0) {
// in a pre-allocating world, this means eof
stop();
co_return;
}
crc32_nbo crc;
crc.process<int32_t>(id & 0xffffffff);
crc.process<int32_t>(id >> 32);
crc.process<uint32_t>(start);
auto cs = crc.checksum();
if (cs != checksum) {
// if a chunk header checksum is broken, we shall just assume that all
// remaining data is as well. We cannot trust the "next" pointer, so...
clogger.debug("Checksum error in segment chunk at {}.", start);
corrupt_size += (file_size - pos);
stop();
co_return;
}
this->next = next;
if (start_off >= next) {
co_return co_await skip(next - pos);
}
while (!end_of_chunk()) {
co_await read_entry();
}
}
using produce_func = std::function<future<>(buffer_and_replay_position, uint32_t)>;
future<> produce(buffer_and_replay_position bar) {
try {
co_await func(std::move(bar));
} catch (...) {
fail();
throw;
}
}
future<> read_entry() {
return do_read_entry(std::bind(&work::produce, this, std::placeholders::_1));
}
future<> do_read_entry(produce_func pf) {
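// On-disk entry layout: [uint32 size][uint32 crc(size)][data][uint32 crc(data)].
// entry_header_size below covers the first two fields only.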
static constexpr size_t entry_header_size = segment::entry_overhead_size - sizeof(uint32_t);
/**
* #598 - Must check that data left in chunk is enough to even read an entry.
* If not, this is small slack space in the chunk end, and we should just go
* to the next.
*/
assert(pos <= next);
if ((pos + entry_header_size) >= next) {
co_await skip(next - pos);
co_return;
}
auto buf = co_await frag_reader.read_exactly(fin, entry_header_size);
replay_position rp(id, position_type(pos));
if (!advance(buf)) {
co_return;
}
auto in = buf.get_istream();
auto size = read<uint32_t>(in);
auto checksum = read<uint32_t>(in);
crc32_nbo crc;
crc.process(size);
// check for multi-entry
if (d.ver >= descriptor::segment_version_2 && size == segment::multi_entry_size_magic) {
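// for a multi-entry, the "size" field holds the magic marker, and the
// "checksum" field actually holds the total size of the whole group.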
auto actual_size = checksum;
auto end = pos + actual_size - entry_header_size - sizeof(uint32_t);
assert(end <= next);
// really small read...
buf = co_await frag_reader.read_exactly(fin, sizeof(uint32_t));
in = buf.get_istream();
checksum = read<uint32_t>(in);
advance(buf);
crc.process(actual_size);
// verify header crc.
if (actual_size < 2 * segment::entry_overhead_size || crc.checksum() != checksum) {
auto slack = next - pos;
if (size != 0) {
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack);
corrupt_size += slack;
}
co_await skip(slack);
co_return;
}
std::vector<buffer_and_replay_position> tmp;
tmp.reserve(10);
// now read all sub-entries into buffers, and collect crc.
while (pos < end) {
co_await do_read_entry([&](buffer_and_replay_position br, uint32_t checksum) -> future<> {
tmp.emplace_back(std::move(br));
crc.process(checksum);
co_return;
});
}
// and verify crc.
buf = co_await frag_reader.read_exactly(fin, sizeof(uint32_t));
in = buf.get_istream();
checksum = read<uint32_t>(in);
advance(buf);
if (checksum != crc.checksum()) {
auto slack = next - pos;
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, actual_size);
corrupt_size += actual_size;
co_await skip(slack);
co_return;
}
// all is ok. send data to subscriber.
for (auto&& br : tmp) {
co_await produce(std::move(br));
if (failed) {
break;
}
}
co_return;
}
if (size < 3 * sizeof(uint32_t) || checksum != crc.checksum()) {
auto slack = next - pos;
if (size != 0) {
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack);
corrupt_size += slack;
}
// size == 0 -> special scylla case: zero padding due to dma blocks
co_await skip(slack);
co_return;
}
buf = co_await frag_reader.read_exactly(fin, size - entry_header_size);
advance(buf);
in = buf.get_istream();
auto data_size = size - segment::entry_overhead_size;
in.skip(data_size);
checksum = read<uint32_t>(in);
buf.remove_suffix(buf.size_bytes() - data_size);
crc.process_fragmented(fragmented_temporary_buffer::view(buf));
if (crc.checksum() != checksum) {
// If we're getting a checksum error here, most likely the rest of
// the file will be corrupt as well. But it does not hurt to retry.
// Just go to the next entry (since "size" in header seemed ok).
clogger.debug("Segment entry at {} checksum error. Skipping {} bytes", rp, size);
corrupt_size += size;
co_return;
}
co_await pf({std::move(buf), rp}, checksum);
}
future<> read_file() {
std::exception_ptr p;
try {
file_size = co_await f.size();
co_await read_header();
while (!end_of_file()) {
co_await read_chunk();
}
if (corrupt_size > 0) {
throw segment_data_corruption_error("Data corruption", corrupt_size);
}
} catch (...) {
p = std::current_exception();
}
co_await fin.close();
if (p) {
std::rethrow_exception(p);
}
}
};
auto bare_filename = std::filesystem::path(filename).filename().string();
if (bare_filename.rfind(pfx, 0) != 0) {
co_return;
}
file f;
try {
f = co_await open_file_dma(filename, open_flags::ro);
if (exts && !exts->commitlog_file_extensions().empty()) {
for (auto* ext : exts->commitlog_file_extensions()) {
auto nf = co_await ext->wrap_file(filename, f, open_flags::ro);
if (nf) {
f = std::move(nf);
}
}
}
} catch (...) {
commit_error_handler(std::current_exception());
throw;
}
f = make_checked_file(commit_error_handler, std::move(f));
descriptor d(filename, pfx);
work w(std::move(f), d, std::move(next), read_io_prio_class, off);
co_await w.read_file();
}
std::vector<sstring> db::commitlog::get_active_segment_names() const {
return _segment_manager->get_active_names();
}
uint64_t db::commitlog::disk_limit() const {
return _segment_manager->max_disk_size;
}
uint64_t db::commitlog::disk_footprint() const {
return _segment_manager->totals.total_size_on_disk;
}
uint64_t db::commitlog::get_total_size() const {
return _segment_manager->totals.active_size_on_disk
+ _segment_manager->totals.wasted_size_on_disk
+ _segment_manager->totals.buffer_list_bytes
;
}
uint64_t db::commitlog::get_completed_tasks() const {
return _segment_manager->totals.allocation_count;
}
uint64_t db::commitlog::get_flush_count() const {
return _segment_manager->totals.flush_count;
}
uint64_t db::commitlog::get_pending_tasks() const {
return _segment_manager->totals.pending_flushes;
}
uint64_t db::commitlog::get_pending_flushes() const {
return _segment_manager->totals.pending_flushes;
}
uint64_t db::commitlog::get_pending_allocations() const {
return _segment_manager->pending_allocations();
}
uint64_t db::commitlog::get_flush_limit_exceeded_count() const {
return _segment_manager->totals.flush_limit_exceeded;
}
uint64_t db::commitlog::get_num_segments_created() const {
return _segment_manager->totals.segments_created;
}
uint64_t db::commitlog::get_num_segments_destroyed() const {
return _segment_manager->totals.segments_destroyed;
}
uint64_t db::commitlog::get_num_dirty_segments() const {
return _segment_manager->get_num_dirty_segments();
}
uint64_t db::commitlog::get_num_active_segments() const {
return _segment_manager->get_num_active_segments();
}
uint64_t db::commitlog::get_num_blocked_on_new_segment() const {
return _segment_manager->totals.blocked_on_new_segment;
}
uint64_t db::commitlog::get_num_active_allocations() const {
return _segment_manager->totals.active_allocations;
}
future<std::vector<db::commitlog::descriptor>> db::commitlog::list_existing_descriptors() const {
return list_existing_descriptors(active_config().commit_log_location);
}
future<std::vector<db::commitlog::descriptor>> db::commitlog::list_existing_descriptors(const sstring& dir) const {
return _segment_manager->list_descriptors(dir);
}
future<std::vector<sstring>> db::commitlog::list_existing_segments() const {
return list_existing_segments(active_config().commit_log_location);
}
future<std::vector<sstring>> db::commitlog::list_existing_segments(const sstring& dir) const {
return list_existing_descriptors(dir).then([dir](auto descs) {
std::vector<sstring> paths;
std::transform(descs.begin(), descs.end(), std::back_inserter(paths), [&](auto& d) {
return dir + "/" + d.filename();
});
return make_ready_future<std::vector<sstring>>(std::move(paths));
});
}
std::vector<sstring> db::commitlog::get_segments_to_replay() const {
return std::move(_segment_manager->_segments_to_replay);
}
future<> db::commitlog::delete_segments(std::vector<sstring> files) const {
return _segment_manager->delete_segments(std::move(files));
}
db::rp_handle::rp_handle() noexcept
{}
db::rp_handle::rp_handle(shared_ptr<cf_holder> h, cf_id_type cf, replay_position rp) noexcept
: _h(std::move(h)), _cf(cf), _rp(rp)
{}
db::rp_handle::rp_handle(rp_handle&& v) noexcept
: _h(std::move(v._h)), _cf(v._cf), _rp(std::exchange(v._rp, {}))
{}
db::rp_handle& db::rp_handle::operator=(rp_handle&& v) noexcept {
if (this != &v) {
this->~rp_handle();
new (this) rp_handle(std::move(v));
}
return *this;
}
db::rp_handle::~rp_handle() {
if (_rp != replay_position() && _h) {
_h->release_cf_count(_cf);
}
}
db::replay_position db::rp_handle::release() {
return std::exchange(_rp, {});
}