Merge "Improve efficiency of cleanup compaction by making it bucket aware" from Raphael S. Carvalho
" Cleanup compaction works by rewriting all sstables that need clean up, one at a time. This approach can cause bad write amplification because the output data is being made incrementally available for regular compaction. Cleanup is a long operation on large data sets, and while it's happening, new data can be written to buckets, triggering regular compaction. Cleanup fighting for resources with regular compaction is a known problem. With cleanup adding one file at a time to buckets, regular may require multiple rounds to compact the data in a given bucket B, producing bad writeamp. To fix this problem, cleanup will be made bucket aware. As each compaction strategy has its own definition of bucket, strategies will implement their own method to retrieve cleanup jobs. The method will be implemented such that all files in a bucket B will be cleaned up together, and on completion, they'll be made available for regular at once. For STCS / ICS, a bucket is a size tier. For TWCS, a bucket is a window. For LCS, a bucket is a level. In this way, writeamp problem is fixed as regular won't have to perform multiple rounds to compact the data in a given bucket. Additionally, cleanup will now be able to deduplicate data and will become way more efficient at garbage collecting expired data. The space requirement shouldn't be an issue, as compacting an entire bucket happens during regular compaction anyway. With leveled strategy, compacting an entire level is also not a problem because files in a level L don't overlap and therefore incremental compaction is employed to limit the space requirement. By the time being, only STCS cleanup was made bucket aware. The others will be using a default method, where one file is cleaned up at a time. Making cleanup of other strategies bucket aware is relatively easy now and will be done soon. Refs #10097. 
" * 'cleanup-compaction-revamp/v3' of https://github.com/raphaelsc/scylla: test: sstable_compaction_test: Add test for strategy cleanup method compaction: STCS: Implement cleanup strategy compaction_manager: Wire cleanup task into the strategy cleanup method compaction_strategy: Allow strategies to define their own cleanup strategy compaction: Introduce compaction_descriptor::sstables_size compaction: Move decision of garbage collection from strategy to task type
This commit is contained in:
@@ -1791,4 +1791,8 @@ unsigned compaction_descriptor::fan_in() const {
|
||||
return boost::copy_range<std::unordered_set<utils::UUID>>(sstables | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::run_identifier))).size();
|
||||
}
|
||||
|
||||
uint64_t compaction_descriptor::sstables_size() const {
|
||||
return boost::accumulate(sstables | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::data_size)), uint64_t(0));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -163,14 +163,12 @@ struct compaction_descriptor {
|
||||
static constexpr uint64_t default_max_sstable_bytes = std::numeric_limits<uint64_t>::max();
|
||||
|
||||
explicit compaction_descriptor(std::vector<sstables::shared_sstable> sstables,
|
||||
std::optional<sstables::sstable_set> all_sstables_snapshot,
|
||||
::io_priority_class io_priority,
|
||||
int level = default_level,
|
||||
uint64_t max_sstable_bytes = default_max_sstable_bytes,
|
||||
utils::UUID run_identifier = utils::make_random_uuid(),
|
||||
compaction_type_options options = compaction_type_options::make_regular())
|
||||
: sstables(std::move(sstables))
|
||||
, all_sstables_snapshot(std::move(all_sstables_snapshot))
|
||||
, level(level)
|
||||
, max_sstable_bytes(max_sstable_bytes)
|
||||
, run_identifier(run_identifier)
|
||||
@@ -180,10 +178,8 @@ struct compaction_descriptor {
|
||||
|
||||
explicit compaction_descriptor(sstables::has_only_fully_expired has_only_fully_expired,
|
||||
std::vector<sstables::shared_sstable> sstables,
|
||||
std::optional<sstables::sstable_set> all_sstables_snapshot,
|
||||
::io_priority_class io_priority)
|
||||
: sstables(std::move(sstables))
|
||||
, all_sstables_snapshot(std::move(all_sstables_snapshot))
|
||||
, level(default_level)
|
||||
, max_sstable_bytes(default_max_sstable_bytes)
|
||||
, run_identifier(utils::make_random_uuid())
|
||||
@@ -194,6 +190,10 @@ struct compaction_descriptor {
|
||||
|
||||
// Return fan-in of this job, which is equal to its number of runs.
|
||||
unsigned fan_in() const;
|
||||
// Enables garbage collection for this descriptor, meaning that compaction will be able to purge expired data
|
||||
void enable_garbage_collection(sstables::sstable_set snapshot) { all_sstables_snapshot = std::move(snapshot); }
|
||||
// Returns total size of all sstables contained in this descriptor
|
||||
uint64_t sstables_size() const;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -143,7 +143,7 @@ static inline int calculate_weight(const sstables::compaction_descriptor& descri
|
||||
if (descriptor.sstables.empty() || descriptor.has_only_fully_expired) {
|
||||
return 0;
|
||||
}
|
||||
return calculate_weight(boost::accumulate(descriptor.sstables | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::data_size)), uint64_t(0)));
|
||||
return calculate_weight(descriptor.sstables_size());
|
||||
}
|
||||
|
||||
unsigned compaction_manager::current_compaction_fan_in_threshold() const {
|
||||
@@ -284,13 +284,16 @@ future<> compaction_manager::perform_task(shared_ptr<compaction_manager::task> t
|
||||
}
|
||||
}
|
||||
|
||||
future<> compaction_manager::task::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted) {
|
||||
future<> compaction_manager::task::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, can_purge_tombstones can_purge) {
|
||||
if (!descriptor.sstables.size()) {
|
||||
// if there is nothing to compact, just return.
|
||||
co_return;
|
||||
}
|
||||
|
||||
replica::table& t = *_compacting_table;
|
||||
if (can_purge) {
|
||||
descriptor.enable_garbage_collection(t.get_sstable_set());
|
||||
}
|
||||
descriptor.creator = [&t] (shard_id dummy) {
|
||||
auto sst = t.make_sstable();
|
||||
return sst;
|
||||
@@ -1059,9 +1062,8 @@ private:
|
||||
replica::table& t = *_compacting_table;
|
||||
auto sstable_level = sst->get_sstable_level();
|
||||
auto run_identifier = sst->run_identifier();
|
||||
auto sstable_set_snapshot = _can_purge ? std::make_optional(t.get_sstable_set()) : std::nullopt;
|
||||
// FIXME: this compaction should run with maintenance priority.
|
||||
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), service::get_local_compaction_priority(),
|
||||
auto descriptor = sstables::compaction_descriptor({ sst }, service::get_local_compaction_priority(),
|
||||
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, _options);
|
||||
|
||||
// Releases reference to cleaned sstable such that respective used disk space can be freed.
|
||||
@@ -1076,7 +1078,7 @@ private:
|
||||
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await compact_sstables(std::move(descriptor), _compaction_data, std::move(release_exhausted));
|
||||
co_await compact_sstables(std::move(descriptor), _compaction_data, std::move(release_exhausted), _can_purge);
|
||||
finish_compaction();
|
||||
_cm.reevaluate_postponed_compactions();
|
||||
co_return; // done with current sstable
|
||||
@@ -1093,7 +1095,9 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
|
||||
template<typename TaskType, typename... Args>
|
||||
requires std::derived_from<TaskType, compaction_manager::task>
|
||||
future<> compaction_manager::perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
|
||||
if (_state != state::enabled) {
|
||||
co_return;
|
||||
}
|
||||
@@ -1117,7 +1121,11 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
|
||||
return a->data_size() > b->data_size();
|
||||
});
|
||||
});
|
||||
co_await perform_task(seastar::make_shared<rewrite_sstables_compaction_task>(*this, t, std::move(options), std::move(sstables), std::move(compacting), can_purge));
|
||||
co_await perform_task(seastar::make_shared<TaskType>(*this, t, std::move(options), std::move(sstables), std::move(compacting), std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
|
||||
return perform_task_on_all_files<rewrite_sstables_compaction_task>(t, std::move(options), std::move(get_func), can_purge);
|
||||
}
|
||||
|
||||
class compaction_manager::validate_sstables_compaction_task : public compaction_manager::sstables_task {
|
||||
@@ -1143,7 +1151,6 @@ private:
|
||||
try {
|
||||
auto desc = sstables::compaction_descriptor(
|
||||
{ sst },
|
||||
{},
|
||||
_cm._maintenance_sg.io,
|
||||
sst->get_sstable_level(),
|
||||
sstables::compaction_descriptor::default_max_sstable_bytes,
|
||||
@@ -1176,6 +1183,74 @@ future<> compaction_manager::perform_sstable_scrub_validate_mode(replica::table*
|
||||
return perform_task(seastar::make_shared<validate_sstables_compaction_task>(*this, t, std::move(all_sstables)));
|
||||
}
|
||||
|
||||
// Task that performs cleanup compaction on a table, one strategy-provided job at a time.
// The jobs are obtained upfront from the table's compaction strategy
// (get_cleanup_compaction_jobs), so each job covers a whole strategy bucket and its output
// becomes available to regular compaction at once (see #10097).
class compaction_manager::cleanup_sstables_compaction_task : public compaction_manager::task {
    const sstables::compaction_type_options _cleanup_options;
    // Keeps candidate sstables registered as "compacting" for the lifetime of the task.
    compacting_sstable_registration _compacting;
    // One descriptor per pending cleanup job; consumed from the back by do_run().
    std::vector<sstables::compaction_descriptor> _pending_cleanup_jobs;
public:
    cleanup_sstables_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type_options options,
                                     std::vector<sstables::shared_sstable> candidates, compacting_sstable_registration compacting)
        : task(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
        , _cleanup_options(std::move(options))
        , _compacting(std::move(compacting))
        , _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), candidates))
    {
        // Cleanup is made more resilient under disk space pressure, by cleaning up smaller jobs first, so larger jobs
        // will have more space available released by previous jobs.
        // Jobs are sorted in descending size order; do_run() pops from the back, so the
        // smallest job runs first.
        std::ranges::sort(_pending_cleanup_jobs, std::ranges::greater(), std::mem_fn(&sstables::compaction_descriptor::sstables_size));
        _cm._stats.pending_tasks += _pending_cleanup_jobs.size();
    }

    virtual ~cleanup_sstables_compaction_task() {
        // Jobs that never ran still count as pending; remove them from the stats on teardown.
        _cm._stats.pending_tasks -= _pending_cleanup_jobs.size();
    }
protected:
    virtual future<> do_run() override {
        switch_state(state::pending);
        // Throttle against other maintenance operations before starting.
        auto maintenance_permit = co_await seastar::get_units(_cm._maintenance_ops_sem, 1);

        // Run jobs back-to-front (smallest first, given the descending sort above),
        // stopping early if the task is asked to stop.
        while (!_pending_cleanup_jobs.empty() && can_proceed()) {
            auto active_job = std::move(_pending_cleanup_jobs.back());
            active_job.options = _cleanup_options;
            co_await run_cleanup_job(std::move(active_job));
            // Only drop the job (and its pending-stat) after it completed successfully.
            _pending_cleanup_jobs.pop_back();
            _cm._stats.pending_tasks--;
        }
    }
private:
    // Releases reference to cleaned files such that respective used disk space can be freed.
    void release_exhausted(std::vector<sstables::shared_sstable> exhausted_sstables) {
        _compacting.release_compacting(exhausted_sstables);
    }

    // Runs a single cleanup job, retrying on failure until maybe_retry() says to stop
    // (in which case it either returns or rethrows).
    future<> run_cleanup_job(sstables::compaction_descriptor descriptor) {
        co_await coroutine::switch_to(_cm._compaction_controller.sg());

        for (;;) {
            compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory));
            _cm.register_backlog_tracker(user_initiated);

            std::exception_ptr ex;
            try {
                setup_new_compaction(descriptor.run_identifier);
                co_await compact_sstables(descriptor, _compaction_data,
                                          std::bind(&cleanup_sstables_compaction_task::release_exhausted, this, std::placeholders::_1));
                finish_compaction();
                _cm.reevaluate_postponed_compactions();
                co_return; // done with current job
            } catch (...) {
                ex = std::current_exception();
            }

            finish_compaction(state::failed);
            // retry current job or rethrows exception
            if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) {
                co_return;
            }
        }
    }
};
|
||||
|
||||
bool needs_cleanup(const sstables::shared_sstable& sst,
|
||||
const dht::token_range_vector& sorted_owned_ranges,
|
||||
schema_ptr s) {
|
||||
@@ -1207,8 +1282,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab
|
||||
});
|
||||
};
|
||||
if (check_for_cleanup()) {
|
||||
return make_exception_future<>(std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
|
||||
t->schema()->ks_name(), t->schema()->cf_name())));
|
||||
throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
|
||||
t->schema()->ks_name(), t->schema()->cf_name()));
|
||||
}
|
||||
|
||||
auto sorted_owned_ranges = db.get_keyspace_local_ranges(t->schema()->ks_name());
|
||||
@@ -1225,7 +1300,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab
|
||||
});
|
||||
};
|
||||
|
||||
return rewrite_sstables(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), std::move(get_sstables));
|
||||
co_await perform_task_on_all_files<cleanup_sstables_compaction_task>(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)),
|
||||
std::move(get_sstables));
|
||||
}
|
||||
|
||||
// Submit a table to be upgraded and wait for its termination.
|
||||
|
||||
@@ -78,6 +78,9 @@ private:
|
||||
};
|
||||
|
||||
public:
|
||||
class can_purge_tombstones_tag;
|
||||
using can_purge_tombstones = bool_class<can_purge_tombstones_tag>;
|
||||
|
||||
class task {
|
||||
public:
|
||||
enum class state {
|
||||
@@ -146,7 +149,8 @@ public:
|
||||
|
||||
// Compacts set of SSTables according to the descriptor.
|
||||
using release_exhausted_func_t = std::function<void(const std::vector<sstables::shared_sstable>& exhausted_sstables)>;
|
||||
future<> compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted);
|
||||
future<> compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted,
|
||||
can_purge_tombstones can_purge = can_purge_tombstones::yes);
|
||||
public:
|
||||
future<> run() noexcept;
|
||||
|
||||
@@ -218,6 +222,7 @@ public:
|
||||
class regular_compaction_task;
|
||||
class offstrategy_compaction_task;
|
||||
class rewrite_sstables_compaction_task;
|
||||
class cleanup_sstables_compaction_task;
|
||||
class validate_sstables_compaction_task;
|
||||
class compaction_manager_test_task;
|
||||
|
||||
@@ -322,8 +327,12 @@ private:
|
||||
future<> perform_sstable_scrub_validate_mode(replica::table* t);
|
||||
|
||||
using get_candidates_func = std::function<future<std::vector<sstables::shared_sstable>>()>;
|
||||
class can_purge_tombstones_tag;
|
||||
using can_purge_tombstones = bool_class<can_purge_tombstones_tag>;
|
||||
|
||||
// Guarantees that a maintenance task, e.g. cleanup, will be performed on all files available at the time
|
||||
// by retrieving set of candidates only after all compactions for table T were stopped, if any.
|
||||
template<typename TaskType, typename... Args>
|
||||
requires std::derived_from<TaskType, task>
|
||||
future<> perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func, Args... args);
|
||||
|
||||
future<> rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
|
||||
public:
|
||||
|
||||
@@ -37,7 +37,16 @@ logging::logger leveled_manifest::logger("LeveledManifest");
|
||||
namespace sstables {
|
||||
|
||||
compaction_descriptor compaction_strategy_impl::get_major_compaction_job(table_state& table_s, std::vector<sstables::shared_sstable> candidates) {
|
||||
return compaction_descriptor(std::move(candidates), table_s.get_sstable_set(), service::get_local_compaction_priority());
|
||||
return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority());
|
||||
}
|
||||
|
||||
// Default cleanup-job generation: one single-sstable job per candidate, preserving each
// sstable's level and run identifier in the resulting descriptor.
std::vector<compaction_descriptor> compaction_strategy_impl::get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const {
    // The default implementation is suboptimal and causes the writeamp problem described in issue #10097.
    // The compaction strategy relying on it should strive to implement its own method, to make cleanup bucket aware.
    return boost::copy_range<std::vector<compaction_descriptor>>(candidates | boost::adaptors::transformed([] (const shared_sstable& sst) {
        return compaction_descriptor({ sst }, service::get_local_compaction_priority(),
            sst->get_sstable_level(), sstables::compaction_descriptor::default_max_sstable_bytes, sst->run_identifier());
    }));
}
|
||||
|
||||
bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point compaction_time) {
|
||||
@@ -614,7 +623,7 @@ compaction_descriptor date_tiered_compaction_strategy::get_sstables_for_compacti
|
||||
|
||||
if (!sstables.empty()) {
|
||||
date_tiered_manifest::logger.debug("datetiered: Compacting {} out of {} sstables", sstables.size(), candidates.size());
|
||||
return sstables::compaction_descriptor(std::move(sstables), table_s.get_sstable_set(), service::get_local_compaction_priority());
|
||||
return sstables::compaction_descriptor(std::move(sstables), service::get_local_compaction_priority());
|
||||
}
|
||||
|
||||
// filter out sstables which droppable tombstone ratio isn't greater than the defined threshold.
|
||||
@@ -630,7 +639,7 @@ compaction_descriptor date_tiered_compaction_strategy::get_sstables_for_compacti
|
||||
auto it = std::min_element(candidates.begin(), candidates.end(), [] (auto& i, auto& j) {
|
||||
return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
|
||||
});
|
||||
return sstables::compaction_descriptor({ *it }, table_s.get_sstable_set(), service::get_local_compaction_priority());
|
||||
return sstables::compaction_descriptor({ *it }, service::get_local_compaction_priority());
|
||||
}
|
||||
|
||||
size_tiered_compaction_strategy::size_tiered_compaction_strategy(const std::map<sstring, sstring>& options)
|
||||
@@ -664,6 +673,10 @@ compaction_descriptor compaction_strategy::get_major_compaction_job(table_state&
|
||||
return _compaction_strategy_impl->get_major_compaction_job(table_s, std::move(candidates));
|
||||
}
|
||||
|
||||
// Delegates cleanup-job generation to the concrete strategy implementation, which groups
// the candidates into per-bucket jobs (or falls back to one job per sstable by default).
std::vector<compaction_descriptor> compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const {
    auto& impl = *_compaction_strategy_impl;
    return impl.get_cleanup_compaction_jobs(table_s, candidates);
}
|
||||
|
||||
void compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
|
||||
_compaction_strategy_impl->notify_completion(removed, added);
|
||||
}
|
||||
|
||||
@@ -49,6 +49,8 @@ public:
|
||||
|
||||
compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector<shared_sstable> candidates);
|
||||
|
||||
std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const;
|
||||
|
||||
// Some strategies may look at the compacted and resulting sstables to
|
||||
// get some useful information for subsequent compactions.
|
||||
void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added);
|
||||
|
||||
@@ -47,6 +47,7 @@ public:
|
||||
virtual ~compaction_strategy_impl() {}
|
||||
virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidates) = 0;
|
||||
virtual compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector<sstables::shared_sstable> candidates);
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const;
|
||||
virtual void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) { }
|
||||
virtual compaction_strategy_type type() const = 0;
|
||||
virtual bool parallel_compaction() const {
|
||||
|
||||
@@ -51,7 +51,7 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(t
|
||||
auto gc_before2 = j->get_gc_before_for_drop_estimation(compaction_time);
|
||||
return i->estimate_droppable_tombstone_ratio(gc_before1) < j->estimate_droppable_tombstone_ratio(gc_before2);
|
||||
});
|
||||
return sstables::compaction_descriptor({ sst }, table_s.get_sstable_set(), service::get_local_compaction_priority(), sst->get_sstable_level());
|
||||
return sstables::compaction_descriptor({ sst }, service::get_local_compaction_priority(), sst->get_sstable_level());
|
||||
}
|
||||
return {};
|
||||
}
|
||||
@@ -64,7 +64,7 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(tabl
|
||||
auto& sst = *std::max_element(candidates.begin(), candidates.end(), [&] (sstables::shared_sstable& sst1, sstables::shared_sstable& sst2) {
|
||||
return sst1->get_sstable_level() < sst2->get_sstable_level();
|
||||
});
|
||||
return compaction_descriptor(std::move(candidates), table_s.get_sstable_set(), service::get_local_compaction_priority(),
|
||||
return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority(),
|
||||
sst->get_sstable_level(), _max_sstable_size_in_mb*1024*1024);
|
||||
}
|
||||
|
||||
@@ -147,7 +147,7 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
|
||||
leveled_manifest::logger.warn("Found SSTable with level {}, higher than the maximum {}. This is unexpected, but will fix", sst_level, leveled_manifest::MAX_LEVELS - 1);
|
||||
|
||||
// This is really unexpected, so we'll just compact it all to fix it
|
||||
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop, leveled_manifest::MAX_LEVELS - 1, max_sstable_size_in_bytes);
|
||||
compaction_descriptor desc(std::move(input), iop, leveled_manifest::MAX_LEVELS - 1, max_sstable_size_in_bytes);
|
||||
desc.options = compaction_type_options::make_reshape();
|
||||
return desc;
|
||||
}
|
||||
@@ -187,7 +187,7 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
|
||||
unsigned ideal_level = std::ceil(log_fanout(total_bytes / max_sstable_size_in_bytes));
|
||||
|
||||
leveled_manifest::logger.info("Reshaping {} disjoint sstables in level 0 into level {}", level_info[0].size(), ideal_level);
|
||||
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop, ideal_level, max_sstable_size_in_bytes);
|
||||
compaction_descriptor desc(std::move(input), iop, ideal_level, max_sstable_size_in_bytes);
|
||||
desc.options = compaction_type_options::make_reshape();
|
||||
return desc;
|
||||
}
|
||||
@@ -207,7 +207,7 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
|
||||
if (!disjoint) {
|
||||
leveled_manifest::logger.warn("Turns out that level {} is not disjoint, found {} overlapping SSTables, so compacting everything on behalf of {}.{}", level, overlapping_sstables, schema->ks_name(), schema->cf_name());
|
||||
// Unfortunately no good limit to limit input size to max_sstables for LCS major
|
||||
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop, max_filled_level, max_sstable_size_in_bytes);
|
||||
compaction_descriptor desc(std::move(input), iop, max_filled_level, max_sstable_size_in_bytes);
|
||||
desc.options = compaction_type_options::make_reshape();
|
||||
return desc;
|
||||
}
|
||||
|
||||
@@ -152,7 +152,7 @@ public:
|
||||
if (info.can_promote) {
|
||||
info.candidates = get_overlapping_starved_sstables(next_level, std::move(info.candidates), compaction_counter);
|
||||
}
|
||||
return sstables::compaction_descriptor(std::move(info.candidates), _table_s.get_sstable_set(),
|
||||
return sstables::compaction_descriptor(std::move(info.candidates),
|
||||
service::get_local_compaction_priority(), next_level, _max_sstable_size_in_bytes);
|
||||
} else {
|
||||
logger.debug("No compaction candidates for L{}", level);
|
||||
@@ -231,7 +231,7 @@ public:
|
||||
_table_s.min_compaction_threshold(), _schema->max_compaction_threshold(), _stcs_options);
|
||||
if (!most_interesting.empty()) {
|
||||
logger.debug("L0 is too far behind, performing size-tiering there first");
|
||||
return sstables::compaction_descriptor(std::move(most_interesting), _table_s.get_sstable_set(),
|
||||
return sstables::compaction_descriptor(std::move(most_interesting),
|
||||
service::get_local_compaction_priority());
|
||||
}
|
||||
}
|
||||
@@ -246,7 +246,7 @@ public:
|
||||
auto info = get_candidates_for(0, last_compacted_keys);
|
||||
if (!info.candidates.empty()) {
|
||||
auto next_level = get_next_level(info.candidates, info.can_promote);
|
||||
return sstables::compaction_descriptor(std::move(info.candidates), _table_s.get_sstable_set(),
|
||||
return sstables::compaction_descriptor(std::move(info.candidates),
|
||||
service::get_local_compaction_priority(), next_level, _max_sstable_size_in_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,13 +156,13 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(table_state& table_
|
||||
|
||||
if (is_any_bucket_interesting(buckets, min_threshold)) {
|
||||
std::vector<sstables::shared_sstable> most_interesting = most_interesting_bucket(std::move(buckets), min_threshold, max_threshold);
|
||||
return sstables::compaction_descriptor(std::move(most_interesting), table_s.get_sstable_set(), service::get_local_compaction_priority());
|
||||
return sstables::compaction_descriptor(std::move(most_interesting), service::get_local_compaction_priority());
|
||||
}
|
||||
|
||||
// If we are not enforcing min_threshold explicitly, try any pair of SStables in the same tier.
|
||||
if (!table_s.compaction_enforce_min_threshold() && is_any_bucket_interesting(buckets, 2)) {
|
||||
std::vector<sstables::shared_sstable> most_interesting = most_interesting_bucket(std::move(buckets), 2, max_threshold);
|
||||
return sstables::compaction_descriptor(std::move(most_interesting), table_s.get_sstable_set(), service::get_local_compaction_priority());
|
||||
return sstables::compaction_descriptor(std::move(most_interesting), service::get_local_compaction_priority());
|
||||
}
|
||||
|
||||
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
|
||||
@@ -182,7 +182,7 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(table_state& table_
|
||||
auto it = std::min_element(sstables.begin(), sstables.end(), [] (auto& i, auto& j) {
|
||||
return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
|
||||
});
|
||||
return sstables::compaction_descriptor({ *it }, table_s.get_sstable_set(), service::get_local_compaction_priority());
|
||||
return sstables::compaction_descriptor({ *it }, service::get_local_compaction_priority());
|
||||
}
|
||||
return sstables::compaction_descriptor();
|
||||
}
|
||||
@@ -242,7 +242,7 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
// All sstables can be reshaped at once if the amount of overlapping will not cause memory usage to be high,
|
||||
// which is possible because partitioned set is able to incrementally open sstables during compaction
|
||||
if (sstable_set_overlapping_count(schema, input) <= max_sstables) {
|
||||
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop);
|
||||
compaction_descriptor desc(std::move(input), iop);
|
||||
desc.options = compaction_type_options::make_reshape();
|
||||
return desc;
|
||||
}
|
||||
@@ -258,7 +258,7 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
});
|
||||
bucket.resize(max_sstables);
|
||||
}
|
||||
compaction_descriptor desc(std::move(bucket), std::optional<sstables::sstable_set>(), iop);
|
||||
compaction_descriptor desc(std::move(bucket), iop);
|
||||
desc.options = compaction_type_options::make_reshape();
|
||||
return desc;
|
||||
}
|
||||
@@ -267,4 +267,30 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
return compaction_descriptor();
|
||||
}
|
||||
|
||||
std::vector<compaction_descriptor>
|
||||
size_tiered_compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const {
|
||||
std::vector<compaction_descriptor> ret;
|
||||
const auto& schema = table_s.schema();
|
||||
unsigned max_threshold = schema->max_compaction_threshold();
|
||||
|
||||
for (auto& bucket : get_buckets(candidates)) {
|
||||
if (bucket.size() > max_threshold) {
|
||||
// preserve token contiguity
|
||||
std::ranges::sort(bucket, [&schema] (const shared_sstable& a, const shared_sstable& b) {
|
||||
return a->get_first_decorated_key().tri_compare(*schema, b->get_first_decorated_key()) < 0;
|
||||
});
|
||||
}
|
||||
auto it = bucket.begin();
|
||||
while (it != bucket.end()) {
|
||||
unsigned remaining = std::distance(it, bucket.end());
|
||||
unsigned needed = std::min(remaining, max_threshold);
|
||||
std::vector<shared_sstable> sstables;
|
||||
std::move(it, it + needed, std::back_inserter(sstables));
|
||||
ret.push_back(compaction_descriptor(std::move(sstables), service::get_local_compaction_priority()));
|
||||
std::advance(it, needed);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -125,6 +125,8 @@ public:
|
||||
|
||||
virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidates) override;
|
||||
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const override;
|
||||
|
||||
static int64_t estimated_pending_compactions(const std::vector<sstables::shared_sstable>& sstables,
|
||||
int min_threshold, int max_threshold, size_tiered_compaction_strategy_options options);
|
||||
virtual int64_t estimated_pending_compactions(table_state& table_s) const override;
|
||||
|
||||
@@ -178,7 +178,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
});
|
||||
multi_window.resize(max_sstables);
|
||||
}
|
||||
compaction_descriptor desc(std::move(multi_window), std::optional<sstables::sstable_set>(), iop);
|
||||
compaction_descriptor desc(std::move(multi_window), iop);
|
||||
desc.options = compaction_type_options::make_reshape();
|
||||
return desc;
|
||||
}
|
||||
@@ -204,7 +204,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
}
|
||||
}
|
||||
if (!single_window.empty()) {
|
||||
compaction_descriptor desc(std::move(single_window), std::optional<sstables::sstable_set>(), iop);
|
||||
compaction_descriptor desc(std::move(single_window), iop);
|
||||
desc.options = compaction_type_options::make_reshape();
|
||||
return desc;
|
||||
}
|
||||
@@ -233,11 +233,11 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_
|
||||
|
||||
if (!expired.empty()) {
|
||||
clogger.debug("Going to compact {} expired sstables", expired.size());
|
||||
return compaction_descriptor(has_only_fully_expired::yes, std::vector<shared_sstable>(expired.begin(), expired.end()), table_s.get_sstable_set(), service::get_local_compaction_priority());
|
||||
return compaction_descriptor(has_only_fully_expired::yes, std::vector<shared_sstable>(expired.begin(), expired.end()), service::get_local_compaction_priority());
|
||||
}
|
||||
|
||||
auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time);
|
||||
return compaction_descriptor(std::move(compaction_candidates), table_s.get_sstable_set(), service::get_local_compaction_priority());
|
||||
return compaction_descriptor(std::move(compaction_candidates), service::get_local_compaction_priority());
|
||||
}
|
||||
|
||||
time_window_compaction_strategy::bucket_compaction_mode
|
||||
|
||||
@@ -408,7 +408,7 @@ sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager&
|
||||
// jobs. But only one will run in parallel at a time
|
||||
return parallel_for_each(buckets, [this, iop, &cm, &table, creator = std::move(creator)] (std::vector<sstables::shared_sstable>& sstlist) mutable {
|
||||
return cm.run_custom_job(&table, compaction_type::Reshard, "Reshard compaction", [this, iop, &cm, &table, creator, &sstlist] (sstables::compaction_data& info) {
|
||||
sstables::compaction_descriptor desc(sstlist, {}, iop);
|
||||
sstables::compaction_descriptor desc(sstlist, iop);
|
||||
desc.options = sstables::compaction_type_options::make_reshard();
|
||||
desc.creator = std::move(creator);
|
||||
|
||||
|
||||
@@ -3030,7 +3030,7 @@ static flat_mutation_reader compacted_sstable_reader(test_env& env, schema_ptr s
|
||||
auto sstables = open_sstables(env, s, format("test/resource/sstables/3.x/uncompressed/{}", table_name), generations);
|
||||
auto new_generation = generations.back() + 1;
|
||||
|
||||
auto desc = sstables::compaction_descriptor(std::move(sstables), cf->get_sstable_set(), default_priority_class());
|
||||
auto desc = sstables::compaction_descriptor(std::move(sstables), default_priority_class());
|
||||
desc.creator = [s, &tmp, &env, new_generation] (shard_id dummy) {
|
||||
return env.make_sstable(s, tmp.path().string(), new_generation,
|
||||
sstables::sstable_version_types::mc, sstable::format_types::big, 4096);
|
||||
|
||||
@@ -279,7 +279,7 @@ SEASTAR_TEST_CASE(compact) {
|
||||
return env.make_sstable(s, tmpdir_path,
|
||||
(*gen)++, sstables::get_highest_sstable_version(), sstables::sstable::format_types::big);
|
||||
};
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(sstables), cf->get_sstable_set(), default_priority_class()), *cf, new_sstable).then([&env, s, generation, cf, cm, tmpdir_path] (auto) {
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(sstables), default_priority_class()), *cf, new_sstable).then([&env, s, generation, cf, cm, tmpdir_path] (auto) {
|
||||
// Verify that the compacted sstable has the right content. We expect to see:
|
||||
// name | age | height
|
||||
// -------+-----+--------
|
||||
@@ -428,7 +428,7 @@ static future<std::vector<unsigned long>> compact_sstables(test_env& env, sstrin
|
||||
auto sstables_to_compact = sstables::size_tiered_compaction_strategy::most_interesting_bucket(*sstables, min_threshold, max_threshold);
|
||||
// We do expect that all candidates were selected for compaction (in this case).
|
||||
BOOST_REQUIRE(sstables_to_compact.size() == sstables->size());
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(sstables_to_compact), cf->get_sstable_set(),
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(sstables_to_compact),
|
||||
default_priority_class()), *cf, new_sstable).then([generation] (auto) {});
|
||||
} else if (strategy == compaction_strategy_type::leveled) {
|
||||
for (auto& sst : *sstables) {
|
||||
@@ -447,7 +447,7 @@ static future<std::vector<unsigned long>> compact_sstables(test_env& env, sstrin
|
||||
BOOST_REQUIRE(candidate.level == 1);
|
||||
BOOST_REQUIRE(candidate.max_sstable_bytes == 1024*1024);
|
||||
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(candidate.sstables), cf->get_sstable_set(),
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(candidate.sstables),
|
||||
default_priority_class(), candidate.level, 1024*1024), *cf, new_sstable).then([generation] (auto) {});
|
||||
} else {
|
||||
throw std::runtime_error("unexpected strategy");
|
||||
@@ -1067,7 +1067,7 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
|
||||
for (auto&& sst : all) {
|
||||
column_family_test(cf).add_sstable(sst);
|
||||
}
|
||||
return compact_sstables(sstables::compaction_descriptor(to_compact, cf->get_sstable_set(), default_priority_class()), *cf, sst_gen).get0().new_sstables;
|
||||
return compact_sstables(sstables::compaction_descriptor(to_compact, default_priority_class()), *cf, sst_gen).get0().new_sstables;
|
||||
};
|
||||
|
||||
auto next_timestamp = [] {
|
||||
@@ -1278,7 +1278,7 @@ SEASTAR_TEST_CASE(sstable_rewrite) {
|
||||
std::vector<shared_sstable> sstables;
|
||||
sstables.push_back(std::move(sstp));
|
||||
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(sstables), cf->get_sstable_set(), default_priority_class()), *cf, creator).then([&env, s, key, new_tables] (auto) {
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(sstables), default_priority_class()), *cf, creator).then([&env, s, key, new_tables] (auto) {
|
||||
BOOST_REQUIRE(new_tables->size() == 1);
|
||||
auto newsst = (*new_tables)[0];
|
||||
BOOST_REQUIRE(newsst->generation() == 52);
|
||||
@@ -1350,7 +1350,7 @@ SEASTAR_TEST_CASE(test_sstable_max_local_deletion_time_2) {
|
||||
BOOST_REQUIRE(now.time_since_epoch().count() == sst2->get_stats_metadata().max_local_deletion_time);
|
||||
|
||||
auto creator = [&env, s, tmpdir_path, version, gen = make_lw_shared<unsigned>(56)] { return env.make_sstable(s, tmpdir_path, (*gen)++, version, big); };
|
||||
auto info = compact_sstables(sstables::compaction_descriptor({sst1, sst2}, cf->get_sstable_set(), default_priority_class()), *cf, creator).get0();
|
||||
auto info = compact_sstables(sstables::compaction_descriptor({sst1, sst2}, default_priority_class()), *cf, creator).get0();
|
||||
BOOST_REQUIRE(info.new_sstables.size() == 1);
|
||||
BOOST_REQUIRE(((now + gc_clock::duration(100)).time_since_epoch().count()) ==
|
||||
info.new_sstables.front()->get_stats_metadata().max_local_deletion_time);
|
||||
@@ -1443,7 +1443,7 @@ SEASTAR_TEST_CASE(compaction_with_fully_expired_table) {
|
||||
auto expired_sst = *expired.begin();
|
||||
BOOST_REQUIRE(expired_sst->generation() == 1);
|
||||
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor(ssts, cf->get_sstable_set(), default_priority_class()), *cf, sst_gen).get0();
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor(ssts, default_priority_class()), *cf, sst_gen).get0();
|
||||
BOOST_REQUIRE(ret.new_sstables.empty());
|
||||
BOOST_REQUIRE(ret.end_size == 0);
|
||||
});
|
||||
@@ -1746,7 +1746,7 @@ SEASTAR_TEST_CASE(time_window_strategy_size_tiered_behavior_correctness) {
|
||||
auto close_cf = deferred_stop(cf);
|
||||
auto major_compact_bucket = [&] (api::timestamp_type window_ts) {
|
||||
auto bound = time_window_compaction_strategy::get_window_lower_bound(window_size, window_ts);
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor(std::move(buckets[bound]), cf->get_sstable_set(), default_priority_class()), *cf, sst_gen).get0();
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor(std::move(buckets[bound]), default_priority_class()), *cf, sst_gen).get0();
|
||||
BOOST_REQUIRE(ret.new_sstables.size() == 1);
|
||||
buckets[bound] = std::move(ret.new_sstables);
|
||||
};
|
||||
@@ -1847,7 +1847,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test_2) {
|
||||
check_min_max_column_names(sst2, {"9ck101"}, {"9ck298"});
|
||||
|
||||
auto creator = [&env, s, &tmp, version] { return env.make_sstable(s, tmp.path().string(), 3, version, big); };
|
||||
auto info = compact_sstables(sstables::compaction_descriptor({sst, sst2}, cf->get_sstable_set(), default_priority_class()), *cf, creator).get0();
|
||||
auto info = compact_sstables(sstables::compaction_descriptor({sst, sst2}, default_priority_class()), *cf, creator).get0();
|
||||
BOOST_REQUIRE(info.new_sstables.size() == 1);
|
||||
check_min_max_column_names(info.new_sstables.front(), {"0ck100"}, {"9ck298"});
|
||||
}
|
||||
@@ -1927,7 +1927,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::get_highest_sstable_version(), big);
|
||||
return sst;
|
||||
};
|
||||
auto info = compact_sstables(sstables::compaction_descriptor({ sst }, cf->get_sstable_set(), default_priority_class()), *cf, creator).get0();
|
||||
auto info = compact_sstables(sstables::compaction_descriptor({ sst }, default_priority_class()), *cf, creator).get0();
|
||||
BOOST_REQUIRE(info.new_sstables.size() == 1);
|
||||
BOOST_REQUIRE(info.new_sstables.front()->estimate_droppable_tombstone_ratio(gc_before) == 0.0f);
|
||||
BOOST_REQUIRE_CLOSE(info.new_sstables.front()->data_size(), uncompacted_size*(1-expired), 5);
|
||||
@@ -2003,7 +2003,7 @@ SEASTAR_TEST_CASE(compaction_correctness_with_partitioned_sstable_set) {
|
||||
std::for_each(all.begin(), all.end(), [] (auto& sst) { sst->set_sstable_level(1); });
|
||||
column_family_for_tests cf(env.manager(), s);
|
||||
auto close_cf = deferred_stop(cf);
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(all), cf->get_sstable_set(), default_priority_class(), 0, 0 /*std::numeric_limits<uint64_t>::max()*/),
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(all), default_priority_class(), 0, 0 /*std::numeric_limits<uint64_t>::max()*/),
|
||||
*cf, sst_gen).get0().new_sstables;
|
||||
};
|
||||
|
||||
@@ -2130,7 +2130,7 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {
|
||||
cf->start();
|
||||
|
||||
dht::token_range_vector local_ranges = db.get_keyspace_local_ranges(ks_name);
|
||||
auto descriptor = sstables::compaction_descriptor({std::move(sst)}, cf->get_sstable_set(), default_priority_class(), compaction_descriptor::default_level,
|
||||
auto descriptor = sstables::compaction_descriptor({std::move(sst)}, default_priority_class(), compaction_descriptor::default_level,
|
||||
compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_type_options::make_cleanup(std::move(local_ranges)));
|
||||
auto ret = compact_sstables(std::move(descriptor), *cf, sst_gen).get0();
|
||||
|
||||
@@ -3017,7 +3017,7 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
|
||||
cf->start();
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
|
||||
auto compact = [&, s] (std::vector<shared_sstable> all, auto replacer) -> std::vector<shared_sstable> {
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(all), cf->get_sstable_set(), default_priority_class(), 1, 0), *cf, sst_gen, replacer).get0().new_sstables;
|
||||
return compact_sstables(sstables::compaction_descriptor(std::move(all), default_priority_class(), 1, 0), *cf, sst_gen, replacer).get0().new_sstables;
|
||||
};
|
||||
auto make_insert = [&] (auto p) {
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(p.first)});
|
||||
@@ -3220,7 +3220,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy
|
||||
// Start compaction, then stop tracking compaction, switch to TWCS, wait for compaction to finish and check for backlog.
|
||||
// That's done to assert backlog will work for compaction that is finished and was stopped tracking.
|
||||
|
||||
auto fut = compact_sstables(sstables::compaction_descriptor(ssts, cf->get_sstable_set(), default_priority_class()), *cf, sst_gen);
|
||||
auto fut = compact_sstables(sstables::compaction_descriptor(ssts, default_priority_class()), *cf, sst_gen);
|
||||
|
||||
// set_compaction_strategy() itself is responsible for transferring charges from old to new backlog tracker.
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
@@ -3564,7 +3564,7 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
|
||||
try {
|
||||
// The goal is to have one sstable generated for each mutation to trigger the issue.
|
||||
auto max_sstable_size = 0;
|
||||
auto result = compact_sstables(sstables::compaction_descriptor(sstables, cf->get_sstable_set(), default_priority_class(), 0, max_sstable_size), *cf, sst_gen, replacer).get0().new_sstables;
|
||||
auto result = compact_sstables(sstables::compaction_descriptor(sstables, default_priority_class(), 0, max_sstable_size), *cf, sst_gen, replacer).get0().new_sstables;
|
||||
BOOST_REQUIRE_EQUAL(2, result.size());
|
||||
} catch (...) {
|
||||
// swallow exception
|
||||
@@ -3639,11 +3639,11 @@ SEASTAR_TEST_CASE(twcs_major_compaction_test) {
|
||||
|
||||
auto original_together = make_sstable_containing(sst_gen, {mut3, mut4});
|
||||
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor({original_together}, cf->get_sstable_set(), default_priority_class()), *cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor({original_together}, default_priority_class()), *cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
BOOST_REQUIRE(ret.new_sstables.size() == 1);
|
||||
|
||||
auto original_apart = make_sstable_containing(sst_gen, {mut1, mut2});
|
||||
ret = compact_sstables(sstables::compaction_descriptor({original_apart}, cf->get_sstable_set(), default_priority_class()), *cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
ret = compact_sstables(sstables::compaction_descriptor({original_apart}, default_priority_class()), *cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
BOOST_REQUIRE(ret.new_sstables.size() == 2);
|
||||
});
|
||||
}
|
||||
@@ -3800,7 +3800,7 @@ SEASTAR_TEST_CASE(test_bug_6472) {
|
||||
forward_jump_clocks(std::chrono::hours(101));
|
||||
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor(sstables_spanning_many_windows,
|
||||
cf->get_sstable_set(),default_priority_class()), *cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
default_priority_class()), *cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
BOOST_REQUIRE(ret.new_sstables.size() == 1);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -3916,7 +3916,7 @@ SEASTAR_TEST_CASE(test_twcs_partition_estimate) {
|
||||
};
|
||||
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor(sstables_spanning_many_windows,
|
||||
cf->get_sstable_set(), default_priority_class()), *cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
default_priority_class()), *cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
// The real test here is that we don't assert() in
|
||||
// sstables::prepare_summary() with the compact_sstables() call above,
|
||||
// this is only here as a sanity check.
|
||||
@@ -4110,7 +4110,7 @@ SEASTAR_TEST_CASE(test_twcs_compaction_across_buckets) {
|
||||
sstables_spanning_many_windows.push_back(make_sstable_containing(sst_gen, {deletion_mut}));
|
||||
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor(std::move(sstables_spanning_many_windows),
|
||||
std::nullopt, default_priority_class()), *cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
default_priority_class()), *cf, sst_gen, replacer_fn_no_op(), can_purge_tombstones::no).get0();
|
||||
|
||||
BOOST_REQUIRE(ret.new_sstables.size() == 1);
|
||||
assert_that(sstable_reader(ret.new_sstables[0], s, env.make_reader_permit()))
|
||||
@@ -4921,3 +4921,68 @@ SEASTAR_TEST_CASE(simple_backlog_controller_test) {
|
||||
run_controller_test(sstables::compaction_strategy_type::leveled, env);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
constexpr size_t all_files = 64;
|
||||
|
||||
auto get_cleanup_jobs = [&env, &all_files] (sstables::compaction_strategy_type compaction_strategy_type) {
|
||||
auto builder = schema_builder("tests", "test_compaction_strategy_cleanup_method")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", int32_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(compaction_strategy_type);
|
||||
auto s = builder.build();
|
||||
|
||||
auto tmp = tmpdir();
|
||||
auto tokens = token_generation_for_shard(all_files, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count);
|
||||
|
||||
column_family_for_tests cf(env.manager(), s, tmp.path().string());
|
||||
auto close_cf = deferred_stop(cf);
|
||||
auto sst_gen = [&env, &cf, s, &tmp]() mutable {
|
||||
return env.make_sstable(s, tmp.path().string(), column_family_test::calculate_generation_for_new_table(*cf),
|
||||
sstables::get_highest_sstable_version(), big);
|
||||
};
|
||||
|
||||
auto make_mutation = [&](unsigned pkey_idx) {
|
||||
auto pkey = partition_key::from_exploded(*s, {to_bytes(tokens[pkey_idx].first)});
|
||||
mutation m(s, pkey);
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(1)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(1)), gc_clock::now().time_since_epoch().count());
|
||||
return m;
|
||||
};
|
||||
|
||||
std::vector<sstables::shared_sstable> candidates;
|
||||
candidates.reserve(all_files);
|
||||
for (auto i = 0; i < all_files; i++) {
|
||||
candidates.push_back(make_sstable_containing(sst_gen, {make_mutation(i)}));
|
||||
}
|
||||
|
||||
auto strategy = cf->get_compaction_strategy();
|
||||
auto jobs = strategy.get_cleanup_compaction_jobs(cf->as_table_state(), candidates);
|
||||
return std::make_pair(std::move(candidates), std::move(jobs));
|
||||
};
|
||||
|
||||
auto run_cleanup_strategy_test = [&] (sstables::compaction_strategy_type compaction_strategy_type, size_t per_job_files) {
|
||||
size_t target_job_count = all_files / per_job_files;
|
||||
auto [candidates, descriptors] = get_cleanup_jobs(compaction_strategy_type);
|
||||
BOOST_REQUIRE(descriptors.size() == target_job_count);
|
||||
auto generations = boost::copy_range<std::unordered_set<unsigned>>(candidates | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::generation)));
|
||||
auto check_desc = [&] (const auto& desc) {
|
||||
BOOST_REQUIRE(desc.sstables.size() == per_job_files);
|
||||
for (auto& sst: desc.sstables) {
|
||||
BOOST_REQUIRE(generations.erase(sst->generation()));
|
||||
}
|
||||
};
|
||||
for (auto& desc : descriptors) {
|
||||
check_desc(desc);
|
||||
}
|
||||
};
|
||||
|
||||
// STCS: Check that 2 jobs are returned for a size tier containing 2x more files than max threshold.
|
||||
run_cleanup_strategy_test(sstables::compaction_strategy_type::size_tiered, 32);
|
||||
|
||||
// Default implementation: check that it will return one job for each file
|
||||
run_cleanup_strategy_test(sstables::compaction_strategy_type::null, 1);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ void run_sstable_resharding_test() {
|
||||
auto filter_fname = sstables::test(sst).filename(component_type::Filter);
|
||||
uint64_t bloom_filter_size_before = file_size(filter_fname).get0();
|
||||
|
||||
auto descriptor = sstables::compaction_descriptor({sst}, std::nullopt, default_priority_class(), 0, std::numeric_limits<uint64_t>::max());
|
||||
auto descriptor = sstables::compaction_descriptor({sst}, default_priority_class(), 0, std::numeric_limits<uint64_t>::max());
|
||||
descriptor.options = sstables::compaction_type_options::make_reshard();
|
||||
descriptor.creator = [&env, &cf, &tmp, version] (shard_id shard) mutable {
|
||||
// we need generation calculated by instance of cf at requested shard,
|
||||
|
||||
@@ -42,7 +42,7 @@ compaction_descriptor sstable_run_based_compaction_strategy_for_tests::get_sstab
|
||||
v.insert(v.end(), run.all().begin(), run.all().end());
|
||||
return std::move(v);
|
||||
});
|
||||
return sstables::compaction_descriptor(std::move(all), std::nullopt, default_priority_class(), 0, static_fragment_size_for_run);
|
||||
return sstables::compaction_descriptor(std::move(all), default_priority_class(), 0, static_fragment_size_for_run);
|
||||
}
|
||||
return sstables::compaction_descriptor();
|
||||
}
|
||||
|
||||
@@ -156,11 +156,15 @@ token_generation_for_shard(unsigned tokens_to_generate, unsigned shard,
|
||||
return key_and_token_pair;
|
||||
}
|
||||
|
||||
future<compaction_result> compact_sstables(sstables::compaction_descriptor descriptor, replica::column_family& cf, std::function<shared_sstable()> creator, compaction_sstable_replacer_fn replacer) {
|
||||
future<compaction_result> compact_sstables(sstables::compaction_descriptor descriptor, replica::column_family& cf, std::function<shared_sstable()> creator, compaction_sstable_replacer_fn replacer,
|
||||
can_purge_tombstones can_purge) {
|
||||
descriptor.creator = [creator = std::move(creator)] (shard_id dummy) mutable {
|
||||
return creator();
|
||||
};
|
||||
descriptor.replacer = std::move(replacer);
|
||||
if (can_purge) {
|
||||
descriptor.enable_garbage_collection(cf.get_sstable_set());
|
||||
}
|
||||
auto cmt = compaction_manager_test(cf.get_compaction_manager());
|
||||
sstables::compaction_result ret;
|
||||
co_await cmt.run(descriptor.run_identifier, &cf, [&] (sstables::compaction_data& cdata) {
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "sstables/index_reader.hh"
|
||||
#include "sstables/binary_search.hh"
|
||||
#include "sstables/writer.hh"
|
||||
#include "compaction/compaction_manager.hh"
|
||||
#include "replica/memtable-sstable.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "test/lib/test_services.hh"
|
||||
@@ -352,8 +353,10 @@ private:
|
||||
void deregister_compaction(const sstables::compaction_data& c);
|
||||
};
|
||||
|
||||
using can_purge_tombstones = compaction_manager::can_purge_tombstones;
|
||||
future<compaction_result> compact_sstables(sstables::compaction_descriptor descriptor, replica::column_family& cf,
|
||||
std::function<shared_sstable()> creator, sstables::compaction_sstable_replacer_fn replacer = sstables::replacer_fn_no_op());
|
||||
std::function<shared_sstable()> creator, sstables::compaction_sstable_replacer_fn replacer = sstables::replacer_fn_no_op(),
|
||||
can_purge_tombstones can_purge = can_purge_tombstones::yes);
|
||||
|
||||
shared_sstable make_sstable_easy(test_env& env, const fs::path& path, flat_mutation_reader_v2 rd, sstable_writer_config cfg,
|
||||
int64_t generation = 1, const sstables::sstable::version_types version = sstables::get_highest_sstable_version(), int expected_partition = 1);
|
||||
|
||||
@@ -175,7 +175,8 @@ public:
|
||||
|
||||
auto start = perf_sstable_test_env::now();
|
||||
|
||||
auto descriptor = sstables::compaction_descriptor(std::move(ssts), cf->get_sstable_set(), default_priority_class());
|
||||
auto descriptor = sstables::compaction_descriptor(std::move(ssts), default_priority_class());
|
||||
descriptor.enable_garbage_collection(cf->get_sstable_set());
|
||||
descriptor.creator = [sst_gen = std::move(sst_gen)] (unsigned dummy) mutable {
|
||||
return sst_gen();
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user