Initially, logstor tables will not support tablet migrations, so tablet balancing is disabled when the experimental feature flag is set.
3724 lines
162 KiB
C++
3724 lines
162 KiB
C++
/*
|
|
* Modified by ScyllaDB
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#include <boost/range/algorithm.hpp>
|
|
#include <boost/functional/hash.hpp>
|
|
#include <boost/icl/interval_map.hpp>
|
|
#include <fmt/ranges.h>
|
|
#include <ranges>
|
|
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/coroutine/parallel_for_each.hh>
|
|
#include <seastar/core/loop.hh>
|
|
#include <seastar/core/on_internal_error.hh>
|
|
#include "system_keyspace.hh"
|
|
#include "cql3/untyped_result_set.hh"
|
|
#include "cql3/query_processor.hh"
|
|
#include "locator/host_id.hh"
|
|
#include "locator/tablets.hh"
|
|
#include "partition_slice_builder.hh"
|
|
#include "db/config.hh"
|
|
#include "gms/feature_service.hh"
|
|
#include "system_keyspace_view_types.hh"
|
|
#include "schema/schema_builder.hh"
|
|
#include "mutation/timestamp.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/hashers.hh"
|
|
#include "utils/log.hh"
|
|
#include <seastar/core/enum.hh>
|
|
#include "gms/inet_address.hh"
|
|
#include "message/messaging_service.hh"
|
|
#include "mutation_query.hh"
|
|
#include "db/timeout_clock.hh"
|
|
#include "sstables/sstables.hh"
|
|
#include "db/schema_tables.hh"
|
|
#include "gms/generation-number.hh"
|
|
#include "service/storage_service.hh"
|
|
#include "service/storage_proxy.hh"
|
|
#include "service/paxos/paxos_state.hh"
|
|
#include "query/query-result-set.hh"
|
|
#include "idl/frozen_mutation.dist.hh"
|
|
#include "idl/frozen_mutation.dist.impl.hh"
|
|
#include "service/topology_state_machine.hh"
|
|
#include "sstables/generation_type.hh"
|
|
#include "cdc/generation.hh"
|
|
#include "replica/tablets.hh"
|
|
#include "replica/query.hh"
|
|
#include "types/types.hh"
|
|
#include "service/raft/raft_group0_client.hh"
|
|
#include "message/shared_dict.hh"
|
|
#include "replica/database.hh"
|
|
#include "db/compaction_history_entry.hh"
|
|
#include "mutation/async_utils.hh"
|
|
|
|
#include <unordered_map>
|
|
|
|
// Day-granularity duration alias, used below for week-long TTLs
// (see compaction_history()'s default_time_to_live of days(7)).
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;

// UDT describing a single sstable (generation, origin, size); used by the
// sstables_in / sstables_out columns of system.compaction_history.
static thread_local auto sstableinfo_type = user_type_impl::get_instance(
        "system", "sstableinfo", {"generation", "origin", "size"}, {uuid_type, utf8_type, long_type}, false);
|
namespace db {
|
|
namespace {
|
|
// Schema initializer hook: forces the null sharder for selected tables in
// the "system" keyspace. The table set is currently empty; the hook is kept
// as a registration point for future tables.
const auto set_null_sharder = schema_builder::register_schema_initializer([](schema_builder& builder) {
    // tables in the "system" keyspace which need to use null sharder
    static const std::unordered_set<sstring> tables = {
        // empty
    };
    if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
        builder.set_use_null_sharder(true);
    }
});
|
|
// Schema initializer hook: makes writes to the listed system tables wait for
// commitlog sync before being acknowledged. Currently applies only to the
// Paxos table.
const auto set_wait_for_sync_to_commitlog = schema_builder::register_schema_initializer([](schema_builder& builder) {
    static const std::unordered_set<sstring> tables = {
        system_keyspace::PAXOS,
    };
    if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
        builder.set_wait_for_sync_to_commitlog(true);
    }
});
|
|
// Schema initializer hook: routes the listed system tables through the
// dedicated schema commitlog instead of the regular data commitlog.
const auto set_use_schema_commitlog = schema_builder::register_schema_initializer([](schema_builder& builder) {
    static const std::unordered_set<sstring> tables = {
        schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY,
        system_keyspace::BROADCAST_KV_STORE,
        system_keyspace::RAFT,
        system_keyspace::RAFT_SNAPSHOTS,
        system_keyspace::RAFT_SNAPSHOT_CONFIG,
        system_keyspace::GROUP0_HISTORY,
        system_keyspace::DISCOVERY,
        system_keyspace::LOCAL,
        system_keyspace::PEERS,
        system_keyspace::COMMITLOG_CLEANUPS,
        system_keyspace::CDC_LOCAL,
    };
    if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
        builder.enable_schema_commitlog();
    }
});
|
|
|
|
// Schema initializer hook: marks the listed system tables as group0 tables
// (i.e. tables whose content is managed through Raft group 0).
const auto set_group0_table_options =
        schema_builder::register_schema_initializer([](schema_builder& builder) {
    static const std::unordered_set<sstring> tables = {
        // scylla_local may store a replicated tombstone related to schema
        // (see `make_group0_schema_version_mutation`), so we include it in the group0 tables list.
        system_keyspace::SCYLLA_LOCAL,
        system_keyspace::TOPOLOGY,
        system_keyspace::TOPOLOGY_REQUESTS,
        system_keyspace::CDC_GENERATIONS_V3,
        system_keyspace::TABLETS,
        system_keyspace::SERVICE_LEVELS_V2,
        system_keyspace::VIEW_BUILD_STATUS_V2,
        system_keyspace::CDC_STREAMS_STATE,
        system_keyspace::CDC_STREAMS_HISTORY,
        // auth tables
        system_keyspace::ROLES,
        system_keyspace::ROLE_MEMBERS,
        system_keyspace::ROLE_ATTRIBUTES,
        system_keyspace::ROLE_PERMISSIONS,
        system_keyspace::DICTS,
        system_keyspace::VIEW_BUILDING_TASKS,
        system_keyspace::CLIENT_ROUTES,
        system_keyspace::REPAIR_TASKS,
    };
    if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
        builder.set_is_group0_table();
    }
});
|
|
}
|
|
|
|
// Logger for this translation unit.
static logging::logger slogger("system_keyspace");
// Timestamp captured once at static-initialization time; returned by
// schema_creation_timestamp() below.
static const api::timestamp_type creation_timestamp = api::new_timestamp();
|
|
|
|
// Returns the process-wide constant timestamp captured at startup
// (see creation_timestamp above).
api::timestamp_type system_keyspace::schema_creation_timestamp() {
    return creation_timestamp;
}
|
|
|
|
|
|
// Currently, the type variables (uuid_type, etc.) are thread-local reference-
|
|
// counted shared pointers. This forces us to also make the built in schemas
|
|
// below thread-local as well.
|
|
// We return schema_ptr, not schema&, because that's the "tradition" in our
|
|
// other code.
|
|
// We hide the thread_local variable inside a function, because if we later
// remove the thread_local, we'll start having initialization order
// problems (we need the type variables to be constructed first), and using
// functions will solve this problem. So we use functions right now.
|
|
|
|
|
|
// Schema for system.hints: stores serialized mutations awaiting hinted
// handoff delivery, keyed by target node id. Compaction is disabled and
// gc_grace_seconds is 0 (deleted hints need no repair grace period).
schema_ptr system_keyspace::hints() {
    static thread_local auto hints = [] {
        schema_builder builder(generate_legacy_id(NAME, HINTS), NAME, HINTS,
        // partition key
        {{"target_id", uuid_type}},
        // clustering key
        {{"hint_id", timeuuid_type}, {"message_version", int32_type}},
        // regular columns
        {{"mutation", bytes_type}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "hints awaiting delivery"
       );
       builder.set_gc_grace_seconds(0);
       builder.set_compaction_strategy_options({{ "enabled", "false" }});
       builder.with_hash_version();
       return builder.build(schema_builder::compact_storage::yes);
    }();
    return hints;
}
|
|
|
|
// Schema for the legacy system.batchlog table: serialized batches awaiting
// replay, keyed by batch id. Superseded by batchlog_v2() below.
schema_ptr system_keyspace::batchlog() {
    static thread_local auto batchlog = [] {
        schema_builder builder(generate_legacy_id(NAME, BATCHLOG), NAME, BATCHLOG,
        // partition key
        {{"id", uuid_type}},
        // clustering key
        {},
        // regular columns
        {{"data", bytes_type}, {"version", int32_type}, {"written_at", timestamp_type}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "batches awaiting replay"
        // FIXME: the original Java code also had:
        // operations on resulting CFMetaData:
        //    .compactionStrategyOptions(Collections.singletonMap("min_threshold", "2"))
       );
       builder.set_gc_grace_seconds(0);
       builder.with_hash_version();
       return builder.build(schema_builder::compact_storage::no);
    }();
    return batchlog;
}
|
|
|
|
// Schema for system.batchlog_v2: replacement for the legacy batchlog. The
// partition key includes the shard so batches are colocated per shard, and
// entries are clustered by write time for replay ordering. Caching is
// disabled since batchlog data is written once and read only on replay.
schema_ptr system_keyspace::batchlog_v2() {
    static thread_local auto batchlog_v2 = [] {
        schema_builder builder(generate_legacy_id(NAME, BATCHLOG_V2), NAME, BATCHLOG_V2,
        // partition key
        {{"version", int32_type}, {"stage", byte_type}, {"shard", int32_type}},
        // clustering key
        {{"written_at", timestamp_type}, {"id", uuid_type}},
        // regular columns
        {{"data", bytes_type}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "batches awaiting replay"
       );
       builder.set_gc_grace_seconds(0);
       builder.set_caching_options(caching_options::get_disabled_caching_options());
       builder.with_hash_version();
       return builder.build(schema_builder::compact_storage::no);
    }();
    return batchlog_v2;
}
|
|
|
|
// Schema for system.paxos: per-key Paxos state (promises, proposals and the
// most recent commit) used by LWT. Keyed by the partition key bytes plus the
// table id, so each (key, table) pair has independent Paxos state.
/*static*/ schema_ptr system_keyspace::paxos() {
    static thread_local auto paxos = [] {
        // FIXME: switch to the new schema_builder interface (with_column(...), etc)
        schema_builder builder(generate_legacy_id(NAME, PAXOS), NAME, PAXOS,
        // partition key
        {{"row_key", bytes_type}}, // byte representation of a row key that hashes to the same token as original
        // clustering key
        {{"cf_id", uuid_type}},
        // regular columns
        {
            {"promise", timeuuid_type},
            {"most_recent_commit", bytes_type}, // serialization format is defined by frozen_mutation idl
            {"most_recent_commit_at", timeuuid_type},
            {"proposal", bytes_type}, // serialization format is defined by frozen_mutation idl
            {"proposal_ballot", timeuuid_type},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "in-progress paxos proposals"
        // FIXME: the original Java code also had:
        // operations on resulting CFMetaData:
        //    .compactionStrategyClass(LeveledCompactionStrategy.class);
       );
       builder.set_gc_grace_seconds(0);
       builder.with_hash_version();
       return builder.build(schema_builder::compact_storage::no);
    }();
    return paxos;
}
|
|
|
|
// (timestamp, timeuuid) tuple identifying a CDC generation; element type of
// the committed/unpublished generation sets in the topology table below.
thread_local data_type cdc_generation_ts_id_type = tuple_type_impl::get_instance({timestamp_type, timeuuid_type});
|
|
|
|
// Schema for system.topology: the state of the Raft-based topology change
// state machine. Per-node state lives in regular rows (clustered by
// host_id); cluster-global state lives in static columns of the single
// partition.
schema_ptr system_keyspace::topology() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, TOPOLOGY);
        return schema_builder(NAME, TOPOLOGY, std::optional(id))
            .with_column("key", utf8_type, column_kind::partition_key)
            .with_column("host_id", uuid_type, column_kind::clustering_key)
            .with_column("datacenter", utf8_type)
            .with_column("rack", utf8_type)
            .with_column("tokens", set_type_impl::get_instance(utf8_type, true))
            .with_column("node_state", utf8_type)
            .with_column("release_version", utf8_type)
            .with_column("topology_request", utf8_type)
            .with_column("replaced_id", uuid_type)
            .with_column("rebuild_option", utf8_type)
            .with_column("num_tokens", int32_type)
            .with_column("tokens_string", utf8_type)
            .with_column("shard_count", int32_type)
            .with_column("ignore_msb", int32_type)
            .with_column("cleanup_status", utf8_type)
            .with_column("supported_features", set_type_impl::get_instance(utf8_type, true))
            .with_column("request_id", timeuuid_type)
            .with_column("ignore_nodes", set_type_impl::get_instance(uuid_type, true), column_kind::static_column)
            .with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
            .with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column) // deprecated
            .with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column) // deprecated
            .with_column("version", long_type, column_kind::static_column)
            .with_column("fence_version", long_type, column_kind::static_column)
            .with_column("transition_state", utf8_type, column_kind::static_column)
            .with_column("committed_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
            .with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
            .with_column("global_topology_request", utf8_type, column_kind::static_column)
            .with_column("global_topology_request_id", timeuuid_type, column_kind::static_column)
            .with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column)
            .with_column("session", uuid_type, column_kind::static_column)
            .with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
            .with_column("upgrade_state", utf8_type, column_kind::static_column)
            .with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
            .with_column("paused_rf_change_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
            .set_comment("Current state of topology change machine")
            .with_hash_version()
            .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.topology_requests: tracking of topology change requests
// (initiator, type, start/end time, outcome), keyed by request id.
schema_ptr system_keyspace::topology_requests() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, TOPOLOGY_REQUESTS);
        return schema_builder(NAME, TOPOLOGY_REQUESTS, std::optional(id))
            .with_column("id", timeuuid_type, column_kind::partition_key)
            .with_column("initiating_host", uuid_type)
            .with_column("request_type", utf8_type)
            .with_column("start_time", timestamp_type)
            .with_column("done", boolean_type)
            .with_column("error", utf8_type)
            .with_column("end_time", timestamp_type)
            .with_column("truncate_table_id", uuid_type)
            .with_column("new_keyspace_rf_change_ks_name", utf8_type)
            .with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false))
            .with_column("snapshot_table_ids", set_type_impl::get_instance(uuid_type, false))
            .with_column("snapshot_tag", utf8_type)
            .with_column("snapshot_expiry", timestamp_type)
            .with_column("snapshot_skip_flush", boolean_type)
            .set_comment("Topology request tracking")
            .with_hash_version()
            .build();
    }();
    return schema;
}
|
|
|
|
extern thread_local data_type cdc_streams_set_type;
|
|
|
|
/* An internal table used by nodes to store CDC generation data.
|
|
* Written to by Raft Group 0. */
|
|
/* An internal table used by nodes to store CDC generation data.
 * Written to by Raft Group 0. */
schema_ptr system_keyspace::cdc_generations_v3() {
    thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, CDC_GENERATIONS_V3);
        return schema_builder(NAME, CDC_GENERATIONS_V3, {id})
            /* This is a single-partition table with key 'cdc_generations'. */
            .with_column("key", utf8_type, column_kind::partition_key)
            /* The unique identifier of this generation. */
            .with_column("id", timeuuid_type, column_kind::clustering_key)
            /* The generation describes a mapping from all tokens in the token ring to a set of stream IDs.
             * This mapping is built from a bunch of smaller mappings, each describing how tokens in a
             * subrange of the token ring are mapped to stream IDs; these subranges together cover the entire
             * token ring. Each such range-local mapping is represented by a row of this table. The second
             * column of the clustering key of the row is the end of the range being described by this row.
             * The start of this range is the range_end of the previous row (in the clustering order, which
             * is the integer order) or of the last row with the same id value if this is the first row with
             * such id. */
            .with_column("range_end", long_type, column_kind::clustering_key)
            /* The set of streams mapped to in this range. The number of streams mapped to a single range in
             * a CDC generation is bounded from above by the number of shards on the owner of that range in
             * the token ring. In other words, the number of elements of this set is bounded by the maximum
             * of the number of shards over all nodes. The serialized size is obtained by counting about 20B
             * for each stream. For example, if all nodes in the cluster have at most 128 shards, the
             * serialized size of this set will be bounded by ~2.5 KB. */
            .with_column("streams", cdc_streams_set_type)
            /* The value of the `ignore_msb` sharding parameter of the node which was the owner of this token
             * range when the generation was first created. Together with the set of streams above it fully
             * describes the mapping for this particular range. */
            .with_column("ignore_msb", byte_type)
            .with_hash_version()
            .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.cdc_streams_state: the oldest CDC stream set per
// tablets-based table, one row per (table, last_token), with the set's
// timestamp stored as a static column.
schema_ptr system_keyspace::cdc_streams_state() {
    thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, CDC_STREAMS_STATE);
        return schema_builder(NAME, CDC_STREAMS_STATE, {id})
            .with_column("table_id", uuid_type, column_kind::partition_key)
            .with_column("last_token", long_type, column_kind::clustering_key)
            .with_column("stream_id", bytes_type)
            .with_column("timestamp", timestamp_type, column_kind::static_column)
            .set_comment("Oldest CDC stream set for tablets-based tables")
            .with_hash_version()
            .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.cdc_streams_history: CDC stream sets for tablets-based
// tables, stored as timestamped diffs against the previous state (see the
// table comment below). Clustered by (timestamp, stream_state, last_token).
schema_ptr system_keyspace::cdc_streams_history() {
    thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, CDC_STREAMS_HISTORY);
        return schema_builder(NAME, CDC_STREAMS_HISTORY, {id})
            .with_column("table_id", uuid_type, column_kind::partition_key)
            .with_column("timestamp", timestamp_type, column_kind::clustering_key)
            .with_column("stream_state", byte_type, column_kind::clustering_key)
            .with_column("last_token", long_type, column_kind::clustering_key)
            .with_column("stream_id", bytes_type)
            .set_comment("CDC stream sets for tablets-based tables described as differences from the previous state")
            .with_hash_version()
            .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.raft (Raft group 0 log); built by the shared helper in
// replica/tablets with the group0 flag set to true.
schema_ptr system_keyspace::raft() {
    static thread_local auto schema = replica::make_raft_schema(db::system_keyspace::RAFT, true);
    return schema;
}
|
|
|
|
// Note that this table does not include actual user snapshot data since it's dependent
// on user-provided state machine and could be stored anywhere else in any other form.
// This should be seen as a snapshot descriptor, instead.
schema_ptr system_keyspace::raft_snapshots() {
    static thread_local auto schema = replica::make_raft_snapshots_schema(db::system_keyspace::RAFT_SNAPSHOTS, true);
    return schema;
}
|
|
|
|
// Schema for system.raft_snapshot_config: the Raft configuration captured
// with each group 0 snapshot; built by the shared helper with group0 = true.
schema_ptr system_keyspace::raft_snapshot_config() {
    static thread_local auto schema = replica::make_raft_snapshot_config_schema(db::system_keyspace::RAFT_SNAPSHOT_CONFIG, true);
    return schema;
}
|
|
|
|
// Raft tables for strongly consistent tablets.
// These tables have partition keys of the form (shard, group_id), allowing the data
// to be co-located with the tablet replica that owns the raft group.
// The raft_groups_partitioner creates tokens that map to the specified shard.

schema_ptr system_keyspace::raft_groups() {
    static thread_local auto schema = replica::make_raft_schema(db::system_keyspace::RAFT_GROUPS, false);
    return schema;
}
|
|
|
|
// Snapshot-descriptor table for per-tablet Raft groups (group0 flag false;
// cf. raft_snapshots() above).
schema_ptr system_keyspace::raft_groups_snapshots() {
    static thread_local auto schema = replica::make_raft_snapshots_schema(db::system_keyspace::RAFT_GROUPS_SNAPSHOTS, false);
    return schema;
}
|
|
|
|
// Snapshot-configuration table for per-tablet Raft groups (group0 flag
// false; cf. raft_snapshot_config() above).
schema_ptr system_keyspace::raft_groups_snapshot_config() {
    static thread_local auto schema = replica::make_raft_snapshot_config_schema(db::system_keyspace::RAFT_GROUPS_SNAPSHOT_CONFIG, false);
    return schema;
}
|
|
|
|
// Schema for system.repair_history: one row per repaired token range per
// repair run, keyed by table id and clustered by repair start time.
schema_ptr system_keyspace::repair_history() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, REPAIR_HISTORY);
        return schema_builder(NAME, REPAIR_HISTORY, std::optional(id))
            .with_column("table_uuid", uuid_type, column_kind::partition_key)
            // The time is repair start time
            .with_column("repair_time", timestamp_type, column_kind::clustering_key)
            .with_column("repair_uuid", uuid_type, column_kind::clustering_key)
            // The token range is (range_start, range_end]
            .with_column("range_start", long_type, column_kind::clustering_key)
            .with_column("range_end", long_type, column_kind::clustering_key)
            .with_column("keyspace_name", utf8_type, column_kind::static_column)
            .with_column("table_name", utf8_type, column_kind::static_column)
            .set_comment("Record repair history")
            .with_hash_version()
            .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.repair_tasks: tablet repair tasks keyed by task id,
// clustered by operation and the tablet's token range.
schema_ptr system_keyspace::repair_tasks() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, REPAIR_TASKS);
        return schema_builder(NAME, REPAIR_TASKS, std::optional(id))
            .with_column("task_uuid", uuid_type, column_kind::partition_key)
            .with_column("operation", utf8_type, column_kind::clustering_key)
            // First and last token for of the tablet
            .with_column("first_token", long_type, column_kind::clustering_key)
            .with_column("last_token", long_type, column_kind::clustering_key)
            .with_column("timestamp", timestamp_type)
            .with_column("table_uuid", uuid_type, column_kind::static_column)
            .set_comment("Record tablet repair tasks")
            .with_hash_version()
            .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system."IndexInfo" (built column indexes): records which
// secondary indexes have finished building, keyed by keyspace name.
schema_ptr system_keyspace::built_indexes() {
    static thread_local auto built_indexes = [] {
        schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
        // partition key
        {{"table_name", utf8_type}}, // table_name here is the name of the keyspace - don't be fooled
        // clustering key
        {{"index_name", utf8_type}},
        // regular columns
        {},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "built column indexes"
       );
       builder.set_gc_grace_seconds(0);
       builder.with_hash_version();
       return builder.build(schema_builder::compact_storage::yes);
    }();
    return built_indexes;
}
|
|
|
|
// Schema for system.local: single-row table describing this node (identity,
// tokens, versions, advertised features). Several historical columns are
// declared and then immediately dropped via remove_column() with
// api::max_timestamp, so they remain permanently dropped in the schema.
/*static*/ schema_ptr system_keyspace::local() {
    static thread_local auto local = [] {
        schema_builder builder(generate_legacy_id(NAME, LOCAL), NAME, LOCAL,
        // partition key
        {{"key", utf8_type}},
        // clustering key
        {},
        // regular columns
        {
                {"bootstrapped", utf8_type},
                {"cluster_name", utf8_type},
                {"cql_version", utf8_type},
                {"data_center", utf8_type},
                {"gossip_generation", int32_type},
                {"host_id", uuid_type},
                {"native_protocol_version", utf8_type},
                {"partitioner", utf8_type},
                {"rack", utf8_type},
                {"release_version", utf8_type},
                {"schema_version", uuid_type},
                {"thrift_version", utf8_type},
                {"tokens", set_type_impl::get_instance(utf8_type, true)},
                {"truncated_at", map_type_impl::get_instance(uuid_type, bytes_type, true)},
                // The following 3 columns are only present up until 2.1.8 tables
                {"rpc_address", inet_addr_type},
                {"broadcast_address", inet_addr_type},
                {"listen_address", inet_addr_type},
                // This column represents advertised local features (i.e. the features
                // advertised by the node via gossip after passing the feature check
                // against remote features in the cluster)
                {"supported_features", utf8_type},
                {"scylla_cpu_sharding_algorithm", utf8_type},
                {"scylla_nr_shards", int32_type},
                {"scylla_msb_ignore", int32_type},

        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "information about the local node"
       );
       builder.set_gc_grace_seconds(0);
       auto drop_timestamp = api::max_timestamp;
       builder.remove_column("scylla_cpu_sharding_algorithm", drop_timestamp);
       builder.remove_column("scylla_nr_shards", drop_timestamp);
       builder.remove_column("scylla_msb_ignore", drop_timestamp);
       builder.remove_column("thrift_version", drop_timestamp);
       builder.with_hash_version();
       return builder.build(schema_builder::compact_storage::no);
    }();
    return local;
}
|
|
|
|
// Schema for system.peers: one row per known peer node, keyed by its
// address, holding topology/version/feature information learned via gossip.
/*static*/ schema_ptr system_keyspace::peers() {
    static thread_local auto peers = [] {
        schema_builder builder(generate_legacy_id(NAME, PEERS), NAME, PEERS,
        // partition key
        {{"peer", inet_addr_type}},
        // clustering key
        {},
        // regular columns
        {
                {"data_center", utf8_type},
                {"host_id", uuid_type},
                {"preferred_ip", inet_addr_type},
                {"rack", utf8_type},
                {"release_version", utf8_type},
                {"rpc_address", inet_addr_type},
                {"schema_version", uuid_type},
                {"tokens", set_type_impl::get_instance(utf8_type, true)},
                {"supported_features", utf8_type},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "information about known peers in the cluster"
       );
       builder.set_gc_grace_seconds(0);
       builder.with_hash_version();
       return builder.build(schema_builder::compact_storage::no);
    }();
    return peers;
}
|
|
|
|
// Schema for system.peer_events: per-peer event counters (currently only a
// map of dropped-hint counts), keyed by the peer's address.
/*static*/ schema_ptr system_keyspace::peer_events() {
    static thread_local auto peer_events = [] {
        schema_builder builder(generate_legacy_id(NAME, PEER_EVENTS), NAME, PEER_EVENTS,
        // partition key
        {{"peer", inet_addr_type}},
        // clustering key
        {},
        // regular columns
        {
            {"hints_dropped", map_type_impl::get_instance(uuid_type, int32_type, true)},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "events related to peers"
       );
       builder.set_gc_grace_seconds(0);
       builder.with_hash_version();
       return builder.build(schema_builder::compact_storage::no);
    }();
    return peer_events;
}
|
|
|
|
// Schema for system.range_xfers: token ranges requested for transfer, keyed
// by the serialized token bytes.
/*static*/ schema_ptr system_keyspace::range_xfers() {
    static thread_local auto range_xfers = [] {
        schema_builder builder(generate_legacy_id(NAME, RANGE_XFERS), NAME, RANGE_XFERS,
        // partition key
        {{"token_bytes", bytes_type}},
        // clustering key
        {},
        // regular columns
        {{"requested_at", timestamp_type}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "ranges requested for transfer"
       );
       builder.set_gc_grace_seconds(0);
       builder.with_hash_version();
       return builder.build(schema_builder::compact_storage::no);
    }();
    return range_xfers;
}
|
|
|
|
// Schema for system.compactions_in_progress: descriptors of unfinished
// compactions (keyspace, table and input sstable generations), keyed by a
// per-compaction id.
/*static*/ schema_ptr system_keyspace::compactions_in_progress() {
    static thread_local auto compactions_in_progress = [] {
        schema_builder builder(generate_legacy_id(NAME, COMPACTIONS_IN_PROGRESS), NAME, COMPACTIONS_IN_PROGRESS,
        // partition key
        {{"id", uuid_type}},
        // clustering key
        {},
        // regular columns
        {
            {"columnfamily_name", utf8_type},
            {"inputs", set_type_impl::get_instance(int32_type, true)},
            {"keyspace_name", utf8_type},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "unfinished compactions"
        );
       builder.with_hash_version();
       return builder.build(schema_builder::compact_storage::no);
    }();
    return compactions_in_progress;
}
|
|
|
|
// Schema for system.compaction_history: one row per completed compaction
// (sizes, timings, sstables in/out, tombstone purge stats). Rows expire
// after 7 days via the table's default TTL.
/*static*/ schema_ptr system_keyspace::compaction_history() {
    static thread_local auto compaction_history = [] {
        schema_builder builder(generate_legacy_id(NAME, COMPACTION_HISTORY), NAME, COMPACTION_HISTORY,
        // partition key
        {{"id", uuid_type}},
        // clustering key
        {},
        // regular columns
        {
            {"bytes_in", long_type},
            {"bytes_out", long_type},
            {"columnfamily_name", utf8_type},
            {"started_at", timestamp_type},
            {"compacted_at", timestamp_type},
            {"compaction_type", utf8_type},
            {"keyspace_name", utf8_type},
            {"rows_merged", map_type_impl::get_instance(int32_type, long_type, true)},
            {"shard_id", int32_type},
            {"sstables_in", list_type_impl::get_instance(sstableinfo_type, false)},
            {"sstables_out", list_type_impl::get_instance(sstableinfo_type, false)},
            {"total_tombstone_purge_attempt", long_type},
            {"total_tombstone_purge_failure_due_to_overlapping_with_memtable", long_type},
            {"total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable", long_type},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "week-long compaction history"
        );
        builder.set_default_time_to_live(std::chrono::duration_cast<std::chrono::seconds>(days(7)));
        builder.with_hash_version();
        return builder.build(schema_builder::compact_storage::no);
    }();
    return compaction_history;
}
|
|
|
|
// Schema for system.sstable_activity: historic read-rate estimates
// (15-minute and 120-minute) per sstable, keyed by keyspace, table and
// sstable generation.
/*static*/ schema_ptr system_keyspace::sstable_activity() {
    static thread_local auto sstable_activity = [] {
        schema_builder builder(generate_legacy_id(NAME, SSTABLE_ACTIVITY), NAME, SSTABLE_ACTIVITY,
        // partition key
        {
            {"keyspace_name", utf8_type},
            {"columnfamily_name", utf8_type},
            {"generation", int32_type},
        },
        // clustering key
        {},
        // regular columns
        {
            {"rate_120m", double_type},
            {"rate_15m", double_type},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "historic sstable read rates"
       );
       builder.with_hash_version();
       return builder.build(schema_builder::compact_storage::no);
    }();
    return sstable_activity;
}
|
|
|
|
// Schema for system.size_estimates: per-table primary-range size estimates
// (mean partition size and partition count), keyed by keyspace and
// clustered by table and token range bounds.
schema_ptr system_keyspace::size_estimates() {
    static thread_local auto size_estimates = [] {
        schema_builder builder(generate_legacy_id(NAME, SIZE_ESTIMATES), NAME, SIZE_ESTIMATES,
        // partition key
        {{"keyspace_name", utf8_type}},
        // clustering key
        {{"table_name", utf8_type}, {"range_start", utf8_type}, {"range_end", utf8_type}},
        // regular columns
        {
            {"mean_partition_size", long_type},
            {"partitions_count", long_type},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "per-table primary range size estimates"
        );
        builder.set_gc_grace_seconds(0);
        builder.with_hash_version();
        return builder.build(schema_builder::compact_storage::no);
    }();
    return size_estimates;
}
|
|
|
|
// Schema for system.large_partitions: partitions exceeding the configured
// size threshold, one row per (sstable, partition). partition_size uses a
// reversed type so the largest partitions sort first. Caching is disabled.
/*static*/ schema_ptr system_keyspace::large_partitions() {
    static thread_local auto large_partitions = [] {
        schema_builder builder(generate_legacy_id(NAME, LARGE_PARTITIONS), NAME, LARGE_PARTITIONS,
        // partition key
        {{"keyspace_name", utf8_type}, {"table_name", utf8_type}},
        // clustering key
        {
            {"sstable_name", utf8_type},
            {"partition_size", reversed_type_impl::get_instance(long_type)},
            {"partition_key", utf8_type}
        }, // CLUSTERING ORDER BY (partition_size DESC)
        // regular columns
        {
            {"rows", long_type},
            {"compaction_time", timestamp_type},
            {"range_tombstones", long_type},
            {"dead_rows", long_type}
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "partitions larger than specified threshold"
        );
        builder.set_gc_grace_seconds(0);
        builder.set_caching_options(caching_options::get_disabled_caching_options());
        builder.with_hash_version();
        return builder.build(schema_builder::compact_storage::no);
    }();
    return large_partitions;
}
|
|
|
|
// Schema for system.large_rows: rows exceeding the configured size
// threshold, clustered so the largest rows sort first (reversed row_size).
// Caching is disabled, as for the other large_* diagnostic tables.
schema_ptr system_keyspace::large_rows() {
    static thread_local auto large_rows = [] {
        auto id = generate_legacy_id(NAME, LARGE_ROWS);
        return schema_builder(NAME, LARGE_ROWS, std::optional(id))
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::partition_key)
            .with_column("sstable_name", utf8_type, column_kind::clustering_key)
            // We want the large rows first, so use reversed_type_impl
            .with_column("row_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key)
            .with_column("partition_key", utf8_type, column_kind::clustering_key)
            .with_column("clustering_key", utf8_type, column_kind::clustering_key)
            .with_column("compaction_time", timestamp_type)
            .set_comment("rows larger than specified threshold")
            .set_gc_grace_seconds(0)
            .set_caching_options(caching_options::get_disabled_caching_options())
            .with_hash_version()
            .build();
    }();
    return large_rows;
}
|
|
|
|
// Schema for system.large_cells: individual cells exceeding the configured
// size threshold, clustered so the largest cells sort first (reversed
// cell_size). Caching is disabled, as for the other large_* tables.
schema_ptr system_keyspace::large_cells() {
    static thread_local auto large_cells = [] {
        auto id = generate_legacy_id(NAME, LARGE_CELLS);
        return schema_builder(NAME, LARGE_CELLS, id)
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::partition_key)
            .with_column("sstable_name", utf8_type, column_kind::clustering_key)
            // We want the larger cells first, so use reversed_type_impl
            .with_column("cell_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key)
            .with_column("partition_key", utf8_type, column_kind::clustering_key)
            .with_column("clustering_key", utf8_type, column_kind::clustering_key)
            .with_column("column_name", utf8_type, column_kind::clustering_key)
            // regular rows
            .with_column("collection_elements", long_type)
            .with_column("compaction_time", timestamp_type)
            .set_comment("cells larger than specified threshold")
            .set_gc_grace_seconds(0)
            .set_caching_options(caching_options::get_disabled_caching_options())
            .with_hash_version()
            .build();
    }();
    return large_cells;
}
|
|
|
|
// Schema for system.corrupt_data: quarantine for mutation fragments detected as
// corrupt, stored raw (as bytes) for later forensic analysis.
schema_ptr system_keyspace::corrupt_data() {
    static thread_local auto corrupt_data = [] {
        auto id = generate_legacy_id(NAME, CORRUPT_DATA);
        return schema_builder(NAME, CORRUPT_DATA, id)
                // partition key
                .with_column("keyspace_name", utf8_type, column_kind::partition_key)
                .with_column("table_name", utf8_type, column_kind::partition_key)
                // clustering key
                .with_column("id", timeuuid_type, column_kind::clustering_key)
                // regular rows
                // Storing keys as bytes: having a corrupt key might be the reason
                // to record the row as corrupt, so we just dump what we have and
                // leave interpreting to the lucky person investigating the disaster.
                .with_column("partition_key", bytes_type)
                .with_column("clustering_key", bytes_type)
                // Note: mutation-fragment v2
                .with_column("mutation_fragment_kind", utf8_type)
                .with_column("frozen_mutation_fragment", bytes_type)
                .with_column("origin", utf8_type)
                .with_column("sstable_name", utf8_type)
                // options
                .set_comment("mutation-fragments found to be corrupted")
                .set_gc_grace_seconds(0)
                .with_hash_version()
                .build();
    }();
    return corrupt_data;
}
|
|
|
|
// Schema for system.scylla_local: a generic node-local key/value store for
// Scylla-specific metadata (simple text key -> text value).
/*static*/ schema_ptr system_keyspace::scylla_local() {
    static thread_local auto scylla_local = [] {
        schema_builder builder(generate_legacy_id(NAME, SCYLLA_LOCAL), NAME, SCYLLA_LOCAL,
        // partition key
        {{"key", utf8_type}},
        // clustering key
        {},
        // regular columns
        {
                {"value", utf8_type},
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "Scylla specific information about the local node"
       );
        builder.with_hash_version();
        return builder.build(schema_builder::compact_storage::no);
    }();
    return scylla_local;
}
|
|
|
|
// Schema for system.batches: serialized batchlog mutations awaiting replay,
// keyed by a timeuuid batch id.
schema_ptr system_keyspace::batches() {
    static thread_local auto schema = [] {
        schema_builder builder(generate_legacy_id(NAME, BATCHES), NAME, BATCHES,
        // partition key
        {{"id", timeuuid_type}},
        // clustering key
        {},
        // regular columns
        {{"mutations", list_type_impl::get_instance(bytes_type, true)}, {"version", int32_type}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "batches awaiting replay"
       );
        // No tombstone grace period: replayed batches are deleted right away.
        // (This call was previously duplicated; one call suffices.)
        builder.set_gc_grace_seconds(0);
        // FIXME: the original Java code also had:
        //.copy(new LocalPartitioner(TimeUUIDType.instance))
        // Aggressive compaction (min_threshold=2) keeps this constantly
        // rewritten table small.
        builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
        builder.set_compaction_strategy_options({{"min_threshold", "2"}});
        builder.with_hash_version();
        return builder.build(schema_builder::compact_storage::no);
    }();
    return schema;
}
|
|
|
|
// Schema for system.truncated: per-(table, shard) commitlog replay positions
// recorded when a table is truncated, plus the truncation timestamp as a
// partition-wide static column.
schema_ptr system_keyspace::truncated() {
    static thread_local auto local = [] {
        schema_builder builder(generate_legacy_id(NAME, TRUNCATED), NAME, TRUNCATED,
        // partition key
        {{"table_uuid", uuid_type}},
        // clustering key
        {{"shard", int32_type}},
        // regular columns
        {
                {"position", int32_type},
                {"segment_id", long_type}
        },
        // static columns
        {
                {"truncated_at", timestamp_type},
        },
        // regular column name type
        utf8_type,
        // comment
        "information about table truncation"
       );
        builder.set_gc_grace_seconds(0);
        builder.with_hash_version();
        return builder.build(schema_builder::compact_storage::no);
    }();
    return local;
}
|
|
|
|
// CQL tuple<bigint, int> used to store a db::replay_position (segment base id, position).
thread_local data_type replay_position_type = tuple_type_impl::get_instance({long_type, int32_type});
|
|
|
|
// Schema for system.commitlog_cleanups: per-shard records of cleaned-up token
// ranges, consulted during commitlog replay to filter out mutations belonging
// to ranges that were already cleaned. All data lives in the clustering key;
// there are no regular columns.
schema_ptr system_keyspace::commitlog_cleanups() {
    static thread_local auto local = [] {
        schema_builder builder(generate_legacy_id(NAME, COMMITLOG_CLEANUPS), NAME, COMMITLOG_CLEANUPS,
        // partition key
        {{"shard", int32_type}},
        // clustering key
        {
                {"position", replay_position_type},
                {"table_uuid", uuid_type},
                {"start_token_exclusive", long_type},
                {"end_token_inclusive", long_type},
        },
        // regular columns
        {},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "information about cleanups, for filtering commitlog replay"
       );
        builder.with_hash_version();
        return builder.build(schema_builder::compact_storage::no);
    }();
    return local;
}
|
|
|
|
// Schema for system.available_ranges: per-keyspace set of serialized token
// ranges that became available (ready to serve) during bootstrap/replace.
schema_ptr system_keyspace::available_ranges() {
    static thread_local auto schema = [] {
        schema_builder builder(generate_legacy_id(NAME, AVAILABLE_RANGES), NAME, AVAILABLE_RANGES,
        // partition key
        {{"keyspace_name", utf8_type}},
        // clustering key
        {},
        // regular columns
        {{"ranges", set_type_impl::get_instance(bytes_type, true)}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "available keyspace/ranges during bootstrap/replace that are ready to be served"
       );
        builder.set_gc_grace_seconds(0);
        builder.with_hash_version();
        return builder.build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.views_builds_in_progress: legacy (Cassandra-compatible)
// per-view build progress; see scylla_views_builds_in_progress() for the
// Scylla-native, per-cpu variant.
schema_ptr system_keyspace::views_builds_in_progress() {
    static thread_local auto schema = [] {
        schema_builder builder(generate_legacy_id(NAME, VIEWS_BUILDS_IN_PROGRESS), NAME, VIEWS_BUILDS_IN_PROGRESS,
        // partition key
        {{"keyspace_name", utf8_type}},
        // clustering key
        {{"view_name", utf8_type}},
        // regular columns
        {{"last_token", utf8_type}, {"generation_number", int32_type}},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "views builds current progress"
       );
        builder.set_gc_grace_seconds(0);
        builder.with_hash_version();
        return builder.build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.built_views: the set of views whose build has completed.
// Presence of the (keyspace, view) row is the only information; there are no
// regular columns.
schema_ptr system_keyspace::built_views() {
    static thread_local auto schema = [] {
        schema_builder builder(generate_legacy_id(NAME, BUILT_VIEWS), NAME, BUILT_VIEWS,
        // partition key
        {{"keyspace_name", utf8_type}},
        // clustering key
        {{"view_name", utf8_type}},
        // regular columns
        {},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "built views"
       );
        builder.set_gc_grace_seconds(0);
        builder.with_hash_version();
        return builder.build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.scylla_views_builds_in_progress: Scylla-native view build
// progress, tracked per (view, cpu) with first/next token bounds.
schema_ptr system_keyspace::scylla_views_builds_in_progress() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
        return schema_builder(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS, std::make_optional(id))
                .with_column("keyspace_name", utf8_type, column_kind::partition_key)
                .with_column("view_name", utf8_type, column_kind::clustering_key)
                .with_column("cpu_id", int32_type, column_kind::clustering_key)
                .with_column("next_token", utf8_type)
                .with_column("generation_number", int32_type)
                .with_column("first_token", utf8_type)
                .with_hash_version()
                .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.cdc_local: node-local CDC state, currently the identifier
// of the newest known CDC generation (timestamp + uuid).
/*static*/ schema_ptr system_keyspace::cdc_local() {
    static thread_local auto cdc_local = [] {
        schema_builder builder(generate_legacy_id(NAME, CDC_LOCAL), NAME, CDC_LOCAL,
        // partition key
        {{"key", utf8_type}},
        // clustering key
        {},
        // regular columns
        {
                /* Every node announces the identifier of the newest known CDC generation to other nodes.
                 * The identifier consists of two things: a timestamp (which is the generation's timestamp,
                 * denoting the time point from which it starts operating) and an UUID (randomly generated
                 * when the generation is created).
                 * This identifier is persisted here and restored on node restart.
                 *
                 * Some identifiers - identifying generations created in older clusters - have only the timestamp.
                 * For these the uuid column is empty.
                 */
                {"streams_timestamp", timestamp_type},
                {"uuid", uuid_type},

        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "CDC-specific information that the local node stores"
       );
        builder.set_gc_grace_seconds(0);
        builder.with_hash_version();
        return builder.build(schema_builder::compact_storage::no);
    }();
    return cdc_local;
}
|
|
|
|
// Schema for system.group0_history: an append-only log of Raft group 0 state
// changes, in a single partition, newest state first.
schema_ptr system_keyspace::group0_history() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, GROUP0_HISTORY);
        return schema_builder(NAME, GROUP0_HISTORY, id)
                // this is a single-partition table with key 'history'
                .with_column("key", utf8_type, column_kind::partition_key)
                // group0 state timeuuid, descending order
                .with_column("state_id", reversed_type_impl::get_instance(timeuuid_type), column_kind::clustering_key)
                // human-readable description of the change
                .with_column("description", utf8_type)

                .set_comment("History of Raft group 0 state changes")
                .with_hash_version()
                .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.discovery: persisted state of the cluster discovery
// algorithm — the set of peers seen so far, with their group 0 server ids.
schema_ptr system_keyspace::discovery() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, DISCOVERY);
        return schema_builder(NAME, DISCOVERY, id)
                // This is a single-partition table with key 'peers'
                .with_column("key", utf8_type, column_kind::partition_key)
                // Peer ip address
                .with_column("ip_addr", inet_addr_type, column_kind::clustering_key)
                // The ID of the group 0 server on that peer.
                // May be unknown during discovery, then it's set to UUID 0.
                .with_column("raft_server_id", uuid_type)
                .set_comment("State of cluster discovery algorithm: the set of discovered peers")
                .with_hash_version()
                .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.broadcast_kv_store: a simple text key -> text value table
// backing the cluster-wide broadcast key/value store.
schema_ptr system_keyspace::broadcast_kv_store() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, BROADCAST_KV_STORE);
        return schema_builder(NAME, BROADCAST_KV_STORE, id)
                .with_column("key", utf8_type, column_kind::partition_key)
                .with_column("value", utf8_type)
                .with_hash_version()
                .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.sstables (sstables registry): tracks sstable ownership and
// lifecycle state per owner UUID, keyed by the sstable's timeuuid generation.
schema_ptr system_keyspace::sstables_registry() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, SSTABLES_REGISTRY);
        return schema_builder(NAME, SSTABLES_REGISTRY, id)
                .with_column("owner", uuid_type, column_kind::partition_key)
                .with_column("generation", timeuuid_type, column_kind::clustering_key)
                .with_column("status", utf8_type)
                .with_column("state", utf8_type)
                .with_column("version", utf8_type)
                .with_column("format", utf8_type)
                .set_comment("SSTables ownership table")
                .with_hash_version()
                .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.tablets; the actual definition lives in
// replica::make_tablets_schema().
// NOTE(review): per the change introducing experimental logstor tables,
// logstor initially does not support tablet migrations, so tablet balancing is
// expected to be disabled when that feature flag is set — confirm against the
// tablet load balancer code, which is not part of this file.
schema_ptr system_keyspace::tablets() {
    static thread_local auto schema = replica::make_tablets_schema();
    return schema;
}
|
|
|
|
// Schema for system.service_levels_v2: service level definitions (timeout,
// workload type, shares), keyed by service level name.
schema_ptr system_keyspace::service_levels_v2() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, SERVICE_LEVELS_V2);
        return schema_builder(NAME, SERVICE_LEVELS_V2, id)
                .with_column("service_level", utf8_type, column_kind::partition_key)
                .with_column("timeout", duration_type)
                .with_column("workload_type", utf8_type)
                .with_column("shares", int32_type)
                .with_hash_version()
                .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.view_build_status_v2: per-host view build status, keyed by
// (keyspace, view) and clustered by host id.
schema_ptr system_keyspace::view_build_status_v2() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, VIEW_BUILD_STATUS_V2);
        return schema_builder(NAME, VIEW_BUILD_STATUS_V2, id)
                .with_column("keyspace_name", utf8_type, column_kind::partition_key)
                .with_column("view_name", utf8_type, column_kind::partition_key)
                .with_column("host_id", uuid_type, column_kind::clustering_key)
                .with_column("status", utf8_type)
                .with_hash_version()
                .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.roles: authentication/RBAC roles with login flag,
// superuser flag, granted roles and (salted) password hash.
schema_ptr system_keyspace::roles() {
    static thread_local auto schema = [] {
        schema_builder builder(generate_legacy_id(NAME, ROLES), NAME, ROLES,
        // partition key
        {{"role", utf8_type}},
        // clustering key
        {},
        // regular columns
        {
                {"can_login", boolean_type},
                {"is_superuser", boolean_type},
                {"member_of", set_type_impl::get_instance(utf8_type, true)},
                {"salted_hash", utf8_type}
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "roles for authentication and RBAC"
       );
        builder.with_hash_version();
        return builder.build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.role_members: the (role, member) membership relation used
// by RBAC; the row's existence is the only information carried.
schema_ptr system_keyspace::role_members() {
    static thread_local auto schema = [] {
        schema_builder builder(generate_legacy_id(NAME, ROLE_MEMBERS), NAME, ROLE_MEMBERS,
        // partition key
        {{"role", utf8_type}},
        // clustering key
        {{"member", utf8_type}},
        // regular columns
        {},
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "joins users and their granted roles in RBAC"
       );
        builder.with_hash_version();
        return builder.build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.role_attributes: arbitrary named text attributes per role.
// NOTE(review): the embedded table comment string says "role permissions in
// RBAC", which looks like a copy-paste slip (permissions live in
// role_permissions()). It is left untouched here because the comment is part of
// the persisted schema and changing it would alter the schema digest.
schema_ptr system_keyspace::role_attributes() {
    static thread_local auto schema = [] {
        schema_builder builder(generate_legacy_id(NAME, ROLE_ATTRIBUTES), NAME, ROLE_ATTRIBUTES,
        // partition key
        {{"role", utf8_type}},
        // clustering key
        {{"name", utf8_type}},
        // regular columns
        {
                {"value", utf8_type}
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "role permissions in RBAC"
       );
        builder.with_hash_version();
        return builder.build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.role_permissions: the set of permissions each role holds
// on each resource, used by CassandraAuthorizer.
schema_ptr system_keyspace::role_permissions() {
    static thread_local auto schema = [] {
        schema_builder builder(generate_legacy_id(NAME, ROLE_PERMISSIONS), NAME, ROLE_PERMISSIONS,
        // partition key
        {{"role", utf8_type}},
        // clustering key
        {{"resource", utf8_type}},
        // regular columns
        {
                {"permissions", set_type_impl::get_instance(utf8_type, true)}
        },
        // static columns
        {},
        // regular column name type
        utf8_type,
        // comment
        "role permissions for CassandraAuthorizer"
       );
        builder.with_hash_version();
        return builder.build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.dicts: named binary dictionaries (e.g. for compression),
// with the publishing node's UUID and a timestamp.
schema_ptr system_keyspace::dicts() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, DICTS);
        return schema_builder(NAME, DICTS, std::make_optional(id))
                .with_column("name", utf8_type, column_kind::partition_key)
                .with_column("timestamp", timestamp_type)
                .with_column("origin", uuid_type)
                .with_column("data", bytes_type)
                .with_hash_version()
                .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.view_building_tasks: the queue of view-building work items,
// identifying the base/view tables, the assigned host/shard and progress
// (last_token), with an 'aborted' flag.
schema_ptr system_keyspace::view_building_tasks() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, VIEW_BUILDING_TASKS);
        return schema_builder(NAME, VIEW_BUILDING_TASKS, std::make_optional(id))
                .with_column("key", utf8_type, column_kind::partition_key)
                .with_column("id", timeuuid_type, column_kind::clustering_key)
                .with_column("type", utf8_type)
                .with_column("aborted", boolean_type)
                .with_column("base_id", uuid_type)
                .with_column("view_id", uuid_type)
                .with_column("last_token", long_type)
                .with_column("host_id", uuid_type)
                .with_column("shard", int32_type)
                .with_hash_version()
                .build();
    }();
    return schema;
}
|
|
|
|
// Schema for system.clients routes table: per-connection routing info mapping
// a connection id to host addresses and the various service ports (CQL, TLS,
// Alternator HTTP/HTTPS).
schema_ptr system_keyspace::client_routes() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(NAME, CLIENT_ROUTES);
        return schema_builder(NAME, CLIENT_ROUTES, std::make_optional(id))
                .with_column("connection_id", utf8_type, column_kind::partition_key)
                .with_column("host_id", uuid_type, column_kind::clustering_key)
                .with_column("address", utf8_type)
                .with_column("port", int32_type)
                .with_column("tls_port", int32_type)
                .with_column("alternator_port", int32_type)
                .with_column("alternator_https_port", int32_type)
                .with_hash_version()
                .build();
    }();
    return schema;
}
|
|
|
|
// Read this node's identity (host id, cluster name, DC, rack) from its single
// row in system.local. Columns that are absent — e.g. on first boot — leave the
// corresponding local_info field default-constructed.
future<system_keyspace::local_info> system_keyspace::load_local_info() {
    auto msg = co_await execute_cql(format("SELECT host_id, cluster_name, data_center, rack FROM system.{} WHERE key=?", LOCAL), sstring(LOCAL));

    local_info ret;
    if (msg->empty()) {
        co_return ret;
    }

    const auto& row = msg->one();
    if (row.has("host_id")) {
        ret.host_id = locator::host_id(row.get_as<utils::UUID>("host_id"));
    }
    // The remaining columns are all plain text and share the same
    // "copy if present" extraction, so handle them table-driven.
    const std::pair<const char*, sstring*> text_columns[] = {
        {"cluster_name", &ret.cluster_name},
        {"data_center", &ret.dc},
        {"rack", &ret.rack},
    };
    for (auto [column, destination] : text_columns) {
        if (row.has(column)) {
            *destination = row.get_as<sstring>(column);
        }
    }

    co_return ret;
}
|
|
|
|
// Persist this node's identity and connectivity information into its single
// row in system.local. Twelve columns, twelve bound values, in matching order.
future<> system_keyspace::save_local_info(local_info sysinfo, gms::inet_address broadcast_address, gms::inet_address broadcast_rpc_address) {
    auto& cfg = _db.get_config();
    sstring req = fmt::format("INSERT INTO system.{} (key, host_id, cluster_name, release_version, cql_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
            , db::system_keyspace::LOCAL);

    return execute_cql(req, sstring(db::system_keyspace::LOCAL),
                        sysinfo.host_id.uuid(),
                        sysinfo.cluster_name,
                        version::release(),
                        cql3::query_processor::CQL_VERSION,
                        to_sstring(unsigned(cql_serialization_format::latest().protocol_version())),
                        sysinfo.dc,
                        sysinfo.rack,
                        sstring(cfg.partitioner()),
                        // Note the order: the rpc_address column is bound to
                        // broadcast_rpc_address, broadcast_address to broadcast_address.
                        broadcast_rpc_address,
                        broadcast_address,
                        sysinfo.listen_address.addr()
    ).discard_result();
}
|
|
|
|
// Persist the node's supported feature set into system.local as a single
// comma-joined string in the supported_features column.
future<> system_keyspace::save_local_supported_features(const std::set<std::string_view>& feats) {
    static const auto req = format("INSERT INTO system.{} (key, supported_features) VALUES (?, ?)", LOCAL);
    return execute_cql(req,
        sstring(db::system_keyspace::LOCAL),
        fmt::to_string(fmt::join(feats, ","))).discard_result();
}
|
|
|
|
// The cache must be distributed, because the values themselves may not update atomically, so a shard reading that
// is different than the one that wrote, may see a corrupted value. invoke_on_all will be used to guarantee that all
// updates are propagated correctly.
struct local_cache {
    // Per-shard cached copy of the node's bootstrap state; written via
    // container().invoke_on_all (see build_bootstrap_info()).
    system_keyspace::bootstrap_state _state;
};
|
|
|
|
// One-time (per process, shard 0 only) cleanup pass over system.peers before it
// is first read: removes rows with missing/null host_id, and when two peer IPs
// claim the same host_id, keeps only the row with the newest host_id write
// timestamp, deleting the stale one.
future<> system_keyspace::peers_table_read_fixup() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    if (_peers_table_read_fixup_done) {
        co_return;
    }
    _peers_table_read_fixup_done = true;

    const auto cql = format("SELECT peer, host_id, WRITETIME(host_id) as ts from system.{}", PEERS);
    // host_id -> (IP currently kept for it, write timestamp of that row)
    std::unordered_map<utils::UUID, std::pair<net::inet_address, int64_t>> map{};
    const auto cql_result = co_await execute_cql(cql);
    for (const auto& row : *cql_result) {
        const auto peer = row.get_as<net::inet_address>("peer");
        if (!row.has("host_id")) {
            slogger.error("Peer {} has no host_id in system.{}, the record is broken, removing it",
                peer, system_keyspace::PEERS);
            co_await remove_endpoint(gms::inet_address{peer});
            continue;
        }
        const auto host_id = row.get_as<utils::UUID>("host_id");
        if (!host_id) {
            slogger.error("Peer {} has null host_id in system.{}, the record is broken, removing it",
                peer, system_keyspace::PEERS);
            co_await remove_endpoint(gms::inet_address{peer});
            continue;
        }
        const auto ts = row.get_as<int64_t>("ts");
        const auto it = map.find(host_id);
        if (it == map.end()) {
            // First row seen for this host_id; tentatively keep it.
            map.insert({host_id, {peer, ts}});
            continue;
        }
        // Duplicate host_id: the row with the older WRITETIME is stale.
        // The two branches below log the same message with the (stale, newer)
        // IPs swapped accordingly.
        if (it->second.second >= ts) {
            // The already-kept row is newer (or tied): drop the current row.
            slogger.error("Peer {} with host_id {} has newer IP {} in system.{}, the record is stale, removing it",
                peer, host_id, it->second.first, system_keyspace::PEERS);
            co_await remove_endpoint(gms::inet_address{peer});
        } else {
            // The current row is newer: drop the previously kept row and
            // remember this one instead.
            slogger.error("Peer {} with host_id {} has newer IP {} in system.{}, the record is stale, removing it",
                it->second.first, host_id, peer, system_keyspace::PEERS);
            co_await remove_endpoint(gms::inet_address{it->second.first});
            it->second = {peer, ts};
        }
    }
}
|
|
|
|
// Load the persisted bootstrap state from system.local and propagate the
// decoded value into the per-shard local_cache on every shard.
future<> system_keyspace::build_bootstrap_info() {
    sstring req = format("SELECT bootstrapped FROM system.{} WHERE key = ? ", LOCAL);
    return execute_cql(req, sstring(LOCAL)).then([this] (auto msg) {
        // Read-only lookup table; declared const so a shared function-local
        // static cannot be mutated accidentally after initialization.
        static const auto state_map = std::unordered_map<sstring, bootstrap_state>({
            { "NEEDS_BOOTSTRAP", bootstrap_state::NEEDS_BOOTSTRAP },
            { "COMPLETED", bootstrap_state::COMPLETED },
            { "IN_PROGRESS", bootstrap_state::IN_PROGRESS },
            { "DECOMMISSIONED", bootstrap_state::DECOMMISSIONED }
        });
        // Default for a fresh node that has no record yet.
        bootstrap_state state = bootstrap_state::NEEDS_BOOTSTRAP;

        if (!msg->empty() && msg->one().has("bootstrapped")) {
            // at() throws on an unrecognized value, failing startup loudly
            // rather than silently mis-classifying the node's state.
            state = state_map.at(msg->one().template get_as<sstring>("bootstrapped"));
        }
        return container().invoke_on_all([state] (auto& sys_ks) {
            sys_ks._cache->_state = state;
        });
    });
}
|
|
|
|
}
|
|
|
|
namespace db {
|
|
|
|
// Read system.truncate table and cache last truncation time in `table` object for each table on every shard
future<std::unordered_map<table_id, db_clock::time_point>> system_keyspace::load_truncation_times() {
    std::unordered_map<table_id, db_clock::time_point> result;
    // If the operator asked to ignore truncation records, return an empty map
    // without touching the table at all.
    if (!_db.get_config().ignore_truncation_record.is_set()) {
        // truncated_at is a static column, so DISTINCT yields one row per table.
        sstring req = format("SELECT DISTINCT table_uuid, truncated_at from system.{}", TRUNCATED);
        auto result_set = co_await execute_cql(req);
        for (const auto& row: *result_set) {
            const auto table_uuid = table_id(row.get_as<utils::UUID>("table_uuid"));
            const auto ts = row.get_as<db_clock::time_point>("truncated_at");
            result[table_uuid] = ts;
        }
    }
    co_return result;
}
|
|
|
|
// Scrub system.truncated: zero out replay-position fields (segment_id/position)
// of all surviving tables, and delete records of tables that no longer exist.
// Flushes the table at the end iff anything was modified.
future<> system_keyspace::drop_truncation_rp_records() {
    sstring req = format("SELECT table_uuid, shard, segment_id from system.{}", TRUNCATED);
    auto rs = co_await execute_cql(req);

    bool any = false;
    std::unordered_set<table_id> to_delete;
    auto db = _qp.db();

    // Bound the number of in-flight UPDATEs. Note: the lambda mutates `any`
    // and `to_delete` from concurrent continuations; this is safe because all
    // of them run on the same shard (Seastar's single-threaded-per-shard model).
    auto max_concurrency = std::min(1024u, smp::count * 8);
    co_await seastar::max_concurrent_for_each(*rs, max_concurrency, [&] (const cql3::untyped_result_set_row& row) -> future<> {
        auto table_uuid = table_id(row.get_as<utils::UUID>("table_uuid"));
        if (!db.try_find_table(table_uuid)) {
            // Table was dropped; queue its truncation record for deletion.
            to_delete.emplace(table_uuid);
            co_return;
        }
        auto shard = row.get_as<int32_t>("shard");
        auto segment_id = row.get_as<int64_t>("segment_id");

        // Only rewrite rows that still carry a non-zero replay position.
        if (segment_id != 0) {
            any = true;
            sstring req = format("UPDATE system.{} SET segment_id = 0, position = 0 WHERE table_uuid = {} AND shard = {}", TRUNCATED, table_uuid, shard);
            co_await execute_cql(req);
        }
    });
    if (!to_delete.empty()) {
        // IN has a limit to how many values we can put into it.
        for (auto&& chunk : to_delete | std::views::transform(&table_id::to_sstring) | std::views::chunk(100)) {
            auto str = std::ranges::to<std::string>(chunk | std::views::join_with(','));
            auto req = format("DELETE FROM system.{} WHERE table_uuid IN ({})", TRUNCATED, str);
            co_await execute_cql(req);
        }
        any = true;
    }
    if (any) {
        co_await force_blocking_flush(TRUNCATED);
    }
}
|
|
|
|
// Delete all truncation records (every shard's row) for the given table, then
// flush so the deletion is durable before commitlog replay may consult it.
future<> system_keyspace::remove_truncation_records(table_id id) {
    // Use a bound parameter instead of interpolating the id into the CQL
    // text, consistent with get_truncated_positions()/save_truncation_record().
    auto req = format("DELETE FROM system.{} WHERE table_uuid = ?", TRUNCATED);
    co_await execute_cql(req, {id.uuid()});
    co_await force_blocking_flush(TRUNCATED);
}
|
|
|
|
// Record a table truncation: the replay position for this shard (as regular
// columns) and the truncation time (as the partition's static column).
future<> system_keyspace::save_truncation_record(const replica::column_family& cf, db_clock::time_point truncated_at, db::replay_position rp) {
    sstring req = format("INSERT INTO system.{} (table_uuid, shard, position, segment_id, truncated_at) VALUES(?,?,?,?,?)", TRUNCATED);
    co_await _qp.execute_internal(req, {cf.schema()->id().uuid(), int32_t(rp.shard_id()), int32_t(rp.pos), int64_t(rp.base_id()), truncated_at}, cql3::query_processor::cache_internal::yes);
    // Flush the table so that the value is available on boot before commitlog replay.
    // Commit log replay depends on truncation records to determine the minimum replay position.
    co_await force_blocking_flush(TRUNCATED);
}
|
|
|
|
// Return the recorded per-shard replay positions for the given table's last
// truncation, or an empty list when the operator chose to ignore truncation
// records.
future<replay_positions> system_keyspace::get_truncated_positions(table_id cf_id) {
    replay_positions result;
    if (_db.get_config().ignore_truncation_record.is_set()) {
        co_return result;
    }
    const auto req = format("SELECT * from system.{} WHERE table_uuid = ?", TRUNCATED);
    auto result_set = co_await execute_cql(req, {cf_id.uuid()});
    result.reserve(result_set->size());
    for (const auto& row: *result_set) {
        // replay_position is reconstructed as (shard, segment base id, position).
        result.emplace_back(row.get_as<int32_t>("shard"),
                row.get_as<int64_t>("segment_id"),
                row.get_as<int32_t>("position"));
    }
    co_return result;
}
|
|
|
|
// Remove every record from system.commitlog_cleanups.
future<> system_keyspace::drop_all_commitlog_cleanup_records() {
    // In this function we want to clear the entire COMMITLOG_CLEANUPS table.
    //
    // We can't use TRUNCATE, since it's a system table. So we have to delete each partition.
    //
    // The partition key is the shard number. If we knew how many shards there were in
    // the previous boot cycle, we could just issue DELETEs for 1..N.
    //
    // But we don't know that here, so we have to SELECT the set of partition keys,
    // and issue DELETEs on that.
    sstring req = format("SELECT shard from system.{}", COMMITLOG_CLEANUPS);
    auto rs = co_await execute_cql(req);

    // One DELETE per partition (shard), issued in parallel.
    co_await coroutine::parallel_for_each(*rs, [&] (const cql3::untyped_result_set_row& row) -> future<> {
        auto shard = row.get_as<int32_t>("shard");
        co_await execute_cql(format("DELETE FROM system.{} WHERE shard = {}", COMMITLOG_CLEANUPS, shard));
    });
}
|
|
|
|
// Delete, for the given shard, all cleanup records whose replay position is
// strictly below min_position — they can no longer affect commitlog replay.
future<> system_keyspace::drop_old_commitlog_cleanup_records(replay_position min_position) {
    // Encode the replay position as the CQL tuple<bigint, int> used by the
    // table's clustering key (see replay_position_type).
    auto pos = make_tuple_value(replay_position_type, tuple_type_impl::native_type({
        int64_t(min_position.base_id()),
        int32_t(min_position.pos)
    }));
    sstring req = format("DELETE FROM system.{} WHERE shard = ? AND position < ?", COMMITLOG_CLEANUPS);
    co_await _qp.execute_internal(req, {int32_t(min_position.shard_id()), pos}, cql3::query_processor::cache_internal::yes);
}
|
|
|
|
// Record that the given token range of `table` was cleaned at replay position
// `rp`, so commitlog replay can skip mutations in that range. The range is
// first normalized to the canonical (start-exclusive, end-inclusive] int64 form.
future<> system_keyspace::save_commitlog_cleanup_record(table_id table, dht::token_range tr, db::replay_position rp) {
    auto [start_token_exclusive, end_token_inclusive] = canonical_token_range(tr);
    auto pos = make_tuple_value(replay_position_type, tuple_type_impl::native_type({int64_t(rp.base_id()), int32_t(rp.pos)}));
    sstring req = format("INSERT INTO system.{} (shard, position, table_uuid, start_token_exclusive, end_token_inclusive) VALUES(?,?,?,?,?)", COMMITLOG_CLEANUPS);
    co_await _qp.execute_internal(req, {int32_t(rp.shard_id()), pos, table.uuid(), start_token_exclusive, end_token_inclusive}, cql3::query_processor::cache_internal::yes);
}
|
|
|
|
// Normalize an arbitrary dht::token_range (possibly with infinite/open bounds)
// into the canonical (start_exclusive, end_inclusive] pair of int64 token
// values stored in system.commitlog_cleanups. A degenerate input maps to the
// empty interval (min, min].
std::pair<int64_t, int64_t> system_keyspace::canonical_token_range(dht::token_range tr) {
    // closed_full_range represents a full interval using only regular token values. (No infinities).
    auto closed_full_range = dht::token_range::make({dht::first_token()}, dht::token::from_int64(std::numeric_limits<int64_t>::max()));
    // By intersecting with closed_full_range we get rid of all the crazy infinities that can be represented by dht::token_range.
    auto finite_tr = tr.intersection(closed_full_range, dht::token_comparator());
    if (!finite_tr) {
        // If we got here, the interval was degenerate, with only infinities.
        // So we return an empty (x, x] interval.
        // We arbitrarily choose `min` as the `x`.
        //
        // Note: (x, x] is interpreted by the interval classes from `interval.hh` as the
        // *full* (wrapping) interval, not an empty interval, so be careful about this if you ever
        // want to implement a conversion from the output of this function back to `dht::token_range`.
        // Nota bene, this `interval.hh` convention means that there is no way to represent an empty
        // interval, so it is objectively bad.
        //
        // Note: (x, x] is interpreted by boost::icl as an empty interval, so it doesn't need any special
        // treatment before use in `boost::icl::interval_map`.
        return {std::numeric_limits<int64_t>::min(), std::numeric_limits<int64_t>::min()};
    }
    // After getting rid of possible infinities, we only have to adjust the openness of bounds.
    // An inclusive start becomes exclusive by stepping one token down; an
    // exclusive end becomes inclusive the same way.
    int64_t start_token_exclusive = dht::token::to_int64(finite_tr->start().value().value());
    if (finite_tr->start()->is_inclusive()) {
        start_token_exclusive -= 1;
    }
    int64_t end_token_inclusive = dht::token::to_int64(finite_tr->end().value().value());
    if (!finite_tr->end()->is_inclusive()) {
        end_token_inclusive -= 1;
    }
    return {start_token_exclusive, end_token_inclusive};
}
|
|
|
|
// Hash for the (table_id, shard) keys of commitlog_cleanup_map: combines the
// table UUID's hash with the shard's hash, in that order, via boost::hash_combine.
size_t system_keyspace::commitlog_cleanup_map_hash::operator()(const std::pair<table_id, int32_t>& p) const {
    const auto table_hash = std::hash<utils::UUID>()(p.first.uuid());
    const auto shard_hash = std::hash<int32_t>()(p.second);
    size_t combined = 0;
    boost::hash_combine(combined, table_hash);
    boost::hash_combine(combined, shard_hash);
    return combined;
}
|
|
|
|
// Pimpl body of commitlog_cleanup_local_map: an interval map from token (as
// int64, left-open intervals) to the highest replay_position recorded for that
// interval. inplace_max makes overlapping inserts keep the maximum position;
// the pimpl keeps the heavy boost::icl include out of the header.
struct system_keyspace::commitlog_cleanup_local_map::impl {
    boost::icl::interval_map<
        int64_t,
        db::replay_position,
        boost::icl::partial_absorber,
        std::less,
        boost::icl::inplace_max,
        boost::icl::inter_section,
        boost::icl::left_open_interval<int64_t>
    > _map;
};
|
|
|
|
// Out-of-line destructor: required here because impl is incomplete at the
// point of declaration (pimpl idiom).
system_keyspace::commitlog_cleanup_local_map::~commitlog_cleanup_local_map() {
}
system_keyspace::commitlog_cleanup_local_map::commitlog_cleanup_local_map()
        : _pimpl(std::make_unique<impl>())
{}
// Look up the replay position recorded for the interval containing `token`,
// or nullopt if no cleanup covers it.
std::optional<db::replay_position> system_keyspace::commitlog_cleanup_local_map::get(int64_t token) const {
    if (auto it = _pimpl->_map.find(token); it != _pimpl->_map.end()) {
        return it->second;
    }
    return std::nullopt;
}
|
|
|
|
// Load the whole system.commitlog_cleanups table into an in-memory map keyed
// by (table, shard), each entry being an interval map from token range to the
// replay position up to which that range was cleaned.
future<system_keyspace::commitlog_cleanup_map> system_keyspace::get_commitlog_cleanup_records() {
    commitlog_cleanup_map ret;
    const auto req = format("SELECT * from system.{}", COMMITLOG_CLEANUPS);
    auto result_set = co_await execute_cql(req);
    for (const auto& row: *result_set) {
        auto table = table_id(row.get_as<utils::UUID>("table_uuid"));
        auto shard = row.get_as<int32_t>("shard");
        auto start_token_exclusive = row.get_as<int64_t>("start_token_exclusive");
        auto end_token_inclusive = row.get_as<int64_t>("end_token_inclusive");
        // "position" is stored as a tuple<bigint, int> (see replay_position_type).
        auto pos_tuple = value_cast<tuple_type_impl::native_type>(replay_position_type->deserialize(row.get_view("position")));
        auto rp = db::replay_position(
            shard,
            value_cast<int64_t>(pos_tuple[0]),
            value_cast<int32_t>(pos_tuple[1])
        );
        // += on the icl interval map merges overlapping intervals, keeping the
        // maximum replay position (inplace_max, see impl).
        auto& inner_map = ret.try_emplace(std::make_pair(table, shard)).first->second;
        inner_map._pimpl->_map += std::make_pair(boost::icl::left_open_interval<int64_t>(start_token_exclusive, end_token_inclusive), rp);
    }
    co_return ret;
}
|
|
|
|
// Deserialize a set-typed column of `row` using the column's type from
// schema `s`, returning the native (vector-of-data_value) representation.
static set_type_impl::native_type deserialize_set_column(const schema& s, const cql3::untyped_result_set_row& row, const char* name) {
    auto blob = row.get_blob_unfragmented(name);
    const auto* cdef = s.get_column_definition(name);
    return value_cast<set_type_impl::native_type>(cdef->type->deserialize(blob));
}
|
|
|
|
// Convert a set of tokens to the native representation of a CQL set<text>
// (each token rendered via its string form).
static set_type_impl::native_type prepare_tokens(const std::unordered_set<dht::token>& tokens) {
    set_type_impl::native_type result;
    for (const auto& token : tokens) {
        result.push_back(token.to_sstring());
    }
    return result;
}
|
|
|
|
// Decode a CQL set<text> of token strings into a token set.
//
// Each string is round-trip-checked (parse then re-render) to detect
// corrupted token strings; a mismatch is an internal error.
// Fix: the original parsed every string twice (once for the check, once
// for the insert) — parse once and reuse the result.
std::unordered_set<dht::token> decode_tokens(const set_type_impl::native_type& tokens) {
    std::unordered_set<dht::token> tset;
    tset.reserve(tokens.size());
    for (const auto& t: tokens) {
        const auto str = value_cast<sstring>(t);
        auto token = dht::token::from_sstring(str);
        if (str != token.to_sstring()) {
            on_internal_error(slogger, format("decode_tokens: invalid token string '{}'", str));
        }
        tset.insert(std::move(token));
    }
    return tset;
}
|
|
|
|
// Decode a CQL set<uuid> into a set of raft server ids.
static std::unordered_set<raft::server_id> decode_nodes_ids(const set_type_impl::native_type& nodes_ids) {
    std::unordered_set<raft::server_id> ids_set;
    for (const auto& id : nodes_ids) {
        ids_set.insert(raft::server_id{value_cast<utils::UUID>(id)});
    }
    return ids_set;
}
|
|
|
|
// Decode a CDC generation id stored as a (timestamp, uuid) tuple.
static cdc::generation_id decode_cdc_generation_id(const data_value& gen_id) {
    auto tup = value_cast<tuple_type_impl::native_type>(gen_id);
    return cdc::generation_id{
        value_cast<db_clock::time_point>(tup[0]),
        value_cast<utils::UUID>(tup[1]),
    };
}
|
|
|
|
// Decode a CQL set of CDC generation ids into a vector.
// Fix: reserve the output vector up front — the size is known.
static std::vector<cdc::generation_id> decode_cdc_generations_ids(const set_type_impl::native_type& gen_ids) {
    std::vector<cdc::generation_id> gen_ids_list;
    gen_ids_list.reserve(gen_ids.size());
    for (const auto& gen_id: gen_ids) {
        gen_ids_list.push_back(decode_cdc_generation_id(gen_id));
    }
    return gen_ids_list;
}
|
|
|
|
// Read each peer's token set from system.peers.
// Rows without a tokens column (e.g. joining or zero-token nodes) are skipped.
future<std::unordered_map<gms::inet_address, std::unordered_set<dht::token>>> system_keyspace::load_tokens() {
    // Normalize the peers table before reading — see peers_table_read_fixup.
    co_await peers_table_read_fixup();

    std::unordered_map<gms::inet_address, std::unordered_set<dht::token>> ret;
    const auto rows = co_await execute_cql(format("SELECT peer, tokens FROM system.{}", PEERS));
    for (const auto& row : *rows) {
        if (!row.has("tokens")) {
            continue;
        }
        ret.emplace(gms::inet_address(row.get_as<net::inet_address>("peer")),
                    decode_tokens(deserialize_set_column(*peers(), row, "tokens")));
    }
    co_return ret;
}
|
|
|
|
// Read the IP -> host id mapping from system.peers.
future<std::unordered_map<gms::inet_address, locator::host_id>> system_keyspace::load_host_ids() {
    // Normalize the peers table before reading — see peers_table_read_fixup.
    co_await peers_table_read_fixup();

    std::unordered_map<gms::inet_address, locator::host_id> ret;
    const auto rows = co_await execute_cql(format("SELECT peer, host_id FROM system.{}", PEERS));
    for (const auto& row : *rows) {
        auto peer = gms::inet_address(row.get_as<net::inet_address>("peer"));
        auto id = locator::host_id(row.get_as<utils::UUID>("host_id"));
        ret.emplace(peer, id);
    }
    co_return ret;
}
|
|
|
|
// Build a map of host id -> loaded endpoint state (IP plus optional DC/rack)
// from system.peers.
//
// Rows with an empty DC or rack string are dropped (with an error log);
// rows missing DC and/or rack entirely are kept, but without DC/rack info.
future<std::unordered_map<locator::host_id, gms::loaded_endpoint_state>> system_keyspace::load_endpoint_state() {
    co_await peers_table_read_fixup();

    const auto msg = co_await execute_cql(format("SELECT peer, host_id, data_center, rack from system.{}", PEERS));

    std::unordered_map<locator::host_id, gms::loaded_endpoint_state> ret;
    for (const auto& row : *msg) {
        gms::loaded_endpoint_state st;
        auto ep = row.get_as<net::inet_address>("peer");
        if (!row.has("host_id")) {
            // Must never happen after `peers_table_read_fixup` call above
            on_internal_error_noexcept(slogger, format("load_endpoint_state: node {} has no host_id in system.{}", ep, PEERS));
        }
        auto host_id = locator::host_id(row.get_as<utils::UUID>("host_id"));
        if (row.has("data_center") && row.has("rack")) {
            st.opt_dc_rack.emplace(locator::endpoint_dc_rack {
                row.get_as<sstring>("data_center"),
                row.get_as<sstring>("rack")
            });
            // Empty strings are invalid — skip the row entirely rather than
            // propagate a bogus DC/rack.
            if (st.opt_dc_rack->dc.empty() || st.opt_dc_rack->rack.empty()) {
                slogger.error("load_endpoint_state: node {}/{} has empty dc={} or rack={}", host_id, ep, st.opt_dc_rack->dc, st.opt_dc_rack->rack);
                continue;
            }
        } else {
            // Missing columns are tolerated; log exactly which ones.
            slogger.warn("Endpoint {} has no {} in system.{}", ep,
                         !row.has("data_center") && !row.has("rack") ? "data_center nor rack" : !row.has("data_center") ? "data_center" : "rack",
                         PEERS);
        }
        st.endpoint = ep;
        ret.emplace(host_id, std::move(st));
    }

    co_return ret;
}
|
|
|
|
// List the peer IPs known in system.peers, excluding placeholder rows.
future<std::vector<gms::inet_address>> system_keyspace::load_peers() {
    co_await peers_table_read_fixup();

    const auto res = co_await execute_cql(format("SELECT peer, rpc_address FROM system.{}", PEERS));
    SCYLLA_ASSERT(res);

    std::vector<gms::inet_address> ret;
    for (const auto& row : *res) {
        // In the Raft-based topology, PEERS also holds Host ID -> IP rows
        // for joining nodes; those must be ignored. We detect them by the
        // absence of rpc_address — any column other than host_id and tokens
        // would do (tokens may legitimately be absent for zero-token nodes).
        if (row.has("rpc_address")) {
            ret.emplace_back(gms::inet_address(row.get_as<net::inet_address>("peer")));
        }
    }
    co_return ret;
}
|
|
|
|
// List the peer host ids known in system.peers, excluding placeholder rows.
future<std::vector<locator::host_id>> system_keyspace::load_peers_ids() {
    co_await peers_table_read_fixup();

    const auto res = co_await execute_cql(format("SELECT rpc_address, host_id FROM system.{}", PEERS));
    SCYLLA_ASSERT(res);

    std::vector<locator::host_id> ret;
    for (const auto& row : *res) {
        // Rows without rpc_address are Host ID -> IP mappings of joining
        // nodes in the Raft-based topology and must be ignored — same
        // filtering rationale as in load_peers().
        if (!row.has("rpc_address")) {
            continue;
        }
        ret.emplace_back(locator::host_id(row.get_as<utils::UUID>("host_id")));
    }
    co_return ret;
}
|
|
|
|
// Read each peer's advertised supported_features string from system.peers.
// Peers without the column are omitted from the result.
future<std::unordered_map<locator::host_id, sstring>> system_keyspace::load_peer_features() {
    co_await peers_table_read_fixup();

    std::unordered_map<locator::host_id, sstring> ret;
    const auto rows = co_await execute_cql(format("SELECT host_id, supported_features FROM system.{}", PEERS));
    for (const auto& row : *rows) {
        if (!row.has("supported_features")) {
            continue;
        }
        ret.emplace(locator::host_id(row.get_as<utils::UUID>("host_id")),
                    row.get_as<sstring>("supported_features"));
    }
    co_return ret;
}
|
|
|
|
// Read the peer -> preferred_ip mapping from system.peers.
// Peers without a preferred_ip are omitted.
future<std::unordered_map<gms::inet_address, gms::inet_address>> system_keyspace::get_preferred_ips() {
    co_await peers_table_read_fixup();

    std::unordered_map<gms::inet_address, gms::inet_address> res;
    const auto rows = co_await execute_cql(format("SELECT peer, preferred_ip FROM system.{}", PEERS));
    for (const auto& row : *rows) {
        if (!row.has("preferred_ip")) {
            continue;
        }
        res.emplace(gms::inet_address(row.get_as<net::inet_address>("peer")),
                    gms::inet_address(row.get_as<net::inet_address>("preferred_ip")));
    }

    co_return res;
}
|
|
|
|
namespace {
|
|
template <typename T>
|
|
static data_value_or_unset make_data_value_or_unset(const std::optional<T>& opt) {
|
|
if (opt) {
|
|
return data_value(*opt);
|
|
} else {
|
|
return unset_value{};
|
|
}
|
|
};
|
|
|
|
static data_value_or_unset make_data_value_or_unset(const std::optional<std::unordered_set<dht::token>>& opt) {
|
|
if (opt) {
|
|
auto set_type = set_type_impl::get_instance(utf8_type, true);
|
|
return make_set_value(set_type, prepare_tokens(*opt));
|
|
} else {
|
|
return unset_value{};
|
|
}
|
|
};
|
|
}
|
|
|
|
// Insert or update a row in system.peers for a remote node.
//
// Unset optionals are bound as unset_value, which per CQL unset semantics
// leaves the corresponding column untouched rather than nulling it.
// Must never be called for the local node (asserted below).
future<> system_keyspace::update_peer_info(gms::inet_address ep, locator::host_id hid, const peer_info& info) {
    if (ep == gms::inet_address{}) {
        on_internal_error(slogger, format("update_peer_info called with empty inet_address, host_id {}", hid));
    }
    if (!hid) {
        on_internal_error(slogger, format("update_peer_info called with empty host_id, ep {}", ep));
    }
    if (_db.get_token_metadata().get_topology().is_me(hid)) {
        on_internal_error(slogger, format("update_peer_info called for this node: {}", ep));
    }

    // NOTE: the order of values must match the column list of the INSERT
    // statement below exactly.
    data_value_list values = {
        data_value_or_unset(data_value(ep.addr())),
        make_data_value_or_unset(info.data_center),
        data_value_or_unset(hid.id),
        make_data_value_or_unset(info.preferred_ip),
        make_data_value_or_unset(info.rack),
        make_data_value_or_unset(info.release_version),
        make_data_value_or_unset(info.rpc_address),
        make_data_value_or_unset(info.schema_version),
        make_data_value_or_unset(info.tokens),
        make_data_value_or_unset(info.supported_features),
    };

    auto query = fmt::format("INSERT INTO system.{} "
            "(peer,data_center,host_id,preferred_ip,rack,release_version,rpc_address,schema_version,tokens,supported_features) VALUES"
            "(?,?,?,?,?,?,?,?,?,?)", PEERS);

    slogger.debug("{}: values={}", query, values);

    // Serialize with other peers-table writers and with cache loaders.
    const auto guard = co_await get_units(_peers_cache_lock, 1);
    try {
        co_await _qp.execute_internal(query, db::consistency_level::ONE, values, cql3::query_processor::cache_internal::yes);
        // Keep the in-memory IP <-> host id cache in sync with the write.
        if (auto* cache = get_peers_cache()) {
            cache->host_id_to_inet_ip[hid] = ep;
            cache->inet_ip_to_host_id[ep] = hid;
        }
    } catch (...) {
        // The write may have partially taken effect; drop the cache so it
        // is rebuilt from the table on next access.
        _peers_cache = nullptr;
        throw;
    }
}
|
|
|
|
// Return the current peers cache, or nullptr if there is none or it has
// expired. An expired cache is dropped so the next reader reloads it.
system_keyspace::peers_cache* system_keyspace::get_peers_cache() {
    auto* cache = _peers_cache.get();
    if (!cache) {
        return nullptr;
    }
    if (lowres_clock::now() > cache->expiration_time) {
        _peers_cache = nullptr;
        return nullptr;
    }
    return cache;
}
|
|
|
|
// Return the peers cache, (re)loading it from system.peers if absent or
// expired. Loading is serialized by _peers_cache_lock so concurrent callers
// share a single load.
//
// Fix: iterate the loaded map by const reference — the original
// `const auto [ip, id]` copied every (inet_address, host_id) pair.
future<lw_shared_ptr<const system_keyspace::peers_cache>> system_keyspace::get_or_load_peers_cache() {
    const auto guard = co_await get_units(_peers_cache_lock, 1);
    if (auto* cache = get_peers_cache()) {
        co_return cache->shared_from_this();
    }
    auto cache = make_lw_shared<peers_cache>();
    cache->inet_ip_to_host_id = co_await load_host_ids();
    cache->host_id_to_inet_ip.reserve(cache->inet_ip_to_host_id.size());
    // Build the reverse mapping; each host id must map to exactly one IP.
    for (const auto& [ip, id]: cache->inet_ip_to_host_id) {
        const auto [it, inserted] = cache->host_id_to_inet_ip.insert({id, ip});
        if (!inserted) {
            on_internal_error(slogger, ::format("duplicate IP for host_id {}, first IP {}, second IP {}",
                id, it->second, ip));
        }
    }
    // Short TTL: the cache only needs to absorb bursts of lookups.
    cache->expiration_time = lowres_clock::now() + std::chrono::milliseconds(200);
    _peers_cache = cache;
    co_return std::move(cache);
}
|
|
|
|
// Resolve a host id to its IP using the (possibly freshly loaded) peers
// cache; nullopt if the host id is unknown.
future<std::optional<gms::inet_address>> system_keyspace::get_ip_from_peers_table(locator::host_id id) {
    const auto cache = co_await get_or_load_peers_cache();
    const auto& map = cache->host_id_to_inet_ip;
    const auto it = map.find(id);
    if (it == map.end()) {
        co_return std::nullopt;
    }
    co_return it->second;
}
|
|
|
|
// Return a copy of the host id -> IP map from the peers cache,
// loading the cache first if needed.
future<system_keyspace::host_id_to_ip_map_t> system_keyspace::get_host_id_to_ip_map() {
    const auto cache = co_await get_or_load_peers_cache();
    co_return cache->host_id_to_inet_ip;
}
|
|
|
|
template <typename T>
|
|
future<> system_keyspace::set_scylla_local_param_as(const sstring& key, const T& value, bool visible_before_cl_replay) {
|
|
sstring req = format("UPDATE system.{} SET value = ? WHERE key = ?", system_keyspace::SCYLLA_LOCAL);
|
|
auto type = data_type_for<T>();
|
|
co_await execute_cql(req, type->to_string_impl(data_value(value)), key).discard_result();
|
|
if (visible_before_cl_replay) {
|
|
co_await force_blocking_flush(SCYLLA_LOCAL);
|
|
}
|
|
}
|
|
|
|
template <typename T>
|
|
future<std::optional<T>> system_keyspace::get_scylla_local_param_as(const sstring& key) {
|
|
sstring req = format("SELECT value FROM system.{} WHERE key = ?", system_keyspace::SCYLLA_LOCAL);
|
|
return execute_cql(req, key).then([] (::shared_ptr<cql3::untyped_result_set> res)
|
|
-> future<std::optional<T>> {
|
|
if (res->empty() || !res->one().has("value")) {
|
|
return make_ready_future<std::optional<T>>(std::optional<T>());
|
|
}
|
|
auto type = data_type_for<T>();
|
|
return make_ready_future<std::optional<T>>(value_cast<T>(type->deserialize(
|
|
type->from_string(res->one().get_as<sstring>("value")))));
|
|
});
|
|
}
|
|
|
|
// Explicit instantiation for UUID-valued parameters, so other translation
// units can call it without seeing the template definition.
template
future<std::optional<utils::UUID>>
system_keyspace::get_scylla_local_param_as<utils::UUID>(const sstring& key);
|
|
|
|
// String-typed convenience wrapper over set_scylla_local_param_as.
future<> system_keyspace::set_scylla_local_param(const sstring& key, const sstring& value, bool visible_before_cl_replay) {
    return set_scylla_local_param_as<sstring>(key, value, visible_before_cl_replay);
}
|
|
|
|
// String-typed convenience wrapper over get_scylla_local_param_as.
future<std::optional<sstring>> system_keyspace::get_scylla_local_param(const sstring& key){
    return get_scylla_local_param_as<sstring>(key);
}
|
|
|
|
// Persist the node's current schema version in the LOCAL row.
future<> system_keyspace::update_schema_version(table_schema_version version) {
    sstring req = format("INSERT INTO system.{} (key, schema_version) VALUES (?, ?)", LOCAL);
    co_await execute_cql(req, sstring(LOCAL), version.uuid()).discard_result();
}
|
|
|
|
/**
 * Remove stored tokens being used by another node
 */
future<> system_keyspace::remove_endpoint(gms::inet_address ep) {
    const sstring req = format("DELETE FROM system.{} WHERE peer = ?", PEERS);
    slogger.debug("DELETE FROM system.{} WHERE peer = {}", PEERS, ep);

    // Serialize with other peers-table writers and cache loaders.
    const auto guard = co_await get_units(_peers_cache_lock, 1);
    try {
        co_await execute_cql(req, ep.addr()).discard_result();
        // Keep the in-memory IP <-> host id cache consistent: remove both
        // directions of the mapping for this endpoint, if cached.
        if (auto* cache = get_peers_cache()) {
            const auto it = cache->inet_ip_to_host_id.find(ep);
            if (it != cache->inet_ip_to_host_id.end()) {
                const auto id = it->second;
                cache->inet_ip_to_host_id.erase(it);
                cache->host_id_to_inet_ip.erase(id);
            }
        }
    } catch (...) {
        // On failure the cache may be stale — drop it so it gets rebuilt.
        _peers_cache = nullptr;
        throw;
    }
}
|
|
|
|
// Persist this node's own tokens in the LOCAL row as a set<text>.
future<> system_keyspace::update_tokens(const std::unordered_set<dht::token>& tokens) {
    auto set_type = set_type_impl::get_instance(utf8_type, true);
    sstring req = format("INSERT INTO system.{} (key, tokens) VALUES (?, ?)", LOCAL);
    co_await execute_cql(req, sstring(LOCAL), make_set_value(set_type, prepare_tokens(tokens)));
}
|
|
|
|
// Flush the given system table on every shard, waiting for completion.
// Used by callers that need a write to be readable before commitlog replay.
future<> system_keyspace::force_blocking_flush(sstring cfname) {
    return container().invoke_on_all([cfname = std::move(cfname)] (db::system_keyspace& sys_ks) {
        // if (!Boolean.getBoolean("cassandra.unsafesystem"))
        return sys_ks._db.flush(NAME, cfname);
    });
}
|
|
|
|
// Read this node's own saved tokens from the LOCAL row.
// Returns an empty set if no tokens were ever saved.
future<std::unordered_set<dht::token>> system_keyspace::get_saved_tokens() {
    sstring req = format("SELECT tokens FROM system.{} WHERE key = ?", LOCAL);
    auto msg = co_await execute_cql(req, sstring(LOCAL));
    if (msg->empty() || !msg->one().has("tokens")) {
        co_return std::unordered_set<dht::token>();
    }
    co_return decode_tokens(deserialize_set_column(*local(), msg->one(), "tokens"));
}
|
|
|
|
// Like get_saved_tokens(), but an empty token set is an error: a node that
// reaches this call is expected to own tokens.
future<std::unordered_set<dht::token>> system_keyspace::get_local_tokens() {
    auto tokens = co_await get_saved_tokens();
    if (tokens.empty()) {
        auto err = format("get_local_tokens: tokens is empty");
        slogger.error("{}", err);
        throw std::runtime_error(err);
    }
    co_return std::move(tokens);
}
|
|
|
|
// Stream the rows of system.cdc_streams_state, grouping them by
// (table_id, timestamp) and invoking `f` once per group with the collected
// stream ids. If `table` is set, only that table's rows are read.
//
// Assumes rows arrive clustered so that all rows of one group are
// contiguous: a group is flushed when the key changes, and the final group
// is flushed after the paged query completes.
future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
        noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) {
    static const sstring all_tables_query = format("SELECT table_id, timestamp, stream_id FROM {}.{}", NAME, CDC_STREAMS_STATE);
    static const sstring single_table_query = format("SELECT table_id, timestamp, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_STATE);

    // Accumulator for the group currently being collected.
    struct cur_t {
        table_id tid;
        db_clock::time_point ts;
        utils::chunked_vector<cdc::stream_id> streams;
    };
    std::optional<cur_t> cur;

    co_await _qp.query_internal(table ? single_table_query : all_tables_query,
            db::consistency_level::ONE,
            table ? data_value_list{table->uuid()} : data_value_list{},
            1000,
            [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
        auto tid = table_id(row.get_as<utils::UUID>("table_id"));
        auto ts = row.get_as<db_clock::time_point>("timestamp");
        auto stream_id = cdc::stream_id(row.get_as<bytes>("stream_id"));

        // Group boundary: flush the previous group before starting a new one.
        if (!cur || tid != cur->tid || ts != cur->ts) {
            if (cur) {
                co_await f(cur->tid, cur->ts, std::move(cur->streams));
            }
            cur = { tid, ts, utils::chunked_vector<cdc::stream_id>() };
        }
        cur->streams.push_back(std::move(stream_id));

        co_return stop_iteration::no;
    });

    // Flush the final group, if any.
    if (cur) {
        co_await f(cur->tid, cur->ts, std::move(cur->streams));
    }
}
|
|
|
|
// Stream the rows of system.cdc_streams_history for `table`, grouping them
// by (table_id, timestamp) and invoking `f` once per group with the diff of
// opened/closed streams. If `from` is set, only history strictly after that
// timestamp is read.
//
// Same contiguous-grouping assumption as read_cdc_streams_state: a group is
// flushed on key change and once more after the paged query completes.
future<> system_keyspace::read_cdc_streams_history(table_id table, std::optional<db_clock::time_point> from,
        noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f) {
    static const sstring query_all = format("SELECT table_id, timestamp, stream_state, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_HISTORY);
    static const sstring query_from = format("SELECT table_id, timestamp, stream_state, stream_id FROM {}.{} WHERE table_id = ? AND timestamp > ?", NAME, CDC_STREAMS_HISTORY);

    // Accumulator for the group currently being collected.
    struct cur_t {
        table_id tid;
        db_clock::time_point ts;
        cdc::cdc_stream_diff diff;
    };
    std::optional<cur_t> cur;

    co_await _qp.query_internal(from ? query_from : query_all,
            db::consistency_level::ONE,
            from ? data_value_list{table.uuid(), *from} : data_value_list{table.uuid()},
            1000,
            [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
        auto tid = table_id(row.get_as<utils::UUID>("table_id"));
        auto ts = row.get_as<db_clock::time_point>("timestamp");
        auto stream_state = cdc::read_stream_state(row.get_as<int8_t>("stream_state"));
        auto stream_id = cdc::stream_id(row.get_as<bytes>("stream_id"));

        // Group boundary: flush the previous group before starting a new one.
        if (!cur || tid != cur->tid || ts != cur->ts) {
            if (cur) {
                co_await f(cur->tid, cur->ts, std::move(cur->diff));
            }
            cur = { tid, ts, cdc::cdc_stream_diff() };
        }

        // Sort the stream into the diff by its recorded state; any other
        // state value means the table contents are corrupt.
        if (stream_state == cdc::stream_state::closed) {
            cur->diff.closed_streams.push_back(std::move(stream_id));
        } else if (stream_state == cdc::stream_state::opened) {
            cur->diff.opened_streams.push_back(std::move(stream_id));
        } else {
            on_internal_error(slogger, fmt::format("unexpected CDC stream state {} in {}.{} for table {}",
                std::to_underlying(stream_state), NAME, CDC_STREAMS_HISTORY, table));
        }

        co_return stop_iteration::no;
    });

    // Flush the final group, if any.
    if (cur) {
        co_await f(cur->tid, cur->ts, std::move(cur->diff));
    }
}
|
|
|
|
// True when this node still has to bootstrap (cached state, no I/O).
bool system_keyspace::bootstrap_needed() const {
    return get_bootstrap_state() == bootstrap_state::NEEDS_BOOTSTRAP;
}
|
|
|
|
// True when this node's bootstrap has finished (cached state, no I/O).
bool system_keyspace::bootstrap_complete() const {
    return get_bootstrap_state() == bootstrap_state::COMPLETED;
}
|
|
|
|
// True while this node is in the middle of bootstrapping (cached state).
bool system_keyspace::bootstrap_in_progress() const {
    return get_bootstrap_state() == bootstrap_state::IN_PROGRESS;
}
|
|
|
|
// True when this node was decommissioned from the cluster (cached state).
bool system_keyspace::was_decommissioned() const {
    return get_bootstrap_state() == bootstrap_state::DECOMMISSIONED;
}
|
|
|
|
// Return the per-shard cached bootstrap state (set via set_bootstrap_state).
system_keyspace::bootstrap_state system_keyspace::get_bootstrap_state() const {
    return _cache->_state;
}
|
|
|
|
// Persist the bootstrap state in the LOCAL row and propagate it to the
// per-shard cache on every shard. An unknown enum value throws (map::at).
future<> system_keyspace::set_bootstrap_state(bootstrap_state state) {
    // Human-readable names stored in the "bootstrapped" column.
    static const std::unordered_map<bootstrap_state, sstring, enum_hash<bootstrap_state>> state_to_name{
        { bootstrap_state::NEEDS_BOOTSTRAP, "NEEDS_BOOTSTRAP" },
        { bootstrap_state::COMPLETED, "COMPLETED" },
        { bootstrap_state::IN_PROGRESS, "IN_PROGRESS" },
        { bootstrap_state::DECOMMISSIONED, "DECOMMISSIONED" },
    };

    sstring state_name = state_to_name.at(state);

    sstring req = format("INSERT INTO system.{} (key, bootstrapped) VALUES (?, ?)", LOCAL);
    co_await execute_cql(req, sstring(LOCAL), state_name).discard_result();
    co_await container().invoke_on_all([state] (auto& sys_ks) {
        sys_ks._cache->_state = state;
    });
}
|
|
|
|
// Schemas of the authentication/authorization tables in the system keyspace.
std::vector<schema_ptr> system_keyspace::auth_tables() {
    return {roles(), role_members(), role_attributes(), role_permissions()};
}
|
|
|
|
// Return the schemas of every table in the system keyspace.
//
// Starts with the schema tables and auth tables (owned by other modules),
// then the fixed list below; some tables are appended only when the
// corresponding experimental feature is enabled in the config.
std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
    std::vector<schema_ptr> r;
    auto schema_tables = db::schema_tables::all_tables(schema_features::full());
    std::copy(schema_tables.begin(), schema_tables.end(), std::back_inserter(r));
    auto auth_tables = system_keyspace::auth_tables();
    std::copy(auth_tables.begin(), auth_tables.end(), std::back_inserter(r));
    r.insert(r.end(), { built_indexes(), hints(), batchlog(), batchlog_v2(), paxos(), local(),
                    peers(), peer_events(), range_xfers(),
                    compactions_in_progress(), compaction_history(),
                    sstable_activity(), size_estimates(), large_partitions(), large_rows(), large_cells(),
                    corrupt_data(),
                    scylla_local(), db::schema_tables::scylla_table_schema_history(),
                    repair_history(),
                    repair_tasks(),
                    views_builds_in_progress(), built_views(),
                    scylla_views_builds_in_progress(),
                    truncated(),
                    commitlog_cleanups(),
                    cdc_local(),
                    raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery(),
                    topology(), cdc_generations_v3(), topology_requests(), service_levels_v2(), view_build_status_v2(),
                    dicts(), view_building_tasks(), client_routes(), cdc_streams_state(), cdc_streams_history()
    });

    if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
        r.insert(r.end(), {broadcast_kv_store()});
    }

    // The tablets table is unconditional.
    r.insert(r.end(), {tablets()});

    if (cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
        r.insert(r.end(), {sstables_registry()});
    }

    if (cfg.check_experimental(db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES)) {
        r.insert(r.end(), {raft_groups(), raft_groups_snapshots(), raft_groups_snapshot_config()});
    }

    return r;
}
|
|
|
|
// Decide whether the given system table should be created with its writes
// accounted in user memory (the flag is forwarded to
// create_local_system_table by system_keyspace::make).
//
// Fix: the original mixed `s.get() == other.get()` and `s == other` pointer
// comparisons in one expression; both compare the underlying schema
// pointers, so use the shorter form consistently.
static bool maybe_write_in_user_memory(schema_ptr s, replica::database& db) {
    const bool strongly_consistent = db.get_config().check_experimental(db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES);
    return s == system_keyspace::batchlog()
        || s == system_keyspace::batchlog_v2()
        || s == system_keyspace::paxos()
        || s == system_keyspace::scylla_views_builds_in_progress()
        || (strongly_consistent && s == system_keyspace::raft_groups())
        || (strongly_consistent && s == system_keyspace::raft_groups_snapshots())
        || (strongly_consistent && s == system_keyspace::raft_groups_snapshot_config());
}
|
|
|
|
// Create every system-keyspace table in the database and initialize its
// storage, then register the user-defined types the system tables rely on.
future<> system_keyspace::make(
        locator::effective_replication_map_factory& erm_factory,
        replica::database& db) {
    for (auto&& table : system_keyspace::all_tables(db.get_config())) {
        co_await db.create_local_system_table(table, maybe_write_in_user_memory(table, db), erm_factory);
        co_await db.find_column_family(table).init_storage();
    }

    replica::tablet_add_repair_scheduler_user_types(NAME, db);
    db.find_keyspace(NAME).add_user_type(sstableinfo_type);
}
|
|
|
|
void system_keyspace::mark_writable() {
|
|
for (auto&& table : system_keyspace::all_tables(_db.get_config())) {
|
|
_db.find_column_family(table).mark_ready_for_writes(_db.commitlog_for(table));
|
|
}
|
|
}
|
|
|
|
// Per-thread query state for internal system-table queries, with a fixed
// 10s timeout for every timeout category.
//
// Fix: removed the stray `;` after the function body (-Wextra-semi).
static service::query_state& internal_system_query_state() {
    using namespace std::chrono_literals;
    const auto t = 10s;
    // NOTE(review): tc is a single shared static while cs/qs are
    // thread_local; it is never modified here — confirm client_state only
    // reads it.
    static timeout_config tc{ t, t, t, t, t, t, t };
    static thread_local service::client_state cs(service::client_state::internal_tag{}, tc);
    static thread_local service::query_state qs(cs, empty_service_permit());
    return qs;
}
|
|
|
|
// Read the raw mutation of one system.scylla_local row (keyed by `key`),
// or nullopt when the row does not exist. The query targets the singular
// partition range for the key, so at most one partition comes back.
static future<std::optional<mutation>> get_scylla_local_mutation(replica::database& db, std::string_view key) {
    auto s = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);

    auto pk = partition_key::from_singular(*s, key);
    auto pr = dht::partition_range::make_singular(dht::decorate_key(*s, pk));

    auto rs = co_await replica::query_mutations(db.container(), s, pr, s->full_slice(), db::no_timeout);
    if (!rs) {
        on_internal_error(slogger, "get_scylla_local_mutation(): no result from querying mutations");
    }
    // Return the first (and only) partition, if present.
    for (auto& p: rs->partitions()) {
        co_return p.mut().unfreeze(s);
    }
    co_return std::nullopt;
}
|
|
|
|
// Read all mutations of the given table: full partition range, full slice,
// no timeout.
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
system_keyspace::query_mutations(sharded<replica::database>& db, schema_ptr schema) {
    return replica::query_mutations(db, schema, query::full_partition_range, schema->full_slice(), db::no_timeout);
}
|
|
|
|
// Name-based convenience overload: resolve the schema, then read all
// mutations of the table.
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
system_keyspace::query_mutations(sharded<replica::database>& db, const sstring& ks_name, const sstring& cf_name) {
    return query_mutations(db, db.local().find_schema(ks_name, cf_name));
}
|
|
|
|
// Read the mutations of one table restricted to a partition range and a
// clustering row range.
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
system_keyspace::query_mutations(sharded<replica::database>& db, const sstring& ks_name, const sstring& cf_name, const dht::partition_range& partition_range, query::clustering_range row_range) {
    auto schema = db.local().find_schema(ks_name, cf_name);
    // The slice lives in the coroutine frame, keeping it valid for the whole
    // query (the original achieved this with a unique_ptr + finally()).
    auto slice = partition_slice_builder(*schema)
            .with_range(std::move(row_range))
            .build();
    co_return co_await replica::query_mutations(db, std::move(schema), partition_range, slice, db::no_timeout);
}
|
|
|
|
// Read the whole table and materialize the result as a query::result_set.
future<lw_shared_ptr<query::result_set>>
system_keyspace::query(sharded<replica::database>& db, const sstring& ks_name, const sstring& cf_name) {
    schema_ptr schema = db.local().find_schema(ks_name, cf_name);
    auto qr = co_await replica::query_data(db, schema, query::full_partition_range, schema->full_slice(), db::no_timeout);
    co_return make_lw_shared<query::result_set>(query::result_set::from_raw_result(schema, schema->full_slice(), *qr));
}
|
|
|
|
// Read one partition (by decorated key) restricted to a clustering row
// range, materialized as a query::result_set.
future<lw_shared_ptr<query::result_set>>
system_keyspace::query(sharded<replica::database>& db, const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key, query::clustering_range row_range)
{
    auto schema = db.local().find_schema(ks_name, cf_name);
    // Range and slice live in the coroutine frame for the query's duration
    // (the original kept them alive via unique_ptrs captured in .then()).
    auto pr = dht::partition_range::make_singular(key);
    auto slice = partition_slice_builder(*schema)
            .with_range(std::move(row_range))
            .build();
    auto qr = co_await replica::query_data(db, schema, pr, slice, db::no_timeout);
    co_return make_lw_shared<query::result_set>(query::result_set::from_raw_result(schema, schema->full_slice(), *qr));
}
|
|
|
|
// Convert sstable basic_info records into the native representation of a
// CQL list<sstableinfo> (generation, origin, size per element).
// Fix: reserve the output vector up front — the size is known.
static list_type_impl::native_type prepare_sstables(const std::vector<sstables::basic_info>& sstables) {
    list_type_impl::native_type tmp;
    tmp.reserve(sstables.size());
    for (const auto& info : sstables) {
        tmp.push_back(make_user_value(sstableinfo_type, {data_value(info.generation), data_value(info.origin), data_value(info.size)}));
    }
    return tmp;
}
|
|
|
|
// Inverse of prepare_sstables: rebuild basic_info records from the native
// UDT values read from a list<sstableinfo> column. Field order matches the
// UDT: (generation uuid, origin text, size bigint).
static std::vector<sstables::basic_info> restore_sstables(const std::vector<user_type_impl::native_type>& sstables) {
    std::vector<sstables::basic_info> result;
    result.reserve(sstables.size());

    for (const auto& fields : sstables) {
        auto generation = sstables::generation_type(value_cast<utils::UUID>(fields[0]));
        auto origin = value_cast<sstring>(fields[1]);
        auto size = value_cast<int64_t>(fields[2]);
        result.emplace_back(std::move(generation), std::move(origin), size);
    }

    return result;
}
|
|
|
|
// Convert the rows_merged histogram into the native representation of a
// CQL map<int, bigint>.
// Fixes: take the map by const reference (the function only reads it) and
// reserve the output vector up front.
static map_type_impl::native_type prepare_rows_merged(const std::unordered_map<int32_t, int64_t>& rows_merged) {
    map_type_impl::native_type tmp;
    tmp.reserve(rows_merged.size());
    for (const auto& [rows, count] : rows_merged) {
        int32_t first = rows;
        int64_t second = count;
        tmp.push_back(std::make_pair<data_value, data_value>(data_value(first), data_value(second)));
    }
    return tmp;
}
|
|
|
|
// Record one finished compaction in system.compaction_history.
//
// Uses the extended column set when the cluster-wide
// compaction_history_upgrade feature is enabled, and the legacy column set
// otherwise. Insert failures are logged and swallowed — history is
// best-effort and must never fail a compaction.
future<> system_keyspace::update_compaction_history(compaction_history_entry entry)
{
    // don't write anything when the history table itself is compacted, since that would in turn cause new compactions
    if (entry.ks == "system" && entry.cf == COMPACTION_HISTORY) {
        return make_ready_future<>();
    }

    auto map_type = map_type_impl::get_instance(int32_type, long_type, true);
    auto list_type = list_type_impl::get_instance(sstableinfo_type, false);
    db_clock::time_point compacted_at{db_clock::duration{entry.compacted_at}};
    db_clock::time_point started_at{db_clock::duration{entry.started_at}};

    // The lambda is invoked synchronously below, so capturing the locals by
    // reference is safe: all arguments are bound before the function returns.
    std::function<future<::shared_ptr<cql3::untyped_result_set>>()> execute;
    if (local_db().features().compaction_history_upgrade) {
        // Full schema: per-shard id, timings, sstable lists and tombstone
        // purge statistics.
        static constexpr auto reqest_template = "INSERT INTO system.{} ( \
            id, shard_id, keyspace_name, columnfamily_name, started_at, compacted_at, compaction_type, bytes_in, bytes_out, rows_merged, \
            sstables_in, sstables_out, total_tombstone_purge_attempt, total_tombstone_purge_failure_due_to_overlapping_with_memtable, \
            total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable \
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
        sstring request = format(reqest_template, COMPACTION_HISTORY);
        execute = [&, request=std::move(request)]() {
            return execute_cql(request, entry.id, int32_t(entry.shard_id), entry.ks, entry.cf, started_at, compacted_at, entry.compaction_type,
                entry.bytes_in, entry.bytes_out, make_map_value(map_type, prepare_rows_merged(entry.rows_merged)),
                make_list_value(list_type, prepare_sstables(entry.sstables_in)), make_list_value(list_type, prepare_sstables(entry.sstables_out)),
                entry.total_tombstone_purge_attempt, entry.total_tombstone_purge_failure_due_to_overlapping_with_memtable,
                entry.total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable);
        };
    } else {
        // Legacy schema used until all nodes support the upgraded table.
        static constexpr auto reqest_template = "INSERT INTO system.{} ( \
            id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
        sstring request = format(reqest_template, COMPACTION_HISTORY);
        execute = [&, request=std::move(request)]() {
            return execute_cql(request, entry.id, entry.ks, entry.cf, compacted_at, entry.bytes_in, entry.bytes_out,
                make_map_value(map_type, prepare_rows_merged(entry.rows_merged)));
        };
    }

    return execute().discard_result().handle_exception([] (auto ep) {
        slogger.error("update compaction history failed: {}: ignored", ep);
    });
}
|
|
|
|
// Replay every row of system.compaction_history through `consumer`.
// Columns introduced by newer schema versions may be missing from old rows;
// those fall back to zero / empty defaults.
future<> system_keyspace::get_compaction_history(compaction_history_consumer consumer) {
    sstring req = format("SELECT * from system.{}", COMPACTION_HISTORY);
    co_await _qp.query_internal(req, [&consumer] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
        compaction_history_entry e;
        e.id = row.get_as<utils::UUID>("id");
        e.shard_id = row.get_or<int32_t>("shard_id", 0);
        e.ks = row.get_as<sstring>("keyspace_name");
        e.cf = row.get_as<sstring>("columnfamily_name");
        e.compaction_type = row.get_or<sstring>("compaction_type", "");
        e.started_at = row.get_or<int64_t>("started_at", 0);
        e.compacted_at = row.get_as<int64_t>("compacted_at");
        e.bytes_in = row.get_as<int64_t>("bytes_in");
        e.bytes_out = row.get_as<int64_t>("bytes_out");
        if (row.has("rows_merged")) {
            e.rows_merged = row.get_map<int32_t, int64_t>("rows_merged");
        }
        if (row.has("sstables_in")) {
            e.sstables_in = restore_sstables(row.get_list<user_type_impl::native_type>("sstables_in", sstableinfo_type));
        }
        if (row.has("sstables_out")) {
            e.sstables_out = restore_sstables(row.get_list<user_type_impl::native_type>("sstables_out", sstableinfo_type));
        }
        e.total_tombstone_purge_attempt = row.get_or<int64_t>("total_tombstone_purge_attempt", 0);
        e.total_tombstone_purge_failure_due_to_overlapping_with_memtable = row.get_or<int64_t>("total_tombstone_purge_failure_due_to_overlapping_with_memtable", 0);
        e.total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable = row.get_or<int64_t>("total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable", 0);

        co_await consumer(std::move(e));
        co_return stop_iteration::no;
    });
}
|
|
|
|
// Append one repaired-range record for the given table to system.repair_history.
future<> system_keyspace::update_repair_history(repair_history_entry entry) {
    auto req = format("INSERT INTO system.{} (table_uuid, repair_time, repair_uuid, keyspace_name, table_name, range_start, range_end) VALUES (?, ?, ?, ?, ?, ?, ?)", REPAIR_HISTORY);
    co_await execute_cql(std::move(req), entry.table_uuid.uuid(), entry.ts, entry.id.uuid(), entry.ks, entry.cf, entry.range_start, entry.range_end).discard_result();
}
|
|
|
|
// Feed every repair history row belonging to `table_id` to the consumer.
future<> system_keyspace::get_repair_history(::table_id table_id, repair_history_consumer f) {
    sstring req = format("SELECT * from system.{} WHERE table_uuid = {}", REPAIR_HISTORY, table_id);
    co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
        repair_history_entry entry;
        entry.id = tasks::task_id(row.get_as<utils::UUID>("repair_uuid"));
        entry.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
        entry.ks = row.get_as<sstring>("keyspace_name");
        entry.cf = row.get_as<sstring>("table_name");
        entry.range_start = row.get_as<int64_t>("range_start");
        entry.range_end = row.get_as<int64_t>("range_end");
        entry.ts = row.get_as<db_clock::time_point>("repair_time");
        co_await f(std::move(entry));
        co_return stop_iteration::no;
    });
}
|
|
|
|
// Build the canonical mutations that record a repair task entry.
// Entries expire via TTL so the table cannot grow without bound.
future<utils::chunked_vector<canonical_mutation>> system_keyspace::get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts) {
    // Default to timeout the repair task entries in 10 days, this should be enough time for the management tools to query
    constexpr int ttl = 10 * 24 * 3600;
    sstring req = format("INSERT INTO system.{} (task_uuid, operation, first_token, last_token, timestamp, table_uuid) VALUES (?, ?, ?, ?, ?, ?) USING TTL {}", REPAIR_TASKS, ttl);
    auto muts = co_await _qp.get_mutations_internal(req, internal_system_query_state(), ts,
            {entry.task_uuid.uuid(), repair_task_operation_to_string(entry.operation),
             entry.first_token, entry.last_token, entry.timestamp, entry.table_uuid.uuid()});
    co_return utils::chunked_vector<canonical_mutation>(muts.begin(), muts.end());
}
|
|
|
|
// Feed every stored row of the repair task `task_uuid` to the consumer.
future<> system_keyspace::get_repair_task(tasks::task_id task_uuid, repair_task_consumer f) {
    sstring req = format("SELECT * from system.{} WHERE task_uuid = {}", REPAIR_TASKS, task_uuid);
    co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
        repair_task_entry task;
        task.task_uuid = tasks::task_id(row.get_as<utils::UUID>("task_uuid"));
        task.operation = repair_task_operation_from_string(row.get_as<sstring>("operation"));
        task.first_token = row.get_as<int64_t>("first_token");
        task.last_token = row.get_as<int64_t>("last_token");
        task.timestamp = row.get_as<db_clock::time_point>("timestamp");
        task.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
        co_await f(std::move(task));
        co_return stop_iteration::no;
    });
}
|
|
|
|
// Compute the next gossip generation as max(stored + 1, seconds-since-epoch)
// and persist the chosen value back to system.local before returning it.
future<gms::generation_type> system_keyspace::increment_and_get_generation() {
    auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
    auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes);
    // seconds-since-epoch isn't a foolproof new generation
    // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"),
    // but it's as close as sanely possible
    auto generation = gms::get_generation_number();
    if (!rs->empty() && rs->one().has("gossip_generation")) {
        // Other nodes will ignore gossip messages about a node that have a lower generation than previously seen.
        auto stored_generation = gms::generation_type(rs->one().template get_as<int>("gossip_generation") + 1);
        if (stored_generation >= generation) {
            slogger.warn("Using stored Gossip Generation {} as it is greater than current system time {}."
                    "See CASSANDRA-3654 if you experience problems", stored_generation, generation);
            generation = stored_generation;
        }
    }
    req = format("INSERT INTO system.{} (key, gossip_generation) VALUES ('{}', ?)", LOCAL, LOCAL);
    co_await _qp.execute_internal(req, {generation.value()}, cql3::query_processor::cache_internal::yes);
    co_return generation;
}
|
|
|
|
// Build a single-partition mutation for system.size_estimates: one clustered
// row per (table, range) carrying the mean partition size and partition count.
mutation system_keyspace::make_size_estimates_mutation(const sstring& ks, std::vector<system_keyspace::range_estimates> estimates) {
    auto schema = db::system_keyspace::size_estimates();
    const auto ts = api::new_timestamp();
    mutation m{schema, partition_key::from_single_value(*schema, utf8_type->decompose(ks))};

    for (const auto& e : estimates) {
        auto ck = clustering_key_prefix(std::vector<bytes>{
                utf8_type->decompose(e.schema->cf_name()), e.range_start_token, e.range_end_token});
        m.set_clustered_cell(ck, "mean_partition_size", e.mean_partition_size, ts);
        m.set_clustered_cell(ck, "partitions_count", e.partitions_count, ts);
    }

    return m;
}
|
|
|
|
// Seed a view-build progress row for this shard, starting the build at `token`.
future<> system_keyspace::register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) {
    auto req = format("INSERT INTO system.{} (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)",
            SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
    return execute_cql(std::move(req),
            std::move(ks_name),
            std::move(view_name),
            0,
            int32_t(this_shard_id()),
            token.to_sstring()).discard_result();
}
|
|
|
|
// Atomically create a build-progress row for every shard of this node.
// The calling shard is registered at `token`; the remaining shards get a row
// with first_token/next_token left null, meaning "registered, no progress yet".
// Rows that a shard already wrote are not overwritten by this insert, so it is
// safe to call even if some shards registered before a crash.
future<> system_keyspace::register_view_for_building_for_all_shards(sstring ks_name, sstring view_name, const dht::token& token) {
    auto&& schema = db::system_keyspace::scylla_views_builds_in_progress();
    auto ts = api::new_timestamp();
    mutation m{schema, partition_key::from_single_value(*schema, utf8_type->decompose(ks_name))};

    for (size_t shard = 0; shard < smp::count; ++shard) {
        auto ck = clustering_key_prefix(std::vector<bytes>{
                utf8_type->decompose(view_name),
                int32_type->decompose(int32_t(shard))});
        m.set_clustered_cell(ck, "generation_number", int32_t(0), ts);
        if (shard == this_shard_id()) {
            m.set_clustered_cell(ck, "first_token", token.to_sstring(), ts);
        }
    }
    return apply_mutation(std::move(m));
}
|
|
|
|
// Record this shard's view-build progress: the next token to be processed.
future<> system_keyspace::update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) {
    auto req = format("INSERT INTO system.{} (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)",
            SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
    return execute_cql(std::move(req),
            std::move(ks_name),
            std::move(view_name),
            token.to_sstring(),
            int32_t(this_shard_id())).discard_result();
}
|
|
|
|
// Delete the build-progress rows of every shard for the given view.
future<> system_keyspace::remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) {
    auto req = format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
    return execute_cql(std::move(req), std::move(ks_name), std::move(view_name)).discard_result();
}
|
|
|
|
// Delete only this shard's build-progress row for the given view.
future<> system_keyspace::remove_view_build_progress(sstring ks_name, sstring view_name) {
    auto req = format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ? AND cpu_id = ?", SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
    return execute_cql(std::move(req), std::move(ks_name), std::move(view_name), int32_t(this_shard_id())).discard_result();
}
|
|
|
|
// Record that the given view finished building on this node.
future<> system_keyspace::mark_view_as_built(sstring ks_name, sstring view_name) {
    auto req = format("INSERT INTO system.{} (keyspace_name, view_name) VALUES (?, ?)", BUILT_VIEWS);
    return execute_cql(std::move(req), std::move(ks_name), std::move(view_name)).discard_result();
}
|
|
|
|
// Remove the "view built" marker for the given view.
future<> system_keyspace::remove_built_view(sstring ks_name, sstring view_name) {
    auto req = format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", BUILT_VIEWS);
    return execute_cql(std::move(req), std::move(ks_name), std::move(view_name)).discard_result();
}
|
|
|
|
// Return every (keyspace, view) pair recorded as fully built on this node.
future<std::vector<system_keyspace::view_name>> system_keyspace::load_built_views() {
    auto cql_result = co_await execute_cql(format("SELECT * FROM system.{}", BUILT_VIEWS));
    std::vector<view_name> views;
    for (const cql3::untyped_result_set::row& row : *cql_result) {
        views.emplace_back(row.get_as<sstring>("keyspace_name"), row.get_as<sstring>("view_name"));
    }
    co_return views;
}
|
|
|
|
// Load the per-shard view build progress rows. Either token may be null when
// a shard has registered but not yet started / advanced. On any failure the
// error is logged and an empty list is returned (best-effort semantics kept
// from the original .then/handle_exception chain).
//
// Consistency fix: next_token is now parsed with optional::transform, the same
// way first_token already was, instead of a hand-rolled null check.
future<std::vector<system_keyspace::view_build_progress>> system_keyspace::load_view_build_progress() {
    try {
        auto cql_result = co_await execute_cql(format("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.{}",
                SCYLLA_VIEWS_BUILDS_IN_PROGRESS));
        std::vector<view_build_progress> progress;
        for (auto& row : *cql_result) {
            auto ks_name = row.get_as<sstring>("keyspace_name");
            auto cf_name = row.get_as<sstring>("view_name");
            auto first_token_opt = row.get_opt<sstring>("first_token").transform(dht::token::from_sstring);
            auto next_token_opt = row.get_opt<sstring>("next_token").transform(dht::token::from_sstring);
            auto cpu_id = row.get_as<int32_t>("cpu_id");
            progress.emplace_back(view_build_progress{
                    view_name(std::move(ks_name), std::move(cf_name)),
                    std::move(first_token_opt),
                    std::move(next_token_opt),
                    static_cast<shard_id>(cpu_id)});
        }
        co_return progress;
    } catch (...) {
        slogger.warn("Failed to load view build progress: {}", std::current_exception());
        co_return std::vector<view_build_progress>();
    }
}
|
|
|
|
// Build {(keyspace, view) -> {host -> build status}} from the v2 status table.
future<system_keyspace::view_build_status_map> system_keyspace::get_view_build_status_map() {
    static const sstring query = format("SELECT * FROM {}.{}", NAME, VIEW_BUILD_STATUS_V2);

    view_build_status_map result;
    co_await _qp.query_internal(query, [&result] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
        auto view = std::make_pair(row.get_as<sstring>("keyspace_name"), row.get_as<sstring>("view_name"));
        auto host_id = locator::host_id(row.get_as<utils::UUID>("host_id"));
        result[view][host_id] = view::build_status_from_string(row.get_as<sstring>("status"));
        co_return stop_iteration::no;
    });
    co_return result;
}
|
|
|
|
// INSERT mutation recording `status` for the given (view, host) pair.
future<mutation> system_keyspace::make_view_build_status_mutation(api::timestamp_type ts, system_keyspace_view_name view_name, locator::host_id host_id, view::build_status status) {
    static const sstring stmt = format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS_V2);

    auto mutations = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {view_name.first, view_name.second, host_id.uuid(), view::build_status_to_sstring(status)});
    if (mutations.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
// UPDATE mutation changing the status of an existing (view, host) row.
future<mutation> system_keyspace::make_view_build_status_update_mutation(api::timestamp_type ts, system_keyspace_view_name view_name, locator::host_id host_id, view::build_status status) {
    static const sstring stmt = format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS_V2);

    auto mutations = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {view::build_status_to_sstring(status), view_name.first, view_name.second, host_id.uuid()});
    if (mutations.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
// DELETE mutation dropping the status rows of every host for the given view.
future<mutation> system_keyspace::make_remove_view_build_status_mutation(api::timestamp_type ts, system_keyspace_view_name view_name) {
    static const sstring stmt = format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS_V2);

    auto mutations = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {view_name.first, view_name.second});
    if (mutations.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
// DELETE mutation dropping the status row of a single (view, host) pair.
future<mutation> system_keyspace::make_remove_view_build_status_on_host_mutation(api::timestamp_type ts, system_keyspace_view_name view_name, locator::host_id host_id) {
    static const sstring stmt = format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS_V2);

    auto mutations = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {view_name.first, view_name.second, host_id.uuid()});
    if (mutations.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
static constexpr auto VIEW_BUILDING_KEY = "view_building";
|
|
|
|
// Load all persisted view-building tasks and group them as
// base table -> tablet replica -> (per-view build_range tasks | staging tasks).
future<db::view::building_tasks> system_keyspace::get_view_building_tasks() {
    static const sstring query = format("SELECT id, type, aborted, base_id, view_id, last_token, host_id, shard FROM {}.{} WHERE key = '{}'", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
    using namespace db::view;

    building_tasks tasks;
    co_await _qp.query_internal(query, [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
        auto id = row.get_as<utils::UUID>("id");
        auto type = task_type_from_string(row.get_as<sstring>("type"));
        auto aborted = row.get_as<bool>("aborted");
        auto base_id = table_id(row.get_as<utils::UUID>("base_id"));
        // view_id is nullable: set for build_range tasks, left unset for
        // process_staging tasks (see make_view_building_task_mutation).
        auto view_id = row.get_opt<utils::UUID>("view_id").transform([] (const utils::UUID& uuid) { return table_id(uuid); });
        auto last_token = dht::token::from_int64(row.get_as<int64_t>("last_token"));
        auto host_id = locator::host_id(row.get_as<utils::UUID>("host_id"));
        auto shard = unsigned(row.get_as<int32_t>("shard"));

        locator::tablet_replica replica{host_id, shard};
        view_building_task task{id, type, aborted, base_id, view_id, replica, last_token};

        // File the task into the right bucket; a build_range task without a
        // view_id is a corrupted row and is treated as an internal error.
        switch (type) {
        case db::view::view_building_task::task_type::build_range:
            if (!view_id) {
                on_internal_error(slogger, fmt::format("view_id is not set for build_range task with id: {}", id));
            }
            tasks[base_id][replica].view_tasks[*view_id].insert({id, std::move(task)});
            break;
        case db::view::view_building_task::task_type::process_staging:
            tasks[base_id][replica].staging_tasks.insert({id, std::move(task)});
            break;
        }
        co_return stop_iteration::no;
    });
    co_return tasks;
}
|
|
|
|
// INSERT mutation for a single view-building task row. view_id is mandatory
// for build_range tasks and deliberately left unset for all other task types.
future<mutation> system_keyspace::make_view_building_task_mutation(api::timestamp_type ts, const db::view::view_building_task& task) {
    static const sstring stmt = format("INSERT INTO {}.{}(key, id, type, aborted, base_id, view_id, last_token, host_id, shard) VALUES ('{}', ?, ?, ?, ?, ?, ?, ?, ?)", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
    using namespace db::view;

    data_value_or_unset view_id_value = unset_value{};
    if (task.type == db::view::view_building_task::task_type::build_range) {
        if (!task.view_id) {
            on_internal_error(slogger, fmt::format("view_id is not set for build_range task with id: {}", task.id));
        }
        view_id_value = data_value(task.view_id->uuid());
    }
    auto mutations = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {
        task.id, task_type_to_sstring(task.type), task.aborted,
        task.base_id.uuid(), view_id_value, dht::token::to_int64(task.last_token),
        task.replica.host.uuid(), int32_t(task.replica.shard)
    });
    if (mutations.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
// DELETE mutation removing a single view-building task row by id.
future<mutation> system_keyspace::make_remove_view_building_task_mutation(api::timestamp_type ts, utils::UUID id) {
    static const sstring stmt = format("DELETE FROM {}.{} WHERE key = '{}' AND id = ?", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);

    auto mutations = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {id});
    if (mutations.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
static constexpr auto VIEW_BUILDING_PROCESSING_BASE_ID_KEY = "view_building_processing_base_id";
|
|
|
|
// Read the id of the base table whose views are currently being processed.
// The value is stored in scylla_local as a UUID string; nullopt if absent.
future<std::optional<table_id>> system_keyspace::get_view_building_processing_base_id() {
    auto stored = co_await get_scylla_local_param(VIEW_BUILDING_PROCESSING_BASE_ID_KEY);
    if (!stored) {
        co_return std::nullopt;
    }
    co_return table_id(utils::UUID(*stored));
}
|
|
|
|
// Raw scylla_local mutation holding the currently-processed base table id;
// nullopt if the key was never written.
future<std::optional<mutation>> system_keyspace::get_view_building_processing_base_id_mutation() {
    return get_scylla_local_mutation(_db, VIEW_BUILDING_PROCESSING_BASE_ID_KEY);
}
|
|
|
|
|
|
// Mutation persisting `base_id` (as a string) under the scylla_local key
// used to track the base table currently being processed by view building.
future<mutation> system_keyspace::make_view_building_processing_base_id_mutation(api::timestamp_type ts, table_id base_id) {
    static const sstring query = format("INSERT INTO {}.{} (key, value) VALUES (?, ?);", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);

    auto mutations = co_await _qp.get_mutations_internal(
            query, internal_system_query_state(),
            ts, {VIEW_BUILDING_PROCESSING_BASE_ID_KEY, base_id.to_sstring()});
    if (mutations.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
// Mutation clearing the "currently processed base table" scylla_local key.
future<mutation> system_keyspace::make_remove_view_building_processing_base_id_mutation(api::timestamp_type ts) {
    static const sstring query = format("DELETE FROM {}.{} WHERE key = ?", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);

    auto mutations = co_await _qp.get_mutations_internal(query, internal_system_query_state(), ts, {VIEW_BUILDING_PROCESSING_BASE_ID_KEY});
    if (mutations.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 mutation got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
// Read the locally-enabled feature names persisted in scylla_local
// (comma-separated list); empty set if the key was never written.
future<std::set<sstring>> system_keyspace::load_local_enabled_features() {
    auto features_str = co_await get_scylla_local_param(gms::feature_service::ENABLED_FEATURES_KEY);
    if (!features_str) {
        co_return std::set<sstring>{};
    }
    co_return gms::feature_service::to_feature_set(*features_str);
}
|
|
|
|
// Persist the enabled feature set as a comma-separated list in scylla_local.
future<> system_keyspace::save_local_enabled_features(std::set<sstring> features, bool visible_before_cl_replay) {
    return set_scylla_local_param(gms::feature_service::ENABLED_FEATURES_KEY,
            fmt::to_string(fmt::join(features, ",")), visible_before_cl_replay);
}
|
|
|
|
// Return the persisted Raft group0 id, or a null UUID if none was stored yet.
future<utils::UUID> system_keyspace::get_raft_group0_id() {
    auto opt = co_await get_scylla_local_param_as<utils::UUID>("raft_group0_id");
    if (opt) {
        co_return *opt;
    }
    co_return utils::UUID{};
}
|
|
|
|
// Persist the Raft group0 id in scylla_local. The trailing `false` is the
// visible_before_cl_replay flag of set_scylla_local_param_as.
future<> system_keyspace::set_raft_group0_id(utils::UUID uuid) {
    return set_scylla_local_param_as<utils::UUID>("raft_group0_id", uuid, false);
}
|
|
|
|
static constexpr auto GROUP0_HISTORY_KEY = "history";
|
|
|
|
// Return the most recent group0 state id, or a null UUID if the history is
// empty. LIMIT 1 yields the newest entry because the history table's
// clustering order is reversed (see make_group0_history_state_id_mutation).
future<utils::UUID> system_keyspace::get_last_group0_state_id() {
    auto rs = co_await execute_cql(
        format(
            "SELECT state_id FROM system.{} WHERE key = '{}' LIMIT 1",
            GROUP0_HISTORY, GROUP0_HISTORY_KEY));
    SCYLLA_ASSERT(rs);
    co_return rs->empty() ? utils::UUID{} : rs->one().get_as<utils::UUID>("state_id");
}
|
|
|
|
// Point lookup: has `state_id` already been recorded in group0 history?
future<bool> system_keyspace::group0_history_contains(utils::UUID state_id) {
    auto query = format(
            "SELECT state_id FROM system.{} WHERE key = '{}' AND state_id = ?",
            GROUP0_HISTORY, GROUP0_HISTORY_KEY);
    auto rs = co_await execute_cql(std::move(query), state_id);
    SCYLLA_ASSERT(rs);
    co_return !rs->empty();
}
|
|
|
|
// Build the mutation that appends `state_id` to the group0 history partition.
// The row's write timestamp is derived from the timeuuid `state_id` itself, so
// the clustering position and the timestamp always agree. If `gc_older_than`
// is given, a range tombstone is attached that garbage-collects every history
// entry older than (state_id's time - gc_older_than).
mutation system_keyspace::make_group0_history_state_id_mutation(
        utils::UUID state_id, std::optional<gc_clock::duration> gc_older_than, std::string_view description) {
    auto s = group0_history();
    mutation m(s, partition_key::from_singular(*s, GROUP0_HISTORY_KEY));
    auto& row = m.partition().clustered_row(*s, clustering_key::from_singular(*s, state_id));
    // Timestamp taken from the timeuuid, not api::new_timestamp().
    auto ts = utils::UUID_gen::micros_timestamp(state_id);
    row.apply(row_marker(ts));
    if (!description.empty()) {
        auto cdef = s->get_column_definition("description");
        SCYLLA_ASSERT(cdef);
        row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, ts, cdef->type->decompose(description)));
    }
    if (gc_older_than) {
        using namespace std::chrono;
        SCYLLA_ASSERT(*gc_older_than >= gc_clock::duration{0});

        auto ts_micros = microseconds{ts};
        auto gc_older_than_micros = duration_cast<microseconds>(*gc_older_than);
        // The tombstone bound must stay strictly below the row being written.
        SCYLLA_ASSERT(gc_older_than_micros < ts_micros);

        auto tomb_upper_bound = utils::UUID_gen::min_time_UUID(ts_micros - gc_older_than_micros);
        // We want to delete all entries with IDs smaller than `tomb_upper_bound`
        // but the deleted range is of the form (x, +inf) since the schema is reversed.
        auto range = query::clustering_range::make_starting_with({
            clustering_key_prefix::from_single_value(*s, timeuuid_type->decompose(tomb_upper_bound)), false});
        auto bv = bound_view::from_range(range);

        m.partition().apply_delete(*s, range_tombstone{bv.first, bv.second, tombstone{ts, gc_clock::now()}});
    }
    return m;
}
|
|
|
|
// Fetch the entire group0 history partition as one mutation. If the partition
// does not exist yet, an empty mutation keyed on GROUP0_HISTORY_KEY is
// returned; unexpected partitions are logged and skipped.
future<mutation> system_keyspace::get_group0_history(sharded<replica::database>& db) {
    auto s = group0_history();
    auto rs = co_await db::system_keyspace::query_mutations(db, db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY);
    SCYLLA_ASSERT(rs);
    for (auto& p : rs->partitions()) {
        // This is a single-partition table, so rather than decorating the
        // frozen_mutation's key we unfreeze the mutation and check afterwards.
        auto mut = co_await unfreeze_gently(p.mut(), s);
        auto pk = value_cast<sstring>(utf8_type->deserialize(mut.key().get_component(*s, 0)));
        if (pk == GROUP0_HISTORY_KEY) {
            co_return mut;
        }
        slogger.warn("get_group0_history: unexpected partition in group0 history table: {}", pk);
    }

    slogger.warn("get_group0_history: '{}' partition not found", GROUP0_HISTORY_KEY);
    co_return mutation(s, partition_key::from_singular(*s, GROUP0_HISTORY_KEY));
}
|
|
|
|
// Raw scylla_local mutation holding the group0 schema version key;
// nullopt if the key was never written.
future<std::optional<mutation>> system_keyspace::get_group0_schema_version() {
    return get_scylla_local_mutation(_db, "group0_schema_version");
}
|
|
|
|
static constexpr auto AUTH_VERSION_KEY = "auth_version";
|
|
|
|
// Read the persisted auth schema version. A missing or empty value means v1
// (clusters predating versioned auth); any value other than "1"/"2" is a bug.
future<system_keyspace::auth_version_t> system_keyspace::get_auth_version() {
    auto str = (co_await get_scylla_local_param(AUTH_VERSION_KEY)).value_or("");
    if (str == "" || str == "1") {
        co_return auth_version_t::v1;
    }
    if (str == "2") {
        co_return auth_version_t::v2;
    }
    on_internal_error(slogger, fmt::format("unexpected auth_version in scylla_local got {}", str));
}
|
|
|
|
// Raw scylla_local mutation holding the auth version key; nullopt if unset.
future<std::optional<mutation>> system_keyspace::get_auth_version_mutation() {
    return get_scylla_local_mutation(_db, AUTH_VERSION_KEY);
}
|
|
|
|
// Mutation persisting the auth schema version (stored as a decimal string).
future<mutation> system_keyspace::make_auth_version_mutation(api::timestamp_type ts, db::system_keyspace::auth_version_t version) {
    static const sstring query = format("INSERT INTO {}.{} (key, value) VALUES (?, ?);", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto mutations = co_await _qp.get_mutations_internal(query, internal_system_query_state(), ts, {AUTH_VERSION_KEY, std::to_string(int64_t(version))});
    if (mutations.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 auth_version mutation got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
static constexpr auto VIEW_BUILDER_VERSION_KEY = "view_builder_version";
|
|
|
|
// Read the persisted view builder version. A missing or empty value means v1;
// versions are stored as "10"/"15"/"20" for v1/v1.5/v2 respectively.
future<system_keyspace::view_builder_version_t> system_keyspace::get_view_builder_version() {
    auto str = (co_await get_scylla_local_param(VIEW_BUILDER_VERSION_KEY)).value_or("");
    if (str == "" || str == "10") {
        co_return view_builder_version_t::v1;
    }
    if (str == "15") {
        co_return view_builder_version_t::v1_5;
    }
    if (str == "20") {
        co_return view_builder_version_t::v2;
    }
    on_internal_error(slogger, fmt::format("unexpected view_builder_version in scylla_local got {}", str));
}
|
|
|
|
// Raw scylla_local mutation holding the view builder version key; nullopt if unset.
future<std::optional<mutation>> system_keyspace::get_view_builder_version_mutation() {
    return get_scylla_local_mutation(_db, VIEW_BUILDER_VERSION_KEY);
}
|
|
|
|
// Mutation persisting the view builder version (stored as a decimal string).
future<mutation> system_keyspace::make_view_builder_version_mutation(api::timestamp_type ts, db::system_keyspace::view_builder_version_t version) {
    static const sstring query = format("INSERT INTO {}.{} (key, value) VALUES (?, ?);", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto mutations = co_await _qp.get_mutations_internal(query, internal_system_query_state(), ts, {VIEW_BUILDER_VERSION_KEY, std::to_string(int64_t(version))});
    if (mutations.size() != 1) {
        on_internal_error(slogger, fmt::format("expected 1 view_builder_version mutation got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
static constexpr auto SERVICE_LEVEL_DRIVER_CREATED_KEY = "service_level_driver_created";
|
|
|
|
// Raw scylla_local mutation for the "service level driver created" flag;
// nullopt if the key was never written.
future<std::optional<mutation>> system_keyspace::get_service_level_driver_created_mutation() {
    return get_scylla_local_mutation(_db, SERVICE_LEVEL_DRIVER_CREATED_KEY);
}
|
|
|
|
// Mutation persisting the "service level driver created" flag. The boolean is
// serialized through the CQL boolean type's string representation.
future<mutation> system_keyspace::make_service_level_driver_created_mutation(bool is_created, api::timestamp_type timestamp) {
    static const sstring query = format("INSERT INTO {}.{} (key, value) VALUES (?, ?);", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto mutations = co_await _qp.get_mutations_internal(query, internal_system_query_state(), timestamp, {SERVICE_LEVEL_DRIVER_CREATED_KEY, data_type_for<bool>()->to_string_impl(data_value(is_created))});

    if (mutations.size() != 1) {
        on_internal_error(slogger, format("expecting single insert mutation, got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
// Whether the service level driver was created; nullopt if never recorded.
future<std::optional<bool>> system_keyspace::get_service_level_driver_created() {
    return get_scylla_local_param_as<bool>(SERVICE_LEVEL_DRIVER_CREATED_KEY);
}
|
|
|
|
static constexpr auto SERVICE_LEVELS_VERSION_KEY = "service_level_version";
|
|
|
|
// Raw scylla_local mutation for the service levels version key; nullopt if unset.
future<std::optional<mutation>> system_keyspace::get_service_levels_version_mutation() {
    return get_scylla_local_mutation(_db, SERVICE_LEVELS_VERSION_KEY);
}
|
|
|
|
// Mutation persisting the service levels version (stored as a decimal string).
future<mutation> system_keyspace::make_service_levels_version_mutation(int8_t version, api::timestamp_type timestamp) {
    static const sstring query = format("INSERT INTO {}.{} (key, value) VALUES (?, ?);", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
    auto mutations = co_await _qp.get_mutations_internal(query, internal_system_query_state(), timestamp, {SERVICE_LEVELS_VERSION_KEY, format("{}", version)});

    if (mutations.size() != 1) {
        on_internal_error(slogger, format("expecting single insert mutation, got {}", mutations.size()));
    }
    co_return std::move(mutations.front());
}
|
|
|
|
// Persisted service levels version; nullopt if never recorded.
future<std::optional<int8_t>> system_keyspace::get_service_levels_version() {
    return get_scylla_local_param_as<int8_t>(SERVICE_LEVELS_VERSION_KEY);
}
|
|
|
|
static constexpr auto GROUP0_UPGRADE_STATE_KEY = "group0_upgrade_state";
|
|
|
|
// Raw string form of the persisted group0 upgrade state; nullopt if never saved.
future<std::optional<sstring>> system_keyspace::load_group0_upgrade_state() {
    return get_scylla_local_param_as<sstring>(GROUP0_UPGRADE_STATE_KEY);
}
|
|
|
|
// Persist the group0 upgrade state string. The trailing `false` is the
// visible_before_cl_replay flag of set_scylla_local_param.
future<> system_keyspace::save_group0_upgrade_state(sstring value) {
    return set_scylla_local_param(GROUP0_UPGRADE_STATE_KEY, value, false);
}
|
|
|
|
static constexpr auto MUST_SYNCHRONIZE_TOPOLOGY_KEY = "must_synchronize_topology";
|
|
|
|
// Read the "must synchronize topology" flag; an absent key counts as false.
future<bool> system_keyspace::get_must_synchronize_topology() {
    auto flag = co_await get_scylla_local_param_as<bool>(MUST_SYNCHRONIZE_TOPOLOGY_KEY);
    co_return flag.value_or(false);
}
|
|
|
|
// Persist the "must synchronize topology" flag. The trailing `false` is the
// visible_before_cl_replay flag of set_scylla_local_param_as.
future<> system_keyspace::set_must_synchronize_topology(bool value) {
    return set_scylla_local_param_as<bool>(MUST_SYNCHRONIZE_TOPOLOGY_KEY, value, false);
}
|
|
|
|
// Converts a deserialized CQL set<text> column value into a std::set of
// feature name strings.
static std::set<sstring> decode_features(const set_type_impl::native_type& features) {
    std::set<sstring> fset;
    for (const auto& f : features) {
        // `features` is const, so the previous std::move() here was a
        // no-op (it produced a const rvalue that binds to the copying
        // overload anyway); drop it to avoid implying a move happens.
        fset.insert(value_cast<sstring>(f));
    }
    return fset;
}
|
|
|
|
// Returns whether a node in the given state is required to have a token
// set (an engaged ring_slice) persisted in the topology table.
static bool must_have_tokens(service::node_state nst) {
    switch (nst) {
    // Rebuilding and normal nodes always own their tokens.
    case service::node_state::rebuilding:
    case service::node_state::normal:
        return true;
    // A node that never joined, or already left, owns no tokens.
    case service::node_state::none:
    case service::node_state::left:
    // Bootstrapping and replacing nodes don't have tokens at first,
    // they are inserted only at some point during bootstrap/replace.
    case service::node_state::bootstrapping:
    case service::node_state::replacing:
    // A decommissioning node doesn't have tokens at the end, they are
    // removed during transition to the left_token_ring state.
    case service::node_state::decommissioning:
    // A removing node might or might not have tokens depending on whether
    // REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled. To support both
    // cases, we allow removing nodes to not have tokens.
    case service::node_state::removing:
        return false;
    }
}
|
|
|
|
// Reconstructs the in-memory topology state machine state from the
// system.topology table.
//
// Per-node (clustering) rows are decoded into replica_state entries and
// bucketed by node_state into the normal/new/transition/left node maps;
// the static row carries cluster-wide state (versions, transition state,
// CDC generations, global requests, enabled features, etc.).
//
// force_load_hosts: hosts in `left` state whose replica_state should be
// loaded anyway (into left_nodes_rs) instead of being dropped.
future<service::topology> system_keyspace::load_topology_state(const std::unordered_set<locator::host_id>& force_load_hosts) {
    auto rs = co_await execute_cql(
        format("SELECT * FROM system.{} WHERE key = '{}'", TOPOLOGY, TOPOLOGY));
    SCYLLA_ASSERT(rs);

    service::topology_state_machine::topology_type ret;

    if (rs->empty()) {
        co_return ret;
    }

    // Initially, strongly-consistent and logstor tables do not support
    // tablet migrations, so while either experimental feature is enabled
    // we force tablet balancing off below, ignoring the persisted flag.
    const bool tablet_balancing_not_supported = _db.features().strongly_consistent_tables || _db.features().logstor;

    for (auto& row : *rs) {
        if (!row.has("host_id")) {
            // There are no clustering rows, only the static row.
            // Skip the whole loop, the static row is handled later.
            break;
        }

        // Per-node columns; these are expected to be present for every
        // clustering row.
        raft::server_id host_id{row.get_as<utils::UUID>("host_id")};
        auto datacenter = row.get_as<sstring>("datacenter");
        auto rack = row.get_as<sstring>("rack");
        auto release_version = row.get_as<sstring>("release_version");
        uint32_t num_tokens = row.get_as<int32_t>("num_tokens");
        sstring tokens_string = row.get_as<sstring>("tokens_string");
        size_t shard_count = row.get_as<int32_t>("shard_count");
        uint8_t ignore_msb = row.get_as<int32_t>("ignore_msb");
        sstring cleanup_status = row.get_as<sstring>("cleanup_status");
        utils::UUID request_id = row.get_as<utils::UUID>("request_id");

        service::node_state nstate = service::node_state_from_string(row.get_as<sstring>("node_state"));

        std::optional<service::ring_slice> ring_slice;
        if (row.has("tokens")) {
            auto tokens = decode_tokens(deserialize_set_column(*topology(), row, "tokens"));

            ring_slice = service::ring_slice {
                .tokens = std::move(tokens),
            };
        } else {
            auto zero_token = num_tokens == 0 && tokens_string.empty();
            if (zero_token) {
                // We distinguish normal zero-token nodes from token-owning nodes without tokens at the moment
                // in the following way:
                // - for normal zero-token nodes, ring_slice is engaged with an empty set of tokens,
                // - for token-owning nodes without tokens at the moment, ring_slice equals std::nullopt.
                // ring_slice also equals std::nullopt for joining zero-token nodes. The reason is that the
                // topology coordinator assigns tokens in the join_group0 state handler, and we want to simulate
                // assigning zero tokens for zero-token nodes. It allows us to have the same assertions for all nodes.
                // The code below is correct because the join_group0 state is the last transition state if a joining
                // node is zero-token.
                // Note that we need this workaround because we store tokens in a non-frozen set, which doesn't
                // distinguish an empty set from no value.
                if (nstate != service::node_state::none && nstate != service::node_state::bootstrapping
                        && nstate != service::node_state::replacing) {
                    ring_slice = service::ring_slice {
                        .tokens = std::unordered_set<dht::token>(),
                    };
                }
            } else if (must_have_tokens(nstate)) {
                // A token-owning node in a state that requires tokens has
                // no tokens column - the table is corrupted.
                on_internal_error(slogger, format(
                    "load_topology_state: node {} in {} state but missing ring slice", host_id, nstate));
            }
        }

        std::optional<raft::server_id> replaced_id;
        if (row.has("replaced_id")) {
            replaced_id = raft::server_id(row.get_as<utils::UUID>("replaced_id"));
        }

        std::optional<sstring> rebuild_option;
        if (row.has("rebuild_option")) {
            rebuild_option = row.get_as<sstring>("rebuild_option");
        }

        std::set<sstring> supported_features;
        if (row.has("supported_features")) {
            supported_features = decode_features(deserialize_set_column(*topology(), row, "supported_features"));
        }

        // Decode the pending per-node topology request (if any) and its
        // parameters. Left nodes cannot carry pending requests.
        if (row.has("topology_request") && nstate != service::node_state::left) {
            auto req = service::topology_request_from_string(row.get_as<sstring>("topology_request"));
            ret.requests.emplace(host_id, req);
            switch(req) {
            case service::topology_request::replace:
                if (!replaced_id) {
                    on_internal_error(slogger, fmt::format("replaced_id is missing for a node {}", host_id));
                }
                ret.req_param.emplace(host_id, service::replace_param{*replaced_id});
                break;
            case service::topology_request::rebuild:
                if (!rebuild_option) {
                    on_internal_error(slogger, fmt::format("rebuild_option is missing for a node {}", host_id));
                }
                ret.req_param.emplace(host_id, service::rebuild_param{*rebuild_option});
                break;
            default:
                // no parameters for other requests
                break;
            }
        } else {
            // No pending request: a node in the middle of an operation
            // still needs the parameters of the operation in progress.
            switch (nstate) {
            case service::node_state::bootstrapping:
                // The tokens aren't generated right away when we enter the `bootstrapping` node state.
                // Therefore we need to know the number of tokens when we generate them during the bootstrap process.
                ret.req_param.emplace(host_id, service::join_param{num_tokens, tokens_string});
                break;
            case service::node_state::replacing:
                // If a node is replacing we need to know which node it is replacing and which nodes are ignored
                if (!replaced_id) {
                    on_internal_error(slogger, fmt::format("replaced_id is missing for a node {}", host_id));
                }
                ret.req_param.emplace(host_id, service::replace_param{*replaced_id});
                break;
            case service::node_state::rebuilding:
                // If a node is rebuilding it needs to know the parameter for the operation
                if (!rebuild_option) {
                    on_internal_error(slogger, fmt::format("rebuild_option is missing for a node {}", host_id));
                }
                ret.req_param.emplace(host_id, service::rebuild_param{*rebuild_option});
                break;
            default:
                // no parameters for other operations
                break;
            }
        }

        // Select the destination bucket for this node's replica_state.
        std::unordered_map<raft::server_id, service::replica_state>* map = nullptr;
        if (nstate == service::node_state::normal) {
            map = &ret.normal_nodes;
        } else if (nstate == service::node_state::left) {
            ret.left_nodes.emplace(host_id);
            // Left nodes normally keep no replica_state; load it only on
            // explicit request from the caller (force_load_hosts).
            if (force_load_hosts.contains(locator::host_id(host_id.uuid()))) {
                map = &ret.left_nodes_rs;
            }
        } else if (nstate == service::node_state::none) {
            map = &ret.new_nodes;
        } else {
            map = &ret.transition_nodes;
            // Currently, at most one node at a time can be in transitioning state.
            if (!map->empty()) {
                const auto& [other_id, other_rs] = *map->begin();
                on_internal_error(slogger, format(
                    "load_topology_state: found two nodes in transitioning state: {} in {} state and {} in {} state",
                    other_id, other_rs.state, host_id, nstate));
            }
        }
        if (map) {
            map->emplace(host_id, service::replica_state{
                nstate, std::move(datacenter), std::move(rack), std::move(release_version),
                ring_slice, shard_count, ignore_msb, std::move(supported_features),
                service::cleanup_status_from_string(cleanup_status), request_id});
        }
    }

    {
        // Here we access static columns, any row will do.
        auto& some_row = *rs->begin();

        if (some_row.has("version")) {
            ret.version = some_row.get_as<service::topology::version_t>("version");
        }

        if (some_row.has("fence_version")) {
            ret.fence_version = some_row.get_as<service::topology::version_t>("fence_version");
        }

        if (some_row.has("transition_state")) {
            ret.tstate = service::transition_state_from_string(some_row.get_as<sstring>("transition_state"));
        } else {
            // Any remaining transition_nodes must be in rebuilding state.
            auto it = std::find_if(ret.transition_nodes.begin(), ret.transition_nodes.end(),
                    [] (auto& p) { return p.second.state != service::node_state::rebuilding; });
            if (it != ret.transition_nodes.end()) {
                on_internal_error(slogger, format(
                    "load_topology_state: topology not in transition state"
                    " but transition node {} in rebuilding state is present", it->first));
            }
        }

        if (some_row.has("new_cdc_generation_data_uuid")) {
            ret.new_cdc_generation_data_uuid = some_row.get_as<utils::UUID>("new_cdc_generation_data_uuid");
        }

        if (some_row.has("committed_cdc_generations")) {
            ret.committed_cdc_generations = decode_cdc_generations_ids(deserialize_set_column(*topology(), some_row, "committed_cdc_generations"));
        }

        if (some_row.has("new_keyspace_rf_change_data")) {
            ret.new_keyspace_rf_change_ks_name = some_row.get_as<sstring>("new_keyspace_rf_change_ks_name");
            ret.new_keyspace_rf_change_data = some_row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
        }

        if (!ret.committed_cdc_generations.empty()) {
            // Sanity check for CDC generation data consistency.
            auto gen_id = ret.committed_cdc_generations.back();
            auto gen_rows = co_await execute_cql(
                format("SELECT count(range_end) as cnt FROM {}.{} WHERE key = '{}' AND id = ?",
                       NAME, CDC_GENERATIONS_V3, cdc::CDC_GENERATIONS_V3_KEY),
                gen_id.id);
            if (!gen_rows || gen_rows->empty()) {
                on_internal_error(slogger, format(
                    "load_topology_state: last committed CDC generation time UUID ({}) present, but data missing", gen_id.id));
            }
            auto cnt = gen_rows->one().get_as<int64_t>("cnt");
            slogger.debug("load_topology_state: last committed CDC generation time UUID ({}), loaded {} ranges", gen_id.id, cnt);
        } else {
            // A cluster with normal nodes must have committed at least
            // one CDC generation.
            if (!ret.normal_nodes.empty()) {
                on_internal_error(slogger,
                    "load_topology_state: normal nodes present but no committed CDC generations");
            }
        }

        if (some_row.has("unpublished_cdc_generations")) {
            ret.unpublished_cdc_generations = decode_cdc_generations_ids(deserialize_set_column(*topology(), some_row, "unpublished_cdc_generations"));
        }

        if (some_row.has("global_topology_request")) {
            auto req = service::global_topology_request_from_string(
                some_row.get_as<sstring>("global_topology_request"));
            ret.global_request.emplace(req);
        }

        if (some_row.has("global_topology_request_id")) {
            ret.global_request_id = some_row.get_as<utils::UUID>("global_topology_request_id");
        }

        if (some_row.has("global_requests")) {
            for (auto&& v : deserialize_set_column(*topology(), some_row, "global_requests")) {
                ret.global_requests_queue.push_back(value_cast<utils::UUID>(v));
            }
        }

        if (some_row.has("paused_rf_change_requests")) {
            for (auto&& v : deserialize_set_column(*topology(), some_row, "paused_rf_change_requests")) {
                ret.paused_rf_change_requests.insert(value_cast<utils::UUID>(v));
            }
        }

        if (some_row.has("enabled_features")) {
            ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
        }

        if (some_row.has("session")) {
            ret.session = service::session_id(some_row.get_as<utils::UUID>("session"));
        }

        // Tablet balancing: forced off while an experimental feature that
        // doesn't support tablet migration (strongly-consistent tables,
        // logstor) is enabled; otherwise honor the persisted flag, which
        // defaults to enabled when never written.
        if (tablet_balancing_not_supported) {
            ret.tablet_balancing_enabled = false;
        } else if (some_row.has("tablet_balancing_enabled")) {
            ret.tablet_balancing_enabled = some_row.get_as<bool>("tablet_balancing_enabled");
        } else {
            ret.tablet_balancing_enabled = true;
        }

        if (some_row.has("ignore_nodes")) {
            ret.ignored_nodes = decode_nodes_ids(deserialize_set_column(*topology(), some_row, "ignore_nodes"));
        }

        // Nodes excluded from tablet placement: the ignored nodes plus
        // any left nodes whose replica_state was force-loaded above.
        ret.excluded_tablet_nodes = ret.ignored_nodes;
        for (const auto& [id, _]: ret.left_nodes_rs) {
            ret.excluded_tablet_nodes.insert(id);
        }
    }

    co_return ret;
}
|
|
|
|
// Loads only the feature-related columns of system.topology and decodes
// them; returns std::nullopt when the feature state is absent.
future<std::optional<service::topology_features>> system_keyspace::load_topology_features_state() {
    auto rows = co_await execute_cql(
        format("SELECT host_id, node_state, supported_features, enabled_features FROM system.{} WHERE key = '{}'", TOPOLOGY, TOPOLOGY));
    SCYLLA_ASSERT(rows);
    co_return decode_topology_features_state(std::move(rows));
}
|
|
|
|
// Reads the topology upgrade state from the static row of
// system.topology; a missing row means the upgrade never started.
future<sstring> system_keyspace::load_topology_upgrade_state() {
    auto rows = co_await execute_cql(
        format("SELECT upgrade_state FROM system.{} WHERE key = '{}' LIMIT 1", TOPOLOGY, TOPOLOGY));
    SCYLLA_ASSERT(rows);
    if (!rows->empty()) {
        co_return rows->one().get_as<sstring>("upgrade_state");
    }
    co_return "not_upgraded";
}
|
|
|
|
// Decodes a system.topology result set (host_id, node_state,
// supported_features + static enabled_features) into topology_features.
// Returns std::nullopt when the result set is empty or the static
// enabled_features column is missing.
std::optional<service::topology_features> system_keyspace::decode_topology_features_state(::shared_ptr<cql3::untyped_result_set> rs) {
    if (rs->empty()) {
        return std::nullopt;
    }

    // enabled_features is a static column, so any row exposes it.
    auto& first_row = *rs->begin();
    if (!first_row.has("enabled_features")) {
        return std::nullopt;
    }

    service::topology_features ret;
    ret.enabled_features = decode_features(deserialize_set_column(*topology(), first_row, "enabled_features"));

    for (auto& row : *rs) {
        if (!row.has("host_id")) {
            // Static-row-only result set: no per-node data to collect.
            break;
        }
        raft::server_id host_id{row.get_as<utils::UUID>("host_id")};
        auto nstate = service::node_state_from_string(row.get_as<sstring>("node_state"));
        // Only normal nodes contribute to the supported-features map.
        if (nstate == service::node_state::normal && row.has("supported_features")) {
            ret.normal_supported_features.emplace(host_id, decode_features(deserialize_set_column(*topology(), row, "supported_features")));
        }
    }

    return ret;
}
|
|
|
|
// Reads the CDC generation identified by `id`; unlike
// read_cdc_generation_opt, missing data is treated as an internal error.
future<cdc::topology_description>
system_keyspace::read_cdc_generation(utils::UUID id) {
    auto desc = co_await read_cdc_generation_opt(id);
    if (!desc) {
        on_internal_error(slogger, format(
            "read_cdc_generation: data for CDC generation {} not present", id));
    }
    co_return std::move(*desc);
}
|
|
|
|
// Streams the token-range rows of CDC generation `id` out of
// system.cdc_generations_v3, page by page, and assembles them into a
// topology_description. Returns std::nullopt if no rows exist.
future<std::optional<cdc::topology_description>>
system_keyspace::read_cdc_generation_opt(utils::UUID id) {
    utils::chunked_vector<cdc::token_range_description> descriptions;
    co_await _qp.query_internal(
        format("SELECT range_end, streams, ignore_msb FROM {}.{} WHERE key = '{}' AND id = ?",
            NAME, CDC_GENERATIONS_V3, cdc::CDC_GENERATIONS_V3_KEY),
        db::consistency_level::ONE,
        { id },
        1000, // for ~1KB rows, ~1MB page size
        [&] (const cql3::untyped_result_set_row& row) {
            std::vector<cdc::stream_id> streams;
            row.get_list_data<bytes>("streams", std::back_inserter(streams));
            auto range_end = dht::token::from_int64(row.get_as<int64_t>("range_end"));
            auto msb = uint8_t(row.get_as<int8_t>("ignore_msb"));
            descriptions.push_back(cdc::token_range_description{range_end, std::move(streams), msb});
            return make_ready_future<stop_iteration>(stop_iteration::no);
        });

    if (descriptions.empty()) {
        co_return std::nullopt;
    }

    co_return cdc::topology_description{std::move(descriptions)};
}
|
|
|
|
// Inserts a new sstable entry into the sstables registry table.
future<> system_keyspace::sstables_registry_create_entry(table_id owner, sstring status, sstables::sstable_state state, sstables::entry_descriptor desc) {
    static const auto req = format("INSERT INTO system.{} (owner, generation, status, state, version, format) VALUES (?, ?, ?, ?, ?, ?)", SSTABLES_REGISTRY);
    slogger.trace("Inserting {}.{} into {}", owner, desc.generation, SSTABLES_REGISTRY);
    auto version_str = fmt::to_string(desc.version);
    auto format_str = fmt::to_string(desc.format);
    co_await execute_cql(req, owner.id, desc.generation, status, sstables::state_to_dir(state), version_str, format_str).discard_result();
}
|
|
|
|
// Updates the status column of an existing sstables registry entry.
future<> system_keyspace::sstables_registry_update_entry_status(table_id owner, sstables::generation_type gen, sstring status) {
    static const auto req = format("UPDATE system.{} SET status = ? WHERE owner = ? AND generation = ?", SSTABLES_REGISTRY);
    slogger.trace("Updating {}.{} -> status={} in {}", owner, gen, status, SSTABLES_REGISTRY);
    return execute_cql(req, status, owner.id, gen).discard_result();
}
|
|
|
|
// Updates the state column (stored as its directory name) of an
// existing sstables registry entry.
future<> system_keyspace::sstables_registry_update_entry_state(table_id owner, sstables::generation_type gen, sstables::sstable_state state) {
    static const auto req = format("UPDATE system.{} SET state = ? WHERE owner = ? AND generation = ?", SSTABLES_REGISTRY);
    const auto dir = sstables::state_to_dir(state);
    slogger.trace("Updating {}.{} -> state={} in {}", owner, gen, dir, SSTABLES_REGISTRY);
    co_await execute_cql(req, dir, owner.id, gen).discard_result();
}
|
|
|
|
// Removes an sstable entry from the sstables registry table.
future<> system_keyspace::sstables_registry_delete_entry(table_id owner, sstables::generation_type gen) {
    static const auto req = format("DELETE FROM system.{} WHERE owner = ? AND generation = ?", SSTABLES_REGISTRY);
    slogger.trace("Removing {}.{} from {}", owner, gen, SSTABLES_REGISTRY);
    return execute_cql(req, owner.id, gen).discard_result();
}
|
|
|
|
// Iterates over all sstables registry entries of `owner`, invoking
// `consumer` for each decoded entry. Paged at 1000 rows per page.
future<> system_keyspace::sstables_registry_list(table_id owner, sstable_registry_entry_consumer consumer) {
    static const auto req = format("SELECT status, state, generation, version, format FROM system.{} WHERE owner = ?", SSTABLES_REGISTRY);
    slogger.trace("Listing {} entries from {}", owner, SSTABLES_REGISTRY);

    co_await _qp.query_internal(req, db::consistency_level::ONE, { owner.id }, 1000, [ consumer = std::move(consumer) ] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
        auto status = row.get_as<sstring>("status");
        auto state = sstables::state_from_dir(row.get_as<sstring>("state"));
        // Only TOC is recorded in the registry descriptor.
        sstables::entry_descriptor desc(
            sstables::generation_type(row.get_as<utils::UUID>("generation")),
            sstables::version_from_string(row.get_as<sstring>("version")),
            sstables::format_from_string(row.get_as<sstring>("format")),
            sstables::component_type::TOC);
        co_await consumer(std::move(status), std::move(state), std::move(desc));
        co_return stop_iteration::no;
    });
}
|
|
|
|
// Fetches the done/error state of topology request `id`. When the entry
// is absent: with require_entry this is an internal error, otherwise a
// default "not done, no error" state is returned.
future<service::topology_request_state> system_keyspace::get_topology_request_state(utils::UUID id, bool require_entry) {
    auto rs = co_await execute_cql(
        format("SELECT done, error FROM system.{} WHERE id = {}", TOPOLOGY_REQUESTS, id));
    if (!rs || rs->empty()) {
        if (!require_entry) {
            co_return service::topology_request_state{false, ""};
        }
        on_internal_error(slogger, format("no entry for request id {}", id));
    }

    const auto& row = rs->one();
    // `error` is null for successful or still-running requests.
    auto error = row.has("error") ? row.get_as<sstring>("error") : sstring();
    co_return service::topology_request_state{row.get_as<bool>("done"), std::move(error)};
}
|
|
|
|
// Decodes a single system.topology_requests row into a
// topology_requests_entry. Every column is optional; absent columns
// leave the corresponding entry fields default-initialized.
system_keyspace::topology_requests_entry system_keyspace::topology_request_row_to_entry(utils::UUID id, const cql3::untyped_result_set_row& row) {
    topology_requests_entry entry;
    entry.id = id;
    if (row.has("initiating_host")) {
        entry.initiating_host = row.get_as<utils::UUID>("initiating_host");
    }
    if (row.has("request_type")) {
        auto rts = row.get_as<sstring>("request_type");
        // The column stores either a per-node or a global request type;
        // try the per-node parser first and fall back to the global one.
        auto rt = service::try_topology_request_from_string(rts);
        if (rt) {
            entry.request_type = *rt;
        } else {
            entry.request_type = service::global_topology_request_from_string(rts);
        }
    }
    if (row.has("start_time")) {
        entry.start_time = row.get_as<db_clock::time_point>("start_time");
    }
    if (row.has("done")) {
        entry.done = row.get_as<bool>("done");
    }
    if (row.has("error")) {
        entry.error = row.get_as<sstring>("error");
    }
    if (row.has("end_time")) {
        entry.end_time = row.get_as<db_clock::time_point>("end_time");
    }
    if (row.has("truncate_table_id")) {
        entry.truncate_table_id = table_id(row.get_as<utils::UUID>("truncate_table_id"));
    }
    if (row.has("new_keyspace_rf_change_data")) {
        entry.new_keyspace_rf_change_ks_name = row.get_as<sstring>("new_keyspace_rf_change_ks_name");
        entry.new_keyspace_rf_change_data = row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
    }
    if (row.has("snapshot_table_ids")) {
        entry.snapshot_tag = row.get_as<sstring>("snapshot_tag");
        entry.snapshot_skip_flush = row.get_as<bool>("snapshot_skip_flush");
        // (dropped a stray duplicated ';' - an empty statement - that
        // followed this pipeline)
        entry.snapshot_table_ids = row.get_set<utils::UUID>("snapshot_table_ids")
                | std::views::transform([](auto& uuid) { return table_id(uuid); })
                | std::ranges::to<std::unordered_set>();
        if (row.has("snapshot_expiry")) {
            entry.snapshot_expiry = row.get_as<db_clock::time_point>("snapshot_expiry");
        }
    }

    return entry;
}
|
|
|
|
// Like get_topology_request_entry_opt, but a missing entry is an
// internal error.
future<system_keyspace::topology_requests_entry> system_keyspace::get_topology_request_entry(utils::UUID id) {
    auto entry = co_await get_topology_request_entry_opt(id);
    if (!entry) {
        on_internal_error(slogger, format("no entry for request id {}", id));
    }
    co_return std::move(*entry);
}
|
|
|
|
// Loads the full system.topology_requests row for `id`, or std::nullopt
// when no such request exists.
future<std::optional<system_keyspace::topology_requests_entry>> system_keyspace::get_topology_request_entry_opt(utils::UUID id) {
    auto rs = co_await execute_cql(
        format("SELECT * FROM system.{} WHERE id = {}", TOPOLOGY_REQUESTS, id));
    if (rs && !rs->empty()) {
        co_return topology_request_row_to_entry(id, rs->one());
    }
    co_return std::nullopt;
}
|
|
|
|
// Collects topology request entries of the given types that are either
// still running or finished after end_time_limit.
future<system_keyspace::topology_requests_entries> system_keyspace::get_topology_request_entries(std::vector<std::variant<service::topology_request, service::global_topology_request>> request_types, db_clock::time_point end_time_limit) {
    // Build a quoted, comma-separated list usable inside IN (...).
    sstring request_types_str;
    for (const auto& rt : request_types) {
        if (!request_types_str.empty()) {
            request_types_str += ", ";
        }
        request_types_str += std::visit([] (auto&& arg) { return fmt::format("'{}'", arg); }, rt);
    }

    // Running requests.
    auto rs_running = co_await execute_cql(
        format("SELECT * FROM system.{} WHERE done = false AND request_type IN ({}) ALLOW FILTERING", TOPOLOGY_REQUESTS, request_types_str));

    // Requests which finished after end_time_limit.
    auto rs_done = co_await execute_cql(
        format("SELECT * FROM system.{} WHERE end_time > {} AND request_type IN ({}) ALLOW FILTERING", TOPOLOGY_REQUESTS, end_time_limit.time_since_epoch().count(), request_types_str));

    topology_requests_entries m;
    for (const auto& row: *rs_done) {
        auto id = row.get_as<utils::UUID>("id");
        m.emplace(id, topology_request_row_to_entry(id, row));
    }

    for (const auto& row: *rs_running) {
        auto id = row.get_as<utils::UUID>("id");
        // If a topology request finishes between the reads, it may be contained in both row sets.
        // Keep the latest info.
        m.emplace(id, topology_request_row_to_entry(id, row));
    }

    co_return m;
}
|
|
|
|
// Convenience wrapper: fetch entries for all node-operation (as opposed
// to global) topology request types.
future<system_keyspace::topology_requests_entries> system_keyspace::get_node_ops_request_entries(db_clock::time_point end_time_limit) {
    std::vector<std::variant<service::topology_request, service::global_topology_request>> node_ops{
        service::topology_request::join,
        service::topology_request::replace,
        service::topology_request::rebuild,
        service::topology_request::leave,
        service::topology_request::remove,
    };
    return get_topology_request_entries(std::move(node_ops), end_time_limit);
}
|
|
|
|
// Builds (without applying) the mutation that publishes a new
// compression dictionary row in system.dicts.
//
// name:     dictionary name (partition key)
// data:     raw dictionary payload
// host_id:  node that produced the dictionary (origin column)
// dict_ts:  dictionary creation time (timestamp column)
// write_ts: write timestamp for the mutation itself
future<mutation> system_keyspace::get_insert_dict_mutation(
    std::string_view name,
    bytes data,
    locator::host_id host_id,
    db_clock::time_point dict_ts,
    api::timestamp_type write_ts
) const {
    slogger.debug("Publishing new compression dictionary: {} {} {}", name, dict_ts, host_id);

    static sstring insert_new = format("INSERT INTO {}.{} (name, timestamp, origin, data) VALUES (?, ?, ?, ?);", NAME, DICTS);
    auto mutations = co_await _qp.get_mutations_internal(insert_new, internal_system_query_state(), write_ts, {
        data_value(name),
        data_value(dict_ts),
        data_value(host_id.uuid()),
        data_value(std::move(data)),
    });
    // A single-partition INSERT must yield exactly one mutation.
    if (mutations.size() != 1) {
        on_internal_error(slogger, "Expected to prepare a single mutation, but got multiple.");
    }
    co_return std::move(mutations.front());
}
|
|
|
|
// Builds a partition-tombstone mutation that deletes the whole
// system.dicts row for `name` at the given write timestamp.
mutation system_keyspace::get_delete_dict_mutation(std::string_view name, api::timestamp_type write_ts) {
    auto s = db::system_keyspace::dicts();
    auto pk = partition_key::from_single_value(*s, data_value(name).serialize_nonnull());
    mutation m(s, std::move(pk));
    m.partition().apply(tombstone(write_ts, gc_clock::now()));
    return m;
}
|
|
|
|
// Lists the names of all compression dictionaries stored in system.dicts.
future<std::vector<sstring>> system_keyspace::query_all_dict_names() const {
    sstring query = format("SELECT name from {}.{}", NAME, DICTS);
    auto rs = co_await _qp.execute_internal(
        query, db::consistency_level::ONE, internal_system_query_state(), {}, cql3::query_processor::cache_internal::yes);
    std::vector<sstring> names;
    for (const auto& row : *rs) {
        names.push_back(row.get_as<sstring>("name"));
    }
    co_return names;
}
|
|
|
|
// Loads the compression dictionary `name` from system.dicts.
// Returns a default-constructed (empty) shared_dict when absent.
future<netw::shared_dict> system_keyspace::query_dict(std::string_view name) const {
    static sstring query = format("SELECT * FROM {}.{} WHERE name = ?;", NAME, DICTS);
    auto result_set = co_await _qp.execute_internal(
        query, db::consistency_level::ONE, internal_system_query_state(), {name}, cql3::query_processor::cache_internal::yes);
    if (result_set->empty()) {
        co_return netw::shared_dict();
    }
    const auto& row = result_set->one();
    auto content = row.get_as<bytes>("data");
    auto timestamp = row.get_as<db_clock::time_point>("timestamp").time_since_epoch().count();
    auto origin = row.get_as<utils::UUID>("origin");
    const int zstd_compression_level = 1;
    co_return netw::shared_dict(
        std::as_bytes(std::span(content)),
        timestamp,
        origin,
        zstd_compression_level);
}
|
|
|
|
// Returns the stored timestamp of dictionary `name`, or std::nullopt
// when no such dictionary exists.
future<std::optional<db_clock::time_point>> system_keyspace::query_dict_timestamp(std::string_view name) const {
    static sstring query = format("SELECT timestamp FROM {}.{} WHERE name = ?;", NAME, DICTS);
    auto result_set = co_await _qp.execute_internal(
        query, db::consistency_level::ONE, internal_system_query_state(), {name}, cql3::query_processor::cache_internal::yes);
    if (result_set->empty()) {
        co_return std::nullopt;
    }
    co_return result_set->one().get_as<db_clock::time_point>("timestamp");
}
|
|
|
|
// Free-function accessor for the system keyspace name, usable without a
// system_keyspace instance.
sstring system_keyspace_name() {
    return system_keyspace::NAME;
}
|
|
|
|
// Constructs the system keyspace facade and registers it with the
// database; the registration is undone in shutdown() via
// unplug_system_keyspace().
system_keyspace::system_keyspace(
        cql3::query_processor& qp, replica::database& db) noexcept
    : _qp(qp)
    , _db(db)
    , _cache(std::make_unique<local_cache>())
{
    _db.plug_system_keyspace(*this);
}
|
|
|
|
// Defaulted out-of-line; presumably local_cache is only a complete type
// in this translation unit, so ~unique_ptr must be instantiated here.
system_keyspace::~system_keyspace() = default;
|
|
|
|
// Idempotent shutdown: unplugs this instance from the database exactly
// once; subsequent calls are no-ops.
future<> system_keyspace::shutdown() {
    if (_shutdown) {
        co_return;
    }
    _shutdown = true;
    co_await _db.unplug_system_keyspace();
}
|
|
|
|
// Seastar service stop hook; delegates to the idempotent shutdown().
future<> system_keyspace::stop() {
    return shutdown();
}
|
|
|
|
// Runs a query through the internal (system) query path with
// prepared-statement caching enabled.
future<::shared_ptr<cql3::untyped_result_set>> system_keyspace::execute_cql(const sstring& query_string, const data_value_list& values) {
    return _qp.execute_internal(query_string, values, cql3::query_processor::cache_internal::yes);
}
|
|
|
|
// Applies a mutation locally, rejecting mutations that do not target a
// system keyspace table. Honors the table's wait-for-commitlog-sync
// static property.
future<> system_keyspace::apply_mutation(mutation m) {
    if (m.schema()->ks_name() != NAME) {
        // Fixed: the message previously printed "<table>.<keyspace>";
        // tables are conventionally identified as <keyspace>.<table>.
        on_internal_error(slogger, fmt::format("system_keyspace::apply_mutation(): attempted to apply mutation belonging to table {}.{}", m.schema()->ks_name(), m.schema()->cf_name()));
    }

    return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout);
}
|
|
|
|
// The names are persisted in system tables so should not be changed.
// Maps each repair_task_operation enumerator to its stable on-disk name;
// the inverse map below is derived from this one.
static const std::unordered_map<system_keyspace::repair_task_operation, sstring> repair_task_operation_to_name = {
    {system_keyspace::repair_task_operation::requested, "requested"},
    {system_keyspace::repair_task_operation::finished, "finished"},
};
|
|
|
|
static const std::unordered_map<sstring, system_keyspace::repair_task_operation> repair_task_operation_from_name = std::invoke([] {
|
|
std::unordered_map<sstring, system_keyspace::repair_task_operation> result;
|
|
for (auto&& [v, s] : repair_task_operation_to_name) {
|
|
result.emplace(s, v);
|
|
}
|
|
return result;
|
|
});
|
|
|
|
// Returns the persisted name of a repair task operation; an unknown
// enumerator is an internal error.
sstring system_keyspace::repair_task_operation_to_string(system_keyspace::repair_task_operation op) {
    const auto it = repair_task_operation_to_name.find(op);
    if (it == repair_task_operation_to_name.end()) {
        on_internal_error(slogger, format("Invalid repair task operation: {}", static_cast<int>(op)));
    }
    return it->second;
}
|
|
|
|
// Parses a persisted repair task operation name back into the enum.
// Previously an unknown name surfaced as std::out_of_range from
// unordered_map::at(); report it via on_internal_error instead, matching
// repair_task_operation_to_string and producing a useful message.
system_keyspace::repair_task_operation system_keyspace::repair_task_operation_from_string(const sstring& name) {
    const auto it = repair_task_operation_from_name.find(name);
    if (it == repair_task_operation_from_name.end()) {
        on_internal_error(slogger, format("Invalid repair task operation name: {}", name));
    }
    return it->second;
}
|
|
|
|
} // namespace db
|
|
|
|
// fmt integration: renders a repair_task_operation via its persisted
// string name.
auto fmt::formatter<db::system_keyspace::repair_task_operation>::format(const db::system_keyspace::repair_task_operation& op, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    auto name = db::system_keyspace::repair_task_operation_to_string(op);
    return fmt::format_to(ctx.out(), "{}", name);
}
|