Every table and sstable set keeps track of the total file size of contained sstables. Due to a feature request, we also want to keep track of the hypothetical file size if Data files were uncompressed, to add a metric that shows the compression ratio of sstables. We achieve this by replacing the relevant `uint64_t bytes_on_disk` counters everywhere with a struct that contains both the actual (post-compression) size and the hypothetical pre-compression size. This patch isn't supposed to change any observable behavior. In the next patch, we will use these changes to add a new metric.
970 lines
45 KiB
C++
970 lines
45 KiB
C++
/*
|
|
* Copyright (C) 2023-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <seastar/coroutine/maybe_yield.hh>
|
|
#include <seastar/coroutine/parallel_for_each.hh>
|
|
|
|
#include "compaction/task_manager_module.hh"
|
|
#include "compaction/compaction_manager.hh"
|
|
#include "replica/database.hh"
|
|
#include "sstables/sstables.hh"
|
|
#include "sstables/sstable_directory.hh"
|
|
#include "utils/error_injection.hh"
|
|
#include "utils/pretty_printers.hh"
|
|
|
|
using namespace std::chrono_literals;
|
|
|
|
namespace replica {
|
|
|
|
// Helper structure for resharding.
//
// Describes the sstables (represented by their foreign_sstable_open_info) that are shared and
// need to be resharded. Each shard will keep one such descriptor, that contains the list of
// SSTables assigned to it, and their total size. The total size is used to make sure we are
// fairly balancing SSTables among shards.
struct reshard_shard_descriptor {
    // Open-info handles of the sstables assigned to this shard.
    sstables::sstable_directory::sstable_open_info_vector info_vec;
    // Sum of the pre-compression Data sizes of the sstables in info_vec; used as the
    // balancing weight when distributing resharding work across shards.
    uint64_t uncompressed_data_size = 0;

    // Orders descriptors by accumulated size, so the least-loaded shard can be picked.
    bool total_size_smaller(const reshard_shard_descriptor& rhs) const {
        return uncompressed_data_size < rhs.uncompressed_data_size;
    }

    // Total (uncompressed) size of the work assigned to this shard.
    uint64_t size() const {
        return uncompressed_data_size;
    }
};
|
|
|
|
} // namespace replica
|
|
|
|
namespace compaction {
|
|
|
|
// Collects shared SSTables from all shards and sstables that require cleanup and returns a vector containing them all.
// This function assumes that the list of SSTables can be fairly big so it is careful to
// manipulate it in a do_for_each loop (which yields) instead of using standard accumulators.
//
// NOTE(review): the db, ks_name and table_name parameters are not referenced in this body —
// presumably kept for interface compatibility with callers; confirm before removing.
future<sstables::sstable_directory::sstable_open_info_vector>
collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::owned_ranges_ptr owned_ranges_ptr) {
    auto info_vec = sstables::sstable_directory::sstable_open_info_vector();

    // We want to make sure that each distributed object reshards about the same amount of data.
    // Each sharded object has its own shared SSTables. We can use a clever algorithm in which they
    // all distributely figure out which SSTables to exchange, but we'll keep it simple and move all
    // their foreign_sstable_open_info to a coordinator (the shard who called this function). We can
    // move in bulk and that's efficient. That shard can then distribute the work among all the
    // others who will reshard.
    auto coordinator = this_shard_id();
    // We will first move all of the foreign open info to temporary storage so that we can sort
    // them. We want to distribute bigger sstables first.
    const auto* sorted_owned_ranges_ptr = owned_ranges_ptr.get();
    co_await dir.invoke_on_all([&] (sstables::sstable_directory& d) -> future<> {
        auto shared_sstables = d.retrieve_shared_sstables();
        sstables::sstable_directory::sstable_open_info_vector need_cleanup;
        if (sorted_owned_ranges_ptr) {
            // Filter out (and record) sstables that contain data outside the owned
            // ranges; they are routed through resharding for cleanup.
            co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future<bool> {
                if (needs_cleanup(sst, *sorted_owned_ranges_ptr)) {
                    need_cleanup.push_back(co_await sst->get_open_info());
                    co_return false;
                }
                co_return true;
            });
        }
        if (shared_sstables.empty() && need_cleanup.empty()) {
            co_return;
        }
        // Append this shard's findings into the coordinator's vector, yielding
        // periodically since the list may be large.
        co_await smp::submit_to(coordinator, [&] () -> future<> {
            info_vec.reserve(info_vec.size() + shared_sstables.size() + need_cleanup.size());
            for (auto& info : shared_sstables) {
                info_vec.emplace_back(std::move(info));
                co_await coroutine::maybe_yield();
            }
            for (auto& info : need_cleanup) {
                info_vec.emplace_back(std::move(info));
                co_await coroutine::maybe_yield();
            }
        });
    });

    co_return info_vec;
}
|
|
|
|
// Given a vector of shared sstables to be resharded, distribute it among all shards.
// The vector is first sorted to make sure that we are moving the biggest SSTables first.
//
// Returns a reshard_shard_descriptor per shard indicating the work that each shard has to do.
future<std::vector<replica::reshard_shard_descriptor>>
distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector source) {
    // One descriptor per shard; index == shard id.
    auto destinations = std::vector<replica::reshard_shard_descriptor>(smp::count);

    std::sort(source.begin(), source.end(), [] (const sstables::foreign_sstable_open_info& a, const sstables::foreign_sstable_open_info& b) {
        // Sort on descending SSTable sizes.
        return a.uncompressed_data_size > b.uncompressed_data_size;
    });

    for (auto& info : source) {
        // Choose the stable shard owner with the smallest amount of accumulated work.
        // Note that for sstables that need cleanup via resharding, owners may contain
        // a single shard.
        auto shard_it = std::ranges::min_element(info.owners, [&] (const shard_id& lhs, const shard_id& rhs) {
            return destinations[lhs].total_size_smaller(destinations[rhs]);
        });
        auto& dest = destinations[*shard_it];
        dest.uncompressed_data_size += info.uncompressed_data_size;
        dest.info_vec.push_back(std::move(info));
        // The input list can be long; yield between iterations.
        co_await coroutine::maybe_yield();
    }

    co_return destinations;
}
|
|
|
|
// reshards a collection of SSTables.
//
// A reference to the compaction manager must be passed so we can register with it. Knowing
// which table is being processed is a requirement of the compaction manager, so this must be
// passed too.
//
// We will reshard max_sstables_per_job at once.
//
// A creator function must be passed that will create an SSTable object in the correct shard,
// and an I/O priority must be specified.
future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table,
        compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, tasks::task_info parent_info)
{
    // Resharding doesn't like empty sstable sets, so bail early. There is nothing
    // to reshard in this shard.
    if (shared_info.empty()) {
        co_return;
    }

    // We want to reshard many SSTables at a time for efficiency. However if we have too many we may
    // be risking OOM.
    auto max_sstables_per_job = table.schema()->max_compaction_threshold();
    // Ceiling division: number of jobs needed so no job exceeds max_sstables_per_job.
    auto num_jobs = (shared_info.size() + max_sstables_per_job - 1) / max_sstables_per_job;
    auto sstables_per_job = shared_info.size() / num_jobs;

    // Partition the loaded sstables into num_jobs buckets of roughly equal count.
    std::vector<std::vector<sstables::shared_sstable>> buckets;
    buckets.reserve(num_jobs);
    buckets.emplace_back();
    co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> {
        auto sst = co_await dir.load_foreign_sstable(info);
        // Last bucket gets leftover SSTables
        if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) {
            buckets.emplace_back();
        }
        buckets.back().push_back(std::move(sst));
    });
    // There is a semaphore inside the compaction manager in run_resharding_jobs. So we
    // parallel_for_each so the statistics about pending jobs are updated to reflect all
    // jobs. But only one will run in parallel at a time
    auto& t = table.try_get_compaction_group_view_with_static_sharding();
    co_await coroutine::parallel_for_each(buckets, [&] (std::vector<sstables::shared_sstable>& sstlist) mutable {
        return table.get_compaction_manager().run_custom_job(t, compaction_type::Reshard, "Reshard compaction", [&] (compaction_data& info, compaction_progress_monitor& progress_monitor) -> future<> {
            auto erm = table.get_effective_replication_map(); // keep alive around compaction.

            compaction_descriptor desc(sstlist);
            desc.options = compaction_type_options::make_reshard();
            desc.creator = creator;
            desc.sharder = &erm->get_sharder(*table.schema());
            desc.owned_ranges = owned_ranges_ptr;

            auto result = co_await compact_sstables(std::move(desc), info, t, progress_monitor);
            // input sstables are moved, to guarantee their resources are released once we're done
            // resharding them.
            co_await when_all_succeed(dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::yes), dir.remove_sstables(std::move(sstlist))).discard_result();
        }, parent_info, throw_if_stopping::no);
    });
}
|
|
|
|
struct table_tasks_info {
|
|
current_task_type task;
|
|
table_info ti;
|
|
|
|
table_tasks_info(current_task_type t, table_info info)
|
|
: task(std::move(t))
|
|
, ti(info)
|
|
{}
|
|
};
|
|
|
|
// Runs `func` against the table identified by `ti`, holding the table alive for the
// duration of the call.
//
// A missing table or a closed gate is logged and treated as a benign skip; any other
// failure is logged and rethrown to the caller.
future<> run_on_table(sstring op, replica::database& db, std::string keyspace, table_info ti, std::function<future<> (replica::table&)> func) {
    std::exception_ptr ex;
    tasks::tmlogger.debug("Starting {} on {}.{}", op, keyspace, ti.name);
    try {
        auto& t = db.find_column_family(ti.id);
        // Keep the table from being dropped/stopped while func runs.
        auto holder = t.hold();
        co_await func(t);
    } catch (const replica::no_such_column_family& e) {
        // Table was dropped concurrently — not an error for maintenance ops.
        tasks::tmlogger.warn("Skipping {} of {}.{}: {}", op, keyspace, ti.name, e.what());
    } catch (const gate_closed_exception& e) {
        // Table (or service) is shutting down — skip rather than fail.
        tasks::tmlogger.warn("Skipping {} of {}.{}: {}", op, keyspace, ti.name, e.what());
    } catch (...) {
        ex = std::current_exception();
        tasks::tmlogger.error("Failed {} of {}.{}: {}", op, keyspace, ti.name, ex);
    }
    if (ex) {
        // Propagate without throwing inside the coroutine frame.
        co_await coroutine::return_exception_ptr(std::move(ex));
    }
}
|
|
|
|
// Blocks until `current_task` designates the task identified by `id`.
// Used to serialize sibling tasks that share one condition variable.
future<> wait_for_your_turn(seastar::condition_variable& cv, current_task_type& current_task, tasks::task_id id) {
    auto is_our_turn = [&current_task, id] {
        return current_task && current_task->id() == id;
    };
    co_await cv.wait(is_our_turn);
}
|
|
|
|
// Runs the given per-table tasks one at a time, signalling each task in turn via
// `current_task` + `cv`. On failure, the condition variable is broken (unblocking
// all waiters) and the remaining tasks are still awaited before rethrowing.
future<> run_table_tasks(replica::database& db, std::vector<table_tasks_info> table_tasks, seastar::condition_variable& cv, current_task_type& current_task, bool sort) {
    std::exception_ptr ex;

    // While compaction is run on one table, the size of tables may significantly change.
    // Thus, they are sorted before each individual compaction and the smallest table is chosen.
    while (!table_tasks.empty()) {
        try {
            if (sort) {
                // Major compact smaller tables first, to increase chances of success if low on space.
                // Tables will be kept in descending order.
                std::ranges::sort(table_tasks, std::greater<>(), [&] (const table_tasks_info& tti) {
                    try {
                        return db.find_column_family(tti.ti.id).get_stats().live_disk_space_used.on_disk;
                    } catch (const replica::no_such_column_family& e) {
                        // A dropped table sorts last (and thus runs first).
                        return int64_t(-1);
                    }
                });
            }
            // Task responsible for the smallest table.
            current_task = table_tasks.back().task;
            table_tasks.pop_back();
            // Wake the waiting tasks; only the one matching current_task proceeds.
            cv.broadcast();
            co_await current_task->done();
        } catch (...) {
            ex = std::current_exception();
            current_task = nullptr;
            // Unblock all remaining waiters with the failure.
            cv.broken(ex);
            break;
        }
    }

    if (ex) {
        // Wait for all tasks even on failure.
        for (auto& tti: table_tasks) {
            co_await tti.task->done();
        }
        co_await coroutine::return_exception_ptr(std::move(ex));
    }
}
|
|
|
|
struct keyspace_tasks_info {
|
|
current_task_type task;
|
|
sstring keyspace;
|
|
std::vector<table_info> table_infos;
|
|
|
|
keyspace_tasks_info(current_task_type t, sstring ks_name, std::vector<table_info> t_infos)
|
|
: task(std::move(t))
|
|
, keyspace(std::move(ks_name))
|
|
, table_infos(std::move(t_infos))
|
|
{}
|
|
};
|
|
|
|
// Runs the given per-keyspace tasks one at a time, signalling each task in turn via
// `current_task` + `cv`. Mirrors run_table_tasks, but a keyspace's weight is the sum
// of its tables' on-disk sizes. On failure the remaining tasks are still awaited.
future<> run_keyspace_tasks(replica::database& db, std::vector<keyspace_tasks_info> keyspace_tasks, seastar::condition_variable& cv, current_task_type& current_task, bool sort) {
    std::exception_ptr ex;

    // While compaction is run on one table, the size of tables may significantly change.
    // Thus, they are sorted before each individual compaction and the smallest keyspace is chosen.
    while (!keyspace_tasks.empty()) {
        try {
            if (sort) {
                // Major compact smaller tables first, to increase chances of success if low on space.
                // Tables will be kept in descending order.
                std::ranges::sort(keyspace_tasks, std::greater<>(), [&] (const keyspace_tasks_info& kti) {
                    try {
                        // Keyspace weight: total on-disk live size of its tables.
                        return std::accumulate(kti.table_infos.begin(), kti.table_infos.end(), int64_t(0), [&] (int64_t sum, const table_info& t) {
                            try {
                                sum += db.find_column_family(t.id).get_stats().live_disk_space_used.on_disk;
                            } catch (const replica::no_such_column_family&) {
                                // ignore
                            }
                            return sum;
                        });
                    } catch (const replica::no_such_keyspace&) {
                        // A dropped keyspace sorts last (and thus runs first).
                        return int64_t(-1);
                    }
                });
            }
            // Task responsible for the smallest keyspace.
            current_task = keyspace_tasks.back().task;
            keyspace_tasks.pop_back();
            // Wake the waiting tasks; only the one matching current_task proceeds.
            cv.broadcast();
            co_await current_task->done();
        } catch (...) {
            ex = std::current_exception();
            current_task = nullptr;
            cv.broken(ex);
            break;
        }
    }

    if (ex) {
        // Wait for all tasks even on failure.
        for (auto& kti: keyspace_tasks) {
            co_await kti.task->done();
        }
        co_await coroutine::return_exception_ptr(std::move(ex));
    }
}
|
|
|
|
// Reports task progress as (completed, total) bytes for this compaction.
// A zero compaction size yields an empty progress object.
future<tasks::task_manager::task::progress> compaction_task_impl::get_progress(const compaction_data& cdata, const compaction_progress_monitor& progress_monitor) const {
    const auto total = cdata.compaction_size;
    if (total == 0) {
        co_return tasks::task_manager::task::progress{};
    }

    // Finished tasks report the full size, covering tasks which skip all files.
    const auto completed = is_done() ? total : progress_monitor.get_progress();
    co_return tasks::task_manager::task::progress{
        .completed = completed,
        .total = total
    };
}
|
|
|
|
// Only top-level tasks (those without a parent) may be aborted directly;
// children are aborted through their parent.
tasks::is_abortable compaction_task_impl::is_abortable() const noexcept {
    const bool has_parent = bool(_parent_id);
    return tasks::is_abortable{!has_parent};
}
|
|
|
|
// Estimates the workload (in bytes) for compacting one table: the sum of data sizes
// of all current compaction candidates across the table's compaction group views.
// A missing table is treated as zero work (run_on_table swallows that case).
future<uint64_t> compaction_task_impl::get_table_task_workload(replica::database& db, const table_info& ti) const {
    uint64_t bytes = 0;
    co_await run_on_table("find_compaction_task_progress", db, _status.keyspace, ti, [&bytes] (replica::table& t) -> future<> {
        co_await t.parallel_foreach_compaction_group_view(coroutine::lambda([&bytes, &t] (compaction::compaction_group_view& view) -> future<> {
            auto candidates = co_await t.get_compaction_manager().get_candidates(view);
            // Seed the fold with uint64_t to match `bytes` — the previous int64_t(0)
            // seed mixed signed accumulation into an unsigned counter.
            bytes += std::ranges::fold_left(candidates | std::views::transform([] (auto& sst) { return sst->data_size(); }), uint64_t{0}, std::plus{});
        }));
    });
    co_return bytes;
}
|
|
|
|
// Estimates the workload (in bytes) for the given tables on this shard by
// summing the per-table workloads sequentially.
future<uint64_t> compaction_task_impl::get_shard_task_workload(replica::database& db, const std::vector<table_info>& tables) const {
    uint64_t total = 0;
    for (const auto& table : tables) {
        total += co_await get_table_task_workload(db, table);
    }
    co_return total;
}
|
|
|
|
// Estimates the workload (in bytes) for the given tables across all shards by
// summing each shard's local workload.
future<uint64_t> compaction_task_impl::get_keyspace_task_workload(sharded<replica::database>& db, const std::vector<table_info>& tables) const {
    return db.map_reduce0([&tables, this] (replica::database& local_db) {
        return get_shard_task_workload(local_db, tables);
    }, uint64_t{0}, std::plus<uint64_t>{});
}
|
|
|
|
// Flushes the commitlog on all shards when warranted; returns whether a flush happened.
static future<bool> maybe_flush_commitlog(sharded<replica::database>& db, bool force_flush) {
    // flush commitlog if
    // (a) force_flush == true (or)
    // (b) flush_all_tables_before_major > 0s and the configured seconds have elapsed since last all tables flush
    if (!force_flush) {
        auto interval = db.local().get_compaction_manager().flush_all_tables_before_major();
        if (interval <= 0s) {
            // Feature disabled — nothing to do.
            co_return false;
        }

        // Skip if all tables were already flushed within the configured interval.
        auto when = db_clock::now() - interval;
        if (co_await replica::database::get_all_tables_flushed_at(db) > when) {
            co_return false;
        }
    }

    co_await db.invoke_on_all([&] (replica::database& db) -> future<> {
        co_await db.flush_commitlog();
    });
    co_return true;
}
|
|
|
|
// A global major compaction is always initiated by a user.
tasks::is_user_task global_major_compaction_task_impl::is_user_task() const noexcept {
    return tasks::is_user_task::yes;
}
|
|
|
|
// Builds a keyspace-name -> table list mapping from a snapshot of the
// database's tables metadata.
std::unordered_map<sstring, std::vector<table_info>> get_tables_by_keyspace(replica::database& db) {
    std::unordered_map<sstring, std::vector<table_info>> result;
    auto metadata_snapshot = db.get_tables_metadata().get_column_families_copy();
    for (const auto& [id, table_ptr] : metadata_snapshot) {
        auto s = table_ptr->schema();
        result[s->ks_name()].emplace_back(s->cf_name(), id);
    }
    return result;
}
|
|
|
|
// Runs a major compaction over every keyspace: spawns one child keyspace task per
// keyspace and serializes them via run_keyspace_tasks (no size-sorting at this level).
future<> global_major_compaction_task_impl::run() {
    bool flushed_all_tables = false;
    if (_flush_mode == flush_mode::all_tables) {
        flushed_all_tables = co_await maybe_flush_commitlog(_db, _consider_only_existing_data);
    }

    auto tables_by_keyspace = get_tables_by_keyspace(_db.local());
    seastar::condition_variable cv;
    current_task_type current_task;
    tasks::task_info parent_info{_status.id, _status.shard};
    std::vector<keyspace_tasks_info> keyspace_tasks;
    // If everything was already flushed above, child tasks must not flush again.
    flush_mode fm = flushed_all_tables ? flush_mode::skip : _flush_mode;
    for (auto& [ks, table_infos] : tables_by_keyspace) {
        auto task = co_await _module->make_and_start_task<major_keyspace_compaction_task_impl>(parent_info, ks, parent_info.id, _db, table_infos, fm,
                _consider_only_existing_data, &cv, &current_task);
        keyspace_tasks.emplace_back(std::move(task), ks, std::move(table_infos));
    }
    co_await run_keyspace_tasks(_db.local(), keyspace_tasks, cv, current_task, false);
}
|
|
|
|
// Returns the total expected workload in bytes across all keyspaces.
// The value is computed once and memoized in _expected_workload.
future<std::optional<double>> global_major_compaction_task_impl::expected_total_workload() const {
    if (_expected_workload) {
        co_return _expected_workload;
    }
    uint64_t bytes = 0;
    auto tables_by_keyspace = get_tables_by_keyspace(_db.local());
    for (const auto& [_, table_infos] : tables_by_keyspace) {
        bytes += co_await get_keyspace_task_workload(_db, table_infos);
    }
    co_return _expected_workload = bytes;
}
|
|
|
|
// A keyspace major compaction counts as a user task only when started directly
// (i.e. it has no parent task).
tasks::is_user_task major_keyspace_compaction_task_impl::is_user_task() const noexcept {
    const bool spawned_by_parent = bool(_parent_id);
    return tasks::is_user_task{!spawned_by_parent};
}
|
|
|
|
// Runs a major compaction of one keyspace: optionally waits for its turn (when part
// of a global compaction), optionally flushes, then fans out a shard-level child
// task to every shard and waits for each.
future<> major_keyspace_compaction_task_impl::run() {
    co_await utils::get_local_injector().inject("compaction_major_keyspace_compaction_task_impl_run", utils::wait_for_message(10s));

    // When spawned by a global task, serialize with sibling keyspace tasks.
    if (_cv) {
        co_await wait_for_your_turn(*_cv, *_current_task, _status.id);
    }

    bool flushed_all_tables = false;
    if (_flush_mode == flush_mode::all_tables) {
        flushed_all_tables = co_await maybe_flush_commitlog(_db, _consider_only_existing_data);
    }

    // If everything was already flushed above, child tasks must not flush again.
    flush_mode fm = flushed_all_tables ? flush_mode::skip : _flush_mode;
    co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
        tasks::task_info parent_info{_status.id, _status.shard};
        auto& module = db.get_compaction_manager().get_task_manager_module();
        auto task = co_await module.make_and_start_task<shard_major_keyspace_compaction_task_impl>(parent_info, _status.keyspace, _status.id, db, _table_infos, fm, _consider_only_existing_data);
        co_await task->done();
    });

    utils::get_local_injector().inject("major_keyspace_compaction_task_impl_run_fail", [] () {
        throw std::runtime_error("Injected failure in major_keyspace_compaction_task_impl::run");
    });
}
|
|
|
|
// Returns the expected workload for this keyspace, computed once and memoized.
future<std::optional<double>> major_keyspace_compaction_task_impl::expected_total_workload() const {
    co_return _expected_workload = _expected_workload ? _expected_workload : co_await get_keyspace_task_workload(_db, _table_infos);
}
|
|
|
|
// Runs the shard-local part of a keyspace major compaction: one child task per
// table, executed serially with size-sorting enabled (smallest table first).
future<> shard_major_keyspace_compaction_task_impl::run() {
    seastar::condition_variable cv;
    current_task_type current_task;
    tasks::task_info parent_info{_status.id, _status.shard};
    std::vector<table_tasks_info> table_tasks;
    for (auto& ti : _local_tables) {
        table_tasks.emplace_back(co_await _module->make_and_start_task<table_major_keyspace_compaction_task_impl>(parent_info, _status.keyspace, ti.name, _status.id, _db, ti, cv, current_task, _flush_mode, _consider_only_existing_data), ti);
    }

    co_await run_table_tasks(_db, std::move(table_tasks), cv, current_task, true);

    utils::get_local_injector().inject("shard_major_keyspace_compaction_task_impl_run_fail", [] () {
        throw std::runtime_error("Injected failure in shard_major_keyspace_compaction_task_impl::run");
    });
}
|
|
|
|
// Returns this shard's expected workload, computed once and memoized.
future<std::optional<double>> shard_major_keyspace_compaction_task_impl::expected_total_workload() const {
    co_return _expected_workload = _expected_workload ? _expected_workload : co_await get_shard_task_workload(_db, _local_tables);
}
|
|
|
|
// Runs a major compaction on one table, after waiting for its turn among siblings.
future<> table_major_keyspace_compaction_task_impl::run() {
    co_await wait_for_your_turn(_cv, _current_task, _status.id);
    tasks::task_info info{_status.id, _status.shard};
    // Flush only if neither the global nor the keyspace task already flushed.
    replica::table::do_flush do_flush(_flush_mode != flush_mode::skip);
    co_await run_on_table("force_keyspace_compaction", _db, _status.keyspace, _ti, [info, do_flush, consider_only_existing_data = _consider_only_existing_data] (replica::table& t) {
        return t.compact_all_sstables(info, do_flush, consider_only_existing_data);
    });

    utils::get_local_injector().inject("table_major_keyspace_compaction_task_impl_run_fail", [] () {
        throw std::runtime_error("Injected failure in table_major_keyspace_compaction_task_impl::run");
    });
}
|
|
|
|
// Returns this table's expected workload, computed once and memoized.
future<std::optional<double>> table_major_keyspace_compaction_task_impl::expected_total_workload() const {
    co_return _expected_workload = _expected_workload ? _expected_workload : co_await get_table_task_workload(_db, _ti);
}
|
|
|
|
// Whether this cleanup is user-initiated is decided at construction time.
tasks::is_user_task cleanup_keyspace_compaction_task_impl::is_user_task() const noexcept {
    return _is_user_task;
}
|
|
|
|
// Runs cleanup for one keyspace: on every shard, optionally flush all tables, then
// spawn and await a shard-level cleanup child task.
future<> cleanup_keyspace_compaction_task_impl::run() {
    co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
        if (_flush_mode == flush_mode::all_tables) {
            co_await db.flush_all_tables();
        }
        auto& module = db.get_compaction_manager().get_task_manager_module();
        auto task = co_await module.make_and_start_task<shard_cleanup_keyspace_compaction_task_impl>({_status.id, _status.shard}, _status.keyspace, _status.id, db, _table_infos);
        co_await task->done();
    });
}
|
|
|
|
// Returns the expected workload for this keyspace cleanup, computed once and memoized.
future<std::optional<double>> cleanup_keyspace_compaction_task_impl::expected_total_workload() const {
    co_return _expected_workload = _expected_workload ? _expected_workload : co_await get_keyspace_task_workload(_db, _table_infos);
}
|
|
|
|
// A global cleanup is always initiated by a user.
tasks::is_user_task global_cleanup_compaction_task_impl::is_user_task() const noexcept {
    return tasks::is_user_task::yes;
}
|
|
|
|
// Runs cleanup across all eligible keyspaces: flushes all tables on each shard, then
// spawns a shard-level cleanup task per eligible keyspace (in parallel) and awaits them.
future<> global_cleanup_compaction_task_impl::run() {
    co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
        co_await db.flush_all_tables();
        // Local keyspaces do not require cleanup.
        // Keyspaces using tablets do not support cleanup.
        const auto keyspaces = _db.local().get_non_local_vnode_based_strategy_keyspaces();
        co_await coroutine::parallel_for_each(keyspaces, [&] (const sstring& ks) -> future<> {
            // Collect the keyspace's tables from its metadata.
            std::vector<table_info> tables;
            const auto& cf_meta_data = db.find_keyspace(ks).metadata().get()->cf_meta_data();
            for (auto& [name, schema] : cf_meta_data) {
                tables.emplace_back(name, schema->id());
            }
            auto& module = db.get_compaction_manager().get_task_manager_module();
            const tasks::task_info task_info{_status.id, _status.shard};
            auto task = co_await module.make_and_start_task<shard_cleanup_keyspace_compaction_task_impl>(
                task_info, ks, _status.id, db, std::move(tables));
            co_await task->done();
        });
    });
}
|
|
|
|
// Returns the total expected cleanup workload across all eligible keyspaces.
// Computed once and memoized in _expected_workload.
future<std::optional<double>> global_cleanup_compaction_task_impl::expected_total_workload() const {
    if (_expected_workload) {
        co_return _expected_workload;
    }
    uint64_t bytes = 0;
    auto keyspaces = _db.local().get_non_local_vnode_based_strategy_keyspaces();
    for (const auto& ks : keyspaces) {
        // Collect the keyspace's tables from its metadata, mirroring run().
        std::vector<table_info> tables;
        const auto& cf_meta_data = _db.local().find_keyspace(ks).metadata().get()->cf_meta_data();
        for (auto& [name, schema] : cf_meta_data) {
            tables.emplace_back(name, schema->id());
        }
        bytes += co_await get_keyspace_task_workload(_db, tables);
    }
    co_return _expected_workload = bytes;
}
|
|
|
|
// Runs the shard-local part of a keyspace cleanup: one child task per table,
// executed serially with size-sorting enabled (smallest table first).
future<> shard_cleanup_keyspace_compaction_task_impl::run() {
    seastar::condition_variable cv;
    current_task_type current_task;
    tasks::task_info parent_info{_status.id, _status.shard};
    std::vector<table_tasks_info> table_tasks;
    for (auto& ti : _local_tables) {
        table_tasks.emplace_back(co_await _module->make_and_start_task<table_cleanup_keyspace_compaction_task_impl>(parent_info, _status.keyspace, ti.name, _status.id, _db, ti, cv, current_task), ti);
    }

    co_await run_table_tasks(_db, std::move(table_tasks), cv, current_task, true);
}
|
|
|
|
// Returns this shard's expected cleanup workload, computed once and memoized.
future<std::optional<double>> shard_cleanup_keyspace_compaction_task_impl::expected_total_workload() const {
    co_return _expected_workload = _expected_workload ? _expected_workload : co_await get_shard_task_workload(_db, _local_tables);
}
|
|
|
|
// Runs cleanup compaction on one table, after waiting for its turn among siblings.
future<> table_cleanup_keyspace_compaction_task_impl::run() {
    co_await wait_for_your_turn(_cv, _current_task, _status.id);
    // Note that we do not hold an effective_replication_map_ptr throughout
    // the cleanup operation, so the topology might change.
    // Since cleanup is an admin operation required for vnodes,
    // it is the responsibility of the system operator to not
    // perform additional incompatible range movements during cleanup.
    auto get_owned_ranges = [&] (std::string_view ks_name) -> future<owned_ranges_ptr> {
        const auto& erm = _db.find_keyspace(ks_name).get_static_effective_replication_map();
        co_return compaction::make_owned_ranges_ptr(co_await _db.get_keyspace_local_ranges(erm));
    };
    auto owned_ranges_ptr = co_await get_owned_ranges(_status.keyspace);
    co_await run_on_table("force_keyspace_cleanup", _db, _status.keyspace, _ti, [&] (replica::table& t) {
        // skip the flush, as cleanup_keyspace_compaction_task_impl::run should have done this.
        return t.perform_cleanup_compaction(owned_ranges_ptr, tasks::task_info{_status.id, _status.shard}, replica::table::do_flush::no);
    });
}
|
|
|
|
// Returns this table's expected cleanup workload, computed once and memoized.
future<std::optional<double>> table_cleanup_keyspace_compaction_task_impl::expected_total_workload() const {
    co_return _expected_workload = _expected_workload ? _expected_workload : co_await get_table_task_workload(_db, _ti);
}
|
|
|
|
// An off-strategy keyspace compaction request is always user-initiated.
tasks::is_user_task offstrategy_keyspace_compaction_task_impl::is_user_task() const noexcept {
    return tasks::is_user_task::yes;
}
|
|
|
|
// Runs off-strategy compaction on every shard and reports (via _needed, if provided)
// whether any shard actually had work to do.
future<> offstrategy_keyspace_compaction_task_impl::run() {
    // std::plus on bools: the reduction is true if any shard reported `needed`.
    bool res = co_await _db.map_reduce0([&] (replica::database& db) -> future<bool> {
        bool needed = false;
        tasks::task_info parent_info{_status.id, _status.shard};
        auto& module = db.get_compaction_manager().get_task_manager_module();
        auto task = co_await module.make_and_start_task<shard_offstrategy_keyspace_compaction_task_impl>(parent_info, _status.keyspace, _status.id, db, _table_infos, needed);
        co_await task->done();
        co_return needed;
    }, false, std::plus<bool>());
    if (_needed) {
        *_needed = res;
    }
}
|
|
|
|
// Returns the expected off-strategy workload for this keyspace, computed once and memoized.
future<std::optional<double>> offstrategy_keyspace_compaction_task_impl::expected_total_workload() const {
    co_return _expected_workload = _expected_workload ? _expected_workload : co_await get_keyspace_task_workload(_db, _table_infos);
}
|
|
|
|
// Runs the shard-local part of an off-strategy compaction: one child task per table,
// executed serially without size-sorting.
future<> shard_offstrategy_keyspace_compaction_task_impl::run() {
    seastar::condition_variable cv;
    current_task_type current_task;
    tasks::task_info parent_info{_status.id, _status.shard};
    std::vector<table_tasks_info> table_tasks;
    for (auto& ti : _table_infos) {
        table_tasks.emplace_back(co_await _module->make_and_start_task<table_offstrategy_keyspace_compaction_task_impl>(parent_info, _status.keyspace, ti.name, _status.id, _db, ti, cv, current_task, _needed), ti);
    }

    co_await run_table_tasks(_db, std::move(table_tasks), cv, current_task, false);
}
|
|
|
|
// Returns this shard's expected off-strategy workload, computed once and memoized.
future<std::optional<double>> shard_offstrategy_keyspace_compaction_task_impl::expected_total_workload() const {
    co_return _expected_workload = _expected_workload ? _expected_workload : co_await get_shard_task_workload(_db, _table_infos);
}
|
|
|
|
// Runs off-strategy compaction on one table, after waiting for its turn among siblings;
// ORs the table's "work was needed" result into the shared _needed flag.
future<> table_offstrategy_keyspace_compaction_task_impl::run() {
    co_await wait_for_your_turn(_cv, _current_task, _status.id);
    tasks::task_info info{_status.id, _status.shard};
    co_await run_on_table("perform_keyspace_offstrategy_compaction", _db, _status.keyspace, _ti, [this, info] (replica::table& t) -> future<> {
        _needed |= co_await t.perform_offstrategy_compaction(info);
    });
}
|
|
|
|
// Returns this table's expected off-strategy workload, computed once and memoized.
future<std::optional<double>> table_offstrategy_keyspace_compaction_task_impl::expected_total_workload() const {
    co_return _expected_workload = _expected_workload ? _expected_workload : co_await get_table_task_workload(_db, _ti);
}
|
|
|
|
// An sstable upgrade request is always user-initiated.
tasks::is_user_task upgrade_sstables_compaction_task_impl::is_user_task() const noexcept {
    return tasks::is_user_task::yes;
}
|
|
|
|
// Runs the sstable upgrade on every shard via a shard-level child task.
future<> upgrade_sstables_compaction_task_impl::run() {
    co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
        tasks::task_info parent_info{_status.id, _status.shard};
        auto& compaction_module = db.get_compaction_manager().get_task_manager_module();
        auto task = co_await compaction_module.make_and_start_task<shard_upgrade_sstables_compaction_task_impl>(parent_info, _status.keyspace, _status.id, db, _table_infos, _exclude_current_version);
        co_await task->done();
    });
}
|
|
|
|
// Returns the expected upgrade workload for this keyspace, computed once and memoized.
future<std::optional<double>> upgrade_sstables_compaction_task_impl::expected_total_workload() const {
    co_return _expected_workload = _expected_workload ? _expected_workload : co_await get_keyspace_task_workload(_db, _table_infos);
}
|
|
|
|
// Runs the shard-local part of an sstable upgrade: one child task per table,
// executed serially without size-sorting.
future<> shard_upgrade_sstables_compaction_task_impl::run() {
    seastar::condition_variable cv;
    current_task_type current_task;
    tasks::task_info parent_info{_status.id, _status.shard};
    std::vector<table_tasks_info> table_tasks;
    for (auto& ti : _table_infos) {
        table_tasks.emplace_back(co_await _module->make_and_start_task<table_upgrade_sstables_compaction_task_impl>(parent_info, _status.keyspace, ti.name, _status.id, _db, ti, cv, current_task, _exclude_current_version), ti);
    }

    co_await run_table_tasks(_db, std::move(table_tasks), cv, current_task, false);
}
|
|
|
|
// Returns this shard's expected upgrade workload, computed once and memoized.
future<std::optional<double>> shard_upgrade_sstables_compaction_task_impl::expected_total_workload() const {
    co_return _expected_workload = _expected_workload ? _expected_workload : co_await get_shard_task_workload(_db, _table_infos);
}
|
|
|
|
// Upgrades one table's sstables to the current format, after waiting for its turn.
// Owned ranges are resolved up front (null for per-table replication strategies,
// where vnode-based local ranges do not apply).
future<> table_upgrade_sstables_compaction_task_impl::run() {
    co_await wait_for_your_turn(_cv, _current_task, _status.id);
    auto get_owned_ranges = [&] (std::string_view keyspace_name) -> future<owned_ranges_ptr> {
        const auto& ks = _db.find_keyspace(keyspace_name);
        if (ks.get_replication_strategy().is_per_table()) {
            co_return nullptr;
        }
        const auto& erm = ks.get_static_effective_replication_map();
        co_return compaction::make_owned_ranges_ptr(co_await _db.get_keyspace_local_ranges(erm));
    };
    auto owned_ranges_ptr = co_await get_owned_ranges(_status.keyspace);
    tasks::task_info info{_status.id, _status.shard};
    co_await run_on_table("upgrade_sstables", _db, _status.keyspace, _ti, [&] (replica::table& t) -> future<> {
        return t.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
            // Hold the read lock so incremental repair can't run concurrently with the upgrade.
            auto lock_holder = co_await t.get_compaction_manager().get_incremental_repair_read_lock(ts, "upgrade_sstables_compaction");
            co_await t.get_compaction_manager().perform_sstable_upgrade(owned_ranges_ptr, ts, _exclude_current_version, info);
        });
    });
}
|
|
|
|
future<std::optional<double>> table_upgrade_sstables_compaction_task_impl::expected_total_workload() const {
    // Estimate the work for this single table. Computed on first use and
    // cached for subsequent calls.
    if (!_expected_workload) {
        _expected_workload = co_await get_table_task_workload(_db, _ti);
    }
    co_return _expected_workload;
}
|
|
|
|
tasks::is_user_task scrub_sstables_compaction_task_impl::is_user_task() const noexcept {
    // Scrub is a user-initiated operation, so mark its task as such.
    return tasks::is_user_task::yes;
}
|
|
|
|
future<> scrub_sstables_compaction_task_impl::run() {
    // Scrub on every shard and sum the per-shard statistics.
    auto res = co_await _db.map_reduce0([&] (replica::database& db) -> future<compaction_stats> {
        // The child task keeps a reference to `stats` and fills it in while
        // running; we await its completion before returning the value.
        compaction_stats stats;
        tasks::task_info parent_info{_status.id, _status.shard};
        auto& compaction_module = db.get_compaction_manager().get_task_manager_module();
        auto task = co_await compaction_module.make_and_start_task<shard_scrub_sstables_compaction_task_impl>(parent_info, _status.keyspace, _status.id, db, _column_families, _opts, stats);
        co_await task->done();
        co_return stats;
    }, compaction_stats{}, std::plus<compaction_stats>());
    // Publish the aggregated statistics, if the caller asked for them.
    if (_stats) {
        *_stats = res;
    }
}
|
|
|
|
future<std::optional<double>> scrub_sstables_compaction_task_impl::expected_total_workload() const {
    // Estimate the total workload of the scrub, caching the result.
    //
    // Check the cache first: the original code rebuilt table_infos (one
    // find_uuid() lookup per column family) on every call even when the
    // estimate was already cached, and returned nullopt instead of the
    // cached value if a lookup threw (e.g. a table dropped meanwhile).
    if (_expected_workload) {
        co_return _expected_workload;
    }
    try {
        std::vector<table_info> table_infos;
        table_infos.reserve(_column_families.size());
        for (const auto& cf : _column_families) {
            // Resolve each column family name to its id; throws if unknown.
            auto id = _db.local().find_uuid(_status.keyspace, cf);
            table_infos.push_back(table_info{
                .name = cf,
                .id = id
            });
        }
        co_return _expected_workload = co_await get_keyspace_task_workload(_db, std::move(table_infos));
    } catch (...) {
        // Expected total workload cannot be found.
    }
    co_return std::nullopt;
}
|
|
|
|
future<> shard_scrub_sstables_compaction_task_impl::run() {
    // Scrub each requested table through its own child task and sum the
    // resulting statistics into this shard's _stats.
    _stats = co_await map_reduce(_column_families, [&] (sstring cfname) -> future<compaction_stats> {
        // The child task keeps a reference to `stats` and fills it in while
        // running; we await its completion before returning the value.
        compaction_stats stats{};
        tasks::task_info parent_info{_status.id, _status.shard};
        auto& compaction_module = _db.get_compaction_manager().get_task_manager_module();
        auto task = co_await compaction_module.make_and_start_task<table_scrub_sstables_compaction_task_impl>(parent_info, _status.keyspace, cfname, _status.id, _db, _opts, stats);
        co_await task->done();
        co_return stats;
    }, compaction_stats{}, std::plus<compaction_stats>());
}
|
|
|
|
future<std::optional<double>> shard_scrub_sstables_compaction_task_impl::expected_total_workload() const {
    // Estimate this shard's workload for the scrub, caching the result.
    //
    // Check the cache first: the original code rebuilt table_infos (one
    // find_uuid() lookup per column family) on every call even when the
    // estimate was already cached, and returned nullopt instead of the
    // cached value if a lookup threw (e.g. a table dropped meanwhile).
    if (_expected_workload) {
        co_return _expected_workload;
    }
    try {
        std::vector<table_info> table_infos;
        table_infos.reserve(_column_families.size());
        for (const auto& cf : _column_families) {
            // Resolve each column family name to its id; throws if unknown.
            auto id = _db.find_uuid(_status.keyspace, cf);
            table_infos.push_back(table_info{
                .name = cf,
                .id = id
            });
        }
        co_return _expected_workload = co_await get_shard_task_workload(_db, std::move(table_infos));
    } catch (...) {
        // Expected total workload cannot be found.
    }
    co_return std::nullopt;
}
|
|
|
|
future<> table_scrub_sstables_compaction_task_impl::run() {
    auto& cm = _db.get_compaction_manager();
    auto& cf = _db.find_column_family(_status.keyspace, _status.table);
    tasks::task_info info{_status.id, _status.shard};
    // Scrub every compaction group of the table, accumulating statistics.
    co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) mutable -> future<> {
        // Hold the incremental-repair read lock while scrubbing this group.
        auto lock_holder = co_await cm.get_incremental_repair_read_lock(ts, "scrub_sstables_compaction");
        auto r = co_await cm.perform_sstable_scrub(ts, _opts, info);
        // The scrub may produce no statistics; count that as zero.
        _stats += r.value_or(compaction_stats{});
    });
}
|
|
|
|
future<std::optional<double>> table_scrub_sstables_compaction_task_impl::expected_total_workload() const {
    try {
        // Compute the estimate on first use; serve the cached value afterwards.
        if (!_expected_workload) {
            table_info ti{
                .name = _status.table,
                .id = _db.find_uuid(_status.keyspace, _status.table)
            };
            _expected_workload = co_await get_table_task_workload(_db, ti);
        }
        co_return _expected_workload;
    } catch (...) {
        // Expected total workload cannot be found.
    }
    co_return std::nullopt;
}
|
|
|
|
future<> table_reshaping_compaction_task_impl::run() {
    auto start = std::chrono::steady_clock::now();
    // Run a reshaping child task on every shard; each shard reports the
    // amount of data it reshaped through the `total_shard_size` out-param,
    // and the per-shard amounts are summed here.
    auto total_size = co_await _dir.map_reduce0([&] (sstables::sstable_directory& d) -> future<uint64_t> {
        // The child task holds a reference to this local, so it must be
        // awaited before the coroutine frame goes away.
        uint64_t total_shard_size = 0;
        tasks::task_info parent_info{_status.id, _status.shard};
        auto& compaction_module = _db.local().get_compaction_manager().get_task_manager_module();
        auto task = co_await compaction_module.make_and_start_task<shard_reshaping_compaction_task_impl>(parent_info, _status.keyspace, _status.table, _status.id, d, _db, _mode, _creator, _filter, total_shard_size);
        co_await task->done();
        co_return total_shard_size;
    }, uint64_t(0), std::plus<uint64_t>());

    // Log a summary only if anything was actually reshaped.
    if (total_size > 0) {
        auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
        dblog.info("Reshaped {} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), duration.count(), utils::pretty_printed_throughput(total_size, duration));
    }
}
|
|
|
|
future<> shard_reshaping_compaction_task_impl::run() {
    auto& table = _db.local().find_column_family(_status.keyspace, _status.table);
    // Keep the table's gate held so it stays usable for the whole reshape.
    auto holder = table.async_gate().hold();
    tasks::task_info info{_status.id, _status.shard};

    // Bucket this directory's unshared local sstables by the compaction
    // group they belong to.
    std::unordered_map<compaction::compaction_group_view*, std::unordered_set<sstables::shared_sstable>> sstables_grouped_by_compaction_group;
    for (auto& sstable : _dir.get_unshared_local_sstables()) {
        auto& t = table.compaction_group_view_for_sstable(sstable);
        sstables_grouped_by_compaction_group[&t].insert(sstable);
    }

    // reshape sstables individually within the compaction groups
    for (auto& sstables_in_cg : sstables_grouped_by_compaction_group) {
        // Hold the incremental-repair read lock for the duration of this
        // group's reshape.
        auto lock_holder = co_await table.get_compaction_manager().get_incremental_repair_read_lock(*sstables_in_cg.first, "reshaping_compaction");
        co_await reshape_compaction_group(*sstables_in_cg.first, sstables_in_cg.second, table, info);
    }
}
|
|
|
|
// Repeatedly asks the table's compaction strategy for reshaping jobs over the
// sstables of one compaction group and runs them, until no candidates remain,
// the strategy returns an empty job, or the reshape is aborted/fails.
// `sstables_in_cg` is updated in place: reshaped inputs are removed and the
// job's output sstables are added, so subsequent iterations see the new state.
future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(compaction::compaction_group_view& t, std::unordered_set<sstables::shared_sstable>& sstables_in_cg, replica::column_family& table, const tasks::task_info& info) {

    while (true) {
        // Only sstables accepted by the caller-provided filter participate.
        auto reshape_candidates = sstables_in_cg
            | std::views::filter([&filter = _filter] (const auto& sst) {
            return filter(sst);
        }) | std::ranges::to<std::vector>();
        if (reshape_candidates.empty()) {
            break;
        }
        // all sstables were found in the same sstable_directory instance, so they share the same underlying storage.
        auto& storage = reshape_candidates.front()->get_storage();
        auto cfg = co_await make_reshape_config(storage, _mode);
        auto desc = table.get_compaction_strategy().get_reshaping_job(std::move(reshape_candidates), table.schema(), cfg);
        // An empty job means the strategy has nothing left worth reshaping.
        if (desc.sstables.empty()) {
            break;
        }

        // _total_shard_size is still zero before the first successful job,
        // so this logs exactly once per shard.
        if (!_total_shard_size) {
            dblog.info("Table {}.{} with compaction strategy {} found SSTables that need reshape. Starting reshape process", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name());
        }

        // Precompute this job's input size; it is only added to the shard
        // total after the job completes successfully.
        uint64_t reshaped_size = 0;
        std::vector<sstables::shared_sstable> sstlist;
        for (auto& sst : desc.sstables) {
            reshaped_size += sst->data_size();
            sstlist.push_back(sst);
        }

        desc.creator = _creator;

        try {
            co_await table.get_compaction_manager().run_custom_job(t, compaction_type::Reshape, "Reshape compaction", [&dir = _dir, sstlist = std::move(sstlist), desc = std::move(desc), &sstables_in_cg, &t] (compaction_data& info, compaction_progress_monitor& progress_monitor) mutable -> future<> {
                compaction_result result = co_await compact_sstables(std::move(desc), info, t, progress_monitor);
                // update the sstables_in_cg set with new sstables and remove the reshaped ones
                for (auto& sst : sstlist) {
                    sstables_in_cg.erase(sst);
                }
                sstables_in_cg.insert(result.new_sstables.begin(), result.new_sstables.end());
                // remove the reshaped sstables from the sstable directory and collect the new ones
                co_await dir.remove_unshared_sstables(std::move(sstlist));
                co_await dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::no);
            }, info, throw_if_stopping::yes);
        } catch (compaction::compaction_stopped_exception& e) {
            // Deliberate abort: stop reshaping this group quietly.
            dblog.info("Table {}.{} with compaction strategy {} had reshape successfully aborted.", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name());
            break;
        } catch (...) {
            // Any other failure also ends the reshape of this group.
            dblog.info("Reshape failed for Table {}.{} with compaction strategy {} due to {}", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name(), std::current_exception());
            break;
        }

        // reshape succeeded - update the total reshaped size
        _total_shard_size += reshaped_size;

        // Yield between consecutive jobs to avoid reactor stalls.
        co_await coroutine::maybe_yield();
    }
}
|
|
|
|
future<> table_resharding_compaction_task_impl::run() {
    // Collect all shared (multi-shard) sstables of the table and split them
    // into per-shard reshard jobs, balanced by total size.
    auto all_jobs = co_await collect_all_shared_sstables(_dir, _db, _status.keyspace, _status.table, _owned_ranges_ptr);
    auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs));

    uint64_t total_size = std::ranges::fold_left(destinations | std::views::transform(std::mem_fn(&replica::reshard_shard_descriptor::size)), uint64_t(0), std::plus{});
    _expected_workload = total_size;
    if (total_size == 0) {
        co_return;
    }

    auto start = std::chrono::steady_clock::now();
    dblog.info("Resharding {} for {}.{}", utils::pretty_printed_data_size(total_size), _status.keyspace, _status.table);

    // `destinations` is captured by reference across shards; this is safe
    // because invoke_on_all completes before this frame is destroyed.
    co_await _db.invoke_on_all(coroutine::lambda([&] (replica::database& db) -> future<> {
        tasks::task_info parent_info{_status.id, _status.shard};
        // Use the shard-local instance handed to us by invoke_on_all instead
        // of reaching through _db.local() again, consistently with the other
        // per-shard fan-out tasks in this file.
        auto& compaction_module = db.get_compaction_manager().get_task_manager_module();
        // make shard-local copy of owned_ranges
        compaction::owned_ranges_ptr local_owned_ranges_ptr;
        if (_owned_ranges_ptr) {
            local_owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(*_owned_ranges_ptr);
        }
        auto task = co_await compaction_module.make_and_start_task<shard_resharding_compaction_task_impl>(parent_info, _status.keyspace, _status.table, _status.id, _dir, db, _creator, std::move(local_owned_ranges_ptr), destinations);
        co_await task->done();
    }));

    auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
    dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), _status.keyspace, _status.table, duration.count(), utils::pretty_printed_throughput(total_size, duration));
}
|
|
|
|
future<std::optional<double>> table_resharding_compaction_task_impl::expected_total_workload() const {
    // The workload (total bytes to reshard) is only known once run() has
    // computed it; before that, report no estimate.
    if (_expected_workload == 0) {
        co_return std::nullopt;
    }
    co_return std::make_optional<double>(_expected_workload);
}
|
|
|
|
// Per-shard resharding task. `dir` and `destinations` are held by reference;
// the parent task (table_resharding_compaction_task_impl::run) keeps both
// alive until this task completes.
shard_resharding_compaction_task_impl::shard_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
        std::string keyspace,
        std::string table,
        tasks::task_id parent_id,
        sharded<sstables::sstable_directory>& dir,
        replica::database& db,
        compaction_sstable_creator_fn creator,
        compaction::owned_ranges_ptr local_owned_ranges_ptr,
        std::vector<replica::reshard_shard_descriptor>& destinations) noexcept
    : resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "shard", std::move(keyspace), std::move(table), "", parent_id)
    , _dir(dir)
    , _db(db)
    , _creator(std::move(creator))
    , _local_owned_ranges_ptr(std::move(local_owned_ranges_ptr))
    , _destinations(destinations)
{
    // This shard's workload is known up front: the total size of the
    // sstables assigned to it by the reshard-job distribution.
    _expected_workload = _destinations[this_shard_id()].size();
}
|
|
|
|
future<> shard_resharding_compaction_task_impl::run() {
    // Reshard the sstables assigned to this shard, then hand any output that
    // belongs to other shards back through the directory.
    auto& local_table = _db.find_column_family(_status.keyspace, _status.table);
    tasks::task_info task_info{_status.id, _status.shard};
    auto shard_jobs = std::move(_destinations[this_shard_id()].info_vec);
    co_await reshard(_dir.local(), std::move(shard_jobs), local_table, _creator, std::move(_local_owned_ranges_ptr), task_info);
    co_await _dir.local().move_foreign_sstables(_dir);
}
|
|
|
|
future<std::optional<double>> shard_resharding_compaction_task_impl::expected_total_workload() const {
    // Fixed at construction time: the total size of this shard's reshard jobs.
    co_return _expected_workload;
}
|
|
|
|
}
|
|
|
|
auto fmt::formatter<compaction::flush_mode>::format(compaction::flush_mode fm, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    // Map each enumerator to its canonical textual name; an out-of-range
    // value formats as an empty string, like the original switch.
    auto name = [fm] () -> std::string_view {
        switch (fm) {
        using enum compaction::flush_mode;
        case skip:
            return "skip";
        case compacted_tables:
            return "compacted_tables";
        case all_tables:
            return "all_tables";
        }
        return {};
    }();
    return fmt::format_to(ctx.out(), "{}", name);
}
|