Merge 'Cleanup sstables in resharding and other compaction types' from Benny Halevy

This series extends sstable cleanup to resharding and other (offstrategy, major, and regular) compaction types in order to:
* cleanup uploaded sstables (#11933)
* cleanup staging sstables after they are moved back to the main directory and become eligible for compaction (#9559)

When perform_cleanup is called, all sstables are scanned; those that require cleanup are marked as such and added for tracking to table_state::cleanup_sstable_set.  They are removed from that set once released by compaction.
Along with that sstable set, we keep the owned_ranges_ptr used by cleanup in the table_state, allowing other compaction types (offstrategy, major, or regular) to clean up those sstables that are marked as require_cleanup but were skipped by cleanup compaction, either because they were in the maintenance set (requiring offstrategy compaction) or in staging.

Resharding uses a more straightforward mechanism: the owned token ranges are passed in when resharding uploaded sstables and used to detect sstables that require cleanup, with the cleanup itself piggybacked on the resharding compaction.

Closes #12422

* github.com:scylladb/scylladb:
  table: discard_sstables: update_sstable_cleanup_state when deleting sstables
  compaction_manager: compact_sstables: retrieve owned ranges if required
  sstables: add a printer for shared_sstable
  compaction_manager: keep owned_ranges_ptr in compaction_state
  compaction_manager: perform_cleanup: keep sstables in compaction_state::sstables_requiring_cleanup
  compaction: refactor compaction_state out of compaction_manager
  compaction: refactor compaction_fwd.hh out of compaction_descriptor.hh
  compaction_manager: compacting_sstable_registration: keep a ref to the compaction_state
  compaction_manager: refactor get_candidates
  compaction_manager: get_candidates: mark as const
  table, compaction_manager: add requires_cleanup
  sstable_set: add for_each_sstable_until
  distributed_loader: reshard: update sstable cleanup state
  table, compaction_manager: add update_sstable_cleanup_state
  compaction_manager: needs_cleanup: delete unused schema param
  compaction_manager: perform_cleanup: disallow empty sorted_owened_ranges
  distributed_loader: reshard: consider sstables for cleanup
  distributed_loader: process_upload_dir: pass owned_ranges_ptr to reshard
  distributed_loader: reshard: add optional owned_ranges_ptr param
  distributed_loader: reshard: get a ref to table_state
  distributed_loader: reshard: capture creator by ref
  distributed_loader: reshard: reserve num_jobs buckets
  compaction: move owned ranges filtering to base class
  compaction: move owned_ranges into descriptor
This commit is contained in:
Botond Dénes
2023-04-11 14:52:29 +03:00
19 changed files with 432 additions and 183 deletions

View File

@@ -424,11 +424,7 @@ public:
}
}
formatted_sstables_list& operator+=(const shared_sstable& sst) {
if (_include_origin) {
_ssts.emplace_back(format("{}:level={:d}:origin={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin()));
} else {
_ssts.emplace_back(format("{}:level={:d}", sst->get_filename(), sst->get_sstable_level()));
}
_ssts.emplace_back(to_string(sst, _include_origin));
return *this;
}
friend std::ostream& operator<<(std::ostream& os, const formatted_sstables_list& lst);
@@ -475,6 +471,9 @@ protected:
// used to incrementally calculate max purgeable timestamp, as we iterate through decorated keys.
std::optional<sstable_set::incremental_selector> _selector;
std::unordered_set<shared_sstable> _compacting_for_max_purgeable_func;
// optional owned_ranges vector for cleanup;
owned_ranges_ptr _owned_ranges = {};
std::optional<dht::incremental_owned_ranges_checker> _owned_ranges_checker;
// Garbage collected sstables that are sealed but were not added to SSTable set yet.
std::vector<shared_sstable> _unused_garbage_collected_sstables;
// Garbage collected sstables that were added to SSTable set and should be eventually removed from it.
@@ -503,6 +502,8 @@ protected:
, _sstable_set(std::move(descriptor.all_sstables_snapshot))
, _selector(_sstable_set ? _sstable_set->make_incremental_selector() : std::optional<sstable_set::incremental_selector>{})
, _compacting_for_max_purgeable_func(std::unordered_set<shared_sstable>(_sstables.begin(), _sstables.end()))
, _owned_ranges(std::move(descriptor.owned_ranges))
, _owned_ranges_checker(_owned_ranges ? std::optional<dht::incremental_owned_ranges_checker>(*_owned_ranges) : std::nullopt)
{
for (auto& sst : _sstables) {
_stats_collector.update(sst->get_encoding_stats_for_compaction());
@@ -621,6 +622,21 @@ protected:
bool enable_garbage_collected_sstable_writer() const noexcept {
return _contains_multi_fragment_runs && _max_sstable_size != std::numeric_limits<uint64_t>::max();
}
flat_mutation_reader_v2::filter make_partition_filter() const {
return [this] (const dht::decorated_key& dk) {
#ifdef SEASTAR_DEBUG
// sstables should never be shared with other shards at this point.
assert(dht::shard_of(*_schema, dk.token()) == this_shard_id());
#endif
if (!_owned_ranges_checker->belongs_to_current_node(dk.token())) {
log_trace("Token {} does not belong to this node, skipping", dk.token());
return false;
}
return true;
};
}
public:
compaction& operator=(const compaction&) = delete;
compaction(const compaction&) = delete;
@@ -634,6 +650,17 @@ private:
// Default range sstable reader that will only return mutation that belongs to current shard.
virtual flat_mutation_reader_v2 make_sstable_reader() const = 0;
// Make a filtering reader if needed
// FIXME: the sstable reader itself should be pass the owned ranges
// so it can skip over the disowned ranges efficiently using the index.
// Ref https://github.com/scylladb/scylladb/issues/12998
flat_mutation_reader_v2 setup_sstable_reader() const {
if (!_owned_ranges_checker) {
return make_sstable_reader();
}
return make_filtering_reader(make_sstable_reader(), make_partition_filter());
}
virtual sstables::sstable_set make_sstable_set_for_input() const {
return _table_s.get_compaction_strategy().make_sstable_set(_schema);
}
@@ -703,7 +730,7 @@ private:
});
});
const auto& gc_state = _table_s.get_tombstone_gc_state();
return consumer(make_compacting_reader(make_sstable_reader(), compaction_time, max_purgeable_func(), gc_state));
return consumer(make_compacting_reader(setup_sstable_reader(), compaction_time, max_purgeable_func(), gc_state));
}
future<> consume() {
@@ -739,7 +766,7 @@ private:
reader.consume_in_thread(std::move(cfc));
});
});
return consumer(make_sstable_reader());
return consumer(setup_sstable_reader());
}
virtual reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 end_consumer) {
@@ -1175,8 +1202,6 @@ private:
};
class cleanup_compaction final : public regular_compaction {
owned_ranges_ptr _owned_ranges;
dht::incremental_owned_ranges_checker _owned_ranges_checker;
private:
// Called in a seastar thread
dht::partition_range_vector
@@ -1199,22 +1224,10 @@ protected:
return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables), std::move(ranges_for_for_invalidation)};
}
private:
cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata, owned_ranges_ptr owned_ranges)
: regular_compaction(table_s, std::move(descriptor), cdata)
, _owned_ranges(std::move(owned_ranges))
, _owned_ranges_checker(*_owned_ranges)
{
}
public:
cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_type_options::cleanup opts)
: cleanup_compaction(table_s, std::move(descriptor), cdata, std::move(opts.owned_ranges)) {}
cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_type_options::upgrade opts)
: cleanup_compaction(table_s, std::move(descriptor), cdata, std::move(opts.owned_ranges)) {}
flat_mutation_reader_v2 make_sstable_reader() const override {
return make_filtering_reader(regular_compaction::make_sstable_reader(), make_partition_filter());
cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata)
: regular_compaction(table_s, std::move(descriptor), cdata)
{
}
std::string_view report_start_desc() const override {
@@ -1224,21 +1237,6 @@ public:
std::string_view report_finish_desc() const override {
return "Cleaned";
}
flat_mutation_reader_v2::filter make_partition_filter() const {
return [this] (const dht::decorated_key& dk) {
#ifdef SEASTAR_DEBUG
// sstables should never be shared with other shards at this point.
assert(dht::shard_of(*_schema, dk.token()) == this_shard_id());
#endif
if (!_owned_ranges_checker.belongs_to_current_node(dk.token())) {
log_trace("Token {} does not belong to this node, skipping", dk.token());
return false;
}
return true;
};
}
};
class scrub_compaction final : public regular_compaction {
@@ -1710,11 +1708,11 @@ static std::unique_ptr<compaction> make_compaction(table_state& table_s, sstable
std::unique_ptr<compaction> operator()(compaction_type_options::regular) {
return std::make_unique<regular_compaction>(table_s, std::move(descriptor), cdata);
}
std::unique_ptr<compaction> operator()(compaction_type_options::cleanup options) {
return std::make_unique<cleanup_compaction>(table_s, std::move(descriptor), cdata, std::move(options));
std::unique_ptr<compaction> operator()(compaction_type_options::cleanup) {
return std::make_unique<cleanup_compaction>(table_s, std::move(descriptor), cdata);
}
std::unique_ptr<compaction> operator()(compaction_type_options::upgrade options) {
return std::make_unique<cleanup_compaction>(table_s, std::move(descriptor), cdata, std::move(options));
std::unique_ptr<compaction> operator()(compaction_type_options::upgrade) {
return std::make_unique<cleanup_compaction>(table_s, std::move(descriptor), cdata);
}
std::unique_ptr<compaction> operator()(compaction_type_options::scrub scrub_options) {
return std::make_unique<scrub_compaction>(table_s, std::move(descriptor), cdata, scrub_options);

View File

@@ -19,16 +19,7 @@
#include "utils/UUID.hh"
#include "dht/i_partitioner.hh"
#include "compaction_weight_registration.hh"
namespace compaction {
using owned_ranges_ptr = lw_shared_ptr<const dht::token_range_vector>;
inline owned_ranges_ptr make_owned_ranges_ptr(dht::token_range_vector&& ranges) {
return make_lw_shared<const dht::token_range_vector>(std::move(ranges));
}
} // namespace compaction
#include "compaction_fwd.hh"
namespace sstables {
@@ -64,10 +55,8 @@ public:
struct regular {
};
struct cleanup {
compaction::owned_ranges_ptr owned_ranges;
};
struct upgrade {
compaction::owned_ranges_ptr owned_ranges;
};
struct scrub {
enum class mode {
@@ -112,12 +101,12 @@ public:
return compaction_type_options(regular{});
}
static compaction_type_options make_cleanup(compaction::owned_ranges_ptr owned_ranges) {
return compaction_type_options(cleanup{std::move(owned_ranges)});
static compaction_type_options make_cleanup() {
return compaction_type_options(cleanup{});
}
static compaction_type_options make_upgrade(compaction::owned_ranges_ptr owned_ranges) {
return compaction_type_options(upgrade{std::move(owned_ranges)});
static compaction_type_options make_upgrade() {
return compaction_type_options(upgrade{});
}
static compaction_type_options make_scrub(scrub::mode mode) {
@@ -160,6 +149,8 @@ struct compaction_descriptor {
// The options passed down to the compaction code.
// This also selects the kind of compaction to do.
compaction_type_options options = compaction_type_options::make_regular();
// If engaged, compaction will cleanup the input sstables by skipping non-owned ranges.
compaction::owned_ranges_ptr owned_ranges;
compaction_sstable_creator_fn creator;
compaction_sstable_replacer_fn replacer;
@@ -179,12 +170,14 @@ struct compaction_descriptor {
int level = default_level,
uint64_t max_sstable_bytes = default_max_sstable_bytes,
run_id run_identifier = run_id::create_random_id(),
compaction_type_options options = compaction_type_options::make_regular())
compaction_type_options options = compaction_type_options::make_regular(),
compaction::owned_ranges_ptr owned_ranges_ = {})
: sstables(std::move(sstables))
, level(level)
, max_sstable_bytes(max_sstable_bytes)
, run_identifier(run_identifier)
, options(options)
, owned_ranges(std::move(owned_ranges_))
, io_priority(io_priority)
{}

View File

@@ -0,0 +1,26 @@
/*
* Copyright (C) 2023-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "dht/i_partitioner.hh"
namespace compaction {
class table_state;
class strategy_control;
struct compaction_state;
using owned_ranges_ptr = lw_shared_ptr<const dht::token_range_vector>;
inline owned_ranges_ptr make_owned_ranges_ptr(dht::token_range_vector&& ranges) {
return make_lw_shared<const dht::token_range_vector>(std::move(ranges));
}
} // namespace compaction

View File

@@ -32,14 +32,16 @@ using namespace compaction;
class compacting_sstable_registration {
compaction_manager& _cm;
compaction::compaction_state& _cs;
std::unordered_set<sstables::shared_sstable> _compacting;
public:
explicit compacting_sstable_registration(compaction_manager& cm) noexcept
explicit compacting_sstable_registration(compaction_manager& cm, compaction::compaction_state& cs) noexcept
: _cm(cm)
, _cs(cs)
{ }
compacting_sstable_registration(compaction_manager& cm, std::vector<sstables::shared_sstable> compacting)
: compacting_sstable_registration(cm)
compacting_sstable_registration(compaction_manager& cm, compaction::compaction_state& cs, std::vector<sstables::shared_sstable> compacting)
: compacting_sstable_registration(cm, cs)
{
register_compacting(compacting);
}
@@ -57,6 +59,7 @@ public:
compacting_sstable_registration(compacting_sstable_registration&& other) noexcept
: _cm(other._cm)
, _cs(other._cs)
, _compacting(std::move(other._compacting))
{ }
@@ -77,6 +80,10 @@ public:
_cm.deregister_compacting_sstables(sstables.begin(), sstables.end());
for (const auto& sst : sstables) {
_compacting.erase(sst);
_cs.sstables_requiring_cleanup.erase(sst);
}
if (_cs.sstables_requiring_cleanup.empty()) {
_cs.owned_ranges_ptr = nullptr;
}
}
};
@@ -205,9 +212,15 @@ std::vector<sstables::shared_sstable> in_strategy_sstables(table_state& table_s)
}));
}
std::vector<sstables::shared_sstable> compaction_manager::get_candidates(table_state& t) {
std::vector<sstables::shared_sstable> compaction_manager::get_candidates(table_state& t) const {
return get_candidates(t, *t.main_sstable_set().all());
}
template <std::ranges::range Range>
requires std::convertible_to<std::ranges::range_value_t<Range>, sstables::shared_sstable>
std::vector<sstables::shared_sstable> compaction_manager::get_candidates(table_state& t, const Range& sstables) const {
std::vector<sstables::shared_sstable> candidates;
candidates.reserve(t.main_sstable_set().size());
candidates.reserve(sstables.size());
// prevents sstables that belongs to a partial run being generated by ongoing compaction from being
// selected for compaction, which could potentially result in wrong behavior.
auto partial_run_identifiers = boost::copy_range<std::unordered_set<sstables::run_id>>(_tasks
@@ -215,7 +228,10 @@ std::vector<sstables::shared_sstable> compaction_manager::get_candidates(table_s
| boost::adaptors::transformed(std::mem_fn(&compaction_task_executor::output_run_id)));
// Filter out sstables that are being compacted.
for (auto& sst : in_strategy_sstables(t)) {
for (const auto& sst : sstables) {
if (!sstables::is_eligible_for_compaction(sst)) {
continue;
}
if (_compacting_sstables.contains(sst)) {
continue;
}
@@ -261,7 +277,7 @@ private:
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
};
compaction_manager::compaction_state& compaction_manager::get_compaction_state(table_state* t) {
compaction::compaction_state& compaction_manager::get_compaction_state(table_state* t) {
try {
return _compaction_state.at(t);
} catch (std::out_of_range&) {
@@ -345,6 +361,24 @@ future<sstables::compaction_result> compaction_task_executor::compact_sstables(s
}
};
// retrieve owned_ranges if_required
if (!descriptor.owned_ranges) {
std::vector<sstables::shared_sstable> sstables_requiring_cleanup;
const auto& cs = _cm.get_compaction_state(_compacting_table);
for (const auto& sst : descriptor.sstables) {
if (cs.sstables_requiring_cleanup.contains(sst)) {
sstables_requiring_cleanup.emplace_back(sst);
}
}
if (!sstables_requiring_cleanup.empty()) {
cmlog.info("The following SSTables require cleaned up in this compaction: {}", sstables_requiring_cleanup);
if (!cs.owned_ranges_ptr) {
on_internal_error_noexcept(cmlog, "SSTables require cleanup but compaction state has null owned ranges");
}
descriptor.owned_ranges = cs.owned_ranges_ptr;
}
}
co_return co_await sstables::compact_sstables(std::move(descriptor), cdata, t);
}
future<> compaction_task_executor::update_history(table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) {
@@ -413,7 +447,7 @@ protected:
table_state* t = _compacting_table;
sstables::compaction_strategy cs = t->get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(*t, _cm.get_candidates(*t));
auto compacting = compacting_sstable_registration(_cm, descriptor.sstables);
auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(t), descriptor.sstables);
auto release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting.release_compacting(exhausted_sstables);
};
@@ -568,7 +602,7 @@ inline compaction_controller make_compaction_controller(const compaction_manager
return compaction_controller(csg, static_shares, 250ms, std::move(fn));
}
compaction_manager::compaction_state::~compaction_state() {
compaction::compaction_state::~compaction_state() {
compaction_done.broken();
}
@@ -1011,7 +1045,7 @@ protected:
_cm.postpone_compaction_for_table(&t);
co_return std::nullopt;
}
auto compacting = compacting_sstable_registration(_cm, descriptor.sstables);
auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(&t), descriptor.sstables);
auto weight_r = compaction_weight_registration(&_cm, weight);
auto release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting.release_compacting(exhausted_sstables);
@@ -1268,15 +1302,17 @@ namespace compaction {
class rewrite_sstables_compaction_task_executor : public sstables_task_executor {
sstables::compaction_type_options _options;
owned_ranges_ptr _owned_ranges_ptr;
compacting_sstable_registration _compacting;
compaction_manager::can_purge_tombstones _can_purge;
public:
rewrite_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options,
rewrite_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr,
std::vector<sstables::shared_sstable> sstables, compacting_sstable_registration compacting,
compaction_manager::can_purge_tombstones can_purge)
: sstables_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables))
, _options(std::move(options))
, _owned_ranges_ptr(std::move(owned_ranges_ptr))
, _compacting(std::move(compacting))
, _can_purge(can_purge)
{}
@@ -1308,7 +1344,7 @@ private:
auto run_identifier = sst->run_identifier();
// FIXME: this compaction should run with maintenance priority.
auto descriptor = sstables::compaction_descriptor({ sst }, service::get_local_compaction_priority(),
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, _options);
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, _options, _owned_ranges_ptr);
// Releases reference to cleaned sstable such that respective used disk space can be freed.
auto release_exhausted = [this] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
@@ -1343,7 +1379,7 @@ private:
template<typename TaskType, typename... Args>
requires std::derived_from<TaskType, compaction_task_executor>
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task_on_all_files(table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task_on_all_files(table_state& t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, get_candidates_func get_func, Args... args) {
if (_state != state::enabled) {
co_return std::nullopt;
}
@@ -1353,7 +1389,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_tas
// in the re-write, we need to barrier out any previously running
// compaction.
std::vector<sstables::shared_sstable> sstables;
compacting_sstable_registration compacting(*this);
compacting_sstable_registration compacting(*this, get_compaction_state(&t));
co_await run_with_compaction_disabled(t, [&sstables, &compacting, get_func = std::move(get_func)] () -> future<> {
// Getting sstables and registering them as compacting must be atomic, to avoid a race condition where
// regular compaction runs in between and picks the same files.
@@ -1367,11 +1403,11 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_tas
return a->data_size() > b->data_size();
});
});
co_return co_await perform_task(seastar::make_shared<TaskType>(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward<Args>(args)...));
co_return co_await perform_task(seastar::make_shared<TaskType>(*this, &t, std::move(options), std::move(owned_ranges_ptr), std::move(sstables), std::move(compacting), std::forward<Args>(args)...));
}
future<compaction_manager::compaction_stats_opt> compaction_manager::rewrite_sstables(table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
return perform_task_on_all_files<rewrite_sstables_compaction_task_executor>(t, std::move(options), std::move(get_func), can_purge);
future<compaction_manager::compaction_stats_opt> compaction_manager::rewrite_sstables(table_state& t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, get_candidates_func get_func, can_purge_tombstones can_purge) {
return perform_task_on_all_files<rewrite_sstables_compaction_task_executor>(t, std::move(options), std::move(owned_ranges_ptr), std::move(get_func), can_purge);
}
namespace compaction {
@@ -1452,13 +1488,15 @@ namespace compaction {
class cleanup_sstables_compaction_task_executor : public compaction_task_executor {
const sstables::compaction_type_options _cleanup_options;
owned_ranges_ptr _owned_ranges_ptr;
compacting_sstable_registration _compacting;
std::vector<sstables::compaction_descriptor> _pending_cleanup_jobs;
public:
cleanup_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options,
cleanup_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr,
std::vector<sstables::shared_sstable> candidates, compacting_sstable_registration compacting)
: compaction_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
, _cleanup_options(std::move(options))
, _owned_ranges_ptr(std::move(owned_ranges_ptr))
, _compacting(std::move(compacting))
, _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(*t, std::move(candidates)))
{
@@ -1479,6 +1517,7 @@ protected:
while (!_pending_cleanup_jobs.empty() && can_proceed()) {
auto active_job = std::move(_pending_cleanup_jobs.back());
active_job.options = _cleanup_options;
active_job.owned_ranges = _owned_ranges_ptr;
co_await run_cleanup_job(std::move(active_job));
_pending_cleanup_jobs.pop_back();
_cm._stats.pending_tasks--;
@@ -1523,8 +1562,7 @@ private:
}
bool needs_cleanup(const sstables::shared_sstable& sst,
const dht::token_range_vector& sorted_owned_ranges,
schema_ptr s) {
const dht::token_range_vector& sorted_owned_ranges) {
auto first_token = sst->get_first_decorated_key().token();
auto last_token = sst->get_last_decorated_key().token();
dht::token_range sst_token_range = dht::token_range::make(first_token, last_token);
@@ -1544,6 +1582,22 @@ bool needs_cleanup(const sstables::shared_sstable& sst,
return true;
}
bool compaction_manager::update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, owned_ranges_ptr owned_ranges_ptr) {
auto& cs = get_compaction_state(&t);
if (owned_ranges_ptr && needs_cleanup(sst, *owned_ranges_ptr)) {
cs.sstables_requiring_cleanup.insert(sst);
return true;
} else {
cs.sstables_requiring_cleanup.erase(sst);
return false;
}
}
bool compaction_manager::requires_cleanup(table_state& t, const sstables::shared_sstable& sst) const {
const auto& cs = get_compaction_state(&t);
return cs.sstables_requiring_cleanup.contains(sst);
}
future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, table_state& t) {
auto check_for_cleanup = [this, &t] {
return boost::algorithm::any_of(_tasks, [&t] (auto& task) {
@@ -1555,20 +1609,33 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range
t.schema()->ks_name(), t.schema()->cf_name()));
}
if (sorted_owned_ranges->empty()) {
throw std::runtime_error("cleanup request failed: sorted_owned_ranges is empty");
}
// Called with compaction_disabled
auto get_sstables = [this, &t, sorted_owned_ranges] () -> future<std::vector<sstables::shared_sstable>> {
return seastar::async([this, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] {
auto schema = t.schema();
auto sstables = std::vector<sstables::shared_sstable>{};
const auto candidates = get_candidates(t);
std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) {
seastar::thread::maybe_yield();
return sorted_owned_ranges->empty() || needs_cleanup(sst, *sorted_owned_ranges, schema);
});
return sstables;
auto update_sstables_cleanup_state = [&] (const sstables::sstable_set& set) {
set.for_each_sstable([&] (const sstables::shared_sstable& sst) {
update_sstable_cleanup_state(t, sst, sorted_owned_ranges);
seastar::thread::maybe_yield();
});
};
update_sstables_cleanup_state(t.main_sstable_set());
update_sstables_cleanup_state(t.maintenance_sstable_set());
// Some sstables may remain in sstables_requiring_cleanup
// for later processing if they can't be cleaned up right now.
// They are erased from sstables_requiring_cleanup by compacting.release_compacting
auto& cs = get_compaction_state(&t);
if (!cs.sstables_requiring_cleanup.empty()) {
cs.owned_ranges_ptr = std::move(sorted_owned_ranges);
}
return get_candidates(t, cs.sstables_requiring_cleanup);
});
};
co_await perform_task_on_all_files<cleanup_sstables_compaction_task_executor>(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)),
co_await perform_task_on_all_files<cleanup_sstables_compaction_task_executor>(t, sstables::compaction_type_options::make_cleanup(), std::move(sorted_owned_ranges),
std::move(get_sstables));
}
@@ -1597,7 +1664,7 @@ future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_own
// Note that we potentially could be doing multiple
// upgrades here in parallel, but that is really the users
// problem.
return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(std::move(sorted_owned_ranges)), std::move(get_sstables)).discard_result();
return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(), std::move(sorted_owned_ranges), std::move(get_sstables)).discard_result();
}
// Submit a table to be scrubbed and wait for its termination.
@@ -1606,7 +1673,8 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) {
return perform_sstable_scrub_validate_mode(t);
}
return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), [&t, opts] {
owned_ranges_ptr owned_ranges_ptr = {};
return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] {
auto all_sstables = get_all_sstables(t);
std::vector<sstables::shared_sstable> sstables = boost::copy_range<std::vector<sstables::shared_sstable>>(all_sstables
| boost::adaptors::filtered([&opts] (const sstables::shared_sstable& sst) {
@@ -1627,7 +1695,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
}, can_purge_tombstones::no);
}
compaction_manager::compaction_state::compaction_state(table_state& t)
compaction::compaction_state::compaction_state(table_state& t)
: backlog_tracker(t.get_compaction_strategy().make_backlog_tracker())
{
}

View File

@@ -22,6 +22,7 @@
#include <seastar/core/abort_source.hh>
#include <seastar/core/condition-variable.hh>
#include "log.hh"
#include "sstables/shared_sstable.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/updateable_value.hh"
#include "utils/serialized_action.hh"
@@ -33,6 +34,7 @@
#include "compaction_weight_registration.hh"
#include "compaction_backlog_manager.hh"
#include "compaction/task_manager_module.hh"
#include "compaction_state.hh"
#include "strategy_control.hh"
#include "backlog_controller.hh"
#include "seastarx.hh"
@@ -83,31 +85,6 @@ public:
utils::updateable_value<float> static_shares = utils::updateable_value<float>(0);
utils::updateable_value<uint32_t> throughput_mb_per_sec = utils::updateable_value<uint32_t>(0);
};
private:
struct compaction_state {
// Used both by compaction tasks that refer to the compaction_state
// and by any function running under run_with_compaction_disabled().
seastar::gate gate;
// Prevents table from running major and minor compaction at the same time.
rwlock lock;
// Raised by any function running under run_with_compaction_disabled();
long compaction_disabled_counter = 0;
// Signaled whenever a compaction task completes.
condition_variable compaction_done;
compaction_backlog_tracker backlog_tracker;
explicit compaction_state(table_state& t);
compaction_state(compaction_state&&) = delete;
~compaction_state();
bool compaction_disabled() const noexcept {
return compaction_disabled_counter > 0;
}
};
public:
class can_purge_tombstones_tag;
@@ -203,7 +180,11 @@ private:
void deregister_weight(int weight);
// Get candidates for compaction strategy, which are all sstables but the ones being compacted.
std::vector<sstables::shared_sstable> get_candidates(compaction::table_state& t);
std::vector<sstables::shared_sstable> get_candidates(compaction::table_state& t) const;
template <std::ranges::range Range>
requires std::convertible_to<std::ranges::range_value_t<Range>, sstables::shared_sstable>
std::vector<sstables::shared_sstable> get_candidates(table_state& t, const Range& sstables) const;
template <typename Iterator, typename Sentinel>
requires std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>
@@ -216,6 +197,9 @@ private:
// gets the table's compaction state
// throws std::out_of_range exception if not found.
compaction_state& get_compaction_state(compaction::table_state* t);
const compaction_state& get_compaction_state(compaction::table_state* t) const {
return const_cast<compaction_manager*>(this)->get_compaction_state(t);
}
// Return true if compaction manager is enabled and
// table still exists and compaction is not disabled for the table.
@@ -236,9 +220,9 @@ private:
// by retrieving set of candidates only after all compactions for table T were stopped, if any.
template<typename TaskType, typename... Args>
requires std::derived_from<TaskType, compaction::compaction_task_executor>
future<compaction_stats_opt> perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, Args... args);
future<compaction_stats_opt> perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, owned_ranges_ptr, get_candidates_func, Args... args);
future<compaction_stats_opt> rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
future<compaction_stats_opt> rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, owned_ranges_ptr, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
// Stop all fibers, without waiting. Safe to be called multiple times.
void do_stop() noexcept;
@@ -343,7 +327,7 @@ public:
class compaction_reenabler {
compaction_manager& _cm;
compaction::table_state* _table;
compaction_manager::compaction_state& _compaction_state;
compaction::compaction_state& _compaction_state;
gate::holder _holder;
public:
@@ -356,7 +340,7 @@ public:
return _table;
}
const compaction_manager::compaction_state& compaction_state() const noexcept {
const compaction::compaction_state& compaction_state() const noexcept {
return _compaction_state;
}
};
@@ -420,6 +404,12 @@ public:
return _tombstone_gc_state;
};
// Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set.
bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, owned_ranges_ptr owned_ranges_ptr);
// checks if the sstable is in the respective compaction_state.sstables_requiring_cleanup set.
bool requires_cleanup(table_state& t, const sstables::shared_sstable& sst) const;
friend class compacting_sstable_registration;
friend class compaction_weight_registration;
friend class compaction_manager_test;
@@ -457,7 +447,7 @@ public:
protected:
compaction_manager& _cm;
::compaction::table_state* _compacting_table = nullptr;
compaction_manager::compaction_state& _compaction_state;
compaction::compaction_state& _compaction_state;
sstables::compaction_data _compaction_data;
state _state = state::none;
@@ -562,7 +552,7 @@ std::ostream& operator<<(std::ostream& os, const compaction::compaction_task_exe
}
bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges, schema_ptr s);
bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges);
// Return all sstables but those that are off-strategy like the ones in maintenance set and staging dir.
std::vector<sstables::shared_sstable> in_strategy_sstables(compaction::table_state& table_s);

View File

@@ -0,0 +1,49 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <unordered_set>

#include <seastar/core/gate.hh>
#include <seastar/core/rwlock.hh>
#include <seastar/core/condition-variable.hh>

#include "seastarx.hh"
#include "compaction/compaction_fwd.hh"
#include "compaction/compaction_backlog_manager.hh"
namespace compaction {
// Per-table compaction bookkeeping, shared by all compaction types
// (regular, major, offstrategy, cleanup, resharding) that run on a table.
struct compaction_state {
    // Used both by compaction tasks that refer to the compaction_state
    // and by any function running under run_with_compaction_disabled().
    seastar::gate gate;
    // Prevents table from running major and minor compaction at the same time.
    seastar::rwlock lock;
    // Raised by any function running under run_with_compaction_disabled().
    long compaction_disabled_counter = 0;
    // Signaled whenever a compaction task completes.
    condition_variable compaction_done;
    compaction_backlog_tracker backlog_tracker;
    // SSTables found to contain data outside this node's owned token ranges
    // that still await cleanup. Populated when cleanup/resharding scans the
    // table's sstables; entries are removed once the sstable is released by
    // a compaction that rewrote it.
    std::unordered_set<sstables::shared_sstable> sstables_requiring_cleanup;
    // The owned token ranges captured by the last cleanup request, kept so
    // that other compaction types (offstrategy, major, regular) can clean up
    // sstables in sstables_requiring_cleanup that cleanup itself skipped.
    // NOTE(review): the member deliberately reuses the name of its type alias
    // compaction::owned_ranges_ptr; this is legal C++ but shadows the type
    // name inside this scope.
    owned_ranges_ptr owned_ranges_ptr;
    explicit compaction_state(table_state& t);
    compaction_state(compaction_state&&) = delete;
    ~compaction_state();
    // True while at least one run_with_compaction_disabled() call is active.
    bool compaction_disabled() const noexcept {
        return compaction_disabled_counter > 0;
    }
};
} // namespace compaction

View File

@@ -15,11 +15,6 @@
#include "compaction_descriptor.hh"
#include "tombstone_gc.hh"
namespace compaction {
class table_state;
class strategy_control;
}
namespace sstables {
class sstable_set_impl;

View File

@@ -9,9 +9,9 @@
#pragma once
namespace compaction {
#include "compaction/compaction_fwd.hh"
class table_state;
namespace compaction {
// Used by manager to set goals and constraints on compaction strategies
class strategy_control {

View File

@@ -11,13 +11,10 @@
#include "compaction/compaction_backlog_manager.hh"
#include "compaction/compaction_strategy_state.hh"
#include "sstables/sstable_set.hh"
#include "compaction/compaction_fwd.hh"
#pragma once
namespace compaction {
class table_state;
}
namespace replica {
using enable_backlog_tracker = bool_class<class enable_backlog_tracker_tag>;

View File

@@ -63,6 +63,7 @@
#include "db/rate_limiter.hh"
#include "db/operation_type.hh"
#include "utils/serialized_action.hh"
#include "compaction/compaction_fwd.hh"
#include "compaction/compaction_manager.hh"
#include "utils/disk-error-handler.hh"
#include "rust/wasmtime_bindings.hh"
@@ -98,10 +99,6 @@ class directory_semaphore;
}
namespace compaction {
class table_state;
}
namespace ser {
template<typename T>
class serializer;
@@ -549,7 +546,7 @@ private:
// Select a compaction group from a given key.
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const noexcept;
// Select a compaction group from a given sstable based on its token range.
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) noexcept;
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept;
// Returns a list of all compaction groups.
const std::vector<std::unique_ptr<compaction_group>>& compaction_groups() const noexcept;
// Safely iterate through compaction groups, while performing async operations on them.
@@ -1125,6 +1122,15 @@ public:
// Safely iterate through table states, while performing async operations on them.
future<> parallel_foreach_table_state(std::function<future<>(compaction::table_state&)> action);
// Add sst to or remove it from the sstables_requiring_cleanup set.
bool update_sstable_cleanup_state(const sstables::shared_sstable& sst, compaction::owned_ranges_ptr owned_ranges_ptr);
// Returns true if the sstable requires cleanup.
bool requires_cleanup(const sstables::shared_sstable& sst) const;
// Returns true if any of the sstables in the set requires cleanup.
bool requires_cleanup(const sstables::sstable_set& set) const;
friend class compaction_group;
};

View File

@@ -145,11 +145,11 @@ struct reshard_shard_descriptor {
}
};
// Collects shared SSTables from all shards and returns a vector containing them all.
// Collects shared SSTables from all shards and sstables that require cleanup and returns a vector containing them all.
// This function assumes that the list of SSTables can be fairly big so it is careful to
// manipulate it in a do_for_each loop (which yields) instead of using standard accumulators.
future<sstables::sstable_directory::sstable_open_info_vector>
collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir) {
collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::owned_ranges_ptr owned_ranges_ptr) {
auto info_vec = sstables::sstable_directory::sstable_open_info_vector();
// We want to make sure that each distributed object reshards about the same amount of data.
@@ -161,10 +161,28 @@ collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir) {
auto coordinator = this_shard_id();
// We will first move all of the foreign open info to temporary storage so that we can sort
// them. We want to distribute bigger sstables first.
co_await dir.invoke_on_all([&info_vec, coordinator] (sstables::sstable_directory& d) -> future<> {
co_await dir.invoke_on_all([&] (sstables::sstable_directory& d) -> future<> {
auto shared_sstables = d.retrieve_shared_sstables();
sstables::sstable_directory::sstable_open_info_vector need_cleanup;
if (owned_ranges_ptr) {
auto& table = db.local().find_column_family(ks_name, table_name);
co_await d.do_for_each_sstable([&] (sstables::shared_sstable sst) -> future<> {
if (table.update_sstable_cleanup_state(sst, owned_ranges_ptr)) {
need_cleanup.push_back(co_await sst->get_open_info());
}
});
}
if (shared_sstables.empty() && need_cleanup.empty()) {
co_return;
}
co_await smp::submit_to(coordinator, [&] () -> future<> {
for (auto& info : d.retrieve_shared_sstables()) {
info_vec.push_back(std::move(info));
info_vec.reserve(info_vec.size() + shared_sstables.size() + need_cleanup.size());
for (auto& info : shared_sstables) {
info_vec.emplace_back(std::move(info));
co_await coroutine::maybe_yield();
}
for (auto& info : need_cleanup) {
info_vec.emplace_back(std::move(info));
co_await coroutine::maybe_yield();
}
});
@@ -207,7 +225,7 @@ distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector so
// A creator function must be passed that will create an SSTable object in the correct shard,
// and an I/O priority must be specified.
future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table,
sstables::compaction_sstable_creator_fn creator, io_priority_class iop)
sstables::compaction_sstable_creator_fn creator, io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr)
{
// Resharding doesn't like empty sstable sets, so bail early. There is nothing
// to reshard in this shard.
@@ -221,9 +239,14 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::
auto num_jobs = (shared_info.size() + max_sstables_per_job - 1) / max_sstables_per_job;
auto sstables_per_job = shared_info.size() / num_jobs;
std::vector<std::vector<sstables::shared_sstable>> buckets(1);
co_await coroutine::parallel_for_each(shared_info, [&dir, sstables_per_job, num_jobs, &buckets] (sstables::foreign_sstable_open_info& info) -> future<> {
std::vector<std::vector<sstables::shared_sstable>> buckets;
buckets.reserve(num_jobs);
buckets.emplace_back();
co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> {
auto sst = co_await dir.load_foreign_sstable(info);
if (owned_ranges_ptr) {
table.update_sstable_cleanup_state(sst, owned_ranges_ptr);
}
// Last bucket gets leftover SSTables
if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) {
buckets.emplace_back();
@@ -233,13 +256,15 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::
// There is a semaphore inside the compaction manager in run_resharding_jobs. So we
// parallel_for_each so the statistics about pending jobs are updated to reflect all
// jobs. But only one will run in parallel at a time
co_await coroutine::parallel_for_each(buckets, [&dir, &table, creator = std::move(creator), iop] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&dir, &table, creator, &sstlist, iop] (sstables::compaction_data& info) -> future<> {
auto& t = table.as_table_state();
co_await coroutine::parallel_for_each(buckets, [&] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info) -> future<> {
sstables::compaction_descriptor desc(sstlist, iop);
desc.options = sstables::compaction_type_options::make_reshard();
desc.creator = std::move(creator);
desc.creator = creator;
desc.owned_ranges = owned_ranges_ptr;
auto result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state());
auto result = co_await sstables::compact_sstables(std::move(desc), info, t);
// input sstables are moved, to guarantee their resources are released once we're done
// resharding them.
co_await when_all_succeed(dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::yes), dir.remove_sstables(std::move(sstlist))).discard_result();
@@ -249,7 +274,7 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::
future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vector<reshard_shard_descriptor> reshard_jobs,
sharded<replica::database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator,
io_priority_class iop) {
io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr) {
uint64_t total_size = boost::accumulate(reshard_jobs | boost::adaptors::transformed(std::mem_fn(&reshard_shard_descriptor::size)), uint64_t(0));
if (total_size == 0) {
@@ -262,7 +287,12 @@ future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vec
co_await dir.invoke_on_all(coroutine::lambda([&] (sstables::sstable_directory& d) -> future<> {
auto& table = db.local().find_column_family(ks_name, table_name);
auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec);
co_await ::replica::reshard(d, std::move(info_vec), table, creator, iop);
// make shard-local copy of owned_ranges
compaction::owned_ranges_ptr local_owned_ranges_ptr;
if (owned_ranges_ptr) {
local_owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(*owned_ranges_ptr);
}
co_await ::replica::reshard(d, std::move(info_vec), table, creator, iop, std::move(local_owned_ranges_ptr));
co_await d.move_foreign_sstables(dir);
}));
@@ -276,10 +306,10 @@ future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vec
// - The second part calls each shard's distributed object to reshard the SSTables they were
// assigned.
future<>
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, io_priority_class iop) {
auto all_jobs = co_await collect_all_shared_sstables(dir);
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr) {
auto all_jobs = co_await collect_all_shared_sstables(dir, db, ks_name, table_name, owned_ranges_ptr);
auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs));
co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator), iop);
co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator), iop, std::move(owned_ranges_ptr));
}
future<sstables::sstable::version_types>
@@ -451,9 +481,20 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
return sstables::generation_type(gen);
};
// Pass owned_ranges_ptr to reshard to piggy-back cleanup on the resharding compaction.
// Note that needs_cleanup() is inaccurate and may return false positives,
// maybe triggering resharding+cleanup unnecessarily for some sstables.
// But this is resharding on refresh (sstable loading via upload dir),
// which will usually require resharding anyway.
//
// FIXME: take multiple compaction groups into account
// - segregate resharded tables into compaction groups
// - split the keyspace local ranges per compaction_group as done in table::perform_cleanup_compaction
// so that cleanup can be considered per compaction group
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(ks));
reshard(directory, db, ks, cf, [&] (shard_id shard) mutable {
return make_sstable(*global_table, upload, new_generation_for_shard(shard));
}, service::get_local_streaming_priority()).get();
}, service::get_local_streaming_priority(), owned_ranges_ptr).get();
reshape(directory, db, sstables::reshape_mode::strict, ks, cf, [&] (shard_id shard) {
return make_sstable(*global_table, upload, new_generation_for_shard(shard));

View File

@@ -69,7 +69,7 @@ class distributed_loader {
static future<> reshape(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstables::reshape_mode mode,
sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, std::function<bool (const sstables::shared_sstable&)> filter, io_priority_class iop);
static future<> reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator,
io_priority_class iop);
io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr = nullptr);
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags);
static future<> lock_table(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring cf_name);
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,

View File

@@ -557,7 +557,7 @@ compaction_group& table::compaction_group_for_key(partition_key_view key, const
return compaction_group_for_token(dht::get_token(*s, key));
}
compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) noexcept {
compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept {
// FIXME: a sstable can belong to more than one group, change interface to reflect that.
return compaction_group_for_token(sst->get_first_decorated_key().token());
}
@@ -1910,11 +1910,12 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
tlogger.debug("cleaning out row cache");
}));
rebuild_statistics();
co_await coroutine::parallel_for_each(p->remove, [p] (pruner::removed_sstable& r) {
co_await coroutine::parallel_for_each(p->remove, [this, p] (pruner::removed_sstable& r) -> future<> {
if (r.enable_backlog_tracker) {
remove_sstable_from_backlog_tracker(r.cg.get_backlog_tracker(), r.sst);
}
return sstables::sstable_directory::delete_atomically({r.sst});
co_await sstables::sstable_directory::delete_atomically({r.sst});
update_sstable_cleanup_state(r.sst, {});
});
co_return p->rp;
}
@@ -2835,4 +2836,22 @@ table::as_data_dictionary() const {
return _impl.wrap(*this);
}
// Add `sst` to or remove it from its compaction group's
// compaction_state::sstables_requiring_cleanup set, delegating the decision
// and bookkeeping to the compaction manager.
// Passing an empty owned_ranges_ptr clears the mark (see discard_sstables,
// which calls this with {} after deleting an sstable).
// Returns the compaction manager's verdict — presumably whether the sstable
// is now tracked as requiring cleanup (callers in resharding use it as a
// "needs cleanup" predicate); confirm against
// compaction_manager::update_sstable_cleanup_state.
bool table::update_sstable_cleanup_state(const sstables::shared_sstable& sst, compaction::owned_ranges_ptr owned_ranges_ptr) {
    // FIXME: it's possible that the sstable belongs to multiple compaction_groups
    auto& cg = compaction_group_for_sstable(sst);
    return get_compaction_manager().update_sstable_cleanup_state(cg.as_table_state(), sst, std::move(owned_ranges_ptr));
}
// Returns true if `sst` is currently tracked in its compaction group's
// sstables_requiring_cleanup set (see update_sstable_cleanup_state()).
// NOTE: like update_sstable_cleanup_state(), this consults only the single
// compaction group selected by the sstable's first key.
bool table::requires_cleanup(const sstables::shared_sstable& sst) const {
    auto& cg = compaction_group_for_sstable(sst);
    return get_compaction_manager().requires_cleanup(cg.as_table_state(), sst);
}
// Returns true if any sstable in `set` requires cleanup.
// Short-circuits: for_each_sstable_until stops at the first sstable that
// needs cleanup, and the resulting stop_iteration converts to the answer.
bool table::requires_cleanup(const sstables::sstable_set& set) const {
    return bool(set.for_each_sstable_until([this] (const sstables::shared_sstable &sst) {
        auto& cg = compaction_group_for_sstable(sst);
        // stop_iteration::yes as soon as one sstable needs cleanup.
        return stop_iteration(_compaction_manager.requires_cleanup(cg.as_table_state(), sst));
    }));
}
} // namespace replica

View File

@@ -12,6 +12,9 @@
#include <utility>
#include <functional>
#include <unordered_set>
#include <fmt/format.h>
#include <seastar/core/shared_ptr.hh>
namespace sstables {
@@ -35,6 +38,22 @@ namespace sstables {
using shared_sstable = seastar::lw_shared_ptr<sstable>;
using sstable_list = std::unordered_set<shared_sstable>;
std::string to_string(const shared_sstable& sst, bool include_origin = true);
} // namespace sstables
template <>
struct fmt::formatter<sstables::shared_sstable> : fmt::formatter<std::string_view> {
template <typename FormatContext>
auto format(const sstables::shared_sstable& sst, FormatContext& ctx) const {
return fmt::format_to(ctx.out(), "{}", sstables::to_string(sst));
}
};
namespace std {
inline std::ostream& operator<<(std::ostream& os, const sstables::shared_sstable& sst) {
return os << fmt::format("{}", sst);
}
} // namespace std

View File

@@ -162,7 +162,14 @@ sstable_set::all() const {
}
void sstable_set::for_each_sstable(std::function<void(const shared_sstable&)> func) const {
return _impl->for_each_sstable(std::move(func));
_impl->for_each_sstable_until([func = std::move(func)] (const shared_sstable& sst) {
func(sst);
return stop_iteration::no;
});
}
stop_iteration sstable_set::for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const {
return _impl->for_each_sstable_until(std::move(func));
}
void
@@ -315,10 +322,13 @@ lw_shared_ptr<const sstable_list> partitioned_sstable_set::all() const {
return _all;
}
void partitioned_sstable_set::for_each_sstable(std::function<void(const shared_sstable&)> func) const {
stop_iteration partitioned_sstable_set::for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const {
for (auto& sst : *_all) {
func(sst);
if (func(sst)) {
return stop_iteration::yes;
}
}
return stop_iteration::no;
}
void partitioned_sstable_set::insert(shared_sstable sst) {
@@ -451,10 +461,13 @@ time_series_sstable_set::size() const noexcept {
return _sstables->size();
}
void time_series_sstable_set::for_each_sstable(std::function<void(const shared_sstable&)> func) const {
stop_iteration time_series_sstable_set::for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const {
for (auto& entry : *_sstables) {
func(entry.second);
if (func(entry.second)) {
return stop_iteration::yes;
}
}
return stop_iteration::no;
}
// O(log n)
@@ -1022,12 +1035,13 @@ lw_shared_ptr<const sstable_list> compound_sstable_set::all() const {
return ret;
}
void compound_sstable_set::for_each_sstable(std::function<void(const shared_sstable&)> func) const {
stop_iteration compound_sstable_set::for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const {
for (auto& set : _sets) {
set->for_each_sstable([&func] (const shared_sstable& sst) {
func(sst);
});
if (set->for_each_sstable_until([&func] (const shared_sstable& sst) { return func(sst); })) {
return stop_iteration::yes;
}
}
return stop_iteration::no;
}
void compound_sstable_set::insert(shared_sstable sst) {
@@ -1219,7 +1233,7 @@ flat_mutation_reader_v2 sstable_set::make_crawling_reader(
tracing::trace_state_ptr trace_ptr,
read_monitor_generator& monitor_generator) const {
std::vector<flat_mutation_reader_v2> readers;
_impl->for_each_sstable([&] (const shared_sstable& sst) mutable {
for_each_sstable([&] (const shared_sstable& sst) mutable {
readers.emplace_back(sst->make_crawling_reader(schema, permit, pc, trace_ptr, monitor_generator(sst)));
});
return make_combined_reader(schema, std::move(permit), std::move(readers), streamed_mutation::forwarding::no, mutation_reader::forwarding::no);

View File

@@ -66,6 +66,9 @@ public:
lw_shared_ptr<const sstable_list> all() const;
// Prefer for_each_sstable() over all() for iteration purposes, as the latter may have to copy all sstables into a temporary
void for_each_sstable(std::function<void(const shared_sstable&)> func) const;
// Calls func for each sstable or until it returns stop_iteration::yes
// Returns the last stop_iteration value.
stop_iteration for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const;
void insert(shared_sstable sst);
void erase(shared_sstable sst);
size_t size() const noexcept;

View File

@@ -30,7 +30,7 @@ public:
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const = 0;
virtual std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const;
virtual lw_shared_ptr<const sstable_list> all() const = 0;
virtual void for_each_sstable(std::function<void(const shared_sstable&)> func) const = 0;
virtual stop_iteration for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const = 0;
virtual void insert(shared_sstable sst) = 0;
virtual void erase(shared_sstable sst) = 0;
virtual size_t size() const noexcept = 0;
@@ -95,7 +95,7 @@ public:
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const override;
virtual std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const override;
virtual lw_shared_ptr<const sstable_list> all() const override;
virtual void for_each_sstable(std::function<void(const shared_sstable&)> func) const override;
virtual stop_iteration for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const override;
virtual void insert(shared_sstable sst) override;
virtual void erase(shared_sstable sst) override;
virtual size_t size() const noexcept override;
@@ -121,7 +121,7 @@ public:
virtual std::unique_ptr<sstable_set_impl> clone() const override;
virtual std::vector<shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override;
virtual lw_shared_ptr<const sstable_list> all() const override;
virtual void for_each_sstable(std::function<void(const shared_sstable&)> func) const override;
virtual stop_iteration for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const override;
virtual void insert(shared_sstable sst) override;
virtual void erase(shared_sstable sst) override;
virtual size_t size() const noexcept override;
@@ -161,7 +161,7 @@ public:
virtual std::vector<shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override;
virtual std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const override;
virtual lw_shared_ptr<const sstable_list> all() const override;
virtual void for_each_sstable(std::function<void(const shared_sstable&)> func) const override;
virtual stop_iteration for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const override;
virtual void insert(shared_sstable sst) override;
virtual void erase(shared_sstable sst) override;
virtual size_t size() const noexcept override;

View File

@@ -3608,6 +3608,12 @@ future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir) {
}
}
// Render an sstable for logging as "<filename>:level=<n>[:origin=<s>]".
// The origin suffix is appended only when include_origin is set (the
// default in the header declaration).
std::string to_string(const shared_sstable& sst, bool include_origin) {
    return include_origin ?
        fmt::format("{}:level={:d}:origin={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin()) :
        fmt::format("{}:level={:d}", sst->get_filename(), sst->get_sstable_level());
}
} // namespace sstables
namespace seastar {

View File

@@ -1998,7 +1998,7 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {
auto local_ranges = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(ks_name));
auto descriptor = sstables::compaction_descriptor({std::move(sst)}, default_priority_class(), compaction_descriptor::default_level,
compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_type_options::make_cleanup(std::move(local_ranges)));
compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_type_options::make_cleanup(), std::move(local_ranges));
auto ret = compact_sstables(std::move(descriptor), cf, sst_gen).get0();
BOOST_REQUIRE(ret.new_sstables.size() == 1);
@@ -3571,23 +3571,23 @@ SEASTAR_TEST_CASE(sstable_needs_cleanup_test) {
{
auto local_ranges = { token_range(0, 9) };
auto sst = sst_gen(keys[0], keys[9]);
BOOST_REQUIRE(!needs_cleanup(sst, local_ranges, s));
BOOST_REQUIRE(!needs_cleanup(sst, local_ranges));
}
{
auto local_ranges = { token_range(0, 1), token_range(3, 4), token_range(5, 6) };
auto sst = sst_gen(keys[0], keys[1]);
BOOST_REQUIRE(!needs_cleanup(sst, local_ranges, s));
BOOST_REQUIRE(!needs_cleanup(sst, local_ranges));
auto sst2 = sst_gen(keys[2], keys[2]);
BOOST_REQUIRE(needs_cleanup(sst2, local_ranges, s));
BOOST_REQUIRE(needs_cleanup(sst2, local_ranges));
auto sst3 = sst_gen(keys[0], keys[6]);
BOOST_REQUIRE(needs_cleanup(sst3, local_ranges, s));
BOOST_REQUIRE(needs_cleanup(sst3, local_ranges));
auto sst5 = sst_gen(keys[7], keys[7]);
BOOST_REQUIRE(needs_cleanup(sst5, local_ranges, s));
BOOST_REQUIRE(needs_cleanup(sst5, local_ranges));
}
});
}
@@ -4886,3 +4886,28 @@ SEASTAR_TEST_CASE(compaction_manager_stop_and_drain_race_test) {
testlog.info("stopping compaction manager");
co_await cm.stop();
}
// Verify that a vector of shared_sstable is formattable with format("{}"),
// i.e. that the fmt::formatter<shared_sstable> specialization is picked up
// for ranges, and that each sstable is represented in the output. The check
// looks for each sstable's generation in the formatted string — the
// generation is presumably part of the sstable filename that to_string()
// emits; confirm against the sstable filename scheme.
SEASTAR_TEST_CASE(test_print_shared_sstables_vector) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema ss;
        auto s = ss.schema();
        auto pks = ss.make_pkeys(2);
        auto sst_gen = env.make_sst_factory(s);
        std::vector<sstables::shared_sstable> ssts(2);
        // Build one small single-partition sstable per key.
        auto mut0 = mutation(s, pks[0]);
        mut0.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp());
        ssts[0] = make_sstable_containing(sst_gen, {std::move(mut0)});
        auto mut1 = mutation(s, pks[1]);
        mut1.partition().apply_insert(*s, ss.make_ckey(1), ss.new_timestamp());
        ssts[1] = make_sstable_containing(sst_gen, {std::move(mut1)});
        std::string msg = format("{}", ssts);
        for (const auto& sst : ssts) {
            // Each sstable's generation must appear somewhere in the
            // formatted vector output.
            auto gen_str = format("{}", sst->generation());
            BOOST_REQUIRE(msg.find(gen_str) != std::string::npos);
        }
    });
}