diff --git a/sstables/storage.cc b/sstables/storage.cc index 4f7f49c4bf..73479a0cd4 100644 --- a/sstables/storage.cc +++ b/sstables/storage.cc @@ -847,6 +847,10 @@ future<> object_storage_base::change_state(const sstable& sst, sstable_state sta } future<> object_storage_base::wipe(const sstable& sst, sync_dir) noexcept { + // FIXME: unlike filesystem_storage::wipe, this implementation does not + // catch exceptions from delete_object / sstables_registry calls and may + // return an exceptional future, breaking the contract documented on + // storage::wipe. auto& sstables_registry = sst.manager().sstables_registry(); co_await sstables_registry.update_entry_status(owner(), sst.generation(), status_removing); diff --git a/sstables/storage.hh b/sstables/storage.hh index 65a4b4fe2f..f3c35c8a6b 100644 --- a/sstables/storage.hh +++ b/sstables/storage.hh @@ -109,6 +109,8 @@ public: virtual future<> change_state(const sstable& sst, sstable_state to, generation_type generation, delayed_commit_changes* delay) = 0; // runs in async context virtual void open(sstable& sst) = 0; + // Must never return an exceptional future: implementations are expected + // to catch and log any errors internally. 
virtual future<> wipe(const sstable& sst, sync_dir) noexcept = 0; virtual future open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) = 0; virtual future make_data_or_index_sink(sstable& sst, component_type type) = 0; diff --git a/sstables_loader.cc b/sstables_loader.cc index 19848f951b..ecfa91e963 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -134,6 +134,7 @@ public: using primary_replica_only = bool_class; using unlink_sstables = bool_class; +using defer_unlinking = bool_class; class sstable_streamer { protected: @@ -173,10 +174,10 @@ public: virtual future<> stream(shared_ptr progress); host_id_vector_replica_set get_endpoints(const dht::token& token) const; - future<> stream_sstable_mutations(streaming::plan_id, const dht::partition_range&, std::vector); + future<> stream_sstable_mutations(streaming::plan_id, const dht::partition_range&, std::vector, defer_unlinking defer); protected: virtual host_id_vector_replica_set get_primary_endpoints(const dht::token& token, std::function filter) const; - future<> stream_sstables(const dht::partition_range&, std::vector, shared_ptr progress); + future<> stream_sstables(const dht::partition_range&, std::vector, shared_ptr progress, defer_unlinking defer); private: host_id_vector_replica_set get_all_endpoints(const dht::token& token) const; }; @@ -236,7 +237,7 @@ private: future<> stream_fully_contained_sstables(const dht::partition_range& pr, std::vector sstables, shared_ptr progress) { if (_stream_scope != stream_scope::node) { - co_return co_await stream_sstables(pr, std::move(sstables), std::move(progress)); + co_return co_await stream_sstables(pr, std::move(sstables), std::move(progress), defer_unlinking::no); } llog.debug("Directly downloading {} fully contained SSTables to local node from object storage.", sstables.size()); auto downloaded_ssts = co_await download_fully_contained_sstables(std::move(sstables)); @@ -349,6 +350,14 @@ 
private: co_return downloaded_sstables; } + /* Unlinks every sstable in _sstables whose marked_for_deletion() is true, dispatching the unlink() calls through coroutine::parallel_for_each. stream_sstable_mutations() only marks an sstable (instead of unlinking it) when it is called with defer_unlinking set; this helper is what later calls unlink() on those sstables. NOTE(review): presumably deferred because a partially contained sstable can be needed by more than one tablet; confirm. */ future<> unlink_marked_sstables() { + co_await coroutine::parallel_for_each(_sstables, [] (sstables::shared_sstable& sst) -> future<> { + if (sst->marked_for_deletion()) { + co_await sst->unlink(); + } + }); + } + bool tablet_in_scope(locator::tablet_id) const; friend future> get_sstables_for_tablets_for_tests(const std::vector& sstables, @@ -409,7 +418,7 @@ future<> sstable_streamer::stream(shared_ptr progress) { } const auto full_partition_range = dht::partition_range::make_open_ended_both_sides(); - co_await stream_sstables(full_partition_range, std::move(_sstables), std::move(progress)); + co_await stream_sstables(full_partition_range, std::move(_sstables), std::move(progress), defer_unlinking::no); } bool tablet_sstable_streamer::tablet_in_scope(locator::tablet_id tid) const { @@ -547,16 +556,18 @@ future<> tablet_sstable_streamer::stream(shared_ptr progress) { auto tablet_pr = dht::to_partition_range(tablet_range); if (!sstables_partially_contained.empty()) { llog.debug("Streaming {} partially contained SSTables.",sstables_partially_contained.size()); - co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained), per_tablet_progress); + co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained), per_tablet_progress, defer_unlinking::yes); } if (!sstables_fully_contained.empty()) { llog.debug("Streaming {} fully contained SSTables.",sstables_fully_contained.size()); co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained), per_tablet_progress); } } + + co_await unlink_marked_sstables(); /* Executed once the streaming block above has completed. NOTE(review): no try/catch is visible in this hunk, so a failure in an earlier co_await would presumably skip this call; confirm that is intended. */ } -future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector sstables, shared_ptr progress) { +future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector sstables, shared_ptr progress, defer_unlinking defer) { size_t nr_sst_total = sstables.size(); size_t nr_sst_current = 0; @@ -576,14 +587,14 @@ future<> 
sstable_streamer::stream_sstables(const dht::partition_range& pr, std:: ops_uuid, nr_sst_current, nr_sst_current + sst_processed.size(), nr_sst_total, fmt::join(sst_processed | std::views::transform([] (auto sst) { return sst->get_filename(); }), ", ")); nr_sst_current += sst_processed.size(); - co_await stream_sstable_mutations(ops_uuid, pr, std::move(sst_processed)); + co_await stream_sstable_mutations(ops_uuid, pr, std::move(sst_processed), defer); if (progress) { progress->advance(batch_sst_nr); } } } -future<> sstable_streamer::stream_sstable_mutations(streaming::plan_id ops_uuid, const dht::partition_range& pr, std::vector sstables) { +future<> sstable_streamer::stream_sstable_mutations(streaming::plan_id ops_uuid, const dht::partition_range& pr, std::vector sstables, defer_unlinking defer) { const auto token_range = pr.transform(std::mem_fn(&dht::ring_position::token)); auto s = _table.schema(); const auto cf_id = s->id(); @@ -666,7 +677,12 @@ future<> sstable_streamer::stream_sstable_mutations(streaming::plan_id ops_uuid, co_await coroutine::parallel_for_each(sstables, [&] (sstables::shared_sstable& sst) { llog.debug("load_and_stream: ops_uuid={}, ks={}, table={}, remove sst={}", ops_uuid, s->ks_name(), s->cf_name(), sst->toc_filename()); - return sst->mark_for_deletion(); + if (defer) { + sst->mark_for_deletion(); + return make_ready_future<>(); + } else { + return sst->unlink(); + } }); } catch (...) { failed = true;