sstables_loader: synchronously unlink streamed sstables before returning

mark_for_deletion() only set an in-memory flag; the actual file
deletion ran lazily when the last shared_sstable reference dropped,
leaving a window in which a follow-up scan of the upload directory
(e.g. a second 'nodetool refresh --load-and-stream') could observe a
partially-deleted sstable and fail with malformed_sstable_exception.

Force the unlink to complete before stream() returns. For tablet
streaming, partially-contained sstables span multiple per-tablet
batches, so a defer_unlinking flag postpones the unlink until after
all sstables are streamed; for vnodes and fully-contained sstables are streamed
only once and could be removed just after being streamed.

Added a FIXME on object_storage_base::wipe and strengthened the doc on storage::wipe to
make the never-fails contract explicit

(cherry picked from commit 784127c40b)
This commit is contained in:
Taras Veretilnyk
2026-04-21 22:41:10 +02:00
committed by scylladbbot
parent fae12d069e
commit ca9abcdcbc
3 changed files with 31 additions and 9 deletions

View File

@@ -847,6 +847,10 @@ future<> object_storage_base::change_state(const sstable& sst, sstable_state sta
}
future<> object_storage_base::wipe(const sstable& sst, sync_dir) noexcept {
// FIXME: unlike filesystem_storage::wipe, this implementation does not
// catch exceptions from delete_object / sstables_registry calls and may
// return an exceptional future, breaking the contract documented on
// storage::wipe.
auto& sstables_registry = sst.manager().sstables_registry();
co_await sstables_registry.update_entry_status(owner(), sst.generation(), status_removing);

View File

@@ -109,6 +109,8 @@ public:
virtual future<> change_state(const sstable& sst, sstable_state to, generation_type generation, delayed_commit_changes* delay) = 0;
// runs in async context
virtual void open(sstable& sst) = 0;
// Must never return an exceptional future: implementations are expected
// to catch and log any errors internally.
virtual future<> wipe(const sstable& sst, sync_dir) noexcept = 0;
virtual future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) = 0;
virtual future<data_sink> make_data_or_index_sink(sstable& sst, component_type type) = 0;

View File

@@ -134,6 +134,7 @@ public:
using primary_replica_only = bool_class<struct primary_replica_only_tag>;
using unlink_sstables = bool_class<struct unlink_sstables_tag>;
using defer_unlinking = bool_class<struct defer_unlinking_tag>;
class sstable_streamer {
protected:
@@ -173,10 +174,10 @@ public:
virtual future<> stream(shared_ptr<stream_progress> progress);
host_id_vector_replica_set get_endpoints(const dht::token& token) const;
future<> stream_sstable_mutations(streaming::plan_id, const dht::partition_range&, std::vector<sstables::shared_sstable>);
future<> stream_sstable_mutations(streaming::plan_id, const dht::partition_range&, std::vector<sstables::shared_sstable>, defer_unlinking defer);
protected:
virtual host_id_vector_replica_set get_primary_endpoints(const dht::token& token, std::function<bool(const locator::host_id&)> filter) const;
future<> stream_sstables(const dht::partition_range&, std::vector<sstables::shared_sstable>, shared_ptr<stream_progress> progress);
future<> stream_sstables(const dht::partition_range&, std::vector<sstables::shared_sstable>, shared_ptr<stream_progress> progress, defer_unlinking defer);
private:
host_id_vector_replica_set get_all_endpoints(const dht::token& token) const;
};
@@ -236,7 +237,7 @@ private:
future<>
stream_fully_contained_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, shared_ptr<stream_progress> progress) {
if (_stream_scope != stream_scope::node) {
co_return co_await stream_sstables(pr, std::move(sstables), std::move(progress));
co_return co_await stream_sstables(pr, std::move(sstables), std::move(progress), defer_unlinking::no);
}
llog.debug("Directly downloading {} fully contained SSTables to local node from object storage.", sstables.size());
auto downloaded_ssts = co_await download_fully_contained_sstables(std::move(sstables));
@@ -349,6 +350,14 @@ private:
co_return downloaded_sstables;
}
future<> unlink_marked_sstables() {
co_await coroutine::parallel_for_each(_sstables, [] (sstables::shared_sstable& sst) -> future<> {
if (sst->marked_for_deletion()) {
co_await sst->unlink();
}
});
}
bool tablet_in_scope(locator::tablet_id) const;
friend future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
@@ -409,7 +418,7 @@ future<> sstable_streamer::stream(shared_ptr<stream_progress> progress) {
}
const auto full_partition_range = dht::partition_range::make_open_ended_both_sides();
co_await stream_sstables(full_partition_range, std::move(_sstables), std::move(progress));
co_await stream_sstables(full_partition_range, std::move(_sstables), std::move(progress), defer_unlinking::no);
}
bool tablet_sstable_streamer::tablet_in_scope(locator::tablet_id tid) const {
@@ -547,16 +556,18 @@ future<> tablet_sstable_streamer::stream(shared_ptr<stream_progress> progress) {
auto tablet_pr = dht::to_partition_range(tablet_range);
if (!sstables_partially_contained.empty()) {
llog.debug("Streaming {} partially contained SSTables.",sstables_partially_contained.size());
co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained), per_tablet_progress);
co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained), per_tablet_progress, defer_unlinking::yes);
}
if (!sstables_fully_contained.empty()) {
llog.debug("Streaming {} fully contained SSTables.",sstables_fully_contained.size());
co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained), per_tablet_progress);
}
}
co_await unlink_marked_sstables();
}
future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, shared_ptr<stream_progress> progress) {
future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, shared_ptr<stream_progress> progress, defer_unlinking defer) {
size_t nr_sst_total = sstables.size();
size_t nr_sst_current = 0;
@@ -576,14 +587,14 @@ future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::
ops_uuid, nr_sst_current, nr_sst_current + sst_processed.size(), nr_sst_total,
fmt::join(sst_processed | std::views::transform([] (auto sst) { return sst->get_filename(); }), ", "));
nr_sst_current += sst_processed.size();
co_await stream_sstable_mutations(ops_uuid, pr, std::move(sst_processed));
co_await stream_sstable_mutations(ops_uuid, pr, std::move(sst_processed), defer);
if (progress) {
progress->advance(batch_sst_nr);
}
}
}
future<> sstable_streamer::stream_sstable_mutations(streaming::plan_id ops_uuid, const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables) {
future<> sstable_streamer::stream_sstable_mutations(streaming::plan_id ops_uuid, const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, defer_unlinking defer) {
const auto token_range = pr.transform(std::mem_fn(&dht::ring_position::token));
auto s = _table.schema();
const auto cf_id = s->id();
@@ -666,7 +677,12 @@ future<> sstable_streamer::stream_sstable_mutations(streaming::plan_id ops_uuid,
co_await coroutine::parallel_for_each(sstables, [&] (sstables::shared_sstable& sst) {
llog.debug("load_and_stream: ops_uuid={}, ks={}, table={}, remove sst={}",
ops_uuid, s->ks_name(), s->cf_name(), sst->toc_filename());
return sst->mark_for_deletion();
if (defer) {
sst->mark_for_deletion();
return make_ready_future<>();
} else {
return sst->unlink();
}
});
} catch (...) {
failed = true;