From 2f4e79336ae84beb4feba38155c2973a81e6dd2f Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 8 Dec 2025 15:56:56 -0300 Subject: [PATCH] streaming: Leave sstables unsealed until attached to the table We want the invariant that after ACK, all sealed sstables will be split. This guarantees that on restart, no unsplit sstables will be found sealed. The paths that generate unsplit sstables are streaming and file streaming consumers. It includes intra-node streaming, which is local but can clone an unsplit sstable into destination. Signed-off-by: Raphael S. Carvalho (cherry picked from commit 48d243f32fcefbd19b55b02a9cba9d99f9e0d99c) --- replica/database.hh | 3 ++- replica/table.cc | 4 ++-- service/storage_service.cc | 28 +++++++++++++++++++--------- streaming/consumer.cc | 6 ++++++ streaming/stream_blob.cc | 16 ++++++++++++---- 5 files changed, 41 insertions(+), 16 deletions(-) diff --git a/replica/database.hh b/replica/database.hh index 0da1a465bb..d3076bf6f1 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1373,7 +1373,8 @@ public: // Clones storage of a given tablet. Memtable is flushed first to guarantee that the // snapshot (list of sstables) will include all the data written up to the time it was taken. - future> clone_tablet_storage(locator::tablet_id tid); + // If leave_unsealed is set, all the destination sstables will be left unsealed. 
+ future> clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed); friend class compaction_group; friend class compaction::compaction_task_impl; diff --git a/replica/table.cc b/replica/table.cc index c9cc732452..f0001db245 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1327,7 +1327,7 @@ future> table::take_sstable_set_ } future> -table::clone_tablet_storage(locator::tablet_id tid) { +table::clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed) { utils::chunked_vector ret; auto holder = async_gate().hold(); @@ -1339,7 +1339,7 @@ table::clone_tablet_storage(locator::tablet_id tid) { // by compaction while we are waiting for the lock. auto deletion_guard = co_await get_sstable_list_permit(); co_await sg.make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> { - ret.push_back(co_await sst->clone(calculate_generation_for_new_table())); + ret.push_back(co_await sst->clone(calculate_generation_for_new_table(), leave_unsealed)); }); co_return ret; } diff --git a/service/storage_service.cc b/service/storage_service.cc index 17963dc943..933e03d75e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6406,14 +6406,19 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id leaving.host, pending.host)); } - auto d = co_await smp::submit_to(leaving.shard, [this, tablet] () -> future> { + // All sstables cloned locally will be left unsealed, until they're loaded into the table. + // This is to guarantee no unsplit sstables will be left sealed on disk, which could + // cause problems if unsplit sstables are found after split was ACKed to coordinator. 
+ bool leave_unsealed = true; + + auto d = co_await smp::submit_to(leaving.shard, [this, tablet, leave_unsealed] () -> future> { auto& table = _db.local().find_column_family(tablet.table); auto op = table.stream_in_progress(); - co_return co_await table.clone_tablet_storage(tablet.tablet); + co_return co_await table.clone_tablet_storage(tablet.tablet, leave_unsealed); }); rtlogger.debug("Cloned storage of tablet {} from leaving replica {}, {} sstables were found", tablet, leaving, d.size()); - auto load_sstable = [] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future { + auto load_sstable = [leave_unsealed] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future { auto& mng = t.get_sstables_manager(); auto sst = mng.make_sstable(t.schema(), t.get_storage_options(), d.generation, d.state.value_or(sstables::sstable_state::normal), d.version, d.format, db_clock::now(), default_io_error_handler_gen()); @@ -6421,7 +6426,8 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id // will still point to leaving replica at this stage in migration. If node goes down, // SSTables will be loaded at pending replica and migration is retried, so correctness // wise, we're good. - auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true }; + auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true, + .unsealed_sstable = leave_unsealed }; co_await sst->load(sharder, cfg); co_return sst; }; @@ -6429,18 +6435,22 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id co_await smp::submit_to(pending.shard, [this, tablet, load_sstable, d = std::move(d)] () mutable -> future<> { // Loads cloned sstables from leaving replica into pending one. 
auto& table = _db.local().find_column_family(tablet.table); + auto& sstm = table.get_sstables_manager(); auto op = table.stream_in_progress(); dht::auto_refreshing_sharder sharder(table.shared_from_this()); - std::vector ssts; - ssts.reserve(d.size()); + std::unordered_set ssts; for (auto&& sst_desc : d) { - ssts.push_back(co_await load_sstable(sharder, table, std::move(sst_desc))); + ssts.insert(co_await load_sstable(sharder, table, std::move(sst_desc))); } - auto on_add = [] (sstables::shared_sstable loading_sst) -> future<> { + auto on_add = [&ssts, &sstm] (sstables::shared_sstable loading_sst) -> future<> { + if (ssts.contains(loading_sst)) { + auto cfg = sstm.configure_writer(loading_sst->get_origin()); + co_await loading_sst->seal_sstable(cfg.backup); + } co_return; }; - auto loaded_ssts = co_await table.add_new_sstables_and_update_cache(ssts, on_add); + auto loaded_ssts = co_await table.add_new_sstables_and_update_cache(std::vector(ssts.begin(), ssts.end()), on_add); _view_building_worker.local().load_sstables(tablet.table, loaded_ssts); }); rtlogger.debug("Successfully loaded storage of tablet {} into pending replica {}", tablet, pending); diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 97f6b898bd..a155057e99 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -62,7 +62,10 @@ mutation_reader_consumer make_streaming_consumer(sstring origin, } schema_ptr s = reader.schema(); + // SSTable will be only sealed when added to the sstable set, so we make sure unsplit sstables aren't + // left sealed on the table directory. 
auto cfg = cf->get_sstables_manager().configure_writer(origin); + cfg.leave_unsealed = true; return sst->write_components(std::move(reader), adjusted_estimated_partitions, s, cfg, encoding_stats{}).then([sst] { return sst->open_data(); @@ -71,6 +74,9 @@ mutation_reader_consumer make_streaming_consumer(sstring origin, if (on_sstable_written) { on_sstable_written(loading_sst); } + if (loading_sst == sst) { + co_await loading_sst->seal_sstable(cfg.backup); + } co_return; }; if (offstrategy && sstables::repair_origin == origin) { diff --git a/streaming/stream_blob.cc b/streaming/stream_blob.cc index 1287340a89..a915fa9944 100644 --- a/streaming/stream_blob.cc +++ b/streaming/stream_blob.cc @@ -52,9 +52,14 @@ static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::d auto erm = t.get_effective_replication_map(); auto& sstm = t.get_sstables_manager(); auto sst = sstm.make_sstable(t.schema(), t.get_storage_options(), desc.generation, state, desc.version, desc.format); - co_await sst->load(erm->get_sharder(*t.schema())); - auto on_add = [] (sstables::shared_sstable) -> future<> { - return make_ready_future<>(); + sstables::sstable_open_config cfg { .unsealed_sstable = true }; + co_await sst->load(erm->get_sharder(*t.schema()), cfg); + auto on_add = [sst, &sstm] (sstables::shared_sstable loading_sst) -> future<> { + if (loading_sst == sst) { + auto cfg = sstm.configure_writer(sst->get_origin()); + co_await loading_sst->seal_sstable(cfg.backup); + } + co_return; }; auto new_sstables = co_await t.add_new_sstable_and_update_cache(sst, on_add); blogger.info("stream_sstables[{}] Loaded sstable {} successfully", ops_id, sst->toc_filename()); @@ -346,7 +351,10 @@ future<> stream_blob_handler(replica::database& db, db::view::view_building_work auto& table = db.find_column_family(meta.table); auto& sstm = table.get_sstables_manager(); - sstables::sstable_stream_sink_cfg cfg { .last_component = meta.fops == file_ops::load_sstables }; + // SSTable will be only 
sealed when added to the sstable set, so we make sure unsplit sstables aren't + // left sealed on the table directory. + sstables::sstable_stream_sink_cfg cfg { .last_component = meta.fops == file_ops::load_sstables, + .leave_unsealed = true }; auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, cfg); auto out = co_await sstable_sink->output(foptions, stream_options); co_return output_result{