mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
replica: Introduce add_new_sstables_and_update_cache()
Piggyback on new add_new_sstable_and_update_cache(), replacing the previous add_sstables_and_update_cache(). Will be used by intra-node migration since we want it to be safe when loading the cloned sstables. An unsplit sstable can be cloned into destination which already ACKed split, so we need this variant which splits sstable if needed, while it's unsealed. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -618,6 +618,9 @@ public:
|
||||
add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
|
||||
std::function<future<>(sstables::shared_sstable)> on_add,
|
||||
sstables::offstrategy offstrategy = sstables::offstrategy::no);
|
||||
[[nodiscard]] future<std::vector<sstables::shared_sstable>>
|
||||
add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> new_ssts,
|
||||
std::function<future<>(sstables::shared_sstable)> on_add);
|
||||
|
||||
future<> move_sstables_from_staging(std::vector<sstables::shared_sstable>);
|
||||
sstables::shared_sstable make_sstable();
|
||||
|
||||
@@ -1448,6 +1448,36 @@ table::add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
|
||||
co_return std::move(ret);
|
||||
}
|
||||
|
||||
future<std::vector<sstables::shared_sstable>>
|
||||
table::add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> new_ssts,
|
||||
std::function<future<>(sstables::shared_sstable)> on_add) {
|
||||
std::exception_ptr ex;
|
||||
std::vector<sstables::shared_sstable> ret;
|
||||
|
||||
// We rely on add_new_sstable_and_update_cache() to unlink the sstable feeded into it,
|
||||
// so the exception handling below will only have to unlink sstables not processed yet.
|
||||
try {
|
||||
for (auto& sst: new_ssts) {
|
||||
auto ssts = co_await add_new_sstable_and_update_cache(std::exchange(sst, nullptr), on_add);
|
||||
std::ranges::move(ssts, std::back_inserter(ret));
|
||||
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
|
||||
if (ex) {
|
||||
co_await coroutine::parallel_for_each(new_ssts, [&ex] (sstables::shared_sstable sst) -> future<> {
|
||||
if (sst) {
|
||||
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
|
||||
co_await sst->unlink();
|
||||
}
|
||||
});
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
co_return std::move(ret);
|
||||
}
|
||||
|
||||
future<>
|
||||
table::update_cache(compaction_group& cg, lw_shared_ptr<memtable> m, std::vector<sstables::shared_sstable> ssts) {
|
||||
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
|
||||
|
||||
Reference in New Issue
Block a user