Add support for the new configuration parameter `compaction_max_shares`, and update the compaction manager to pass it down to the compaction controller when it changes. The shares allocated to compaction jobs will be limited by this new parameter. Fixes #9431 Signed-off-by: Lakshmi Narayanan Sreethar <lakshmi.sreethar@scylladb.com>
671 lines
32 KiB
C++
671 lines
32 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <seastar/core/semaphore.hh>
|
|
#include <seastar/core/sstring.hh>
|
|
#include <seastar/core/shared_ptr.hh>
|
|
#include <seastar/core/gate.hh>
|
|
#include <seastar/core/shared_future.hh>
|
|
#include <seastar/core/metrics_registration.hh>
|
|
#include <seastar/core/abort_source.hh>
|
|
#include <seastar/core/condition-variable.hh>
|
|
#include <seastar/core/rwlock.hh>
|
|
#include "sstables/shared_sstable.hh"
|
|
#include "utils/exponential_backoff_retry.hh"
|
|
#include "utils/updateable_value.hh"
|
|
#include "utils/serialized_action.hh"
|
|
#include <vector>
|
|
#include <functional>
|
|
#include "compaction.hh"
|
|
#include "compaction_backlog_manager.hh"
|
|
#include "compaction/compaction_descriptor.hh"
|
|
#include "compaction/task_manager_module.hh"
|
|
#include "compaction_state.hh"
|
|
#include "strategy_control.hh"
|
|
#include "backlog_controller.hh"
|
|
#include "seastarx.hh"
|
|
#include "compaction/exceptions.hh"
|
|
#include "tombstone_gc.hh"
|
|
#include "utils/pluggable.hh"
|
|
#include "compaction/compaction_reenabler.hh"
|
|
#include "utils/disk_space_monitor.hh"
|
|
|
|
namespace db {
|
|
class compaction_history_entry;
|
|
class system_keyspace;
|
|
class config;
|
|
}
|
|
|
|
namespace sstables { class test_env_compaction_manager; }
|
|
|
|
namespace compaction {
|
|
using throw_if_stopping = bool_class<struct throw_if_stopping_tag>;
|
|
|
|
class compaction_task_executor;
|
|
class sstables_task_executor;
|
|
class major_compaction_task_executor;
|
|
class custom_compaction_task_executor;
|
|
class regular_compaction_task_executor;
|
|
class offstrategy_compaction_task_executor;
|
|
class rewrite_sstables_compaction_task_executor;
|
|
class split_compaction_task_executor;
|
|
class cleanup_sstables_compaction_task_executor;
|
|
class validate_sstables_compaction_task_executor;
|
|
|
|
inline owned_ranges_ptr make_owned_ranges_ptr(dht::token_range_vector&& ranges) {
|
|
return make_lw_shared<const dht::token_range_vector>(std::move(ranges));
|
|
}
|
|
|
|
// Compaction manager provides facilities to submit and track compaction jobs on
|
|
// behalf of existing tables.
|
|
class compaction_manager: public peering_sharded_service<compaction_manager> {
|
|
public:
|
|
using compaction_stats_opt = std::optional<compaction_stats>;
|
|
struct stats {
|
|
int64_t pending_tasks = 0;
|
|
int64_t completed_tasks = 0;
|
|
uint64_t active_tasks = 0; // Number of compaction going on.
|
|
int64_t errors = 0;
|
|
};
|
|
using scheduling_group = backlog_controller::scheduling_group;
|
|
struct config {
|
|
scheduling_group compaction_sched_group;
|
|
scheduling_group maintenance_sched_group;
|
|
size_t available_memory = 0;
|
|
utils::updateable_value<float> static_shares = utils::updateable_value<float>(0);
|
|
utils::updateable_value<float> max_shares = utils::updateable_value<float>(0);
|
|
utils::updateable_value<uint32_t> throughput_mb_per_sec = utils::updateable_value<uint32_t>(0);
|
|
std::chrono::seconds flush_all_tables_before_major = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::days(1));
|
|
};
|
|
|
|
public:
|
|
class can_purge_tombstones_tag;
|
|
using can_purge_tombstones = bool_class<can_purge_tombstones_tag>;
|
|
|
|
private:
|
|
shared_ptr<compaction::task_manager_module> _task_manager_module;
|
|
|
|
using compaction_task_executor_list_type = bi::list<
|
|
compaction_task_executor,
|
|
bi::base_hook<bi::list_base_hook<bi::link_mode<bi::auto_unlink>>>,
|
|
bi::constant_time_size<false>>;
|
|
// compaction manager may have N fibers to allow parallel compaction per shard.
|
|
compaction_task_executor_list_type _tasks;
|
|
|
|
// Possible states in which the compaction manager can be found.
|
|
//
|
|
// none: started, but not yet enabled. Once the compaction manager moves out of "none", it can
|
|
// never legally move back
|
|
// stopped: stop() was called. The compaction_manager will never be running again
|
|
// and can no longer be used (although it is possible to still grab metrics, stats,
|
|
// etc)
|
|
// running: running, started and enabled at least once. Whether new compactions are accepted or not is determined by the counter
|
|
enum class state { none, stopped, running };
|
|
state _state = state::none;
|
|
// The compaction manager is initiated in the none state. It is moved to the running state when start() is invoked
|
|
// and the service is immediately enabled.
|
|
uint32_t _disabled_state_count = 0;
|
|
|
|
bool is_disabled() const { return _state != state::running || _disabled_state_count > 0; }
|
|
|
|
std::optional<future<>> _stop_future;
|
|
|
|
stats _stats;
|
|
seastar::metrics::metric_groups _metrics;
|
|
double _last_backlog = 0.0f;
|
|
|
|
// Store sstables that are being compacted at the moment. That's needed to prevent
|
|
// a sstable from being compacted twice.
|
|
std::unordered_set<sstables::shared_sstable> _compacting_sstables;
|
|
|
|
future<> _waiting_reevalution = make_ready_future<>();
|
|
condition_variable _postponed_reevaluation;
|
|
// tables that wait for compaction but had its submission postponed due to ongoing compaction.
|
|
std::unordered_set<compaction::compaction_group_view*> _postponed;
|
|
// tracks taken weights of ongoing compactions, only one compaction per weight is allowed.
|
|
// weight is value assigned to a compaction job that is log base N of total size of all input sstables.
|
|
std::unordered_set<int> _weight_tracker;
|
|
|
|
std::unordered_map<compaction::compaction_group_view*, compaction_state> _compaction_state;
|
|
|
|
// Purpose is to serialize all maintenance (non regular) compaction activity to reduce aggressiveness and space requirement.
|
|
// If the operation must be serialized with regular, then the per-table write lock must be taken.
|
|
seastar::named_semaphore _maintenance_ops_sem = {1, named_semaphore_exception_factory{"maintenance operation"}};
|
|
|
|
// This semaphore ensures that off-strategy compaction will be serialized for
|
|
// all tables, to limit space requirement and protect against candidates
|
|
// being picked more than once.
|
|
seastar::named_semaphore _off_strategy_sem = {1, named_semaphore_exception_factory{"off-strategy compaction"}};
|
|
|
|
utils::pluggable<db::system_keyspace> _sys_ks;
|
|
|
|
std::function<void()> compaction_submission_callback();
|
|
// all registered tables are reevaluated at a constant interval.
|
|
// Submission is a NO-OP when there's nothing to do, so it's fine to call it regularly.
|
|
static constexpr std::chrono::seconds periodic_compaction_submission_interval() { return std::chrono::seconds(3600); }
|
|
|
|
config _cfg;
|
|
timer<lowres_clock> _compaction_submission_timer;
|
|
compaction_controller _compaction_controller;
|
|
compaction_backlog_manager _backlog_manager;
|
|
optimized_optional<abort_source::subscription> _early_abort_subscription;
|
|
serialized_action _throughput_updater;
|
|
std::optional<utils::observer<uint32_t>> _throughput_option_observer;
|
|
serialized_action _update_compaction_static_shares_action;
|
|
utils::observer<float> _compaction_static_shares_observer;
|
|
utils::observer<float> _compaction_max_shares_observer;
|
|
uint64_t _validation_errors = 0;
|
|
|
|
class strategy_control;
|
|
std::unique_ptr<strategy_control> _strategy_control;
|
|
|
|
shared_tombstone_gc_state _shared_tombstone_gc_state;
|
|
// TODO: tombstone_gc_state should now have value semantics, but the code
|
|
// still uses it with reference semantics (inconsistently though).
|
|
// Drop this member, once the code is converted into using value semantics.
|
|
tombstone_gc_state _tombstone_gc_state;
|
|
|
|
utils::disk_space_monitor::subscription _out_of_space_subscription;
|
|
private:
|
|
// Requires task->_compaction_state.gate to be held and task to be registered in _tasks.
|
|
future<compaction_stats_opt> perform_task(shared_ptr<compaction::compaction_task_executor> task, throw_if_stopping do_throw_if_stopping);
|
|
|
|
// Return nullopt if compaction cannot be started
|
|
std::optional<gate::holder> start_compaction(compaction_group_view& t);
|
|
|
|
template<typename TaskExecutor, typename... Args>
|
|
requires std::is_base_of_v<compaction_task_executor, TaskExecutor> &&
|
|
std::is_base_of_v<compaction_task_impl, TaskExecutor> &&
|
|
requires (compaction_manager& cm, throw_if_stopping do_throw_if_stopping, Args&&... args) {
|
|
{TaskExecutor(cm, do_throw_if_stopping, std::forward<Args>(args)...)} -> std::same_as<TaskExecutor>;
|
|
}
|
|
future<compaction_manager::compaction_stats_opt> perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args);
|
|
|
|
void stop_tasks(const std::vector<shared_ptr<compaction::compaction_task_executor>>& tasks, sstring reason) noexcept;
|
|
future<> await_tasks(std::vector<shared_ptr<compaction::compaction_task_executor>>, bool task_stopped) const noexcept;
|
|
future<> update_throughput(uint32_t value_mbs);
|
|
|
|
// Return the largest fan-in of currently running compactions
|
|
unsigned current_compaction_fan_in_threshold() const;
|
|
|
|
// Return true if compaction can be initiated
|
|
bool can_register_compaction(compaction::compaction_group_view& t, int weight, unsigned fan_in) const;
|
|
// Register weight for a table. Do that only if can_register_weight()
|
|
// returned true.
|
|
void register_weight(int weight);
|
|
// Deregister weight for a table.
|
|
void deregister_weight(int weight);
|
|
|
|
// Get candidates for compaction strategy, which are all sstables but the ones being compacted.
|
|
future<std::vector<sstables::shared_sstable>> get_candidates(compaction::compaction_group_view& t) const;
|
|
|
|
bool eligible_for_compaction(const sstables::shared_sstable& sstable) const;
|
|
bool eligible_for_compaction(const sstables::frozen_sstable_run& sstable_run) const;
|
|
|
|
template <std::ranges::range Range>
|
|
requires std::convertible_to<std::ranges::range_value_t<Range>, sstables::shared_sstable> || std::convertible_to<std::ranges::range_value_t<Range>, sstables::frozen_sstable_run>
|
|
std::vector<std::ranges::range_value_t<Range>> get_candidates(compaction_group_view& t, const Range& sstables) const;
|
|
|
|
template <std::ranges::range Range>
|
|
requires std::same_as<std::ranges::range_value_t<Range>, sstables::shared_sstable>
|
|
void register_compacting_sstables(const Range& range);
|
|
|
|
template <std::ranges::range Range>
|
|
requires std::same_as<std::ranges::range_value_t<Range>, sstables::shared_sstable>
|
|
void deregister_compacting_sstables(const Range& range);
|
|
|
|
// gets the table's compaction state
|
|
// throws std::out_of_range exception if not found.
|
|
compaction_state& get_compaction_state(compaction::compaction_group_view* t);
|
|
const compaction_state& get_compaction_state(compaction::compaction_group_view* t) const {
|
|
return const_cast<compaction_manager*>(this)->get_compaction_state(t);
|
|
}
|
|
|
|
// Return true if compaction manager is enabled and
|
|
// table still exists and compaction is not disabled for the table.
|
|
inline bool can_proceed(compaction::compaction_group_view* t) const;
|
|
|
|
future<> postponed_compactions_reevaluation();
|
|
void reevaluate_postponed_compactions() noexcept;
|
|
// Postpone compaction for a table that couldn't be executed due to ongoing
|
|
// similar-sized compaction.
|
|
void postpone_compaction_for_table(compaction::compaction_group_view* t);
|
|
|
|
using quarantine_invalid_sstables = compaction_type_options::scrub::quarantine_invalid_sstables;
|
|
future<compaction_stats_opt> perform_sstable_scrub_validate_mode(compaction::compaction_group_view& t, tasks::task_info info, quarantine_invalid_sstables quarantine_sstables);
|
|
future<> update_static_shares(float shares);
|
|
|
|
using get_candidates_func = std::function<future<std::vector<sstables::shared_sstable>>()>;
|
|
|
|
// Guarantees that a maintenance task, e.g. cleanup, will be performed on all files available at the time
|
|
// by retrieving set of candidates only after all compactions for table T were stopped, if any.
|
|
template<typename TaskType, typename... Args>
|
|
requires std::derived_from<TaskType, compaction_task_executor> &&
|
|
std::derived_from<TaskType, compaction_task_impl>
|
|
|
|
future<compaction_manager::compaction_stats_opt> perform_task_on_all_files(sstring reason, tasks::task_info info, compaction_group_view& t, compaction_type_options options, owned_ranges_ptr owned_ranges_ptr,
|
|
get_candidates_func get_func, throw_if_stopping do_throw_if_stopping, Args... args);
|
|
|
|
future<compaction_stats_opt> rewrite_sstables(compaction::compaction_group_view& t, compaction_type_options options, owned_ranges_ptr, get_candidates_func, tasks::task_info info,
|
|
can_purge_tombstones can_purge = can_purge_tombstones::yes, sstring options_desc = "");
|
|
|
|
// Stop all fibers, without waiting. Safe to be called multiple times.
|
|
void do_stop() noexcept;
|
|
future<> really_do_stop() noexcept;
|
|
|
|
// Propagate replacement of sstables to all ongoing compaction of a given table
|
|
void propagate_replacement(compaction::compaction_group_view& t, const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added);
|
|
|
|
// This constructor is supposed to only be used for testing so lets be more explicit
|
|
// about invoking it. Ref #10146
|
|
compaction_manager(tasks::task_manager& tm);
|
|
public:
|
|
compaction_manager(config cfg, abort_source& as, tasks::task_manager& tm);
|
|
~compaction_manager();
|
|
class for_testing_tag{};
|
|
// An inline constructor for testing
|
|
compaction_manager(tasks::task_manager& tm, for_testing_tag) : compaction_manager(tm) {}
|
|
|
|
compaction::task_manager_module& get_task_manager_module() noexcept {
|
|
return *_task_manager_module;
|
|
}
|
|
|
|
const scheduling_group& compaction_sg() const noexcept {
|
|
return _cfg.compaction_sched_group;
|
|
}
|
|
|
|
const scheduling_group& maintenance_sg() const noexcept {
|
|
return _cfg.maintenance_sched_group;
|
|
}
|
|
|
|
size_t available_memory() const noexcept {
|
|
return _cfg.available_memory;
|
|
}
|
|
|
|
float static_shares() const noexcept {
|
|
return _cfg.static_shares.get();
|
|
}
|
|
|
|
float max_shares() const noexcept {
|
|
return _cfg.max_shares.get();
|
|
}
|
|
|
|
uint32_t throughput_mbs() const noexcept {
|
|
return _cfg.throughput_mb_per_sec.get();
|
|
}
|
|
|
|
std::chrono::seconds flush_all_tables_before_major() const noexcept {
|
|
return _cfg.flush_all_tables_before_major;
|
|
}
|
|
|
|
void register_metrics();
|
|
|
|
// enable the compaction manager.
|
|
void enable();
|
|
|
|
// Stop all fibers. Ongoing compactions will be waited. Should only be called
|
|
// once, from main teardown path.
|
|
future<> stop();
|
|
future<> start(const db::config& cfg, utils::disk_space_monitor* dsm);
|
|
|
|
// cancels all running compactions and moves the compaction manager into disabled state.
|
|
// The compaction manager is still alive after drain but it will not accept new compactions
|
|
// unless it is moved back to enabled state.
|
|
future<> drain();
|
|
|
|
// Check if compaction manager is running, i.e. it was enabled or drained
|
|
bool is_running() const noexcept {
|
|
return _state == state::running;
|
|
}
|
|
|
|
using compaction_history_consumer = noncopyable_function<future<>(const db::compaction_history_entry&)>;
|
|
future<> get_compaction_history(compaction_history_consumer&& f);
|
|
|
|
// Submit a table to be compacted.
|
|
void submit(compaction::compaction_group_view& t);
|
|
|
|
// Can regular compaction be performed in the given table
|
|
bool can_perform_regular_compaction(compaction::compaction_group_view& t);
|
|
|
|
// Maybe wait before adding more sstables
|
|
// if there are too many sstables.
|
|
future<> maybe_wait_for_sstable_count_reduction(compaction::compaction_group_view& t);
|
|
|
|
// Submit a table to be off-strategy compacted.
|
|
// Returns true iff off-strategy compaction was required and performed.
|
|
future<bool> perform_offstrategy(compaction::compaction_group_view& t, tasks::task_info info);
|
|
|
|
// Submit a table to be cleaned up and wait for its termination.
|
|
//
|
|
// Performs a cleanup on each sstable of the table, excluding
|
|
// those ones that are irrelevant to this node or being compacted.
|
|
// Cleanup is about discarding keys that are no longer relevant for a
|
|
// given sstable, e.g. after node loses part of its token range because
|
|
// of a newly added node.
|
|
future<> perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::compaction_group_view& t, tasks::task_info info);
|
|
private:
|
|
future<> try_perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::compaction_group_view& t, tasks::task_info info);
|
|
|
|
// Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set.
|
|
bool update_sstable_cleanup_state(compaction_group_view& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges);
|
|
|
|
future<> on_compaction_completion(compaction_group_view& t, compaction_completion_desc desc, sstables::offstrategy offstrategy);
|
|
public:
|
|
// Submit a table to be upgraded and wait for its termination.
|
|
future<> perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::compaction_group_view& t, bool exclude_current_version, tasks::task_info info);
|
|
|
|
// Submit a table to be scrubbed and wait for its termination.
|
|
future<compaction_stats_opt> perform_sstable_scrub(compaction::compaction_group_view& t, compaction_type_options::scrub opts, tasks::task_info info);
|
|
|
|
// Submit a table for major compaction.
|
|
future<> perform_major_compaction(compaction::compaction_group_view& t, tasks::task_info info, bool consider_only_existing_data = false);
|
|
|
|
// Splits a compaction group by segregating all its sstable according to the classifier[1].
|
|
// [1]: See compaction_type_options::splitting::classifier.
|
|
// Returns when all sstables in the main sstable set are split. The only exception is shutdown
|
|
// or user aborted splitting using stop API.
|
|
future<compaction_stats_opt> perform_split_compaction(compaction::compaction_group_view& t, compaction_type_options::split opt, tasks::task_info info);
|
|
|
|
// Splits a single SSTable by segregating all its data according to the classifier.
|
|
// If SSTable doesn't need split, the same input SSTable is returned as output.
|
|
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
|
|
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
|
|
|
|
// Run a custom job for a given table, defined by a function
|
|
// it completes when future returned by job is ready or returns immediately
|
|
// if manager was asked to stop.
|
|
//
|
|
// parameter type is the compaction type the operation can most closely be
|
|
// associated with, use compaction_type::Compaction, if none apply.
|
|
// parameter job is a function that will carry the operation
|
|
future<> run_custom_job(compaction::compaction_group_view& s, compaction_type type, const char *desc, noncopyable_function<future<>(compaction_data&, compaction_progress_monitor&)> job, tasks::task_info info, throw_if_stopping do_throw_if_stopping);
|
|
|
|
// Disable compaction temporarily for a table t.
|
|
// Caller should call the compaction_reenabler::reenable
|
|
future<compaction_reenabler> stop_and_disable_compaction(sstring reason, compaction::compaction_group_view& t);
|
|
|
|
future<compaction_reenabler> await_and_disable_compaction(compaction::compaction_group_view& t);
|
|
|
|
future<seastar::rwlock::holder> get_incremental_repair_read_lock(compaction::compaction_group_view& t, const sstring& reason);
|
|
future<seastar::rwlock::holder> get_incremental_repair_write_lock(compaction::compaction_group_view& t, const sstring& reason);
|
|
|
|
// Run a function with compaction temporarily disabled for a table T.
|
|
future<> run_with_compaction_disabled(compaction::compaction_group_view& t, std::function<future<> ()> func, sstring reason = "custom operation");
|
|
|
|
void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept;
|
|
future<> unplug_system_keyspace() noexcept;
|
|
|
|
// Adds a table to the compaction manager.
|
|
// Creates a compaction_state structure that can be used for submitting
|
|
// compaction jobs of all types.
|
|
void add(compaction::compaction_group_view& t);
|
|
// Adds a group with compaction temporarily disabled. Compaction is only enabled back
|
|
// when the compaction_reenabler returned is destroyed.
|
|
compaction_reenabler add_with_compaction_disabled(compaction::compaction_group_view& view);
|
|
|
|
// Remove a table from the compaction manager.
|
|
// Cancel requests on table and wait for possible ongoing compactions.
|
|
future<> remove(compaction::compaction_group_view& t, sstring reason = "table removal") noexcept;
|
|
|
|
const stats& get_stats() const {
|
|
return _stats;
|
|
}
|
|
|
|
const std::vector<compaction_info> get_compactions(std::function<bool(const compaction_group_view*)> filter = [] (auto) { return true; }) const;
|
|
|
|
// Returns true if table has an ongoing compaction, running on its behalf
|
|
bool has_table_ongoing_compaction(const compaction::compaction_group_view& t) const;
|
|
|
|
bool compaction_disabled(compaction::compaction_group_view& t) const;
|
|
|
|
// Stops ongoing compaction of a given type.
|
|
future<> stop_compaction(sstring type, std::function<bool(const compaction_group_view*)> filter = [] (auto) { return true; });
|
|
|
|
private:
|
|
std::vector<shared_ptr<compaction_task_executor>>
|
|
do_stop_ongoing_compactions(sstring reason, std::function<bool(const compaction_group_view*)> filter, std::optional<compaction_type> type_opt) noexcept;
|
|
future<> stop_ongoing_compactions(sstring reason, std::function<bool(const compaction_group_view*)> filter, std::optional<compaction_type> type_opt = {}) noexcept;
|
|
|
|
public:
|
|
// Stops ongoing compaction of a given table and/or compaction_type.
|
|
future<> stop_ongoing_compactions(sstring reason, compaction::compaction_group_view* t = nullptr, std::optional<compaction_type> type_opt = {}) noexcept;
|
|
|
|
future<> await_ongoing_compactions(compaction_group_view* t);
|
|
|
|
compaction_reenabler stop_and_disable_compaction_no_wait(compaction_group_view& t, sstring reason);
|
|
|
|
double backlog() {
|
|
return _backlog_manager.backlog();
|
|
}
|
|
|
|
void register_backlog_tracker(compaction_backlog_tracker& backlog_tracker) {
|
|
_backlog_manager.register_backlog_tracker(backlog_tracker);
|
|
}
|
|
|
|
compaction_backlog_tracker& get_backlog_tracker(compaction::compaction_group_view& t);
|
|
|
|
static compaction_data create_compaction_data();
|
|
|
|
compaction::strategy_control& get_strategy_control() const noexcept;
|
|
|
|
const tombstone_gc_state& get_tombstone_gc_state() const noexcept {
|
|
return _tombstone_gc_state;
|
|
};
|
|
|
|
shared_tombstone_gc_state& get_shared_tombstone_gc_state() noexcept {
|
|
return _shared_tombstone_gc_state;
|
|
};
|
|
|
|
const shared_tombstone_gc_state& get_shared_tombstone_gc_state() const noexcept {
|
|
return _shared_tombstone_gc_state;
|
|
};
|
|
|
|
// Uncoditionally erase sst from `sstables_requiring_cleanup`
|
|
// Returns true iff sst was found and erased.
|
|
bool erase_sstable_cleanup_state(compaction_group_view& t, const sstables::shared_sstable& sst);
|
|
|
|
// checks if the sstable is in the respective compaction_state.sstables_requiring_cleanup set.
|
|
bool requires_cleanup(compaction_group_view& t, const sstables::shared_sstable& sst) const;
|
|
const std::unordered_set<sstables::shared_sstable>& sstables_requiring_cleanup(compaction_group_view& t) const;
|
|
|
|
friend class compacting_sstable_registration;
|
|
friend class compaction_weight_registration;
|
|
friend class sstables::test_env_compaction_manager;
|
|
|
|
friend class compaction::compaction_task_impl;
|
|
friend class compaction::compaction_task_executor;
|
|
friend class compaction::sstables_task_executor;
|
|
friend class compaction::major_compaction_task_executor;
|
|
friend class compaction::split_compaction_task_executor;
|
|
friend class compaction::custom_compaction_task_executor;
|
|
friend class compaction::regular_compaction_task_executor;
|
|
friend class compaction::offstrategy_compaction_task_executor;
|
|
friend class compaction::rewrite_sstables_compaction_task_executor;
|
|
friend class compaction::cleanup_sstables_compaction_task_executor;
|
|
friend class compaction::validate_sstables_compaction_task_executor;
|
|
friend compaction_reenabler;
|
|
};
|
|
|
|
class compaction_task_executor
|
|
: public enable_shared_from_this<compaction_task_executor>
|
|
, public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
|
|
public:
|
|
enum class state {
|
|
none, // initial and final state
|
|
pending, // task is blocked on a lock, may alternate with active
|
|
// counted in compaction_manager::stats::pending_tasks
|
|
active, // task initiated active compaction, may alternate with pending
|
|
// counted in compaction_manager::stats::active_tasks
|
|
done, // task completed successfully (may transition only to state::none, or
|
|
// state::pending for regular compaction)
|
|
// counted in compaction_manager::stats::completed_tasks
|
|
postponed, // task was postponed (may transition only to state::none)
|
|
// represented by the postponed_compactions metric
|
|
failed, // task failed (may transition only to state::none)
|
|
// counted in compaction_manager::stats::errors
|
|
};
|
|
protected:
|
|
compaction_manager& _cm;
|
|
::compaction::compaction_group_view* _compacting_table = nullptr;
|
|
compaction::compaction_state& _compaction_state;
|
|
compaction_data _compaction_data;
|
|
state _state = state::none;
|
|
throw_if_stopping _do_throw_if_stopping;
|
|
compaction_progress_monitor _progress_monitor;
|
|
|
|
private:
|
|
shared_future<compaction_manager::compaction_stats_opt> _compaction_done = make_ready_future<compaction_manager::compaction_stats_opt>();
|
|
exponential_backoff_retry _compaction_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(300));
|
|
compaction_type _type;
|
|
sstables::run_id _output_run_identifier;
|
|
sstring _description;
|
|
compaction_manager::compaction_stats_opt _stats = std::nullopt;
|
|
|
|
public:
|
|
explicit compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, ::compaction::compaction_group_view* t, compaction_type type, sstring desc);
|
|
|
|
compaction_task_executor(compaction_task_executor&&) = delete;
|
|
compaction_task_executor(const compaction_task_executor&) = delete;
|
|
|
|
virtual ~compaction_task_executor() = default;
|
|
|
|
// called when a compaction replaces the exhausted sstables with the new set
|
|
struct on_replacement {
|
|
virtual ~on_replacement() {}
|
|
// called after the replacement completes
|
|
// @param sstables the old sstable which are replaced in this replacement
|
|
virtual void on_removal(const std::vector<sstables::shared_sstable>& sstables) = 0;
|
|
// called before the replacement happens
|
|
// @param sstables the new sstables to be added to the table's sstable set
|
|
virtual void on_addition(const std::vector<sstables::shared_sstable>& sstables) = 0;
|
|
};
|
|
|
|
protected:
|
|
future<> perform();
|
|
|
|
virtual future<compaction_manager::compaction_stats_opt> do_run() = 0;
|
|
|
|
state switch_state(state new_state);
|
|
|
|
future<semaphore_units<named_semaphore_exception_factory>> acquire_semaphore(named_semaphore& sem, size_t units = 1);
|
|
|
|
// Return true if the task isn't stopped
|
|
// and the compaction manager allows proceeding.
|
|
inline bool can_proceed(throw_if_stopping do_throw_if_stopping = throw_if_stopping::no) const;
|
|
void setup_new_compaction(sstables::run_id output_run_id = sstables::run_id::create_null_id());
|
|
void finish_compaction(state finish_state = state::done) noexcept;
|
|
|
|
// Compaction manager stop itself if it finds an storage I/O error which results in
|
|
// stop of transportation services. It cannot make progress anyway.
|
|
// Returns exception if error is judged fatal, and compaction task must be stopped,
|
|
// otherwise, returns stop_iteration::no after sleep for exponential retry.
|
|
future<stop_iteration> maybe_retry(std::exception_ptr err, bool throw_on_abort = false);
|
|
|
|
future<compaction_result> compact_sstables_and_update_history(compaction_descriptor descriptor, compaction_data& cdata, on_replacement&,
|
|
compaction_manager::can_purge_tombstones can_purge = compaction_manager::can_purge_tombstones::yes);
|
|
future<compaction_result> compact_sstables(compaction_descriptor descriptor, compaction_data& cdata, on_replacement&,
|
|
compaction_manager::can_purge_tombstones can_purge = compaction_manager::can_purge_tombstones::yes,
|
|
sstables::offstrategy offstrategy = sstables::offstrategy::no);
|
|
future<> update_history(::compaction::compaction_group_view& t, compaction_result&& res, const compaction_data& cdata);
|
|
bool should_update_history(compaction_type ct) {
|
|
return ct == compaction_type::Compaction || ct == compaction_type::Major;
|
|
}
|
|
public:
|
|
compaction_manager::compaction_stats_opt get_stats() const noexcept {
|
|
return _stats;
|
|
}
|
|
|
|
future<compaction_manager::compaction_stats_opt> run_compaction() noexcept;
|
|
|
|
const ::compaction::compaction_group_view* compacting_table() const noexcept {
|
|
return _compacting_table;
|
|
}
|
|
|
|
compaction_type compaction_type() const noexcept {
|
|
return _type;
|
|
}
|
|
|
|
bool compaction_running() const noexcept {
|
|
return _state == state::active;
|
|
}
|
|
|
|
const ::compaction::compaction_data& compaction_data() const noexcept {
|
|
return _compaction_data;
|
|
}
|
|
|
|
::compaction::compaction_data& compaction_data() noexcept {
|
|
return _compaction_data;
|
|
}
|
|
|
|
bool generating_output_run() const noexcept {
|
|
return compaction_running() && _output_run_identifier;
|
|
}
|
|
const sstables::run_id& output_run_id() const noexcept {
|
|
return _output_run_identifier;
|
|
}
|
|
|
|
const sstring& description() const noexcept {
|
|
return _description;
|
|
}
|
|
private:
|
|
// Before _compaction_done is set in compaction_task_executor::run_compaction(), compaction_done() returns ready future.
|
|
future<compaction_manager::compaction_stats_opt> compaction_done() noexcept {
|
|
return _compaction_done.get_future();
|
|
}
|
|
|
|
future<sstables::sstable_set> sstable_set_for_tombstone_gc(::compaction::compaction_group_view& t);
|
|
public:
|
|
bool stopping() const noexcept {
|
|
return _compaction_data.abort.abort_requested();
|
|
}
|
|
|
|
void abort(abort_source& as) noexcept;
|
|
|
|
void stop_compaction(sstring reason) noexcept;
|
|
|
|
compaction_stopped_exception make_compaction_stopped_exception() const;
|
|
|
|
template<typename TaskExecutor, typename... Args>
|
|
requires std::is_base_of_v<compaction_task_executor, TaskExecutor> &&
|
|
std::is_base_of_v<compaction_task_impl, TaskExecutor> &&
|
|
requires (compaction_manager& cm, throw_if_stopping do_throw_if_stopping, Args&&... args) {
|
|
{TaskExecutor(cm, do_throw_if_stopping, std::forward<Args>(args)...)} -> std::same_as<TaskExecutor>;
|
|
}
|
|
friend future<compaction_manager::compaction_stats_opt> compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args);
|
|
friend future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction_task_executor> task, throw_if_stopping do_throw_if_stopping);
|
|
friend fmt::formatter<compaction_task_executor>;
|
|
friend void compaction_manager::stop_tasks(const std::vector<shared_ptr<compaction_task_executor>>& tasks, sstring reason) noexcept;
|
|
friend future<> compaction_manager::await_tasks(std::vector<shared_ptr<compaction_task_executor>>, bool task_stopped) const noexcept;
|
|
friend sstables::test_env_compaction_manager;
|
|
};
|
|
|
|
}
|
|
|
|
template <>
|
|
struct fmt::formatter<compaction::compaction_task_executor::state> {
|
|
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
|
auto format(compaction::compaction_task_executor::state c, fmt::format_context& ctx) const -> decltype(ctx.out());
|
|
};
|
|
|
|
template <>
|
|
struct fmt::formatter<compaction::compaction_task_executor> {
|
|
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
|
auto format(const compaction::compaction_task_executor& ex, fmt::format_context& ctx) const -> decltype(ctx.out());
|
|
};
|
|
|
|
namespace compaction {
|
|
|
|
bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges);
|
|
|
|
// Return all sstables but those that are off-strategy like the ones in maintenance set and staging dir.
|
|
future<std::vector<sstables::shared_sstable>> in_strategy_sstables(compaction::compaction_group_view& table_s);
|
|
|
|
}
|