streaming: Leave sstables unsealed until attached to the table

We want the invariant that after ACK, all sealed sstables will be split.

This guarantee that on restart, no unsplit sstables will be found
sealed.

The paths that generate unsplit sstables are streaming and file
streaming consumers. It includes intra-node streaming, which
is local but can clone an unsplit sstable into destination.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 48d243f32f)
This commit is contained in:
Raphael S. Carvalho
2025-12-08 15:56:56 -03:00
committed by Botond Dénes
parent 2f1880ce20
commit 2f4e79336a
5 changed files with 41 additions and 16 deletions

View File

@@ -1373,7 +1373,8 @@ public:
// Clones storage of a given tablet. Memtable is flushed first to guarantee that the
// snapshot (list of sstables) will include all the data written up to the time it was taken.
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid);
// If leave_unsealead is set, all the destination sstables will be left unsealed.
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed);
friend class compaction_group;
friend class compaction::compaction_task_impl;

View File

@@ -1327,7 +1327,7 @@ future<utils::chunked_vector<sstables::shared_sstable>> table::take_sstable_set_
}
future<utils::chunked_vector<sstables::entry_descriptor>>
table::clone_tablet_storage(locator::tablet_id tid) {
table::clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed) {
utils::chunked_vector<sstables::entry_descriptor> ret;
auto holder = async_gate().hold();
@@ -1339,7 +1339,7 @@ table::clone_tablet_storage(locator::tablet_id tid) {
// by compaction while we are waiting for the lock.
auto deletion_guard = co_await get_sstable_list_permit();
co_await sg.make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> {
ret.push_back(co_await sst->clone(calculate_generation_for_new_table()));
ret.push_back(co_await sst->clone(calculate_generation_for_new_table(), leave_unsealed));
});
co_return ret;
}

View File

@@ -6406,14 +6406,19 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
leaving.host, pending.host));
}
auto d = co_await smp::submit_to(leaving.shard, [this, tablet] () -> future<utils::chunked_vector<sstables::entry_descriptor>> {
// All sstables cloned locally will be left unsealed, until they're loaded into the table.
// This is to guarantee no unsplit sstables will be left sealed on disk, which could
// cause problems if unsplit sstables are found after split was ACKed to coordinator.
bool leave_unsealed = true;
auto d = co_await smp::submit_to(leaving.shard, [this, tablet, leave_unsealed] () -> future<utils::chunked_vector<sstables::entry_descriptor>> {
auto& table = _db.local().find_column_family(tablet.table);
auto op = table.stream_in_progress();
co_return co_await table.clone_tablet_storage(tablet.tablet);
co_return co_await table.clone_tablet_storage(tablet.tablet, leave_unsealed);
});
rtlogger.debug("Cloned storage of tablet {} from leaving replica {}, {} sstables were found", tablet, leaving, d.size());
auto load_sstable = [] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future<sstables::shared_sstable> {
auto load_sstable = [leave_unsealed] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future<sstables::shared_sstable> {
auto& mng = t.get_sstables_manager();
auto sst = mng.make_sstable(t.schema(), t.get_storage_options(), d.generation, d.state.value_or(sstables::sstable_state::normal),
d.version, d.format, db_clock::now(), default_io_error_handler_gen());
@@ -6421,7 +6426,8 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
// will still point to leaving replica at this stage in migration. If node goes down,
// SSTables will be loaded at pending replica and migration is retried, so correctness
// wise, we're good.
auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true };
auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true,
.unsealed_sstable = leave_unsealed };
co_await sst->load(sharder, cfg);
co_return sst;
};
@@ -6429,18 +6435,22 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
co_await smp::submit_to(pending.shard, [this, tablet, load_sstable, d = std::move(d)] () mutable -> future<> {
// Loads cloned sstables from leaving replica into pending one.
auto& table = _db.local().find_column_family(tablet.table);
auto& sstm = table.get_sstables_manager();
auto op = table.stream_in_progress();
dht::auto_refreshing_sharder sharder(table.shared_from_this());
std::vector<sstables::shared_sstable> ssts;
ssts.reserve(d.size());
std::unordered_set<sstables::shared_sstable> ssts;
for (auto&& sst_desc : d) {
ssts.push_back(co_await load_sstable(sharder, table, std::move(sst_desc)));
ssts.insert(co_await load_sstable(sharder, table, std::move(sst_desc)));
}
auto on_add = [] (sstables::shared_sstable loading_sst) -> future<> {
auto on_add = [&ssts, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
if (ssts.contains(loading_sst)) {
auto cfg = sstm.configure_writer(loading_sst->get_origin());
co_await loading_sst->seal_sstable(cfg.backup);
}
co_return;
};
auto loaded_ssts = co_await table.add_new_sstables_and_update_cache(ssts, on_add);
auto loaded_ssts = co_await table.add_new_sstables_and_update_cache(std::vector(ssts.begin(), ssts.end()), on_add);
_view_building_worker.local().load_sstables(tablet.table, loaded_ssts);
});
rtlogger.debug("Successfully loaded storage of tablet {} into pending replica {}", tablet, pending);

View File

@@ -62,7 +62,10 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
}
schema_ptr s = reader.schema();
// SSTable will be only sealed when added to the sstable set, so we make sure unsplit sstables aren't
// left sealed on the table directory.
auto cfg = cf->get_sstables_manager().configure_writer(origin);
cfg.leave_unsealed = true;
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
cfg, encoding_stats{}).then([sst] {
return sst->open_data();
@@ -71,6 +74,9 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
if (on_sstable_written) {
on_sstable_written(loading_sst);
}
if (loading_sst == sst) {
co_await loading_sst->seal_sstable(cfg.backup);
}
co_return;
};
if (offstrategy && sstables::repair_origin == origin) {

View File

@@ -52,9 +52,14 @@ static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::d
auto erm = t.get_effective_replication_map();
auto& sstm = t.get_sstables_manager();
auto sst = sstm.make_sstable(t.schema(), t.get_storage_options(), desc.generation, state, desc.version, desc.format);
co_await sst->load(erm->get_sharder(*t.schema()));
auto on_add = [] (sstables::shared_sstable) -> future<> {
return make_ready_future<>();
sstables::sstable_open_config cfg { .unsealed_sstable = true };
co_await sst->load(erm->get_sharder(*t.schema()), cfg);
auto on_add = [sst, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
if (loading_sst == sst) {
auto cfg = sstm.configure_writer(sst->get_origin());
co_await loading_sst->seal_sstable(cfg.backup);
}
co_return;
};
auto new_sstables = co_await t.add_new_sstable_and_update_cache(sst, on_add);
blogger.info("stream_sstables[{}] Loaded sstable {} successfully", ops_id, sst->toc_filename());
@@ -346,7 +351,10 @@ future<> stream_blob_handler(replica::database& db, db::view::view_building_work
auto& table = db.find_column_family(meta.table);
auto& sstm = table.get_sstables_manager();
sstables::sstable_stream_sink_cfg cfg { .last_component = meta.fops == file_ops::load_sstables };
// SSTable will be only sealed when added to the sstable set, so we make sure unsplit sstables aren't
// left sealed on the table directory.
sstables::sstable_stream_sink_cfg cfg { .last_component = meta.fops == file_ops::load_sstables,
.leave_unsealed = true };
auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, cfg);
auto out = co_await sstable_sink->output(foptions, stream_options);
co_return output_result{