Files
scylladb/compaction/compaction.cc

2286 lines
107 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
/*
*/
#include <vector>
#include <map>
#include <functional>
#include <utility>
#include <assert.h>
#include <algorithm>
#include <boost/range/join.hpp>
#include <seastar/core/future-util.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/util/closeable.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/on_internal_error.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include "compaction/compaction_garbage_collector.hh"
#include "compaction/exceptions.hh"
#include "dht/i_partitioner.hh"
#include "sstables/exceptions.hh"
#include "sstables/sstables.hh"
#include "sstables/sstable_writer.hh"
#include "sstables/progress_monitor.hh"
#include "sstables/sstables_manager.hh"
#include "compaction.hh"
#include "schema/schema.hh"
#include "db/system_keyspace.hh"
#include "db_clock.hh"
#include "mutation/mutation_compactor.hh"
#include "leveled_manifest.hh"
#include "dht/partition_filter.hh"
#include "mutation_writer/shard_based_splitting_writer.hh"
#include "mutation_writer/partition_based_splitting_writer.hh"
#include "mutation/mutation_source_metadata.hh"
#include "mutation/mutation_fragment_stream_validator.hh"
#include "utils/assert.hh"
#include "utils/error_injection.hh"
#include "utils/chunked_vector.hh"
#include "utils/pretty_printers.hh"
#include "readers/multi_range.hh"
#include "readers/compacting.hh"
#include "tombstone_gc.hh"
#include "replica/database.hh"
#include "mutation/timestamp.hh"
// GC predicate that answers "true" for every tombstone it is given.
can_gc_fn always_gc = [] (tombstone, is_shadowable) { return true; };
// GC predicate that answers "false" for every tombstone it is given.
can_gc_fn never_gc = [] (tombstone, is_shadowable) { return false; };
// Max-purgeable callback that reports api::max_timestamp for every key.
max_purgeable_fn can_always_purge = [] (const dht::decorated_key&, is_shadowable) -> max_purgeable { return max_purgeable(api::max_timestamp); };
// Max-purgeable callback that reports api::min_timestamp for every key.
max_purgeable_fn can_never_purge = [] (const dht::decorated_key&, is_shadowable) -> max_purgeable { return max_purgeable(api::min_timestamp); };
// Folds `other` into this object and returns *this.
//
// A disengaged `other` leaves this object untouched; a disengaged *this simply
// adopts `other`. When both are engaged, the lower timestamp (together with its
// source) wins, and the expiry threshold becomes the earlier of the two, or
// disengaged if either side has none.
max_purgeable& max_purgeable::combine(max_purgeable other) {
    if (!other) {
        return *this;
    }
    if (!*this) {
        return *this = std::move(other);
    }

    if (other._timestamp < _timestamp) {
        _timestamp = other._timestamp;
        _source = other._source;
    }

    if (!_expiry_threshold || !other._expiry_threshold) {
        _expiry_threshold = std::nullopt;
    } else {
        _expiry_threshold = std::min(*_expiry_threshold, *other._expiry_threshold);
    }
    return *this;
}
// Decides whether tombstone `t` may be purged according to this bound.
//
// A disengaged bound yields a default-constructed result. Otherwise the
// tombstone is purgeable when its deletion time is before the expiry threshold
// (a missing threshold is treated as gc_clock::time_point::min()), or when its
// timestamp is below the stored timestamp. The stored source is reported
// alongside the verdict.
max_purgeable::can_purge_result max_purgeable::can_purge(tombstone t) const {
    if (!*this) {
        return { };
    }
    const auto expiry_limit = _expiry_threshold.value_or(gc_clock::time_point::min());
    const bool purgeable = t.deletion_time < expiry_limit || t.timestamp < _timestamp;
    return {
        .can_purge = purgeable,
        .timestamp_source = _source,
    };
}
// Formats a max_purgeable::timestamp_source as its enumerator name.
//
// Every known enumerator is handled inside the switch. A value outside the
// enumeration (e.g. produced by a bad cast) is rendered as "(invalid)" instead
// of falling off the end of a non-void function, which would be undefined
// behaviour.
auto fmt::formatter<max_purgeable::timestamp_source>::format(max_purgeable::timestamp_source s, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    switch (s) {
    case max_purgeable::timestamp_source::none:
        return format_to(ctx.out(), "none");
    case max_purgeable::timestamp_source::memtable_possibly_shadowing_data:
        return format_to(ctx.out(), "memtable_possibly_shadowing_data");
    case max_purgeable::timestamp_source::other_sstables_possibly_shadowing_data:
        return format_to(ctx.out(), "other_sstables_possibly_shadowing_data");
    }
    // Not reachable for valid enumerators; keeps the function well-defined otherwise.
    return format_to(ctx.out(), "(invalid)");
}
// Formats a max_purgeable as "max_purgeable{timestamp=..., expiry_treshold=..., source=...}".
// The expiry threshold is printed as its raw time-since-epoch tick count, or
// "nullopt" when it is disengaged.
auto fmt::formatter<max_purgeable>::format(max_purgeable mp, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    sstring expiry_str = "nullopt";
    if (mp.expiry_threshold()) {
        expiry_str = fmt::format("{}", mp.expiry_threshold()->time_since_epoch().count());
    }
    return format_to(ctx.out(), "max_purgeable{{timestamp={}, expiry_treshold={}, source={}}}", mp.timestamp(), expiry_str, mp.source());
}
namespace compaction {
// An sstable is eligible for compaction unless it still requires view building
// or is quarantined.
bool is_eligible_for_compaction(const sstables::shared_sstable& sst) noexcept {
    return !(sst->requires_view_building() || sst->is_quarantined());
}
// Logger shared by the compaction code in this file.
logging::logger clogger("compaction");
// Table mapping a compaction_type to its upper-case name. It is used in both
// directions: compaction_name() looks up by type, to_compaction_type() scans by name.
// NOTE(review): compaction_type::RewriteComponent is handled by to_string(compaction_type)
// but has no entry here, so compaction_name() throws for it - confirm this is intended.
static const std::unordered_map<compaction_type, sstring> compaction_types = {
{ compaction_type::Compaction, "COMPACTION" },
{ compaction_type::Cleanup, "CLEANUP" },
{ compaction_type::Validation, "VALIDATION" },
{ compaction_type::Scrub, "SCRUB" },
{ compaction_type::Index_build, "INDEX_BUILD" },
{ compaction_type::Reshard, "RESHARD" },
{ compaction_type::Upgrade, "UPGRADE" },
{ compaction_type::Reshape, "RESHAPE" },
{ compaction_type::Split, "SPLIT" },
{ compaction_type::Major, "MAJOR" },
};
// Returns the upper-case name registered for `type` in compaction_types.
// Throws std::runtime_error when the type has no entry in that table.
sstring compaction_name(compaction_type type) {
    if (const auto entry = compaction_types.find(type); entry != compaction_types.end()) {
        return entry->second;
    }
    throw std::runtime_error("Invalid Compaction Type");
}
// Reverse lookup of compaction_name(): returns the compaction_type whose
// registered name equals `type_name` (exact match).
// Throws std::runtime_error when no entry in compaction_types carries that name.
compaction_type to_compaction_type(sstring type_name) {
    for (const auto& [type, name] : compaction_types) {
        if (name == type_name) {
            return type;
        }
    }
    throw std::runtime_error("Invalid Compaction Type Name");
}
// Returns a short human-readable label for a compaction type.
// An unknown value is reported through on_internal_error_noexcept() and
// rendered as "(invalid)".
std::string_view to_string(compaction_type type) {
    switch (type) {
    case compaction_type::Compaction:
        return "Compact";
    case compaction_type::Cleanup:
        return "Cleanup";
    case compaction_type::Validation:
        return "Validate";
    case compaction_type::Scrub:
        return "Scrub";
    case compaction_type::Index_build:
        return "Index_build";
    case compaction_type::Reshard:
        return "Reshard";
    case compaction_type::Upgrade:
        return "Upgrade";
    case compaction_type::Reshape:
        return "Reshape";
    case compaction_type::Split:
        return "Split";
    case compaction_type::Major:
        return "Major";
    case compaction_type::RewriteComponent:
        return "RewriteComponent";
    }
    on_internal_error_noexcept(clogger, format("Invalid compaction type {}", int(type)));
    return "(invalid)";
}
// Returns the lower-case name of a scrub mode.
// An unknown value is reported through on_internal_error_noexcept() and
// rendered as "(invalid)".
std::string_view to_string(compaction_type_options::scrub::mode scrub_mode) {
    using mode = compaction_type_options::scrub::mode;
    switch (scrub_mode) {
    case mode::abort:     return "abort";
    case mode::skip:      return "skip";
    case mode::segregate: return "segregate";
    case mode::validate:  return "validate";
    }
    on_internal_error_noexcept(clogger, format("Invalid scrub mode {}", int(scrub_mode)));
    return "(invalid)";
}
// Returns the lower-case name of a scrub quarantine mode.
// An unknown value is reported through on_internal_error_noexcept() and
// rendered as "(invalid)".
std::string_view to_string(compaction_type_options::scrub::quarantine_mode quarantine_mode) {
    using qmode = compaction_type_options::scrub::quarantine_mode;
    switch (quarantine_mode) {
    case qmode::include: return "include";
    case qmode::exclude: return "exclude";
    case qmode::only:    return "only";
    }
    on_internal_error_noexcept(clogger, format("Invalid scrub quarantine mode {}", int(quarantine_mode)));
    return "(invalid)";
}
// Computes the max_purgeable bound for partition key `dk`.
//
// Parameters:
//   table_s                  - view queried for tombstone-gc settings, memtable statistics and
//                              the compacted-but-undeleted sstables.
//   selector                 - incremental selector used to pick the sstables relevant to `dk`.
//   compacting_set           - sstables taking part in this compaction; they are skipped.
//   dk                       - the key the bound is computed for.
//   bloom_filter_checks      - incremented each time an sstable's filter reports that it has the key.
//   compacting_max_timestamp - compared against the memtable minimum timestamp (see below).
//   gc_check_only_compacting_sstables - when true, memtables and other sstables are not consulted.
//   is_shadowable            - selects the row-marker based statistics instead of the live-data ones.
//
// Returns max_purgeable(api::min_timestamp) when tombstone GC is disabled for the table,
// max_purgeable(api::max_timestamp) when only compacting sstables are to be checked, and
// otherwise the lowest relevant minimum timestamp found, tagged with where it came from.
static max_purgeable get_max_purgeable_timestamp(const compaction_group_view& table_s, sstables::sstable_set::incremental_selector& selector,
        const std::unordered_set<sstables::shared_sstable>& compacting_set, const dht::decorated_key& dk, uint64_t& bloom_filter_checks,
        const api::timestamp_type compacting_max_timestamp, const bool gc_check_only_compacting_sstables, const is_shadowable is_shadowable) {
    if (!table_s.tombstone_gc_enabled()) [[unlikely]] {
        clogger.trace("get_max_purgeable_timestamp {}.{}: tombstone_gc_enabled=false, returning min_timestamp",
                table_s.schema()->ks_name(), table_s.schema()->cf_name());
        return max_purgeable(api::min_timestamp);
    }

    auto timestamp = api::max_timestamp;
    if (gc_check_only_compacting_sstables) {
        // If gc_check_only_compacting_sstables is enabled, do not
        // check memtables and other sstables not being compacted.
        clogger.trace("get_max_purgeable_timestamp {}.{}: gc_check_only_compacting_sstables=true, returning max_timestamp",
                table_s.schema()->ks_name(), table_s.schema()->cf_name());
        return max_purgeable(timestamp);
    }

    auto source = max_purgeable::timestamp_source::none;
    api::timestamp_type memtable_min_timestamp;
    if (is_shadowable) {
        // For shadowable tombstones, check the minimum live row_marker timestamp
        // as rows with timestamp larger than the tombstone's would shadow the tombstone,
        // exposing all live cells in the row with timestamps potentially lower than
        // the shadowable tombstone (and those are tracked in the min_memtable_live_timestamp).
        // In contrast, a shadowable tombstone applies to rows with row_marker whose timestamp
        // is less than or equal to the tombstone's timestamp, the same way as a regular tombstone would.
        // See https://github.com/scylladb/scylladb/issues/20424
        memtable_min_timestamp = table_s.min_memtable_live_row_marker_timestamp();
    } else {
        // For regular tombstones, check the minimum live data timestamp.
        // Even if purgeable tombstones shadow dead data in the memtable, it's ok to purge them;
        // since "resurrecting" the already-dead data will have no effect, as they are already dead.
        // See https://github.com/scylladb/scylladb/issues/20423
        memtable_min_timestamp = table_s.min_memtable_live_timestamp();
    }

    // Function arguments are evaluated before the logger can decide to drop the message,
    // so only query the memtable for the trace line when tracing is actually enabled
    // (the per-sstable trace in the loop below is guarded the same way).
    if (clogger.is_enabled(log_level::trace)) {
        clogger.trace("get_max_purgeable_timestamp {}.{}: memtable_min_timestamp={} compacting_max_timestamp={} memtable_has_key={} is_shadowable={} min_memtable_live_timestamp={} min_memtable_live_row_marker_timestamp={}",
                table_s.schema()->ks_name(), table_s.schema()->cf_name(),
                memtable_min_timestamp, compacting_max_timestamp, table_s.memtable_has_key(dk), is_shadowable, table_s.min_memtable_live_timestamp(), table_s.min_memtable_live_row_marker_timestamp());
    }

    // Use memtable timestamp if it contains live data older than the sstables being compacted,
    // and if the memtable also contains the key we're calculating max purgeable timestamp for.
    // First condition helps to not penalize the common scenario where memtable only contains
    // newer data.
    if (memtable_min_timestamp <= compacting_max_timestamp && table_s.memtable_has_key(dk)) {
        timestamp = memtable_min_timestamp;
        source = max_purgeable::timestamp_source::memtable_possibly_shadowing_data;
    }

    // The hashed key is computed lazily, at most once, and shared by all filter lookups.
    std::optional<utils::hashed_key> hk;
    for (auto&& sst : boost::range::join(selector.select(dk).sstables, table_s.compacted_undeleted_sstables())) {
        if (compacting_set.contains(sst)) {
            continue;
        }
        // Start from the legacy per-sstable minimum; prefer the extended statistic when available.
        api::timestamp_type min_timestamp = sst->get_stats_metadata().min_timestamp;
        auto ts_stats = sst->get_ext_timestamp_stats();
        if (!ts_stats.empty()) {
            auto stat = is_shadowable ?
                    sstables::ext_timestamp_stats_type::min_live_row_marker_timestamp :
                    sstables::ext_timestamp_stats_type::min_live_timestamp;
            auto it = ts_stats.find(stat);
            if (it != ts_stats.end()) {
                min_timestamp = it->second;
            } else {
                // Do not throw an exception in production, just use the legacy min_timestamp set above
                on_internal_error_noexcept(clogger, format("Missing extended timestamp statstics: stat={} is_shadowable={}", int(stat), bool(is_shadowable)));
            }
        }
        if (clogger.is_enabled(log_level::trace)) {
            if (!hk) {
                hk = sstables::sstable::make_hashed_key(*table_s.schema(), dk.key());
            }
            clogger.trace("get_max_purgeable_timestamp={}: min_timestamp={} timestamp={} filter_has_key={} is_shadowable={} stats.min_timestamp={} min_live_timestamp={} min_live_row_marker_timestamp={}: sst={}",
                    min_timestamp >= timestamp || !sst->filter_has_key(*hk) ? timestamp : min_timestamp,
                    min_timestamp, timestamp, sst->filter_has_key(*hk), is_shadowable,
                    sst->get_stats_metadata().min_timestamp,
                    ts_stats[sstables::ext_timestamp_stats_type::min_live_timestamp],
                    ts_stats[sstables::ext_timestamp_stats_type::min_live_row_marker_timestamp],
                    sst->get_filename());
        }
        // There's no point in looking up the key in the sstable filter if
        // it does not contain data older than the minimum timestamp.
        if (min_timestamp >= timestamp) {
            continue;
        }
        if (!hk) {
            hk = sstables::sstable::make_hashed_key(*table_s.schema(), dk.key());
        }
        if (sst->filter_has_key(*hk)) {
            bloom_filter_checks++;
            timestamp = min_timestamp;
            source = max_purgeable::timestamp_source::other_sstables_possibly_shadowing_data;
        }
    }
    return max_purgeable(timestamp, source);
}
// Returns the sstables known to `table_s` (its tombstone-gc sstable set plus the
// compacted-but-undeleted ones) that are not listed in `sstables`.
// Membership is decided by sstable generation: both lists are sorted by
// generation and a set difference is taken on that key.
static std::vector<sstables::shared_sstable> get_uncompacting_sstables(const compaction_group_view& table_s, std::vector<sstables::shared_sstable> sstables) {
    const auto by_generation = std::mem_fn(&sstables::sstable::generation);

    auto gc_set = table_s.sstable_set_for_tombstone_gc();
    auto candidates = *gc_set->all() | std::ranges::to<std::vector>();
    const auto& undeleted = table_s.compacted_undeleted_sstables();
    candidates.insert(candidates.end(), undeleted.begin(), undeleted.end());

    std::ranges::sort(candidates, std::ranges::less(), by_generation);
    std::ranges::sort(sstables, std::ranges::less(), by_generation);

    std::vector<sstables::shared_sstable> remaining;
    std::ranges::set_difference(candidates, sstables, std::back_inserter(remaining),
            std::ranges::less(), by_generation, by_generation);
    return remaining;
}
// Builds one sstables::basic_info (generation, origin, on-disk size) per input
// sstable, in the same order as the input.
static std::vector<sstables::basic_info> extract_basic_info_from_sstables(const std::vector<sstables::shared_sstable>& sstables) {
    std::vector<sstables::basic_info> infos;
    infos.reserve(sstables.size());
    for (const auto& sst : sstables) {
        infos.push_back(sstables::basic_info{
            .generation = sst->generation(),
            .origin = sst->get_origin(),
            .size = sst->bytes_on_disk(),
        });
    }
    return infos;
}
class compaction;
// Write monitor attached to an sstable produced by compaction.
//
// While the sstable is being written it exposes the writer's current offset
// through written(); once data writing completes it keeps returning the last
// offset observed. The sstable is registered with the table's backlog tracker
// as partially written when writing starts, and its charges are reverted when
// the monitor is destroyed.
class compaction_write_monitor final : public sstables::write_monitor, public backlog_write_progress_manager {
    sstables::shared_sstable _sst;
    compaction_group_view& _table_s;
    // Non-null only between on_write_started() and on_data_write_completed().
    const sstables::writer_offset_tracker* _tracker = nullptr;
    // Offset captured when data writing completed.
    uint64_t _progress_seen = 0;
    api::timestamp_type _maximum_timestamp;
    unsigned _sstable_level;
public:
    compaction_write_monitor(sstables::shared_sstable sst, compaction_group_view& table_s, api::timestamp_type max_timestamp, unsigned sstable_level)
        : _sst(sst)
        , _table_s(table_s)
        , _maximum_timestamp(max_timestamp)
        , _sstable_level(sstable_level)
    {}

    ~compaction_write_monitor() {
        if (!_sst) {
            return;
        }
        _table_s.get_backlog_tracker().revert_charges(_sst);
    }

    // Starts tracking the writer's offset and registers the sstable as partially written.
    virtual void on_write_started(const sstables::writer_offset_tracker& tracker) override {
        _tracker = &tracker;
        _table_s.get_backlog_tracker().register_partially_written_sstable(_sst, *this);
    }

    // Snapshots the final offset and stops referring to the writer's tracker.
    virtual void on_data_write_completed() override {
        if (!_tracker) {
            return;
        }
        _progress_seen = _tracker->offset;
        _tracker = nullptr;
    }

    // Bytes written so far: the live offset while writing, the snapshot afterwards.
    virtual uint64_t written() const override {
        if (!_tracker) {
            return _progress_seen;
        }
        return _tracker->offset;
    }

    api::timestamp_type maximum_timestamp() const override {
        return _maximum_timestamp;
    }

    unsigned level() const override {
        return _sstable_level;
    }
};
// Bundles an output sstable with the writer producing it and, optionally, the
// write monitor observing that writer.
struct compaction_writer {
// The order in here is important. A monitor must be destroyed before the writer it is monitoring since it has a
// periodic timer that checks the writer.
// The writer must be destroyed before the shared_sstable since it may depend on the sstable
// (as in the mx::writer over compressed_file_data_sink_impl case that depends on sstables::compression).
// Members are destroyed in reverse declaration order, hence: sst, writer, monitor.
sstables::shared_sstable sst;
sstables::sstable_writer writer;
// We use a ptr for pointer stability and so that it can be null
// when using a noop monitor.
std::unique_ptr<compaction_write_monitor> monitor;
// Takes ownership of the monitor, the writer and the sstable handle.
compaction_writer(std::unique_ptr<compaction_write_monitor> monitor, sstables::sstable_writer writer, sstables::shared_sstable sst)
: sst(std::move(sst)), writer(std::move(writer)), monitor(std::move(monitor)) {}
// Convenience overload for a writer without a monitor (monitor is left null).
compaction_writer(sstables::sstable_writer writer, sstables::shared_sstable sst)
: compaction_writer(nullptr, std::move(writer), std::move(sst)) {}
};
// Consumer of compacted mutation fragments that forwards them to a
// compaction_writer. The writer is obtained through a creator callback and
// handed to a stop callback; both are supplied by the owner at construction.
// Most member functions are only declared here and defined elsewhere in the file.
class compacted_fragments_writer {
compaction& _c;
// The writer currently in use, if any.
std::optional<compaction_writer> _compaction_writer = {};
// Callback producing a new compaction_writer; it receives a decorated key.
using creator_func_t = std::function<compaction_writer(const dht::decorated_key&)>;
// Callback invoked with a compaction_writer that is to be stopped.
using stop_func_t = std::function<void(compaction_writer*)>;
creator_func_t _create_compaction_writer;
stop_func_t _stop_compaction_writer;
// Engaged only when constructed with a stop-request observable.
std::optional<utils::observer<>> _stop_request_observer;
bool _unclosed_partition = false;
struct partition_state {
dht::decorated_key_opt dk;
// Partition tombstone is saved for the purpose of replicating it to every fragment storing (part of) the partition.
// Then when reading from the SSTable run, we won't unnecessarily have to open >= 2 fragments, the one which
// contains the tombstone and another one(s) that has the partition slice being queried.
::tombstone tombstone;
// Used to determine whether any active tombstones need closing at EOS.
::tombstone current_emitted_tombstone;
// Track last emitted clustering row, which will be used to close active tombstone if splitting partition
position_in_partition last_pos = position_in_partition::before_all_clustered_rows();
bool is_splitting_partition = false;
} _current_partition;
private:
inline void maybe_abort_compaction();
// Subscribes to `sro`. When the observable fires, the callback asserts that no
// partition is left unclosed and then calls consume_end_of_stream().
utils::observer<> make_stop_request_observer(utils::observable<>& sro) {
return sro.observe([this] () mutable {
SCYLLA_ASSERT(!_unclosed_partition);
consume_end_of_stream();
});
}
void stop_current_writer();
bool can_split_large_partition() const;
void track_last_position(position_in_partition_view pos);
void split_large_partition();
void do_consume_new_partition(const dht::decorated_key& dk);
stop_iteration do_consume_end_of_partition();
public:
// Constructs a writer that does not observe stop requests.
explicit compacted_fragments_writer(compaction& c, creator_func_t cpw, stop_func_t scw)
: _c(c)
, _create_compaction_writer(std::move(cpw))
, _stop_compaction_writer(std::move(scw)) {
}
// Constructs a writer that additionally observes stop requests published on `sro`.
explicit compacted_fragments_writer(compaction& c, creator_func_t cpw, stop_func_t scw, utils::observable<>& sro)
: _c(c)
, _create_compaction_writer(std::move(cpw))
, _stop_compaction_writer(std::move(scw))
, _stop_request_observer(make_stop_request_observer(sro)) {
}
// Move-constructible (defined out of line); copying is disabled.
compacted_fragments_writer(compacted_fragments_writer&& other);
compacted_fragments_writer& operator=(const compacted_fragments_writer&) = delete;
compacted_fragments_writer(const compacted_fragments_writer&) = delete;
void consume_new_partition(const dht::decorated_key& dk);
void consume(tombstone t);
// Forwards a static row to the current writer after the abort check.
// The tombstone and bool arguments are ignored.
stop_iteration consume(static_row&& sr, tombstone, bool) {
maybe_abort_compaction();
return _compaction_writer->writer.consume(std::move(sr));
}
stop_iteration consume(static_row&& sr) {
return consume(std::move(sr), tombstone{}, bool{});
}
stop_iteration consume(clustering_row&& cr, row_tombstone, bool);
stop_iteration consume(clustering_row&& cr) {
return consume(std::move(cr), row_tombstone{}, bool{});
}
stop_iteration consume(range_tombstone_change&& rtc);
stop_iteration consume_end_of_partition();
void consume_end_of_stream();
};
// Strongly-typed flag: should read monitors report to the backlog tracker?
using use_backlog_tracker = bool_class<class use_backlog_tracker_tag>;

// Hands out one read monitor per input sstable (keyed by sstable generation)
// and keeps them alive so that overall read progress can be summed up.
struct compaction_read_monitor_generator final : public sstables::read_monitor_generator {
    // Tracks how far a single input sstable has been read. While reading is in
    // progress the reader's live position is reported; afterwards the last
    // position seen is reported.
    class compaction_read_monitor final : public sstables::read_monitor, public backlog_read_progress_manager {
        sstables::shared_sstable _sst;
        compaction_group_view& _table_s;
        // Non-null only between on_read_started() and on_read_completed().
        const sstables::reader_position_tracker* _tracker = nullptr;
        uint64_t _last_position_seen = 0;
        use_backlog_tracker _use_backlog_tracker;
    public:
        virtual void on_read_started(const sstables::reader_position_tracker& tracker) override {
            _tracker = &tracker;
            if (_use_backlog_tracker) {
                _table_s.get_backlog_tracker().register_compacting_sstable(_sst, *this);
            }
        }

        virtual void on_read_completed() override {
            if (_tracker) {
                _last_position_seen = _tracker->position;
                _tracker = nullptr;
            }
        }

        virtual uint64_t compacted() const override {
            if (_tracker) {
                return _tracker->position;
            }
            return _last_position_seen;
        }

        // Reverts the sstable's backlog charges (when the tracker is in use) and
        // drops the reference to it, so the destructor has nothing left to do.
        void remove_sstable() {
            if (_sst && _use_backlog_tracker) {
                _table_s.get_backlog_tracker().revert_charges(_sst);
            }
            _sst = {};
        }

        compaction_read_monitor(sstables::shared_sstable sst, compaction_group_view& table_s, use_backlog_tracker use_backlog_tracker)
            : _sst(std::move(sst)), _table_s(table_s), _use_backlog_tracker(use_backlog_tracker) { }

        ~compaction_read_monitor() {
            // We failed to finish handling this SSTable, so we have to update the backlog_tracker
            // about it.
            if (_sst && _use_backlog_tracker) {
                _table_s.get_backlog_tracker().revert_charges(_sst);
            }
        }

        friend class compaction_read_monitor_generator;
    };

    // Returns the monitor for `sst`, creating it on first use. An existing
    // monitor for the same generation is returned as is.
    virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override {
        // Construct the monitor in place. Building a temporary and copying it into
        // the map would run the temporary's destructor, which calls revert_charges()
        // for the sstable whenever the backlog tracker is in use.
        auto p = _generated_monitors.try_emplace(sst->generation(), sst, _table_s, _use_backlog_tracker);
        return p.first->second;
    }

    explicit compaction_read_monitor_generator(compaction_group_view& table_s, use_backlog_tracker use_backlog_tracker = use_backlog_tracker::yes)
        : _table_s(table_s), _use_backlog_tracker(use_backlog_tracker) {}

    // Sum of compacted() over all monitors generated so far.
    uint64_t compacted() const {
        return std::ranges::fold_left(_generated_monitors | std::views::values | std::views::transform([](auto& monitor) { return monitor.compacted(); }), uint64_t(0), std::plus());
    }

    // Calls remove_sstable() on the monitor of every listed sstable that has one.
    void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
        for (auto& sst : exhausted_sstables) {
            auto it = _generated_monitors.find(sst->generation());
            if (it != _generated_monitors.end()) {
                it->second.remove_sstable();
            }
        }
    }
private:
    compaction_group_view& _table_s;
    std::unordered_map<sstables::generation_type, compaction_read_monitor> _generated_monitors;
    use_backlog_tracker _use_backlog_tracker;

    friend class compaction_progress_monitor;
};
// Takes ownership of `generator`, replacing whatever generator was held before.
void compaction_progress_monitor::set_generator(std::unique_ptr<sstables::read_monitor_generator> generator) {
_generator = std::move(generator);
}
// Drops the current generator. If one is held, its total compacted() value is
// saved into _progress first so that get_progress() can keep reporting it.
void compaction_progress_monitor::reset_generator() {
    if (_generator) {
        const auto& read_monitors = dynamic_cast<compaction_read_monitor_generator&>(*_generator);
        _progress = read_monitors.compacted();
    }
    _generator = nullptr;
}
// Reports read progress: the live total from the generator while one is held,
// otherwise the value saved in _progress.
uint64_t compaction_progress_monitor::get_progress() const {
    if (!_generator) {
        return _progress;
    }
    const auto& read_monitors = dynamic_cast<compaction_read_monitor_generator&>(*_generator);
    return read_monitors.compacted();
}
class compaction {
protected:
compaction_data& _cdata;
compaction_group_view& _table_s;
const compaction_sstable_creator_fn _sstable_creator;
const schema_ptr _schema;
const reader_permit _permit;
std::vector<sstables::shared_sstable> _sstables;
std::vector<sstables::generation_type> _input_sstable_generations;
std::vector<sstables::basic_info> _input_sstables_basic_info;
// Unused sstables are tracked because if compaction is interrupted we can only delete them.
// Deleting used sstables could potentially result in data loss.
std::unordered_set<sstables::shared_sstable> _new_partial_sstables;
std::vector<sstables::shared_sstable> _new_unused_sstables;
std::vector<sstables::shared_sstable> _all_new_sstables;
lw_shared_ptr<sstables::sstable_set> _compacting;
const compaction_type _type;
const uint64_t _max_sstable_size;
const uint32_t _sstable_level;
uint64_t _start_size = 0;
uint64_t _end_size = 0;
// fully expired files, which are skipped, aren't taken into account.
uint64_t _compacting_data_file_size = 0;
api::timestamp_type _compacting_max_timestamp = api::min_timestamp;
uint64_t _estimated_partitions = 0;
double _estimated_droppable_tombstone_ratio = 0;
uint64_t _bloom_filter_checks = 0;
combined_reader_statistics _reader_statistics;
tombstone_purge_stats _tombstone_purge_stats;
db::replay_position _rp;
encoding_stats_collector _stats_collector;
const bool _can_split_large_partition = false;
bool _contains_multi_fragment_runs = false;
mutation_source_metadata _ms_metadata = {};
const compaction_sstable_replacer_fn _replacer;
const sstables::run_id _run_identifier;
// optional clone of sstable set to be used for expiration purposes, so it will be set if expiration is enabled.
std::optional<sstables::sstable_set> _sstable_set;
// used to incrementally calculate max purgeable timestamp, as we iterate through decorated keys.
std::optional<sstables::sstable_set::incremental_selector> _selector;
std::unordered_set<sstables::shared_sstable> _compacting_for_max_purgeable_func;
// optional owned_ranges vector for cleanup;
const owned_ranges_ptr _owned_ranges = {};
// required for reshard compaction.
const dht::sharder* _sharder = nullptr;
const std::optional<dht::incremental_owned_ranges_checker> _owned_ranges_checker;
// Garbage collected sstables that are sealed but were not added to SSTable set yet.
std::vector<sstables::shared_sstable> _unused_garbage_collected_sstables;
// Garbage collected sstables that were added to SSTable set and should be eventually removed from it.
std::vector<sstables::shared_sstable> _used_garbage_collected_sstables;
utils::observable<> _stop_request_observable;
tombstone_gc_state _tombstone_gc_state;
int64_t _output_repaired_at = 0;
private:
// Keeps track of monitors for input sstables.
// When the backlog tracker is in use (see the use_backlog_tracker constructor argument), the monitors are responsible for adjusting backlog as compaction progresses.
compaction_progress_monitor& _progress_monitor;
// Stores the descriptor's fan-in into `cdata` and returns the same object, so
// the call can be used directly inside the constructor's initializer list.
compaction_data& init_compaction_data(compaction_data& cdata, const compaction_descriptor& descriptor) const {
cdata.compaction_fan_in = descriptor.fan_in();
return cdata;
}
// Called in a seastar thread
//
// Returns the partition ranges covered by `sstables` (one inclusive
// [first key, last key] range per sstable) minus the owned ranges.
// Without owned ranges no cleanup work was done, so the result is empty.
utils::chunked_vector<dht::partition_range>
get_ranges_for_invalidation(const std::vector<sstables::shared_sstable>& sstables) {
    if (!_owned_ranges) {
        // Nothing was cleaned up, hence nothing needs to be invalidated.
        return {};
    }
    auto kept_ranges = dht::to_partition_ranges_chunked(*_owned_ranges).get();

    const auto span_of = [] (const sstables::shared_sstable& sst) {
        seastar::thread::maybe_yield();
        return dht::partition_range::make({sst->get_first_decorated_key(), true},
                                          {sst->get_last_decorated_key(), true});
    };
    auto covered_ranges = sstables
            | std::views::transform(span_of)
            | std::ranges::to<utils::chunked_vector<dht::partition_range>>();

    return dht::subtract_ranges(*_schema, std::move(covered_ranges), std::move(kept_ranges)).get();
}
protected:
// Builds the shared compaction state from `descriptor`.
// Pieces of the descriptor (creator, sstables, replacer, sstable-set snapshot, owned ranges)
// are moved into members; the remaining fields are copied.
// Several initializers read members initialized earlier in the list (_table_s, _sstable_set,
// _sstables, _owned_ranges).
compaction(compaction_group_view& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor, use_backlog_tracker use_backlog_tracker)
: _cdata(init_compaction_data(cdata, descriptor))
, _table_s(table_s)
, _sstable_creator(std::move(descriptor.creator))
, _schema(_table_s.schema())
, _permit(_table_s.make_compaction_reader_permit())
, _sstables(std::move(descriptor.sstables))
, _type(descriptor.options.type())
, _max_sstable_size(descriptor.max_sstable_bytes)
, _sstable_level(descriptor.level)
, _can_split_large_partition(descriptor.can_split_large_partition)
, _replacer(std::move(descriptor.replacer))
, _run_identifier(descriptor.run_identifier)
, _sstable_set(std::move(descriptor.all_sstables_snapshot))
, _selector(_sstable_set ? _sstable_set->make_incremental_selector() : std::optional<sstables::sstable_set::incremental_selector>{})
, _compacting_for_max_purgeable_func(std::unordered_set<sstables::shared_sstable>(_sstables.begin(), _sstables.end()))
, _owned_ranges(std::move(descriptor.owned_ranges))
, _sharder(descriptor.sharder)
, _owned_ranges_checker(_owned_ranges ? std::optional<dht::incremental_owned_ranges_checker>(*_owned_ranges) : std::nullopt)
, _tombstone_gc_state(_table_s.get_tombstone_gc_state())
, _progress_monitor(progress_monitor)
{
// When only the compacting sstables are to be checked for GC, use a
// tombstone GC state with the commitlog check disabled.
if (descriptor.gc_check_only_compacting_sstables) {
_tombstone_gc_state = _tombstone_gc_state.with_commitlog_check_disabled();
}
// The input contains a multi-fragment run if any run id occurs more than once
// (insert() reports `false` for a run id that was already seen).
std::unordered_set<sstables::run_id> ssts_run_ids;
_contains_multi_fragment_runs = std::any_of(_sstables.begin(), _sstables.end(), [&ssts_run_ids] (sstables::shared_sstable& sst) {
return !ssts_run_ids.insert(sst->run_identifier()).second;
});
// Install a fresh read-monitor generator on the progress monitor.
_progress_monitor.set_generator(std::make_unique<compaction_read_monitor_generator>(_table_s, use_backlog_tracker));
}
// Returns the read-monitor generator held by the progress monitor, or the
// default read-monitor generator when none is installed.
sstables::read_monitor_generator& unwrap_monitor_generator() const {
    if (!_progress_monitor._generator) {
        return sstables::default_read_monitor_generator();
    }
    return *_progress_monitor._generator;
}
virtual uint64_t partitions_per_sstable() const {
// some tests use _max_sstable_size == 0 for force many one partition per sstable
auto max_sstable_size = std::max<uint64_t>(_max_sstable_size, 1);
uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(_compacting_data_file_size) / max_sstable_size)));
return std::min(uint64_t(ceil(double(_estimated_partitions) / estimated_sstables)),
_table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimated_partitions, _schema));
}
// Registers a freshly created output sstable: records it among all new
// sstables and the partial (not yet finished) ones, and adds every input
// sstable generation to it as an ancestor.
void setup_new_sstable(sstables::shared_sstable& sst) {
    _all_new_sstables.push_back(sst);
    _new_partial_sstables.insert(sst);
    std::ranges::for_each(_input_sstable_generations, [&sst] (auto generation) {
        sst->add_ancestor(generation);
    });
}
// Finishes an output sstable: sets repaired_at on the writer, ends the write
// stream, opens the sstable's data, adds its on-disk size to _end_size, and
// moves it from the partial set to the list of new unused sstables.
// open_data().get() waits on a future - presumably this must run in a seastar thread; confirm with callers.
void finish_new_sstable(compaction_writer* writer) {
writer->writer.set_repaired_at(_output_repaired_at);
writer->writer.consume_end_of_stream();
writer->sst->open_data().get();
_end_size += writer->sst->bytes_on_disk();
_new_unused_sstables.push_back(writer->sst);
_new_partial_sstables.erase(writer->sst);
}
// Builds the sstable writer configuration for a compaction of the given type.
// The lower-cased compaction name is handed to configure_writer(); the size
// limit, run identifier, replay position and level come from this compaction,
// and the default write monitor is installed.
// compaction_name() throws std::runtime_error for a type without a registered name.
sstables::sstable_writer_config make_sstable_writer_config(compaction_type type) {
    auto s = compaction_name(type);
    // std::tolower requires a value representable as unsigned char (or EOF);
    // taking the character as unsigned char avoids undefined behaviour for
    // negative char values.
    std::transform(s.begin(), s.end(), s.begin(), [] (unsigned char c) {
        return static_cast<char>(std::tolower(c));
    });
    sstables::sstable_writer_config cfg = _table_s.configure_writer(std::move(s));
    cfg.max_sstable_size = _max_sstable_size;
    cfg.monitor = &sstables::default_write_monitor();
    cfg.run_identifier = _run_identifier;
    cfg.replay_position = _rp;
    cfg.sstable_level = _sstable_level;
    return cfg;
}
// Returns the largest max_timestamp recorded in the stats metadata of the
// input sstables. With no input sstables there is nothing to inspect, so
// api::min_timestamp is returned instead of dereferencing the end iterator.
api::timestamp_type maximum_timestamp() const {
    if (_sstables.empty()) {
        return api::min_timestamp;
    }
    auto m = std::max_element(_sstables.begin(), _sstables.end(), [] (const sstables::shared_sstable& sst1, const sstables::shared_sstable& sst2) {
        return sst1->get_stats_metadata().max_timestamp < sst2->get_stats_metadata().max_timestamp;
    });
    return (*m)->get_stats_metadata().max_timestamp;
}
// Returns the encoding statistics gathered so far by _stats_collector.
encoding_stats get_encoding_stats() const {
return _stats_collector.get();
}
// Packages the input and output sstables, together with the ranges that need
// invalidation (derived from the inputs), into a compaction_completion_desc.
compaction_completion_desc
get_compaction_completion_desc(std::vector<sstables::shared_sstable> input_sstables, std::vector<sstables::shared_sstable> output_sstables) {
    // Must be computed before input_sstables is moved from below.
    auto invalidation_ranges = get_ranges_for_invalidation(input_sstables);
    return compaction_completion_desc{
        std::move(input_sstables),
        std::move(output_sstables),
        std::move(invalidation_ranges),
    };
}
// Tombstone expiration requires an sstable set snapshot: without one,
// tombstones cannot be purged without the risk of resurrecting data.
// It must also be enabled for the table itself.
bool tombstone_expiration_enabled() const {
    return _sstable_set.has_value() && _table_s.tombstone_gc_enabled();
}
// Creates a writer, with its own write monitor, for an sstable that will hold
// garbage-collected data and belong to run `gc_run`. The partition estimate is
// the per-sstable estimate scaled by the droppable-tombstone ratio, at least 1.
compaction_writer create_gc_compaction_writer(sstables::run_id gc_run) const {
    auto gc_sst = _sstable_creator(this_shard_id());
    auto gc_monitor = std::make_unique<compaction_write_monitor>(gc_sst, _table_s, maximum_timestamp(), _sstable_level);

    sstables::sstable_writer_config gc_cfg = _table_s.configure_writer("garbage_collection");
    gc_cfg.run_identifier = gc_run;
    gc_cfg.monitor = gc_monitor.get();

    const auto scaled_estimate = uint64_t(ceil(partitions_per_sstable() * _estimated_droppable_tombstone_ratio));
    const uint64_t partition_estimate = std::max<uint64_t>(1, scaled_estimate);

    auto gc_writer = gc_sst->get_writer(*schema(), partition_estimate, gc_cfg, get_encoding_stats());
    return compaction_writer(std::move(gc_monitor), std::move(gc_writer), std::move(gc_sst));
}
// Seals a garbage-collection writer: ends its write stream, opens the resulting
// sstable's data and records the sstable as an unused garbage-collected sstable.
// open_data().get() waits on a future - presumably this must run in a seastar thread; confirm with callers.
void stop_gc_compaction_writer(compaction_writer* c_writer) {
c_writer->writer.consume_end_of_stream();
auto sst = c_writer->sst;
sst->open_data().get();
_unused_garbage_collected_sstables.push_back(std::move(sst));
}
// Writes a temporary sstable run containing only garbage collected data.
// Whenever regular compaction writer seals a new sstable, this writer will flush a new sstable as well,
// right before there's an attempt to release exhausted sstables earlier.
// Generated sstables will be temporarily added to table to make sure that a compaction crash will not
// result in data resurrection.
// When compaction finishes, all the temporary sstables generated here will be deleted and removed
// from table's sstable set.
compacted_fragments_writer get_gc_compacted_fragments_writer() {
    // The temporary run may overlap with the non-gc run created by
    // get_compacted_fragments_writer(), so it needs a run id of its own.
    // The id is generated here because:
    // 1. it can then be shared by every sstable this writer creates
    // 2. it is only needed when a gc writer is actually used
    auto gc_run = sstables::run_id::create_random_id();

    auto open_writer = [this, gc_run] (const dht::decorated_key&) { return create_gc_compaction_writer(gc_run); };
    auto seal_writer = [this] (compaction_writer* cw) { stop_gc_compaction_writer(cw); };

    return compacted_fragments_writer(*this, std::move(open_writer), std::move(seal_writer), _stop_request_observable);
}
// Retrieves all unused garbage collected sstables that will be subsequently added
// to the SSTable set, and mark them as used.
std::vector<sstables::shared_sstable> consume_unused_garbage_collected_sstables() {
auto unused = std::exchange(_unused_garbage_collected_sstables, {});
_used_garbage_collected_sstables.insert(_used_garbage_collected_sstables.end(), unused.begin(), unused.end());
return unused;
}
// Read-only view of the garbage collected sstables already handed out by
// consume_unused_garbage_collected_sstables().
const std::vector<sstables::shared_sstable>& used_garbage_collected_sstables() const {
    const auto& used = _used_garbage_collected_sstables;
    return used;
}
// The garbage collected sstable writer is used only when all of these hold:
// the input contains multi-fragment runs, the output sstable size is limited
// and a replacer callback is set.
virtual bool enable_garbage_collected_sstable_writer() const noexcept {
    if (!_contains_multi_fragment_runs) {
        return false;
    }
    const bool output_size_is_limited = _max_sstable_size != std::numeric_limits<uint64_t>::max();
    return output_size_is_limited && bool(_replacer);
}
public:
// A compaction object can be neither copied nor moved: all four copy/move
// special members are deleted.
compaction& operator=(const compaction&) = delete;
compaction(const compaction&) = delete;
compaction(compaction&& other) = delete;
compaction& operator=(compaction&& other) = delete;
// Virtual, as compaction is used as a polymorphic base.
// On destruction, the progress monitor's generator is reset.
virtual ~compaction()
{
    _progress_monitor.reset_generator();
}
private:
// Creates the reader over the input sstables for the given partition range and slice.
// Pure virtual: each concrete compaction type supplies its own implementation.
// Called by setup_sstable_reader(), either once with the full partition range
// or once per owned range through a mutation_source.
// NOTE(review): the original comment says the reader only returns mutations that
// belong to the current shard; that depends on the override, confirm there.
virtual mutation_reader make_sstable_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding) = 0;
// Creates the reader that feeds the compaction.
// With an owned-ranges checker present, a multi-range reader is returned that walks
// the ranges produced by the checker one by one; otherwise a single reader over the
// full partition range is returned.
mutation_reader setup_sstable_reader() {
    if (_owned_ranges_checker) {
        // Wraps make_sstable_reader() so that the multi-range reader can create
        // one underlying reader per owned range.
        auto per_range_source = mutation_source([this] (schema_ptr s,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                streamed_mutation::forwarding fwd,
                mutation_reader::forwarding fwd_mr) {
            log_trace("Creating sstable set reader with range {}", range);
            return make_sstable_reader(std::move(s),
                    std::move(permit),
                    range,
                    slice,
                    std::move(trace_state),
                    fwd,
                    fwd_mr);
        });
        // Yields the next owned range, or nullopt when the checker has no more ranges.
        auto next_owned_range = [this] () -> std::optional<dht::partition_range> {
            auto owned = _owned_ranges_checker->next_owned_range();
            if (owned != nullptr) {
                log_trace("Skipping to the next owned range {}", *owned);
                return dht::to_partition_range(*owned);
            }
            return std::nullopt;
        };
        return make_multi_range_reader(_schema, _permit, std::move(per_range_source),
                std::move(next_owned_range),
                _schema->full_slice(),
                tracing::trace_state_ptr());
    }
    return make_sstable_reader(_schema,
            _permit,
            query::full_partition_range,
            _schema->full_slice(),
            tracing::trace_state_ptr(),
            ::streamed_mutation::forwarding::no,
            ::mutation_reader::forwarding::no);
}
// Creates the (initially unpopulated by this function) sstable set that setup()
// fills with the input sstables; by default the table's compaction strategy
// decides which kind of set to build.
virtual sstables::sstable_set make_sstable_set_for_input() const {
    auto& strategy = _table_s.get_compaction_strategy();
    return strategy.make_sstable_set(_table_s);
}
// Returns a copy of the tombstone GC state this compaction works with.
tombstone_gc_state get_tombstone_gc_state() const {
    tombstone_gc_state state_copy = _tombstone_gc_state;
    return state_copy;
}
// Coroutine that walks the input sstables (_sstables) once and prepares the state
// used by the rest of the compaction: the set of sstables that will actually be
// read (_compacting), size and partition estimates, timestamp bounds, the output
// repaired_at value and the estimated droppable tombstone ratio.
future<> setup() {
auto ssts = make_lw_shared<sstables::sstable_set>(make_sstable_set_for_input());
auto fully_expired = _table_s.fully_expired_sstables(_sstables, gc_clock::now());
min_max_tracker<api::timestamp_type> timestamp_tracker;
double sum_of_estimated_droppable_tombstone_ratio = 0;
_input_sstable_generations.reserve(_sstables.size());
_input_sstables_basic_info.reserve(_sstables.size());
int64_t repaired_at = 0;
std::vector<int64_t> repaired_at_for_compacted_sstables;
uint64_t compaction_size = 0;
for (auto& sst : _sstables) {
// Give other tasks a chance to run between sstables.
co_await coroutine::maybe_yield();
// The statistics below are recorded for every input sstable, including the
// fully expired ones that are skipped further down.
auto& sst_stats = sst->get_stats_metadata();
repaired_at_for_compacted_sstables.push_back(sst_stats.repaired_at);
repaired_at = std::max(sst_stats.repaired_at, repaired_at);
timestamp_tracker.update(sst_stats.min_timestamp);
timestamp_tracker.update(sst_stats.max_timestamp);
_input_sstables_basic_info.emplace_back(sst->generation(), sst->get_origin(), sst->bytes_on_disk());
// Compacted sstable keeps track of its ancestors.
_input_sstable_generations.push_back(sst->generation());
_start_size += sst->bytes_on_disk();
_cdata.total_partitions += sst->get_estimated_key_count();
// Do not actually compact a sstable that is fully expired and can be safely
// dropped without resurrecting old data.
if (tombstone_expiration_enabled() && fully_expired.contains(sst)) {
log_debug("Fully expired sstable {} will be dropped on compaction completion", sst->get_filename());
continue;
}
// From here on, only sstables that are actually compacted are accounted for.
_stats_collector.update(sst->get_encoding_stats_for_compaction());
compaction_size += sst->data_size();
// We also capture the sstable, so we keep it alive while the read isn't done
ssts->insert(sst);
// FIXME: If the sstables have cardinality estimation bitmaps, use that
// for a better estimate for the number of partitions in the merged
// sstable than just adding up the lengths of individual sstables.
_estimated_partitions += sst->get_estimated_key_count();
sum_of_estimated_droppable_tombstone_ratio += sst->estimate_droppable_tombstone_ratio(gc_clock::now(), get_tombstone_gc_state(), _schema);
_compacting_data_file_size += sst->ondisk_data_size();
_compacting_max_timestamp = std::max(_compacting_max_timestamp, sst->get_stats_metadata().max_timestamp);
// _rp only advances for sstables that originated on this node and whose
// recorded position belongs to this shard.
if (sst->originated_on_this_node().value_or(false) && sst_stats.position.shard_id() == this_shard_id()) {
_rp = std::max(_rp, sst_stats.position);
}
}
_cdata.compaction_size += compaction_size;
log_debug("{} [{}]", report_start_desc(), fmt::join(_sstables | std::views::transform([] (auto sst) { return to_string(sst, true); }), ","));
// The output repaired_at is overridden only when some input carries a non-zero
// repaired_at; the largest value seen is used.
if (repaired_at) {
_output_repaired_at = repaired_at;
}
log_debug("repaired_at_vec={} output_repaired_at={}", repaired_at_for_compacted_sstables, _output_repaired_at);
if (ssts->size() < _sstables.size()) {
log_debug("{} out of {} input sstables are fully expired sstables that will not be actually compacted",
_sstables.size() - ssts->size(), _sstables.size());
}
// The ratio is the average over the sstables that are actually compacted.
// _estimated_droppable_tombstone_ratio could exceed 1.0 in certain cases, so limit it to 1.0.
_estimated_droppable_tombstone_ratio = std::min(1.0, sum_of_estimated_droppable_tombstone_ratio / ssts->size());
_compacting = std::move(ssts);
_ms_metadata.min_timestamp = timestamp_tracker.min();
_ms_metadata.max_timestamp = timestamp_tracker.max();
}
// Consumption path that compacts mutations on the producer side, by wrapping the
// sstable reader in a compacting_reader. This makes it possible for data coming
// from different buckets to be compacted together.
future<> consume_without_gc_writer(gc_clock::time_point compaction_time) {
    // The end consumer only has to write the already compacted fragments.
    auto write_fragments = [this] (mutation_reader reader) mutable {
        return seastar::async([this, reader = std::move(reader)] () mutable {
            auto close_reader = deferred_close(reader);
            auto fragments_writer = get_compacted_fragments_writer();
            reader.consume_in_thread(std::move(fragments_writer));
        });
    };
    auto consumer = make_interposer_consumer(std::move(write_fragments));
    const auto gc_state = get_tombstone_gc_state();
    auto compacting = make_compacting_reader(setup_sstable_reader(), compaction_time, max_purgeable_func(), gc_state,
            streamed_mutation::forwarding::no, &_tombstone_purge_stats);
    return consumer(std::move(compacting));
}
// Reads all input through the sstable reader and writes the compacted output.
// Mutation compaction happens on the consumer side here, unless the
// consume_without_gc_writer() path is selected.
future<> consume() {
    auto compaction_time = gc_clock::now();
    // consume_without_gc_writer() relies on compacting_reader and is ~3% slower.
    // It is therefore selected only when the GC writer is disabled and an interposer
    // consumer is in use, so that nobody else pays for something they don't need.
    if (!enable_garbage_collected_sstable_writer() && use_interposer_consumer()) {
        return consume_without_gc_writer(compaction_time);
    }
    auto compact_and_write = [this, compaction_time] (mutation_reader reader) mutable {
        return seastar::async([this, reader = std::move(reader), compaction_time] () mutable {
            auto close_reader = deferred_close(reader);
            if (!enable_garbage_collected_sstable_writer()) {
                // Purged data is discarded by a no-op consumer.
                using compact_mutations = compact_for_compaction<compacted_fragments_writer, noop_compacted_fragments_consumer>;
                auto cfc = compact_mutations(*schema(), compaction_time,
                        max_purgeable_func(),
                        get_tombstone_gc_state(),
                        get_compacted_fragments_writer(),
                        noop_compacted_fragments_consumer(),
                        &_tombstone_purge_stats);
                reader.consume_in_thread(std::move(cfc));
                return;
            }
            // Purged data goes to the garbage collected fragments writer.
            using compact_mutations = compact_for_compaction<compacted_fragments_writer, compacted_fragments_writer>;
            auto cfc = compact_mutations(*schema(), compaction_time,
                    max_purgeable_func(),
                    get_tombstone_gc_state(),
                    get_compacted_fragments_writer(),
                    get_gc_compacted_fragments_writer(),
                    &_tombstone_purge_stats);
            reader.consume_in_thread(std::move(cfc));
        });
    };
    auto consumer = make_interposer_consumer(std::move(compact_and_write));
    return consumer(setup_sstable_reader());
}
// Based on the specified policies, the `compaction` base class designates an
// `end_consumer` which:
//
// 1. consumes the mutations read from the producer (i.e., the reader),
// 2. optionally performs the compaction, and then
// 3. segregates the mutation stream into multiple chunks, i.e., sstables.
//
// Derived compaction classes may further customize how the segregation is
// performed by interposing this process with a rule of their own. Typically
// that means creating yet another consumer, with the same interface, that wraps
// the given end consumer; the former acts as a decorator of the latter.
//
// A derived compaction that wants to opt in for this behavior has to override
// `use_interposer_consumer()` so that it returns true, in addition to
// overriding `make_interposer_consumer()`.
virtual mutation_reader_consumer make_interposer_consumer(mutation_reader_consumer end_consumer) {
    auto& strategy = _table_s.get_compaction_strategy();
    return strategy.make_interposer_consumer(_ms_metadata, std::move(end_consumer));
}
// Whether an interposer consumer is in use; by default the answer is delegated
// to the table's compaction strategy.
virtual bool use_interposer_consumer() const {
    const auto& strategy = _table_s.get_compaction_strategy();
    return strategy.use_interposer_consumer();
}
protected:
// Builds the result of a completed compaction and logs a summary of it.
//
// started_at / ended_at: wall-clock bounds of the compaction; they are stored in
// the returned stats and used to compute the duration and throughput reported in the log.
//
// Returns a compaction_result that takes over (by move) the input sstable info, the
// list of new sstables, the reader statistics and the tombstone purge statistics
// accumulated in this object. on_end_of_compaction() is invoked before the summary
// is logged.
virtual compaction_result finish(std::chrono::time_point<db_clock> started_at, std::chrono::time_point<db_clock> ended_at) {
    // Designated initializers are evaluated in order, so sstables_out is extracted
    // from _all_new_sstables before the latter is moved into new_sstables.
    compaction_result ret {
        .shard_id = this_shard_id(),
        .type = _type,
        .sstables_in = std::move(_input_sstables_basic_info),
        .sstables_out = extract_basic_info_from_sstables(_all_new_sstables),
        .new_sstables = std::move(_all_new_sstables),
        .stats {
            .started_at = started_at,
            .ended_at = ended_at,
            .start_size = _start_size,
            .end_size = _end_size,
            .bloom_filter_checks = _bloom_filter_checks,
            .reader_statistics = std::move(_reader_statistics),
            .tombstone_purge_stats = std::move(_tombstone_purge_stats),
        },
    };
    // Don't report NaN or negative number: with an empty input (_start_size == 0) the
    // division would yield NaN or infinity, and converting that to int below is
    // undefined behaviour. Report 0% in that case.
    const auto ratio = _start_size > 0 ? double(_end_size) / double(_start_size) : 0.0;
    auto duration = std::chrono::duration<float>(ended_at - started_at);

    on_end_of_compaction();
    // FIXME: there is some missing information in the log message below.
    // look at CompactionTask::runMayThrow() in origin for reference.
    // - add support to merge summary (message: Partition merge counts were {%s}.).
    // - there is no easy way, currently, to know the exact number of total partitions.
    // By the time being, using estimated key count.
    log_debug("{} {} sstables to [{}]. {} to {} (~{}% of original) in {}ms = {}. ~{} total partitions merged to {}.",
            report_finish_desc(), _input_sstable_generations.size(),
            fmt::join(ret.new_sstables | std::views::transform([] (auto sst) { return to_string(sst, false); }), ","),
            utils::pretty_printed_data_size(_start_size), utils::pretty_printed_data_size(_end_size), int(ratio * 100),
            std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), utils::pretty_printed_throughput(_start_size, duration),
            _cdata.total_partitions, _cdata.total_keys_written);
    return ret;
}
private:
// Invoked when the compaction is interrupted by the exception ex: logs the cause
// together with the current backtrace, then marks the sstables written so far
// for deletion.
void on_interrupt(std::exception_ptr ex) {
    const auto input_count = _input_sstable_generations.size();
    log_info("{} of {} sstables interrupted due to: {}, at {}", report_start_desc(), input_count, ex, current_backtrace());
    delete_sstables_for_interrupted_compaction();
}
// Leading word of the log messages emitted when the compaction starts (setup())
// or is interrupted (on_interrupt()), e.g. "Compacting" for regular_compaction.
virtual std::string_view report_start_desc() const = 0;
// Leading word of the summary logged by finish(), e.g. "Compacted" for
// regular_compaction.
virtual std::string_view report_finish_desc() const = 0;
// Returns the function used to obtain the max purgeable timestamp for a key.
// With tombstone expiration disabled nothing can ever be purged; otherwise the
// returned function forwards to get_max_purgeable_timestamp() using the state of
// this compaction, so it must not outlive this object.
max_purgeable_fn max_purgeable_func() {
    if (tombstone_expiration_enabled()) {
        return [this] (const dht::decorated_key& dk, is_shadowable shadowable) {
            return get_max_purgeable_timestamp(_table_s,
                    *_selector,
                    _compacting_for_max_purgeable_func,
                    dk,
                    _bloom_filter_checks,
                    _compacting_max_timestamp,
                    !_tombstone_gc_state.is_commitlog_check_enabled(),
                    shadowable);
        };
    }
    return can_never_purge;
}
// Hook called by compacted_fragments_writer each time it starts a partition in
// the output. Does nothing by default.
virtual void on_new_partition() {
}
// Hook called by finish() before the summary is logged. Does nothing by default.
virtual void on_end_of_compaction() {
}
// Creates the writer for a new output sstable. dk is the key of the partition
// that is about to be written first; implementations may base the writer on it.
// Used as the writer factory of get_compacted_fragments_writer().
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) = 0;
// Stops the writer currently in use. Used as the stop callback of
// get_compacted_fragments_writer().
virtual void stop_sstable_writer(compaction_writer* writer) = 0;
// Builds the writer for the regular (non-gc) output: writers are created through
// create_compaction_writer() and stopped through stop_sstable_writer().
compacted_fragments_writer get_compacted_fragments_writer() {
    auto open_writer = [this] (const dht::decorated_key& dk) {
        return create_compaction_writer(dk);
    };
    auto close_writer = [this] (compaction_writer* cw) {
        stop_sstable_writer(cw);
    };
    return compacted_fragments_writer(*this, std::move(open_writer), std::move(close_writer));
}
// Schema of the table being compacted.
const schema_ptr& schema() const {
    const schema_ptr& table_schema = _schema;
    return table_schema;
}
void delete_sstables_for_interrupted_compaction() {
// Delete either partially or fully written sstables of a compaction that
// was either stopped abruptly (e.g. out of disk space) or deliberately
// (e.g. nodetool stop COMPACTION).
for (auto& sst : boost::range::join(_new_partial_sstables, _new_unused_sstables)) {
log_debug("Deleting sstable {} of interrupted compaction for {}.{}", sst->get_filename(), _schema->ks_name(), _schema->cf_name());
sst->mark_for_deletion();
}
}
protected:
// Logs a message at the given level, prefixed with the compaction type, the
// keyspace and table names and the compaction UUID.
// fmt is a runtime format string; formatting is skipped altogether when the
// level is disabled.
template <typename... Args>
void log(log_level level, std::string_view fmt, const Args&... args) const {
    if (!clogger.is_enabled(level)) {
        return;
    }
    auto msg = fmt::format(fmt::runtime(fmt), args...);
    clogger.log(level, "[{} {}.{} {}] {}", _type, _schema->ks_name(), _schema->cf_name(), _cdata.compaction_uuid, msg);
}
template <typename... Args>
void log_error(std::string_view fmt, Args&&... args) const {
log(log_level::error, std::move(fmt), std::forward<Args>(args)...);
}
// Convenience wrapper around log() for the warn level.
template <typename... Args>
void log_warning(std::string_view fmt, Args&&... args) const {
    constexpr auto level = log_level::warn;
    log(level, fmt, std::forward<Args>(args)...);
}
// Convenience wrapper around log() for the info level.
template <typename... Args>
void log_info(std::string_view fmt, Args&&... args) const {
    constexpr auto level = log_level::info;
    log(level, fmt, std::forward<Args>(args)...);
}
// Convenience wrapper around log() for the debug level.
template <typename... Args>
void log_debug(std::string_view fmt, Args&&... args) const {
    constexpr auto level = log_level::debug;
    log(level, fmt, std::forward<Args>(args)...);
}
// Convenience wrapper around log() for the trace level.
template <typename... Args>
void log_trace(std::string_view fmt, Args&&... args) const {
    constexpr auto level = log_level::trace;
    log(level, fmt, std::forward<Args>(args)...);
}
public:
// Runs the compaction c, which is passed by unique_ptr (ownership is handed over),
// and resolves to its result. Only declared here; the definition is outside this
// part of the file.
static future<compaction_result> run(std::unique_ptr<compaction> c);
// compacted_fragments_writer accesses non-public members of compaction
// (e.g. _cdata, _schema, _max_sstable_size, log_debug()).
friend class compacted_fragments_writer;
};
// Move constructor. The writer state and both callbacks are moved over from other.
// A stop-request observer is not moved: if other had one, it is dropped there and
// a fresh observer is registered for this object instead.
compacted_fragments_writer::compacted_fragments_writer(compacted_fragments_writer&& other)
    : _c(other._c)
    , _compaction_writer(std::move(other._compaction_writer))
    , _create_compaction_writer(std::move(other._create_compaction_writer))
    , _stop_compaction_writer(std::move(other._stop_compaction_writer))
{
    const bool other_was_observing = bool(std::exchange(other._stop_request_observer, std::nullopt));
    if (other_was_observing) {
        _stop_request_observer = make_stop_request_observer(_c._stop_request_observable);
    }
}
// Throws compaction_stopped_exception if a stop was requested for the compaction
// this writer belongs to; otherwise does nothing.
void compacted_fragments_writer::maybe_abort_compaction() {
    const auto& cdata = _c._cdata;
    if (!cdata.is_stop_requested()) [[likely]] {
        return;
    }
    // Compaction manager will catch this exception and re-schedule the compaction.
    throw compaction_stopped_exception(_c._schema->ks_name(), _c._schema->cf_name(), cdata.stop_requested);
}
void compacted_fragments_writer::stop_current_writer() {
// stop sstable writer being currently used.
_stop_compaction_writer(&*_compaction_writer);
_compaction_writer = std::nullopt;
}
// Whether the owning compaction allows a large partition to be split across
// output sstables.
bool compacted_fragments_writer::can_split_large_partition() const {
    const bool allowed = _c._can_split_large_partition;
    return allowed;
}
// Remembers pos as the last position seen in the current partition.
// The position is only needed for splitting, so nothing is recorded when
// splitting large partitions is not allowed.
void compacted_fragments_writer::track_last_position(position_in_partition_view pos) {
    if (!can_split_large_partition()) {
        return;
    }
    _current_partition.last_pos = pos;
}
// Splits the partition currently being written: the partition is closed in the
// current writer, the writer is stopped, and the partition is re-opened in a new
// writer, carrying over the partition tombstone and any active range tombstone.
void compacted_fragments_writer::split_large_partition() {
    auto& part = _current_partition;
    // The active range tombstone, if any, has to be closed before the partition end is
    // emitted. after_key(last_pos) serves both as the closing and as the re-opening
    // position: the current fragment ends up with an inclusive end bound for last pos
    // and the next fragment with an exclusive start bound for last pos. This is very
    // important for not losing information on the range tombstone.
    auto split_pos = position_in_partition::after_key(*_c.schema(), part.last_pos.key());
    if (part.current_emitted_tombstone) {
        auto closing_rtc = range_tombstone_change(split_pos, tombstone{});
        _c.log_debug("Closing active tombstone {} with {} for partition {}", part.current_emitted_tombstone, closing_rtc, *part.dk);
        _compaction_writer->writer.consume(std::move(closing_rtc));
    }

    _c.log_debug("Splitting large partition {} in order to respect SSTable size limit of {}", *part.dk, utils::pretty_printed_data_size(_c._max_sstable_size));

    // The partition is closed in the current writer and opened again in a new one.
    do_consume_end_of_partition();
    stop_current_writer();
    do_consume_new_partition(*part.dk);

    // The partition tombstone is replicated to every fragment, which lets the
    // SSTable run reader open a single fragment during the read.
    if (part.tombstone) {
        consume(part.tombstone);
    }
    // Re-open the range tombstone that was closed above.
    if (part.current_emitted_tombstone) {
        _compaction_writer->writer.consume(range_tombstone_change(split_pos, part.current_emitted_tombstone));
    }
    part.is_splitting_partition = false;
}
// Starts partition dk in the output: aborts if a stop was requested, creates a
// writer through the factory callback when none is active, notifies the
// compaction and finally opens the partition in the writer.
void compacted_fragments_writer::do_consume_new_partition(const dht::decorated_key& dk) {
    maybe_abort_compaction();
    if (!_compaction_writer.has_value()) {
        _compaction_writer = _create_compaction_writer(dk);
    }

    _c.on_new_partition();

    auto& sst_writer = _compaction_writer->writer;
    sst_writer.consume_new_partition(dk);
    _unclosed_partition = true;
}
// Closes the current partition in the active writer and returns what the writer
// reported. The partition is flagged as closed before the writer is invoked.
stop_iteration compacted_fragments_writer::do_consume_end_of_partition() {
    _unclosed_partition = false;
    auto& sst_writer = _compaction_writer->writer;
    return sst_writer.consume_end_of_partition();
}
// Consumer entry point for the start of partition dk: resets the per-partition
// tracking state, opens the partition in the output and counts the key as written.
void compacted_fragments_writer::consume_new_partition(const dht::decorated_key& dk) {
    // Fresh tracking state: no tombstones seen yet, positioned at partition start.
    decltype(_current_partition) fresh_state {
        .dk = dk,
        .tombstone = tombstone(),
        .current_emitted_tombstone = tombstone(),
        .last_pos = position_in_partition::for_partition_start(),
        .is_splitting_partition = false
    };
    _current_partition = std::move(fresh_state);

    do_consume_new_partition(dk);
    // Counted only after the partition was successfully opened.
    ++_c._cdata.total_keys_written;
}
// Consumes the partition tombstone: it is remembered in the per-partition state
// (split_large_partition() replicates it) and forwarded to the active writer.
void compacted_fragments_writer::consume(tombstone t) {
    _current_partition.tombstone = t;
    auto& sst_writer = _compaction_writer->writer;
    sst_writer.consume(t);
}
// Consumes a clustering row. If the previous row made the writer ask to stop,
// the partition is split first so that cr lands in a new writer.
// Always returns stop_iteration::no: a stop request from the writer is not
// propagated but recorded, and acted upon when the next row arrives.
stop_iteration compacted_fragments_writer::consume(clustering_row&& cr, row_tombstone, bool) {
    maybe_abort_compaction();
    if (_current_partition.is_splitting_partition) [[unlikely]] {
        split_large_partition();
    }
    track_last_position(cr.position());
    const auto writer_verdict = _compaction_writer->writer.consume(std::move(cr));
    const bool split_requested = can_split_large_partition() && writer_verdict == stop_iteration::yes;
    if (split_requested) [[unlikely]] {
        _current_partition.is_splitting_partition = true;
    }
    return stop_iteration::no;
}
// Consumes a range tombstone change: records its tombstone as the currently
// emitted one and its position as the last one seen, then forwards the change to
// the active writer, returning the writer's answer.
stop_iteration compacted_fragments_writer::consume(range_tombstone_change&& rtc) {
    maybe_abort_compaction();
    _current_partition.current_emitted_tombstone = rtc.tombstone();
    track_last_position(rtc.position());
    auto& sst_writer = _compaction_writer->writer;
    return sst_writer.consume(std::move(rtc));
}
// Consumer entry point for the end of a partition. When the writer answers
// stop_iteration::yes, the current writer is stopped. The writer's answer is
// returned either way.
stop_iteration compacted_fragments_writer::consume_end_of_partition() {
    const auto writer_verdict = do_consume_end_of_partition();
    const bool writer_is_done = writer_verdict == stop_iteration::yes;
    if (writer_is_done) {
        stop_current_writer();
    }
    return writer_verdict;
}
void compacted_fragments_writer::consume_end_of_stream() {
if (_compaction_writer) {
_stop_compaction_writer(&*_compaction_writer);
_compaction_writer = std::nullopt;
}
}
class regular_compaction : public compaction {
seastar::semaphore _replacer_lock = {1};
public:
regular_compaction(compaction_group_view& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor, use_backlog_tracker use_backlog_tracker = use_backlog_tracker::yes)
: compaction(table_s, std::move(descriptor), cdata, progress_monitor, use_backlog_tracker)
{
}
mutation_reader make_sstable_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace,
streamed_mutation::forwarding sm_fwd,
mutation_reader::forwarding mr_fwd) override {
return _compacting->make_local_shard_sstable_reader(std::move(s),
std::move(permit),
range,
slice,
std::move(trace),
sm_fwd,
mr_fwd,
unwrap_monitor_generator(),
sstables::default_sstable_predicate(),
&_reader_statistics,
sstables::integrity_check::yes);
}
std::string_view report_start_desc() const override {
return "Compacting";
}
std::string_view report_finish_desc() const override {
return "Compacted";
}
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto sst = _sstable_creator(this_shard_id());
setup_new_sstable(sst);
auto monitor = std::make_unique<compaction_write_monitor>(sst, _table_s, maximum_timestamp(), _sstable_level);
sstables::sstable_writer_config cfg = make_sstable_writer_config(_type);
cfg.monitor = monitor.get();
return compaction_writer{std::move(monitor), sst->get_writer(*_schema, partitions_per_sstable(), cfg, get_encoding_stats()), sst};
}
virtual void stop_sstable_writer(compaction_writer* writer) override {
if (writer) {
finish_new_sstable(writer);
maybe_replace_exhausted_sstables_by_sst(writer->sst);
}
}
void on_new_partition() override {
update_pending_ranges();
}
virtual void on_end_of_compaction() override {
replace_remaining_exhausted_sstables();
}
private:
void maybe_replace_exhausted_sstables_by_sst(sstables::shared_sstable sst) {
// Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs,
// meaning incremental compaction is disabled for this compaction.
if (!enable_garbage_collected_sstable_writer()) {
return;
}
auto permit = seastar::get_units(_replacer_lock, 1).get();
// Replace exhausted sstable(s), if any, by new one(s) in the column family.
auto not_exhausted = [s = _schema, &dk = sst->get_last_decorated_key()] (sstables::shared_sstable& sst) {
return sst->get_last_decorated_key().tri_compare(*s, dk) > 0;
};
auto exhausted = std::partition(_sstables.begin(), _sstables.end(), not_exhausted);
if (exhausted != _sstables.end()) {
// The goal is that exhausted sstables will be deleted as soon as possible,
// so we need to release reference to them.
std::for_each(exhausted, _sstables.end(), [this] (sstables::shared_sstable& sst) {
_compacting_for_max_purgeable_func.erase(sst);
// Fully expired sstable is not actually compacted, therefore it's not present in the compacting set.
_compacting->erase(sst);
});
// Make sure SSTable created by garbage collected writer is made available
// before exhausted SSTable is released, so to prevent data resurrection.
_stop_request_observable();
// Added Garbage collected SSTables to list of unused SSTables that will be added
// to SSTable set. GC SSTables should be added before compaction completes because
// a failure could result in data resurrection if data is not made available.
auto unused_gc_sstables = consume_unused_garbage_collected_sstables();
_new_unused_sstables.insert(_new_unused_sstables.end(), unused_gc_sstables.begin(), unused_gc_sstables.end());
auto exhausted_ssts = std::vector<sstables::shared_sstable>(exhausted, _sstables.end());
log_debug("Replacing earlier exhausted sstable(s) [{}] by new sstable(s) [{}]",
fmt::join(exhausted_ssts | std::views::transform([] (auto sst) { return to_string(sst, false); }), ","),
fmt::join(_new_unused_sstables | std::views::transform([] (auto sst) { return to_string(sst, true); }), ","));
_replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables)));
_sstables.erase(exhausted, _sstables.end());
dynamic_cast<compaction_read_monitor_generator&>(unwrap_monitor_generator()).remove_exhausted_sstables(exhausted_ssts);
}
}
void replace_remaining_exhausted_sstables() {
if (!_sstables.empty() || !used_garbage_collected_sstables().empty()) {
std::vector<sstables::shared_sstable> old_sstables;
std::move(_sstables.begin(), _sstables.end(), std::back_inserter(old_sstables));
// Remove Garbage Collected SSTables from the SSTable set if any was previously added.
auto& used_gc_sstables = used_garbage_collected_sstables();
old_sstables.insert(old_sstables.end(), used_gc_sstables.begin(), used_gc_sstables.end());
_replacer(get_compaction_completion_desc(std::move(old_sstables), std::move(_new_unused_sstables)));
}
}
void update_pending_ranges() {
auto pending_replacements = std::exchange(_cdata.pending_replacements, {});
if (!_sstable_set || _sstable_set->all()->empty() || pending_replacements.empty()) { // set can be empty for testing scenario.
return;
}
// Releases reference to sstables compacted by this compaction or another, both of which belongs
// to the same column family
for (auto& pending_replacement : pending_replacements) {
for (auto& sst : pending_replacement.removed) {
// Set may not contain sstable to be removed because this compaction may have started
// before the creation of that sstable.
if (!_sstable_set->all()->contains(sst)) {
continue;
}
_sstable_set->erase(sst);
}
for (auto& sst : pending_replacement.added) {
_sstable_set->insert(sst);
}
}
_selector.emplace(_sstable_set->make_incremental_selector());
}
};
// Reshape flavour of regular compaction. It is created without backlog tracking,
// reads its input through a partitioned sstable set and only replaces sstables
// incrementally when a replacer callback is available.
class reshape_compaction : public regular_compaction {
private:
    bool has_sstable_replacer() const noexcept {
        return bool(_replacer);
    }
public:
    reshape_compaction(compaction_group_view& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor)
        : regular_compaction(table_s, std::move(descriptor), cdata, progress_monitor, use_backlog_tracker::no)
    {
    }

    virtual sstables::sstable_set make_sstable_set_for_input() const override {
        return sstables::make_partitioned_sstable_set(_schema, _table_s.token_range());
    }

    // Unconditionally enable incremental compaction if the strategy specifies a max output size, e.g. LCS.
    virtual bool enable_garbage_collected_sstable_writer() const noexcept override {
        const bool output_size_is_limited = _max_sstable_size != std::numeric_limits<uint64_t>::max();
        return output_size_is_limited && has_sstable_replacer();
    }

    // Same reader as the regular one, except that no reader statistics are collected.
    mutation_reader make_sstable_reader(schema_ptr s,
            reader_permit permit,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            tracing::trace_state_ptr trace,
            streamed_mutation::forwarding sm_fwd,
            mutation_reader::forwarding mr_fwd) override {
        auto& input_set = *_compacting;
        return input_set.make_local_shard_sstable_reader(std::move(s),
                std::move(permit),
                range,
                slice,
                std::move(trace),
                sm_fwd,
                mr_fwd,
                unwrap_monitor_generator(),
                sstables::default_sstable_predicate(),
                nullptr,
                sstables::integrity_check::yes);
    }

    std::string_view report_start_desc() const override {
        return "Reshaping";
    }

    std::string_view report_finish_desc() const override {
        return "Reshaped";
    }

    // Creates a new output sstable and its writer; no write monitor is attached.
    virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
        auto new_sst = _sstable_creator(this_shard_id());
        setup_new_sstable(new_sst);

        sstables::sstable_writer_config cfg = make_sstable_writer_config(compaction_type::Reshape);
        auto sst_writer = new_sst->get_writer(*_schema, partitions_per_sstable(), cfg, get_encoding_stats());
        return compaction_writer{std::move(sst_writer), new_sst};
    }

    // With a replacer the regular behaviour applies; without one the new sstable is
    // only sealed. A null writer is ignored.
    virtual void stop_sstable_writer(compaction_writer* writer) override {
        if (!writer) {
            return;
        }
        if (!has_sstable_replacer()) {
            finish_new_sstable(writer);
            return;
        }
        regular_compaction::stop_sstable_writer(writer);
    }

    // The final replacement only makes sense when a replacer is available.
    virtual void on_end_of_compaction() override {
        if (!has_sstable_replacer()) {
            return;
        }
        regular_compaction::on_end_of_compaction();
    }
};
// Cleanup compaction: behaves like regular_compaction and only differs, in this
// class, by the words used in the start/finish log messages.
class cleanup_compaction final : public regular_compaction {
public:
    cleanup_compaction(compaction_group_view& table_s,
            compaction_descriptor descriptor,
            compaction_data& cdata,
            compaction_progress_monitor& progress_monitor)
        : regular_compaction(table_s, std::move(descriptor), cdata, progress_monitor)
    {
    }

    std::string_view report_start_desc() const override {
        return "Cleaning";
    }

    std::string_view report_finish_desc() const override {
        return "Cleaned";
    }
};
// Split compaction: segregates the mutation stream by token group, using the
// classifier carried by the split options, on top of what the compaction
// strategy's interposer consumer does.
class split_compaction final : public regular_compaction {
    compaction_type_options::split _options;
public:
    split_compaction(compaction_group_view& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_type_options::split options,
            compaction_progress_monitor& progress_monitor)
        : regular_compaction(table_s, std::move(descriptor), cdata, progress_monitor)
        , _options(std::move(options))
    {
    }

    // Wraps the strategy's interposer consumer into one that first segregates the
    // reader's content by token group.
    mutation_reader_consumer make_interposer_consumer(mutation_reader_consumer end_consumer) override {
        return [this, end_consumer = std::move(end_consumer)] (mutation_reader reader) mutable -> future<> {
            auto& strategy = _table_s.get_compaction_strategy();
            return mutation_writer::segregate_by_token_group(std::move(reader),
                    _options.classifier,
                    strategy.make_interposer_consumer(_ms_metadata, std::move(end_consumer)));
        };
    }

    // The interposer consumer is always in use for split.
    bool use_interposer_consumer() const override {
        return true;
    }

    std::string_view report_start_desc() const override {
        return "Splitting";
    }

    std::string_view report_finish_desc() const override {
        return "Split";
    }

    // Creates a new output sstable whose key estimate covers only the token range
    // that dk's token falls into after the split.
    virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
        auto new_sst = _sstable_creator(this_shard_id());
        setup_new_sstable(new_sst);

        // Deduce the estimated keys based on the token range that will end up in this new sstable after the split.
        auto range_after_split = _table_s.get_token_range_after_split(dk.token());
        const auto estimated_keys = _sstables.front()->estimated_keys_for_range(range_after_split).get();

        auto write_monitor = std::make_unique<compaction_write_monitor>(new_sst, _table_s, maximum_timestamp(), _sstable_level);
        sstables::sstable_writer_config cfg = make_sstable_writer_config(_type);
        cfg.monitor = write_monitor.get();
        auto sst_writer = new_sst->get_writer(*_schema, estimated_keys, cfg, get_encoding_stats());
        return compaction_writer{std::move(write_monitor), std::move(sst_writer), new_sst};
    }
};
class scrub_compaction final : public regular_compaction {
public:
static void report_validation_error(compaction_type type, const ::schema& schema, sstring what, std::string_view action = "") {
clogger.error("[{} compaction {}.{}] {}{}{}", type, schema.ks_name(), schema.cf_name(), what, action.empty() ? "" : "; ", action);
}
private:
class reader : public mutation_reader::impl {
using skip = bool_class<class skip_tag>;
private:
compaction_type_options::scrub::mode _scrub_mode;
mutation_reader _reader;
mutation_fragment_stream_validator _validator;
bool _skip_to_next_partition = false;
uint64_t& _validation_errors;
bool& _failed_to_fix_sstable;
compaction_type_options::scrub::drop_unfixable_sstables _drop_unfixable_sstables;
private:
void maybe_abort_scrub(std::function<void()> report_error) {
if (_scrub_mode == compaction_type_options::scrub::mode::abort) {
report_error();
throw compaction_aborted_exception(_schema->ks_name(), _schema->cf_name(), "scrub compaction found invalid data");
}
++_validation_errors;
}
skip on_unexpected_partition_start(const mutation_fragment_v2& ps, sstring error) {
auto report_fn = [this, error] (std::string_view action = "") {
report_validation_error(compaction_type::Scrub, *_schema, error, action);
};
maybe_abort_scrub(report_fn);
report_fn("Rectifying by adding assumed missing partition-end");
auto pe = mutation_fragment_v2(*_schema, _permit, partition_end{});
if (!_validator(pe)) {
if (_drop_unfixable_sstables) {
_failed_to_fix_sstable = true;
end_stream();
return skip::yes;
}
throw compaction_aborted_exception(
_schema->ks_name(),
_schema->cf_name(),
"scrub compaction failed to rectify unexpected partition-start, validator rejects the injected partition-end");
}
push_mutation_fragment(std::move(pe));
if (!_validator(ps)) {
if (_drop_unfixable_sstables) {
_failed_to_fix_sstable = true;
end_stream();
return skip::yes;
}
throw compaction_aborted_exception(
_schema->ks_name(),
_schema->cf_name(),
"scrub compaction failed to rectify unexpected partition-start, validator rejects it even after the injected partition-end");
}
return skip::no;
}
skip on_invalid_partition(const dht::decorated_key& new_key, sstring error) {
auto report_fn = [this, error] (std::string_view action = "") {
report_validation_error(compaction_type::Scrub, *_schema, error, action);
};
maybe_abort_scrub(report_fn);
if (_scrub_mode == compaction_type_options::scrub::mode::segregate) {
report_fn("Detected");
_validator.reset(new_key);
// Let the segregating interposer consumer handle this.
return skip::no;
}
report_fn("Skipping");
_skip_to_next_partition = true;
return skip::yes;
}
// Handles a non-partition-start fragment that the validator rejected.
// - With an active range tombstone change the fragment cannot be handled: the
//   stream is ended (when dropping unfixable sstables) or the scrub is aborted.
// - In segregate mode, a fragment other than partition-end is moved into a
//   freshly opened partition with the same key (skip::no).
// - Everything else, including a stray partition-end, is dropped (skip::yes).
skip on_invalid_mutation_fragment(const mutation_fragment_v2& mf, sstring error) {
    auto report = [this, error] (std::string_view action = "") {
        report_validation_error(compaction_type::Scrub, *_schema, error, action);
    };
    maybe_abort_scrub(report);

    const auto& current_key = _validator.previous_partition_key();

    if (_validator.current_tombstone()) {
        if (!_drop_unfixable_sstables) {
            throw compaction_aborted_exception(
                    _schema->ks_name(),
                    _schema->cf_name(),
                    "scrub compaction cannot handle invalid fragments with an active range tombstone change");
        }
        _failed_to_fix_sstable = true;
        end_stream();
        return skip::yes;
    }

    // An unexpected partition-end is simply dropped: it can only be invalid
    // when it follows another partition-end.
    const bool segregating = (_scrub_mode == compaction_type_options::scrub::mode::segregate);
    if (mf.is_end_of_partition() || !segregating) {
        report("Skipping");
        return skip::yes;
    }

    report("Injecting partition start/end to segregate out-of-order fragment");
    push_mutation_fragment(*_schema, _permit, partition_end{});
    // We lose the partition tombstone, if any, but it will be picked up when
    // compaction merges these partitions back.
    push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_start(current_key, {})));
    _validator.reset(mf);
    // The segregating interposer consumer takes it from here.
    return skip::no;
}
// Called when the validator rejects the end of the stream.
// Outside abort mode the stream is repaired by appending the missing
// partition-end; the repair is reported after the fragment has been pushed.
void on_invalid_end_of_stream(sstring error) {
    auto report = [this, error] (std::string_view action = "") {
        report_validation_error(compaction_type::Scrub, *_schema, error, action);
    };
    maybe_abort_scrub(report);

    // Supply the partition_end the input never produced.
    auto closing_fragment = mutation_fragment_v2(*_schema, _permit, partition_end{});
    push_mutation_fragment(std::move(closing_fragment));
    report("Rectifying by adding missing partition-end to the end of the stream");
}
void on_malformed_sstable_exception(std::exception_ptr e) {
bool should_abort = _scrub_mode == compaction_type_options::scrub::mode::abort ||
(_scrub_mode == compaction_type_options::scrub::mode::segregate && !_drop_unfixable_sstables);
if (should_abort) {
throw compaction_aborted_exception(
_schema->ks_name(),
_schema->cf_name(),
format("scrub compaction failed due to unrecoverable error: {}", e));
}
if (_drop_unfixable_sstables) {
_failed_to_fix_sstable = true;
}
end_stream();
}
// Terminates the output stream in a well-formed way: closes an active range
// tombstone, closes the current partition if it is still open, then marks the
// reader as having reached end of stream.
void end_stream() {
    // An open range tombstone must be closed before the partition end.
    if (_validator.current_tombstone()) {
        auto close_pos = position_in_partition::after_key(*_schema, _validator.previous_position().key());
        auto closing_rtc = range_tombstone_change(std::move(close_pos), tombstone{});
        push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, std::move(closing_rtc)));
    }

    // Only emit a partition end if the last validated fragment was not one.
    const bool partition_still_open =
            _validator.previous_mutation_fragment_kind() != mutation_fragment_v2::kind::partition_end;
    if (partition_still_open) {
        push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_end{}));
    }

    _end_of_stream = true;
}
// Drains the underlying reader's buffer into this reader's buffer, validating
// every fragment on the way and delegating to the on_*() handlers whenever the
// validator rejects one. A handler returning skip::yes means the fragment is
// not forwarded. When the underlying reader is exhausted, the end of stream is
// validated as well.
void fill_buffer_from_underlying() {
// Error-injection hook: when enabled, aborts the scrub as if invalid data had been found.
utils::get_local_injector().inject("rest_api_keyspace_scrub_abort", [] { throw compaction_aborted_exception("", "", "scrub compaction found invalid data"); });
while (!_reader.is_buffer_empty() && !is_buffer_full()) {
// A handler ended the stream because the sstable could not be fixed:
// stop consuming input.
if (_end_of_stream && _failed_to_fix_sstable) {
return;
}
auto mf = _reader.pop_mutation_fragment();
if (mf.is_partition_start()) {
// First check that fragment kind monotonicity stands.
// When skipping to another partition the fragment
// monotonicity of the partition-start doesn't have to be
// and shouldn't be verified. We know the last fragment the
// validator saw is a partition-start, passing it another one
// will confuse it.
if (!_skip_to_next_partition) {
if (auto res = _validator(mf); !res) {
if (on_unexpected_partition_start(mf, res.what()) == skip::yes) {
continue;
}
}
// Continue processing this partition start.
}
// A new partition starts here, so any pending skip is over.
_skip_to_next_partition = false;
// Then check that the partition monotonicity stands.
const auto& dk = mf.as_partition_start().key();
if (auto res = _validator(dk); !res) {
if (on_invalid_partition(dk, res.what()) == skip::yes) {
continue;
}
}
} else if (_skip_to_next_partition) {
// Still inside a partition that is being skipped: drop the fragment.
continue;
} else {
if (auto res = _validator(mf); !res) {
if (on_invalid_mutation_fragment(mf, res.what()) == skip::yes) {
continue;
}
}
}
push_mutation_fragment(std::move(mf));
}
// The stream was already ended by a handler; do not overwrite _end_of_stream below.
if (_end_of_stream && _failed_to_fix_sstable) {
return;
}
_end_of_stream = _reader.is_end_of_stream() && _reader.is_buffer_empty();
if (_end_of_stream) {
if (auto res = _validator.on_end_of_stream(); !res) {
on_invalid_end_of_stream(res.what());
}
}
}
public:
// Wraps `underlying` in a validating reader operating in `scrub_mode`.
// The schema and permit are taken from `underlying` before it is moved into
// _reader. `validation_errors` and `failed_to_fix_sstable` are caller-owned
// outputs (presumably held by reference; the member declarations are not
// visible here - TODO confirm). `drop_unfixable_sstables` selects whether an
// unfixable input ends the stream instead of aborting the scrub.
reader(mutation_reader underlying, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors,
bool& failed_to_fix_sstable, compaction_type_options::scrub::drop_unfixable_sstables drop_unfixable_sstables)
: impl(underlying.schema(), underlying.permit())
, _scrub_mode(scrub_mode)
, _reader(std::move(underlying))
, _validator(*_schema)
, _validation_errors(validation_errors)
, _failed_to_fix_sstable(failed_to_fix_sstable)
, _drop_unfixable_sstables(drop_unfixable_sstables)
{ }
// Refills this reader's buffer from the underlying reader until the buffer is
// full or the stream has ended. Exceptions are translated so that a failed
// scrub is not retried: compaction_job_exception and storage_io_error pass
// through unchanged, malformed_sstable_exception goes to
// on_malformed_sstable_exception(), and anything else becomes a
// compaction_aborted_exception.
virtual future<> fill_buffer() override {
    if (_end_of_stream) {
        return make_ready_future<>();
    }

    auto pump = [this] {
        return _reader.fill_buffer().then([this] {
            fill_buffer_from_underlying();
            const bool done = is_buffer_full() || _end_of_stream;
            return stop_iteration(done);
        });
    };

    auto translate_failure = [this] (std::exception_ptr ep) {
        try {
            std::rethrow_exception(std::move(ep));
        } catch (const compaction_job_exception&) {
            // Propagate these unchanged.
            throw;
        } catch (const storage_io_error&) {
            // Propagate these unchanged.
            throw;
        } catch (const sstables::malformed_sstable_exception&) {
            on_malformed_sstable_exception(std::current_exception());
        } catch (...) {
            // We don't want failed scrubs to be retried.
            throw compaction_aborted_exception(
                    _schema->ks_name(),
                    _schema->cf_name(),
                    format("scrub compaction failed due to unrecoverable error: {}", std::current_exception()));
        }
    };

    return repeat(std::move(pump)).handle_exception(std::move(translate_failure));
}
// Not supported by the scrubbing reader: always fails with std::bad_function_call.
virtual future<> next_partition() override {
    auto unsupported = make_backtraced_exception_ptr<std::bad_function_call>();
    return make_exception_future<>(std::move(unsupported));
}
// Partition-range fast-forwarding is not supported by the scrubbing reader:
// always fails with std::bad_function_call.
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
    auto unsupported = make_backtraced_exception_ptr<std::bad_function_call>();
    return make_exception_future<>(std::move(unsupported));
}
// Position-range fast-forwarding is not supported by the scrubbing reader:
// always fails with std::bad_function_call.
virtual future<> fast_forward_to(position_range pr) override {
    auto unsupported = make_backtraced_exception_ptr<std::bad_function_call>();
    return make_exception_future<>(std::move(unsupported));
}
// Closing this reader just closes the wrapped underlying reader.
virtual future<> close() noexcept override {
    auto& underlying = _reader;
    return underlying.close();
}
};
private:
// Scrub options this compaction was created with.
compaction_type_options::scrub _options;
// Text returned by report_start_desc(); built once in the constructor.
std::string _scrub_start_description;
// Text returned by report_finish_desc(); mutable because that const method
// rebuilds it in segregate mode.
mutable std::string _scrub_finish_description;
// Incremented once per reader handed to the end consumer by the segregating
// interposer consumer.
uint64_t _bucket_count = 0;
// Passed by reference to the scrubbing reader; copied into the stats in finish().
uint64_t _validation_errors = 0;
// Passed by reference to the scrubbing reader; consulted at the end of
// compaction and when stopping sstable writers.
bool _failed_to_fix_sstable = false;
public:
// Builds a scrub compaction on top of regular_compaction, with the backlog
// tracker disabled, and pre-formats the start/finish descriptions from the
// requested operation mode.
scrub_compaction(compaction_group_view& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_type_options::scrub options, compaction_progress_monitor& progress_monitor)
: regular_compaction(table_s, std::move(descriptor), cdata, progress_monitor, use_backlog_tracker::no)
, _options(options)
, _scrub_start_description(fmt::format("Scrubbing in {} mode", _options.operation_mode))
, _scrub_finish_description(fmt::format("Finished scrubbing in {} mode", _options.operation_mode)) {
}
// Description used when reporting the start of the scrub; a view into a
// string owned by this object.
std::string_view report_start_desc() const override {
    return std::string_view(_scrub_start_description);
}
std::string_view report_finish_desc() const override {
if (_options.operation_mode == compaction_type_options::scrub::mode::segregate) {
_scrub_finish_description = fmt::format("Finished scrubbing in {} mode{}", _options.operation_mode, _bucket_count ? fmt::format(" (segregated input into {} bucket(s))", _bucket_count) : "");
}
return _scrub_finish_description;
}
// Creates the input reader for scrub: a full-scan reader over the compacting
// sstables (integrity checks enabled) wrapped in the validating scrub reader.
// Scrub only works on the full partition range; anything else is an internal
// error. `slice`, `trace`, `sm_fwd` and `mr_fwd` are not used.
mutation_reader make_sstable_reader(schema_ptr s,
        reader_permit permit,
        const dht::partition_range& range,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace,
        streamed_mutation::forwarding sm_fwd,
        mutation_reader::forwarding mr_fwd) override {
    if (!range.is_full()) {
        auto reason = fmt::format("Scrub compaction in mode {} expected full partition range, but got {} instead", _options.operation_mode, range);
        on_internal_error(clogger, reason);
    }
    auto underlying = _compacting->make_full_scan_reader(std::move(s), std::move(permit), nullptr, unwrap_monitor_generator(), sstables::integrity_check::yes);
    return make_mutation_reader<reader>(std::move(underlying), _options.operation_mode, _validation_errors, _failed_to_fix_sstable, _options.drop_unfixable);
}
// Partition estimate per output sstable. With at most one bucket the base
// estimate is used as is. Otherwise it is halved once per additional bucket
// (shift capped at 63) and never drops below 1.
uint64_t partitions_per_sstable() const override {
    const auto base_estimate = compaction::partitions_per_sstable();
    if (_bucket_count > 1) {
        const auto halvings = std::min(uint64_t(63), _bucket_count - 1);
        return std::max(uint64_t(1), base_estimate >> halvings);
    }
    return base_estimate;
}
// In segregate mode, wraps `end_consumer` so the input is first split by
// segregate_by_partition(), with a memory budget of one tenth of total memory.
// Every reader produced by the split bumps _bucket_count before being handed
// to `end_consumer`. In any other mode `end_consumer` is returned untouched.
mutation_reader_consumer make_interposer_consumer(mutation_reader_consumer end_consumer) override {
    if (use_interposer_consumer()) {
        return [this, end_consumer = std::move(end_consumer)] (mutation_reader reader) mutable -> future<> {
            auto cfg = mutation_writer::segregate_config{memory::stats().total_memory() / 10};
            auto counting_consumer = [consumer = std::move(end_consumer), this] (mutation_reader rd) {
                ++_bucket_count;
                return consumer(std::move(rd));
            };
            return mutation_writer::segregate_by_partition(std::move(reader), cfg, std::move(counting_consumer));
        };
    }
    return end_consumer;
}
// The interposer consumer is only used when scrubbing in segregate mode.
bool use_interposer_consumer() const override {
    const bool segregating = (_options.operation_mode == compaction_type_options::scrub::mode::segregate);
    return segregating;
}
// Finishes the compaction through compaction::finish() and adds the number of
// validation errors seen by the scrub to the resulting stats.
compaction_result finish(std::chrono::time_point<db_clock> started_at, std::chrono::time_point<db_clock> ended_at) override {
    auto result = compaction::finish(started_at, ended_at);
    result.stats.validation_errors = _validation_errors;
    return result;
}
void drop_unfixable_sstables() {
if (!_sstables.empty() || !used_garbage_collected_sstables().empty()) {
std::vector<sstables::shared_sstable> old_sstables;
std::move(_sstables.begin(), _sstables.end(), std::back_inserter(old_sstables));
// Remove Garbage Collected SSTables from the SSTable set if any was previously added.
auto& used_gc_sstables = used_garbage_collected_sstables();
old_sstables.insert(old_sstables.end(), used_gc_sstables.begin(), used_gc_sstables.end());
_replacer(get_compaction_completion_desc(std::move(old_sstables), {}));
}
// Mark new sstables for deletion as well
for (auto& sst : boost::range::join(_new_partial_sstables, _new_unused_sstables)) {
sst->mark_for_deletion();
}
}
// End-of-compaction hook: when unfixable sstables are to be dropped and the
// scrub failed to fix its input, the input is dropped instead of running the
// regular completion path.
virtual void on_end_of_compaction() override {
    const bool drop_input = _options.drop_unfixable && _failed_to_fix_sstable;
    if (!drop_input) {
        regular_compaction::on_end_of_compaction();
        return;
    }
    drop_unfixable_sstables();
}
// Stops `writer`. When the input turned out to be unfixable and is being
// dropped, a non-null writer is finished directly through finish_new_sstable();
// every other case goes through the regular_compaction implementation.
virtual void stop_sstable_writer(compaction_writer* writer) override {
    const bool dropping_input = _options.drop_unfixable && _failed_to_fix_sstable;
    if (!(dropping_input && writer)) {
        regular_compaction::stop_sstable_writer(writer);
        return;
    }
    finish_new_sstable(writer);
}
friend mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors, bool& failed_to_fix_sstable, compaction_type_options::scrub::drop_unfixable_sstables drop_unfixable_sstables);
};
// Free-function factory for scrub_compaction::reader: wraps `rd` in a
// validating scrub reader. `validation_errors` and `failed_to_fix_sstable`
// are caller-owned outputs passed through to the reader.
mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors, bool& failed_to_fix_sstable, compaction_type_options::scrub::drop_unfixable_sstables drop_unfixable_sstables) {
    using scrubbing_reader = scrub_compaction::reader;
    return make_mutation_reader<scrubbing_reader>(std::move(rd), scrub_mode, validation_errors, failed_to_fix_sstable, drop_unfixable_sstables);
}
// Compaction that rewrites sstables so that each output sstable is written
// for exactly one shard.
class resharding_compaction final : public compaction {
    // Partition count estimation for a shard S:
    //
    // TE, the total estimated partition count for a shard S, is defined as
    // TE = Sum(i = 0...N) { Ei / Si }.
    //
    // where i is an input sstable that belongs to shard S,
    //       Ei is the estimated partition count for sstable i,
    //       Si is the total number of shards that own sstable i.
    //
    struct estimated_values {
        uint64_t estimated_size = 0;
        uint64_t estimated_partitions = 0;
    };
    // Indexed by shard id.
    std::vector<estimated_values> _estimation_per_shard;
    // Indexed by shard id; all sstables written for a shard share its run id.
    std::vector<sstables::run_id> _run_identifiers;
private:
    // Estimated partitions per output sstable for shard `s`: the shard's
    // partition estimate spread evenly over the expected number of output
    // sstables, capped by the compaction strategy's adjusted estimate.
    uint64_t partitions_per_sstable(shard_id s) const {
        const auto& estimate = _estimation_per_shard[s];
        const uint64_t expected_sstables = std::max(uint64_t(1), uint64_t(ceil(double(estimate.estimated_size) / _max_sstable_size)));
        const uint64_t even_split = uint64_t(ceil(double(estimate.estimated_partitions) / expected_sstables));
        const auto strategy_cap = _table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, estimate.estimated_partitions, _schema);
        return std::min(even_split, strategy_cap);
    }
public:
    // Accumulates, for every shard owning an input sstable, an equal share
    // (at least 1) of that sstable's on-disk size and estimated key count,
    // then assigns every shard a random run identifier.
    resharding_compaction(compaction_group_view& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor)
        : compaction(table_s, std::move(descriptor), cdata, progress_monitor, use_backlog_tracker::no)
        , _estimation_per_shard(smp::count)
        , _run_identifiers(smp::count)
    {
        for (auto& sst : _sstables) {
            const auto& owners = sst->get_shards_for_this_sstable();
            auto bytes = sst->bytes_on_disk();
            auto keys = sst->get_estimated_key_count();
            for (auto& owner : owners) {
                auto& estimate = _estimation_per_shard[owner];
                estimate.estimated_size += std::max(uint64_t(1), uint64_t(ceil(double(bytes) / owners.size())));
                estimate.estimated_partitions += std::max(uint64_t(1), uint64_t(ceil(double(keys) / owners.size())));
            }
        }
        for (auto& run_id : _run_identifiers) {
            run_id = sstables::run_id::create_random_id();
        }
    }

    ~resharding_compaction() = default;

    // Reads the input through make_range_sstable_reader with integrity checks
    // enabled. Per the original note, this reader is chosen so that mutations
    // not owned by the local shard are not filtered out (presumably a
    // property of that reader - not visible here). `trace` is not forwarded.
    mutation_reader make_sstable_reader(schema_ptr s,
            reader_permit permit,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            tracing::trace_state_ptr trace,
            streamed_mutation::forwarding sm_fwd,
            mutation_reader::forwarding mr_fwd) override {
        return _compacting->make_range_sstable_reader(
                std::move(s), std::move(permit), range, slice, nullptr, sm_fwd, mr_fwd,
                unwrap_monitor_generator(), sstables::integrity_check::yes);
    }

    // Routes the input through segregate_by_shard() before it reaches `end_consumer`.
    mutation_reader_consumer make_interposer_consumer(mutation_reader_consumer end_consumer) override {
        return [end_consumer = std::move(end_consumer)] (mutation_reader reader) mutable -> future<> {
            return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer));
        };
    }

    // Resharding always uses the interposer consumer.
    bool use_interposer_consumer() const override {
        return true;
    }

    std::string_view report_start_desc() const override {
        return "Resharding";
    }

    std::string_view report_finish_desc() const override {
        return "Resharded";
    }

    // Creates the writer for the sstable that will hold `dk`. The token must
    // map to exactly one shard for writes; anything else is an internal error.
    compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
        auto owners = _sharder->shard_for_writes(dk.token());
        if (owners.size() != 1) {
            // Resharding is not supposed to run on tablets, so this case does not have to be supported.
            on_internal_error(clogger, fmt::format("Got {} shards for token {} in table {}.{}", owners.size(), dk.token(), _schema->ks_name(), _schema->cf_name()));
        }
        auto target_shard = owners[0];
        auto sst = _sstable_creator(target_shard);
        setup_new_sstable(sst);

        auto writer_cfg = make_sstable_writer_config(compaction_type::Reshard);
        // sstables generated for a given shard will share the same run identifier.
        writer_cfg.run_identifier = _run_identifiers.at(target_shard);
        return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(target_shard), writer_cfg, get_encoding_stats(), target_shard), sst};
    }

    // Finishes the sstable behind `writer`; a null writer is ignored.
    void stop_sstable_writer(compaction_writer* writer) override {
        if (!writer) {
            return;
        }
        finish_new_sstable(writer);
    }
};
// Runs compaction `c` to completion inside a seastar thread (seastar::async)
// and resolves to its result. If consumption fails, on_interrupt() is invoked
// with the failure, the compaction object is destroyed while still in thread
// context, and the exception is rethrown.
future<compaction_result> compaction::run(std::unique_ptr<compaction> c) {
return seastar::async([c = std::move(c)] () mutable {
c->setup().get();
auto consumer = c->consume();
// The start time is sampled after consume() has been called and before its
// future is waited on.
auto start_time = db_clock::now();
try {
consumer.get();
} catch (...) {
c->on_interrupt(std::current_exception());
c = nullptr; // make sure writers are stopped while running in thread context. This is because of calls to file.close().get();
throw;
}
return c->finish(std::move(start_time), db_clock::now());
});
}
// Returns the compaction_type that corresponds to the alternative currently
// held by the options variant.
compaction_type compaction_type_options::type() const {
    // One entry per options_variant alternative, in variant index order.
    static const compaction_type type_by_alternative[] = {
        compaction_type::Compaction,
        compaction_type::Cleanup,
        compaction_type::Upgrade,
        compaction_type::Scrub,
        compaction_type::Reshard,
        compaction_type::Reshape,
        compaction_type::Split,
        compaction_type::Major,
        compaction_type::RewriteComponent,
    };
    // Keep the table in sync with the variant.
    static_assert(std::size(type_by_alternative) == std::variant_size_v<compaction_type_options::options_variant>);

    const auto alternative = _options.index();
    return type_by_alternative[alternative];
}
// Instantiates the compaction implementation matching the options carried by
// `descriptor`. component_rewrite has no implementation here and throws
// std::runtime_error, since it is expected to be handled separately.
static std::unique_ptr<compaction> make_compaction(compaction_group_view& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor) {
    // Visitor over the options variant. `descriptor` is held as an rvalue
    // reference and only moved from inside the overload that gets invoked.
    struct compaction_factory {
        compaction_group_view& table_s;
        compaction_descriptor&& descriptor;
        compaction_data& cdata;
        compaction_progress_monitor& progress_monitor;

        // regular and major share an implementation.
        std::unique_ptr<compaction> operator()(compaction_type_options::regular) {
            return std::make_unique<regular_compaction>(table_s, std::move(descriptor), cdata, progress_monitor);
        }
        std::unique_ptr<compaction> operator()(compaction_type_options::major) {
            return std::make_unique<regular_compaction>(table_s, std::move(descriptor), cdata, progress_monitor);
        }

        // cleanup and upgrade share an implementation.
        std::unique_ptr<compaction> operator()(compaction_type_options::cleanup) {
            return std::make_unique<cleanup_compaction>(table_s, std::move(descriptor), cdata, progress_monitor);
        }
        std::unique_ptr<compaction> operator()(compaction_type_options::upgrade) {
            return std::make_unique<cleanup_compaction>(table_s, std::move(descriptor), cdata, progress_monitor);
        }

        std::unique_ptr<compaction> operator()(compaction_type_options::reshape) {
            return std::make_unique<reshape_compaction>(table_s, std::move(descriptor), cdata, progress_monitor);
        }
        std::unique_ptr<compaction> operator()(compaction_type_options::reshard) {
            return std::make_unique<resharding_compaction>(table_s, std::move(descriptor), cdata, progress_monitor);
        }

        // These two forward their own options to the implementation.
        std::unique_ptr<compaction> operator()(compaction_type_options::scrub scrub_options) {
            return std::make_unique<scrub_compaction>(table_s, std::move(descriptor), cdata, scrub_options, progress_monitor);
        }
        std::unique_ptr<compaction> operator()(compaction_type_options::split split_options) {
            return std::make_unique<split_compaction>(table_s, std::move(descriptor), cdata, std::move(split_options), progress_monitor);
        }

        std::unique_ptr<compaction> operator()(compaction_type_options::component_rewrite) {
            throw std::runtime_error("component_rewrite compaction should be handled separately");
        }
    };

    compaction_factory factory{table_s, std::move(descriptor), cdata, progress_monitor};
    return descriptor.options.visit(factory);
}
// Scrubs the sstables in `descriptor` in validate mode: each sstable is only
// read and validated, nothing is rewritten.
//
// cdata.compaction_size is set to the sum of the sstables' data sizes. Every
// validation error is reported through scrub_compaction::report_validation_error().
// If any error was found and the scrub options request it, all sstables in the
// descriptor are moved to quarantine; a failure to move one is logged and the
// remaining ones are still processed.
//
// Returns a compaction_result with no new sstables, carrying the total number
// of validation errors.
// Throws compaction_stopped_exception if a stop was requested while validating.
static future<compaction_result> scrub_sstables_validate_mode(compaction_descriptor descriptor, compaction_data& cdata, compaction_group_view& table_s, sstables::read_monitor_generator& monitor_generator) {
    auto schema = table_s.schema();
    auto permit = table_s.make_compaction_reader_permit();

    uint64_t validation_errors = 0;
    cdata.compaction_size = std::ranges::fold_left(descriptor.sstables | std::views::transform([] (auto& sst) { return sst->data_size(); }), int64_t(0), std::plus{});

    for (const auto& sst : descriptor.sstables) {
        clogger.info("Scrubbing in validate mode {}", sst->get_filename());

        // Track this sstable's errors separately from the running total, so
        // the verdict logged below describes this sstable only and is not
        // tainted by errors found in sstables validated earlier.
        const auto sstable_errors = co_await sst->validate(permit, cdata.abort, [&schema] (sstring what) {
            scrub_compaction::report_validation_error(compaction_type::Scrub, *schema, what);
        }, monitor_generator(sst), true);
        validation_errors += sstable_errors;

        // Did validation actually finish because aborted?
        if (cdata.is_stop_requested()) {
            // Compaction manager will catch this exception and re-schedule the compaction.
            throw compaction_stopped_exception(schema->ks_name(), schema->cf_name(), cdata.stop_requested);
        }

        clogger.info("Finished scrubbing in validate mode {} - sstable is {}", sst->get_filename(), sstable_errors == 0 ? "valid" : "invalid");
    }

    using scrub = compaction_type_options::scrub;
    if (validation_errors != 0 && descriptor.options.as<scrub>().quarantine_sstables == scrub::quarantine_invalid_sstables::yes) {
        for (auto& sst : descriptor.sstables) {
            try {
                co_await sst->change_state(sstables::sstable_state::quarantine);
            } catch (...) {
                clogger.error("Moving {} to quarantine failed due to {}, continuing.", sst->get_filename(), std::current_exception());
            }
        }
    }

    co_return compaction_result {
        .new_sstables = {},
        .stats = {
            .ended_at = db_clock::now(),
            .validation_errors = validation_errors,
        },
    };
}
// Public entry point for validate-mode scrub. Installs a read monitor
// generator (backlog tracker disabled) on `progress_monitor` for the duration
// of the scrub, runs the validation and returns its result. The generator is
// reset on every exit path, including exceptions.
future<compaction_result> scrub_sstables_validate_mode(compaction_descriptor descriptor, compaction_data& cdata, compaction_group_view& table_s, compaction_progress_monitor& progress_monitor) {
    progress_monitor.set_generator(std::make_unique<compaction_read_monitor_generator>(table_s, use_backlog_tracker::no));
    auto d = defer([&] { progress_monitor.reset_generator(); });
    // `descriptor` is not used again here, so hand it over instead of copying it.
    auto res = co_await scrub_sstables_validate_mode(std::move(descriptor), cdata, table_s, *progress_monitor._generator);
    co_return res;
}
// Rewrites a single component of every sstable in `descriptor`. For each
// input, a new sstable is created that links to the original with the
// selected component rewritten by the configured modifier; the inputs are
// then replaced by the new sstables through the descriptor's replacer.
// Runs in a seastar thread and resolves to the new sstables plus start/end times.
future<compaction_result> rewrite_sstables_component(compaction_descriptor descriptor, compaction_group_view& table_s) {
    return seastar::async([descriptor = std::move(descriptor), &table_s] () mutable {
        compaction_result result {
            .stats = {
                .started_at = db_clock::now(),
            },
        };

        const auto& rewrite_options = descriptor.options.as<compaction_type_options::component_rewrite>();
        const bool update_id = static_cast<bool>(rewrite_options.update_id);

        // The standard sstable creator cannot be used here because the
        // rewritten sstable must keep the version of the original.
        auto make_same_version_sstable = [&table_s] (sstables::shared_sstable sst) {
            return table_s.make_sstable(sst->state(), sst->get_version());
        };

        result.new_sstables.reserve(descriptor.sstables.size());
        for (auto& original : descriptor.sstables) {
            auto linked = original->link_with_rewritten_component(make_same_version_sstable, rewrite_options.component_to_rewrite, rewrite_options.modifier, update_id).get();
            result.new_sstables.push_back(linked);
        }

        descriptor.replacer({std::move(descriptor.sstables), result.new_sstables});
        result.stats.ended_at = db_clock::now();
        return result;
    });
}
// Entry point for running a compaction described by `descriptor`.
// An empty input set fails with std::runtime_error. Validate-mode scrub and
// component rewrite bypass the regular machinery and are dispatched to their
// dedicated functions; everything else goes through compaction::run().
future<compaction_result>
compact_sstables(compaction_descriptor descriptor, compaction_data& cdata, compaction_group_view& table_s, compaction_progress_monitor& progress_monitor) {
    const auto type = descriptor.options.type();

    if (descriptor.sstables.empty()) {
        return make_exception_future<compaction_result>(std::runtime_error(format("Called {} compaction with empty set on behalf of {}.{}",
                compaction_name(type), table_s.schema()->ks_name(), table_s.schema()->cf_name())));
    }

    if (type == compaction_type::Scrub) {
        const auto scrub_mode = std::get<compaction_type_options::scrub>(descriptor.options.options()).operation_mode;
        if (scrub_mode == compaction_type_options::scrub::mode::validate) {
            // Bypass the usual compaction machinery for dry-mode scrub
            return scrub_sstables_validate_mode(std::move(descriptor), cdata, table_s, progress_monitor);
        }
    }

    if (type == compaction_type::RewriteComponent) {
        return rewrite_sstables_component(std::move(descriptor), table_s);
    }

    auto job = make_compaction(table_s, std::move(descriptor), cdata, progress_monitor);
    return compaction::run(std::move(job));
}
// Returns the subset of `compacting` that is fully expired at `compaction_time`.
// A compacting sstable is a candidate when its max local deletion time is
// before its gc_before and none of its compaction ancestors is still
// compacted-but-undeleted. A candidate is kept only if its max timestamp is
// below the minimum timestamp of every overlapping uncompacting sstable that
// is not itself expired and of every non-candidate compacting sstable.
std::unordered_set<sstables::shared_sstable>
get_fully_expired_sstables(const compaction_group_view& table_s, const std::vector<sstables::shared_sstable>& compacting, gc_clock::time_point compaction_time) {
    clogger.debug("Checking droppable sstables in {}.{}", table_s.schema()->ks_name(), table_s.schema()->cf_name());

    if (compacting.empty()) {
        return {};
    }

    std::unordered_set<sstables::shared_sstable> candidates;

    // Note: This contains both repaired and unrepaired sstables which means
    // compaction consults both repaired and unrepaired sstables for tombstone gc.
    auto uncompacting_sstables = get_uncompacting_sstables(table_s, compacting);
    // Uncompacting sstables that overlap the ones being compacted.
    std::vector<sstables::shared_sstable> overlapping = leveled_manifest::overlapping(*table_s.schema(), compacting, uncompacting_sstables);

    // Lowest write timestamp among sstables that may still hold live data.
    int64_t min_timestamp = std::numeric_limits<int64_t>::max();

    for (auto& neighbour : overlapping) {
        auto gc_before = neighbour->get_gc_before_for_fully_expire(compaction_time, table_s.get_tombstone_gc_state(), table_s.schema());
        if (neighbour->get_max_local_deletion_time() >= gc_before) {
            min_timestamp = std::min(min_timestamp, neighbour->get_stats_metadata().min_timestamp);
        }
    }

    auto compacted_undeleted_gens = table_s.compacted_undeleted_sstables()
        | std::views::transform(std::mem_fn(&sstables::sstable::generation))
        | std::ranges::to<std::unordered_set>();
    auto has_undeleted_ancestor = [&compacted_undeleted_gens] (auto& candidate) {
        // Get ancestors from sstable which is empty after restart. It works for this purpose because
        // we only need to check that a sstable compacted *in this instance* hasn't an ancestor undeleted.
        // Not getting it from sstable metadata because mc format hasn't it available.
        return std::ranges::any_of(candidate->compaction_ancestors(), [&compacted_undeleted_gens] (const sstables::generation_type& gen) {
            return compacted_undeleted_gens.contains(gen);
        });
    };

    // SSTables that do not contain live data are added to the list of possibly expired sstables.
    for (auto& candidate : compacting) {
        auto gc_before = candidate->get_gc_before_for_fully_expire(compaction_time, table_s.get_tombstone_gc_state(), table_s.schema());
        clogger.debug("Checking if candidate of generation {} and max_deletion_time {} is expired, gc_before is {}",
                    candidate->generation(), candidate->get_stats_metadata().max_local_deletion_time, gc_before);
        // A fully expired sstable which has an ancestor undeleted shouldn't be compacted because
        // expired data won't be purged because undeleted sstables are taken into account when
        // calculating max purgeable timestamp, and not doing it could lead to a compaction loop.
        const bool expired = candidate->get_max_local_deletion_time() < gc_before && !has_undeleted_ancestor(candidate);
        if (!expired) {
            min_timestamp = std::min(min_timestamp, candidate->get_stats_metadata().min_timestamp);
            continue;
        }
        clogger.debug("Adding candidate of generation {} to list of possibly expired sstables", candidate->generation());
        candidates.insert(candidate);
    }

    // Remove any candidate that may contain a tombstone covering older data;
    // the ones that survive are logged as dropped.
    std::erase_if(candidates, [min_timestamp] (const sstables::shared_sstable& candidate) {
        if (candidate->get_stats_metadata().max_timestamp >= min_timestamp) {
            return true;
        }
        clogger.debug("Dropping expired SSTable {} (maxLocalDeletionTime={})",
                candidate->get_filename(), candidate->get_stats_metadata().max_local_deletion_time);
        return false;
    });

    clogger.debug("Checking droppable sstables in {}.{}, candidates={}", table_s.schema()->ks_name(), table_s.schema()->cf_name(), candidates.size());
    return candidates;
}
unsigned compaction_descriptor::fan_in() const {
auto unique_run_identifiers = std::ranges::transform_view(sstables, &sstables::sstable::run_identifier) | std::ranges::to<std::unordered_set>();
return unique_run_identifiers.size();
}
// Sum of data_size() over all input sstables.
uint64_t compaction_descriptor::sstables_size() const {
    uint64_t total = 0;
    for (const auto& sst : sstables) {
        total += sst->data_size();
    }
    return total;
}
}
// Formats a compaction_type as the text produced by to_string().
auto fmt::formatter<::compaction::compaction_type>::format(::compaction::compaction_type type, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    const auto text = to_string(type);
    return fmt::format_to(ctx.out(), "{}", text);
}
// Formats a scrub mode as the text produced by to_string().
auto fmt::formatter<::compaction::compaction_type_options::scrub::mode>::format(::compaction::compaction_type_options::scrub::mode mode, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    const auto text = to_string(mode);
    return fmt::format_to(ctx.out(), "{}", text);
}
// Formats a scrub quarantine mode as the text produced by to_string().
auto fmt::formatter<::compaction::compaction_type_options::scrub::quarantine_mode>::format(::compaction::compaction_type_options::scrub::quarantine_mode mode, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    const auto text = to_string(mode);
    return fmt::format_to(ctx.out(), "{}", text);
}