diff --git a/replica/database.hh b/replica/database.hh index 0da1a465bb..d3076bf6f1 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1373,7 +1373,8 @@ public: // Clones storage of a given tablet. Memtable is flushed first to guarantee that the // snapshot (list of sstables) will include all the data written up to the time it was taken. - future> clone_tablet_storage(locator::tablet_id tid); + // If leave_unsealed is set, all the destination sstables will be left unsealed. + future> clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed); friend class compaction_group; friend class compaction::compaction_task_impl; diff --git a/replica/table.cc b/replica/table.cc index c9cc732452..f0001db245 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1327,7 +1327,7 @@ future> table::take_sstable_set_ } future> -table::clone_tablet_storage(locator::tablet_id tid) { +table::clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed) { utils::chunked_vector ret; auto holder = async_gate().hold(); @@ -1339,7 +1339,7 @@ table::clone_tablet_storage(locator::tablet_id tid) { // by compaction while we are waiting for the lock. 
auto deletion_guard = co_await get_sstable_list_permit(); co_await sg.make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> { - ret.push_back(co_await sst->clone(calculate_generation_for_new_table())); + ret.push_back(co_await sst->clone(calculate_generation_for_new_table(), leave_unsealed)); }); co_return ret; } diff --git a/service/storage_service.cc b/service/storage_service.cc index 17963dc943..933e03d75e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6406,14 +6406,19 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id leaving.host, pending.host)); } - auto d = co_await smp::submit_to(leaving.shard, [this, tablet] () -> future> { + // All sstables cloned locally will be left unsealed, until they're loaded into the table. + // This is to guarantee no unsplit sstables will be left sealed on disk, which could + // cause problems if unsplit sstables are found after split was ACKed to coordinator. 
+ bool leave_unsealed = true; + + auto d = co_await smp::submit_to(leaving.shard, [this, tablet, leave_unsealed] () -> future> { auto& table = _db.local().find_column_family(tablet.table); auto op = table.stream_in_progress(); - co_return co_await table.clone_tablet_storage(tablet.tablet); + co_return co_await table.clone_tablet_storage(tablet.tablet, leave_unsealed); }); rtlogger.debug("Cloned storage of tablet {} from leaving replica {}, {} sstables were found", tablet, leaving, d.size()); - auto load_sstable = [] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future { + auto load_sstable = [leave_unsealed] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future { auto& mng = t.get_sstables_manager(); auto sst = mng.make_sstable(t.schema(), t.get_storage_options(), d.generation, d.state.value_or(sstables::sstable_state::normal), d.version, d.format, db_clock::now(), default_io_error_handler_gen()); @@ -6421,7 +6426,8 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id // will still point to leaving replica at this stage in migration. If node goes down, // SSTables will be loaded at pending replica and migration is retried, so correctness // wise, we're good. - auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true }; + auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true, + .unsealed_sstable = leave_unsealed }; co_await sst->load(sharder, cfg); co_return sst; }; @@ -6429,18 +6435,22 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id co_await smp::submit_to(pending.shard, [this, tablet, load_sstable, d = std::move(d)] () mutable -> future<> { // Loads cloned sstables from leaving replica into pending one. 
auto& table = _db.local().find_column_family(tablet.table); + auto& sstm = table.get_sstables_manager(); auto op = table.stream_in_progress(); dht::auto_refreshing_sharder sharder(table.shared_from_this()); - std::vector ssts; - ssts.reserve(d.size()); + std::unordered_set ssts; for (auto&& sst_desc : d) { - ssts.push_back(co_await load_sstable(sharder, table, std::move(sst_desc))); + ssts.insert(co_await load_sstable(sharder, table, std::move(sst_desc))); } - auto on_add = [] (sstables::shared_sstable loading_sst) -> future<> { + auto on_add = [&ssts, &sstm] (sstables::shared_sstable loading_sst) -> future<> { + if (ssts.contains(loading_sst)) { + auto cfg = sstm.configure_writer(loading_sst->get_origin()); + co_await loading_sst->seal_sstable(cfg.backup); + } co_return; }; - auto loaded_ssts = co_await table.add_new_sstables_and_update_cache(ssts, on_add); + auto loaded_ssts = co_await table.add_new_sstables_and_update_cache(std::vector(ssts.begin(), ssts.end()), on_add); _view_building_worker.local().load_sstables(tablet.table, loaded_ssts); }); rtlogger.debug("Successfully loaded storage of tablet {} into pending replica {}", tablet, pending); diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 97f6b898bd..a155057e99 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -62,7 +62,10 @@ mutation_reader_consumer make_streaming_consumer(sstring origin, } schema_ptr s = reader.schema(); + // SSTable will only be sealed when added to the sstable set, so we make sure unsplit sstables aren't + // left sealed on the table directory. 
auto cfg = cf->get_sstables_manager().configure_writer(origin); + cfg.leave_unsealed = true; return sst->write_components(std::move(reader), adjusted_estimated_partitions, s, cfg, encoding_stats{}).then([sst] { return sst->open_data(); @@ -71,6 +74,9 @@ mutation_reader_consumer make_streaming_consumer(sstring origin, if (on_sstable_written) { on_sstable_written(loading_sst); } + if (loading_sst == sst) { + co_await loading_sst->seal_sstable(cfg.backup); + } co_return; }; if (offstrategy && sstables::repair_origin == origin) { diff --git a/streaming/stream_blob.cc b/streaming/stream_blob.cc index 1287340a89..a915fa9944 100644 @@ -52,9 +52,14 @@ static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::d auto erm = t.get_effective_replication_map(); auto& sstm = t.get_sstables_manager(); auto sst = sstm.make_sstable(t.schema(), t.get_storage_options(), desc.generation, state, desc.version, desc.format); - co_await sst->load(erm->get_sharder(*t.schema())); - auto on_add = [] (sstables::shared_sstable) -> future<> { - return make_ready_future<>(); + sstables::sstable_open_config cfg { .unsealed_sstable = true }; + co_await sst->load(erm->get_sharder(*t.schema()), cfg); + auto on_add = [sst, &sstm] (sstables::shared_sstable loading_sst) -> future<> { + if (loading_sst == sst) { + auto cfg = sstm.configure_writer(sst->get_origin()); + co_await loading_sst->seal_sstable(cfg.backup); + } + co_return; }; auto new_sstables = co_await t.add_new_sstable_and_update_cache(sst, on_add); blogger.info("stream_sstables[{}] Loaded sstable {} successfully", ops_id, sst->toc_filename()); @@ -346,7 +351,10 @@ future<> stream_blob_handler(replica::database& db, db::view::view_building_work auto& table = db.find_column_family(meta.table); auto& sstm = table.get_sstables_manager(); - sstables::sstable_stream_sink_cfg cfg { .last_component = meta.fops == file_ops::load_sstables }; + // SSTable will only be
sealed when added to the sstable set, so we make sure unsplit sstables aren't + // left sealed on the table directory. + sstables::sstable_stream_sink_cfg cfg { .last_component = meta.fops == file_ops::load_sstables, + .leave_unsealed = true }; auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, cfg); auto out = co_await sstable_sink->output(foptions, stream_options); co_return output_result{