Files
scylladb/replica/tablets.cc
Wojciech Mitros 654fe4b1ca db: add system tables for strongly consistent tables' raft groups
Add three new system tables for storing raft state for strongly
consistent tablets, corresponding to the tables for group0:

- system.raft_groups: Stores the raft log, term/vote, snapshot_id,
  and commit_idx for each tablet's raft group.

- system.raft_groups_snapshots: Stores snapshot descriptors
  (index, term) for each group.

- system.raft_groups_snapshot_config: Stores the raft configuration
  (current and previous voters) for each snapshot.

These tables use a (shard, group_id) composite partition key with
the fixed-shard partitioner and sharder (dht::fixed_shard_partitioner /
dht::fixed_shard_sharder), ensuring
data is co-located with the tablet replica that owns the raft group.

The tables are only created when the STRONGLY_CONSISTENT_TABLES experimental
feature is enabled.
2026-02-25 12:34:58 +01:00

1323 lines
58 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <fmt/ranges.h>
#include <seastar/coroutine/maybe_yield.hh>
#include "types/types.hh"
#include "types/tuple.hh"
#include "types/list.hh"
#include "db/system_keyspace.hh"
#include "schema/schema_builder.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/stats.hh"
#include "gms/feature_service.hh"
#include "replica/database.hh"
#include "replica/tablets.hh"
#include "replica/tablet_mutation_builder.hh"
#include "sstables/sstable_set.hh"
#include "dht/token.hh"
#include "mutation/async_utils.hh"
#include "compaction/compaction_manager.hh"
#include "dht/fixed_shard.hh"
namespace replica {
using namespace locator;
// CQL types used by the system.tablets schema and its (de)serialization
// helpers below. thread_local: type instances are per-shard.
// UDT mirroring locator::repair_scheduler_config (enabled flag + threshold).
static thread_local auto repair_scheduler_config_type = user_type_impl::get_instance(
    "system", "repair_scheduler_config", {"auto_repair_enabled", "auto_repair_threshold"},
    {boolean_type, long_type}, false);
// UDT mirroring locator::tablet_task_info; field order must match the
// positional casts in tablet_task_info_from_cell().
static thread_local auto tablet_task_info_type = user_type_impl::get_instance(
    "system", "tablet_task_info", {"request_type", "tablet_task_id", "request_time", "sched_nr", "sched_time", "repair_hosts_filter", "repair_dcs_filter"},
    {utf8_type, uuid_type, timestamp_type, long_type, timestamp_type, utf8_type, utf8_type}, false);
// A tablet replica is (host uuid, shard); a replica set is a frozen list of
// those tuples (see the FIXME in make_tablets_schema about using real UDTs).
static thread_local auto replica_type = tuple_type_impl::get_instance({uuid_type, int32_type});
static thread_local auto replica_set_type = list_type_impl::get_instance(replica_type, false);
static thread_local auto tablet_info_type = tuple_type_impl::get_instance({long_type, long_type, replica_set_type});
// Exposes the CQL type describing a tablet replica set
// (frozen list<tuple<uuid, int>>).
data_type get_replica_set_type() {
    auto type = replica_set_type;
    return type;
}
// Exposes the CQL tuple type describing a single tablet's info row.
data_type get_tablet_info_type() {
    auto type = tablet_info_type;
    return type;
}
// Registers the repair-scheduler UDTs with keyspace `ks` so schemas in that
// keyspace can reference them.
void tablet_add_repair_scheduler_user_types(const sstring& ks, replica::database& db) {
    auto& keyspace = db.find_keyspace(ks);
    keyspace.add_user_type(repair_scheduler_config_type);
    keyspace.add_user_type(tablet_task_info_type);
}
// Process-wide switch for the STRONGLY_CONSISTENT_TABLES experimental
// feature; gates the tablet raft system tables and the raft_group_id column.
// Assumed to be set during startup, before system schemas are built —
// TODO(review): confirm against the caller.
static bool strongly_consistent_tables_enabled = false;
void set_strongly_consistent_tables_enabled(bool enabled) {
    strongly_consistent_tables_enabled = enabled;
}
// Builds the schema of system.tablets: one partition per table (keyed by
// table id), one clustered row per tablet (keyed by the tablet's last token),
// with per-table state in static columns.
schema_ptr make_tablets_schema() {
    // FIXME: Allow UDTs in system keyspace:
    // CREATE TYPE tablet_replica (replica_id uuid, shard int);
    // replica_set_type = frozen<list<tablet_replica>>
    auto id = generate_legacy_id(db::system_keyspace::NAME, db::system_keyspace::TABLETS);
    // Bump the schema version offset for tablet repair scheduler columns
    auto builder = schema_builder(db::system_keyspace::NAME, db::system_keyspace::TABLETS, id);
    builder
        .with_column("table_id", uuid_type, column_kind::partition_key)
        .with_column("tablet_count", int32_type, column_kind::static_column)
        .with_column("keyspace_name", utf8_type, column_kind::static_column)
        .with_column("table_name", utf8_type, column_kind::static_column)
        .with_column("last_token", long_type, column_kind::clustering_key)
        // Current replicas, and the target set while a migration is in flight.
        .with_column("replicas", replica_set_type)
        .with_column("new_replicas", replica_set_type)
        // Migration transition state (stage/kind/session).
        .with_column("stage", utf8_type)
        .with_column("transition", utf8_type)
        .with_column("session", uuid_type)
        // Resize (split/merge) decision — per table, hence static.
        .with_column("resize_type", utf8_type, column_kind::static_column)
        .with_column("resize_seq_number", long_type, column_kind::static_column)
        .with_column("repair_time", timestamp_type)
        .with_column("repair_task_info", tablet_task_info_type)
        .with_column("repair_scheduler_config", repair_scheduler_config_type, column_kind::static_column)
        .with_column("sstables_repaired_at", long_type)
        .with_column("repair_incremental_mode", utf8_type)
        .with_column("migration_task_info", tablet_task_info_type)
        .with_column("resize_task_info", tablet_task_info_type, column_kind::static_column)
        // Set for co-located tables that share the base table's tablet map.
        .with_column("base_table", uuid_type, column_kind::static_column);
    if (strongly_consistent_tables_enabled) {
        // Per-tablet raft group id; only present when the experimental
        // strongly-consistent-tables feature is enabled.
        builder
            .with_column("raft_group_id", uuid_type);
    }
    return builder
        .with_hash_version()
        .build();
}
// Builds the schema for a raft log/state table. With is_group0 this is the
// group0 table; otherwise it is the per-tablet variant, which adds a `shard`
// partition-key component and a fixed-shard partitioner/sharder so each raft
// group's data lives on the shard owning the tablet replica.
schema_ptr make_raft_schema(sstring name, bool is_group0) {
    auto id = generate_legacy_id(db::system_keyspace::NAME, name);
    auto builder = schema_builder(db::system_keyspace::NAME, name, std::optional(id));
    if (!is_group0) {
        // The tablet variant exists only under the experimental feature.
        if (!strongly_consistent_tables_enabled) {
            on_internal_error(tablet_logger, "Can't create raft table for strongly consistent tablets when the feature is disabled");
        }
        builder.with_column("shard", short_type, column_kind::partition_key);
    }
    builder
        .with_column("group_id", timeuuid_type, column_kind::partition_key)
        // raft log part
        .with_column("index", long_type, column_kind::clustering_key)
        .with_column("term", long_type)
        .with_column("data", bytes_type) // decltype(raft::log_entry::data) - serialized variant
        // persisted term and vote
        .with_column("vote_term", long_type, column_kind::static_column)
        .with_column("vote", uuid_type, column_kind::static_column)
        // id of the most recent persisted snapshot
        .with_column("snapshot_id", uuid_type, column_kind::static_column)
        .with_column("commit_idx", long_type, column_kind::static_column)
        .with_hash_version()
        // Raft log access patterns don't benefit from the row cache.
        .set_caching_options(caching_options::get_disabled_caching_options());
    if (is_group0) {
        return builder
            .set_comment("Persisted RAFT log, votes and snapshot info")
            .build();
    } else {
        return builder
            .set_comment("Persisted RAFT log, votes and snapshot info for strongly consistent tablets")
            .with_partitioner(dht::fixed_shard_partitioner::classname)
            .with_sharder(dht::fixed_shard_sharder::instance())
            .build();
    }
}
// Builds the schema for a raft snapshot-descriptors table. With is_group0
// this is the group0 table; otherwise the per-tablet variant, which adds a
// `shard` partition-key component and a fixed-shard partitioner/sharder so
// each group's snapshot data lives on the shard owning the tablet replica.
schema_ptr make_raft_snapshots_schema(sstring name, bool is_group0) {
    auto id = generate_legacy_id(db::system_keyspace::NAME, name);
    auto builder = schema_builder(db::system_keyspace::NAME, name, std::optional(id));
    if (!is_group0) {
        // The tablet variant exists only under the experimental feature.
        if (!strongly_consistent_tables_enabled) {
            on_internal_error(tablet_logger, "Can't create raft snapshots table for strongly consistent tablets when the feature is disabled");
        }
        builder.with_column("shard", short_type, column_kind::partition_key);
    }
    builder
        .with_column("group_id", timeuuid_type, column_kind::partition_key)
        .with_column("snapshot_id", uuid_type)
        // Index and term of last entry in the snapshot
        .with_column("idx", long_type)
        .with_column("term", long_type)
        .with_hash_version();
    if (is_group0) {
        return builder
            // Fix: the group0 branch mistakenly carried the tablet-specific
            // "for strongly consistent tablets" comment; group0 is not
            // tablet-specific (cf. make_raft_schema / the config table below).
            .set_comment("Persisted RAFT snapshots descriptors info")
            .build();
    } else {
        return builder
            .set_comment("Persisted RAFT snapshot descriptors info for strongly consistent tablets")
            .with_partitioner(dht::fixed_shard_partitioner::classname)
            .with_sharder(dht::fixed_shard_sharder::instance())
            .build();
    }
}
// Builds the schema for a raft snapshot-configuration table (voter sets for
// the latest snapshot). With is_group0 this is the group0 table; otherwise
// the per-tablet variant with a `shard` partition-key component and a
// fixed-shard partitioner/sharder.
schema_ptr make_raft_snapshot_config_schema(sstring name, bool is_group0) {
    auto id = generate_legacy_id(db::system_keyspace::NAME, name);
    auto builder = schema_builder(db::system_keyspace::NAME, name, std::optional(id));
    if (!is_group0) {
        // The tablet variant exists only under the experimental feature.
        if (!strongly_consistent_tables_enabled) {
            on_internal_error(tablet_logger, "Can't create raft snapshot config table for strongly consistent tablets when the feature is disabled");
        }
        builder.with_column("shard", short_type, column_kind::partition_key);
    }
    builder
        .with_column("group_id", timeuuid_type, column_kind::partition_key)
        .with_column("disposition", ascii_type, column_kind::clustering_key) // can be 'CURRENT` or `PREVIOUS'
        .with_column("server_id", uuid_type, column_kind::clustering_key)
        .with_column("can_vote", boolean_type)
        .with_hash_version();
    if (is_group0) {
        return builder
            .set_comment("RAFT configuration for the latest snapshot descriptor")
            .build();
    } else {
        return builder
            .set_comment("RAFT configuration for the snapshot descriptor for strongly consistent tablets")
            .with_partitioner(dht::fixed_shard_partitioner::classname)
            .with_sharder(dht::fixed_shard_sharder::instance())
            .build();
    }
}
std::vector<data_value> replicas_to_data_value(const tablet_replica_set& replicas) {
std::vector<data_value> result;
result.reserve(replicas.size());
for (auto&& replica : replicas) {
result.emplace_back(make_tuple_value(replica_type, {
data_value(utils::UUID(replica.host.uuid())),
data_value(int(replica.shard))
}));
}
return result;
};
// Serializes a locator::tablet_task_info into its CQL UDT value; field order
// matches tablet_task_info_type.
data_value tablet_task_info_to_data_value(const locator::tablet_task_info& info) {
    return make_user_value(tablet_task_info_type, {
        data_value(locator::tablet_task_type_to_string(info.request_type)),
        data_value(info.tablet_task_id.uuid()),
        data_value(info.request_time),
        data_value(info.sched_nr),
        data_value(info.sched_time),
        data_value(locator::tablet_task_info::serialize_repair_hosts_filter(info.repair_hosts_filter)),
        data_value(locator::tablet_task_info::serialize_repair_dcs_filter(info.repair_dcs_filter)),
    });
}
// Serializes a locator::repair_scheduler_config into its CQL UDT value.
data_value repair_scheduler_config_to_data_value(const locator::repair_scheduler_config& config) {
    return make_user_value(repair_scheduler_config_type, {
        data_value(config.auto_repair_enabled),
        // Threshold is persisted as a plain second count.
        data_value(int64_t(config.auto_repair_threshold.count())),
    });
}
// Chunking threshold for tablet_map_to_mutations below.
// Based on calibration run measuring 6ms time to freeze
// mutation with 16K tablets (with 9 replicas each) on a
// 3.4GHz amd64 cpu, and twice as much for unfreeze.
// 1K tablets would take around 0.4 ms to freeze and 0.8 ms
// to unfreeze.
constexpr size_t min_tablets_in_mutation = 1024;
// Converts a tablet_map into system.tablets mutations for table `id`,
// handing them to process_mutation in chunks of at least
// min_tablets_in_mutation rows to bound per-mutation cost and reactor
// stalls. All chunks target the same partition and carry the same
// partition tombstone at ts-1, so together they fully replace the
// previously persisted map while cells written at ts survive.
future<>
tablet_map_to_mutations(const tablet_map& tablets, table_id id, const sstring& keyspace_name, const sstring& table_name,
        api::timestamp_type ts, const gms::feature_service& features, std::function<future<>(mutation)> process_mutation) {
    auto s = db::system_keyspace::tablets();
    auto gc_now = gc_clock::now();
    auto tombstone_ts = ts - 1;
    auto key = partition_key::from_single_value(*s,
        data_value(id.uuid()).serialize_nonnull()
    );
    // Every chunk starts as an empty mutation with the replace-tombstone.
    auto make_mutation = [&] () {
        mutation m(s, key);
        m.partition().apply(tombstone(tombstone_ts, gc_now));
        return m;
    };
    auto m = make_mutation();
    // Per-table static cells go only into the first chunk.
    m.set_static_cell("tablet_count", data_value(int(tablets.tablet_count())), ts);
    m.set_static_cell("keyspace_name", data_value(keyspace_name), ts);
    m.set_static_cell("table_name", data_value(table_name), ts);
    m.set_static_cell("resize_type", data_value(tablets.resize_decision().type_name()), ts);
    m.set_static_cell("resize_seq_number", data_value(int64_t(tablets.resize_decision().sequence_number)), ts);
    if (features.tablet_resize_virtual_task && tablets.resize_task_info().is_valid()) {
        m.set_static_cell("resize_task_info", tablet_task_info_to_data_value(tablets.resize_task_info()), ts);
    }
    if (features.tablet_repair_scheduler) {
        auto config = tablets.get_repair_scheduler_config();
        if (config) {
            m.set_static_cell("repair_scheduler_config", repair_scheduler_config_to_data_value(*config), ts);
        }
    }
    tablet_id tid = tablets.first_tablet();
    size_t tablets_in_mutation = 0;
    for (auto&& tablet : tablets.tablets()) {
        // Flush the current chunk once it is big enough and the reactor
        // wants to preempt; see min_tablets_in_mutation calibration above.
        if (++tablets_in_mutation >= min_tablets_in_mutation && seastar::need_preempt()) {
            tablets_in_mutation = 0;
            co_await coroutine::maybe_yield();
            co_await process_mutation(std::exchange(m, make_mutation()));
        }
        // One clustered row per tablet, keyed by the tablet's last token.
        auto last_token = tablets.get_last_token(tid);
        auto ck = clustering_key::from_single_value(*s, data_value(dht::token::to_int64(last_token)).serialize_nonnull());
        m.set_clustered_cell(ck, "replicas", make_list_value(replica_set_type, replicas_to_data_value(tablet.replicas)), ts);
        if (features.tablet_migration_virtual_task && tablet.migration_task_info.is_valid()) {
            m.set_clustered_cell(ck, "migration_task_info", tablet_task_info_to_data_value(tablet.migration_task_info), ts);
        }
        if (features.tablet_repair_scheduler) {
            if (tablet.repair_task_info.is_valid()) {
                m.set_clustered_cell(ck, "repair_task_info", tablet_task_info_to_data_value(tablet.repair_task_info), ts);
                if (features.tablet_incremental_repair) {
                    m.set_clustered_cell(ck, "repair_incremental_mode", locator::tablet_repair_incremental_mode_to_string(tablet.repair_task_info.repair_incremental_mode), ts);
                }
            }
            // Only persist a repair time once one has actually been recorded.
            if (tablet.repair_time != db_clock::time_point{}) {
                m.set_clustered_cell(ck, "repair_time", data_value(tablet.repair_time), ts);
            }
        }
        if (features.tablet_incremental_repair) {
            m.set_clustered_cell(ck, "sstables_repaired_at", data_value(tablet.sstables_repaired_at), ts);
        }
        // In-flight migration state, if any.
        if (auto tr_info = tablets.get_tablet_transition_info(tid)) {
            m.set_clustered_cell(ck, "stage", tablet_transition_stage_to_string(tr_info->stage), ts);
            m.set_clustered_cell(ck, "transition", tablet_transition_kind_to_string(tr_info->transition), ts);
            m.set_clustered_cell(ck, "new_replicas", make_list_value(replica_set_type, replicas_to_data_value(tr_info->next)), ts);
            if (tr_info->session_id) {
                m.set_clustered_cell(ck, "session", data_value(tr_info->session_id.uuid()), ts);
            }
        }
        // Raft group id for strongly consistent tablets.
        if (tablets.has_raft_info()) {
            const auto& raft_info = tablets.get_tablet_raft_info(tid);
            m.set_clustered_cell(ck, "raft_group_id", raft_info.group_id.uuid(), ts);
        }
        tid = *tablets.next_tablet(tid);
    }
    co_await process_mutation(std::move(m));
}
// Builds the system.tablets mutation for a co-located table: no per-tablet
// rows, only static cells pointing at the base table whose tablet map it
// shares. The partition tombstone at ts-1 replaces any previous state.
mutation
colocated_tablet_map_to_mutation(table_id id, const sstring& keyspace_name, const sstring& table_name, table_id base_table, api::timestamp_type ts) {
    auto s = db::system_keyspace::tablets();
    auto pk = partition_key::from_single_value(*s,
        data_value(id.uuid()).serialize_nonnull()
    );
    mutation m(s, pk);
    m.partition().apply(tombstone(ts - 1, gc_clock::now()));
    m.set_static_cell("keyspace_name", data_value(keyspace_name), ts);
    m.set_static_cell("table_name", data_value(table_name), ts);
    m.set_static_cell("base_table", data_value(base_table.uuid()), ts);
    return m;
}
// Records the target replica set of an in-flight migration for the tablet
// owning last_token.
tablet_mutation_builder&
tablet_mutation_builder::set_new_replicas(dht::token last_token, locator::tablet_replica_set replicas) {
    auto value = make_list_value(replica_set_type, replicas_to_data_value(replicas));
    _m.set_clustered_cell(get_ck(last_token), "new_replicas", value, _ts);
    return *this;
}
// Records the current replica set for the tablet owning last_token.
tablet_mutation_builder&
tablet_mutation_builder::set_replicas(dht::token last_token, locator::tablet_replica_set replicas) {
    auto value = make_list_value(replica_set_type, replicas_to_data_value(replicas));
    _m.set_clustered_cell(get_ck(last_token), "replicas", value, _ts);
    return *this;
}
// Records the migration transition stage for the tablet owning last_token.
tablet_mutation_builder&
tablet_mutation_builder::set_stage(dht::token last_token, locator::tablet_transition_stage stage) {
    auto value = data_value(tablet_transition_stage_to_string(stage));
    _m.set_clustered_cell(get_ck(last_token), "stage", value, _ts);
    return *this;
}
// Records the migration transition kind for the tablet owning last_token.
tablet_mutation_builder&
tablet_mutation_builder::set_transition(dht::token last_token, locator::tablet_transition_kind kind) {
    auto value = data_value(tablet_transition_kind_to_string(kind));
    _m.set_clustered_cell(get_ck(last_token), "transition", value, _ts);
    return *this;
}
// Records the streaming session id for the tablet owning last_token.
tablet_mutation_builder&
tablet_mutation_builder::set_session(dht::token last_token, service::session_id session_id) {
    auto value = data_value(session_id.uuid());
    _m.set_clustered_cell(get_ck(last_token), "session", value, _ts);
    return *this;
}
// Tombstones the "session" cell for the tablet owning last_token.
tablet_mutation_builder&
tablet_mutation_builder::del_session(dht::token last_token) {
    const auto* session_col = _s->get_column_definition("session");
    _m.set_clustered_cell(get_ck(last_token), *session_col, atomic_cell::make_dead(_ts, gc_clock::now()));
    return *this;
}
// Clears all migration-transition state (stage, kind, target replicas and
// session) for the tablet owning last_token by writing dead cells.
tablet_mutation_builder&
tablet_mutation_builder::del_transition(dht::token last_token) {
    auto ck = get_ck(last_token);
    auto tombstone_cell = [&] (const char* name) {
        const auto* col = _s->get_column_definition(name);
        _m.set_clustered_cell(ck, *col, atomic_cell::make_dead(_ts, gc_clock::now()));
    };
    tombstone_cell("stage");
    tombstone_cell("transition");
    tombstone_cell("new_replicas");
    tombstone_cell("session");
    return *this;
}
// Persists the table's resize (split/merge) decision into the static
// columns, and keeps the resize virtual-task info in sync with it: a
// split/merge decision (re)schedules the task, anything else clears it.
tablet_mutation_builder&
tablet_mutation_builder::set_resize_decision(locator::resize_decision resize_decision, const gms::feature_service& features) {
    _m.set_static_cell("resize_type", data_value(resize_decision.type_name()), _ts);
    _m.set_static_cell("resize_seq_number", data_value(int64_t(resize_decision.sequence_number)), _ts);
    if (resize_decision.split_or_merge()) {
        auto resize_task_info = std::holds_alternative<resize_decision::split>(resize_decision.way)
            ? locator::tablet_task_info::make_split_request()
            : locator::tablet_task_info::make_merge_request();
        resize_task_info.sched_nr++;
        resize_task_info.sched_time = db_clock::now();
        return set_resize_task_info(std::move(resize_task_info), features);
    } else {
        return del_resize_task_info(features);
    }
    // Fix: removed the unreachable trailing `return *this;` — both branches
    // above already return.
}
// Persists the per-table repair scheduler configuration (static column).
tablet_mutation_builder&
tablet_mutation_builder::set_repair_scheduler_config(locator::repair_scheduler_config config) {
    auto value = repair_scheduler_config_to_data_value(config);
    _m.set_static_cell("repair_scheduler_config", value, _ts);
    return *this;
}
// Records the last repair time for the tablet owning last_token.
tablet_mutation_builder&
tablet_mutation_builder::set_repair_time(dht::token last_token, db_clock::time_point repair_time) {
    auto value = data_value(repair_time);
    _m.set_clustered_cell(get_ck(last_token), "repair_time", value, _ts);
    return *this;
}
// Records the incremental-repair watermark for the tablet owning last_token.
tablet_mutation_builder&
tablet_mutation_builder::set_sstables_repair_at(dht::token last_token, int64_t sstables_repaired_at) {
    auto value = data_value(sstables_repaired_at);
    _m.set_clustered_cell(get_ck(last_token), "sstables_repaired_at", value, _ts);
    return *this;
}
// Records the repair virtual-task info for the tablet owning last_token and,
// when incremental repair is available, the task's incremental mode.
tablet_mutation_builder&
tablet_mutation_builder::set_repair_task_info(dht::token last_token, locator::tablet_task_info repair_task_info, const gms::feature_service& features) {
    auto ck = get_ck(last_token);
    _m.set_clustered_cell(ck, "repair_task_info", tablet_task_info_to_data_value(repair_task_info), _ts);
    if (features.tablet_incremental_repair) {
        auto mode = locator::tablet_repair_incremental_mode_to_string(repair_task_info.repair_incremental_mode);
        _m.set_clustered_cell(ck, "repair_incremental_mode", data_value(mode), _ts);
    }
    return *this;
}
// Clears the repair virtual-task info (and, when the feature allows it, the
// incremental mode) for the tablet owning last_token.
tablet_mutation_builder&
tablet_mutation_builder::del_repair_task_info(dht::token last_token, const gms::feature_service& features) {
    auto ck = get_ck(last_token);
    auto tombstone_cell = [&] (const char* name) {
        const auto* col = _s->get_column_definition(name);
        _m.set_clustered_cell(ck, *col, atomic_cell::make_dead(_ts, gc_clock::now()));
    };
    tombstone_cell("repair_task_info");
    if (features.tablet_incremental_repair) {
        tombstone_cell("repair_incremental_mode");
    }
    return *this;
}
// Records the migration virtual-task info for the tablet owning last_token;
// a no-op when the cluster feature is not yet enabled.
tablet_mutation_builder&
tablet_mutation_builder::set_migration_task_info(dht::token last_token, locator::tablet_task_info migration_task_info, const gms::feature_service& features) {
    if (!features.tablet_migration_virtual_task) {
        return *this;
    }
    _m.set_clustered_cell(get_ck(last_token), "migration_task_info", tablet_task_info_to_data_value(migration_task_info), _ts);
    return *this;
}
// Clears the migration virtual-task info for the tablet owning last_token;
// a no-op when the cluster feature is not yet enabled.
tablet_mutation_builder&
tablet_mutation_builder::del_migration_task_info(dht::token last_token, const gms::feature_service& features) {
    if (!features.tablet_migration_virtual_task) {
        return *this;
    }
    const auto* col = _s->get_column_definition("migration_task_info");
    _m.set_clustered_cell(get_ck(last_token), *col, atomic_cell::make_dead(_ts, gc_clock::now()));
    return *this;
}
// Records the per-table resize virtual-task info (static column); a no-op
// when the cluster feature is not yet enabled.
tablet_mutation_builder&
tablet_mutation_builder::set_resize_task_info(locator::tablet_task_info resize_task_info, const gms::feature_service& features) {
    if (!features.tablet_resize_virtual_task) {
        return *this;
    }
    _m.set_static_cell("resize_task_info", tablet_task_info_to_data_value(resize_task_info), _ts);
    return *this;
}
// Clears the per-table resize virtual-task info (static column); a no-op
// when the cluster feature is not yet enabled.
tablet_mutation_builder&
tablet_mutation_builder::del_resize_task_info(const gms::feature_service& features) {
    if (!features.tablet_resize_virtual_task) {
        return *this;
    }
    const auto* col = _s->get_column_definition("resize_task_info");
    _m.set_static_cell(*col, atomic_cell::make_dead(_ts, gc_clock::now()));
    return *this;
}
// Marks this table as co-located, pointing at the base table whose tablet
// map it shares (static column).
tablet_mutation_builder&
tablet_mutation_builder::set_base_table(table_id base_table) {
    auto value = data_value(base_table.uuid());
    _m.set_static_cell("base_table", value, _ts);
    return *this;
}
// Builds a mutation wiping the whole system.tablets partition of table `id`
// via a partition tombstone at ts.
mutation make_drop_tablet_map_mutation(table_id id, api::timestamp_type ts) {
    auto s = db::system_keyspace::tablets();
    auto pk = partition_key::from_single_value(*s,
        data_value(id.uuid()).serialize_nonnull()
    );
    mutation m(s, pk);
    m.partition().apply(tombstone(ts, gc_clock::now()));
    return m;
}
// Decodes a CQL list<tuple<uuid, int>> value (replica_set_type) back into a
// tablet_replica_set.
tablet_replica_set tablet_replica_set_from_cell(const data_value& v) {
    auto elements = value_cast<list_type_impl::native_type>(v);
    tablet_replica_set replicas;
    replicas.reserve(elements.size());
    for (const data_value& element : elements) {
        std::vector<data_value> fields = value_cast<tuple_type_impl::native_type>(element);
        replicas.emplace_back(
            host_id(value_cast<utils::UUID>(fields[0])),
            shard_id(value_cast<int>(fields[1]))
        );
    }
    return replicas;
}
// Deserializes a raw system.tablets cell into a tablet_replica_set.
static
tablet_replica_set deserialize_replica_set(cql3::untyped_result_set_row::view_type raw_value) {
    auto cell = replica_set_type->deserialize_value(raw_value);
    return tablet_replica_set_from_cell(cell);
}
// Decodes a tablet_task_info UDT value; positional indices must match the
// field order of tablet_task_info_type above.
locator::tablet_task_info tablet_task_info_from_cell(const data_value& v) {
    std::vector<data_value> dv = value_cast<user_type_impl::native_type>(v);
    auto result = locator::tablet_task_info{
        locator::tablet_task_type_from_string(value_cast<sstring>(dv[0])),
        locator::tablet_task_id(value_cast<utils::UUID>(dv[1])),
        value_cast<db_clock::time_point>(dv[2]),
        value_cast<int64_t>(dv[3]),
        value_cast<db_clock::time_point>(dv[4]),
        locator::tablet_task_info::deserialize_repair_hosts_filter(value_cast<sstring>(dv[5])),
        locator::tablet_task_info::deserialize_repair_dcs_filter(value_cast<sstring>(dv[6])),
        // The incremental mode is stored in a separate column, not in the
        // UDT; callers overwrite this default when that column is present
        // (see process_one_row).
        locator::tablet_repair_incremental_mode::disabled,
    };
    return result;
}
// Deserializes a raw system.tablets cell into a locator::tablet_task_info.
static
locator::tablet_task_info deserialize_tablet_task_info(cql3::untyped_result_set_row::view_type raw_value) {
    auto cell = tablet_task_info_type->deserialize_value(raw_value);
    return tablet_task_info_from_cell(cell);
}
// Decodes a repair_scheduler_config UDT value; positional indices match the
// field order of repair_scheduler_config_type above.
locator::repair_scheduler_config repair_scheduler_config_from_cell(const data_value& v) {
    auto fields = value_cast<user_type_impl::native_type>(v);
    locator::repair_scheduler_config config{
        value_cast<bool>(fields[0]),
        std::chrono::seconds(value_cast<int64_t>(fields[1])),
    };
    return config;
}
// Deserializes a raw system.tablets cell into a repair_scheduler_config.
static
locator::repair_scheduler_config deserialize_repair_scheduler_config(cql3::untyped_result_set_row::view_type raw_value) {
    auto cell = repair_scheduler_config_type->deserialize_value(raw_value);
    return repair_scheduler_config_from_cell(cell);
}
// Persists the full tablet metadata into system.tablets at timestamp ts:
// base tables get their full tablet maps, co-located tables a pointer to
// their base table. All mutations are applied in one batch at the end.
future<> save_tablet_metadata(replica::database& db, const tablet_metadata& tm, api::timestamp_type ts) {
    tablet_logger.trace("Saving tablet metadata: {}", tm);
    utils::chunked_vector<frozen_mutation> muts;
    muts.reserve(tm.all_tables_ungrouped().size());
    for (auto&& [base_id, tables] : tm.all_table_groups()) {
        // FIXME: Should we ignore missing tables? Currently doesn't matter because this is only used in tests.
        const auto& tablets = tm.get_tablet_map(base_id);
        auto s = db.find_schema(base_id);
        co_await tablet_map_to_mutations(tablets, base_id, s->ks_name(), s->cf_name(), ts, db.features(), [&] (mutation m) -> future<> {
            muts.emplace_back(co_await freeze_gently(m));
        });
        // Co-located tables in the group reference base_id instead of
        // carrying their own tablet rows.
        for (auto id : tables) {
            if (id != base_id) {
                auto s = db.find_schema(id);
                muts.emplace_back(
                    colocated_tablet_map_to_mutation(id, s->ks_name(), s->cf_name(), base_id, ts));
            }
        }
    }
    co_await db.apply(muts, db::no_timeout);
}
// Extracts the table id from a system.tablets partition key (a single uuid).
static table_id to_tablet_metadata_key(const schema& s, const partition_key& key) {
    auto cells = key.explode(s);
    auto uuid = value_cast<utils::UUID>(uuid_type->deserialize_value(cells.front()));
    return ::table_id(uuid);
}
// Extracts the tablet's last token from a system.tablets clustering key
// (a single bigint).
static dht::token to_tablet_metadata_row_key(const schema& s, const clustering_key& key) {
    auto cells = key.explode(s);
    auto last_token = value_cast<int64_t>(long_type->deserialize_value(cells[0]));
    return dht::token::from_int64(last_token);
}
// Folds one system.tablets mutation into the change hint: either records the
// individual tokens whose rows changed, or (for partition-wide changes and
// deletions) clears the token list so the whole partition is re-read.
static void do_update_tablet_metadata_change_hint(locator::tablet_metadata_change_hint& hint, const schema& s, const mutation& m) {
    const auto table_id = to_tablet_metadata_key(s, m.key());
    auto it = hint.tables.try_emplace(table_id, locator::tablet_metadata_change_hint::table_hint{table_id, {}}).first;
    const auto& mp = m.partition();
    auto& tokens = it->second.tokens;
    if (mp.partition_tombstone() || !mp.row_tombstones().empty() || !mp.static_row().empty()) {
        // If there is a partition tombstone, range tombstone or static row,
        // update the entire partition. Also clear any row hints that might be
        // present to force a full read of the partition.
        tokens.clear();
        return;
    }
    for (const auto& row : mp.clustered_rows()) {
        // TODO: we do not handle deletions yet, will revisit when tablet count
        // reduction is worked out.
        if (row.row().deleted_at()) {
            tokens.clear();
            return;
        }
        tokens.push_back(to_tablet_metadata_row_key(s, row.key()));
    }
}
// Reads the replica-set cell described by cdef from a clustered row, or
// returns nullopt when the row carries no such cell.
static std::optional<tablet_replica_set> maybe_deserialize_replica_set(const rows_entry& row, const column_definition& cdef) {
    if (const auto* cell = row.row().cells().find_cell(cdef.id)) {
        auto value = cdef.type->deserialize_value(cell->as_atomic_cell(cdef).value());
        return tablet_replica_set_from_cell(value);
    }
    return std::nullopt;
}
// Validates one system.tablets mutation against the current tablet metadata:
// co-located tables must have no clustered rows, and no tablet may end up
// with more than one pending replica (|new_replicas - replicas| <= 1).
// Throws std::runtime_error on violation.
static void do_validate_tablet_metadata_change(const locator::tablet_metadata& tm, const schema& s, const mutation& m) {
    const auto table_id = to_tablet_metadata_key(s, m.key());
    const auto& mp = m.partition();
    // Partition-wide changes (tombstones, static row) are not validated here.
    if (mp.partition_tombstone() || !mp.row_tombstones().empty() || !mp.static_row().empty()) {
        return;
    }
    if (mp.row_count() && !tm.is_base_table(table_id)) {
        throw std::runtime_error(fmt::format("Table {} is a co-located table, it cannot have clustering rows.", table_id));
    }
    auto& r_cdef = *s.get_column_definition("replicas");
    auto& nr_cdef = *s.get_column_definition("new_replicas");
    for (const auto& row : mp.clustered_rows()) {
        if (row.row().deleted_at()) {
            return;
        }
        auto new_replicas = maybe_deserialize_replica_set(row, nr_cdef);
        if (!new_replicas) {
            continue;
        }
        auto token = to_tablet_metadata_row_key(s, row.key());
        auto replicas = maybe_deserialize_replica_set(row, r_cdef);
        if (!replicas) {
            // Mutation doesn't rewrite "replicas" — validate against the
            // currently known replica set instead.
            replicas = tm.get_tablet_map(table_id).get_tablet_info(token).replicas;
        }
        std::unordered_set<tablet_replica> pending = substract_sets(*new_replicas, *replicas);
        if (pending.size() > 1) {
            throw std::runtime_error(fmt::format("Too many pending replicas for table {} last_token {}: {}",
                table_id, token, pending));
        }
    }
}
// Scans a batch of canonical mutations and builds a change hint from those
// targeting system.tablets; returns nullopt when none of them do.
std::optional<locator::tablet_metadata_change_hint> get_tablet_metadata_change_hint(const utils::chunked_vector<canonical_mutation>& mutations) {
    tablet_logger.trace("tablet_metadata_change_hint({})", mutations.size());
    auto s = db::system_keyspace::tablets();
    std::optional<locator::tablet_metadata_change_hint> hint;
    for (const auto& cm : mutations) {
        tablet_logger.trace("tablet_metadata_change_hint() {} == {}", cm.column_family_id(), s->id());
        if (cm.column_family_id() != s->id()) {
            continue;
        }
        if (!hint) {
            // Lazily create the hint on the first matching mutation.
            hint.emplace();
            hint->tables.reserve(mutations.size());
        }
        do_update_tablet_metadata_change_hint(*hint, *s, cm.to_mutation(s));
    }
    return hint;
}
// Validates every system.tablets mutation in the batch against the current
// tablet metadata; throws std::runtime_error on the first violation.
void validate_tablet_metadata_change(const locator::tablet_metadata& tm, const utils::chunked_vector<canonical_mutation>& mutations) {
    auto s = db::system_keyspace::tablets();
    auto tablets_id = s->id();
    for (const auto& cm : mutations) {
        if (cm.column_family_id() == tablets_id) {
            do_validate_tablet_metadata_change(tm, *s, cm.to_mutation(s));
        }
    }
}
// Folds a single mutation into an existing change hint; ignores mutations
// that do not target system.tablets.
void update_tablet_metadata_change_hint(locator::tablet_metadata_change_hint& hint, const mutation& m) {
    auto s = db::system_keyspace::tablets();
    if (m.column_family_id() == s->id()) {
        do_update_tablet_metadata_change_hint(hint, *s, m);
    }
}
namespace {
// Rebuilds one tablet's in-memory state from its system.tablets row:
// replica sets, repair/migration task info, transition state and raft group
// id, storing them into `map` at tablet `tid`. `db` may be null (metadata
// load without a local database); when present it is also used to schedule
// tombstone-GC repair-time updates for locally owned replicas. Returns the
// id of the next tablet to process. Throws on inconsistent data.
tablet_id process_one_row(replica::database* db, table_id table, tablet_map& map, tablet_id tid, const cql3::untyped_result_set_row& row) {
    tablet_replica_set tablet_replicas;
    if (row.has("replicas")) {
        tablet_replicas = deserialize_replica_set(row.get_view("replicas"));
    }
    tablet_replica_set new_tablet_replicas;
    if (row.has("new_replicas")) {
        new_tablet_replicas = deserialize_replica_set(row.get_view("new_replicas"));
    }
    db_clock::time_point repair_time;
    bool update_repair_time = false;
    if (row.has("repair_time")) {
        repair_time = row.get_as<db_clock::time_point>("repair_time");
        update_repair_time = true;
    }
    int64_t sstables_repaired_at = 0;
    if (row.has("sstables_repaired_at")) {
        sstables_repaired_at = row.get_as<int64_t>("sstables_repaired_at");
    }
    locator::tablet_task_info repair_task_info;
    if (row.has("repair_task_info")) {
        repair_task_info = deserialize_tablet_task_info(row.get_view("repair_task_info"));
        // Incremental mode lives in its own column; patch it into the
        // deserialized task info (which defaults it to disabled).
        if (row.has("repair_incremental_mode")) {
            auto inc = row.get_as<sstring>("repair_incremental_mode");
            repair_task_info.repair_incremental_mode = locator::tablet_repair_incremental_mode_from_string(inc);
        }
    }
    locator::tablet_task_info migration_task_info;
    if (row.has("migration_task_info")) {
        migration_task_info = deserialize_tablet_task_info(row.get_view("migration_task_info"));
    }
    // A "stage" cell marks an in-flight migration for this tablet.
    if (row.has("stage")) {
        auto stage = tablet_transition_stage_from_string(row.get_as<sstring>("stage"));
        auto transition = tablet_transition_kind_from_string(row.get_as<sstring>("transition"));
        // At most one pending (new but not yet current) replica is allowed.
        std::unordered_set<tablet_replica> pending = substract_sets(new_tablet_replicas, tablet_replicas);
        if (pending.size() > 1) {
            throw std::runtime_error(fmt::format("Too many pending replicas for table {} tablet {}: {}",
                table, tid, pending));
        }
        std::optional<tablet_replica> pending_replica;
        if (pending.size() != 0) {
            pending_replica = *pending.begin();
        }
        service::session_id session_id;
        if (row.has("session")) {
            session_id = service::session_id(row.get_as<utils::UUID>("session"));
        }
        map.set_tablet_transition_info(tid, tablet_transition_info{stage, transition,
            std::move(new_tablet_replicas), pending_replica, session_id});
    }
    tablet_logger.debug("Set sstables_repaired_at={} table={} tablet={}", sstables_repaired_at, table, tid);
    map.set_tablet(tid, tablet_info{std::move(tablet_replicas), repair_time, repair_task_info, migration_task_info, sstables_repaired_at});
    // Presence of raft_group_id must agree with the map's raft-info mode.
    if (row.has("raft_group_id")) {
        if (!map.has_raft_info()) {
            on_internal_error(tablet_logger,
                format("Unexpected raft group id for tablet {} of table {}", tid, table));
        }
        map.set_tablet_raft_info(tid, {
            .group_id = raft::group_id(row.get_as<utils::UUID>("raft_group_id"))
        });
    } else if (map.has_raft_info()) {
        on_internal_error(tablet_logger,
            format("Raft group id is not set for tablet {} of table {}", tid, table));
    }
    // Propagate the persisted repair time to the local tombstone-GC state if
    // this node holds a replica of the tablet.
    if (update_repair_time && db) {
        auto myid = db->get_token_metadata().get_my_id();
        auto range = map.get_token_range(tid);
        auto& info = map.get_tablet_info(tid);
        for (auto r : info.replicas) {
            if (r.host == myid) {
                auto& gc_state = db->get_compaction_manager().get_shared_tombstone_gc_state();
                gc_state.insert_pending_repair_time_update(table, range, to_gc_clock(repair_time), r.shard);
                tablet_logger.debug("Insert pending repair time for tombstone gc: table={} tablet={} range={} repair_time={}",
                    table, tid, range, repair_time);
                break;
            }
        }
    }
    // Sanity check: the persisted row must describe the tablet we expected.
    auto persisted_last_token = dht::token::from_int64(row.get_as<int64_t>("last_token"));
    auto current_last_token = map.get_last_token(tid);
    if (current_last_token != persisted_last_token) {
        tablet_logger.debug("current tablet_map: {}", map);
        throw std::runtime_error(format("last_token mismatch between on-disk ({}) and in-memory ({}) tablet map for table {} tablet {}",
            persisted_last_token, current_last_token, table, tid));
    }
    return *map.next_tablet(tid);
}
// Incrementally builds a tablet_metadata from rows of system.tablets.
// Rows must arrive grouped by table (partition order): `current` holds the
// tablet map being filled for the most recently seen table, and is flushed
// into `tm` when the table changes and finally in on_end_of_stream().
struct tablet_metadata_builder {
    tablet_metadata& tm;
    // Per-table build state for the table whose rows are currently streaming in.
    struct active_tablet_map {
        table_id table;
        tablet_map map;
        // Tablet that the next per-tablet row will populate; advanced by process_one_row().
        tablet_id tid;
    };
    std::optional<active_tablet_map> current;
    // maps a co-located table to its base table.
    // when reading the tablet metadata of a co-located table, we store it in the map, and we apply
    // all co-located tables in on_end_of_stream. This is because we want to apply all normal tables first,
    // to ensure the base table tablet map is already present when we apply the co-located tables.
    std::unordered_map<table_id, table_id> base_tables;

    // Consumes one row of system.tablets. Static (per-table) columns are applied
    // only when a new table partition begins; per-tablet rows (with "last_token")
    // advance the active tablet map.
    // NOTE(review): `db` is dereferenced unconditionally below (db->features()),
    // while process_one_row() guards its own uses with `db` — confirm callers
    // never pass nullptr here.
    void process_row(const cql3::untyped_result_set_row& row, replica::database* db) {
        auto table = table_id(row.get_as<utils::UUID>("table_id"));
        if (!current || current->table != table) {
            // New table partition: flush the previous table's map first.
            if (current) {
                tm.set_tablet_map(current->table, std::move(current->map));
            }
            if (row.has("base_table")) {
                // Co-located table: remember the mapping, applied in on_end_of_stream().
                auto base_table = table_id(row.get_as<utils::UUID>("base_table"));
                base_tables[table] = base_table;
                // NOTE(review): `current` stays disengaged for co-located tables;
                // the static-column and last_token code below dereferences it, so
                // this relies on co-located partitions carrying none of those
                // columns — confirm against the writer side.
                current = {};
            } else {
                auto tablet_count = row.get_as<int>("tablet_count");
                auto with_raft_info = db->features().strongly_consistent_tables && row.has("raft_group_id");
                auto tmap = tablet_map(tablet_count, with_raft_info);
                auto first_tablet = tmap.first_tablet();
                current = active_tablet_map{table, std::move(tmap), first_tablet};
            }
            // Resize decision fields are static columns, so set them only once per table.
            if (row.has("resize_type") && row.has("resize_seq_number")) {
                auto resize_type_name = row.get_as<sstring>("resize_type");
                int64_t resize_seq_number = row.get_as<int64_t>("resize_seq_number");
                locator::resize_decision resize_decision(std::move(resize_type_name), resize_seq_number);
                current->map.set_resize_decision(std::move(resize_decision));
            }
            if (row.has("resize_task_info")) {
                current->map.set_resize_task_info(deserialize_tablet_task_info(row.get_view("resize_task_info")));
            }
            if (row.has("repair_scheduler_config")) {
                auto config = deserialize_repair_scheduler_config(row.get_view("repair_scheduler_config"));
                current->map.set_repair_scheduler_config(std::move(config));
            }
        }
        // Rows with "last_token" carry per-tablet data; process and advance.
        if (row.has("last_token")) {
            current->tid = process_one_row(db, current->table, current->map, current->tid, row);
        }
    }

    // Flushes the last table's map and applies the deferred co-located tables.
    future<> on_end_of_stream() {
        if (current) {
            tm.set_tablet_map(current->table, std::move(current->map));
        }
        // Set co-located tables after setting all other tablet maps to ensure the tablet map
        // of the base table is found.
        for (auto&& [table, base_table] : base_tables) {
            co_await tm.set_colocated_table(table, base_table);
        }
    }
};
} // anonymous namespace
// Loads the full tablet metadata by streaming every row of system.tablets
// through a tablet_metadata_builder. Any failure is rethrown wrapped in a
// runtime_error naming the table being processed at the time, if known.
future<tablet_metadata> read_tablet_metadata(cql3::query_processor& qp) {
    tablet_metadata metadata;
    tablet_metadata_builder b{metadata};
    tablet_logger.trace("Start reading tablet metadata");
    try {
        co_await qp.query_internal("select * from system.tablets",
                [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
                    b.process_row(row, qp.db().real_database_ptr());
                    return make_ready_future<stop_iteration>(stop_iteration::no);
                });
    } catch (...) {
        // Include the in-flight table in the error when one is active.
        if (!b.current) {
            std::throw_with_nested(std::runtime_error("Failed to read tablet metadata"));
        }
        std::throw_with_nested(std::runtime_error(format("Failed to read tablet metadata for table {}", b.current->table)));
    }
    co_await b.on_end_of_stream();
    tablet_logger.trace("Read tablet metadata: {}", metadata);
    co_return std::move(metadata);
}
// Collects the set of hosts referenced by any tablet replica, current or
// pending, across all tables in system.tablets.
future<std::unordered_set<locator::host_id>> read_required_hosts(cql3::query_processor& qp) {
    std::unordered_set<locator::host_id> hosts;
    // Deserializes one replica-set column (if present) and records its hosts.
    auto collect = [&] (const cql3::untyped_result_set_row& row, const char* column) {
        if (!row.has(column)) {
            return;
        }
        for (auto&& replica : deserialize_replica_set(row.get_view(column))) {
            hosts.insert(replica.host);
        }
    };
    try {
        co_await qp.query_internal("select replicas, new_replicas from system.tablets",
                [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
                    collect(row, "replicas");
                    collect(row, "new_replicas");
                    return make_ready_future<stop_iteration>(stop_iteration::no);
                });
    } catch (...) {
        std::throw_with_nested(std::runtime_error("Failed to read tablet required hosts"));
    }
    co_return std::move(hosts);
}
// Re-reads the whole system.tablets partition for one table and replaces
// (or drops) that table's tablet map in `tm`. Co-located tables are only
// recorded in the builder and applied later by on_end_of_stream().
static future<>
do_update_tablet_metadata_partition(cql3::query_processor& qp, tablet_metadata& tm, const tablet_metadata_change_hint::table_hint& hint, tablet_metadata_builder& builder) {
    co_await qp.query_internal(
            "select * from system.tablets where table_id = ?",
            db::consistency_level::ONE,
            {data_value(hint.table_id.uuid())},
            1000,
            [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
                builder.process_row(row, qp.db().real_database_ptr());
                return make_ready_future<stop_iteration>(stop_iteration::no);
            });
    if (builder.current) {
        // Rows were found: install the rebuilt map and reset the builder state.
        tm.set_tablet_map(builder.current->table, std::move(builder.current->map));
        builder.current = {};
        co_return;
    }
    if (!builder.base_tables.contains(hint.table_id)) {
        // No rows and not co-located: the table's tablet map was removed.
        tm.drop_tablet_map(hint.table_id);
    }
    // Otherwise it's a co-located table; handled later, after processing all
    // tables, by builder.on_end_of_stream().
}
// Refreshes individual tablets of one table, identified by the last_token
// values in the hint, reading each row back from system.tablets.
static future<>
do_update_tablet_metadata_rows(replica::database& db, cql3::query_processor& qp, tablet_map& tmap, const tablet_metadata_change_hint::table_hint& hint) {
    for (const auto token : hint.tokens) {
        auto rs = co_await qp.execute_internal(
                "select * from system.tablets where table_id = ? and last_token = ?",
                db::consistency_level::ONE,
                {data_value(hint.table_id.uuid()), data_value(dht::token::to_int64(token))},
                cql3::query_processor::cache_internal::yes);
        if (rs->empty()) {
            throw std::runtime_error("Failed to update tablet metadata: updated row is empty");
        }
        const auto tid = tmap.get_tablet_id(token);
        // Drop any stale transition info before re-applying the fresh row.
        tmap.clear_tablet_transition_info(tid);
        process_one_row(&db, hint.table_id, tmap, tid, rs->one());
    }
}
// Applies an incremental tablet metadata update: per-table hints without
// tokens trigger a full partition reload, while hints with tokens refresh
// only the named tablet rows in place.
future<> update_tablet_metadata(replica::database& db, cql3::query_processor& qp, tablet_metadata& tm, const locator::tablet_metadata_change_hint& hint) {
    tablet_metadata_builder builder{tm};
    try {
        for (const auto& [_, table_hint] : hint.tables) {
            if (!table_hint.tokens.empty()) {
                // Token-level hint: patch individual tablets inside the existing map.
                co_await tm.mutate_tablet_map_async(table_hint.table_id, [&] (tablet_map& tmap) -> future<> {
                    co_await do_update_tablet_metadata_rows(db, qp, tmap, table_hint);
                });
            } else {
                // Partition-level hint: rebuild the table's whole tablet map.
                co_await do_update_tablet_metadata_partition(qp, tm, table_hint, builder);
            }
        }
    } catch (...) {
        std::throw_with_nested(std::runtime_error("Failed to read tablet metadata"));
    }
    co_await builder.on_end_of_stream();
    tablet_logger.trace("Updated tablet metadata: {}", tm);
}
// Reads all partitions of system.tablets as mutations and feeds them to
// `process_mutation` as canonical mutations, splitting each partition so a
// single mutation carries at most min_tablets_in_mutation rows.
// Fix: removed the `result` chunked_vector that was declared and reserved
// (one allocation per call) but never written to or read — leftover from a
// previous return-by-value interface.
future<> read_tablet_mutations(seastar::sharded<replica::database>& db, std::function<void(canonical_mutation)> process_mutation) {
    auto s = db::system_keyspace::tablets();
    auto rs = co_await db::system_keyspace::query_mutations(db, db::system_keyspace::NAME, db::system_keyspace::TABLETS);
    constexpr size_t max_rows = min_tablets_in_mutation;
    for (auto& p: rs->partitions()) {
        co_await unfreeze_and_split_gently(p.mut(), s, max_rows, [&] (mutation m) -> future<> {
            process_mutation(co_await make_canonical_mutation_gently(m));
        });
    }
}
// This sstable set provides access to all the sstables in the table, using a snapshot of all
// its tablets/storage_groups compound_sstable_set:s.
// The managed sets cannot be modified through tablet_sstable_set, but only jointly read from, so insert() and erase() are disabled.
class tablet_sstable_set : public sstables::sstable_set_impl {
    schema_ptr _schema;
    locator::tablet_map _tablet_map;
    // Keep a single (compound) sstable_set per tablet/storage_group
    absl::flat_hash_map<size_t, lw_shared_ptr<const sstables::sstable_set>, absl::Hash<size_t>> _sstable_sets;
    // Used when ordering is required for correctness, but hot paths will use flat_hash_map
    // which provides faster lookup time.
    std::set<size_t> _sstable_set_ids;
    // Total sstable count and aggregate file-size stats over all per-tablet sets,
    // accumulated once at construction.
    size_t _size = 0;
    sstables::file_size_stats _file_size_stats;
public:
    // Copy constructor: clones the tablet map, shares the (immutable) per-tablet sets.
    tablet_sstable_set(const tablet_sstable_set& o)
        : _schema(o._schema)
        , _tablet_map(o._tablet_map.clone())
        , _sstable_sets(o._sstable_sets)
        , _sstable_set_ids(o._sstable_set_ids)
        , _size(o._size)
        , _file_size_stats(o._file_size_stats)
    {}

    // Snapshots one sstable set per storage group from `sgm`.
    // Note: only tmap.tablet_count() is taken from the given tablet map;
    // the map is used solely for token -> tablet index arithmetic.
    tablet_sstable_set(schema_ptr s, const storage_group_manager& sgm, const locator::tablet_map& tmap)
        : _schema(std::move(s))
        , _tablet_map(tmap.tablet_count())
    {
        sgm.for_each_storage_group([this] (size_t id, storage_group& sg) {
            auto set = sg.make_sstable_set();
            _size += set->size();
            _file_size_stats += set->get_file_size_stats();
            _sstable_sets[id] = std::move(set);
            _sstable_set_ids.insert(id);
        });
    }

    static lw_shared_ptr<sstables::sstable_set> make(schema_ptr s, const storage_group_manager& sgm, const locator::tablet_map& tmap) {
        return make_lw_shared<sstables::sstable_set>(std::make_unique<tablet_sstable_set>(std::move(s), sgm, tmap));
    }

    const schema_ptr& schema() const noexcept {
        return _schema;
    }

    virtual std::unique_ptr<sstable_set_impl> clone() const override {
        return std::make_unique<tablet_sstable_set>(*this);
    }
    virtual std::vector<sstables::shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override;
    virtual lw_shared_ptr<const sstable_list> all() const override;
    virtual stop_iteration for_each_sstable_until(std::function<stop_iteration(const sstables::shared_sstable&)> func) const override;
    virtual future<stop_iteration> for_each_sstable_gently_until(std::function<future<stop_iteration>(const sstables::shared_sstable&)> func) const override;
    // insert()/erase() throw: this set is a read-only snapshot (see class comment).
    virtual bool insert(sstables::shared_sstable sst) override;
    virtual bool erase(sstables::shared_sstable sst) override;
    virtual size_t size() const noexcept override {
        return _size;
    }
    virtual sstables::file_size_stats get_file_size_stats() const noexcept override {
        return _file_size_stats;
    }
    virtual selector_and_schema_t make_incremental_selector() const override;
    virtual mutation_reader create_single_key_sstable_reader(
        replica::column_family*,
        schema_ptr,
        reader_permit,
        utils::estimated_histogram&,
        const dht::partition_range&,
        const query::partition_slice&,
        tracing::trace_state_ptr,
        streamed_mutation::forwarding,
        mutation_reader::forwarding,
        const sstables::sstable_predicate&,
        sstables::integrity_check integrity = sstables::integrity_check::no) const override;

    // Will always return an engaged sstable set ptr.
    const lw_shared_ptr<const sstables::sstable_set>& find_sstable_set(size_t i) const {
        auto it = _sstable_sets.find(i);
        if (it == _sstable_sets.end() || !it->second) [[unlikely]] {
            throw std::runtime_error(format("SSTable set wasn't found for tablet {} of table {}.{}", i, schema()->ks_name(), schema()->cf_name()));
        }
        return it->second;
    }
private:
    // Index of the tablet (== storage group) owning token `t`.
    size_t group_of(const dht::token& t) const noexcept {
        return _tablet_map.get_tablet_id(t).id;
    }
    dht::token first_token_of(size_t idx) const noexcept {
#ifndef SCYLLA_BUILD_MODE_RELEASE
        if (idx >= _tablet_map.tablet_count()) {
            on_fatal_internal_error(tablet_logger, format("first_token_of: idx={} out of range", idx));
        }
#endif
        return _tablet_map.get_first_token(tablet_id(idx));
    }
    dht::token last_token_of(size_t idx) const noexcept {
#ifndef SCYLLA_BUILD_MODE_RELEASE
        if (idx >= _tablet_map.tablet_count()) {
            on_fatal_internal_error(tablet_logger, format("last_token_of: idx={} out of range", idx));
        }
#endif
        return _tablet_map.get_last_token(tablet_id(idx));
    }
    stop_iteration for_each_sstable_set_until(const dht::partition_range&, std::function<stop_iteration(lw_shared_ptr<const sstables::sstable_set>)>) const;
    future<stop_iteration> for_each_sstable_set_gently_until(const dht::partition_range&, std::function<future<stop_iteration>(lw_shared_ptr<const sstables::sstable_set>)>) const;
    // Range of engaged set ids whose tablets can intersect `pr`.
    // Uses the ordered id set since flat_hash_map has no ordering.
    auto subrange(const dht::partition_range& pr) const {
        size_t candidate_start = pr.start() ? group_of(pr.start()->value().token()) : size_t(0);
        size_t candidate_end = pr.end() ? group_of(pr.end()->value().token()) : (_tablet_map.tablet_count() - 1);
        return std::ranges::subrange(_sstable_set_ids.lower_bound(candidate_start), _sstable_set_ids.upper_bound(candidate_end));
    }
    friend class tablet_incremental_selector;
};
// Factory: builds a tablet-aware sstable_set snapshot for the given table.
lw_shared_ptr<sstables::sstable_set> make_tablet_sstable_set(schema_ptr s, const storage_group_manager& sgm, const locator::tablet_map& tmap) {
    auto set = tablet_sstable_set::make(std::move(s), sgm, tmap);
    return set;
}
// Returns the base table of a co-located table, or nullopt when `tid` is
// unknown or not co-located.
future<std::optional<table_id>> read_base_table(cql3::query_processor& qp, table_id tid) {
    auto rs = co_await qp.execute_internal("select * from system.tablets where table_id = ?",
            {tid.uuid()}, cql3::query_processor::cache_internal::no);
    if (!rs->empty()) {
        const auto& row = rs->front();
        if (row.has("base_table")) {
            co_return table_id(row.get_as<utils::UUID>("base_table"));
        }
    }
    co_return std::nullopt;
}
// Reads the transition stage of the tablet owning `last_token`, or nullopt
// if no transition is in progress.
future<std::optional<tablet_transition_stage>> read_tablet_transition_stage(cql3::query_processor& qp, table_id tid, dht::token last_token) {
    // Co-located tables share the base table's tablet map, so query that instead.
    auto base_table = co_await read_base_table(qp, tid);
    if (base_table) {
        tid = *base_table;
    }
    auto rs = co_await qp.execute_internal("select stage from system.tablets where table_id = ? and last_token = ?",
            {tid.uuid(), dht::token::to_int64(last_token)}, cql3::query_processor::cache_internal::no);
    if (rs->empty() || !rs->one().has("stage")) {
        co_return std::nullopt;
    }
    co_return tablet_transition_stage_from_string(rs->one().get_as<sstring>("stage"));
}
// Visits each per-tablet sstable set intersecting `pr` until `func` asks to stop.
stop_iteration tablet_sstable_set::for_each_sstable_set_until(const dht::partition_range& pr, std::function<stop_iteration(lw_shared_ptr<const sstables::sstable_set>)> func) const {
    for (const auto& idx : subrange(pr)) {
        if (func(find_sstable_set(idx)) == stop_iteration::yes) {
            return stop_iteration::yes;
        }
    }
    return stop_iteration::no;
}
// Asynchronous variant: visits each per-tablet sstable set intersecting `pr`,
// awaiting `func` for each, until it asks to stop.
future<stop_iteration> tablet_sstable_set::for_each_sstable_set_gently_until(const dht::partition_range& pr, std::function<future<stop_iteration>(lw_shared_ptr<const sstables::sstable_set>)> func) const {
    for (const auto& idx : subrange(pr)) {
        auto stop = co_await func(find_sstable_set(idx));
        if (stop) {
            co_return stop_iteration::yes;
        }
    }
    co_return stop_iteration::no;
}
// Collects the sstables selected by every per-tablet set intersecting `range`.
std::vector<sstables::shared_sstable> tablet_sstable_set::select(const dht::partition_range& range) const {
    std::vector<sstables::shared_sstable> selected;
    selected.reserve(size());
    for_each_sstable_set_until(range, [&] (lw_shared_ptr<const sstables::sstable_set> set) {
        auto partial = set->select(range);
        if (selected.empty()) {
            // First contributing set: adopt its result wholesale.
            selected = std::move(partial);
        } else {
            std::move(partial.begin(), partial.end(), std::back_inserter(selected));
        }
        return stop_iteration::no;
    });
    tablet_logger.debug("tablet_sstable_set::select: range={} ret={}", range, selected.size());
    return selected;
}
// Returns a freshly-built list of every sstable across all per-tablet sets.
lw_shared_ptr<const sstable_list> tablet_sstable_set::all() const {
    auto list = make_lw_shared<sstable_list>();
    list->reserve(size());
    for_each_sstable_set_until(query::full_partition_range, [&] (lw_shared_ptr<const sstables::sstable_set> set) {
        set->for_each_sstable([&] (const sstables::shared_sstable& sst) {
            list->insert(sst);
        });
        return stop_iteration::no;
    });
    return list;
}
// Visits every sstable in the whole set until `func` asks to stop.
// Reference capture is safe here: the traversal is fully synchronous.
stop_iteration tablet_sstable_set::for_each_sstable_until(std::function<stop_iteration(const sstables::shared_sstable&)> func) const {
    return for_each_sstable_set_until(query::full_partition_range, [&func] (lw_shared_ptr<const sstables::sstable_set> set) {
        return set->for_each_sstable_until(func);
    });
}
// Asynchronously visits every sstable in the whole set until `func` asks to stop.
// `func` is moved into the closure because the returned future can outlive
// this frame; the coroutine below keeps the closure alive.
future<stop_iteration> tablet_sstable_set::for_each_sstable_gently_until(std::function<future<stop_iteration>(const sstables::shared_sstable&)> func) const {
    auto visit_set = [func = std::move(func)] (lw_shared_ptr<const sstables::sstable_set> set) {
        return set->for_each_sstable_gently_until(func);
    };
    return for_each_sstable_set_gently_until(query::full_partition_range, std::move(visit_set));
}
// Mutation is unsupported: tablet_sstable_set is a read-only snapshot over
// the per-tablet sets, so insertion always throws.
bool tablet_sstable_set::insert(sstables::shared_sstable sst) {
    throw_with_backtrace<std::bad_function_call>();
}
// Mutation is unsupported: tablet_sstable_set is a read-only snapshot over
// the per-tablet sets, so erasure always throws.
bool tablet_sstable_set::erase(sstables::shared_sstable sst) {
    throw_with_backtrace<std::bad_function_call>();
}
// Incremental selector over a tablet_sstable_set: walks the engaged per-tablet
// sets in token order, delegating to each set's own incremental selector, and
// skipping over tablet indexes with no sstable set.
class tablet_incremental_selector : public sstables::incremental_selector_impl {
    const tablet_sstable_set& _tset;
    // _cur_set and _cur_selector contain a snapshot
    // for the currently selected compaction_group.
    lw_shared_ptr<const sstables::sstable_set> _cur_set;
    std::optional<sstables::sstable_set::incremental_selector> _cur_selector;
    // First token of the next engaged set after the current one;
    // maximum_token() when there is none.
    dht::token _lowest_next_token = dht::maximum_token();
public:
    tablet_incremental_selector(const tablet_sstable_set& tset)
        : _tset(tset)
    {}

    virtual std::tuple<dht::partition_range, std::vector<sstables::shared_sstable>, dht::ring_position_ext> select(const selector_pos& s) override {
        // Always return minimum singular range, such that incremental_selector::select() will always call this function,
        // which in turn will find the next sstable set to select sstables from.
        const dht::partition_range current_range = dht::partition_range::make_singular(dht::ring_position::min());

        // pos must be monotonically increasing in the weak sense
        // but caller can skip to a position outside the current set
        const dht::ring_position_view& pos = s.pos;
        auto token = pos.token();

        auto pr_end = s.range ? dht::ring_position_view::for_range_end(*s.range) : dht::ring_position_view::max();
        // End of stream is reached when pos is past the end of the read range (i.e. exclude tablets
        // that doesn't intersect with the range).
        // We don't want to advance next position when EOS has been reached, such that a fast forward
        // to the next tablet range will work.
        bool eos_reached = dht::ring_position_tri_compare(*_tset.schema(), pos, pr_end) > 0;

        if ((!_cur_set || pos.token() >= _lowest_next_token) && !eos_reached) {
            auto idx = _tset.group_of(token);
            if (_tset._sstable_set_ids.contains(idx)) {
                _cur_set = _tset.find_sstable_set(idx);
            }
            // Set the next token to point to the next engaged storage group.
            // It will be considered later on when the _cur_set is exhausted
            _lowest_next_token = find_lowest_next_token(idx);
        }

        if (!_cur_set) {
            // No set covers pos: report no sstables and point the caller at the
            // start of the next engaged set (or max if the ring is exhausted).
            auto lowest_next_position = _lowest_next_token.is_maximum()
                ? dht::ring_position_ext::max()
                : dht::ring_position_ext::starting_at(_lowest_next_token);
            tablet_logger.debug("tablet_incremental_selector {}.{}: select pos={}: returning 0 sstables, next_pos={}",
                    _tset.schema()->ks_name(), _tset.schema()->cf_name(), pos, lowest_next_position);
            return std::make_tuple(std::move(current_range), std::vector<sstables::shared_sstable>{}, lowest_next_position);
        }

        // NOTE(review): a fresh selector is emplaced on every call, even when
        // _cur_set is unchanged — presumably intentional given the singular
        // current_range trick above, but confirm no incremental state is meant
        // to be carried across calls.
        _cur_selector.emplace(_cur_set->make_incremental_selector());

        auto res = _cur_selector->select(s);
        // Return all sstables selected on the requested position from the first matching sstable set.
        // This assumes that the underlying sstable sets are disjoint in their token ranges so
        // only one of them contain any given token.
        auto sstables = std::move(res.sstables);
        // Return the lowest next position, such that this function will be called again to select the
        // lowest next position from the selector which previously returned it.
        // Until the current selector is exhausted. In that case,
        // jump to the next compaction_group sstable set.
        dht::ring_position_ext next_position = res.next_position;
        if (next_position.is_max()) {
            // _cur_selector is exhausted.
            // Return a position starting at `_lowest_next_token`
            // that was calculated for the _cur_set
            // (unless it's already maximum_token in which case we just return next_position == ring_position::max()).
            _cur_set = {};
            _cur_selector.reset();
            if (!_lowest_next_token.is_maximum()) {
                next_position = dht::ring_position_ext::starting_at(_lowest_next_token);
            }
        }

        tablet_logger.debug("tablet_incremental_selector {}.{}: select pos={}: returning {} sstables, next_pos={}",
                _tset.schema()->ks_name(), _tset.schema()->cf_name(), pos, sstables.size(), next_position);
        return std::make_tuple(std::move(current_range), std::move(sstables), std::move(next_position));
    }

private:
    // Find the start token of the first engaged sstable_set
    // starting the search from `current_idx` (exclusive).
    dht::token find_lowest_next_token(size_t current_idx) {
        auto it = _tset._sstable_set_ids.upper_bound(current_idx);
        if (it != _tset._sstable_set_ids.end()) {
            return _tset.first_token_of(*it);
        }
        return dht::maximum_token();
    }
};
// Routes a single-partition read to the sstable set of the tablet owning the
// partition's token, and delegates the reader creation to it.
// Fix: forward the `integrity` argument to the delegate. Previously it was
// accepted here but omitted from the call, so the inner set silently used its
// default of integrity_check::no regardless of what the caller requested.
mutation_reader
tablet_sstable_set::create_single_key_sstable_reader(
        replica::column_family* cf,
        schema_ptr schema,
        reader_permit permit,
        utils::estimated_histogram& sstable_histogram,
        const dht::partition_range& pr,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr,
        const sstables::sstable_predicate& predicate,
        sstables::integrity_check integrity) const {
    // The singular partition_range start bound must be engaged.
    auto idx = group_of(pr.start()->value().token());
    const auto& set = find_sstable_set(idx);
    return set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate, integrity);
}
// Creates a fresh incremental selector over this set, paired with the schema
// it was created for (as required by selector_and_schema_t).
sstables::sstable_set_impl::selector_and_schema_t tablet_sstable_set::make_incremental_selector() const {
    return std::make_tuple(std::make_unique<tablet_incremental_selector>(*this), *_schema);
}
} // namespace replica