mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge "Make TWCS' cleanup bucket aware" from Raphael S. Carvalho
"
Quoting patch 3/4:
"This continues the work in a69d98c3d0,
by implementing the cleanup method in TWCS to make it bucket aware.
Till now, the default impl was used which cleanups on file at a
time, starting from the smallest.
The cleanup strategy for TWCS is simple. It's simply calling the
size tiered cleanup method for each bucket, so there will be
one job for each tier in each window.
The next strategies to receive this improvement are LCS and ICS
(the latter one being only available in enterprise).
Refs #10097."
** Simply put, the goal is to reduce writeamp when performing cleanup
on a TWCS table, therefore reducing the operation time. **
tests: unit(dev).
"
* 'twcs_cleanup_bucket_aware/v1' of https://github.com/raphaelsc/scylla:
tests: sstable_compaction_test: Add test for TWCS' bucket-aware cleanup
compaction: TWCS: Implement cleanup method for bucket awareness
compaction: TWCS: change get_buckets() signature to work with const qualified functions
compaction_strategy: get_cleanup_compaction_jobs: accept candidates by value
This commit is contained in:
@@ -1193,7 +1193,7 @@ public:
|
||||
: task(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
|
||||
, _cleanup_options(std::move(options))
|
||||
, _compacting(std::move(compacting))
|
||||
, _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), candidates))
|
||||
, _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), std::move(candidates)))
|
||||
{
|
||||
// Cleanup is made more resilient under disk space pressure, by cleaning up smaller jobs first, so larger jobs
|
||||
// will have more space available released by previous jobs.
|
||||
|
||||
@@ -40,7 +40,7 @@ compaction_descriptor compaction_strategy_impl::get_major_compaction_job(table_s
|
||||
return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority());
|
||||
}
|
||||
|
||||
std::vector<compaction_descriptor> compaction_strategy_impl::get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const {
|
||||
std::vector<compaction_descriptor> compaction_strategy_impl::get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const {
|
||||
// The default implementation is suboptimal and causes the writeamp problem described issue in #10097.
|
||||
// The compaction strategy relying on it should strive to implement its own method, to make cleanup bucket aware.
|
||||
return boost::copy_range<std::vector<compaction_descriptor>>(candidates | boost::adaptors::transformed([] (const shared_sstable& sst) {
|
||||
@@ -673,8 +673,8 @@ compaction_descriptor compaction_strategy::get_major_compaction_job(table_state&
|
||||
return _compaction_strategy_impl->get_major_compaction_job(table_s, std::move(candidates));
|
||||
}
|
||||
|
||||
std::vector<compaction_descriptor> compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const {
|
||||
return _compaction_strategy_impl->get_cleanup_compaction_jobs(table_s, candidates);
|
||||
std::vector<compaction_descriptor> compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const {
|
||||
return _compaction_strategy_impl->get_cleanup_compaction_jobs(table_s, std::move(candidates));
|
||||
}
|
||||
|
||||
void compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
|
||||
|
||||
@@ -49,7 +49,7 @@ public:
|
||||
|
||||
compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector<shared_sstable> candidates);
|
||||
|
||||
std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const;
|
||||
std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const;
|
||||
|
||||
// Some strategies may look at the compacted and resulting sstables to
|
||||
// get some useful information for subsequent compactions.
|
||||
|
||||
@@ -47,7 +47,7 @@ public:
|
||||
virtual ~compaction_strategy_impl() {}
|
||||
virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidates) = 0;
|
||||
virtual compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector<sstables::shared_sstable> candidates);
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const;
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const;
|
||||
virtual void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) { }
|
||||
virtual compaction_strategy_type type() const = 0;
|
||||
virtual bool parallel_compaction() const {
|
||||
|
||||
@@ -268,7 +268,7 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
}
|
||||
|
||||
std::vector<compaction_descriptor>
|
||||
size_tiered_compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const {
|
||||
size_tiered_compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const {
|
||||
std::vector<compaction_descriptor> ret;
|
||||
const auto& schema = table_s.schema();
|
||||
unsigned max_threshold = schema->max_compaction_threshold();
|
||||
|
||||
@@ -125,7 +125,7 @@ public:
|
||||
|
||||
virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidates) override;
|
||||
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, const std::vector<shared_sstable>& candidates) const override;
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const override;
|
||||
|
||||
static int64_t estimated_pending_compactions(const std::vector<sstables::shared_sstable>& sstables,
|
||||
int min_threshold, int max_threshold, size_tiered_compaction_strategy_options options);
|
||||
|
||||
@@ -303,7 +303,7 @@ time_window_compaction_strategy::get_window_lower_bound(std::chrono::seconds sst
|
||||
}
|
||||
|
||||
std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
|
||||
time_window_compaction_strategy::get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options) {
|
||||
time_window_compaction_strategy::get_buckets(std::vector<shared_sstable> files, const time_window_compaction_strategy_options& options) {
|
||||
std::map<timestamp_type, std::vector<shared_sstable>> buckets;
|
||||
|
||||
timestamp_type max_timestamp = 0;
|
||||
@@ -408,4 +408,14 @@ void time_window_compaction_strategy::update_estimated_compaction_by_tasks(std::
|
||||
_estimated_remaining_tasks = n;
|
||||
}
|
||||
|
||||
std::vector<compaction_descriptor>
|
||||
time_window_compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const {
|
||||
std::vector<compaction_descriptor> ret;
|
||||
for (auto&& [_, sstables] : get_buckets(std::move(candidates), _options).first) {
|
||||
auto per_window_jobs = size_tiered_compaction_strategy(_stcs_options).get_cleanup_compaction_jobs(table_s, std::move(sstables));
|
||||
std::move(per_window_jobs.begin(), per_window_jobs.end(), std::back_inserter(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -88,6 +88,8 @@ public:
|
||||
public:
|
||||
time_window_compaction_strategy(const std::map<sstring, sstring>& options);
|
||||
virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<shared_sstable> candidates) override;
|
||||
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const override;
|
||||
private:
|
||||
static timestamp_type
|
||||
to_timestamp_type(time_window_compaction_strategy_options::timestamp_resolutions resolution, int64_t timestamp_from_sstable) {
|
||||
@@ -123,7 +125,7 @@ public:
|
||||
// @return A pair, where the left element is the bucket representation (map of timestamp to sstablereader),
|
||||
// and the right is the highest timestamp seen
|
||||
static std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
|
||||
get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options);
|
||||
get_buckets(std::vector<shared_sstable> files, const time_window_compaction_strategy_options& options);
|
||||
|
||||
std::vector<shared_sstable>
|
||||
newest_bucket(table_state& table_s, strategy_control& control, std::map<timestamp_type, std::vector<shared_sstable>> buckets,
|
||||
|
||||
@@ -4926,12 +4926,15 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
constexpr size_t all_files = 64;
|
||||
|
||||
auto get_cleanup_jobs = [&env, &all_files] (sstables::compaction_strategy_type compaction_strategy_type) {
|
||||
auto get_cleanup_jobs = [&env, &all_files] (sstables::compaction_strategy_type compaction_strategy_type,
|
||||
std::map<sstring, sstring> strategy_options = {},
|
||||
const api::timestamp_clock::duration step_base = 0ms) {
|
||||
auto builder = schema_builder("tests", "test_compaction_strategy_cleanup_method")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", int32_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(compaction_strategy_type);
|
||||
builder.set_compaction_strategy_options(std::move(strategy_options));
|
||||
auto s = builder.build();
|
||||
|
||||
auto tmp = tmpdir();
|
||||
@@ -4944,18 +4947,24 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) {
|
||||
sstables::get_highest_sstable_version(), big);
|
||||
};
|
||||
|
||||
auto make_mutation = [&](unsigned pkey_idx) {
|
||||
using namespace std::chrono;
|
||||
auto now = gc_clock::now().time_since_epoch() + duration_cast<microseconds>(seconds(tests::random::get_int(0, 3600*24)));
|
||||
auto next_timestamp = [&now] (microseconds step) mutable -> api::timestamp_type {
|
||||
return (now + step).count();
|
||||
};
|
||||
auto make_mutation = [&] (unsigned pkey_idx, api::timestamp_type ts) {
|
||||
auto pkey = partition_key::from_exploded(*s, {to_bytes(tokens[pkey_idx].first)});
|
||||
mutation m(s, pkey);
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(1)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(1)), gc_clock::now().time_since_epoch().count());
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(1)), ts);
|
||||
return m;
|
||||
};
|
||||
|
||||
std::vector<sstables::shared_sstable> candidates;
|
||||
candidates.reserve(all_files);
|
||||
for (auto i = 0; i < all_files; i++) {
|
||||
candidates.push_back(make_sstable_containing(sst_gen, {make_mutation(i)}));
|
||||
auto current_step = duration_cast<microseconds>(step_base) * i;
|
||||
candidates.push_back(make_sstable_containing(sst_gen, {make_mutation(i, next_timestamp(current_step))}));
|
||||
}
|
||||
|
||||
auto strategy = cf->get_compaction_strategy();
|
||||
@@ -4963,9 +4972,10 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) {
|
||||
return std::make_pair(std::move(candidates), std::move(jobs));
|
||||
};
|
||||
|
||||
auto run_cleanup_strategy_test = [&] (sstables::compaction_strategy_type compaction_strategy_type, size_t per_job_files) {
|
||||
auto run_cleanup_strategy_test = [&] (sstables::compaction_strategy_type compaction_strategy_type, size_t per_job_files, auto&&... args) {
|
||||
testlog.info("Running cleanup test for strategy type {}", compaction_strategy::name(compaction_strategy_type));
|
||||
size_t target_job_count = all_files / per_job_files;
|
||||
auto [candidates, descriptors] = get_cleanup_jobs(compaction_strategy_type);
|
||||
auto [candidates, descriptors] = get_cleanup_jobs(compaction_strategy_type, std::forward<decltype(args)>(args)...);
|
||||
BOOST_REQUIRE(descriptors.size() == target_job_count);
|
||||
auto generations = boost::copy_range<std::unordered_set<unsigned>>(candidates | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::generation)));
|
||||
auto check_desc = [&] (const auto& desc) {
|
||||
@@ -4984,5 +4994,12 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) {
|
||||
|
||||
// Default implementation: check that it will return one job for each file
|
||||
run_cleanup_strategy_test(sstables::compaction_strategy_type::null, 1);
|
||||
|
||||
// TWCS: Check that it will return one job for each time window
|
||||
std::map<sstring, sstring> twcs_opts = {
|
||||
{time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS"},
|
||||
{time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1"},
|
||||
};
|
||||
run_cleanup_strategy_test(sstables::compaction_strategy_type::time_window, 1, std::move(twcs_opts), 1h);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user