/* * Modified by ScyllaDB * Copyright (C) 2015-present ScyllaDB */ /* * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0) */ #include #include #include #include #include #include #include #include #include #include "system_keyspace.hh" #include "cql3/untyped_result_set.hh" #include "cql3/query_processor.hh" #include "locator/host_id.hh" #include "locator/tablets.hh" #include "partition_slice_builder.hh" #include "db/config.hh" #include "gms/feature_service.hh" #include "system_keyspace_view_types.hh" #include "schema/schema_builder.hh" #include "mutation/timestamp.hh" #include "utils/assert.hh" #include "utils/hashers.hh" #include "utils/log.hh" #include #include "gms/inet_address.hh" #include "message/messaging_service.hh" #include "mutation_query.hh" #include "db/timeout_clock.hh" #include "sstables/sstables.hh" #include "db/schema_tables.hh" #include "gms/generation-number.hh" #include "service/storage_service.hh" #include "service/storage_proxy.hh" #include "service/paxos/paxos_state.hh" #include "query/query-result-set.hh" #include "idl/frozen_mutation.dist.hh" #include "idl/frozen_mutation.dist.impl.hh" #include "service/topology_state_machine.hh" #include "sstables/generation_type.hh" #include "cdc/generation.hh" #include "replica/tablets.hh" #include "replica/query.hh" #include "types/types.hh" #include "service/raft/raft_group0_client.hh" #include "message/shared_dict.hh" #include "replica/database.hh" #include "db/compaction_history_entry.hh" #include using days = std::chrono::duration>; static thread_local auto sstableinfo_type = user_type_impl::get_instance( "system", "sstableinfo", {"generation", "origin", "size"}, {uuid_type, utf8_type, long_type}, false); namespace db { namespace { const auto set_null_sharder = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) { // tables in the "system" keyspace which need to use null sharder static const 
std::unordered_set tables = { // empty }; if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) { props.use_null_sharder = true; } }); const auto set_wait_for_sync_to_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) { static const std::unordered_set tables = { system_keyspace::PAXOS, }; if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) { props.wait_for_sync_to_commitlog = true; } }); const auto set_use_schema_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) { static const std::unordered_set tables = { schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY, system_keyspace::BROADCAST_KV_STORE, system_keyspace::CDC_GENERATIONS_V3, system_keyspace::RAFT, system_keyspace::RAFT_SNAPSHOTS, system_keyspace::RAFT_SNAPSHOT_CONFIG, system_keyspace::GROUP0_HISTORY, system_keyspace::DISCOVERY, system_keyspace::TABLETS, system_keyspace::TOPOLOGY, system_keyspace::TOPOLOGY_REQUESTS, system_keyspace::LOCAL, system_keyspace::PEERS, system_keyspace::SCYLLA_LOCAL, system_keyspace::COMMITLOG_CLEANUPS, system_keyspace::SERVICE_LEVELS_V2, system_keyspace::VIEW_BUILD_STATUS_V2, system_keyspace::CDC_STREAMS_STATE, system_keyspace::CDC_STREAMS_HISTORY, system_keyspace::ROLES, system_keyspace::ROLE_MEMBERS, system_keyspace::ROLE_ATTRIBUTES, system_keyspace::ROLE_PERMISSIONS, system_keyspace::v3::CDC_LOCAL, system_keyspace::DICTS, system_keyspace::VIEW_BUILDING_TASKS, system_keyspace::CLIENT_ROUTES, }; if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) { props.enable_schema_commitlog(); } }); const auto set_group0_table_options = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) { static const std::unordered_set tables = { // scylla_local may store a replicated tombstone related to schema // (see 
`make_group0_schema_version_mutation`), so we include it in the group0 tables list. system_keyspace::SCYLLA_LOCAL, system_keyspace::TOPOLOGY, system_keyspace::TOPOLOGY_REQUESTS, system_keyspace::CDC_GENERATIONS_V3, system_keyspace::TABLETS, system_keyspace::SERVICE_LEVELS_V2, system_keyspace::VIEW_BUILD_STATUS_V2, system_keyspace::CDC_STREAMS_STATE, system_keyspace::CDC_STREAMS_HISTORY, // auth tables system_keyspace::ROLES, system_keyspace::ROLE_MEMBERS, system_keyspace::ROLE_ATTRIBUTES, system_keyspace::ROLE_PERMISSIONS, system_keyspace::DICTS, system_keyspace::VIEW_BUILDING_TASKS, system_keyspace::CLIENT_ROUTES, }; if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) { props.is_group0_table = true; } }); } static logging::logger slogger("system_keyspace"); static const api::timestamp_type creation_timestamp = api::new_timestamp(); api::timestamp_type system_keyspace::schema_creation_timestamp() { return creation_timestamp; } // Currently, the type variables (uuid_type, etc.) are thread-local reference- // counted shared pointers. This forces us to also make the built in schemas // below thread-local as well. // We return schema_ptr, not schema&, because that's the "tradition" in our // other code. // We hide the thread_local variable inside a function, because if we later // we remove the thread_local, we'll start having initialization order // problems (we need the type variables to be constructed first), and using // functions will solve this problem. So we use functions right now. 
schema_ptr system_keyspace::hints() { static thread_local auto hints = [] { schema_builder builder(generate_legacy_id(NAME, HINTS), NAME, HINTS, // partition key {{"target_id", uuid_type}}, // clustering key {{"hint_id", timeuuid_type}, {"message_version", int32_type}}, // regular columns {{"mutation", bytes_type}}, // static columns {}, // regular column name type utf8_type, // comment "hints awaiting delivery" ); builder.set_gc_grace_seconds(0); builder.set_compaction_strategy_options({{ "enabled", "false" }}); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::yes); }(); return hints; } schema_ptr system_keyspace::batchlog() { static thread_local auto batchlog = [] { schema_builder builder(generate_legacy_id(NAME, BATCHLOG), NAME, BATCHLOG, // partition key {{"id", uuid_type}}, // clustering key {}, // regular columns {{"data", bytes_type}, {"version", int32_type}, {"written_at", timestamp_type}}, // static columns {}, // regular column name type utf8_type, // comment "batches awaiting replay" // FIXME: the original Java code also had: // operations on resulting CFMetaData: // .compactionStrategyOptions(Collections.singletonMap("min_threshold", "2")) ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return batchlog; } schema_ptr system_keyspace::batchlog_v2() { static thread_local auto batchlog_v2 = [] { schema_builder builder(generate_legacy_id(NAME, BATCHLOG_V2), NAME, BATCHLOG_V2, // partition key {{"version", int32_type}, {"stage", byte_type}, {"shard", int32_type}}, // clustering key {{"written_at", timestamp_type}, {"id", uuid_type}}, // regular columns {{"data", bytes_type}}, // static columns {}, // regular column name type utf8_type, // comment "batches awaiting replay" ); builder.set_gc_grace_seconds(0); builder.set_caching_options(caching_options::get_disabled_caching_options()); builder.with_hash_version(); return 
builder.build(schema_builder::compact_storage::no); }(); return batchlog_v2; } /*static*/ schema_ptr system_keyspace::paxos() { static thread_local auto paxos = [] { // FIXME: switch to the new schema_builder interface (with_column(...), etc) schema_builder builder(generate_legacy_id(NAME, PAXOS), NAME, PAXOS, // partition key {{"row_key", bytes_type}}, // byte representation of a row key that hashes to the same token as original // clustering key {{"cf_id", uuid_type}}, // regular columns { {"promise", timeuuid_type}, {"most_recent_commit", bytes_type}, // serialization format is defined by frozen_mutation idl {"most_recent_commit_at", timeuuid_type}, {"proposal", bytes_type}, // serialization format is defined by frozen_mutation idl {"proposal_ballot", timeuuid_type}, }, // static columns {}, // regular column name type utf8_type, // comment "in-progress paxos proposals" // FIXME: the original Java code also had: // operations on resulting CFMetaData: // .compactionStrategyClass(LeveledCompactionStrategy.class); ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return paxos; } thread_local data_type cdc_generation_ts_id_type = tuple_type_impl::get_instance({timestamp_type, timeuuid_type}); schema_ptr system_keyspace::topology() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, TOPOLOGY); return schema_builder(NAME, TOPOLOGY, std::optional(id)) .with_column("key", utf8_type, column_kind::partition_key) .with_column("host_id", uuid_type, column_kind::clustering_key) .with_column("datacenter", utf8_type) .with_column("rack", utf8_type) .with_column("tokens", set_type_impl::get_instance(utf8_type, true)) .with_column("node_state", utf8_type) .with_column("release_version", utf8_type) .with_column("topology_request", utf8_type) .with_column("replaced_id", uuid_type) .with_column("rebuild_option", utf8_type) .with_column("num_tokens", int32_type) 
.with_column("tokens_string", utf8_type) .with_column("shard_count", int32_type) .with_column("ignore_msb", int32_type) .with_column("cleanup_status", utf8_type) .with_column("supported_features", set_type_impl::get_instance(utf8_type, true)) .with_column("request_id", timeuuid_type) .with_column("ignore_nodes", set_type_impl::get_instance(uuid_type, true), column_kind::static_column) .with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column) .with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column) // deprecated .with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column) // deprecated .with_column("version", long_type, column_kind::static_column) .with_column("fence_version", long_type, column_kind::static_column) .with_column("transition_state", utf8_type, column_kind::static_column) .with_column("committed_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column) .with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column) .with_column("global_topology_request", utf8_type, column_kind::static_column) .with_column("global_topology_request_id", timeuuid_type, column_kind::static_column) .with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column) .with_column("session", uuid_type, column_kind::static_column) .with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column) .with_column("upgrade_state", utf8_type, column_kind::static_column) .with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column) .with_column("paused_rf_change_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column) .set_comment("Current state of topology change machine") .with_hash_version() .build(); }(); 
return schema; } schema_ptr system_keyspace::topology_requests() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, TOPOLOGY_REQUESTS); return schema_builder(NAME, TOPOLOGY_REQUESTS, std::optional(id)) .with_column("id", timeuuid_type, column_kind::partition_key) .with_column("initiating_host", uuid_type) .with_column("request_type", utf8_type) .with_column("start_time", timestamp_type) .with_column("done", boolean_type) .with_column("error", utf8_type) .with_column("end_time", timestamp_type) .with_column("truncate_table_id", uuid_type) .with_column("new_keyspace_rf_change_ks_name", utf8_type) .with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false)) .set_comment("Topology request tracking") .with_hash_version() .build(); }(); return schema; } extern thread_local data_type cdc_streams_set_type; /* An internal table used by nodes to store CDC generation data. * Written to by Raft Group 0. */ schema_ptr system_keyspace::cdc_generations_v3() { thread_local auto schema = [] { auto id = generate_legacy_id(NAME, CDC_GENERATIONS_V3); return schema_builder(NAME, CDC_GENERATIONS_V3, {id}) /* This is a single-partition table with key 'cdc_generations'. */ .with_column("key", utf8_type, column_kind::partition_key) /* The unique identifier of this generation. */ .with_column("id", timeuuid_type, column_kind::clustering_key) /* The generation describes a mapping from all tokens in the token ring to a set of stream IDs. * This mapping is built from a bunch of smaller mappings, each describing how tokens in a * subrange of the token ring are mapped to stream IDs; these subranges together cover the entire * token ring. Each such range-local mapping is represented by a row of this table. The second * column of the clustering key of the row is the end of the range being described by this row. 
* The start of this range is the range_end of the previous row (in the clustering order, which * is the integer order) or of the last row with the same id value if this is the first row with * such id. */ .with_column("range_end", long_type, column_kind::clustering_key) /* The set of streams mapped to in this range. The number of streams mapped to a single range in * a CDC generation is bounded from above by the number of shards on the owner of that range in * the token ring. In other words, the number of elements of this set is bounded by the maximum * of the number of shards over all nodes. The serialized size is obtained by counting about 20B * for each stream. For example, if all nodes in the cluster have at most 128 shards, the * serialized size of this set will be bounded by ~2.5 KB. */ .with_column("streams", cdc_streams_set_type) /* The value of the `ignore_msb` sharding parameter of the node which was the owner of this token * range when the generation was first created. Together with the set of streams above it fully * describes the mapping for this particular range. 
*/ .with_column("ignore_msb", byte_type) .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::cdc_streams_state() { thread_local auto schema = [] { auto id = generate_legacy_id(NAME, CDC_STREAMS_STATE); return schema_builder(NAME, CDC_STREAMS_STATE, {id}) .with_column("table_id", uuid_type, column_kind::partition_key) .with_column("last_token", long_type, column_kind::clustering_key) .with_column("stream_id", bytes_type) .with_column("timestamp", timestamp_type, column_kind::static_column) .set_comment("Oldest CDC stream set for tablets-based tables") .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::cdc_streams_history() { thread_local auto schema = [] { auto id = generate_legacy_id(NAME, CDC_STREAMS_HISTORY); return schema_builder(NAME, CDC_STREAMS_HISTORY, {id}) .with_column("table_id", uuid_type, column_kind::partition_key) .with_column("timestamp", timestamp_type, column_kind::clustering_key) .with_column("stream_state", byte_type, column_kind::clustering_key) .with_column("last_token", long_type, column_kind::clustering_key) .with_column("stream_id", bytes_type) .set_comment("CDC stream sets for tablets-based tables described as differences from the previous state") .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::raft() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, RAFT); return schema_builder(NAME, RAFT, std::optional(id)) .with_column("group_id", timeuuid_type, column_kind::partition_key) // raft log part .with_column("index", long_type, column_kind::clustering_key) .with_column("term", long_type) .with_column("data", bytes_type) // decltype(raft::log_entry::data) - serialized variant // persisted term and vote .with_column("vote_term", long_type, column_kind::static_column) .with_column("vote", uuid_type, column_kind::static_column) // id of the most recent persisted snapshot .with_column("snapshot_id", uuid_type, 
column_kind::static_column) .with_column("commit_idx", long_type, column_kind::static_column) .set_comment("Persisted RAFT log, votes and snapshot info") .with_hash_version() .set_caching_options(caching_options::get_disabled_caching_options()) .build(); }(); return schema; } // Note that this table does not include actula user snapshot data since it's dependent // on user-provided state machine and could be stored anywhere else in any other form. // This should be seen as a snapshot descriptor, instead. schema_ptr system_keyspace::raft_snapshots() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, RAFT_SNAPSHOTS); return schema_builder(NAME, RAFT_SNAPSHOTS, std::optional(id)) .with_column("group_id", timeuuid_type, column_kind::partition_key) .with_column("snapshot_id", uuid_type) // Index and term of last entry in the snapshot .with_column("idx", long_type) .with_column("term", long_type) .set_comment("Persisted RAFT snapshot descriptors info") .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::raft_snapshot_config() { static thread_local auto schema = [] { auto id = generate_legacy_id(system_keyspace::NAME, RAFT_SNAPSHOT_CONFIG); return schema_builder(system_keyspace::NAME, RAFT_SNAPSHOT_CONFIG, std::optional(id)) .with_column("group_id", timeuuid_type, column_kind::partition_key) .with_column("disposition", ascii_type, column_kind::clustering_key) // can be 'CURRENT` or `PREVIOUS' .with_column("server_id", uuid_type, column_kind::clustering_key) .with_column("can_vote", boolean_type) .set_comment("RAFT configuration for the latest snapshot descriptor") .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::repair_history() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, REPAIR_HISTORY); return schema_builder(NAME, REPAIR_HISTORY, std::optional(id)) .with_column("table_uuid", uuid_type, column_kind::partition_key) // The time is repair start time 
.with_column("repair_time", timestamp_type, column_kind::clustering_key) .with_column("repair_uuid", uuid_type, column_kind::clustering_key) // The token range is (range_start, range_end] .with_column("range_start", long_type, column_kind::clustering_key) .with_column("range_end", long_type, column_kind::clustering_key) .with_column("keyspace_name", utf8_type, column_kind::static_column) .with_column("table_name", utf8_type, column_kind::static_column) .set_comment("Record repair history") .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::built_indexes() { static thread_local auto built_indexes = [] { schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES, // partition key {{"table_name", utf8_type}}, // table_name here is the name of the keyspace - don't be fooled // clustering key {{"index_name", utf8_type}}, // regular columns {}, // static columns {}, // regular column name type utf8_type, // comment "built column indexes" ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::yes); }(); return built_indexes; } /*static*/ schema_ptr system_keyspace::local() { static thread_local auto local = [] { schema_builder builder(generate_legacy_id(NAME, LOCAL), NAME, LOCAL, // partition key {{"key", utf8_type}}, // clustering key {}, // regular columns { {"bootstrapped", utf8_type}, {"cluster_name", utf8_type}, {"cql_version", utf8_type}, {"data_center", utf8_type}, {"gossip_generation", int32_type}, {"host_id", uuid_type}, {"native_protocol_version", utf8_type}, {"partitioner", utf8_type}, {"rack", utf8_type}, {"release_version", utf8_type}, {"schema_version", uuid_type}, {"thrift_version", utf8_type}, {"tokens", set_type_impl::get_instance(utf8_type, true)}, {"truncated_at", map_type_impl::get_instance(uuid_type, bytes_type, true)}, // The following 3 columns are only present up until 2.1.8 tables {"rpc_address", inet_addr_type}, 
{"broadcast_address", inet_addr_type}, {"listen_address", inet_addr_type}, // This column represents advertised local features (i.e. the features // advertised by the node via gossip after passing the feature check // against remote features in the cluster) {"supported_features", utf8_type}, {"scylla_cpu_sharding_algorithm", utf8_type}, {"scylla_nr_shards", int32_type}, {"scylla_msb_ignore", int32_type}, }, // static columns {}, // regular column name type utf8_type, // comment "information about the local node" ); builder.set_gc_grace_seconds(0); auto drop_timestamp = api::max_timestamp; builder.remove_column("scylla_cpu_sharding_algorithm", drop_timestamp); builder.remove_column("scylla_nr_shards", drop_timestamp); builder.remove_column("scylla_msb_ignore", drop_timestamp); builder.remove_column("thrift_version", drop_timestamp); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return local; } /*static*/ schema_ptr system_keyspace::peers() { static thread_local auto peers = [] { schema_builder builder(generate_legacy_id(NAME, PEERS), NAME, PEERS, // partition key {{"peer", inet_addr_type}}, // clustering key {}, // regular columns { {"data_center", utf8_type}, {"host_id", uuid_type}, {"preferred_ip", inet_addr_type}, {"rack", utf8_type}, {"release_version", utf8_type}, {"rpc_address", inet_addr_type}, {"schema_version", uuid_type}, {"tokens", set_type_impl::get_instance(utf8_type, true)}, {"supported_features", utf8_type}, }, // static columns {}, // regular column name type utf8_type, // comment "information about known peers in the cluster" ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return peers; } /*static*/ schema_ptr system_keyspace::peer_events() { static thread_local auto peer_events = [] { schema_builder builder(generate_legacy_id(NAME, PEER_EVENTS), NAME, PEER_EVENTS, // partition key {{"peer", inet_addr_type}}, // clustering 
key {}, // regular columns { {"hints_dropped", map_type_impl::get_instance(uuid_type, int32_type, true)}, }, // static columns {}, // regular column name type utf8_type, // comment "events related to peers" ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return peer_events; } /*static*/ schema_ptr system_keyspace::range_xfers() { static thread_local auto range_xfers = [] { schema_builder builder(generate_legacy_id(NAME, RANGE_XFERS), NAME, RANGE_XFERS, // partition key {{"token_bytes", bytes_type}}, // clustering key {}, // regular columns {{"requested_at", timestamp_type}}, // static columns {}, // regular column name type utf8_type, // comment "ranges requested for transfer" ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return range_xfers; } /*static*/ schema_ptr system_keyspace::compactions_in_progress() { static thread_local auto compactions_in_progress = [] { schema_builder builder(generate_legacy_id(NAME, COMPACTIONS_IN_PROGRESS), NAME, COMPACTIONS_IN_PROGRESS, // partition key {{"id", uuid_type}}, // clustering key {}, // regular columns { {"columnfamily_name", utf8_type}, {"inputs", set_type_impl::get_instance(int32_type, true)}, {"keyspace_name", utf8_type}, }, // static columns {}, // regular column name type utf8_type, // comment "unfinished compactions" ); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return compactions_in_progress; } /*static*/ schema_ptr system_keyspace::compaction_history() { static thread_local auto compaction_history = [] { schema_builder builder(generate_legacy_id(NAME, COMPACTION_HISTORY), NAME, COMPACTION_HISTORY, // partition key {{"id", uuid_type}}, // clustering key {}, // regular columns { {"bytes_in", long_type}, {"bytes_out", long_type}, {"columnfamily_name", utf8_type}, {"started_at", timestamp_type}, 
{"compacted_at", timestamp_type}, {"compaction_type", utf8_type}, {"keyspace_name", utf8_type}, {"rows_merged", map_type_impl::get_instance(int32_type, long_type, true)}, {"shard_id", int32_type}, {"sstables_in", list_type_impl::get_instance(sstableinfo_type, false)}, {"sstables_out", list_type_impl::get_instance(sstableinfo_type, false)}, {"total_tombstone_purge_attempt", long_type}, {"total_tombstone_purge_failure_due_to_overlapping_with_memtable", long_type}, {"total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable", long_type}, }, // static columns {}, // regular column name type utf8_type, // comment "week-long compaction history" ); builder.set_default_time_to_live(std::chrono::duration_cast(days(7))); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return compaction_history; } /*static*/ schema_ptr system_keyspace::sstable_activity() { static thread_local auto sstable_activity = [] { schema_builder builder(generate_legacy_id(NAME, SSTABLE_ACTIVITY), NAME, SSTABLE_ACTIVITY, // partition key { {"keyspace_name", utf8_type}, {"columnfamily_name", utf8_type}, {"generation", int32_type}, }, // clustering key {}, // regular columns { {"rate_120m", double_type}, {"rate_15m", double_type}, }, // static columns {}, // regular column name type utf8_type, // comment "historic sstable read rates" ); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return sstable_activity; } schema_ptr system_keyspace::size_estimates() { static thread_local auto size_estimates = [] { schema_builder builder(generate_legacy_id(NAME, SIZE_ESTIMATES), NAME, SIZE_ESTIMATES, // partition key {{"keyspace_name", utf8_type}}, // clustering key {{"table_name", utf8_type}, {"range_start", utf8_type}, {"range_end", utf8_type}}, // regular columns { {"mean_partition_size", long_type}, {"partitions_count", long_type}, }, // static columns {}, // regular column name type utf8_type, // comment 
"per-table primary range size estimates" ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return size_estimates; } /*static*/ schema_ptr system_keyspace::large_partitions() { static thread_local auto large_partitions = [] { schema_builder builder(generate_legacy_id(NAME, LARGE_PARTITIONS), NAME, LARGE_PARTITIONS, // partition key {{"keyspace_name", utf8_type}, {"table_name", utf8_type}}, // clustering key { {"sstable_name", utf8_type}, {"partition_size", reversed_type_impl::get_instance(long_type)}, {"partition_key", utf8_type} }, // CLUSTERING ORDER BY (partition_size DESC) // regular columns { {"rows", long_type}, {"compaction_time", timestamp_type}, {"range_tombstones", long_type}, {"dead_rows", long_type} }, // static columns {}, // regular column name type utf8_type, // comment "partitions larger than specified threshold" ); builder.set_gc_grace_seconds(0); builder.set_caching_options(caching_options::get_disabled_caching_options()); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return large_partitions; } schema_ptr system_keyspace::large_rows() { static thread_local auto large_rows = [] { auto id = generate_legacy_id(NAME, LARGE_ROWS); return schema_builder(NAME, LARGE_ROWS, std::optional(id)) .with_column("keyspace_name", utf8_type, column_kind::partition_key) .with_column("table_name", utf8_type, column_kind::partition_key) .with_column("sstable_name", utf8_type, column_kind::clustering_key) // We want the large rows first, so use reversed_type_impl .with_column("row_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key) .with_column("partition_key", utf8_type, column_kind::clustering_key) .with_column("clustering_key", utf8_type, column_kind::clustering_key) .with_column("compaction_time", timestamp_type) .set_comment("rows larger than specified threshold") .set_gc_grace_seconds(0) 
.set_caching_options(caching_options::get_disabled_caching_options()) .with_hash_version() .build(); }(); return large_rows; } schema_ptr system_keyspace::large_cells() { static thread_local auto large_cells = [] { auto id = generate_legacy_id(NAME, LARGE_CELLS); return schema_builder(NAME, LARGE_CELLS, id) .with_column("keyspace_name", utf8_type, column_kind::partition_key) .with_column("table_name", utf8_type, column_kind::partition_key) .with_column("sstable_name", utf8_type, column_kind::clustering_key) // We want the larger cells first, so use reversed_type_impl .with_column("cell_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key) .with_column("partition_key", utf8_type, column_kind::clustering_key) .with_column("clustering_key", utf8_type, column_kind::clustering_key) .with_column("column_name", utf8_type, column_kind::clustering_key) // regular rows .with_column("collection_elements", long_type) .with_column("compaction_time", timestamp_type) .set_comment("cells larger than specified threshold") .set_gc_grace_seconds(0) .set_caching_options(caching_options::get_disabled_caching_options()) .with_hash_version() .build(); }(); return large_cells; } schema_ptr system_keyspace::corrupt_data() { static thread_local auto corrupt_data = [] { auto id = generate_legacy_id(NAME, CORRUPT_DATA); return schema_builder(NAME, CORRUPT_DATA, id) // partition key .with_column("keyspace_name", utf8_type, column_kind::partition_key) .with_column("table_name", utf8_type, column_kind::partition_key) // clustering key .with_column("id", timeuuid_type, column_kind::clustering_key) // regular rows // Storing keys as bytes: having a corrupt key might be the reason // to record the row as corrupt, so we just dump what we have and // leave interpreting to the lucky person investigating the disaster. 
.with_column("partition_key", bytes_type) .with_column("clustering_key", bytes_type) // Note: mutation-fragment v2 .with_column("mutation_fragment_kind", utf8_type) .with_column("frozen_mutation_fragment", bytes_type) .with_column("origin", utf8_type) .with_column("sstable_name", utf8_type) // options .set_comment("mutation-fragments found to be corrupted") .set_gc_grace_seconds(0) .with_hash_version() .build(); }(); return corrupt_data; } /*static*/ schema_ptr system_keyspace::scylla_local() { static thread_local auto scylla_local = [] { schema_builder builder(generate_legacy_id(NAME, SCYLLA_LOCAL), NAME, SCYLLA_LOCAL, // partition key {{"key", utf8_type}}, // clustering key {}, // regular columns { {"value", utf8_type}, }, // static columns {}, // regular column name type utf8_type, // comment "Scylla specific information about the local node" ); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return scylla_local; } schema_ptr system_keyspace::v3::batches() { static thread_local auto schema = [] { schema_builder builder(generate_legacy_id(NAME, BATCHES), NAME, BATCHES, // partition key {{"id", timeuuid_type}}, // clustering key {}, // regular columns {{"mutations", list_type_impl::get_instance(bytes_type, true)}, {"version", int32_type}}, // static columns {}, // regular column name type utf8_type, // comment "batches awaiting replay" ); builder.set_gc_grace_seconds(0); // FIXME: the original Java code also had: //.copy(new LocalPartitioner(TimeUUIDType.instance)) builder.set_gc_grace_seconds(0); builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental); builder.set_compaction_strategy_options({{"min_threshold", "2"}}); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return schema; } schema_ptr system_keyspace::v3::built_indexes() { // identical to ours, but ours otoh is a mix-in of the 3.x series cassandra one return 
db::system_keyspace::built_indexes(); } schema_ptr system_keyspace::v3::local() { static thread_local auto schema = [] { schema_builder builder(generate_legacy_id(NAME, LOCAL), NAME, LOCAL, // partition key {{"key", utf8_type}}, // clustering key {}, // regular columns { {"bootstrapped", utf8_type}, {"broadcast_address", inet_addr_type}, {"cluster_name", utf8_type}, {"cql_version", utf8_type}, {"data_center", utf8_type}, {"gossip_generation", int32_type}, {"host_id", uuid_type}, {"listen_address", inet_addr_type}, {"native_protocol_version", utf8_type}, {"partitioner", utf8_type}, {"rack", utf8_type}, {"release_version", utf8_type}, {"rpc_address", inet_addr_type}, {"schema_version", uuid_type}, {"thrift_version", utf8_type}, {"tokens", set_type_impl::get_instance(utf8_type, true)}, {"truncated_at", map_type_impl::get_instance(uuid_type, bytes_type, true)}, }, // static columns {}, // regular column name type utf8_type, // comment "information about the local node" ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return schema; } schema_ptr system_keyspace::v3::truncated() { static thread_local auto local = [] { schema_builder builder(generate_legacy_id(NAME, TRUNCATED), NAME, TRUNCATED, // partition key {{"table_uuid", uuid_type}}, // clustering key {{"shard", int32_type}}, // regular columns { {"position", int32_type}, {"segment_id", long_type} }, // static columns { {"truncated_at", timestamp_type}, }, // regular column name type utf8_type, // comment "information about table truncation" ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return local; } thread_local data_type replay_position_type = tuple_type_impl::get_instance({long_type, int32_type}); schema_ptr system_keyspace::v3::commitlog_cleanups() { static thread_local auto local = [] { schema_builder builder(generate_legacy_id(NAME, 
        COMMITLOG_CLEANUPS), NAME, COMMITLOG_CLEANUPS,
        // partition key
        {{"shard", int32_type}},
        // clustering key
        {
            {"position", replay_position_type},
            {"table_uuid", uuid_type},
            {"start_token_exclusive", long_type},
            {"end_token_inclusive", long_type},
        },
        // regular columns
        {},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "information about cleanups, for filtering commitlog replay"
        );
        builder.with_hash_version();
        return builder.build(schema_builder::compact_storage::no);
    }();
    return local;
}

// The following v3:: accessors are declared "identical" to the corresponding
// top-level system_keyspace tables, so each one simply delegates to the
// non-versioned definition instead of duplicating the schema.

schema_ptr system_keyspace::v3::peers() {
    // identical
    return db::system_keyspace::peers();
}

schema_ptr system_keyspace::v3::peer_events() {
    // identical
    return db::system_keyspace::peer_events();
}

schema_ptr system_keyspace::v3::range_xfers() {
    // identical
    return db::system_keyspace::range_xfers();
}

schema_ptr system_keyspace::v3::compaction_history() {
    // identical
    return db::system_keyspace::compaction_history();
}

schema_ptr system_keyspace::v3::sstable_activity() {
    // identical
    return db::system_keyspace::sstable_activity();
}

schema_ptr system_keyspace::v3::size_estimates() {
    // identical
    return db::system_keyspace::size_estimates();
}

schema_ptr system_keyspace::v3::large_partitions() {
    // identical
    return db::system_keyspace::large_partitions();
}

schema_ptr system_keyspace::v3::scylla_local() {
    // identical
    return db::system_keyspace::scylla_local();
}

// Schema accessor for system.available_ranges: one partition per keyspace,
// holding the set of serialized ranges ready to serve during bootstrap/replace.
schema_ptr system_keyspace::v3::available_ranges() {
    static thread_local auto schema = [] {
        schema_builder builder(generate_legacy_id(NAME, AVAILABLE_RANGES), NAME, AVAILABLE_RANGES,
        // partition key
        {{"keyspace_name", utf8_type}},
        // clustering key
        {},
        // regular columns
        {{"ranges", set_type_impl::get_instance(bytes_type, true)}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "available keyspace/ranges during bootstrap/replace that are ready to be served"
        );
        builder.set_gc_grace_seconds(0);
        builder.with_hash_version();
        return builder.build();
    }();
    return schema;
}
schema_ptr system_keyspace::v3::views_builds_in_progress() { static thread_local auto schema = [] { schema_builder builder(generate_legacy_id(NAME, VIEWS_BUILDS_IN_PROGRESS), NAME, VIEWS_BUILDS_IN_PROGRESS, // partition key {{"keyspace_name", utf8_type}}, // clustering key {{"view_name", utf8_type}}, // regular columns {{"last_token", utf8_type}, {"generation_number", int32_type}}, // static columns {}, // regular column name type utf8_type, // comment "views builds current progress" ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(); }(); return schema; } schema_ptr system_keyspace::v3::built_views() { static thread_local auto schema = [] { schema_builder builder(generate_legacy_id(NAME, BUILT_VIEWS), NAME, BUILT_VIEWS, // partition key {{"keyspace_name", utf8_type}}, // clustering key {{"view_name", utf8_type}}, // regular columns {}, // static columns {}, // regular column name type utf8_type, // comment "built views" ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(); }(); return schema; } schema_ptr system_keyspace::v3::scylla_views_builds_in_progress() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS); return schema_builder(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS, std::make_optional(id)) .with_column("keyspace_name", utf8_type, column_kind::partition_key) .with_column("view_name", utf8_type, column_kind::clustering_key) .with_column("cpu_id", int32_type, column_kind::clustering_key) .with_column("next_token", utf8_type) .with_column("generation_number", int32_type) .with_column("first_token", utf8_type) .with_hash_version() .build(); }(); return schema; } /*static*/ schema_ptr system_keyspace::v3::cdc_local() { static thread_local auto cdc_local = [] { schema_builder builder(generate_legacy_id(NAME, CDC_LOCAL), NAME, CDC_LOCAL, // partition key {{"key", utf8_type}}, // clustering key {}, // regular columns { /* Every node 
announces the identifier of the newest known CDC generation to other nodes. * The identifier consists of two things: a timestamp (which is the generation's timestamp, * denoting the time point from which it starts operating) and an UUID (randomly generated * when the generation is created). * This identifier is persisted here and restored on node restart. * * Some identifiers - identifying generations created in older clusters - have only the timestamp. * For these the uuid column is empty. */ {"streams_timestamp", timestamp_type}, {"uuid", uuid_type}, }, // static columns {}, // regular column name type utf8_type, // comment "CDC-specific information that the local node stores" ); builder.set_gc_grace_seconds(0); builder.with_hash_version(); return builder.build(schema_builder::compact_storage::no); }(); return cdc_local; } schema_ptr system_keyspace::group0_history() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, GROUP0_HISTORY); return schema_builder(NAME, GROUP0_HISTORY, id) // this is a single-partition table with key 'history' .with_column("key", utf8_type, column_kind::partition_key) // group0 state timeuuid, descending order .with_column("state_id", reversed_type_impl::get_instance(timeuuid_type), column_kind::clustering_key) // human-readable description of the change .with_column("description", utf8_type) .set_comment("History of Raft group 0 state changes") .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::discovery() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, DISCOVERY); return schema_builder(NAME, DISCOVERY, id) // This is a single-partition table with key 'peers' .with_column("key", utf8_type, column_kind::partition_key) // Peer ip address .with_column("ip_addr", inet_addr_type, column_kind::clustering_key) // The ID of the group 0 server on that peer. // May be unknown during discovery, then it's set to UUID 0. 
.with_column("raft_server_id", uuid_type) .set_comment("State of cluster discovery algorithm: the set of discovered peers") .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::broadcast_kv_store() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, BROADCAST_KV_STORE); return schema_builder(NAME, BROADCAST_KV_STORE, id) .with_column("key", utf8_type, column_kind::partition_key) .with_column("value", utf8_type) .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::sstables_registry() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, SSTABLES_REGISTRY); return schema_builder(NAME, SSTABLES_REGISTRY, id) .with_column("owner", uuid_type, column_kind::partition_key) .with_column("generation", timeuuid_type, column_kind::clustering_key) .with_column("status", utf8_type) .with_column("state", utf8_type) .with_column("version", utf8_type) .with_column("format", utf8_type) .set_comment("SSTables ownership table") .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::tablets() { static thread_local auto schema = replica::make_tablets_schema(); return schema; } schema_ptr system_keyspace::service_levels_v2() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, SERVICE_LEVELS_V2); return schema_builder(NAME, SERVICE_LEVELS_V2, id) .with_column("service_level", utf8_type, column_kind::partition_key) .with_column("timeout", duration_type) .with_column("workload_type", utf8_type) .with_column("shares", int32_type) .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::view_build_status_v2() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, VIEW_BUILD_STATUS_V2); return schema_builder(NAME, VIEW_BUILD_STATUS_V2, id) .with_column("keyspace_name", utf8_type, column_kind::partition_key) .with_column("view_name", utf8_type, column_kind::partition_key) .with_column("host_id", 
uuid_type, column_kind::clustering_key) .with_column("status", utf8_type) .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::roles() { static thread_local auto schema = [] { schema_builder builder(generate_legacy_id(NAME, ROLES), NAME, ROLES, // partition key {{"role", utf8_type}}, // clustering key {}, // regular columns { {"can_login", boolean_type}, {"is_superuser", boolean_type}, {"member_of", set_type_impl::get_instance(utf8_type, true)}, {"salted_hash", utf8_type} }, // static columns {}, // regular column name type utf8_type, // comment "roles for authentication and RBAC" ); builder.with_hash_version(); return builder.build(); }(); return schema; } schema_ptr system_keyspace::role_members() { static thread_local auto schema = [] { schema_builder builder(generate_legacy_id(NAME, ROLE_MEMBERS), NAME, ROLE_MEMBERS, // partition key {{"role", utf8_type}}, // clustering key {{"member", utf8_type}}, // regular columns {}, // static columns {}, // regular column name type utf8_type, // comment "joins users and their granted roles in RBAC" ); builder.with_hash_version(); return builder.build(); }(); return schema; } schema_ptr system_keyspace::role_attributes() { static thread_local auto schema = [] { schema_builder builder(generate_legacy_id(NAME, ROLE_ATTRIBUTES), NAME, ROLE_ATTRIBUTES, // partition key {{"role", utf8_type}}, // clustering key {{"name", utf8_type}}, // regular columns { {"value", utf8_type} }, // static columns {}, // regular column name type utf8_type, // comment "role permissions in RBAC" ); builder.with_hash_version(); return builder.build(); }(); return schema; } schema_ptr system_keyspace::role_permissions() { static thread_local auto schema = [] { schema_builder builder(generate_legacy_id(NAME, ROLE_PERMISSIONS), NAME, ROLE_PERMISSIONS, // partition key {{"role", utf8_type}}, // clustering key {{"resource", utf8_type}}, // regular columns { {"permissions", set_type_impl::get_instance(utf8_type, true)} }, // 
static columns {}, // regular column name type utf8_type, // comment "role permissions for CassandraAuthorizer" ); builder.with_hash_version(); return builder.build(); }(); return schema; } schema_ptr system_keyspace::dicts() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, DICTS); return schema_builder(NAME, DICTS, std::make_optional(id)) .with_column("name", utf8_type, column_kind::partition_key) .with_column("timestamp", timestamp_type) .with_column("origin", uuid_type) .with_column("data", bytes_type) .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::view_building_tasks() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, VIEW_BUILDING_TASKS); return schema_builder(NAME, VIEW_BUILDING_TASKS, std::make_optional(id)) .with_column("key", utf8_type, column_kind::partition_key) .with_column("id", timeuuid_type, column_kind::clustering_key) .with_column("type", utf8_type) .with_column("aborted", boolean_type) .with_column("base_id", uuid_type) .with_column("view_id", uuid_type) .with_column("last_token", long_type) .with_column("host_id", uuid_type) .with_column("shard", int32_type) .with_hash_version() .build(); }(); return schema; } schema_ptr system_keyspace::client_routes() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, CLIENT_ROUTES); return schema_builder(NAME, CLIENT_ROUTES, std::make_optional(id)) .with_column("connection_id", utf8_type, column_kind::partition_key) .with_column("host_id", uuid_type, column_kind::clustering_key) .with_column("address", utf8_type) .with_column("port", int32_type) .with_column("tls_port", int32_type) .with_column("alternator_port", int32_type) .with_column("alternator_https_port", int32_type) .with_hash_version() .build(); }(); return schema; } future system_keyspace::load_local_info() { auto msg = co_await execute_cql(format("SELECT host_id, cluster_name, data_center, rack FROM system.{} WHERE key=?", LOCAL), 
            sstring(LOCAL));
    local_info ret;
    if (!msg->empty()) {
        auto& row = msg->one();
        // Every column may be absent (e.g. on a fresh node); copy only what
        // the row actually contains, leaving the rest default-constructed.
        if (row.has("host_id")) {
            ret.host_id = locator::host_id(row.get_as("host_id"));
        }
        if (row.has("cluster_name")) {
            ret.cluster_name = row.get_as("cluster_name");
        }
        if (row.has("data_center")) {
            ret.dc = row.get_as("data_center");
        }
        if (row.has("rack")) {
            ret.rack = row.get_as("rack");
        }
    }
    co_return ret;
}

// Persists this node's identity/topology information into the single row of
// system.local. Note the column mapping: `broadcast_rpc_address` is written to
// the `rpc_address` column, `broadcast_address` to `broadcast_address`, and
// `sysinfo.listen_address` to `listen_address`.
future<> system_keyspace::save_local_info(local_info sysinfo, gms::inet_address broadcast_address, gms::inet_address broadcast_rpc_address) {
    auto& cfg = _db.get_config();
    sstring req = fmt::format("INSERT INTO system.{} (key, host_id, cluster_name, release_version, cql_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
            , db::system_keyspace::LOCAL);

    return execute_cql(req,
            sstring(db::system_keyspace::LOCAL),
            sysinfo.host_id.uuid(),
            sysinfo.cluster_name,
            version::release(),
            cql3::query_processor::CQL_VERSION,
            to_sstring(unsigned(cql_serialization_format::latest().protocol_version())),
            sysinfo.dc,
            sysinfo.rack,
            sstring(cfg.partitioner()),
            broadcast_rpc_address,
            broadcast_address,
            sysinfo.listen_address.addr()
    ).discard_result();
}

// Stores the comma-joined set of locally supported feature names into the
// `supported_features` column of system.local.
future<> system_keyspace::save_local_supported_features(const std::set& feats) {
    static const auto req = format("INSERT INTO system.{} (key, supported_features) VALUES (?, ?)", LOCAL);
    return execute_cql(req, sstring(db::system_keyspace::LOCAL), fmt::to_string(fmt::join(feats, ","))).discard_result();
}

// The cache must be distributed, because the values themselves may not update atomically, so a shard reading that
// is different than the one that wrote, may see a corrupted value. invoke_on_all will be used to guarantee that all
// updates are propagated correctly.
struct local_cache { system_keyspace::bootstrap_state _state; }; future<> system_keyspace::peers_table_read_fixup() { SCYLLA_ASSERT(this_shard_id() == 0); if (_peers_table_read_fixup_done) { co_return; } _peers_table_read_fixup_done = true; const auto cql = format("SELECT peer, host_id, WRITETIME(host_id) as ts from system.{}", PEERS); std::unordered_map> map{}; const auto cql_result = co_await execute_cql(cql); for (const auto& row : *cql_result) { const auto peer = row.get_as("peer"); if (!row.has("host_id")) { slogger.error("Peer {} has no host_id in system.{}, the record is broken, removing it", peer, system_keyspace::PEERS); co_await remove_endpoint(gms::inet_address{peer}); continue; } const auto host_id = row.get_as("host_id"); if (!host_id) { slogger.error("Peer {} has null host_id in system.{}, the record is broken, removing it", peer, system_keyspace::PEERS); co_await remove_endpoint(gms::inet_address{peer}); continue; } const auto ts = row.get_as("ts"); const auto it = map.find(host_id); if (it == map.end()) { map.insert({host_id, {peer, ts}}); continue; } if (it->second.second >= ts) { slogger.error("Peer {} with host_id {} has newer IP {} in system.{}, the record is stale, removing it", peer, host_id, it->second.first, system_keyspace::PEERS); co_await remove_endpoint(gms::inet_address{peer}); } else { slogger.error("Peer {} with host_id {} has newer IP {} in system.{}, the record is stale, removing it", it->second.first, host_id, peer, system_keyspace::PEERS); co_await remove_endpoint(gms::inet_address{it->second.first}); it->second = {peer, ts}; } } } future<> system_keyspace::build_bootstrap_info() { sstring req = format("SELECT bootstrapped FROM system.{} WHERE key = ? 
", LOCAL); return execute_cql(req, sstring(LOCAL)).then([this] (auto msg) { static auto state_map = std::unordered_map({ { "NEEDS_BOOTSTRAP", bootstrap_state::NEEDS_BOOTSTRAP }, { "COMPLETED", bootstrap_state::COMPLETED }, { "IN_PROGRESS", bootstrap_state::IN_PROGRESS }, { "DECOMMISSIONED", bootstrap_state::DECOMMISSIONED } }); bootstrap_state state = bootstrap_state::NEEDS_BOOTSTRAP; if (!msg->empty() && msg->one().has("bootstrapped")) { state = state_map.at(msg->one().template get_as("bootstrapped")); } return container().invoke_on_all([state] (auto& sys_ks) { sys_ks._cache->_state = state; }); }); } } namespace db { // Read system.truncate table and cache last truncation time in `table` object for each table on every shard future> system_keyspace::load_truncation_times() { std::unordered_map result; if (!_db.get_config().ignore_truncation_record.is_set()) { sstring req = format("SELECT DISTINCT table_uuid, truncated_at from system.{}", TRUNCATED); auto result_set = co_await execute_cql(req); for (const auto& row: *result_set) { const auto table_uuid = table_id(row.get_as("table_uuid")); const auto ts = row.get_as("truncated_at"); result[table_uuid] = ts; } } co_return result; } future<> system_keyspace::drop_truncation_rp_records() { sstring req = format("SELECT table_uuid, shard, segment_id from system.{}", TRUNCATED); auto rs = co_await execute_cql(req); bool any = false; std::unordered_set to_delete; auto db = _qp.db(); auto max_concurrency = std::min(1024u, smp::count * 8); co_await seastar::max_concurrent_for_each(*rs, max_concurrency, [&] (const cql3::untyped_result_set_row& row) -> future<> { auto table_uuid = table_id(row.get_as("table_uuid")); if (!db.try_find_table(table_uuid)) { to_delete.emplace(table_uuid); co_return; } auto shard = row.get_as("shard"); auto segment_id = row.get_as("segment_id"); if (segment_id != 0) { any = true; sstring req = format("UPDATE system.{} SET segment_id = 0, position = 0 WHERE table_uuid = {} AND shard = {}", 
TRUNCATED, table_uuid, shard); co_await execute_cql(req); } }); if (!to_delete.empty()) { // IN has a limit to how many values we can put into it. for (auto&& chunk : to_delete | std::views::transform(&table_id::to_sstring) | std::views::chunk(100)) { auto str = std::ranges::to(chunk | std::views::join_with(',')); auto req = format("DELETE FROM system.{} WHERE table_uuid IN ({})", TRUNCATED, str); co_await execute_cql(req); } any = true; } if (any) { co_await force_blocking_flush(TRUNCATED); } } future<> system_keyspace::remove_truncation_records(table_id id) { auto req = format("DELETE FROM system.{} WHERE table_uuid = {}", TRUNCATED, id); co_await execute_cql(req); co_await force_blocking_flush(TRUNCATED); } future<> system_keyspace::save_truncation_record(const replica::column_family& cf, db_clock::time_point truncated_at, db::replay_position rp) { sstring req = format("INSERT INTO system.{} (table_uuid, shard, position, segment_id, truncated_at) VALUES(?,?,?,?,?)", TRUNCATED); co_await _qp.execute_internal(req, {cf.schema()->id().uuid(), int32_t(rp.shard_id()), int32_t(rp.pos), int64_t(rp.base_id()), truncated_at}, cql3::query_processor::cache_internal::yes); // Flush the table so that the value is available on boot before commitlog replay. // Commit log replay depends on truncation records to determine the minimum replay position. 
co_await force_blocking_flush(TRUNCATED); } future system_keyspace::get_truncated_positions(table_id cf_id) { replay_positions result; if (_db.get_config().ignore_truncation_record.is_set()) { co_return result; } const auto req = format("SELECT * from system.{} WHERE table_uuid = ?", TRUNCATED); auto result_set = co_await execute_cql(req, {cf_id.uuid()}); result.reserve(result_set->size()); for (const auto& row: *result_set) { result.emplace_back(row.get_as("shard"), row.get_as("segment_id"), row.get_as("position")); } co_return result; } future<> system_keyspace::drop_all_commitlog_cleanup_records() { // In this function we want to clear the entire COMMITLOG_CLEANUPS table. // // We can't use TRUNCATE, since it's a system table. So we have to delete each partition. // // The partition key is the shard number. If we knew how many shards there were in // the previous boot cycle, we could just issue DELETEs for 1..N. // // But we don't know that here, so we have to SELECT the set of partition keys, // and issue DELETEs on that. sstring req = format("SELECT shard from system.{}", COMMITLOG_CLEANUPS); auto rs = co_await execute_cql(req); co_await coroutine::parallel_for_each(*rs, [&] (const cql3::untyped_result_set_row& row) -> future<> { auto shard = row.get_as("shard"); co_await execute_cql(format("DELETE FROM system.{} WHERE shard = {}", COMMITLOG_CLEANUPS, shard)); }); } future<> system_keyspace::drop_old_commitlog_cleanup_records(replay_position min_position) { auto pos = make_tuple_value(replay_position_type, tuple_type_impl::native_type({ int64_t(min_position.base_id()), int32_t(min_position.pos) })); sstring req = format("DELETE FROM system.{} WHERE shard = ? 
AND position < ?", COMMITLOG_CLEANUPS); co_await _qp.execute_internal(req, {int32_t(min_position.shard_id()), pos}, cql3::query_processor::cache_internal::yes); } future<> system_keyspace::save_commitlog_cleanup_record(table_id table, dht::token_range tr, db::replay_position rp) { auto [start_token_exclusive, end_token_inclusive] = canonical_token_range(tr); auto pos = make_tuple_value(replay_position_type, tuple_type_impl::native_type({int64_t(rp.base_id()), int32_t(rp.pos)})); sstring req = format("INSERT INTO system.{} (shard, position, table_uuid, start_token_exclusive, end_token_inclusive) VALUES(?,?,?,?,?)", COMMITLOG_CLEANUPS); co_await _qp.execute_internal(req, {int32_t(rp.shard_id()), pos, table.uuid(), start_token_exclusive, end_token_inclusive}, cql3::query_processor::cache_internal::yes); } std::pair system_keyspace::canonical_token_range(dht::token_range tr) { // closed_full_range represents a full interval using only regular token values. (No infinities). auto closed_full_range = dht::token_range::make({dht::first_token()}, dht::token::from_int64(std::numeric_limits::max())); // By intersecting with closed_full_range we get rid of all the crazy infinities that can be represented by dht::token_range. auto finite_tr = tr.intersection(closed_full_range, dht::token_comparator()); if (!finite_tr) { // If we got here, the interval was degenerate, with only infinities. // So we return an empty (x, x] interval. // We arbitrarily choose `min` as the `x`. // // Note: (x, x] is interpreted by the interval classes from `interval.hh` as the // *full* (wrapping) interval, not an empty interval, so be careful about this if you ever // want to implement a conversion from the output of this function back to `dht::token_range`. // Nota bene, this `interval.hh` convention means that there is no way to represent an empty // interval, so it is objectively bad. 
// // Note: (x, x] is interpreted by boost::icl as an empty interval, so it doesn't need any special // treatment before use in `boost::icl::interval_map`. return {std::numeric_limits::min(), std::numeric_limits::min()}; } // After getting rid of possible infinities, we only have to adjust the openness of bounds. int64_t start_token_exclusive = dht::token::to_int64(finite_tr->start().value().value()); if (finite_tr->start()->is_inclusive()) { start_token_exclusive -= 1; } int64_t end_token_inclusive = dht::token::to_int64(finite_tr->end().value().value()); if (!finite_tr->end()->is_inclusive()) { end_token_inclusive -= 1; } return {start_token_exclusive, end_token_inclusive}; } size_t system_keyspace::commitlog_cleanup_map_hash::operator()(const std::pair& p) const { size_t seed = 0; boost::hash_combine(seed, std::hash()(p.first.uuid())); boost::hash_combine(seed, std::hash()(p.second)); return seed; } struct system_keyspace::commitlog_cleanup_local_map::impl { boost::icl::interval_map< int64_t, db::replay_position, boost::icl::partial_absorber, std::less, boost::icl::inplace_max, boost::icl::inter_section, boost::icl::left_open_interval > _map; }; system_keyspace::commitlog_cleanup_local_map::~commitlog_cleanup_local_map() { } system_keyspace::commitlog_cleanup_local_map::commitlog_cleanup_local_map() : _pimpl(std::make_unique()) {} std::optional system_keyspace::commitlog_cleanup_local_map::get(int64_t token) const { if (auto it = _pimpl->_map.find(token); it != _pimpl->_map.end()) { return it->second; } return std::nullopt; } future system_keyspace::get_commitlog_cleanup_records() { commitlog_cleanup_map ret; const auto req = format("SELECT * from system.{}", COMMITLOG_CLEANUPS); auto result_set = co_await execute_cql(req); for (const auto& row: *result_set) { auto table = table_id(row.get_as("table_uuid")); auto shard = row.get_as("shard"); auto start_token_exclusive = row.get_as("start_token_exclusive"); auto end_token_inclusive = 
row.get_as("end_token_inclusive"); auto pos_tuple = value_cast(replay_position_type->deserialize(row.get_view("position"))); auto rp = db::replay_position( shard, value_cast(pos_tuple[0]), value_cast(pos_tuple[1]) ); auto& inner_map = ret.try_emplace(std::make_pair(table, shard)).first->second; inner_map._pimpl->_map += std::make_pair(boost::icl::left_open_interval(start_token_exclusive, end_token_inclusive), rp); } co_return ret; } static set_type_impl::native_type deserialize_set_column(const schema& s, const cql3::untyped_result_set_row& row, const char* name) { auto blob = row.get_blob_unfragmented(name); auto cdef = s.get_column_definition(name); auto deserialized = cdef->type->deserialize(blob); return value_cast(deserialized); } static set_type_impl::native_type prepare_tokens(const std::unordered_set& tokens) { set_type_impl::native_type tset; for (auto& t: tokens) { tset.push_back(t.to_sstring()); } return tset; } std::unordered_set decode_tokens(const set_type_impl::native_type& tokens) { std::unordered_set tset; for (auto& t: tokens) { auto str = value_cast(t); SCYLLA_ASSERT(str == dht::token::from_sstring(str).to_sstring()); tset.insert(dht::token::from_sstring(str)); } return tset; } static std::unordered_set decode_nodes_ids(const set_type_impl::native_type& nodes_ids) { std::unordered_set ids_set; for (auto& id: nodes_ids) { auto uuid = value_cast(id); ids_set.insert(raft::server_id{uuid}); } return ids_set; } static cdc::generation_id_v2 decode_cdc_generation_id(const data_value& gen_id) { auto native = value_cast(gen_id); auto ts = value_cast(native[0]); auto id = value_cast(native[1]); return cdc::generation_id_v2{ts, id}; } static std::vector decode_cdc_generations_ids(const set_type_impl::native_type& gen_ids) { std::vector gen_ids_list; for (auto& gen_id: gen_ids) { gen_ids_list.push_back(decode_cdc_generation_id(gen_id)); } return gen_ids_list; } future>> system_keyspace::load_tokens() { co_await peers_table_read_fixup(); const sstring req = 
format("SELECT peer, tokens FROM system.{}", PEERS); std::unordered_map> ret; const auto cql_result = co_await execute_cql(req); for (const auto& row : *cql_result) { if (row.has("tokens")) { ret.emplace(gms::inet_address(row.get_as("peer")), decode_tokens(deserialize_set_column(*peers(), row, "tokens"))); } } co_return ret; } future> system_keyspace::load_host_ids() { co_await peers_table_read_fixup(); const sstring req = format("SELECT peer, host_id FROM system.{}", PEERS); std::unordered_map ret; const auto cql_result = co_await execute_cql(req); for (const auto& row : *cql_result) { ret.emplace(gms::inet_address(row.get_as("peer")), locator::host_id(row.get_as("host_id"))); } co_return ret; } future> system_keyspace::load_endpoint_state() { co_await peers_table_read_fixup(); const auto msg = co_await execute_cql(format("SELECT peer, host_id, tokens, data_center, rack from system.{}", PEERS)); std::unordered_map ret; for (const auto& row : *msg) { gms::loaded_endpoint_state st; auto ep = row.get_as("peer"); if (!row.has("host_id")) { // Must never happen after `peers_table_read_fixup` call above on_internal_error_noexcept(slogger, format("load_endpoint_state: node {} has no host_id in system.{}", ep, PEERS)); } auto host_id = locator::host_id(row.get_as("host_id")); if (row.has("tokens")) { st.tokens = decode_tokens(deserialize_set_column(*peers(), row, "tokens")); } if (row.has("data_center") && row.has("rack")) { st.opt_dc_rack.emplace(locator::endpoint_dc_rack { row.get_as("data_center"), row.get_as("rack") }); if (st.opt_dc_rack->dc.empty() || st.opt_dc_rack->rack.empty()) { slogger.error("load_endpoint_state: node {}/{} has empty dc={} or rack={}", host_id, ep, st.opt_dc_rack->dc, st.opt_dc_rack->rack); continue; } } else { slogger.warn("Endpoint {} has no {} in system.{}", ep, !row.has("data_center") && !row.has("rack") ? "data_center nor rack" : !row.has("data_center") ? 
                "data_center" : "rack", PEERS);
        }
        st.endpoint = ep;
        ret.emplace(host_id, std::move(st));
    }
    co_return ret;
}

// Returns the IP addresses of the known peers, read from system.peers.
future> system_keyspace::load_peers() {
    // Repair broken/stale peer rows first, so the snapshot below is consistent.
    co_await peers_table_read_fixup();
    const auto res = co_await execute_cql(format("SELECT peer, rpc_address FROM system.{}", PEERS));
    SCYLLA_ASSERT(res);
    std::vector ret;
    for (const auto& row: *res) {
        if (!row.has("rpc_address")) {
            // In the Raft-based topology, we store the Host ID -> IP mapping
            // of joining nodes in PEERS. We want to ignore such rows. To achieve
            // it, we check the presence of rpc_address, but we could choose any
            // column other than host_id and tokens (rows with no tokens can
            // correspond to zero-token nodes).
            continue;
        }
        ret.emplace_back(gms::inet_address(row.get_as("peer")));
    }
    co_return ret;
}

// Returns the host IDs of the known peers, read from system.peers.
future> system_keyspace::load_peers_ids() {
    // Repair broken/stale peer rows first, so the snapshot below is consistent.
    co_await peers_table_read_fixup();
    const auto res = co_await execute_cql(format("SELECT rpc_address, host_id FROM system.{}", PEERS));
    SCYLLA_ASSERT(res);
    std::vector ret;
    for (const auto& row: *res) {
        if (!row.has("rpc_address")) {
            // In the Raft-based topology, we store the Host ID -> IP mapping
            // of joining nodes in PEERS. We want to ignore such rows. To achieve
            // it, we check the presence of rpc_address, but we could choose any
            // column other than host_id and tokens (rows with no tokens can
            // correspond to zero-token nodes).
continue; } ret.emplace_back(locator::host_id(row.get_as("host_id"))); } co_return ret; } future> system_keyspace::load_peer_features() { co_await peers_table_read_fixup(); const sstring req = format("SELECT host_id, supported_features FROM system.{}", PEERS); std::unordered_map ret; const auto cql_result = co_await execute_cql(req); for (const auto& row : *cql_result) { if (row.has("supported_features")) { ret.emplace(locator::host_id(row.get_as("host_id")), row.get_as("supported_features")); } } co_return ret; } future> system_keyspace::get_preferred_ips() { co_await peers_table_read_fixup(); const sstring req = format("SELECT peer, preferred_ip FROM system.{}", PEERS); std::unordered_map res; const auto cql_result = co_await execute_cql(req); for (const auto& r : *cql_result) { if (r.has("preferred_ip")) { res.emplace(gms::inet_address(r.get_as("peer")), gms::inet_address(r.get_as("preferred_ip"))); } } co_return res; } namespace { template static data_value_or_unset make_data_value_or_unset(const std::optional& opt) { if (opt) { return data_value(*opt); } else { return unset_value{}; } }; static data_value_or_unset make_data_value_or_unset(const std::optional>& opt) { if (opt) { auto set_type = set_type_impl::get_instance(utf8_type, true); return make_set_value(set_type, prepare_tokens(*opt)); } else { return unset_value{}; } }; } future<> system_keyspace::update_peer_info(gms::inet_address ep, locator::host_id hid, const peer_info& info) { if (ep == gms::inet_address{}) { on_internal_error(slogger, format("update_peer_info called with empty inet_address, host_id {}", hid)); } if (!hid) { on_internal_error(slogger, format("update_peer_info called with empty host_id, ep {}", ep)); } if (_db.get_token_metadata().get_topology().is_me(hid)) { on_internal_error(slogger, format("update_peer_info called for this node: {}", ep)); } data_value_list values = { data_value_or_unset(data_value(ep.addr())), make_data_value_or_unset(info.data_center), 
data_value_or_unset(hid.id), make_data_value_or_unset(info.preferred_ip), make_data_value_or_unset(info.rack), make_data_value_or_unset(info.release_version), make_data_value_or_unset(info.rpc_address), make_data_value_or_unset(info.schema_version), make_data_value_or_unset(info.tokens), make_data_value_or_unset(info.supported_features), }; auto query = fmt::format("INSERT INTO system.{} " "(peer,data_center,host_id,preferred_ip,rack,release_version,rpc_address,schema_version,tokens,supported_features) VALUES" "(?,?,?,?,?,?,?,?,?,?)", PEERS); slogger.debug("{}: values={}", query, values); const auto guard = co_await get_units(_peers_cache_lock, 1); try { co_await _qp.execute_internal(query, db::consistency_level::ONE, values, cql3::query_processor::cache_internal::yes); if (auto* cache = get_peers_cache()) { cache->host_id_to_inet_ip[hid] = ep; cache->inet_ip_to_host_id[ep] = hid; } } catch (...) { _peers_cache = nullptr; throw; } } system_keyspace::peers_cache* system_keyspace::get_peers_cache() { auto* cache = _peers_cache.get(); if (cache && (lowres_clock::now() > cache->expiration_time)) { _peers_cache = nullptr; return nullptr; } return cache; } future> system_keyspace::get_or_load_peers_cache() { const auto guard = co_await get_units(_peers_cache_lock, 1); if (auto* cache = get_peers_cache()) { co_return cache->shared_from_this(); } auto cache = make_lw_shared(); cache->inet_ip_to_host_id = co_await load_host_ids(); cache->host_id_to_inet_ip.reserve(cache->inet_ip_to_host_id.size()); for (const auto [ip, id]: cache->inet_ip_to_host_id) { const auto [it, inserted] = cache->host_id_to_inet_ip.insert({id, ip}); if (!inserted) { on_internal_error(slogger, ::format("duplicate IP for host_id {}, first IP {}, second IP {}", id, it->second, ip)); } } cache->expiration_time = lowres_clock::now() + std::chrono::milliseconds(200); _peers_cache = cache; co_return std::move(cache); } future> system_keyspace::get_ip_from_peers_table(locator::host_id id) { const auto cache 
= co_await get_or_load_peers_cache(); if (const auto it = cache->host_id_to_inet_ip.find(id); it != cache->host_id_to_inet_ip.end()) { co_return it->second; } co_return std::nullopt; } future system_keyspace::get_host_id_to_ip_map() { const auto cache = co_await get_or_load_peers_cache(); co_return cache->host_id_to_inet_ip; } template future<> system_keyspace::set_scylla_local_param_as(const sstring& key, const T& value, bool visible_before_cl_replay) { sstring req = format("UPDATE system.{} SET value = ? WHERE key = ?", system_keyspace::SCYLLA_LOCAL); auto type = data_type_for(); co_await execute_cql(req, type->to_string_impl(data_value(value)), key).discard_result(); if (visible_before_cl_replay) { co_await force_blocking_flush(SCYLLA_LOCAL); } } template future> system_keyspace::get_scylla_local_param_as(const sstring& key) { sstring req = format("SELECT value FROM system.{} WHERE key = ?", system_keyspace::SCYLLA_LOCAL); return execute_cql(req, key).then([] (::shared_ptr res) -> future> { if (res->empty() || !res->one().has("value")) { return make_ready_future>(std::optional()); } auto type = data_type_for(); return make_ready_future>(value_cast(type->deserialize( type->from_string(res->one().get_as("value"))))); }); } template future> system_keyspace::get_scylla_local_param_as(const sstring& key); future<> system_keyspace::set_scylla_local_param(const sstring& key, const sstring& value, bool visible_before_cl_replay) { return set_scylla_local_param_as(key, value, visible_before_cl_replay); } future> system_keyspace::get_scylla_local_param(const sstring& key){ return get_scylla_local_param_as(key); } future<> system_keyspace::update_schema_version(table_schema_version version) { sstring req = format("INSERT INTO system.{} (key, schema_version) VALUES (?, ?)", LOCAL); return execute_cql(req, sstring(LOCAL), version.uuid()).discard_result(); } /** * Remove stored tokens being used by another node */ future<> system_keyspace::remove_endpoint(gms::inet_address ep) 
{
    const sstring req = format("DELETE FROM system.{} WHERE peer = ?", PEERS);
    slogger.debug("DELETE FROM system.{} WHERE peer = {}", PEERS, ep);
    // Serialize with other cache users; drop the whole cache if the delete
    // fails so it cannot go stale relative to the table.
    const auto guard = co_await get_units(_peers_cache_lock, 1);
    try {
        co_await execute_cql(req, ep.addr()).discard_result();
        if (auto* cache = get_peers_cache()) {
            const auto it = cache->inet_ip_to_host_id.find(ep);
            if (it != cache->inet_ip_to_host_id.end()) {
                const auto id = it->second;
                cache->inet_ip_to_host_id.erase(it);
                cache->host_id_to_inet_ip.erase(id);
            }
        }
    } catch (...) {
        _peers_cache = nullptr;
        throw;
    }
}

// Persists this node's tokens into the system.LOCAL row as a set value.
future<> system_keyspace::update_tokens(const std::unordered_set& tokens) {
    sstring req = format("INSERT INTO system.{} (key, tokens) VALUES (?, ?)", LOCAL);
    auto set_type = set_type_impl::get_instance(utf8_type, true);
    co_await execute_cql(req, sstring(LOCAL), make_set_value(set_type, prepare_tokens(tokens)));
}

// Flushes the given system table on every shard.
future<> system_keyspace::force_blocking_flush(sstring cfname) {
    return container().invoke_on_all([cfname = std::move(cfname)] (db::system_keyspace& sys_ks) {
        // if (!Boolean.getBoolean("cassandra.unsafesystem"))
        return sys_ks._db.flush(NAME, cfname);
    });
}

// Reads the token set previously stored in system.LOCAL. Returns an empty
// set when nothing was saved.
future> system_keyspace::get_saved_tokens() {
    sstring req = format("SELECT tokens FROM system.{} WHERE key = ?", LOCAL);
    return execute_cql(req, sstring(LOCAL)).then([] (auto msg) {
        if (msg->empty() || !msg->one().has("tokens")) {
            return make_ready_future>();
        }
        auto decoded_tokens = decode_tokens(deserialize_set_column(*local(), msg->one(), "tokens"));
        return make_ready_future>(std::move(decoded_tokens));
    });
}

// Like get_saved_tokens(), but an empty result is treated as an error and
// raised as std::runtime_error.
future> system_keyspace::get_local_tokens() {
    return get_saved_tokens().then([] (auto&& tokens) {
        if (tokens.empty()) {
            auto err = format("get_local_tokens: tokens is empty");
            slogger.error("{}", err);
            throw std::runtime_error(err);
        }
        return std::move(tokens);
    });
}

// Persists the current CDC generation ID into system.CDC_LOCAL. v1 IDs
// store only the timestamp; v2 IDs additionally store the UUID.
future<> system_keyspace::update_cdc_generation_id(cdc::generation_id gen_id) {
    co_await std::visit(make_visitor(
    [this] (cdc::generation_id_v1 id) -> future<> {
        co_await execute_cql(
                format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", v3::CDC_LOCAL),
                sstring(v3::CDC_LOCAL), id.ts);
    },
    [this] (cdc::generation_id_v2 id) -> future<> {
        co_await execute_cql(
                format("INSERT INTO system.{} (key, streams_timestamp, uuid) VALUES (?, ?, ?)", v3::CDC_LOCAL),
                sstring(v3::CDC_LOCAL), id.ts, id.id);
    }
    ), gen_id);
}

// Reads the stored CDC generation ID. A row with only streams_timestamp is
// interpreted as a v1 ID; one that also carries uuid as a v2 ID.
future> system_keyspace::get_cdc_generation_id() {
    auto msg = co_await execute_cql(
            format("SELECT streams_timestamp, uuid FROM system.{} WHERE key = ?", v3::CDC_LOCAL),
            sstring(v3::CDC_LOCAL));

    if (msg->empty()) {
        co_return std::nullopt;
    }

    auto& row = msg->one();

    if (!row.has("streams_timestamp")) {
        // should not happen but whatever
        co_return std::nullopt;
    }

    auto ts = row.get_as("streams_timestamp");
    if (!row.has("uuid")) {
        co_return cdc::generation_id_v1{ts};
    }

    auto id = row.get_as("uuid");
    co_return cdc::generation_id_v2{ts, id};
}

static const sstring CDC_REWRITTEN_KEY = "rewritten";

// Marks CDC streams as rewritten by inserting a marker row under
// CDC_REWRITTEN_KEY, optionally recording the generation's timestamp.
future<> system_keyspace::cdc_set_rewritten(std::optional gen_id) {
    if (gen_id) {
        return execute_cql(
                format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", v3::CDC_LOCAL),
                CDC_REWRITTEN_KEY, gen_id->ts).discard_result();
    } else {
        // Insert just the row marker.
        return execute_cql(
                format("INSERT INTO system.{} (key) VALUES (?)", v3::CDC_LOCAL),
                CDC_REWRITTEN_KEY).discard_result();
    }
}

// Checks for the marker row written by cdc_set_rewritten().
future system_keyspace::cdc_is_rewritten() {
    // We don't care about the actual timestamp; it's additional information for debugging purposes.
    return execute_cql(format("SELECT key FROM system.{} WHERE key = ?", v3::CDC_LOCAL), CDC_REWRITTEN_KEY)
            .then([] (::shared_ptr msg) {
        return !msg->empty();
    });
}

// Streams system.CDC_STREAMS_STATE to `f`, one call per (table, timestamp)
// group. Rows are paged 1000 at a time; a change of table or timestamp in
// the row stream flushes the stream IDs accumulated so far, so the rows
// are expected clustered by (table_id, timestamp).
future<> system_keyspace::read_cdc_streams_state(std::optional table, noncopyable_function(table_id, db_clock::time_point, utils::chunked_vector)> f) {
    static const sstring all_tables_query = format("SELECT table_id, timestamp, stream_id FROM {}.{}", NAME, CDC_STREAMS_STATE);
    static const sstring single_table_query = format("SELECT table_id, timestamp, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_STATE);

    // Accumulates stream IDs belonging to the current (table, timestamp) group.
    struct cur_t {
        table_id tid;
        db_clock::time_point ts;
        utils::chunked_vector streams;
    };
    std::optional cur;

    co_await _qp.query_internal(table ? single_table_query : all_tables_query,
            db::consistency_level::ONE,
            table ? data_value_list{table->uuid()} : data_value_list{},
            1000,
            [&] (const cql3::untyped_result_set_row& row) -> future {
        auto tid = table_id(row.get_as("table_id"));
        auto ts = row.get_as("timestamp");
        auto stream_id = cdc::stream_id(row.get_as("stream_id"));
        if (!cur || tid != cur->tid || ts != cur->ts) {
            if (cur) {
                // Group boundary reached: hand the finished group to the consumer.
                co_await f(cur->tid, cur->ts, std::move(cur->streams));
            }
            cur = { tid, ts, utils::chunked_vector() };
        }
        cur->streams.push_back(std::move(stream_id));
        co_return stop_iteration::no;
    });
    if (cur) {
        // Flush the final group.
        co_await f(cur->tid, cur->ts, std::move(cur->streams));
    }
}

// Streams system.CDC_STREAMS_HISTORY rows of `table` to `f`, grouped by
// timestamp; each group's stream IDs are split into opened/closed sets.
// When `from` is given, only history with a strictly newer timestamp is
// read. Any other stream state is an internal error.
future<> system_keyspace::read_cdc_streams_history(table_id table, std::optional from, noncopyable_function(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f) {
    static const sstring query_all = format("SELECT table_id, timestamp, stream_state, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_HISTORY);
    static const sstring query_from = format("SELECT table_id, timestamp, stream_state, stream_id FROM {}.{} WHERE table_id = ? AND timestamp > ?", NAME, CDC_STREAMS_HISTORY);

    // Accumulates the stream diff of the current (table, timestamp) group.
    struct cur_t {
        table_id tid;
        db_clock::time_point ts;
        cdc::cdc_stream_diff diff;
    };
    std::optional cur;

    co_await _qp.query_internal(from ? query_from : query_all,
            db::consistency_level::ONE,
            from ? data_value_list{table.uuid(), *from} : data_value_list{table.uuid()},
            1000,
            [&] (const cql3::untyped_result_set_row& row) -> future {
        auto tid = table_id(row.get_as("table_id"));
        auto ts = row.get_as("timestamp");
        auto stream_state = cdc::read_stream_state(row.get_as("stream_state"));
        auto stream_id = cdc::stream_id(row.get_as("stream_id"));
        if (!cur || tid != cur->tid || ts != cur->ts) {
            if (cur) {
                co_await f(cur->tid, cur->ts, std::move(cur->diff));
            }
            cur = { tid, ts, cdc::cdc_stream_diff() };
        }
        if (stream_state == cdc::stream_state::closed) {
            cur->diff.closed_streams.push_back(std::move(stream_id));
        } else if (stream_state == cdc::stream_state::opened) {
            cur->diff.opened_streams.push_back(std::move(stream_id));
        } else {
            on_internal_error(slogger, fmt::format("unexpected CDC stream state {} in {}.{} for table {}",
                    std::to_underlying(stream_state), NAME, CDC_STREAMS_HISTORY, table));
        }
        co_return stop_iteration::no;
    });
    if (cur) {
        co_await f(cur->tid, cur->ts, std::move(cur->diff));
    }
}

// Predicates over the cached local bootstrap state.
bool system_keyspace::bootstrap_needed() const {
    return get_bootstrap_state() == bootstrap_state::NEEDS_BOOTSTRAP;
}

bool system_keyspace::bootstrap_complete() const {
    return get_bootstrap_state() == bootstrap_state::COMPLETED;
}

bool system_keyspace::bootstrap_in_progress() const {
    return get_bootstrap_state() == bootstrap_state::IN_PROGRESS;
}

bool system_keyspace::was_decommissioned() const {
    return get_bootstrap_state() == bootstrap_state::DECOMMISSIONED;
}

system_keyspace::bootstrap_state system_keyspace::get_bootstrap_state() const {
    return _cache->_state;
}

// Persists the bootstrap state into system.LOCAL and then updates the
// per-shard cached state on all shards.
future<> system_keyspace::set_bootstrap_state(bootstrap_state state) {
    static std::unordered_map> state_to_name({
        { bootstrap_state::NEEDS_BOOTSTRAP, "NEEDS_BOOTSTRAP" },
        {
            bootstrap_state::COMPLETED, "COMPLETED" },
        { bootstrap_state::IN_PROGRESS, "IN_PROGRESS" },
        { bootstrap_state::DECOMMISSIONED, "DECOMMISSIONED" }
    });

    sstring state_name = state_to_name.at(state);
    sstring req = format("INSERT INTO system.{} (key, bootstrapped) VALUES (?, ?)", LOCAL);
    co_await execute_cql(req, sstring(LOCAL), state_name).discard_result();
    // Propagate the new state to the cached copy on every shard.
    co_await container().invoke_on_all([state] (auto& sys_ks) {
        sys_ks._cache->_state = state;
    });
}

// The auth-related system tables.
std::vector system_keyspace::auth_tables() {
    return {roles(), role_members(), role_attributes(), role_permissions()};
}

// Enumerates every table of the system keyspace: schema tables, auth
// tables, the fixed set below, and feature-gated tables selected by `cfg`.
std::vector system_keyspace::all_tables(const db::config& cfg) {
    std::vector r;
    auto schema_tables = db::schema_tables::all_tables(schema_features::full());
    std::copy(schema_tables.begin(), schema_tables.end(), std::back_inserter(r));
    auto auth_tables = system_keyspace::auth_tables();
    std::copy(auth_tables.begin(), auth_tables.end(), std::back_inserter(r));
    r.insert(r.end(), { built_indexes(), hints(), batchlog(), batchlog_v2(), paxos(), local(),
                    peers(), peer_events(), range_xfers(),
                    compactions_in_progress(), compaction_history(),
                    sstable_activity(), size_estimates(), large_partitions(), large_rows(), large_cells(),
                    corrupt_data(),
                    scylla_local(), db::schema_tables::scylla_table_schema_history(),
                    repair_history(),
                    v3::views_builds_in_progress(), v3::built_views(),
                    v3::scylla_views_builds_in_progress(), v3::truncated(),
                    v3::commitlog_cleanups(), v3::cdc_local(),
                    raft(), raft_snapshots(), raft_snapshot_config(),
                    group0_history(), discovery(), topology(),
                    cdc_generations_v3(), topology_requests(),
                    service_levels_v2(), view_build_status_v2(),
                    dicts(), view_building_tasks(), client_routes(),
                    cdc_streams_state(), cdc_streams_history() });
    if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
        r.insert(r.end(), {broadcast_kv_store()});
    }
    r.insert(r.end(), {tablets()});
    if (cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
        r.insert(r.end(), {sstables_registry()});
    }
    return r;
}

// Selects the system tables that the caller may place in user memory.
static bool maybe_write_in_user_memory(schema_ptr s) {
    return (s.get() == system_keyspace::batchlog().get()) || (s.get() == system_keyspace::batchlog_v2().get())
            || (s.get() == system_keyspace::paxos().get())
            || s == system_keyspace::v3::scylla_views_builds_in_progress();
}

// Creates every system keyspace table in `db`, initializes its storage,
// and registers the user types used by system tables.
future<> system_keyspace::make(
        locator::effective_replication_map_factory& erm_factory,
        replica::database& db) {
    for (auto&& table : system_keyspace::all_tables(db.get_config())) {
        co_await db.create_local_system_table(table, maybe_write_in_user_memory(table), erm_factory);
        co_await db.find_column_family(table).init_storage();
    }
    replica::tablet_add_repair_scheduler_user_types(NAME, db);
    db.find_keyspace(NAME).add_user_type(sstableinfo_type);
}

// Marks every system table ready for writes, attaching its commitlog.
void system_keyspace::mark_writable() {
    for (auto&& table : system_keyspace::all_tables(_db.get_config())) {
        _db.find_column_family(table).mark_ready_for_writes(_db.commitlog_for(table));
    }
}

// Query state used for internal system queries; all timeout categories are
// fixed at 10 seconds.
static service::query_state& internal_system_query_state() {
    using namespace std::chrono_literals;
    const auto t = 10s;
    static timeout_config tc{ t, t, t, t, t, t, t };
    static thread_local service::client_state cs(service::client_state::internal_tag{}, tc);
    static thread_local service::query_state qs(cs, empty_service_permit());
    return qs;
};

// Reads the partition for `key` from system.SCYLLA_LOCAL as a mutation.
// Returns nullopt when the partition does not exist; a null result set
// from the query layer is an internal error.
static future> get_scylla_local_mutation(replica::database& db, std::string_view key) {
    auto s = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);

    partition_key pk = partition_key::from_singular(*s, key);
    dht::partition_range pr = dht::partition_range::make_singular(dht::decorate_key(*s, pk));

    auto rs = co_await replica::query_mutations(db.container(), s, pr, s->full_slice(), db::no_timeout);
    if (!rs) {
        on_internal_error(slogger, "get_scylla_local_mutation(): no result from querying mutations");
    }
    auto& ps = rs->partitions();
    for (auto& p: ps) {
        // At most one partition can match the singular range; return it.
        auto mut = p.mut().unfreeze(s);
        co_return std::move(mut);
    }
    co_return std::nullopt;
}

future>>
system_keyspace::query_mutations(sharded& db, schema_ptr schema) {
    // Full scan of one table, returned as raw mutations.
    return replica::query_mutations(db, schema, query::full_partition_range, schema->full_slice(), db::no_timeout);
}

future>>
system_keyspace::query_mutations(sharded& db, const sstring& ks_name, const sstring& cf_name) {
    // Convenience overload: resolve the schema by name first.
    schema_ptr schema = db.local().find_schema(ks_name, cf_name);
    return query_mutations(db, schema);
}

future>>
system_keyspace::query_mutations(sharded& db, const sstring& ks_name, const sstring& cf_name, const dht::partition_range& partition_range, query::clustering_range row_range) {
    auto schema = db.local().find_schema(ks_name, cf_name);
    auto slice_ptr = std::make_unique(partition_slice_builder(*schema)
            .with_range(std::move(row_range))
            .build());
    // The slice is referenced by the asynchronous query; keep it alive in
    // the finally continuation until the query completes.
    return replica::query_mutations(db, std::move(schema), partition_range, *slice_ptr, db::no_timeout).finally([slice_ptr = std::move(slice_ptr)] { });
}

// Queries a whole table and materializes the raw result into a result_set.
future> system_keyspace::query(sharded& db, const sstring& ks_name, const sstring& cf_name) {
    schema_ptr schema = db.local().find_schema(ks_name, cf_name);
    return replica::query_data(db, schema, query::full_partition_range, schema->full_slice(), db::no_timeout).then([schema] (auto&& qr) {
        return make_lw_shared(query::result_set::from_raw_result(schema, schema->full_slice(), *qr));
    });
}

// Queries a single partition, restricted to `row_range`.
future> system_keyspace::query(sharded& db, const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key, query::clustering_range row_range) {
    auto schema = db.local().find_schema(ks_name, cf_name);
    auto pr_ptr = std::make_unique(dht::partition_range::make_singular(key));
    auto slice_ptr = std::make_unique(partition_slice_builder(*schema)
            .with_range(std::move(row_range))
            .build());
    // pr_ptr/slice_ptr are captured by the continuation so they outlive the
    // asynchronous query that references them.
    return replica::query_data(db, schema, *pr_ptr, *slice_ptr, db::no_timeout).then(
            [schema, pr_ptr = std::move(pr_ptr), slice_ptr = std::move(slice_ptr)] (auto&& qr) {
        return make_lw_shared(query::result_set::from_raw_result(schema, schema->full_slice(), *qr));
    });
}

static list_type_impl::native_type
prepare_sstables(const std::vector& sstables) {
    // Converts sstable basic_info entries into sstableinfo UDT values for
    // storage in a CQL list column.
    list_type_impl::native_type tmp;
    for (auto& info : sstables) {
        auto element = make_user_value(sstableinfo_type, {data_value(info.generation), data_value(info.origin), data_value(info.size)});
        tmp.push_back(std::move(element));
    }
    return tmp;
}

static std::vector
restore_sstables(const std::vector& sstables) {
    // Inverse of prepare_sstables(): unpacks the UDT fields (generation,
    // origin, size — in declaration order) back into basic_info entries.
    std::vector tmp;
    tmp.reserve(sstables.size());
    for (auto& data : sstables) {
        tmp.emplace_back(sstables::generation_type(value_cast(data[0])),
                value_cast(data[1]),
                value_cast(data[2]));
    }
    return tmp;
}

static map_type_impl::native_type prepare_rows_merged(std::unordered_map& rows_merged) {
    // Converts the rows-merged histogram into a CQL map value.
    map_type_impl::native_type tmp;
    for (auto& r: rows_merged) {
        int32_t first = r.first;
        int64_t second = r.second;
        auto map_element = std::make_pair(data_value(first), data_value(second));
        tmp.push_back(std::move(map_element));
    }
    return tmp;
}

// Appends one entry to system.COMPACTION_HISTORY. Uses the extended column
// set when the compaction_history_upgrade cluster feature is enabled.
// Failures are logged and swallowed so a history write never fails the
// compaction that produced it.
future<> system_keyspace::update_compaction_history(compaction_history_entry entry) {
    // don't write anything when the history table itself is compacted, since that would in turn cause new compactions
    if (entry.ks == "system" && entry.cf == COMPACTION_HISTORY) {
        return make_ready_future<>();
    }

    auto map_type = map_type_impl::get_instance(int32_type, long_type, true);
    auto list_type = list_type_impl::get_instance(sstableinfo_type, false);
    db_clock::time_point compacted_at{db_clock::duration{entry.compacted_at}};
    db_clock::time_point started_at{db_clock::duration{entry.started_at}};

    std::function>()> execute;
    if (local_db().features().compaction_history_upgrade) {
        static constexpr auto reqest_template = "INSERT INTO system.{} ( \
            id, shard_id, keyspace_name, columnfamily_name, started_at, compacted_at, compaction_type, bytes_in, bytes_out, rows_merged, \
            sstables_in, sstables_out, total_tombstone_purge_attempt, total_tombstone_purge_failure_due_to_overlapping_with_memtable, \
            total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable \
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
        sstring request = format(reqest_template, COMPACTION_HISTORY);
        execute = [&, request=std::move(request)]() {
            return execute_cql(request, entry.id, int32_t(entry.shard_id), entry.ks, entry.cf, started_at, compacted_at, entry.compaction_type,
                    entry.bytes_in, entry.bytes_out,
                    make_map_value(map_type, prepare_rows_merged(entry.rows_merged)),
                    make_list_value(list_type, prepare_sstables(entry.sstables_in)),
                    make_list_value(list_type, prepare_sstables(entry.sstables_out)),
                    entry.total_tombstone_purge_attempt,
                    entry.total_tombstone_purge_failure_due_to_overlapping_with_memtable,
                    entry.total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable);
        };
    } else {
        // Legacy schema: only the original columns are written.
        static constexpr auto reqest_template = "INSERT INTO system.{} ( \
            id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
        sstring request = format(reqest_template, COMPACTION_HISTORY);
        execute = [&, request=std::move(request)]() {
            return execute_cql(request, entry.id, entry.ks, entry.cf, compacted_at, entry.bytes_in, entry.bytes_out,
                    make_map_value(map_type, prepare_rows_merged(entry.rows_merged)));
        };
    }

    // Best effort: a failed history write is logged and ignored.
    return execute().discard_result().handle_exception([] (auto ep) {
        slogger.error("update compaction history failed: {}: ignored", ep);
    });
}

// Streams every row of system.COMPACTION_HISTORY to `consumer`, supplying
// defaults for columns that predate the extended schema.
future<> system_keyspace::get_compaction_history(compaction_history_consumer consumer) {
    sstring req = format("SELECT * from system.{}", COMPACTION_HISTORY);
    co_await _qp.query_internal(req, [&consumer] (const cql3::untyped_result_set::row& row) mutable -> future {
        compaction_history_entry entry;
        entry.id = row.get_as("id");
        entry.shard_id = row.get_or("shard_id", 0);
        entry.ks = row.get_as("keyspace_name");
        entry.cf = row.get_as("columnfamily_name");
        entry.compaction_type = row.get_or("compaction_type", "");
        entry.started_at = row.get_or("started_at", 0);
        entry.compacted_at = row.get_as("compacted_at");
        entry.bytes_in = row.get_as("bytes_in");
        entry.bytes_out = row.get_as("bytes_out");
        if
            (row.has("rows_merged")) {
            entry.rows_merged = row.get_map("rows_merged");
        }
        if (row.has("sstables_in")) {
            entry.sstables_in = restore_sstables(row.get_list("sstables_in", sstableinfo_type));
        }
        if (row.has("sstables_out")) {
            entry.sstables_out = restore_sstables(row.get_list("sstables_out", sstableinfo_type));
        }
        entry.total_tombstone_purge_attempt = row.get_or("total_tombstone_purge_attempt", 0);
        entry.total_tombstone_purge_failure_due_to_overlapping_with_memtable = row.get_or("total_tombstone_purge_failure_due_to_overlapping_with_memtable", 0);
        entry.total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable = row.get_or("total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable", 0);
        co_await consumer(std::move(entry));
        co_return stop_iteration::no;
    });
}

// Records one repair in system.REPAIR_HISTORY.
future<> system_keyspace::update_repair_history(repair_history_entry entry) {
    sstring req = format("INSERT INTO system.{} (table_uuid, repair_time, repair_uuid, keyspace_name, table_name, range_start, range_end) VALUES (?, ?, ?, ?, ?, ?, ?)", REPAIR_HISTORY);
    co_await execute_cql(req, entry.table_uuid.uuid(), entry.ts, entry.id.uuid(), entry.ks, entry.cf, entry.range_start, entry.range_end).discard_result();
}

// Streams the repair history of one table to `f`.
// NOTE(review): table_id is formatted directly into the query text rather
// than bound as a parameter — harmless for a UUID, but inconsistent with
// the prepared-statement style used elsewhere in this file; verify before
// changing.
future<> system_keyspace::get_repair_history(::table_id table_id, repair_history_consumer f) {
    sstring req = format("SELECT * from system.{} WHERE table_uuid = {}", REPAIR_HISTORY, table_id);
    co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future {
        repair_history_entry ent;
        ent.id = tasks::task_id(row.get_as("repair_uuid"));
        ent.table_uuid = ::table_id(row.get_as("table_uuid"));
        ent.range_start = row.get_as("range_start");
        ent.range_end = row.get_as("range_end");
        ent.ks = row.get_as("keyspace_name");
        ent.cf = row.get_as("table_name");
        ent.ts = row.get_as("repair_time");
        co_await f(std::move(ent));
        co_return stop_iteration::no;
    });
}

// Computes the next gossip generation number — max(stored + 1, now) — then
// persists it into system.LOCAL and returns it.
future system_keyspace::increment_and_get_generation() {
    auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
    auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes);
    gms::generation_type generation;
    if (rs->empty() || !rs->one().has("gossip_generation")) {
        // seconds-since-epoch isn't a foolproof new generation
        // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"),
        // but it's as close as sanely possible
        generation = gms::get_generation_number();
    } else {
        // Other nodes will ignore gossip messages about a node that have a lower generation than previously seen.
        auto stored_generation = gms::generation_type(rs->one().template get_as("gossip_generation") + 1);
        auto now = gms::get_generation_number();
        if (stored_generation >= now) {
            slogger.warn("Using stored Gossip Generation {} as it is greater than current system time {}."
                        "See CASSANDRA-3654 if you experience problems", stored_generation, now);
            generation = stored_generation;
        } else {
            generation = now;
        }
    }
    req = format("INSERT INTO system.{} (key, gossip_generation) VALUES ('{}', ?)", LOCAL, LOCAL);
    co_await _qp.execute_internal(req, {generation.value()}, cql3::query_processor::cache_internal::yes);
    co_return generation;
}

// Builds (without applying) a mutation of system.SIZE_ESTIMATES for
// keyspace `ks` from the given per-range estimates.
mutation system_keyspace::make_size_estimates_mutation(const sstring& ks, std::vector estimates) {
    auto&& schema = db::system_keyspace::size_estimates();
    auto timestamp = api::new_timestamp();
    mutation m_to_apply{schema, partition_key::from_single_value(*schema, utf8_type->decompose(ks))};

    for (auto&& e : estimates) {
        auto ck = clustering_key_prefix(std::vector{
                utf8_type->decompose(e.schema->cf_name()), e.range_start_token, e.range_end_token});

        m_to_apply.set_clustered_cell(ck, "mean_partition_size", e.mean_partition_size, timestamp);
        m_to_apply.set_clustered_cell(ck, "partitions_count", e.partitions_count, timestamp);
    }

    return m_to_apply;
}

// Registers a view-build start for this shard in
// system.SCYLLA_VIEWS_BUILDS_IN_PROGRESS.
future<> system_keyspace::register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) {
    sstring req =
            format("INSERT INTO system.{} (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
    return execute_cql(
            std::move(req),
            std::move(ks_name),
            std::move(view_name),
            0,
            int32_t(this_shard_id()),
            token.to_sstring()).discard_result();
}

future<> system_keyspace::register_view_for_building_for_all_shards(sstring ks_name, sstring view_name, const dht::token& token) {
    // registers this_shard_id() with the given token and inserts an empty status for all other shards.
    // this is used to register all shards atomically and ensure all shards have a status, even if we crash
    // before all shards are registered.
    // if another shard has already registered, this won't overwrite its status. if it hasn't registered, we insert
    // a status with first_token=null and next_token=null, indicating it hasn't made progress.
    auto&& schema = db::system_keyspace::v3::scylla_views_builds_in_progress();
    auto timestamp = api::new_timestamp();
    mutation m{schema, partition_key::from_single_value(*schema, utf8_type->decompose(ks_name))};
    for (size_t s = 0; s < smp::count; s++) {
        auto ck = clustering_key_prefix(std::vector{
                utf8_type->decompose(view_name), int32_type->decompose(int32_t(s))});
        m.set_clustered_cell(ck, "generation_number", int32_t(0), timestamp);
        if (s == this_shard_id()) {
            // Only the calling shard records its starting token.
            m.set_clustered_cell(ck, "first_token", token.to_sstring(), timestamp);
        }
    }
    return apply_mutation(std::move(m));
}

// Saves this shard's view-build progress marker (next token to process).
future<> system_keyspace::update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) {
    sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
    return execute_cql(
            std::move(req),
            std::move(ks_name),
            std::move(view_name),
            token.to_sstring(),
            int32_t(this_shard_id())).discard_result();
}

// Deletes the build-progress rows of a view for all shards.
future<> system_keyspace::remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) {
    return execute_cql(
            format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
            std::move(ks_name),
            std::move(view_name)).discard_result();
}

// Deletes the build-progress row of a view for this shard only.
future<> system_keyspace::remove_view_build_progress(sstring ks_name, sstring view_name) {
    return execute_cql(
            format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ? AND cpu_id = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
            std::move(ks_name),
            std::move(view_name),
            int32_t(this_shard_id())).discard_result();
}

// Records a view as completely built in system.BUILT_VIEWS.
future<> system_keyspace::mark_view_as_built(sstring ks_name, sstring view_name) {
    return execute_cql(
            format("INSERT INTO system.{} (keyspace_name, view_name) VALUES (?, ?)", v3::BUILT_VIEWS),
            std::move(ks_name),
            std::move(view_name)).discard_result();
}

// Removes a view from system.BUILT_VIEWS.
future<> system_keyspace::remove_built_view(sstring ks_name, sstring view_name) {
    return execute_cql(
            format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::BUILT_VIEWS),
            std::move(ks_name),
            std::move(view_name)).discard_result();
}

// Loads every (keyspace, view) pair recorded as built.
future> system_keyspace::load_built_views() {
    return execute_cql(format("SELECT * FROM system.{}", v3::BUILT_VIEWS)).then([] (::shared_ptr cql_result) {
        return *cql_result
                | std::views::transform([] (const cql3::untyped_result_set::row& row) {
            auto ks_name = row.get_as("keyspace_name");
            auto cf_name = row.get_as("view_name");
            return std::pair(std::move(ks_name), std::move(cf_name));
        }) | std::ranges::to>();
    });
}

// Loads per-shard view-build progress. Failure to load is logged and an
// empty progress list is returned instead of propagating the error.
future> system_keyspace::load_view_build_progress() {
    return execute_cql(format("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.{}",
            v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS)).then([] (::shared_ptr cql_result) {
        std::vector progress;
        for (auto& row : *cql_result) {
            auto ks_name = row.get_as("keyspace_name");
            auto cf_name = row.get_as("view_name");
            auto first_token_opt = row.get_opt("first_token").transform(dht::token::from_sstring);
            auto next_token_sstring = row.get_opt("next_token");
            std::optional next_token;
            if
                (next_token_sstring) {
                next_token = dht::token::from_sstring(std::move(next_token_sstring).value());
            }
            auto cpu_id = row.get_as("cpu_id");
            progress.emplace_back(view_build_progress{
                    view_name(std::move(ks_name), std::move(cf_name)),
                    std::move(first_token_opt),
                    std::move(next_token),
                    static_cast(cpu_id)});
        }
        return progress;
    }).handle_exception([] (const std::exception_ptr& eptr) {
        slogger.warn("Failed to load view build progress: {}", eptr);
        return std::vector();
    });
}

// Builds a map of view -> (host -> build status) from
// system.VIEW_BUILD_STATUS_V2.
future system_keyspace::get_view_build_status_map() {
    static const sstring query = format("SELECT * FROM {}.{}", NAME, VIEW_BUILD_STATUS_V2);
    view_build_status_map map;
    co_await _qp.query_internal(query, [&] (const cql3::untyped_result_set_row& row) -> future {
        auto ks_name = row.get_as("keyspace_name");
        auto view_name = row.get_as("view_name");
        auto host_id = locator::host_id(row.get_as("host_id"));
        auto status = view::build_status_from_string(row.get_as("status"));
        auto view = std::make_pair(std::move(ks_name), std::move(view_name));
        map[view][host_id] = status;
        co_return stop_iteration::no;
    });
    co_return map;
}

// Produces the single mutation inserting a view build status row;
// producing anything but exactly one mutation is an internal error.
future system_keyspace::make_view_build_status_mutation(api::timestamp_type ts, system_keyspace_view_name view_name, locator::host_id host_id, view::build_status status) {
    static const sstring stmt = format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS_V2);
    auto muts = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {view_name.first, view_name.second, host_id.uuid(), view::build_status_to_sstring(status)});
    if (muts.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

// Produces the single mutation updating an existing status row.
future system_keyspace::make_view_build_status_update_mutation(api::timestamp_type ts, system_keyspace_view_name view_name, locator::host_id host_id, view::build_status status) {
    static const sstring stmt = format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS_V2);
    auto muts = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {view::build_status_to_sstring(status), view_name.first, view_name.second, host_id.uuid()});
    if (muts.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

// Produces the mutation deleting all status rows of a view.
future system_keyspace::make_remove_view_build_status_mutation(api::timestamp_type ts, system_keyspace_view_name view_name) {
    static const sstring stmt = format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS_V2);
    auto muts = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {view_name.first, view_name.second});
    if (muts.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

// Produces the mutation deleting one host's status row for a view.
future system_keyspace::make_remove_view_build_status_on_host_mutation(api::timestamp_type ts, system_keyspace_view_name view_name, locator::host_id host_id) {
    static const sstring stmt = format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS_V2);
    auto muts = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {view_name.first, view_name.second, host_id.uuid()});
    if (muts.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

static constexpr auto VIEW_BUILDING_KEY = "view_building";

// Loads all view-building tasks from system.VIEW_BUILDING_TASKS and groups
// them by base table and tablet replica. build_range tasks require a
// view_id; process_staging tasks go into the staging bucket.
future system_keyspace::get_view_building_tasks() {
    static const sstring query = format("SELECT id, type, aborted, base_id, view_id, last_token, host_id, shard FROM {}.{} WHERE key = '{}'", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
    using namespace db::view;

    building_tasks tasks;
    co_await _qp.query_internal(query, [&] (const cql3::untyped_result_set_row& row) -> future {
        auto id = row.get_as("id");
        auto type = task_type_from_string(row.get_as("type"));
        auto aborted = row.get_as("aborted");
        auto base_id = table_id(row.get_as("base_id"));
        auto view_id = row.get_opt("view_id").transform([] (const utils::UUID& uuid) {
            return table_id(uuid);
        });
        auto last_token = dht::token::from_int64(row.get_as("last_token"));
        auto host_id = locator::host_id(row.get_as("host_id"));
        auto shard = unsigned(row.get_as("shard"));

        locator::tablet_replica replica{host_id, shard};
        view_building_task task{id, type, aborted, base_id, view_id, replica, last_token};
        switch (type) {
        case db::view::view_building_task::task_type::build_range:
            if (!view_id) {
                on_internal_error(slogger, fmt::format("view_id is not set for build_range task with id: {}", id));
            }
            tasks[base_id][replica].view_tasks[*view_id].insert({id, std::move(task)});
            break;
        case db::view::view_building_task::task_type::process_staging:
            tasks[base_id][replica].staging_tasks.insert({id, std::move(task)});
            break;
        }
        co_return stop_iteration::no;
    });
    co_return tasks;
}

// Produces the mutation inserting/updating one view-building task row.
// view_id is bound as "unset" except for build_range tasks, which must
// carry one.
future system_keyspace::make_view_building_task_mutation(api::timestamp_type ts, const db::view::view_building_task& task) {
    static const sstring stmt = format("INSERT INTO {}.{}(key, id, type, aborted, base_id, view_id, last_token, host_id, shard) VALUES ('{}', ?, ?, ?, ?, ?, ?, ?, ?)", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
    using namespace db::view;

    data_value_or_unset view_id = unset_value{};
    if (task.type == db::view::view_building_task::task_type::build_range) {
        if (!task.view_id) {
            on_internal_error(slogger, fmt::format("view_id is not set for build_range task with id: {}", task.id));
        }
        view_id = data_value(task.view_id->uuid());
    }

    auto muts = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {
        task.id, task_type_to_sstring(task.type), task.aborted, task.base_id.uuid(), view_id,
        dht::token::to_int64(task.last_token), task.replica.host.uuid(), int32_t(task.replica.shard)
    });
    if (muts.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

// Produces the mutation deleting one view-building task by id.
future system_keyspace::make_remove_view_building_task_mutation(api::timestamp_type ts, utils::UUID id) {
    static const sstring stmt = format("DELETE FROM {}.{} WHERE key = '{}' AND id = ?", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
    auto muts = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {id});
    if (muts.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

static constexpr auto VIEW_BUILDING_PROCESSING_BASE_ID_KEY = "view_building_processing_base_id";

// Reads the "currently processed base table" marker from SCYLLA_LOCAL.
future> system_keyspace::get_view_building_processing_base_id() {
    auto value = co_await get_scylla_local_param(VIEW_BUILDING_PROCESSING_BASE_ID_KEY);
    co_return value.transform([] (sstring uuid) {
        return table_id(utils::UUID(uuid));
    });
}

future> system_keyspace::get_view_building_processing_base_id_mutation() {
    return get_scylla_local_mutation(_db, VIEW_BUILDING_PROCESSING_BASE_ID_KEY);
}

// (definition continues past this chunk)
future system_keyspace::make_view_building_processing_base_id_mutation(api::timestamp_type ts, table_id base_id) {
    static sstring query =
format("INSERT INTO {}.{} (key, value) VALUES (?, ?);", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto muts = co_await _qp.get_mutations_internal(
        query, internal_system_query_state(), ts,
        {VIEW_BUILDING_PROCESSING_BASE_ID_KEY, base_id.to_sstring()});
    if (muts.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

// Builds the scylla_local DELETE mutation clearing the processed base table id.
future system_keyspace::make_remove_view_building_processing_base_id_mutation(api::timestamp_type ts) {
    static sstring query = format("DELETE FROM {}.{} WHERE key = ?", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto muts = co_await _qp.get_mutations_internal(query, internal_system_query_state(), ts, {VIEW_BUILDING_PROCESSING_BASE_ID_KEY});
    if (muts.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

// Loads the locally-enabled feature set from scylla_local; absent key yields
// an empty set.
future> system_keyspace::load_local_enabled_features() {
    std::set features;
    auto features_str = co_await get_scylla_local_param(gms::feature_service::ENABLED_FEATURES_KEY);
    if (features_str) {
        features = gms::feature_service::to_feature_set(*features_str);
    }
    co_return features;
}

// Persists the enabled feature set as a comma-joined string in scylla_local.
future<> system_keyspace::save_local_enabled_features(std::set features, bool visible_before_cl_replay) {
    auto features_str = fmt::to_string(fmt::join(features, ","));
    co_await set_scylla_local_param(gms::feature_service::ENABLED_FEATURES_KEY, features_str, visible_before_cl_replay);
}

// Returns the stored raft group0 id; a default-constructed value if unset.
future system_keyspace::get_raft_group0_id() {
    auto opt = co_await get_scylla_local_param_as("raft_group0_id");
    co_return opt.value_or({});
}

future<> system_keyspace::set_raft_group0_id(utils::UUID uuid) {
    return set_scylla_local_param_as("raft_group0_id", uuid, false);
}

// Single partition key under which all group0 history entries live.
static constexpr auto GROUP0_HISTORY_KEY = "history";

// Returns the newest group0 state id, or a null UUID if the history is empty.
// (LIMIT 1 returns the newest entry; see the "schema is reversed" note in
// make_group0_history_state_id_mutation below.)
future system_keyspace::get_last_group0_state_id() {
    auto rs = co_await execute_cql(
        format(
            "SELECT state_id FROM system.{} WHERE key = '{}' LIMIT 1",
            GROUP0_HISTORY, GROUP0_HISTORY_KEY));
    SCYLLA_ASSERT(rs);
    if (rs->empty()) {
        co_return utils::UUID{};
    }
    co_return rs->one().get_as("state_id");
}

// True iff the given state id is present in group0 history.
future system_keyspace::group0_history_contains(utils::UUID state_id) {
    auto rs = co_await execute_cql(
        format(
            "SELECT state_id FROM system.{} WHERE key = '{}' AND state_id = ?",
            GROUP0_HISTORY, GROUP0_HISTORY_KEY),
        state_id);
    SCYLLA_ASSERT(rs);
    co_return !rs->empty();
}

// Builds a mutation appending `state_id` (a timeuuid, whose embedded timestamp
// is used as the write timestamp) to group0 history, optionally attaching a
// description cell and a range tombstone that garbage-collects entries older
// than `gc_older_than`.
mutation system_keyspace::make_group0_history_state_id_mutation(
        utils::UUID state_id, std::optional gc_older_than, std::string_view description) {
    auto s = group0_history();
    mutation m(s, partition_key::from_singular(*s, GROUP0_HISTORY_KEY));
    auto& row = m.partition().clustered_row(*s, clustering_key::from_singular(*s, state_id));
    auto ts = utils::UUID_gen::micros_timestamp(state_id);
    row.apply(row_marker(ts));
    if (!description.empty()) {
        auto cdef = s->get_column_definition("description");
        SCYLLA_ASSERT(cdef);
        row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, ts, cdef->type->decompose(description)));
    }
    if (gc_older_than) {
        using namespace std::chrono;
        SCYLLA_ASSERT(*gc_older_than >= gc_clock::duration{0});
        auto ts_micros = microseconds{ts};
        auto gc_older_than_micros = duration_cast(*gc_older_than);
        SCYLLA_ASSERT(gc_older_than_micros < ts_micros);
        auto tomb_upper_bound = utils::UUID_gen::min_time_UUID(ts_micros - gc_older_than_micros);
        // We want to delete all entries with IDs smaller than `tomb_upper_bound`
        // but the deleted range is of the form (x, +inf) since the schema is reversed.
        auto range = query::clustering_range::make_starting_with({
            clustering_key_prefix::from_single_value(*s, timeuuid_type->decompose(tomb_upper_bound)), false});
        auto bv = bound_view::from_range(range);
        m.partition().apply_delete(*s, range_tombstone{bv.first, bv.second, tombstone{ts, gc_clock::now()}});
    }
    return m;
}

// Returns the full group0 history partition as a mutation; if the expected
// partition is missing, logs a warning and returns an empty mutation for it.
future system_keyspace::get_group0_history(sharded& db) {
    auto s = group0_history();
    auto rs = co_await db::system_keyspace::query_mutations(db, db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY);
    SCYLLA_ASSERT(rs);
    auto& ps = rs->partitions();
    for (auto& p: ps) {
        auto mut = p.mut().unfreeze(s);
        auto partition_key = value_cast(utf8_type->deserialize(mut.key().get_component(*s, 0)));
        if (partition_key == GROUP0_HISTORY_KEY) {
            co_return mut;
        }
        slogger.warn("get_group0_history: unexpected partition in group0 history table: {}", partition_key);
    }
    slogger.warn("get_group0_history: '{}' partition not found", GROUP0_HISTORY_KEY);
    co_return mutation(s, partition_key::from_singular(*s, GROUP0_HISTORY_KEY));
}

future> system_keyspace::get_group0_schema_version() {
    return get_scylla_local_mutation(_db, "group0_schema_version");
}

static constexpr auto AUTH_VERSION_KEY = "auth_version";

// Maps the scylla_local auth_version string to the enum; missing/empty/"1"
// means v1, "2" means v2, anything else is an internal error.
future system_keyspace::get_auth_version() {
    auto str_opt = co_await get_scylla_local_param(AUTH_VERSION_KEY);
    if (!str_opt) {
        co_return auth_version_t::v1;
    }
    auto& str = *str_opt;
    if (str == "" || str == "1") {
        co_return auth_version_t::v1;
    }
    if (str == "2") {
        co_return auth_version_t::v2;
    }
    on_internal_error(slogger, fmt::format("unexpected auth_version in scylla_local got {}", str));
}

future> system_keyspace::get_auth_version_mutation() {
    return get_scylla_local_mutation(_db, AUTH_VERSION_KEY);
}

// Builds the scylla_local INSERT mutation storing the auth version as a
// stringified integer.
future system_keyspace::make_auth_version_mutation(api::timestamp_type ts, db::system_keyspace::auth_version_t version) {
    static sstring query = format("INSERT INTO {}.{} (key, value) VALUES (?, ?);", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto muts = co_await _qp.get_mutations_internal(query, internal_system_query_state(), ts, {AUTH_VERSION_KEY, std::to_string(int64_t(version))});
    if (muts.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 auth_version mutation got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

static constexpr auto VIEW_BUILDER_VERSION_KEY = "view_builder_version";

// Maps the scylla_local view_builder_version string to the enum:
// missing/empty/"10" -> v1, "15" -> v1_5, "20" -> v2; otherwise internal error.
future system_keyspace::get_view_builder_version() {
    auto str_opt = co_await get_scylla_local_param(VIEW_BUILDER_VERSION_KEY);
    if (!str_opt) {
        co_return view_builder_version_t::v1;
    }
    auto& str = *str_opt;
    if (str == "" || str == "10") {
        co_return view_builder_version_t::v1;
    }
    if (str == "15") {
        co_return view_builder_version_t::v1_5;
    }
    if (str == "20") {
        co_return view_builder_version_t::v2;
    }
    on_internal_error(slogger, fmt::format("unexpected view_builder_version in scylla_local got {}", str));
}

future> system_keyspace::get_view_builder_version_mutation() {
    return get_scylla_local_mutation(_db, VIEW_BUILDER_VERSION_KEY);
}

// Builds the scylla_local INSERT mutation storing the view builder version.
future system_keyspace::make_view_builder_version_mutation(api::timestamp_type ts, db::system_keyspace::view_builder_version_t version) {
    static sstring query = format("INSERT INTO {}.{} (key, value) VALUES (?, ?);", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto muts = co_await _qp.get_mutations_internal(query, internal_system_query_state(), ts, {VIEW_BUILDER_VERSION_KEY, std::to_string(int64_t(version))});
    if (muts.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 view_builder_version mutation got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

static constexpr auto SERVICE_LEVEL_DRIVER_CREATED_KEY = "service_level_driver_created";

future> system_keyspace::get_service_level_driver_created_mutation() {
    return get_scylla_local_mutation(_db, SERVICE_LEVEL_DRIVER_CREATED_KEY);
}

// Builds the scylla_local INSERT mutation recording whether the service-level
// driver was created; the bool is serialized via its CQL type's string form.
future system_keyspace::make_service_level_driver_created_mutation(bool is_created, api::timestamp_type timestamp) {
    static const sstring query = format("INSERT INTO {}.{} (key, value) VALUES (?, ?);", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto muts = co_await _qp.get_mutations_internal(query, internal_system_query_state(), timestamp,
        {SERVICE_LEVEL_DRIVER_CREATED_KEY, data_type_for()->to_string_impl(data_value(is_created))});
    if (muts.size() != 1) {
        on_internal_error(slogger, format("expecting single insert mutation, got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

future> system_keyspace::get_service_level_driver_created() {
    return get_scylla_local_param_as(SERVICE_LEVEL_DRIVER_CREATED_KEY);
}

static constexpr auto SERVICE_LEVELS_VERSION_KEY = "service_level_version";

future> system_keyspace::get_service_levels_version_mutation() {
    return get_scylla_local_mutation(_db, SERVICE_LEVELS_VERSION_KEY);
}

// Builds the scylla_local INSERT mutation storing the service-levels version.
future system_keyspace::make_service_levels_version_mutation(int8_t version, api::timestamp_type timestamp) {
    static sstring query = format("INSERT INTO {}.{} (key, value) VALUES (?, ?);", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto muts = co_await _qp.get_mutations_internal(query, internal_system_query_state(), timestamp,
        {SERVICE_LEVELS_VERSION_KEY, format("{}", version)});
    if (muts.size() != 1) {
        on_internal_error(slogger, format("expecting single insert mutation, got {}", muts.size()));
    }
    co_return std::move(muts[0]);
}

future> system_keyspace::get_service_levels_version() {
    return get_scylla_local_param_as(SERVICE_LEVELS_VERSION_KEY);
}

static constexpr auto GROUP0_UPGRADE_STATE_KEY = "group0_upgrade_state";

future> system_keyspace::load_group0_upgrade_state() {
    return get_scylla_local_param_as(GROUP0_UPGRADE_STATE_KEY);
}

future<> system_keyspace::save_group0_upgrade_state(sstring value) {
    return set_scylla_local_param(GROUP0_UPGRADE_STATE_KEY, value, false);
}

static constexpr auto MUST_SYNCHRONIZE_TOPOLOGY_KEY = "must_synchronize_topology";

// Returns the must_synchronize_topology flag; false if unset.
future system_keyspace::get_must_synchronize_topology() {
    auto opt = co_await
get_scylla_local_param_as(MUST_SYNCHRONIZE_TOPOLOGY_KEY);
    co_return opt.value_or(false);
}

future<> system_keyspace::set_must_synchronize_topology(bool value) {
    return set_scylla_local_param_as(MUST_SYNCHRONIZE_TOPOLOGY_KEY, value, false);
}

// Converts a deserialized CQL set of feature names into a std::set.
static std::set decode_features(const set_type_impl::native_type& features) {
    std::set fset;
    for (auto& f : features) {
        fset.insert(value_cast(std::move(f)));
    }
    return fset;
}

// Whether a node in the given state is required to have a ring slice (tokens)
// persisted; used as a sanity check when loading topology state.
static bool must_have_tokens(service::node_state nst) {
    switch (nst) {
    case service::node_state::none:
        return false;
    // Bootstrapping and replacing nodes don't have tokens at first,
    // they are inserted only at some point during bootstrap/replace
    case service::node_state::bootstrapping:
        return false;
    case service::node_state::replacing:
        return false;
    // A decommissioning node doesn't have tokens at the end, they are
    // removed during transition to the left_token_ring state.
    case service::node_state::decommissioning:
        return false;
    // A removing node might or might not have tokens depending on whether
    // REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled. To support both
    // cases, we allow removing nodes to not have tokens.
    case service::node_state::removing:
        return false;
    case service::node_state::rebuilding:
        return true;
    case service::node_state::normal:
        return true;
    case service::node_state::left:
        return false;
    }
}

// Reconstructs the full topology state machine state from the system.topology
// table: one clustering row per node plus static columns for cluster-wide
// state. `force_load_hosts` lists left nodes whose replica_state should still
// be loaded (into left_nodes_rs).
future system_keyspace::load_topology_state(const std::unordered_set& force_load_hosts) {
    auto rs = co_await execute_cql(
        format("SELECT * FROM system.{} WHERE key = '{}'", TOPOLOGY, TOPOLOGY));
    SCYLLA_ASSERT(rs);
    service::topology_state_machine::topology_type ret;
    if (rs->empty()) {
        co_return ret;
    }
    for (auto& row : *rs) {
        if (!row.has("host_id")) {
            // There are no clustering rows, only the static row.
            // Skip the whole loop, the static row is handled later.
            break;
        }
        raft::server_id host_id{row.get_as("host_id")};
        auto datacenter = row.get_as("datacenter");
        auto rack = row.get_as("rack");
        auto release_version = row.get_as("release_version");
        uint32_t num_tokens = row.get_as("num_tokens");
        sstring tokens_string = row.get_as("tokens_string");
        size_t shard_count = row.get_as("shard_count");
        uint8_t ignore_msb = row.get_as("ignore_msb");
        sstring cleanup_status = row.get_as("cleanup_status");
        utils::UUID request_id = row.get_as("request_id");
        service::node_state nstate = service::node_state_from_string(row.get_as("node_state"));
        std::optional ring_slice;
        if (row.has("tokens")) {
            auto tokens = decode_tokens(deserialize_set_column(*topology(), row, "tokens"));
            ring_slice = service::ring_slice {
                .tokens = std::move(tokens),
            };
        } else {
            auto zero_token = num_tokens == 0 && tokens_string.empty();
            if (zero_token) {
                // We distinguish normal zero-token nodes from token-owning nodes without tokens at the moment
                // in the following way:
                // - for normal zero-token nodes, ring_slice is engaged with an empty set of tokens,
                // - for token-owning nodes without tokens at the moment, ring_slice equals std::nullopt.
                // ring_slice also equals std::nullopt for joining zero-token nodes. The reason is that the
                // topology coordinator assigns tokens in the join_group0 state handler, and we want to simulate
                // assigning zero tokens for zero-token nodes. It allows us to have the same assertions for all nodes.
                // The code below is correct because the join_group0 state is the last transition state if a joining
                // node is zero-token.
                // Note that we need this workaround because we store tokens in a non-frozen set, which doesn't
                // distinguish an empty set from no value.
                if (nstate != service::node_state::none && nstate != service::node_state::bootstrapping && nstate != service::node_state::replacing) {
                    ring_slice = service::ring_slice {
                        .tokens = std::unordered_set(),
                    };
                }
            } else if (must_have_tokens(nstate)) {
                on_fatal_internal_error(slogger, format(
                    "load_topology_state: node {} in {} state but missing ring slice", host_id, nstate));
            }
        }
        std::optional replaced_id;
        if (row.has("replaced_id")) {
            replaced_id = raft::server_id(row.get_as("replaced_id"));
        }
        std::optional rebuild_option;
        if (row.has("rebuild_option")) {
            rebuild_option = row.get_as("rebuild_option");
        }
        std::set supported_features;
        if (row.has("supported_features")) {
            supported_features = decode_features(deserialize_set_column(*topology(), row, "supported_features"));
        }
        if (row.has("topology_request")) {
            // Pending request: record it plus the per-request parameters
            // (which must be present for replace/rebuild).
            auto req = service::topology_request_from_string(row.get_as("topology_request"));
            ret.requests.emplace(host_id, req);
            switch(req) {
            case service::topology_request::replace:
                if (!replaced_id) {
                    on_internal_error(slogger, fmt::format("replaced_id is missing for a node {}", host_id));
                }
                ret.req_param.emplace(host_id, service::replace_param{*replaced_id});
                break;
            case service::topology_request::rebuild:
                if (!rebuild_option) {
                    on_internal_error(slogger, fmt::format("rebuild_option is missing for a node {}", host_id));
                }
                ret.req_param.emplace(host_id, service::rebuild_param{*rebuild_option});
                break;
            default:
                // no parameters for other requests
                break;
            }
        } else {
            // No pending request: an in-flight operation may still need its
            // parameters, keyed by the node's current state.
            switch (nstate) {
            case service::node_state::bootstrapping:
                // The tokens aren't generated right away when we enter the `bootstrapping` node state.
                // Therefore we need to know the number of tokens when we generate them during the bootstrap process.
                ret.req_param.emplace(host_id, service::join_param{num_tokens, tokens_string});
                break;
            case service::node_state::replacing:
                // If a node is replacing we need to know which node it is replacing and which nodes are ignored
                if (!replaced_id) {
                    on_internal_error(slogger, fmt::format("replaced_id is missing for a node {}", host_id));
                }
                ret.req_param.emplace(host_id, service::replace_param{*replaced_id});
                break;
            case service::node_state::rebuilding:
                // If a node is rebuilding it needs to know the parameter for the operation
                if (!rebuild_option) {
                    on_internal_error(slogger, fmt::format("rebuild_option is missing for a node {}", host_id));
                }
                ret.req_param.emplace(host_id, service::rebuild_param{*rebuild_option});
                break;
            default:
                // no parameters for other operations
                break;
            }
        }
        // Pick the bucket the node's replica_state goes into; left nodes are
        // only materialized when explicitly forced.
        std::unordered_map* map = nullptr;
        if (nstate == service::node_state::normal) {
            map = &ret.normal_nodes;
        } else if (nstate == service::node_state::left) {
            ret.left_nodes.emplace(host_id);
            if (force_load_hosts.contains(locator::host_id(host_id.uuid()))) {
                map = &ret.left_nodes_rs;
            }
        } else if (nstate == service::node_state::none) {
            map = &ret.new_nodes;
        } else {
            map = &ret.transition_nodes;
            // Currently, at most one node at a time can be in transitioning state.
            if (!map->empty()) {
                const auto& [other_id, other_rs] = *map->begin();
                on_fatal_internal_error(slogger, format(
                    "load_topology_state: found two nodes in transitioning state: {} in {} state and {} in {} state",
                    other_id, other_rs.state, host_id, nstate));
            }
        }
        if (map) {
            map->emplace(host_id, service::replica_state{
                nstate, std::move(datacenter), std::move(rack), std::move(release_version), ring_slice, shard_count,
                ignore_msb, std::move(supported_features), service::cleanup_status_from_string(cleanup_status), request_id});
        }
    }
    {
        // Here we access static columns, any row will do.
        auto& some_row = *rs->begin();
        if (some_row.has("version")) {
            ret.version = some_row.get_as("version");
        }
        if (some_row.has("fence_version")) {
            ret.fence_version = some_row.get_as("fence_version");
        }
        if (some_row.has("transition_state")) {
            ret.tstate = service::transition_state_from_string(some_row.get_as("transition_state"));
        } else {
            // Any remaining transition_nodes must be in rebuilding state.
            auto it = std::find_if(ret.transition_nodes.begin(), ret.transition_nodes.end(), [] (auto& p) {
                return p.second.state != service::node_state::rebuilding;
            });
            if (it != ret.transition_nodes.end()) {
                on_internal_error(slogger, format(
                    "load_topology_state: topology not in transition state"
                    " but transition node {} in rebuilding state is present", it->first));
            }
        }
        if (some_row.has("new_cdc_generation_data_uuid")) {
            ret.new_cdc_generation_data_uuid = some_row.get_as("new_cdc_generation_data_uuid");
        }
        if (some_row.has("committed_cdc_generations")) {
            ret.committed_cdc_generations = decode_cdc_generations_ids(deserialize_set_column(*topology(), some_row, "committed_cdc_generations"));
        }
        if (some_row.has("new_keyspace_rf_change_data")) {
            ret.new_keyspace_rf_change_ks_name = some_row.get_as("new_keyspace_rf_change_ks_name");
            ret.new_keyspace_rf_change_data = some_row.get_map("new_keyspace_rf_change_data");
        }
        if (!ret.committed_cdc_generations.empty()) {
            // Sanity check for CDC generation data consistency.
            auto gen_id = ret.committed_cdc_generations.back();
            auto gen_rows = co_await execute_cql(
                format("SELECT count(range_end) as cnt FROM {}.{} WHERE key = '{}' AND id = ?",
                    NAME, CDC_GENERATIONS_V3, cdc::CDC_GENERATIONS_V3_KEY),
                gen_id.id);
            SCYLLA_ASSERT(gen_rows);
            if (gen_rows->empty()) {
                on_internal_error(slogger, format(
                    "load_topology_state: last committed CDC generation time UUID ({}) present, but data missing", gen_id.id));
            }
            auto cnt = gen_rows->one().get_as("cnt");
            slogger.debug("load_topology_state: last committed CDC generation time UUID ({}), loaded {} ranges", gen_id.id, cnt);
        } else {
            if (!ret.normal_nodes.empty()) {
                on_internal_error(slogger, "load_topology_state: normal nodes present but no committed CDC generations");
            }
        }
        if (some_row.has("unpublished_cdc_generations")) {
            ret.unpublished_cdc_generations = decode_cdc_generations_ids(deserialize_set_column(*topology(), some_row, "unpublished_cdc_generations"));
        }
        if (some_row.has("global_topology_request")) {
            auto req = service::global_topology_request_from_string(
                some_row.get_as("global_topology_request"));
            ret.global_request.emplace(req);
        }
        if (some_row.has("global_topology_request_id")) {
            ret.global_request_id = some_row.get_as("global_topology_request_id");
        }
        if (some_row.has("global_requests")) {
            for (auto&& v : deserialize_set_column(*topology(), some_row, "global_requests")) {
                ret.global_requests_queue.push_back(value_cast(v));
            }
        }
        if (some_row.has("paused_rf_change_requests")) {
            for (auto&& v : deserialize_set_column(*topology(), some_row, "paused_rf_change_requests")) {
                ret.paused_rf_change_requests.insert(value_cast(v));
            }
        }
        if (some_row.has("enabled_features")) {
            ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
        }
        if (some_row.has("session")) {
            ret.session = service::session_id(some_row.get_as("session"));
        }
        if (some_row.has("tablet_balancing_enabled")) {
            ret.tablet_balancing_enabled = some_row.get_as("tablet_balancing_enabled");
        } else {
            // Default when the column was never written: balancing is on.
            ret.tablet_balancing_enabled = true;
        }
        if (some_row.has("upgrade_state")) {
            ret.upgrade_state = service::upgrade_state_from_string(some_row.get_as("upgrade_state"));
        } else {
            ret.upgrade_state = service::topology::upgrade_state_type::not_upgraded;
        }
        if (some_row.has("ignore_nodes")) {
            ret.ignored_nodes = decode_nodes_ids(deserialize_set_column(*topology(), some_row, "ignore_nodes"));
        }
        // Tablet placement excludes ignored nodes plus any force-loaded left nodes.
        ret.excluded_tablet_nodes = ret.ignored_nodes;
        for (const auto& [id, _]: ret.left_nodes_rs) {
            ret.excluded_tablet_nodes.insert(id);
        }
    }
    co_return ret;
}

// Loads only the feature-related columns of system.topology and decodes them.
future> system_keyspace::load_topology_features_state() {
    auto rs = co_await execute_cql(
        format("SELECT host_id, node_state, supported_features, enabled_features FROM system.{} WHERE key = '{}'",
            TOPOLOGY, TOPOLOGY));
    SCYLLA_ASSERT(rs);
    co_return decode_topology_features_state(std::move(rs));
}

// Decodes a topology_features snapshot from a result set; returns nullopt if
// there are no rows or the static enabled_features column is absent.
std::optional system_keyspace::decode_topology_features_state(::shared_ptr rs) {
    service::topology_features ret;
    if (rs->empty()) {
        return std::nullopt;
    }
    auto& some_row = *rs->begin();
    if (!some_row.has("enabled_features")) {
        return std::nullopt;
    }
    for (auto& row : *rs) {
        if (!row.has("host_id")) {
            // There are no clustering rows, only the static row.
            // Skip the whole loop, the static row is handled later.
            break;
        }
        raft::server_id host_id{row.get_as("host_id")};
        service::node_state nstate = service::node_state_from_string(row.get_as("node_state"));
        // Only normal nodes contribute to the per-host supported-features map.
        if (row.has("supported_features") && nstate == service::node_state::normal) {
            ret.normal_supported_features.emplace(host_id, decode_features(deserialize_set_column(*topology(), row, "supported_features")));
        }
    }
    ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
    return ret;
}

// Like read_cdc_generation_opt, but treats a missing generation as an
// internal error.
future system_keyspace::read_cdc_generation(utils::UUID id) {
    auto gen_desc = co_await read_cdc_generation_opt(id);
    if (!gen_desc) {
        on_internal_error(slogger, format(
            "read_cdc_generation: data for CDC generation {} not present", id));
    }
    co_return std::move(*gen_desc);
}

// Streams the token-range rows of one CDC generation into a topology
// description; returns nullopt if no rows exist for the id.
future> system_keyspace::read_cdc_generation_opt(utils::UUID id) {
    utils::chunked_vector entries;
    co_await _qp.query_internal(
        format("SELECT range_end, streams, ignore_msb FROM {}.{} WHERE key = '{}' AND id = ?",
            NAME, CDC_GENERATIONS_V3, cdc::CDC_GENERATIONS_V3_KEY),
        db::consistency_level::ONE,
        { id },
        1000, // for ~1KB rows, ~1MB page size
        [&] (const cql3::untyped_result_set_row& row) {
            std::vector streams;
            row.get_list_data("streams", std::back_inserter(streams));
            entries.push_back(cdc::token_range_description{
                dht::token::from_int64(row.get_as("range_end")),
                std::move(streams),
                uint8_t(row.get_as("ignore_msb"))});
            return make_ready_future(stop_iteration::no);
        });
    if (entries.empty()) {
        co_return std::nullopt;
    }
    co_return cdc::topology_description{std::move(entries)};
}

// Inserts a new row into the sstables registry for the given owner/descriptor.
future<> system_keyspace::sstables_registry_create_entry(table_id owner, sstring status, sstables::sstable_state state, sstables::entry_descriptor desc) {
    static const auto req = format("INSERT INTO system.{} (owner, generation, status, state, version, format) VALUES (?, ?, ?, ?, ?, ?)", SSTABLES_REGISTRY);
    slogger.trace("Inserting {}.{} into {}", owner, desc.generation, SSTABLES_REGISTRY);
    co_await execute_cql(req, owner.id, desc.generation, status,
sstables::state_to_dir(state), fmt::to_string(desc.version), fmt::to_string(desc.format)).discard_result();
}

// Updates the status column of an existing sstables-registry entry.
future<> system_keyspace::sstables_registry_update_entry_status(table_id owner, sstables::generation_type gen, sstring status) {
    static const auto req = format("UPDATE system.{} SET status = ? WHERE owner = ? AND generation = ?", SSTABLES_REGISTRY);
    slogger.trace("Updating {}.{} -> status={} in {}", owner, gen, status, SSTABLES_REGISTRY);
    co_await execute_cql(req, status, owner.id, gen).discard_result();
}

// Updates the state column (stored as its directory-name string) of an entry.
future<> system_keyspace::sstables_registry_update_entry_state(table_id owner, sstables::generation_type gen, sstables::sstable_state state) {
    static const auto req = format("UPDATE system.{} SET state = ? WHERE owner = ? AND generation = ?", SSTABLES_REGISTRY);
    auto new_state = sstables::state_to_dir(state);
    slogger.trace("Updating {}.{} -> state={} in {}", owner, gen, new_state, SSTABLES_REGISTRY);
    co_await execute_cql(req, new_state, owner.id, gen).discard_result();
}

// Removes one sstables-registry entry by owner and generation.
future<> system_keyspace::sstables_registry_delete_entry(table_id owner, sstables::generation_type gen) {
    static const auto req = format("DELETE FROM system.{} WHERE owner = ? AND generation = ?", SSTABLES_REGISTRY);
    slogger.trace("Removing {}.{} from {}", owner, gen, SSTABLES_REGISTRY);
    co_await execute_cql(req, owner.id, gen).discard_result();
}

// Iterates all registry entries of an owner, invoking `consumer` per entry.
future<> system_keyspace::sstables_registry_list(table_id owner, sstable_registry_entry_consumer consumer) {
    static const auto req = format("SELECT status, state, generation, version, format FROM system.{} WHERE owner = ?", SSTABLES_REGISTRY);
    slogger.trace("Listing {} entries from {}", owner, SSTABLES_REGISTRY);
    co_await _qp.query_internal(req, db::consistency_level::ONE, { owner.id }, 1000, [ consumer = std::move(consumer) ] (const cql3::untyped_result_set::row& row) -> future {
        auto status = row.get_as("status");
        auto state = sstables::state_from_dir(row.get_as("state"));
        auto gen = sstables::generation_type(row.get_as("generation"));
        auto ver = sstables::version_from_string(row.get_as("version"));
        auto fmt = sstables::format_from_string(row.get_as("format"));
        // Only the TOC component is described here; the rest is derived elsewhere.
        sstables::entry_descriptor desc(gen, ver, fmt, sstables::component_type::TOC);
        co_await consumer(std::move(status), std::move(state), std::move(desc));
        co_return stop_iteration::no;
    });
}

// Returns the done/error state of a topology request; a missing row is an
// internal error when require_entry, otherwise reported as not-done.
// NOTE(review): the request id is formatted into the query text rather than
// bound as a parameter — unlike most queries in this file; verify this is
// intentional.
future system_keyspace::get_topology_request_state(utils::UUID id, bool require_entry) {
    auto rs = co_await execute_cql(
        format("SELECT done, error FROM system.{} WHERE id = {}", TOPOLOGY_REQUESTS, id));
    if (!rs || rs->empty()) {
        if (require_entry) {
            on_internal_error(slogger, format("no entry for request id {}", id));
        } else {
            co_return service::topology_request_state{false, ""};
        }
    }
    auto& row = rs->one();
    sstring error;
    if (row.has("error")) {
        error = row.get_as("error");
    }
    co_return service::topology_request_state{row.get_as("done"), std::move(error)};
}

// Decodes one system.topology_requests row into a topology_requests_entry;
// every column is optional, so each is copied only when present.
system_keyspace::topology_requests_entry system_keyspace::topology_request_row_to_entry(utils::UUID id, const cql3::untyped_result_set_row& row) {
    topology_requests_entry entry;
    entry.id = id;
    if (row.has("initiating_host")) {
        entry.initiating_host = row.get_as("initiating_host");
    }
    if (row.has("request_type")) {
        // request_type may be a per-node or a global request; try the per-node
        // parser first and fall back to the global one.
        auto rts = row.get_as("request_type");
        auto rt = service::try_topology_request_from_string(rts);
        if (rt) {
            entry.request_type = *rt;
        } else {
            entry.request_type = service::global_topology_request_from_string(rts);
        }
    }
    if (row.has("start_time")) {
        entry.start_time = row.get_as("start_time");
    }
    if (row.has("done")) {
        entry.done = row.get_as("done");
    }
    if (row.has("error")) {
        entry.error = row.get_as("error");
    }
    if (row.has("end_time")) {
        entry.end_time = row.get_as("end_time");
    }
    if (row.has("truncate_table_id")) {
        entry.truncate_table_id = table_id(row.get_as("truncate_table_id"));
    }
    if (row.has("new_keyspace_rf_change_data")) {
        entry.new_keyspace_rf_change_ks_name = row.get_as("new_keyspace_rf_change_ks_name");
        entry.new_keyspace_rf_change_data = row.get_map("new_keyspace_rf_change_data");
    }
    return entry;
}

// Like get_topology_request_entry_opt, but a missing entry is an internal error.
future system_keyspace::get_topology_request_entry(utils::UUID id) {
    auto r = co_await get_topology_request_entry_opt(id);
    if (!r) {
        on_internal_error(slogger, format("no entry for request id {}", id));
    }
    co_return std::move(*r);
}

// Fetches and decodes a single topology request row, if present.
future> system_keyspace::get_topology_request_entry_opt(utils::UUID id) {
    auto rs = co_await execute_cql(
        format("SELECT * FROM system.{} WHERE id = {}", TOPOLOGY_REQUESTS, id));
    if (!rs || rs->empty()) {
        co_return std::nullopt;
    }
    const auto& row = rs->one();
    co_return topology_request_row_to_entry(id, row);
}

// Collects request entries of the given types: all still-running ones plus
// those that finished after end_time_limit, merged by request id.
future system_keyspace::get_topology_request_entries(std::vector> request_types, db_clock::time_point end_time_limit) {
    // Build the quoted IN(...) list of request type names.
    sstring request_types_str = "";
    bool first = true;
    for (const auto& rt : request_types) {
        if (!std::exchange(first, false)) {
            request_types_str += ", ";
        }
        request_types_str += std::visit([] (auto&& arg) { return fmt::format("'{}'", arg); }, rt);
    }
    // Running requests.
    auto rs_running = co_await execute_cql(
        format("SELECT * FROM system.{} WHERE done = false AND request_type IN ({}) ALLOW FILTERING", TOPOLOGY_REQUESTS, request_types_str));
    // Requests which finished after end_time_limit.
    auto rs_done = co_await execute_cql(
        format("SELECT * FROM system.{} WHERE end_time > {} AND request_type IN ({}) ALLOW FILTERING", TOPOLOGY_REQUESTS, end_time_limit.time_since_epoch().count(), request_types_str));
    topology_requests_entries m;
    for (const auto& row: *rs_done) {
        auto id = row.get_as("id");
        m.emplace(id, topology_request_row_to_entry(id, row));
    }
    for (const auto& row: *rs_running) {
        auto id = row.get_as("id");
        // If a topology request finishes between the reads, it may be contained in both row sets.
        // Keep the latest info.
        // NOTE(review): emplace() does not overwrite an existing key, so for an
        // id present in both sets the rs_done row (inserted first) wins —
        // verify this matches the "keep the latest info" intent.
        m.emplace(id, topology_request_row_to_entry(id, row));
    }
    co_return m;
}

// Convenience wrapper: entries for all node-operation request types.
future system_keyspace::get_node_ops_request_entries(db_clock::time_point end_time_limit) {
    return get_topology_request_entries({
        service::topology_request::join,
        service::topology_request::replace,
        service::topology_request::rebuild,
        service::topology_request::leave,
        service::topology_request::remove
    }, end_time_limit);
}

// Builds the single INSERT mutation publishing a new compression dictionary
// into system.dicts.
future system_keyspace::get_insert_dict_mutation(
    std::string_view name, bytes data, locator::host_id host_id, db_clock::time_point dict_ts, api::timestamp_type write_ts ) const {
    slogger.debug("Publishing new compression dictionary: {} {} {}", name, dict_ts, host_id);
    static sstring insert_new = format("INSERT INTO {}.{} (name, timestamp, origin, data) VALUES (?, ?, ?, ?);", NAME, DICTS);
    auto muts = co_await _qp.get_mutations_internal(insert_new, internal_system_query_state(), write_ts, {
        data_value(name),
        data_value(dict_ts),
        data_value(host_id.uuid()),
        data_value(std::move(data)),
    });
    if (muts.size() != 1) {
        on_internal_error(slogger, "Expected to prepare a single mutation, but got multiple.");
    }
    co_return std::move(muts[0]);
}

mutation system_keyspace::get_delete_dict_mutation(std::string_view name, api::timestamp_type
write_ts) { auto s = db::system_keyspace::dicts(); mutation m(s, partition_key::from_single_value(*s, data_value(name).serialize_nonnull() )); m.partition().apply(tombstone(write_ts, gc_clock::now())); return m; } future> system_keyspace::query_all_dict_names() const { std::vector result; sstring query = format("SELECT name from {}.{}", NAME, DICTS); auto rs = co_await _qp.execute_internal( query, db::consistency_level::ONE, internal_system_query_state(), {}, cql3::query_processor::cache_internal::yes); for (const auto& row : *rs) { result.push_back(row.get_as("name")); } co_return result; } future system_keyspace::query_dict(std::string_view name) const { static sstring query = format("SELECT * FROM {}.{} WHERE name = ?;", NAME, DICTS); auto result_set = co_await _qp.execute_internal( query, db::consistency_level::ONE, internal_system_query_state(), {name}, cql3::query_processor::cache_internal::yes); if (!result_set->empty()) { auto &&row = result_set->one(); auto content = row.get_as("data"); auto timestamp = row.get_as("timestamp").time_since_epoch().count(); auto origin = row.get_as("origin"); const int zstd_compression_level = 1; co_return netw::shared_dict( std::as_bytes(std::span(content)), timestamp, origin, zstd_compression_level ); } else { co_return netw::shared_dict(); } } future> system_keyspace::query_dict_timestamp(std::string_view name) const { static sstring query = format("SELECT timestamp FROM {}.{} WHERE name = ?;", NAME, DICTS); auto result_set = co_await _qp.execute_internal( query, db::consistency_level::ONE, internal_system_query_state(), {name}, cql3::query_processor::cache_internal::yes); if (!result_set->empty()) { auto &&row = result_set->one(); auto timestamp = row.get_as("timestamp"); co_return timestamp; } else { co_return std::nullopt; } } sstring system_keyspace_name() { return system_keyspace::NAME; } system_keyspace::system_keyspace( cql3::query_processor& qp, replica::database& db) noexcept : _qp(qp) , _db(db) , 
_cache(std::make_unique()) { _db.plug_system_keyspace(*this); } system_keyspace::~system_keyspace() { } future<> system_keyspace::shutdown() { if (!_shutdown) { _shutdown = true; co_await _db.unplug_system_keyspace(); } } future<> system_keyspace::stop() { co_await shutdown(); } future<::shared_ptr> system_keyspace::execute_cql(const sstring& query_string, const data_value_list& values) { return _qp.execute_internal(query_string, values, cql3::query_processor::cache_internal::yes); } future<> system_keyspace::apply_mutation(mutation m) { if (m.schema()->ks_name() != NAME) { on_internal_error(slogger, fmt::format("system_keyspace::apply_mutation(): attempted to apply mutation belonging to table {}.{}", m.schema()->cf_name(), m.schema()->ks_name())); } return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout); } } // namespace db