Files
scylladb/db/commitlog/commitlog.hh
Benny Halevy 3feb759943 everywhere: use utils::chunked_vector for list of mutations
Currently, we use std::vector<*mutation> to keep
a list of mutations for processing.
This can lead to large allocation, e.g. when the vector
size is a function of the number of tables.

Use a chunked vector instead to prevent oversized allocations.

`perf-simple-query --smp 1` results obtained for fixed 400MHz frequency
and PGO disabled:

Before (read path):
```
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...

89055.97 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39417 insns/op,   18003 cycles/op,        0 errors)
103372.72 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39380 insns/op,   17300 cycles/op,        0 errors)
98942.27 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39413 insns/op,   17336 cycles/op,        0 errors)
103752.93 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39407 insns/op,   17252 cycles/op,        0 errors)
102516.77 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39403 insns/op,   17288 cycles/op,        0 errors)
throughput:
	mean=   99528.13 standard-deviation=6155.71
	median= 102516.77 median-absolute-deviation=3844.59
	maximum=103752.93 minimum=89055.97
instructions_per_op:
	mean=   39403.99 standard-deviation=14.25
	median= 39406.75 median-absolute-deviation=9.30
	maximum=39416.63 minimum=39380.39
cpu_cycles_per_op:
	mean=   17435.81 standard-deviation=318.24
	median= 17300.40 median-absolute-deviation=147.59
	maximum=18002.53 minimum=17251.75
```

After (read path)
```
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...
59755.04 tps ( 66.2 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39466 insns/op,   22834 cycles/op,        0 errors)
71854.16 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39417 insns/op,   17883 cycles/op,        0 errors)
82149.45 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39411 insns/op,   17409 cycles/op,        0 errors)
49640.04 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.3 tasks/op,   39474 insns/op,   19975 cycles/op,        0 errors)
54963.22 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.3 tasks/op,   39474 insns/op,   18235 cycles/op,        0 errors)
throughput:
	mean=   63672.38 standard-deviation=13195.12
	median= 59755.04 median-absolute-deviation=8709.16
	maximum=82149.45 minimum=49640.04
instructions_per_op:
	mean=   39448.38 standard-deviation=31.60
	median= 39466.17 median-absolute-deviation=25.75
	maximum=39474.12 minimum=39411.42
cpu_cycles_per_op:
	mean=   19267.01 standard-deviation=2217.03
	median= 18234.80 median-absolute-deviation=1384.25
	maximum=22834.26 minimum=17408.67
```

`perf-simple-query --smp 1 --write` results obtained for fixed 400MHz frequency
and PGO disabled:

Before (write path):
```
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=write, query_single_key=no, counters=no}
Disabling auto compaction
63736.96 tps ( 59.4 allocs/op,  16.4 logallocs/op,  14.3 tasks/op,   49667 insns/op,   19924 cycles/op,        0 errors)
64109.41 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   49992 insns/op,   20084 cycles/op,        0 errors)
56950.47 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50005 insns/op,   20501 cycles/op,        0 errors)
44858.42 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50014 insns/op,   21947 cycles/op,        0 errors)
28592.87 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50027 insns/op,   27659 cycles/op,        0 errors)
throughput:
	mean=   51649.63 standard-deviation=15059.74
	median= 56950.47 median-absolute-deviation=12087.33
	maximum=64109.41 minimum=28592.87
instructions_per_op:
	mean=   49941.18 standard-deviation=153.76
	median= 50005.24 median-absolute-deviation=73.01
	maximum=50027.07 minimum=49667.05
cpu_cycles_per_op:
	mean=   22023.01 standard-deviation=3249.92
	median= 20500.74 median-absolute-deviation=1938.76
	maximum=27658.75 minimum=19924.32
```

After (write path)
```
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=write, query_single_key=no, counters=no}
Disabling auto compaction
53395.93 tps ( 59.4 allocs/op,  16.5 logallocs/op,  14.3 tasks/op,   50326 insns/op,   21252 cycles/op,        0 errors)
46527.83 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50704 insns/op,   21555 cycles/op,        0 errors)
55846.30 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50731 insns/op,   21060 cycles/op,        0 errors)
55669.30 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50735 insns/op,   21521 cycles/op,        0 errors)
52130.17 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50757 insns/op,   21334 cycles/op,        0 errors)
throughput:
	mean=   52713.91 standard-deviation=3795.38
	median= 53395.93 median-absolute-deviation=2955.40
	maximum=55846.30 minimum=46527.83
instructions_per_op:
	mean=   50650.57 standard-deviation=182.46
	median= 50731.38 median-absolute-deviation=84.09
	maximum=50756.62 minimum=50325.87
cpu_cycles_per_op:
	mean=   21344.42 standard-deviation=202.86
	median= 21334.00 median-absolute-deviation=176.37
	maximum=21554.61 minimum=21060.24
```

Fixes #24815

Improvement for rare corner cases. No backport required

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes scylladb/scylladb#24919
2025-07-13 19:13:11 +03:00

453 lines
16 KiB
C++

/*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include <memory>
#include <seastar/core/future.hh>
#include <seastar/core/simple-stream.hh>
#include "replay_position.hh"
#include "commitlog_entry.hh"
#include "db/timeout_clock.hh"
#include "gc_clock.hh"
#include "utils/fragmented_temporary_buffer.hh"
namespace seastar { class file; }
#include "seastarx.hh"
namespace db {
class config;
class rp_set;
class rp_handle;
class commitlog_file_extension;
class extensions;
/*
* Commit Log tracks every write operation into the system. The aim of
* the commit log is to be able to successfully recover data that was
* not stored to disk via the Memtable.
*
* This impl is cassandra log format compatible (for what it is worth).
* The behaviour is similar, but not 100% identical as "stock cl".
*
* Files are managed with "normal" file writes (as normal as seastar
* gets) - no mmapping. Data is kept in internal buffers which, when
* full, are written to disk (see below). Files are also flushed
* periodically (or always), ensuring all data is written + writes are
* complete.
*
* In BATCH mode, every write to the log will also send the data to disk
* + issue a flush and wait for both to complete.
*
* In PERIODIC mode, most writes will only add to the internal memory
* buffers. If the mem buffer is saturated, data is sent to disk, but we
* don't wait for the write to complete. However, if periodic (timer)
* flushing has not been done in X ms, we will write + flush to file. In
* which case we wait for it.
*
* The commitlog does not guarantee any ordering between "add" callers
* (due to the above). The actual order in the commitlog is however
* identified by the replay_position returned.
*
* Like the stock cl, the log segments keep track of the highest dirty
* (added) internal position for a given table id (cf_id_type / UUID).
* Code should ensure to use discard_completed_segments with UUID +
* highest rp once a memtable has been flushed. This will allow
* discarding used segments. Failure to do so will keep stuff
* indefinitely.
*/
class commitlog {
public:
class segment_manager;
class segment;
friend class rp_handle;
struct buffer_and_replay_position {
fragmented_temporary_buffer buffer;
replay_position position;
};
private:
::shared_ptr<segment_manager> _segment_manager;
public:
enum class sync_mode {
PERIODIC, BATCH
};
using force_sync = commitlog_entry_writer::force_sync;
struct config {
config() = default;
config(const config&) = default;
static config from_db_config(const db::config&, seastar::scheduling_group sg, size_t shard_available_memory);
seastar::scheduling_group sched_group;
sstring commit_log_location;
sstring metrics_category_name;
uint64_t commitlog_total_space_in_mb = 0;
std::optional<uint64_t> commitlog_flush_threshold_in_mb = {};
std::optional<uint64_t> commitlog_data_max_lifetime_in_seconds = {};
uint64_t commitlog_segment_size_in_mb = 32;
uint64_t commitlog_sync_period_in_ms = 10 * 1000; //TODO: verify default!
// Max number of segments to keep in pre-alloc reserve.
// Not (yet) configurable from scylla.conf.
uint64_t max_reserve_segments = 12;
// Max active flushes. Default value
// zero means try to figure it out ourselves
uint64_t max_active_flushes = 0;
sync_mode mode = sync_mode::PERIODIC;
std::string fname_prefix = descriptor::FILENAME_PREFIX;
bool use_o_dsync = false;
bool warn_about_segments_left_on_disk_after_shutdown = true;
bool allow_going_over_size_limit = false;
bool allow_fragmented_entries = false;
// The base segment ID to use.
// The segment IDs of newly allocated segments will be issued sequentially
// and will start _right after_ this parameter.
// If not set, it will be calculated based on the number of milliseconds
// since boot time.
// If there are segments to replay which have base IDs greater than
// this parameter, new segment IDs will start after the largest one
// of them.
std::optional<segment_id_type> base_segment_id;
const db::extensions * extensions = nullptr;
};
struct descriptor {
private:
sstring _filename;
public:
static const std::string SEPARATOR;
static const std::string FILENAME_PREFIX;
static const std::string FILENAME_EXTENSION;
static inline constexpr uint32_t segment_version_1 = 1u;
static inline constexpr uint32_t segment_version_2 = 2u;
static inline constexpr uint32_t segment_version_3 = 3u;
static inline constexpr uint32_t segment_version_4 = 4u;
static inline constexpr uint32_t current_version = segment_version_4;
descriptor(descriptor&&) noexcept = default;
descriptor(const descriptor&) = default;
descriptor(segment_id_type i, const std::string& fname_prefix, uint32_t v = current_version, sstring = {});
descriptor(replay_position p, const std::string& fname_prefix = FILENAME_PREFIX);
descriptor(const std::string& filename, const std::string& fname_prefix = FILENAME_PREFIX);
sstring filename() const;
operator replay_position() const;
const segment_id_type id;
const uint32_t ver;
const std::string filename_prefix = FILENAME_PREFIX;
};
commitlog(commitlog&&) noexcept;
~commitlog();
/**
* Commitlog is created via a factory func.
* This of course because it needs to access disk to get up to speed.
* Optionally, could have an "init" func and require calling this.
*/
static future<commitlog> create_commitlog(config);
/**
* Update a running instance with new config options.
* Note: only some options (see code part) are actually
* applied once started.
*/
void update_configuration(const config&);
/**
* Note: To be able to keep impl out of header file,
* actual data writing is done via a std::function.
* If it is proven that this has unacceptable overhead, this can be replace
* by iter an interface or move segments and stuff into the header. But
* I hope not.
*
* A serializing func is provided along with a parameter indicating the size
* of data to be written. (See add).
* Don't write less, absolutely don't write more...
*/
using output = typename seastar::memory_output_stream<detail::sector_split_iterator>;
using serializer_func = std::function<void(output&)>;
/**
* Add a "Mutation" to the commit log.
*
* Resolves with timed_out_error when timeout is reached.
*
* @param mutation_func a function that writes 'size' bytes to the log, representing the mutation.
*/
future<rp_handle> add(const cf_id_type& id, size_t size, db::timeout_clock::time_point timeout, force_sync sync, serializer_func mutation_func);
/**
* Template version of add.
* Resolves with timed_out_error when timeout is reached.
* @param mu an invocable op that generates the serialized data. (Of size bytes)
*/
template<typename MutationOp>
future<rp_handle> add_mutation(const cf_id_type& id, size_t size, db::timeout_clock::time_point timeout, force_sync sync, MutationOp&& mu) {
return add(id, size, timeout, sync, [mu = std::forward<MutationOp>(mu)](output& out) {
mu(out);
});
}
/**
* Template version of add.
* @param mu an invocable op that generates the serialized data. (Of size bytes)
*/
template<typename MutationOp>
future<rp_handle> add_mutation(const cf_id_type& id, size_t size, force_sync sync, MutationOp&& mu) {
return add_mutation(id, size, db::timeout_clock::time_point::max(), sync, std::forward<MutationOp>(mu));
}
/**
* Add an entry to the commit log.
* Resolves with timed_out_error when timeout is reached.
* @param entry_writer a writer responsible for writing the entry
*/
future<rp_handle> add_entry(const cf_id_type& id, const commitlog_entry_writer& entry_writer, db::timeout_clock::time_point timeout);
/**
* Add N entries to the commit log as a single operation (in a single segment).
* Resolves with timed_out_error when timeout is reached.
* @param entry_writers a vector of writers responsible for writing respective entry
*/
future<utils::chunked_vector<rp_handle>> add_entries(utils::chunked_vector<commitlog_entry_writer> entry_writers, db::timeout_clock::time_point timeout);
/**
* Modifies the per-CF dirty cursors of any commit log segments for the column family according to the position
* given. Discards any commit log segments that are no longer used.
*
* @param cfId the column family ID that was flushed
* @param rp_set the replay positions of the flush
*/
void discard_completed_segments(const cf_id_type&, const rp_set&);
void discard_completed_segments(const cf_id_type&);
/**
* Forces active segment switch.
* Called from API calls to help tests that need predictable
* compaction behaviour.
*/
future<> force_new_active_segment() noexcept;
/**
* Waits for all segment deletes issued up until now to complete.
* Segment delete is done when a segment no longer is active or dirty,
* thus most often by calls to `discard_completed_segments` above.
*/
future<> wait_for_pending_deletes() noexcept;
/**
* A 'flush_handler' is invoked when the CL determines that size on disk has
* exceeded allowable threshold. It is called once for every currently active
* CF id with the highest replay_position which we would prefer to free "until".
* I.e. a the highest potentially freeable position in the CL.
*
* Whatever the callback does to help (or not) this desire is up to him.
* This is called synchronously, so callee might want to instigate async ops
* in the background.
*
*/
typedef std::function<void(cf_id_type, replay_position)> flush_handler;
typedef uint64_t flush_handler_id;
class flush_handler_anchor {
public:
friend class commitlog;
~flush_handler_anchor();
flush_handler_anchor(flush_handler_anchor&&);
flush_handler_anchor(const flush_handler_anchor&) = delete;
flush_handler_id release(); // disengage anchor - danger danger.
void unregister();
private:
flush_handler_anchor(commitlog&, flush_handler_id);
commitlog & _cl;
flush_handler_id _id;
};
flush_handler_anchor add_flush_handler(flush_handler);
void remove_flush_handler(flush_handler_id);
/**
* Returns a vector of the segment names
*/
std::vector<sstring> get_active_segment_names() const;
/**
* Returns a vector of segment paths which were
* preexisting when this instance of commitlog was created.
*/
future<std::vector<sstring>> get_segments_to_replay() const;
/**
* Delete aforementioned segments, and possible metadata
* associated with them
*/
future<> delete_segments(std::vector<sstring>) const;
uint64_t get_total_size() const;
uint64_t get_buffer_size() const;
uint64_t get_completed_tasks() const;
uint64_t get_flush_count() const;
uint64_t get_pending_tasks() const;
uint64_t get_pending_flushes() const;
uint64_t get_pending_allocations() const;
uint64_t get_flush_limit_exceeded_count() const;
uint64_t get_num_segments_created() const;
uint64_t get_num_segments_destroyed() const;
uint64_t get_num_blocked_on_new_segment() const;
uint64_t get_num_active_allocations() const;
/**
* Get number of inactive (finished), segments lingering
* due to still being dirty
*/
uint64_t get_num_dirty_segments() const;
/**
* Get number of active segments, i.e. still being allocated to
*/
uint64_t get_num_active_segments() const;
/**
* Returns the largest amount of data that can be written in a single "mutation".
*/
size_t max_record_size() const;
/**
* Return max allowed pending flushes (per this shard)
*/
uint64_t max_active_flushes() const;
/**
* Return disk footprint
*/
uint64_t disk_footprint() const;
/**
* Return configured disk footprint limit
*/
uint64_t disk_limit() const;
future<> clear();
const config& active_config() const;
/**
* Issues disk sync on all (allocating) segments. I.e. ensures that
* all data written up until this call is indeed on disk.
* _However_, if you issue new "add" ops while this is executing,
* those can/will be missed.
*/
future<> sync_all_segments();
/**
* Shuts everything down and causes any
* incoming writes to throw exceptions
*/
future<> shutdown();
/**
* Ensure segments are released, even if we don't free the
* commitlog proper. (hint, our shutdown is "partial")
*/
future<> release();
future<std::vector<descriptor>> list_existing_descriptors() const;
future<std::vector<descriptor>> list_existing_descriptors(const sstring& dir) const;
future<std::vector<sstring>> list_existing_segments() const;
future<std::vector<sstring>> list_existing_segments(const sstring& dir) const;
gc_clock::time_point min_gc_time(const cf_id_type&) const;
// Return the lowest possible replay position across all existing or future commitlog segments.
// In other words, only positions greater or equal to min_position() can
// be replayed on the next reboot.
replay_position min_position() const;
// (Re-)set data mix lifetime.
void update_max_data_lifetime(std::optional<uint64_t> commitlog_data_max_lifetime_in_seconds);
using commit_load_reader_func = std::function<future<>(buffer_and_replay_position)>;
class segment_error : public std::exception {};
class segment_data_corruption_error: public segment_error {
std::string _msg;
public:
segment_data_corruption_error(std::string msg, uint64_t s)
: _msg(std::move(msg)), _bytes(s) {
}
uint64_t bytes() const {
return _bytes;
}
const char* what() const noexcept override {
return _msg.c_str();
}
private:
uint64_t _bytes;
};
class invalid_segment_format : public segment_error {
static constexpr const char* _msg = "Not a scylla format commitlog file";
public:
const char* what() const noexcept override {
return _msg;
}
};
class header_checksum_error : public segment_error {
static constexpr const char* _msg = "Checksum error in file header";
public:
const char* what() const noexcept override {
return _msg;
}
};
class segment_truncation : public segment_error {
std::string _msg;
uint64_t _pos;
public:
segment_truncation(uint64_t);
uint64_t position() const;
const char* what() const noexcept override;
};
class replay_state {
public:
replay_state();
~replay_state();
private:
friend class commitlog;
class impl;
std::unique_ptr<impl> _impl;
};
static future<> read_log_file(sstring filename, sstring prefix, commit_load_reader_func, position_type = 0, const db::extensions* = nullptr);
static future<> read_log_file(const replay_state&, sstring filename, sstring prefix, commit_load_reader_func, position_type = 0, const db::extensions* = nullptr);
private:
commitlog(config);
struct entry_writer;
};
}