Files
scylladb/replica/logstor/segment_manager.hh
Michael Litvak 31d339e54a logstor: trigger separator flush for buffers that hold old segments
A compaction group has a separator buffer that keeps the mixed segments
alive until the separator buffer is flushed. A mixed segment can be
freed only after all separator buffers that hold writes from the segment
are flushed.

Typically, a separator buffer is flushed when it becomes full. However,
it is possible, for example, that one compaction group fills more slowly
than the others and holds many segments.

To fix this, we periodically trigger a separator flush for separator
buffers that hold old segments. We track the active segment sequence
number and, for each separator buffer, the oldest sequence number it
holds.
2026-03-18 19:24:28 +01:00

128 lines
3.5 KiB
C++

/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <cstdint>
#include <filesystem>
#include <seastar/core/shared_future.hh>
#include <seastar/core/file.hh>
#include <seastar/core/rwlock.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/queue.hh>
#include <seastar/core/shared_ptr.hh>
#include "bytes_fwd.hh"
#include "replica/logstor/write_buffer.hh"
#include "types.hh"
#include "utils/updateable_value.hh"
namespace replica {
class database;
namespace logstor {
class compaction_manager;
class segment_set;
class primary_index;

/// Default for segment_manager_config::segment_size: 128 * 1024
/// (128 KiB, assuming the unit is bytes).
///
/// Declared `inline` rather than `static` so that every translation unit
/// including this header refers to one shared entity instead of receiving its
/// own internal-linkage copy.
inline constexpr size_t default_segment_size = 128 * 1024;

/// Default for segment_manager_config::file_size: 32 * 1024 * 1024
/// (32 MiB, assuming the unit is bytes).
inline constexpr size_t default_file_size = 32 * 1024 * 1024;
/// Configuration for the segment manager.
///
/// Plain aggregate: every member carries a default, so a default-constructed
/// instance never holds an indeterminate value. Semantics of the individual
/// knobs are defined by the implementation, which is not visible in this
/// header; the notes below only state what the declarations themselves show.
struct segment_manager_config {
    /// Base directory path (presumably where the segment files are kept --
    /// confirm against the implementation).
    std::filesystem::path base_dir;

    /// Segment size; defaults to default_segment_size.
    size_t segment_size = default_segment_size;

    /// File size; defaults to default_file_size.
    size_t file_size = default_file_size;

    /// Disk size. There is no meaningful built-in default, so callers are
    /// presumably expected to set it; it is zero-initialised only so that it
    /// is never read while indeterminate.
    size_t disk_size = 0;

    /// Whether compaction is enabled; on by default.
    bool compaction_enabled = true;

    /// Upper bound on the number of segments per compaction; defaults to 8.
    size_t max_segments_per_compaction = 8;

    /// Scheduling group associated with compaction.
    seastar::scheduling_group compaction_sg;

    /// Updateable value holding the static shares for compaction.
    utils::updateable_value<float> compaction_static_shares;

    /// Scheduling group associated with the separator.
    seastar::scheduling_group separator_sg;

    /// Separator delay limit in milliseconds (per the `_ms` suffix).
    /// Zero-initialised so that it is never read while indeterminate;
    /// NOTE(review): confirm how the implementation interprets 0.
    uint32_t separator_delay_limit_ms = 0;

    /// Upper bound on separator memory; defaults to 1 * 1024 * 1024
    /// (1 MiB, assuming the unit is bytes).
    size_t max_separator_memory = 1 * 1024 * 1024;
};
/// One bucket of a table's segment histogram.
struct table_segment_histogram_bucket {
    /// Counter for this bucket; summed when buckets are merged.
    size_t count{0};
    /// Largest data size recorded for this bucket; merging keeps the maximum.
    size_t max_data_size{0};

    /// Merges `other` into this bucket: the counts are added and the larger of
    /// the two `max_data_size` values is kept.
    ///
    /// Takes `other` by const reference because it is only read, which also
    /// allows const objects and temporaries on the right-hand side.
    table_segment_histogram_bucket& operator+=(const table_segment_histogram_bucket& other) {
        count += other.count;
        max_data_size = std::max(max_data_size, other.max_data_size);
        return *this;
    }
};

/// Segment statistics for a table, including a histogram of buckets.
struct table_segment_stats {
    size_t compaction_group_count{0};
    size_t segment_count{0};
    std::vector<table_segment_histogram_bucket> histogram;

    /// Accumulates `other` into this object.
    ///
    /// The scalar counters are summed. The histograms are merged bucket by
    /// bucket (same index); if `other` has more buckets, this histogram is
    /// first extended with zeroed buckets so that no bucket of `other` is
    /// dropped. The histogram never shrinks.
    ///
    /// Takes `other` by const reference because it is only read, which also
    /// allows const objects and temporaries on the right-hand side.
    table_segment_stats& operator+=(const table_segment_stats& other) {
        compaction_group_count += other.compaction_group_count;
        segment_count += other.segment_count;

        const size_t incoming = other.histogram.size();
        if (histogram.size() < incoming) {
            // New buckets are value-initialised, i.e. start out at zero.
            histogram.resize(incoming);
        }
        for (size_t i = 0; i < incoming; ++i) {
            histogram[i] += other.histogram[i];
        }
        return *this;
    }
};
class segment_manager_impl;
class log_index;

/// Public handle of the segment manager.
///
/// The object owns a segment_manager_impl through a unique_ptr and exposes the
/// operations declared below; their behaviour is defined out of line and is
/// not visible in this header. Copying is disabled.
class segment_manager {
public:
    static constexpr size_t block_alignment = 4096;

    /// Constructs the manager from `config`.
    explicit segment_manager(segment_manager_config config);
    /// Defined out of line (segment_manager_impl is incomplete here).
    ~segment_manager();

    segment_manager(const segment_manager&) = delete;
    segment_manager& operator=(const segment_manager&) = delete;

    // --- lifecycle -----------------------------------------------------

    /// Asynchronous recovery step; receives the database.
    /// NOTE(review): ordering relative to start() is not visible here.
    future<> do_recovery(replica::database&);
    future<> start();
    future<> stop();

    // --- record I/O ----------------------------------------------------

    /// Writes `wb`; the future resolves to a log_location (presumably the
    /// location of the written data -- confirm in the implementation).
    future<log_location> write(write_buffer& wb);
    /// Reads the record at `location`.
    future<log_record> read(log_location location);
    /// Frees the record at `location`.
    void free_record(log_location location);
    /// Invokes `callback` with a location and a record for the given
    /// `segments`; the future completes when the iteration is done.
    future<> for_each_record(
            const std::vector<log_segment_id>& segments,
            std::function<future<>(log_location, log_record)> callback);

    // --- compaction / separator ----------------------------------------

    compaction_manager& get_compaction_manager() noexcept;
    const compaction_manager& get_compaction_manager() const noexcept;

    /// Installs the hook used to trigger compaction.
    void set_trigger_compaction_hook(std::function<void()> fn);
    /// Installs the hook used to trigger a separator flush.
    /// NOTE(review): the meaning of the size_t argument is not visible in
    /// this header -- confirm against the implementation.
    void set_trigger_separator_flush_hook(std::function<void(size_t)> fn);

    // --- misc ----------------------------------------------------------

    size_t get_segment_size() const noexcept;
    future<> discard_segments(segment_set&);
    size_t get_memory_usage() const;
    future<> await_pending_writes();

    friend class segment_manager_impl;

private:
    std::unique_ptr<segment_manager_impl> _impl;

    segment_manager_impl& get_impl() noexcept;
    const segment_manager_impl& get_impl() const noexcept;
};
}
}