/*
 * Copyright (C) 2018-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "db/view/view_building_worker.hh"
#include "sstables/shared_sstable.hh"
#include "utils/assert.hh"
// NOTE(review): the targets of the following system includes were lost during
// extraction (the original had eight bare `#include` lines). The headers below
// are reconstructed from visible usage -- coroutines, sharded<> services,
// seastar::async, deferred_stop, std::views::values, std::unordered_set --
// confirm the exact list against version control.
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/thread.hh>
#include <seastar/util/closeable.hh>
#include <ranges>
#include <unordered_set>
#include "distributed_loader.hh"
#include "replica/database.hh"
#include "replica/global_table_ptr.hh"
#include "db/config.hh"
#include "db/extensions.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/schema_tables.hh"
#include "compaction/compaction_manager.hh"
#include "compaction/task_manager_module.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_directory.hh"
#include "auth/common.hh"
#include "tracing/trace_keyspace_helper.hh"
#include "db/view/view_update_checks.hh"
#include "ent/encryption/replicated_key_provider.hh"
#include "db/view/view_builder.hh"

extern logging::logger dblog;

// Keyspaces whose tables are populated by init_system_keyspace() before the
// rest of the boot sequence; they are also excluded from the generic
// per-keyspace population in init_non_system_keyspaces().
// NOTE(review): the set's element type was stripped during extraction;
// std::string_view matches the contains(std::string_view) lookups below --
// confirm against VCS.
static const std::unordered_set<std::string_view> system_keyspaces = {
    db::system_keyspace::NAME,
    db::schema_tables::NAME,
};

// True for the two local system keyspaces (system / system_schema).
bool is_system_keyspace(std::string_view name) {
    return system_keyspaces.contains(name);
}

// Keyspaces managed internally by Scylla (auth, tracing, distributed system
// tables, encryption key storage) -- a superset of system_keyspaces.
static const std::unordered_set<std::string_view> internal_keyspaces = {
    db::system_distributed_keyspace::NAME,
    db::system_distributed_keyspace::NAME_EVERYWHERE,
    db::system_keyspace::NAME,
    db::schema_tables::NAME,
    auth::meta::legacy::AUTH_KS,
    tracing::trace_keyspace_helper::KEYSPACE_NAME,
    encryption::replicated_key_provider_factory::KSNAME
};

bool is_internal_keyspace(std::string_view name) {
    return internal_keyspaces.contains(name);
}

namespace replica {

// I/O error handler used while reading sstables from the upload directory:
// errors are not counted against the disk-error thresholds; the exception
// simply propagates to the caller of the refresh operation.
static io_error_handler error_handler_for_upload_dir() {
    return [] (std::exception_ptr eptr) {
        // do nothing about sstable exception and caller will just rethrow it.
    };
}

// Generator form of the handler above, matching the io_error_handler_gen
// signature expected by sstable_directory / sstables_manager. The
// disk_error_signal is deliberately ignored (see error_handler_for_upload_dir).
io_error_handler error_handler_gen_for_upload_dir(disk_error_signal_type& dummy) {
    return error_handler_for_upload_dir();
}

// Scans a sharded sstable_directory: prepare on shard 0 first, then process
// each shard's view of the directory, redistribute sstables that belong to
// other shards, and finally commit any on-disk directory changes.
future<> distributed_loader::process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags) {
    co_await dir.invoke_on(0, [flags] (sstables::sstable_directory& d) -> future<> {
        co_await d.prepare(flags);
    });

    co_await dir.invoke_on_all([&dir, flags] (sstables::sstable_directory& d) -> future<> {
        // Supposed to be called with the node either down or on behalf of maintenance tasks
        // like nodetool refresh
        co_await d.process_sstable_dir(flags);
        co_await d.move_foreign_sstables(dir);
    });

    co_await dir.invoke_on_all(&sstables::sstable_directory::commit_directory_changes);
}

// Registers the table's write-in-progress phaser with every shard's directory
// so the table cannot complete shutdown while the directory holds sstables.
future<> distributed_loader::lock_table(global_table_ptr& table, sharded<sstables::sstable_directory>& dir) {
    return dir.invoke_on_all([&table] (sstables::sstable_directory& d) {
        d.store_phaser(table->write_in_progress());
        return make_ready_future<>();
    });
}

// Global resharding function. Done in two parts:
//  - The first part spreads the foreign_sstable_open_info across shards so that all of them are
//    resharding about the same amount of data
//  - The second part calls each shard's distributed object to reshard the SSTables they were
//    assigned.
future<> distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr) { auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); auto task = co_await compaction_module.make_and_start_task({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr)); co_await task->done(); } future highest_version_seen(sharded& dir, sstables::sstable_version_types system_version) { using version = sstables::sstable_version_types; return dir.map_reduce0(std::mem_fn(&sstables::sstable_directory::highest_version_seen), system_version, [] (version a, version b) { return std::max(a, b); }); } future<> distributed_loader::reshape(sharded& dir, sharded& db, compaction::reshape_mode mode, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, std::function filter) { auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); auto task = co_await compaction_module.make_and_start_task({}, std::move(ks_name), std::move(table_name), dir, db, mode, std::move(creator), std::move(filter)); co_await task->done(); } // Loads SSTables into the main directory (or staging) and returns how many were loaded future distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded& db, sharded& vb, sharded& vbw, db::view::sstable_destination_decision needs_view_update, sstring ks, sstring cf) { auto& table = db.local().find_column_family(ks, cf); auto new_sstables = std::vector(); co_await dir.do_for_each_sstable([&table, needs_view_update, &new_sstables] (sstables::shared_sstable sst) -> future<> { auto gen = table.calculate_generation_for_new_table(); dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), needs_view_update == db::view::sstable_destination_decision::normal_directory ? 
"base" : "staging", gen); co_await sst->pick_up_from_upload(needs_view_update == db::view::sstable_destination_decision::normal_directory ? sstables::sstable_state::normal : sstables::sstable_state::staging, gen); // When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels. sst->set_sstable_level(0); new_sstables.push_back(std::move(sst)); }); // nothing loaded if (new_sstables.empty()) { co_return 0; } co_await table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) { dblog.error("Failed to load SSTables for {}.{}: {}. Aborting.", table.schema()->ks_name(), table.schema()->cf_name(), ep); abort(); }); if (needs_view_update == db::view::sstable_destination_decision::staging_managed_by_vbc) { co_await vbw.local().register_staging_sstable_tasks(new_sstables, table.schema()->id()); } else if (needs_view_update == db::view::sstable_destination_decision::staging_directly_to_generator) { co_await coroutine::parallel_for_each(new_sstables, [&vb, &table] (sstables::shared_sstable sst) -> future<> { if (sst->requires_view_building()) { co_await vb.local().register_staging_sstable(sst, table.shared_from_this()); } }); } co_return new_sstables.size(); } future<> distributed_loader::process_upload_dir(sharded& db, sharded& vb, sharded& vbw, sstring ks, sstring cf, bool skip_cleanup, bool skip_reshape) { const auto& rs = db.local().find_keyspace(ks).get_replication_strategy(); if (rs.is_per_table()) { on_internal_error(dblog, "process_upload_dir is not supported with tablets"); } return seastar::async([&db, &vb, &vbw, ks = std::move(ks), cf = std::move(cf), skip_cleanup, skip_reshape] { auto global_table = get_table_on_all_shards(db, ks, cf).get(); sharded directory; directory.start(global_table.as_sharded_parameter(), sstables::sstable_state::upload, &error_handler_gen_for_upload_dir ).get(); auto stop_directory = deferred_stop(directory); lock_table(global_table, directory).get(); 
sstables::sstable_directory::process_flags flags { .need_mutate_level = true, .enable_dangerous_direct_import_of_cassandra_counters = db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters(), .allow_loading_materialized_view = false, .sstable_open_config = { .ignore_component_digest_mismatch = db.local().get_config().ignore_component_digest_mismatch(), }, }; process_sstable_dir(directory, flags).get(); auto make_sstable = [&] (shard_id shard) { auto& sstm = global_table->get_sstables_manager(); auto& gen = global_table->get_sstable_generation_generator(); auto generation = gen(); return sstm.make_sstable(global_table->schema(), global_table->get_storage_options(), generation, sstables::sstable_state::upload, sstm.get_preferred_sstable_version(), sstables::sstable_format_types::big, db_clock::now(), &error_handler_gen_for_upload_dir); }; // Pass owned_ranges_ptr to reshard to piggy-back cleanup on the resharding compaction. // Note that needs_cleanup() is inaccurate and may return false positives, // maybe triggering resharding+cleanup unnecessarily for some sstables. // But this is resharding on refresh (sstable loading via upload dir), // which will usually require resharding anyway. // // FIXME: take multiple compaction groups into account // - segregate resharded tables into compaction groups // - split the keyspace local ranges per compaction_group as done in table::perform_cleanup_compaction // so that cleanup can be considered per compaction group const auto& erm = db.local().find_keyspace(ks).get_static_effective_replication_map(); auto owned_ranges_ptr = skip_cleanup ? 
lw_shared_ptr(nullptr) : compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(erm).get()); reshard(directory, db, ks, cf, make_sstable, owned_ranges_ptr).get(); if (!skip_reshape) { reshape(directory, db, compaction::reshape_mode::strict, ks, cf, make_sstable, [] (const sstables::shared_sstable&) { return true; }).get(); } // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions. const auto use_view_update_path = db::view::check_needs_view_update_path(vb.local(), erm->get_token_metadata_ptr(), *global_table, streaming::stream_reason::repair).get(); size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &vb, &vbw] (sstables::sstable_directory& dir) { return make_sstables_available(dir, db, vb, vbw, use_view_update_path, ks, cf); }, size_t(0), std::plus()).get(); dblog.info("Loaded {} SSTables", loaded); }); } future>>> distributed_loader::get_sstables_from_upload_dir(sharded& db, sstring ks, sstring cf, sstables::sstable_open_config cfg) { return get_sstables_from(db, ks, cf, cfg, [] (auto& global_table, auto& directory) { return directory.start(global_table.as_sharded_parameter(), sstables::sstable_state::upload, &error_handler_gen_for_upload_dir ); }); } future>>> distributed_loader::get_sstables_from_object_store(sharded& db, sstring ks, sstring cf, std::vector sstables, sstring endpoint, sstring type, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function get_abort_src) { return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, type, prefix, sstables=std::move(sstables), &get_abort_src] (auto& global_table, auto& directory) { return directory.start(global_table.as_sharded_parameter(), sharded_parameter([bucket, endpoint, type, prefix, &get_abort_src] { seastar::abort_source* as = get_abort_src ? 
get_abort_src() : nullptr; auto opts = data_dictionary::make_object_storage_options(endpoint, type, bucket, prefix, as); return make_lw_shared(std::move(opts)); }), sstables, &error_handler_gen_for_upload_dir); }); } future>>> distributed_loader::get_sstables_from(sharded& db, sstring ks, sstring cf, sstables::sstable_open_config cfg, noncopyable_function(global_table_ptr&, sharded&)> start_dir) { return seastar::async([&db, ks = std::move(ks), cf = std::move(cf), start_dir = std::move(start_dir), cfg] { auto global_table = get_table_on_all_shards(db, ks, cf).get(); auto table_id = global_table->schema()->id(); sharded directory; start_dir(global_table, directory).get(); auto stop = deferred_stop(directory); std::vector> sstables_on_shards(smp::count); lock_table(global_table, directory).get(); sstables::sstable_directory::process_flags flags { .need_mutate_level = true, .enable_dangerous_direct_import_of_cassandra_counters = db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters(), .allow_loading_materialized_view = false, .sort_sstables_according_to_owner = false, .sstable_open_config = cfg, }; process_sstable_dir(directory, flags).get(); directory.invoke_on_all([&sstables_on_shards] (sstables::sstable_directory& d) mutable { sstables_on_shards[this_shard_id()] = d.get_unsorted_sstables(); }).get(); return std::make_tuple(table_id, std::move(sstables_on_shards)); }); } class table_populator { sharded& _db; sstring _ks; sstring _cf; global_table_ptr& _global_table; std::vector>> _sstable_directories; sstables::sstable_version_types _version_for_reshaping = sstables::oldest_writable_sstable_format; public: table_populator(global_table_ptr& ptr, sharded& db, sstring ks, sstring cf) : _db(db) , _ks(std::move(ks)) , _cf(std::move(cf)) , _global_table(ptr) { } ~table_populator() { // All directories must have been stopped // using table_populator::stop() SCYLLA_ASSERT(_sstable_directories.empty()); } future<> start(); future<> stop(); private: 
using allow_offstrategy_compaction = bool_class; future<> collect_subdirs(); future<> collect_subdirs(const data_dictionary::storage_options::local&, sstables::sstable_state state); future<> collect_subdirs(const data_dictionary::storage_options::object_storage&, sstables::sstable_state state); future<> process_subdir(sharded&); future<> populate_subdir(sharded&); }; future<> table_populator::start() { SCYLLA_ASSERT(this_shard_id() == 0); co_await collect_subdirs(); for (auto dir : _sstable_directories) { co_await process_subdir(*dir); } co_await smp::invoke_on_all([this] { return _global_table->disable_auto_compaction(); }); for (auto dir : _sstable_directories) { co_await populate_subdir(*dir); } } future<> table_populator::stop() { while (!_sstable_directories.empty()) { co_await _sstable_directories.back()->stop(); _sstable_directories.pop_back(); } } future<> table_populator::collect_subdirs(const data_dictionary::storage_options::local& so, sstables::sstable_state state) { auto datadirs = _global_table->get_sstables_manager().get_local_directories(so); co_await coroutine::parallel_for_each(datadirs, [&] (const std::filesystem::path& datadir) -> future<> { auto dptr = make_lw_shared>(); co_await dptr->start(_global_table.as_sharded_parameter(), state, sharded_parameter([datadir] { auto opts = data_dictionary::make_local_options(datadir); return make_lw_shared(std::move(opts)); }), default_io_error_handler_gen()); _sstable_directories.push_back(std::move(dptr)); }); } future<> table_populator::collect_subdirs(const data_dictionary::storage_options::object_storage& so, sstables::sstable_state state) { auto dptr = make_lw_shared>(); co_await dptr->start(_global_table.as_sharded_parameter(), state, default_io_error_handler_gen()); _sstable_directories.push_back(std::move(dptr)); } future<> table_populator::collect_subdirs() { // The table base directory (with sstable_state::normal) must be // loaded and processed first as it now may contain the shared // 
pending_delete_dir, possibly referring to sstables in sub-directories. for (auto state : { sstables::sstable_state::normal, sstables::sstable_state::staging, sstables::sstable_state::quarantine }) { co_await std::visit([this, state] (const auto& so) -> future<> { co_await collect_subdirs(so, state); }, _global_table->get_storage_options().value); } // directory must be stopped using table_populator::stop below } future<> table_populator::process_subdir(sharded& directory) { co_await distributed_loader::lock_table(_global_table, directory); sstables::sstable_directory::process_flags flags { .throw_on_missing_toc = true, .enable_dangerous_direct_import_of_cassandra_counters = _db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters(), .allow_loading_materialized_view = true, .garbage_collect = true, .sstable_open_config = { .ignore_component_digest_mismatch = _db.local().get_config().ignore_component_digest_mismatch(), }, }; co_await distributed_loader::process_sstable_dir(directory, flags); // If we are resharding system tables before we can read them, we will not // know which is the highest format we support: this information is itself stored // in the system tables. In that case we'll rely on what we find on disk: we'll // at least not downgrade any files. If we already know that we support a higher // format than the one we see then we use that. 
auto sst_version = co_await highest_version_seen(directory, sstables::oldest_writable_sstable_format); _version_for_reshaping = _global_table->get_sstables_manager().get_safe_sstable_version_for_rewrites(sst_version); } sstables::shared_sstable make_sstable(replica::table& table, sstables::sstable_state state, sstables::generation_type generation, sstables::sstable_version_types v) { return table.get_sstables_manager().make_sstable(table.schema(), table.get_storage_options(), generation, state, v, sstables::sstable_format_types::big); } future<> table_populator::populate_subdir(sharded& directory) { auto state = directory.local().state(); dblog.debug("Populating {}/{}/{} state={}", _ks, _cf, _global_table->get_storage_options(), state); co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, state] (shard_id shard) mutable { auto gen = smp::submit_to(shard, [this] () { return _global_table->calculate_generation_for_new_table(); }).get(); return make_sstable(*_global_table, state, gen, _version_for_reshaping); }); // The node is offline at this point so we are very lenient with what we consider // offstrategy. // SSTables created by repair may not conform to compaction strategy layout goal // because data segregation is only performed by compaction // Instead of reshaping them on boot, let's add them to maintenance set and allow // off-strategy compaction to reshape them. This will allow node to become online // ASAP. Given that SSTables with repair origin are disjoint, they can be efficiently // read from. 
auto eligible_for_reshape_on_boot = [] (const sstables::shared_sstable& sst) { return sst->get_origin() != sstables::repair_origin; }; co_await distributed_loader::reshape(directory, _db, compaction::reshape_mode::relaxed, _ks, _cf, [this, state](shard_id shard) { auto gen = _global_table->calculate_generation_for_new_table(); return make_sstable(*_global_table, state, gen, _version_for_reshaping); }, eligible_for_reshape_on_boot); auto do_allow_offstrategy_compaction = allow_offstrategy_compaction(state == sstables::sstable_state::normal); co_await directory.invoke_on_all([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) -> future<> { co_await dir.do_for_each_sstable([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) { auto requires_offstrategy = sstables::offstrategy(do_allow_offstrategy_compaction && !eligible_for_reshape_on_boot(sst)); return _global_table->add_sstable_and_update_cache(sst, requires_offstrategy); }); if (do_allow_offstrategy_compaction) { _global_table->trigger_offstrategy_compaction(); } }); } future<> distributed_loader::populate_keyspace(sharded& db, sharded& sys_ks, keyspace& ks, sstring ks_name) { dblog.info("Populating Keyspace {}", ks_name); co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data() | std::views::values, [&] (schema_ptr s) -> future<> { auto uuid = s->id(); sstring cfname = s->cf_name(); auto gtable = co_await get_table_on_all_shards(db, ks_name, cfname); auto& cf = *gtable; dblog.info("Keyspace {}: Reading CF {} id={} version={} storage={}", ks_name, cfname, uuid, s->version(), cf.get_storage_options()); auto metadata = table_populator(gtable, db, ks_name, cfname); std::exception_ptr ex; try { co_await metadata.start(); } catch (...) 
{ std::exception_ptr eptr = std::current_exception(); std::string msg = format("Exception while populating keyspace '{}' with column family '{}' from '{}': {}", ks_name, cfname, cf.get_storage_options(), eptr); dblog.error("{}", msg); try { std::rethrow_exception(eptr); } catch (compaction::compaction_stopped_exception& e) { // swallow compaction stopped exception, to allow clean shutdown. } catch (...) { ex = std::make_exception_ptr(std::runtime_error(msg.c_str())); } } co_await metadata.stop(); if (ex) { co_await coroutine::return_exception_ptr(std::move(ex)); } // system tables are made writable through sys_ks::mark_writable if (!is_system_keyspace(ks_name)) { co_await smp::invoke_on_all([&] { auto s = gtable->schema(); db.local().find_column_family(s).mark_ready_for_writes(db.local().commitlog_for(s)); }); } }); // here we can be sure that the 'truncated' table is loaded, // now we load and initialize the truncation times for the tables { const auto truncation_times = co_await sys_ks.local().load_truncation_times(); co_await smp::invoke_on_all([&] { db.local().get_tables_metadata().for_each_table([&](table_id id, lw_shared_ptr table) { if (table->schema()->ks_name() != ks_name) { return; } const auto it = truncation_times.find(id); const auto truncation_time = it == truncation_times.end() ? db_clock::time_point::min() : it->second; if (this_shard_id() == 0 && table->schema()->ks_name() == db::schema_tables::NAME && truncation_time != db_clock::time_point::min()) { // replay_position stored in the truncation record may belong to // the old (default) commitlog domain. It's not safe to interpret // that replay position in the schema commitlog domain. // Refuse to boot in this case. We assume no one truncated schema tables. // We will hit this during rolling upgrade, in which case the user will // roll back and let us know. throw std::runtime_error(format("Schema table {}.{} has a truncation record. 
Booting is not safe.", table->schema()->ks_name(), table->schema()->cf_name())); } table->set_truncation_time(truncation_time); }); }); } } future<> distributed_loader::init_system_keyspace(sharded& sys_ks, sharded& erm_factory, sharded& db) { return seastar::async([&sys_ks, &erm_factory, &db] { sys_ks.invoke_on_all([&erm_factory, &db] (auto& sys_ks) { return sys_ks.make(erm_factory.local(), db.local()); }).get(); for (auto ksname : system_keyspaces) { auto& ks = db.local().get_keyspaces(); auto i = ks.find(ksname); if (i != ks.end()) { distributed_loader::populate_keyspace(db, sys_ks, i->second, sstring(ksname)).get(); } } }); } future<> distributed_loader::init_non_system_keyspaces(sharded& db, sharded& proxy, sharded& sys_ks) { return seastar::async([&db, &proxy, &sys_ks] { db.invoke_on_all([&proxy, &sys_ks] (replica::database& db) { return db.parse_system_tables(proxy, sys_ks); }).get(); const auto& cfg = db.local().get_config(); for (bool prio_only : { true, false}) { std::vector> futures; for (auto& ks : db.local().get_keyspaces()) { auto& ks_name = ks.first; if (is_system_keyspace(ks_name)) { continue; } /** * Must process in two phases: Prio and non-prio. * This looks like it is not needed. And it is not * in open-source version. But essential for enterprise. * Do _not_ remove or refactor away. */ if (prio_only != cfg.extensions().is_extension_internal_keyspace(ks_name)) { continue; } futures.emplace_back(distributed_loader::populate_keyspace(db, sys_ks, ks.second, ks_name)); } when_all_succeed(futures.begin(), futures.end()).discard_result().get(); } }); } }