Files
scylladb/locator/tablets.cc
Yaniv Michael Kaul 37004ac4dc locator/tablets: reformat code style
Reformat indentation, brace placement, lambda formatting, and
line wrapping for consistency.

The seastar logger already checks is_enabled() before formatting
arguments, so explicit guards around debug calls with simple
variable arguments are unnecessary.
AI-assisted: OpenCode / Claude Opus 4.6
Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
2026-03-24 18:30:40 +02:00

1940 lines
80 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "locator/network_topology_strategy.hh"
#include "locator/tablet_replication_strategy.hh"
#include "locator/tablets.hh"
#include "locator/tablet_metadata_guard.hh"
#include "locator/tablet_sharder.hh"
#include "locator/token_range_splitter.hh"
#include "db/system_keyspace.hh"
#include "locator/topology.hh"
#include "replica/database.hh"
#include "utils/stall_free.hh"
#include "utils/rjson.hh"
#include "gms/feature_service.hh"
#include <algorithm>
#include <flat_set>
#include <iterator>
#include <chrono>
#include <fmt/ranges.h>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <type_traits>
namespace locator {
seastar::logger tablet_logger("tablets");
// Returns the (even, odd) pair of sibling tablets containing t, or nullopt
// when the map holds a single tablet and therefore has no sibling pairs.
std::optional<std::pair<tablet_id, tablet_id>> tablet_map::sibling_tablets(tablet_id t) const {
    if (tablet_count() == 1) {
        return std::nullopt;
    }
    // Clearing the lowest bit yields the even-numbered member of the pair.
    const auto left = tablet_id(t.value() & ~0x1);
    return std::pair{left, *next_tablet(left)};
}
// Maps a transition stage to the replica set(s) that must receive writes
// during that stage. Stages with the same answer share case labels.
static write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage) {
    switch (stage) {
    case tablet_transition_stage::allow_write_both_read_old:
    case tablet_transition_stage::repair:
    case tablet_transition_stage::end_repair:
    case tablet_transition_stage::cleanup_target:
    case tablet_transition_stage::revert_migration:
        return write_replica_set_selector::previous;
    case tablet_transition_stage::write_both_read_old:
    case tablet_transition_stage::write_both_read_old_fallback_cleanup:
    case tablet_transition_stage::streaming:
    case tablet_transition_stage::rebuild_repair:
    case tablet_transition_stage::write_both_read_new:
        return write_replica_set_selector::both;
    case tablet_transition_stage::use_new:
    case tablet_transition_stage::cleanup:
    case tablet_transition_stage::end_migration:
        return write_replica_set_selector::next;
    }
    on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
}
// Maps a transition stage to the replica set that must serve reads during
// that stage. Stages with the same answer share case labels.
static read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage) {
    switch (stage) {
    case tablet_transition_stage::allow_write_both_read_old:
    case tablet_transition_stage::write_both_read_old:
    case tablet_transition_stage::write_both_read_old_fallback_cleanup:
    case tablet_transition_stage::streaming:
    case tablet_transition_stage::rebuild_repair:
    case tablet_transition_stage::repair:
    case tablet_transition_stage::end_repair:
    case tablet_transition_stage::cleanup_target:
    case tablet_transition_stage::revert_migration:
        return read_replica_set_selector::previous;
    case tablet_transition_stage::write_both_read_new:
    case tablet_transition_stage::use_new:
    case tablet_transition_stage::cleanup:
    case tablet_transition_stage::end_migration:
        return read_replica_set_selector::next;
    }
    on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
}
// Constructs transition state for a tablet; the write/read selectors are
// derived from the stage so they can never disagree with it.
tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, tablet_transition_kind transition, tablet_replica_set next,
        std::optional<tablet_replica> pending_replica, service::session_id session_id)
    : stage(stage)
    , transition(transition)
    , next(std::move(next))
    , pending_replica(std::move(pending_replica))
    , session_id(session_id)
    , writes(get_selector_for_writes(stage))
    , reads(get_selector_for_reads(stage)) {
}
// Convenience overload: converts the migration info to a transition info
// (initial stage) and delegates to the transition-based overload below.
tablet_migration_streaming_info get_migration_streaming_info(const locator::topology& topo, const tablet_info& tinfo, const tablet_migration_info& trinfo) {
    return get_migration_streaming_info(topo, tinfo, migration_to_transition_info(tinfo, trinfo));
}
// Computes which replicas data is streamed from (read_from) and to
// (written_to) for the given transition, per transition kind.
tablet_migration_streaming_info get_migration_streaming_info(const locator::topology& topo, const tablet_info& tinfo, const tablet_transition_info& trinfo) {
    tablet_migration_streaming_info result;
    switch (trinfo.transition) {
    case tablet_transition_kind::intranode_migration:
        [[fallthrough]];
    case tablet_transition_kind::migration:
        // Stream from the replicas being dropped to the replicas being added.
        result.read_from = substract_sets(tinfo.replicas, trinfo.next);
        result.written_to = substract_sets(trinfo.next, tinfo.replicas);
        return result;
    case tablet_transition_kind::rebuild:
        if (!trinfo.pending_replica.has_value()) {
            return result; // No nodes to stream to -> no nodes to stream from
        }
        result.written_to.insert(*trinfo.pending_replica);
        // Source candidates: every replica of the next set except the one
        // being rebuilt, minus replicas on unknown or excluded nodes.
        result.read_from = std::unordered_set<tablet_replica>(trinfo.next.begin(), trinfo.next.end());
        result.read_from.erase(*trinfo.pending_replica);
        erase_if(result.read_from, [&](const tablet_replica& r) {
            auto* n = topo.find_node(r.host);
            return !n || n->is_excluded();
        });
        return result;
    case tablet_transition_kind::rebuild_v2: {
        if (!trinfo.pending_replica.has_value()) {
            return result; // No nodes to stream to -> no nodes to stream from
        }
        // Repair-based rebuild: all live current replicas participate on both
        // sides, plus the pending replica on the receiving side.
        auto s = std::unordered_set<tablet_replica>(tinfo.replicas.begin(), tinfo.replicas.end());
        erase_if(s, [&](const tablet_replica& r) {
            auto* n = topo.find_node(r.host);
            return !n || n->is_excluded();
        });
        result.stream_weight = locator::tablet_migration_stream_weight_repair;
        result.read_from = s;        // copy before s is moved below
        result.written_to = std::move(s);
        result.written_to.insert(*trinfo.pending_replica);
        return result;
    }
    case tablet_transition_kind::repair:
        // Repair reads from and writes to the full current replica set.
        auto s = std::unordered_set<tablet_replica>(tinfo.replicas.begin(), tinfo.replicas.end());
        result.stream_weight = locator::tablet_migration_stream_weight_repair;
        result.read_from = s;        // copy before s is moved below
        result.written_to = std::move(s);
        return result;
    }
    on_internal_error(tablet_logger, format("Invalid tablet transition kind: {}", static_cast<int>(trinfo.transition)));
}
// True when any replica of the tablet lives on a node that is excluded or
// unknown to the topology (an unknown node counts as excluded).
bool tablet_has_excluded_node(const locator::topology& topo, const tablet_info& tinfo) {
    return std::ranges::any_of(tinfo.replicas, [&](const tablet_replica& replica) {
        const auto* node = topo.find_node(replica.host);
        return !node || node->is_excluded();
    });
}
// Full constructor: all per-tablet state is provided by the caller.
tablet_info::tablet_info(tablet_replica_set replicas, db_clock::time_point repair_time, tablet_task_info repair_task_info, tablet_task_info migration_task_info,
        int64_t sstables_repaired_at)
    : replicas(std::move(replicas))
    , repair_time(repair_time)
    , repair_task_info(std::move(repair_task_info))
    , migration_task_info(std::move(migration_task_info))
    , sstables_repaired_at(sstables_repaired_at) {
}
// Replica-only constructor: delegates with default-initialized repair and
// migration state.
tablet_info::tablet_info(tablet_replica_set replicas)
    : tablet_info(std::move(replicas), db_clock::time_point{}, tablet_task_info{}, tablet_task_info{}, int64_t(0)) {
}
// Merges two tablet_info objects (taken by value; consumed). Returns nullopt
// when the repair tasks cannot be merged or the replica sets differ (compared
// order-insensitively). Otherwise keeps the later repair_time and
// sstables_repaired_at.
// NOTE(review): b.migration_task_info is discarded — a's is kept; confirm
// this asymmetry is intended.
std::optional<tablet_info> merge_tablet_info(tablet_info a, tablet_info b) {
    auto repair_task_info = tablet_task_info::merge_repair_tasks(a.repair_task_info, b.repair_task_info);
    if (!repair_task_info) {
        return {};
    }
    // Sorts a by-value copy so the originals are left untouched.
    auto sorted = [](tablet_replica_set rs) {
        std::ranges::sort(rs, std::less<tablet_replica>());
        return rs;
    };
    if (sorted(a.replicas) != sorted(b.replicas)) {
        return {};
    }
    auto repair_time = std::max(a.repair_time, b.repair_time);
    int64_t sstables_repaired_at = std::max(a.sstables_repaired_at, b.sstables_repaired_at);
    auto info = tablet_info(std::move(a.replicas), repair_time, *repair_task_info, a.migration_task_info, sstables_repaired_at);
    return info;
}
// Returns the replica which is present in the current replica set but absent
// from the transition's next set, i.e. the replica leaving during migration.
// Returns nullopt when no replica leaves (e.g. rebuild/repair transitions).
// Throws std::runtime_error if more than one replica would leave, which a
// single transition must never produce.
std::optional<tablet_replica> get_leaving_replica(const tablet_info& tinfo, const tablet_transition_info& trinfo) {
    auto leaving = substract_sets(tinfo.replicas, trinfo.next);
    if (leaving.empty()) {
        return {};
    }
    if (leaving.size() > 1) {
        // The message is a constant; no need to run it through format().
        throw std::runtime_error("More than one leaving replica");
    }
    return *leaving.begin();
}
// True when the given replica has already had its tablet data cleaned up in
// the current transition, i.e. the transition has advanced past the stage in
// which that replica's cleanup runs.
bool is_post_cleanup(tablet_replica replica, const tablet_info& tinfo, const tablet_transition_info& trinfo) {
    const auto stage = trinfo.stage;
    if (replica == locator::get_leaving_replica(tinfo, trinfo)) {
        // The leaving replica is cleaned in the `cleanup` stage; the only stage after it is `end_migration`.
        return stage == locator::tablet_transition_stage::end_migration;
    }
    if (replica == trinfo.pending_replica) {
        // The pending replica is cleaned in the `cleanup_target` stage; the only stage after it is `revert_migration`.
        return stage == locator::tablet_transition_stage::revert_migration;
    }
    return false;
}
// Replica set after the migration completes: src replaced by dst.
tablet_replica_set get_new_replicas(const tablet_info& tinfo, const tablet_migration_info& mig) {
    return replace_replica(tinfo.replicas, mig.src, mig.dst);
}
// Returns the primary replica(s) of the tablet that must receive writes,
// honoring any in-progress transition: depending on the transition stage this
// is the primary of the previous set, the next set, or both (deduplicated).
// The filter restricts which replicas are eligible; an empty set is returned
// when no eligible replica exists.
tablet_replica_set get_primary_replicas(
        const locator::tablet_map& tablet_map, tablet_id tid, const locator::topology& topo, std::function<bool(const tablet_replica&)> filter) {
    const auto& info = tablet_map.get_tablet_info(tid);
    const auto* transition = tablet_map.get_tablet_transition_info(tid);
    auto write_selector = [&] {
        if (!transition) {
            // No transition: writes go to the current (previous) replica set only.
            return write_replica_set_selector::previous;
        }
        return transition->writes;
    };
    auto primary = [tid, filter = std::move(filter), &topo](tablet_replica_set set) -> std::optional<tablet_replica> {
        return maybe_get_primary_replica(tid, set, topo, filter);
    };
    auto add = [](tablet_replica r1, tablet_replica r2) -> tablet_replica_set {
        // if primary replica is not the one leaving, then only primary will be streamed to.
        return (r1 == r2) ? tablet_replica_set{r1} : tablet_replica_set{r1, r2};
    };
    const auto selector = write_selector();
    switch (selector) {
    case write_replica_set_selector::previous: {
        auto primary_opt = primary(info.replicas);
        return primary_opt.has_value() ? tablet_replica_set{primary_opt.value()} : tablet_replica_set{};
    }
    case write_replica_set_selector::both: {
        auto previous_primary = primary(info.replicas);
        auto next_primary = primary(transition->next);
        if (previous_primary.has_value() && next_primary.has_value()) {
            return add(previous_primary.value(), next_primary.value());
        } else if (previous_primary.has_value()) {
            return tablet_replica_set{previous_primary.value()};
        } else if (next_primary.has_value()) {
            return tablet_replica_set{next_primary.value()};
        } else {
            return tablet_replica_set{};
        }
    }
    case write_replica_set_selector::next: {
        auto primary_opt = primary(transition->next);
        return primary_opt.has_value() ? tablet_replica_set{primary_opt.value()} : tablet_replica_set{};
    }
    }
    // All enumerators are handled above; previously control could fall off the
    // end of this value-returning function (UB) on a corrupted value. Fail
    // loudly instead, matching the other selector switches in this file.
    on_internal_error(tablet_logger, format("Invalid write replica set selector: {}", static_cast<int>(selector)));
}
// Builds the transition info for a migration at its initial stage
// (allow_write_both_read_old), with dst as the pending replica.
tablet_transition_info migration_to_transition_info(const tablet_info& ti, const tablet_migration_info& mig) {
    return tablet_transition_info{tablet_transition_stage::allow_write_both_read_old, mig.kind, get_new_replicas(ti, mig), mig.dst};
}
// Exception thrown when a table has no tablet map registered.
no_such_tablet_map::no_such_tablet_map(const table_id& id)
    : runtime_error{fmt::format("Tablet map not found for table {}", id)} {
}
// Returns the tablet map for the table, translating the container's
// out_of_range into no_such_tablet_map (with a captured backtrace).
const tablet_map& tablet_metadata::get_tablet_map(table_id id) const {
    try {
        return *_tablets.at(id);
    } catch (const std::out_of_range&) {
        throw_with_backtrace<no_such_tablet_map>(id);
    }
}
// Returns a shared pointer to the table's tablet map, or throws
// no_such_tablet_map when the table is unknown.
future<tablet_metadata::tablet_map_ptr> tablet_metadata::get_tablet_map_ptr(table_id id) const {
    try {
        // lightweight since it only copies the shared ptr, not the map itself.
        co_return co_await _tablets.at(id).copy();
    } catch (const std::out_of_range&) {
        throw_with_backtrace<no_such_tablet_map>(id);
    }
}
// True when the table has a tablet map registered.
bool tablet_metadata::has_tablet_map(table_id id) const {
    return _tablets.find(id) != _tablets.end();
}
// Resolves a table to its base table: a co-located table maps to its base,
// a base table maps to itself.
table_id tablet_metadata::get_base_table(table_id id) const {
    const auto it = _base_table.find(id);
    return it == _base_table.end() ? id : it->second;
}
// A table is a base table exactly when it has no base-table mapping.
bool tablet_metadata::is_base_table(table_id id) const {
    return _base_table.find(id) == _base_table.end();
}
// Applies func to a private clone of the table's tablet map and then installs
// the mutated clone for the table and all of its co-located tables, so the
// group keeps sharing one map. Throws no_such_tablet_map for unknown tables.
// NOTE(review): assumes id has an entry in _table_groups (at() throws
// otherwise) — confirm the invariant holds for every caller.
future<> tablet_metadata::mutate_tablet_map_async(table_id id, noncopyable_function<future<>(tablet_map&)> func) {
    auto it = _tablets.find(id);
    if (it == _tablets.end()) {
        throw no_such_tablet_map(id);
    }
    // Clone first so readers of the old map are unaffected until the swap below.
    auto tablet_map_copy = make_lw_shared<tablet_map>(co_await it->second->clone_gently());
    co_await func(*tablet_map_copy);
    auto new_map_ptr = lw_shared_ptr<const tablet_map>(std::move(tablet_map_copy));
    // share the tablet map with all co-located tables
    for (auto colocated_id : _table_groups.at(id)) {
        if (colocated_id != id) {
            _tablets[colocated_id] = make_foreign(new_map_ptr);
        }
    }
    it->second = make_foreign(std::move(new_map_ptr));
}
// Produces a copy of this metadata. Tablet maps are copied via the shared-ptr
// copy() (cheap); the group/base-table indexes and the balancing flag are
// copied by value.
future<tablet_metadata> tablet_metadata::copy() const {
    tablet_metadata copy;
    for (const auto& e : _tablets) {
        copy._tablets.emplace(e.first, co_await e.second.copy());
    }
    copy._table_groups = _table_groups;
    copy._base_table = _base_table;
    copy._balancing_enabled = _balancing_enabled;
    co_return copy;
}
// Installs a tablet map for the table. If the table already heads a group,
// every co-located table gets the same shared map; otherwise a singleton
// group is created for it.
void tablet_metadata::set_tablet_map(table_id id, tablet_map map) {
    auto map_ptr = make_lw_shared<const tablet_map>(std::move(map));
    if (auto it = _table_groups.find(id); it == _table_groups.end()) {
        _table_groups[id] = {id};
    } else {
        for (auto colocated_id : it->second) {
            if (colocated_id != id) {
                _tablets[colocated_id] = map_ptr;
            }
        }
    }
    auto it = _tablets.find(id);
    if (it == _tablets.end()) {
        _tablets.emplace(id, std::move(map_ptr));
    } else {
        it->second = std::move(map_ptr);
    }
}
// Registers id as a co-located table of base_id: records the base mapping,
// adds id to the base's group, and shares the base's tablet map with id.
// Errors out when base_id is not a base table, lacks a tablet map, or id
// already has a different base. Idempotent when id already maps to base_id.
future<> tablet_metadata::set_colocated_table(table_id id, table_id base_id) {
    if (auto it = _table_groups.find(id); it != _table_groups.end()) {
        // Allow changing a base table to be a co-located table of another base table, if it doesn't have any other co-located tables.
        // This shouldn't be used normally except for unit tests.
        tablet_logger.warn("Changing base table {} to be a co-located table of another base table {}. This should be used only in tests.", id, base_id);
        if (it->second.size() > 1) {
            on_internal_error(tablet_logger,
                    format("Table {} is already a base table for {} and cannot be set as a co-located table of another base table.", id, it->second));
        }
        _table_groups.erase(it);
    }
    if (!_table_groups.contains(base_id)) {
        on_internal_error(tablet_logger, format("Trying to set co-located table {} with base table {} but it's not a base table.", id, base_id));
    }
    if (auto it = _base_table.find(id); it == _base_table.end()) {
        _base_table[id] = base_id;
        _table_groups[base_id].push_back(id);
        if (!_tablets.contains(base_id)) {
            on_internal_error(tablet_logger, format("Base table {} of co-located table {} does not have a tablet map", base_id, id));
        }
        // Cheap: copies the shared pointer, not the map.
        auto map_ptr = co_await _tablets.at(base_id).copy();
        _tablets[id] = std::move(map_ptr);
    } else if (it->second != base_id) {
        on_internal_error(tablet_logger, format("Cannot set base table {} for table {} because it already has base table {}", base_id, id, it->second));
    }
}
// Removes all metadata for the table. If it is a co-located table, it is
// first unlinked from its base table's group (dropping the group when it
// becomes empty).
void tablet_metadata::drop_tablet_map(table_id id) {
    if (auto it = _base_table.find(id); it != _base_table.end()) {
        // it's a co-located table. We need to remove it from the base table's colocated tables list.
        auto base_id = it->second;
        if (auto group_it = _table_groups.find(base_id); group_it != _table_groups.end()) {
            auto& tables = group_it->second;
            // erase-remove: drop every occurrence of id from the group.
            tables.erase(std::remove(tables.begin(), tables.end(), id), tables.end());
            if (tables.empty()) {
                _table_groups.erase(group_it);
            }
        }
        _base_table.erase(it);
    }
    // Harmless no-ops when id was a co-located table with no group entry.
    _table_groups.erase(id);
    _tablets.erase(id);
}
// Incrementally destroys all tablet metadata. Foreign tablet-map pointers
// must be released on their owner shard, so they are first bucketed per
// shard and then released from each shard in parallel.
future<> tablet_metadata::clear_gently() {
    tablet_logger.debug("tablet_metadata::clear_gently {}", fmt::ptr(this));
    // First, Sort the tablet maps per shard to avoid destruction of all foreign tablet map ptrs
    // on this shard. We don't use sharded<> here since it will require a similar
    // submit_to to each shard owner per tablet-map.
    std::vector<std::vector<tablet_map_ptr>> tablet_maps_per_shard;
    tablet_maps_per_shard.resize(smp::count);
    for (auto& [_, map_ptr] : _tablets) {
        tablet_maps_per_shard[map_ptr.get_owner_shard()].emplace_back(std::move(map_ptr));
    }
    _tablets.clear();
    // Now destroy the foreign tablet map pointers on each shard.
    // By-reference capture is safe: invoke_on_all is awaited before the
    // locals go out of scope.
    co_await smp::invoke_on_all([&] -> future<> {
        for (auto& map_ptr : tablet_maps_per_shard[this_shard_id()]) {
            auto map = map_ptr.release();
            co_await utils::clear_gently(map);
        }
    });
    co_await utils::clear_gently(_table_groups);
    co_await utils::clear_gently(_base_table);
    co_return;
}
// Deep-compares the per-table tablet maps of two metadata objects.
// NOTE(review): _table_groups, _base_table and _balancing_enabled are not
// compared — confirm equality is intentionally defined on _tablets only.
bool tablet_metadata::operator==(const tablet_metadata& o) const {
    if (_tablets.size() != o._tablets.size()) {
        return false;
    }
    for (const auto& [k, v] : _tablets) {
        const auto it = o._tablets.find(k);
        // Compare map contents, not pointer identity.
        if (it == o._tablets.end() || *v != *it->second) {
            return false;
        }
    }
    return true;
}
// Constructs a map of tablet_count tablets; the count must be a power of two
// (the map is addressed by the top _log2_tablets bits of the token).
tablet_map::tablet_map(size_t tablet_count, bool with_raft_info)
    : _log2_tablets(log2ceil(tablet_count)) {
    if (tablet_count != 1ul << _log2_tablets) {
        on_internal_error(tablet_logger, format("Tablet count not a power of 2: {}", tablet_count));
    }
    _tablets.resize(tablet_count);
    if (with_raft_info) {
        _raft_info.resize(tablet_count);
    }
}
// Eager full copy; see clone_gently() for the yielding variant.
tablet_map tablet_map::clone() const {
    return tablet_map(_tablets, _log2_tablets, _transitions, _resize_decision, _resize_task_info, _repair_scheduler_config, _raft_info);
}
// Full copy that yields between elements to avoid reactor stalls on large maps.
future<tablet_map> tablet_map::clone_gently() const {
    tablet_container tablets;
    tablets.reserve(_tablets.size());
    for (const auto& t : _tablets) {
        tablets.emplace_back(t);
        co_await coroutine::maybe_yield();
    }
    transitions_map transitions;
    transitions.reserve(_transitions.size());
    for (const auto& [id, trans] : _transitions) {
        transitions.emplace(id, trans);
        co_await coroutine::maybe_yield();
    }
    raft_info_container raft_info;
    raft_info.reserve(_raft_info.size());
    for (const auto& i : _raft_info) {
        raft_info.emplace_back(i);
        co_await coroutine::maybe_yield();
    }
    co_return tablet_map(
            std::move(tablets), _log2_tablets, std::move(transitions), _resize_decision, _resize_task_info, _repair_scheduler_config, std::move(raft_info));
}
// Throws std::logic_error when id is out of range for this map.
void tablet_map::check_tablet_id(tablet_id id) const {
    const auto count = tablet_count();
    if (size_t(id) >= count) {
        throw std::logic_error(format("Invalid tablet id: {} >= {}", id, count));
    }
}
// Bounds-checked access to a tablet's info.
const tablet_info& tablet_map::get_tablet_info(tablet_id id) const {
    check_tablet_id(id);
    return _tablets[size_t(id)];
}
// Maps a token to the tablet owning it at the current tablet count.
tablet_id tablet_map::get_tablet_id(token t) const {
    return tablet_id(dht::compaction_group_of(_log2_tablets, t));
}
// Which half of its tablet the token would land in after a split: the low
// bit of the tablet id at double the tablet count.
tablet_range_side tablet_map::get_tablet_range_side(token t) const {
    auto id_after_split = dht::compaction_group_of(_log2_tablets + 1, t);
    return tablet_range_side(id_after_split & 0x1);
}
// Computes both the current tablet id and the post-split side in a single
// pass: the post-split id is (current id << 1) | side.
std::pair<tablet_id, tablet_range_side> tablet_map::get_tablet_id_and_range_side(token t) const {
    auto id_after_split = dht::compaction_group_of(_log2_tablets + 1, t);
    auto current_id = id_after_split >> 1;
    return {tablet_id(current_id), tablet_range_side(id_after_split & 0x1)};
}
// Last token of a tablet at an arbitrary tablet count (log2_tablets);
// used for computing post-split ranges too.
dht::token tablet_map::get_last_token(tablet_id id, size_t log2_tablets) const {
    return dht::last_token_of_compaction_group(log2_tablets, size_t(id));
}
// Last token of a tablet at the current tablet count (bounds-checked).
dht::token tablet_map::get_last_token(tablet_id id) const {
    check_tablet_id(id);
    return get_last_token(id, _log2_tablets);
}
// First token of a tablet: the token ring start for the first tablet,
// otherwise the successor of the previous tablet's last token.
dht::token tablet_map::get_first_token(tablet_id id) const {
    if (id == first_tablet()) {
        return dht::first_token();
    } else {
        return dht::next_token(get_last_token(tablet_id(size_t(id) - 1)));
    }
}
// Token range owned by a tablet at the given tablet count: left-open on the
// previous tablet's last token (or minimum token), right-closed on its own.
dht::token_range tablet_map::get_token_range(tablet_id id, size_t log2_tablets) const {
    if (id == first_tablet()) {
        return dht::token_range::make({dht::minimum_token(), false}, {get_last_token(id, log2_tablets), true});
    } else {
        return dht::token_range::make({get_last_token(tablet_id(size_t(id) - 1), log2_tablets), false}, {get_last_token(id, log2_tablets), true});
    }
}
// Token range owned by a tablet at the current tablet count (bounds-checked).
dht::token_range tablet_map::get_token_range(tablet_id id) const {
    check_tablet_id(id);
    return get_token_range(id, _log2_tablets);
}
// Token range the token would fall into after the next split.
dht::token_range tablet_map::get_token_range_after_split(const token& t) const noexcept {
    // when the tablets are split, the tablet count doubles, (i.e.) _log2_tablets increases by 1
    const auto log2_tablets_after_split = _log2_tablets + 1;
    auto id_after_split = tablet_id(dht::compaction_group_of(log2_tablets_after_split, t));
    return get_token_range(id_after_split, log2_tablets_after_split);
}
// Strict weak ordering on replicas by (dc, rack, host), so that sorting
// groups replicas of the same dc and rack together. The returned lambda
// holds a reference to topo, which must outlive it.
auto tablet_replica_comparator(const locator::topology& topo) {
    return [&topo](const tablet_replica& a, const tablet_replica& b) {
        const auto loc_a = topo.get_location(a.host);
        const auto loc_b = topo.get_location(b.host);
        if (loc_a.dc != loc_b.dc) {
            return loc_a.dc < loc_b.dc;
        }
        if (loc_a.rack != loc_b.rack) {
            return loc_a.rack < loc_b.rack;
        }
        return a.host < b.host;
    };
}
// Picks the primary replica of a tablet from the subset of replica_set
// accepted by filter, or nullopt when the subset is empty. Replicas are
// sorted by (dc, rack, host) first so the choice is deterministic, and the
// index formula rotates the choice with the tablet id for even load spread.
std::optional<tablet_replica> maybe_get_primary_replica(
        tablet_id id, const tablet_replica_set& replica_set, const locator::topology& topo, std::function<bool(const tablet_replica&)> filter) {
    tablet_replica_set replica_set_copy = replica_set;
    std::ranges::sort(replica_set_copy, tablet_replica_comparator(topo));
    const auto replicas = replica_set_copy | std::views::filter(std::move(filter)) | std::ranges::to<tablet_replica_set>();
    if (replicas.empty()) {
        tablet_logger.debug("No replicas in scope for tablet {}", id);
        return std::nullopt;
    }
    // Once the filter was pushed down, we got here a set of replicas which live within the selected scope.
    // replicas[ (id + id / size) % size ] is used to distribute the load evenly across all replicas in the scope.
    // Let's take for instance a cluster with 1 dc, 3 racks, 9 nodes, rf=3, scope=dc. The `replicas` array here will contain 3
    // replicas - one from each rack.
    // Tablet id=0 will end up choosing replicas[0]
    // Tablet id=1 will end up choosing replicas[1]
    // Tablet id=2 will end up choosing replicas[2]
    // We want tablet id=3 to choose replicas[1] and subsequently tablet id=6 to pick replicas[2] to ensure even distribution.
    auto primary = replicas.at((size_t(id) + size_t(id) / replicas.size()) % replicas.size());
    tablet_logger.debug("Primary replica of tablet {} is {}: replicas in scope: {}", id, primary, fmt::join(replicas, ","));
    return primary;
}
// Primary replica of the tablet's current replica set. With a pass-through
// filter and a non-empty replica set, maybe_get_primary_replica() always
// yields a value, so value() is safe here.
tablet_replica tablet_map::get_primary_replica(tablet_id id, const locator::topology& topo) const {
    auto accept_all = [](const tablet_replica&) { return true; };
    return maybe_get_primary_replica(id, get_tablet_info(id).replicas, topo, accept_all).value();
}
// Secondary replica: the replica following the primary in the same rotation.
// Throws when the tablet has fewer than two replicas.
tablet_replica tablet_map::get_secondary_replica(tablet_id id, const locator::topology& topo) const {
    const auto& orig_replicas = get_tablet_info(id).replicas;
    if (orig_replicas.size() < 2) {
        throw std::runtime_error(format("No secondary replica for tablet id {}", id));
    }
    tablet_replica_set replicas = orig_replicas;
    std::ranges::sort(replicas, tablet_replica_comparator(topo));
    // This formula must match the one in get_primary_replica(),
    // just with + 1.
    return replicas.at((size_t(id) + size_t(id) / replicas.size() + 1) % replicas.size());
}
// Primary replica among the replicas matching the task's filters, or nullopt
// when none match.
std::optional<tablet_replica> tablet_map::maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const {
    return maybe_get_primary_replica(id, get_tablet_info(id).replicas, topo, [&](const auto& tr) {
        return tablet_task_info.selected_by_filters(tr, topo);
    });
}
// Last token of every tablet, in tablet-id order (ascending token order),
// yielding between iterations.
future<utils::chunked_vector<token>> tablet_map::get_sorted_tokens() const {
    utils::chunked_vector<token> tokens;
    tokens.reserve(tablet_count());
    for (auto id : tablet_ids()) {
        tokens.push_back(get_last_token(id));
        co_await coroutine::maybe_yield();
    }
    co_return tokens;
}
// Replaces a tablet's info (bounds-checked).
void tablet_map::set_tablet(tablet_id id, tablet_info info) {
    check_tablet_id(id);
    _tablets[size_t(id)] = std::move(info);
}
// Sets or replaces the tablet's transition state (bounds-checked).
void tablet_map::set_tablet_transition_info(tablet_id id, tablet_transition_info info) {
    check_tablet_id(id);
    _transitions.insert_or_assign(id, std::move(info));
}
// Records the current resize (split/merge/none) decision for this map.
void tablet_map::set_resize_decision(locator::resize_decision decision) {
    _resize_decision = std::move(decision);
}
// Records the task info tracking the resize operation.
void tablet_map::set_resize_task_info(tablet_task_info task_info) {
    _resize_task_info = std::move(task_info);
}
// Sets (or clears, with nullopt) the repair scheduler configuration.
void tablet_map::set_repair_scheduler_config(std::optional<locator::repair_scheduler_config> config) {
    _repair_scheduler_config = std::move(config);
}
// Removes the tablet's transition state, if any (bounds-checked).
void tablet_map::clear_tablet_transition_info(tablet_id id) {
    check_tablet_id(id);
    _transitions.erase(id);
}
// Invokes func for every tablet in id order, awaiting each call.
future<> tablet_map::for_each_tablet(seastar::noncopyable_function<future<>(tablet_id, const tablet_info&)> func) const {
    std::optional<tablet_id> tid = first_tablet();
    for (const tablet_info& ti : tablets()) {
        co_await func(*tid, ti);
        tid = next_tablet(*tid);
    }
}
// Invokes func for every consecutive sibling pair (2k, 2k+1). With a single
// tablet, func is called once with nullopt as the second argument.
future<> tablet_map::for_each_sibling_tablets(seastar::noncopyable_function<future<>(tablet_desc, std::optional<tablet_desc>)> func) const {
    auto make_desc = [this](tablet_id tid) {
        return tablet_desc{tid, &get_tablet_info(tid), get_tablet_transition_info(tid)};
    };
    if (_tablets.size() == 1) {
        co_return co_await func(make_desc(first_tablet()), std::nullopt);
    }
    // The loop advances tid by two per iteration: tid1 is the even member,
    // tid2 the odd one.
    for (std::optional<tablet_id> tid = first_tablet(); tid; tid = next_tablet(*tid)) {
        auto tid1 = tid;
        auto tid2 = tid = next_tablet(*tid);
        if (!tid2) {
            // Cannot happen with power-of-two invariant.
            throw std::logic_error(format("Cannot retrieve sibling tablet with tablet count {}", tablet_count()));
        }
        co_await func(make_desc(*tid1), make_desc(*tid2));
    }
}
// Drops all in-progress transition state.
void tablet_map::clear_transitions() {
    _transitions.clear();
}
// True when r is a replica of the tablet, either in the current replica set
// or in the next set of an in-progress transition.
bool tablet_map::has_replica(tablet_id tid, tablet_replica r) const {
    if (contains(get_tablet_info(tid).replicas, r)) {
        return true;
    }
    const auto* trinfo = get_tablet_transition_info(tid);
    return trinfo != nullptr && contains(trinfo->next, r);
}
// Incrementally destroys the tablet container to avoid reactor stalls.
future<> tablet_map::clear_gently() {
    return utils::clear_gently(_tablets);
}
// Transition state of the tablet, or nullptr when it is not in transition.
const tablet_transition_info* tablet_map::get_tablet_transition_info(tablet_id id) const {
    const auto it = _transitions.find(id);
    return it == _transitions.end() ? nullptr : &it->second;
}
// Raft info of the tablet; errors out when the map was built without raft
// info (see the with_raft_info constructor flag).
const tablet_raft_info& tablet_map::get_tablet_raft_info(tablet_id id) const {
    check_tablet_id(id);
    if (_raft_info.empty()) {
        on_internal_error(tablet_logger, "Tablet map doesn't have raft info");
    }
    return _raft_info[size_t(id)];
}
// Replaces the tablet's raft info; errors out when the map was built without
// raft info.
void tablet_map::set_tablet_raft_info(tablet_id id, tablet_raft_info raft_info) {
    check_tablet_id(id);
    if (_raft_info.empty()) {
        on_internal_error(tablet_logger, format("Tablet map has no raft info, tablet_id {}, group_id {}", id, raft_info.group_id));
    }
    _raft_info[size_t(id)] = std::move(raft_info);
}
// The names are persisted in system tables so should not be changed.
static const std::unordered_map<tablet_transition_stage, sstring> tablet_transition_stage_to_name = {
    {tablet_transition_stage::allow_write_both_read_old, "allow_write_both_read_old"},
    {tablet_transition_stage::write_both_read_old, "write_both_read_old"},
    {tablet_transition_stage::write_both_read_old_fallback_cleanup, "write_both_read_old_fallback_cleanup"},
    {tablet_transition_stage::write_both_read_new, "write_both_read_new"},
    {tablet_transition_stage::streaming, "streaming"},
    {tablet_transition_stage::rebuild_repair, "rebuild_repair"},
    {tablet_transition_stage::repair, "repair"},
    {tablet_transition_stage::end_repair, "end_repair"},
    {tablet_transition_stage::use_new, "use_new"},
    {tablet_transition_stage::cleanup, "cleanup"},
    {tablet_transition_stage::cleanup_target, "cleanup_target"},
    {tablet_transition_stage::revert_migration, "revert_migration"},
    {tablet_transition_stage::end_migration, "end_migration"},
};
// Reverse lookup (name -> stage), built once at static-initialization time
// by inverting the table above.
static const std::unordered_map<sstring, tablet_transition_stage> tablet_transition_stage_from_name = std::invoke([] {
    std::unordered_map<sstring, tablet_transition_stage> result;
    for (auto&& [v, s] : tablet_transition_stage_to_name) {
        result.emplace(s, v);
    }
    return result;
});
// Picks the repair-based rebuild (rebuild_v2) when the cluster feature is
// enabled, falling back to the legacy streaming rebuild otherwise.
tablet_transition_kind choose_rebuild_transition_kind(const gms::feature_service& features) {
    return features.repair_based_tablet_rebuild ? tablet_transition_kind::rebuild_v2 : tablet_transition_kind::rebuild;
}
// Persisted name of a transition stage; errors out on unknown values.
sstring tablet_transition_stage_to_string(tablet_transition_stage stage) {
    auto i = tablet_transition_stage_to_name.find(stage);
    if (i == tablet_transition_stage_to_name.end()) {
        on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
    }
    return i->second;
}
// Parses a persisted stage name; at() throws std::out_of_range on unknown names.
tablet_transition_stage tablet_transition_stage_from_string(const sstring& name) {
    return tablet_transition_stage_from_name.at(name);
}
// The names are persisted in system tables so should not be changed.
static const std::unordered_map<tablet_transition_kind, sstring> tablet_transition_kind_to_name = {
    {tablet_transition_kind::migration, "migration"},
    {tablet_transition_kind::intranode_migration, "intranode_migration"},
    {tablet_transition_kind::rebuild, "rebuild"},
    {tablet_transition_kind::rebuild_v2, "rebuild_v2"},
    {tablet_transition_kind::repair, "repair"},
};
// Reverse lookup (name -> kind), built once by inverting the table above.
static const std::unordered_map<sstring, tablet_transition_kind> tablet_transition_kind_from_name = std::invoke([] {
    std::unordered_map<sstring, tablet_transition_kind> result;
    for (auto&& [v, s] : tablet_transition_kind_to_name) {
        result.emplace(s, v);
    }
    return result;
});
// Persisted name of a transition kind; errors out on unknown values.
sstring tablet_transition_kind_to_string(tablet_transition_kind kind) {
    auto i = tablet_transition_kind_to_name.find(kind);
    if (i == tablet_transition_kind_to_name.end()) {
        on_internal_error(tablet_logger, format("Invalid tablet transition kind: {}", static_cast<int>(kind)));
    }
    return i->second;
}
// Parses a persisted kind name; at() throws std::out_of_range on unknown names.
tablet_transition_kind tablet_transition_kind_from_string(const sstring& name) {
    return tablet_transition_kind_from_name.at(name);
}
// The names are persisted in system tables so should not be changed.
static const std::unordered_map<tablet_task_type, sstring> tablet_task_type_to_name = {
    {locator::tablet_task_type::none, "none"},
    {locator::tablet_task_type::user_repair, "user_repair"},
    {locator::tablet_task_type::auto_repair, "auto_repair"},
    {locator::tablet_task_type::migration, "migration"},
    {locator::tablet_task_type::intranode_migration, "intranode_migration"},
    {locator::tablet_task_type::split, "split"},
    {locator::tablet_task_type::merge, "merge"},
};
// Reverse lookup (name -> task type), built once by inverting the table above.
static const std::unordered_map<sstring, tablet_task_type> tablet_task_type_from_name = std::invoke([] {
    std::unordered_map<sstring, tablet_task_type> result;
    for (auto&& [v, s] : tablet_task_type_to_name) {
        result.emplace(s, v);
    }
    return result;
});
// Persisted name of a task type; errors out on unknown values.
sstring tablet_task_type_to_string(tablet_task_type kind) {
    auto i = tablet_task_type_to_name.find(kind);
    if (i == tablet_task_type_to_name.end()) {
        on_internal_error(tablet_logger, format("Invalid tablet task type: {}", static_cast<int>(kind)));
    }
    return i->second;
}
// Parses a persisted task type name; at() throws std::out_of_range on unknown names.
tablet_task_type tablet_task_type_from_string(const sstring& name) {
    return tablet_task_type_from_name.at(name);
}
// The names are persisted in system tables so should not be changed.
static const std::unordered_map<locator::tablet_repair_incremental_mode, sstring> tablet_repair_incremental_mode_to_name = {
    {locator::tablet_repair_incremental_mode::disabled, "disabled"},
    {locator::tablet_repair_incremental_mode::incremental, "incremental"},
    {locator::tablet_repair_incremental_mode::full, "full"},
};
// Reverse lookup (name -> mode), built once by inverting the table above.
static const std::unordered_map<sstring, tablet_repair_incremental_mode> tablet_repair_incremental_mode_from_name = std::invoke([] {
    std::unordered_map<sstring, tablet_repair_incremental_mode> result;
    for (auto&& [v, s] : tablet_repair_incremental_mode_to_name) {
        result.emplace(s, v);
    }
    return result;
});
// Persisted name of a repair incremental mode; errors out on unknown values.
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode kind) {
    auto i = tablet_repair_incremental_mode_to_name.find(kind);
    if (i == tablet_repair_incremental_mode_to_name.end()) {
        on_internal_error(tablet_logger, format("Invalid tablet repair incremental mode: {}", static_cast<int>(kind)));
    }
    return i->second;
}
// Parses a persisted incremental-repair mode name.
// Throws std::out_of_range for unrecognized names.
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring& name) {
    return tablet_repair_incremental_mode_from_name.at(name);
}
// Estimates heap memory held by this tablet map beyond sizeof(*this):
// the tablet vector's own allocation plus every tablet's replica set.
size_t tablet_map::external_memory_usage() const {
    size_t total = _tablets.external_memory_usage();
    for (const auto& t : _tablets) {
        total += t.replicas.external_memory_usage();
    }
    return total;
}
// Compares only the active alternative's index (none/split/merge) and
// the sequence number; the alternative's contents are not compared.
bool resize_decision::operator==(const resize_decision& o) const {
    return way.index() == o.way.index() && sequence_number == o.sequence_number;
}
// True when the current resize decision is a split.
bool tablet_map::needs_split() const {
    return std::holds_alternative<resize_decision::split>(_resize_decision.way);
}
// True when the current resize decision is a merge.
bool tablet_map::needs_merge() const {
    return std::holds_alternative<resize_decision::merge>(_resize_decision.way);
}
// Current resize decision for this tablet map.
const locator::resize_decision& tablet_map::resize_decision() const {
    return _resize_decision;
}
// Task info associated with the current resize operation.
const tablet_task_info& tablet_map::resize_task_info() const {
    return _resize_task_info;
}
// Repair scheduler configuration, if one was set.
// NOTE(review): returns the optional by value; the top-level const on the
// return type is ineffective (declared in the header, so kept as-is here).
const std::optional<locator::repair_scheduler_config> tablet_map::get_repair_scheduler_config() const {
    return _repair_scheduler_config;
}
// Maps a persisted resize-decision name back to its variant alternative.
// Throws std::out_of_range for unrecognized names.
static auto to_resize_type(sstring decision) {
    using way_type = decltype(resize_decision::way);
    static const std::unordered_map<sstring, way_type> name_to_way{
        {"none", resize_decision::none{}},
        {"split", resize_decision::split{}},
        {"merge", resize_decision::merge{}},
    };
    return name_to_way.at(decision);
}
// Reconstructs a resize decision from its persisted representation.
// Throws std::out_of_range (via to_resize_type) for unknown names.
resize_decision::resize_decision(sstring decision, uint64_t seq_number)
    : way(to_resize_type(decision))
    , sequence_number(seq_number) {
}
// Name of the active alternative ("none", "split" or "merge"), rendered
// through the fmt formatter for resize_decision_way.
sstring resize_decision::type_name() const {
    return fmt::format("{}", way);
}
// Returns the sequence number that should follow the current one.
// Wraps to zero at the maximum. Practically unreachable: even bumping
// once per second would take far longer than the age of the universe,
// but it is handled anyway.
resize_decision::seq_number_t resize_decision::next_sequence_number() const {
    if (sequence_number == std::numeric_limits<seq_number_t>::max()) {
        return 0;
    }
    return sequence_number + 1;
}
// Accumulates another shard's stats for the same table: sizes add up,
// and the split-ready sequence number is the minimum of the two.
table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexcept {
    size_in_bytes += s.size_in_bytes;
    split_ready_seq_number = std::min(split_ready_seq_number, s.split_ready_seq_number);
    return *this;
}
// Copies per-tablet size entries from tls into this instance,
// overwriting any existing entries, and returns the total of all
// sizes that were merged in.
uint64_t tablet_load_stats::add_tablet_sizes(const tablet_load_stats& tls) {
    uint64_t merged_total = 0;
    for (const auto& [table, sizes] : tls.tablet_sizes) {
        auto& dst = tablet_sizes[table];
        for (const auto& [range, size] : sizes) {
            dst[range] = size;
            merged_total += size;
        }
    }
    return merged_total;
}
// Upgrades v1 load stats (which carried only per-table stats) to the
// current format; all other fields are left default-initialized.
load_stats load_stats::from_v1(load_stats_v1&& stats) {
    return {.tables = std::move(stats.tables)};
}
// Merges another stats snapshot into this one. Per-table stats are
// accumulated; per-host entries (capacity, critical disk utilization,
// tablet stats) are overwritten with the incoming values.
load_stats& load_stats::operator+=(const load_stats& s) {
    for (auto& [id, stats] : s.tables) {
        tables[id] += stats;
    }
    for (auto& [host, cap] : s.capacity) {
        capacity[host] = cap;
    }
    for (auto& [host, cdu] : s.critical_disk_utilization) {
        critical_disk_utilization[host] = cdu;
    }
    for (auto& [host, tablet_ls] : s.tablet_stats) {
        // Overwrite the effective capacity, but merge the size entries
        // on top of any already recorded for this host.
        tablet_stats[host].effective_capacity = tablet_ls.effective_capacity;
        tablet_stats[host].add_tablet_sizes(tablet_ls);
    }
    return *this;
}
// Looks up the recorded size of the given tablet (identified by table
// and token range) on the given host. Returns nullopt (and logs at
// debug level) when the host, table or range has no recorded size.
std::optional<uint64_t> load_stats::get_tablet_size(host_id host, const range_based_tablet_id& rb_tid) const {
    if (auto host_i = tablet_stats.find(host); host_i != tablet_stats.end()) {
        auto& sizes_per_table = host_i->second.tablet_sizes;
        if (auto table_i = sizes_per_table.find(rb_tid.table); table_i != sizes_per_table.end()) {
            auto& tablet_sizes = table_i->second;
            if (auto size_i = tablet_sizes.find(rb_tid.range); size_i != tablet_sizes.end()) {
                return size_i->second;
            }
        }
    }
    tablet_logger.debug("Unable to find tablet size on host: {} for tablet: {}", host, rb_tid);
    return std::nullopt;
}
// Like get_tablet_size(), but when the size is unknown on 'host' and the
// tablet is in transition, estimates it instead: for a migration to this
// host, uses the leaving replica's size; for a rebuild, uses the average
// size across replicas that have one. Returns nullopt when no estimate
// can be made.
std::optional<uint64_t> load_stats::get_tablet_size_in_transition(
        host_id host, const range_based_tablet_id& rb_tid, const tablet_info& ti, const tablet_transition_info* trinfo) const {
    std::optional<uint64_t> tablet_size_opt;
    tablet_size_opt = get_tablet_size(host, rb_tid);
    if (tablet_size_opt) {
        return tablet_size_opt;
    }
    // If the tablet is in transition,
    // try to find it on the leaving replica, in case of tablet migration,
    // or get the avg tablet size of all the replicas, in case we have a rebuild
    if (trinfo) {
        switch (trinfo->transition) {
        case tablet_transition_kind::migration:
            // Search for the tablet size on leaving replica
            if (trinfo->pending_replica && trinfo->pending_replica->host == host) {
                if (auto leaving_replica = get_leaving_replica(ti, *trinfo)) {
                    tablet_size_opt = get_tablet_size(leaving_replica->host, rb_tid);
                } else {
                    on_internal_error_noexcept(tablet_logger, ::format("No leaving replica for tablet migration in table {}. ti.replicas: {} trinfo->next: {}",
                            rb_tid.table, ti.replicas, trinfo->next));
                }
            }
            break;
        case tablet_transition_kind::rebuild:
            [[fallthrough]];
        case tablet_transition_kind::rebuild_v2: {
            // Get the avg tablet size from the available replicas
            size_t replica_count = 0;
            uint64_t tablet_size_sum = 0;
            for (auto& replica : ti.replicas) {
                auto new_tablet_size_opt = get_tablet_size(replica.host, rb_tid);
                if (new_tablet_size_opt) {
                    tablet_size_sum += *new_tablet_size_opt;
                    replica_count++;
                }
            }
            if (replica_count) {
                tablet_size_opt = tablet_size_sum / replica_count;
            }
            break;
        }
        case tablet_transition_kind::intranode_migration:
            [[fallthrough]];
        case tablet_transition_kind::repair:
            // No size estimate needed: these transitions don't move the
            // tablet to a different host.
            break;
        }
    }
    return tablet_size_opt;
}
// Produces a copy of these stats with per-tablet size entries adjusted
// for tablet-count changes between old_tm and new_tm for the given
// tables: merged tablets get the sum of their two halves, split tablets
// get half each, and dropped tables are erased. Returns nullptr when a
// required tablet size is missing, meaning reconciliation is impossible.
lw_shared_ptr<load_stats> load_stats::reconcile_tablets_resize(
        const std::unordered_set<table_id>& tables, const token_metadata& old_tm, const token_metadata& new_tm) const {
    lw_shared_ptr<load_stats> reconciled_stats{make_lw_shared<load_stats>(*this)};
    load_stats& new_stats = *reconciled_stats;
    for (table_id table : tables) {
        if (!new_tm.tablets().has_tablet_map(table)) {
            // Table has been dropped, remove it from stats
            for (auto& [host, tls] : new_stats.tablet_stats) {
                tls.tablet_sizes.erase(table);
            }
            continue;
        }
        const auto& old_tmap = old_tm.tablets().get_tablet_map(table);
        const auto& new_tmap = new_tm.tablets().get_tablet_map(table);
        size_t old_tablet_count = old_tmap.tablet_count();
        size_t new_tablet_count = new_tmap.tablet_count();
        if (old_tablet_count == new_tablet_count * 2) {
            // Reconcile for merge
            for (size_t i = 0; i < old_tablet_count; i += 2) {
                // Tablets 2i and 2i+1 in the old map merge into tablet i
                // of the new map.
                range_based_tablet_id rb_tid1{table, old_tmap.get_token_range(tablet_id(i))};
                range_based_tablet_id rb_tid2{table, old_tmap.get_token_range(tablet_id(i + 1))};
                auto& tinfo = old_tmap.get_tablet_info(tablet_id(i));
                for (auto& replica : tinfo.replicas) {
                    auto tablet_size_opt1 = new_stats.get_tablet_size(replica.host, rb_tid1);
                    auto tablet_size_opt2 = new_stats.get_tablet_size(replica.host, rb_tid2);
                    if (!tablet_size_opt1 || !tablet_size_opt2) {
                        if (!tablet_size_opt1) {
                            tablet_logger.debug(
                                    "Unable to find tablet size in stats for table resize reconcile for tablet {} on host {}", rb_tid1, replica.host);
                        }
                        if (!tablet_size_opt2) {
                            tablet_logger.debug(
                                    "Unable to find tablet size in stats for table resize reconcile for tablet {} on host {}", rb_tid2, replica.host);
                        }
                        return nullptr;
                    }
                    dht::token_range new_range{new_tmap.get_token_range(tablet_id(i / 2))};
                    auto& sizes_for_table = new_stats.tablet_stats.at(replica.host).tablet_sizes.at(table);
                    uint64_t merged_tablet_size = *tablet_size_opt1 + *tablet_size_opt2;
                    sizes_for_table[new_range] = merged_tablet_size;
                    sizes_for_table.erase(rb_tid1.range);
                    sizes_for_table.erase(rb_tid2.range);
                }
            }
        } else if (old_tablet_count == new_tablet_count / 2) {
            // Reconcile for split
            for (size_t i = 0; i < old_tablet_count; i++) {
                // Old tablet i splits into new tablets 2i and 2i+1, each
                // attributed half of the old size.
                range_based_tablet_id rb_tid{table, old_tmap.get_token_range(tablet_id(i))};
                auto& tinfo = old_tmap.get_tablet_info(tablet_id(i));
                for (auto& replica : tinfo.replicas) {
                    auto tablet_size_opt = new_stats.get_tablet_size(replica.host, rb_tid);
                    if (!tablet_size_opt) {
                        tablet_logger.debug("Unable to find tablet size in stats for table resize reconcile for tablet {} on host {}", rb_tid, replica.host);
                        return nullptr;
                    }
                    dht::token_range new_range1{new_tmap.get_token_range(tablet_id(i * 2))};
                    dht::token_range new_range2{new_tmap.get_token_range(tablet_id(i * 2 + 1))};
                    auto& sizes_for_table = new_stats.tablet_stats.at(replica.host).tablet_sizes.at(table);
                    uint64_t split_tablet_size = *tablet_size_opt / 2;
                    sizes_for_table[new_range1] = split_tablet_size;
                    sizes_for_table[new_range2] = split_tablet_size;
                    sizes_for_table.erase(rb_tid.range);
                }
            }
        }
    }
    return reconciled_stats;
}
// Returns a copy of these stats with the size entry of the given tablet
// moved from the leaving host to the pending host. Returns an empty
// pointer when no move is performed: same host (intranode migration),
// size unknown on the leaving host, size already present on the pending
// host, or no stats entry for the pending host.
lw_shared_ptr<load_stats> load_stats::migrate_tablet_size(
        locator::host_id leaving, locator::host_id pending, locator::global_tablet_id gid, const dht::token_range trange) const {
    lw_shared_ptr<load_stats> result;
    if (leaving != pending) {
        range_based_tablet_id rb_tid{gid.table, trange};
        if (get_tablet_size(leaving, rb_tid) && !get_tablet_size(pending, rb_tid) && tablet_stats.contains(pending)) {
            tablet_logger.debug("Moving tablet size for tablet: {} from: {} to: {}", gid, leaving, pending);
            result = make_lw_shared<locator::load_stats>(*this);
            // Transfer the map node itself to avoid copying the entry.
            auto& new_leaving_ts = result->tablet_stats.at(leaving);
            auto& new_pending_ts = result->tablet_stats.at(pending);
            auto map_node = new_leaving_ts.tablet_sizes.at(gid.table).extract(trange);
            new_pending_ts.tablet_sizes[gid.table].insert(std::move(map_node));
            // Drop the per-table map on the leaving host when it became empty.
            if (new_leaving_ts.tablet_sizes.at(gid.table).empty()) {
                new_leaving_ts.tablet_sizes.erase(gid.table);
            }
        }
    }
    return result;
}
// Prepares splitting of 'ranges' on tablet boundaries, keeping only
// tablets that have a replica on 'host'.
// NOTE(review): operator() iterates both sequences in lockstep, so this
// assumes 'ranges' is sorted in ring order — confirm with callers.
tablet_range_splitter::tablet_range_splitter(schema_ptr schema, const tablet_map& tablets, host_id host, const dht::partition_range_vector& ranges)
    : _schema(std::move(schema))
    , _ranges(ranges)
    , _ranges_it(_ranges.begin()) {
    // Filter all tablets and save only those that have a replica on the specified host.
    for (auto tid = std::optional(tablets.first_tablet()); tid; tid = tablets.next_tablet(*tid)) {
        const auto& tablet_info = tablets.get_tablet_info(*tid);
        auto replica_it = std::ranges::find_if(tablet_info.replicas, [&](auto&& r) {
            return r.host == host;
        });
        if (replica_it == tablet_info.replicas.end()) {
            continue;
        }
        _tablet_ranges.emplace_back(range_split_result{replica_it->shard, dht::to_partition_range(tablets.get_token_range(*tid))});
    }
    _tablet_ranges_it = _tablet_ranges.begin();
}
// Produces the next intersection of the input ranges with this host's
// tablet ranges, together with the shard owning the tablet.
// Returns nullopt when either sequence is exhausted.
std::optional<tablet_range_splitter::range_split_result> tablet_range_splitter::operator()() {
    if (_ranges_it == _ranges.end() || _tablet_ranges_it == _tablet_ranges.end()) {
        return {};
    }
    dht::ring_position_comparator cmp(*_schema);
    while (_ranges_it != _ranges.end()) {
        // First, skip all tablet-ranges that are completely before the current range.
        // _tablet_ranges contains only tablets with a replica on the requested
        // host, so it does not necessarily cover the whole ring; guard against
        // advancing past the end while skipping.
        while (_tablet_ranges_it != _tablet_ranges.end() && _ranges_it->other_is_before(_tablet_ranges_it->range, cmp)) {
            ++_tablet_ranges_it;
        }
        if (_tablet_ranges_it == _tablet_ranges.end()) {
            return {};
        }
        // Generate intersections with all tablet-ranges that overlap with the current range.
        if (auto intersection = _ranges_it->intersection(_tablet_ranges_it->range, cmp)) {
            const auto shard = _tablet_ranges_it->shard;
            if (_ranges_it->end() && cmp(_ranges_it->end()->value(), _tablet_ranges_it->range.end()->value()) < 0) {
                // The current tablet range extends beyond the current range,
                // move to the next range.
                ++_ranges_it;
            } else {
                // The current range extends beyond the current tablet range,
                // move to the next tablet range.
                ++_tablet_ranges_it;
            }
            return range_split_result{shard, std::move(*intersection)};
        }
        // Current tablet-range is completely after the current range, move to the next range.
        ++_ranges_it;
    }
    return {};
}
// Estimates the external memory usage of std::unordered_map<>.
// Does not include external memory usage of elements.
// Accounts for the bucket array (one pointer per bucket) plus a
// per-node estimate: the element pair and 8 bytes of node bookkeeping.
template <typename K, typename V>
static size_t estimate_external_memory_usage(const std::unordered_map<K, V>& map) {
    const size_t bucket_bytes = map.bucket_count() * sizeof(void*);
    const size_t node_bytes = map.size() * (sizeof(std::pair<const K, V>) + 8);
    return bucket_bytes + node_bytes;
}
// Estimates heap memory held by the tablet metadata: the table->map
// container itself plus every tablet map's own external memory.
size_t tablet_metadata::external_memory_usage() const {
    size_t result = estimate_external_memory_usage(_tablets);
    for (auto&& [id, map] : _tablets) {
        result += map->external_memory_usage();
    }
    return result;
}
// Returns true when any tablet of any table has a replica on the given
// host, counting both current replicas and the pending replica of an
// in-progress transition.
bool tablet_metadata::has_replica_on(host_id host) const {
    auto hosted_here = [&](const auto& r) { return r.host == host; };
    for (const auto& [id, map] : _tablets) {
        for (const auto& tablet : map->tablet_ids()) {
            if (std::ranges::any_of(map->get_tablet_info(tablet).replicas, hosted_here)) {
                return true;
            }
            if (auto* trinfo = map->get_tablet_transition_info(tablet);
                    trinfo && trinfo->pending_replica && trinfo->pending_replica->host == host) {
                return true;
            }
        }
    }
    return false;
}
// Validates that every tablet replica placed on this_host refers to an
// existing local shard (shard < smp::count). Returns false as soon as
// any table contains a violating replica.
future<bool> check_tablet_replica_shards(const tablet_metadata& tm, host_id this_host) {
    bool valid = true;
    for (const auto& [table, tmap] : tm.all_tables_ungrouped()) {
        co_await tmap->for_each_tablet([this_host, &valid](locator::tablet_id tid, const tablet_info& tinfo) -> future<> {
            for (const auto& replica : tinfo.replicas) {
                if (replica.host == this_host) {
                    valid &= replica.shard < smp::count;
                }
            }
            return make_ready_future<>();
        });
        if (!valid) {
            // A violation was found; no need to scan remaining tables.
            break;
        }
    }
    co_return valid;
}
class tablet_effective_replication_map : public effective_replication_map {
table_id _table;
tablet_sharder _sharder;
mutable const tablet_map* _tmap = nullptr;
private:
host_id_vector_replica_set to_host_set(const tablet_replica_set& replicas) const {
host_id_vector_replica_set result;
result.reserve(replicas.size());
for (auto&& replica : replicas) {
result.emplace_back(replica.host);
}
return result;
}
const tablet_map& get_tablet_map() const {
if (!_tmap) {
_tmap = &_tmptr->tablets().get_tablet_map(_table);
}
return *_tmap;
}
const tablet_replica_set& get_replicas_for_write(dht::token search_token) const {
auto&& tablets = get_tablet_map();
auto tablet = tablets.get_tablet_id(search_token);
auto* info = tablets.get_tablet_transition_info(tablet);
auto&& replicas = std::invoke([&]() -> const tablet_replica_set& {
if (!info) {
return tablets.get_tablet_info(tablet).replicas;
}
switch (info->writes) {
case write_replica_set_selector::previous:
[[fallthrough]];
case write_replica_set_selector::both:
return tablets.get_tablet_info(tablet).replicas;
case write_replica_set_selector::next: {
return info->next;
}
}
on_internal_error(tablet_logger, format("Invalid replica selector", static_cast<int>(info->writes)));
});
tablet_logger.trace("get_replicas_for_write({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas);
return replicas;
}
host_id_vector_topology_change get_pending_helper(const token& search_token) const {
auto&& tablets = get_tablet_map();
auto tablet = tablets.get_tablet_id(search_token);
auto&& info = tablets.get_tablet_transition_info(tablet);
if (!info || info->transition == tablet_transition_kind::intranode_migration) {
return {};
}
switch (info->writes) {
case write_replica_set_selector::previous:
return {};
case write_replica_set_selector::both: {
if (!info->pending_replica) {
return {};
}
tablet_logger.trace("get_pending_endpoints({}): table={}, tablet={}, replica={}", search_token, _table, tablet, *info->pending_replica);
return {info->pending_replica->host};
}
case write_replica_set_selector::next:
return {};
}
on_internal_error(tablet_logger, format("Invalid replica selector", static_cast<int>(info->writes)));
}
host_id_vector_replica_set get_for_reading_helper(const token& search_token) const {
auto&& tablets = get_tablet_map();
auto tablet = tablets.get_tablet_id(search_token);
auto&& info = tablets.get_tablet_transition_info(tablet);
auto&& replicas = std::invoke([&]() -> const tablet_replica_set& {
if (!info) {
return tablets.get_tablet_info(tablet).replicas;
}
switch (info->reads) {
case read_replica_set_selector::previous:
return tablets.get_tablet_info(tablet).replicas;
case read_replica_set_selector::next: {
return info->next;
}
}
on_internal_error(tablet_logger, format("Invalid replica selector", static_cast<int>(info->reads)));
});
tablet_logger.trace("get_endpoints_for_reading({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas);
return to_host_set(replicas);
}
public:
tablet_effective_replication_map(table_id table, replication_strategy_ptr rs, token_metadata_ptr tmptr, size_t replication_factor)
: effective_replication_map(std::move(rs), std::move(tmptr), replication_factor)
, _table(table)
, _sharder(*_tmptr, table) {
}
virtual ~tablet_effective_replication_map() = default;
virtual host_id_vector_replica_set get_replicas(const token& search_token, bool is_vnode = false) const override {
return to_host_set(get_replicas_for_write(search_token));
}
virtual host_id_vector_replica_set get_natural_replicas(const token& search_token, bool is_vnode = false) const override {
return to_host_set(get_replicas_for_write(search_token));
}
virtual future<dht::token_range_vector> get_ranges(host_id ep) const override {
dht::token_range_vector ret;
auto& tablet_map = get_tablet_map();
for (auto tablet_id : tablet_map.tablet_ids()) {
auto endpoints = get_natural_replicas(tablet_map.get_last_token(tablet_id));
auto should_add_range = std::find(std::begin(endpoints), std::end(endpoints), ep) != std::end(endpoints);
if (should_add_range) {
ret.push_back(tablet_map.get_token_range(tablet_id));
}
co_await coroutine::maybe_yield();
}
co_return ret;
}
virtual host_id_vector_topology_change get_pending_replicas(const token& search_token) const override {
return get_pending_helper(search_token);
}
virtual host_id_vector_replica_set get_replicas_for_reading(const token& search_token, bool is_vnode = false) const override {
return get_for_reading_helper(search_token);
}
std::optional<tablet_routing_info> check_locality(const token& search_token) const override {
auto&& tablets = get_tablet_map();
auto tid = tablets.get_tablet_id(search_token);
auto&& info = tablets.get_tablet_info(tid);
auto host = get_token_metadata().get_my_id();
auto shard = this_shard_id();
auto make_tablet_routing_info = [&] {
dht::token first_token;
if (tid == tablets.first_tablet()) {
first_token = dht::minimum_token();
} else {
first_token = tablets.get_last_token(tablet_id(size_t(tid) - 1));
}
auto token_range = std::make_pair(first_token, tablets.get_last_token(tid));
return tablet_routing_info{info.replicas, token_range};
};
for (auto&& r : info.replicas) {
if (r.host == host) {
if (r.shard == shard) {
return std::nullopt; // routed correctly
} else {
return make_tablet_routing_info();
}
}
}
auto tinfo = tablets.get_tablet_transition_info(tid);
if (tinfo && tinfo->pending_replica && tinfo->pending_replica->host == host && tinfo->pending_replica->shard == shard) {
return std::nullopt; // routed correctly
}
return make_tablet_routing_info();
}
virtual bool has_pending_ranges(locator::host_id host_id) const override {
for (const auto& [id, transition_info] : get_tablet_map().transitions()) {
if (transition_info.pending_replica && transition_info.pending_replica->host == host_id) {
return true;
}
}
return false;
}
virtual std::unique_ptr<token_range_splitter> make_splitter() const override {
class splitter : public token_range_splitter {
token_metadata_ptr _tmptr; // To keep the tablet map alive.
const tablet_map& _tmap;
std::optional<tablet_id> _next;
public:
splitter(token_metadata_ptr tmptr, const tablet_map& tmap)
: _tmptr(std::move(tmptr))
, _tmap(tmap) {
}
void reset(dht::ring_position_view pos) override {
_next = _tmap.get_tablet_id(pos.token());
}
std::optional<dht::token> next_token() override {
if (!_next) {
return std::nullopt;
}
auto t = _tmap.get_last_token(*_next);
_next = _tmap.next_tablet(*_next);
return t;
}
};
return std::make_unique<splitter>(_tmptr, get_tablet_map());
}
const dht::sharder& get_sharder(const schema& s) const override {
return _sharder;
}
virtual dht::shard_replica_set shards_ready_for_reads(const schema& s, const token& token) const override {
return _sharder.shards_ready_for_reads(token);
}
};
// Rejects tablet-based replication when the cluster feature is not
// enabled yet.
void tablet_aware_replication_strategy::validate_tablet_options(
        const abstract_replication_strategy& ars, const gms::feature_service& fs, const replication_strategy_config_options& opts) const {
    if (ars._uses_tablets && !fs.tablets) {
        throw exceptions::configuration_exception("Tablet replication is not enabled");
    }
}
// Captures tablet-related parameters (initial tablet count, consistency
// option) and marks the strategy as per-table when tablets are in use.
void tablet_aware_replication_strategy::process_tablet_options(
        abstract_replication_strategy& ars, replication_strategy_config_options& opts, replication_strategy_params params) {
    if (ars._uses_tablets) {
        _initial_tablets = params.initial_tablets.value_or(0);
        _consistency = params.consistency.value_or(data_dictionary::consistency_config_option::eventual);
        mark_as_per_table(ars);
    }
}
// Builds the tablet-aware effective replication map for a table.
effective_replication_map_ptr tablet_aware_replication_strategy::do_make_replication_map(
        table_id table, replication_strategy_ptr rs, token_metadata_ptr tm, size_t replication_factor) const {
    return seastar::make_shared<tablet_effective_replication_map>(table, std::move(rs), std::move(tm), replication_factor);
}
// Re-evaluates the guard after the table's effective replication map was
// invalidated: if the tablet's transition stage or the tablet count
// changed since the guard was created, the old erm is retained and the
// guard's abort source fires; otherwise the guard adopts the new erm and
// re-subscribes to its invalidation.
void tablet_metadata_guard::check() noexcept {
    auto erm = _table->get_effective_replication_map();
    auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_tablet.table);
    auto& old_tmap = _erm->get_token_metadata().tablets().get_tablet_map(_tablet.table);
    auto* trinfo = tmap.get_tablet_transition_info(_tablet.tablet);
    tablet_logger.debug("tablet_metadata_guard::check: table {}.{}, tablet {}, "
                        "old erm version {}, new erm version {}, old tablet map {}, new tablet map {}",
            _table->schema()->ks_name(), _table->schema()->cf_name(), _tablet, _erm.get()->get_token_metadata().get_version(),
            erm.get()->get_token_metadata().get_version(), old_tmap, tmap);
    if (bool(_stage) != bool(trinfo) || (_stage && _stage != trinfo->stage) || old_tmap.tablet_count() != tmap.tablet_count()) {
        tablet_logger.debug("tablet_metadata_guard::check: retain the erm and abort the guard");
        _abort_source.request_abort();
    } else {
        tablet_logger.debug("tablet_metadata_guard::check: refresh the erm");
        _erm = std::move(erm);
        subscribe();
    }
}
// Captures the table's current erm and, if the tablet is in transition,
// its current stage, then subscribes to erm invalidation so changes are
// detected by check().
tablet_metadata_guard::tablet_metadata_guard(replica::table& table, global_tablet_id tablet)
    : _table(table.shared_from_this())
    , _tablet(tablet)
    , _erm(table.get_effective_replication_map()) {
    subscribe();
    if (auto* trinfo = get_tablet_map().get_tablet_transition_info(tablet.tablet)) {
        _stage = trinfo->stage;
    }
}
tablet_metadata_guard::~tablet_metadata_guard() = default;
// Registers check() to run when the currently held erm is invalidated.
void tablet_metadata_guard::subscribe() {
    _callback = _erm->get_validity_abort_source().subscribe([this]() noexcept {
        check();
    });
}
// For vnode-based tables, holding the erm is enough; for tablet-based
// tables, a tablet_metadata_guard is created for the tablet owning the
// given token.
token_metadata_guard::token_metadata_guard(replica::table& table, dht::token token)
    : _guard(std::invoke([&] -> guard_type {
        auto erm = table.get_effective_replication_map();
        if (!table.uses_tablets()) {
            return std::move(erm);
        }
        const auto table_id = table.schema()->id();
        const auto& tablet_map = erm->get_token_metadata().tablets().get_tablet_map(table_id);
        return make_lw_shared<tablet_metadata_guard>(table, global_tablet_id{.table = table_id, .tablet = tablet_map.get_tablet_id(token)});
    })) {
}
// Returns the guarded erm, from whichever alternative is active.
const effective_replication_map_ptr& token_metadata_guard::get_erm() const {
    const auto* g = get_if<lw_shared_ptr<tablet_metadata_guard>>(&_guard);
    return g ? (**g).get_erm() : get<effective_replication_map_ptr>(_guard);
}
// Verifies that keyspace 'ks' is RF-rack-valid against the supplied
// DC -> rack -> hosts map. For every DC of the replication strategy:
// with a rack-based RF, all required racks must exist in the topology;
// otherwise the RF must equal the number of racks with normal token
// owners (or be 0 or 1). Throws std::invalid_argument on violation.
// The token-owner classification is delegated to the two predicates so
// callers can account for a pending topology change.
static void assert_rf_rack_valid_keyspace(std::string_view ks, const token_metadata_ptr tmptr, const abstract_replication_strategy& ars,
        const std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<host_id>>>& dc_rack_map,
        std::function<bool(host_id)> is_normal_token_owner, std::function<bool(host_id)> is_transitioning_token_owner) {
    tablet_logger.debug("[assert_rf_rack_valid_keyspace]: Starting verifying that keyspace '{}' is RF-rack-valid", ks);
    // Any keyspace that does NOT use tablets is RF-rack-valid.
    if (!ars.uses_tablets()) {
        tablet_logger.debug("[assert_rf_rack_valid_keyspace]: Keyspace '{}' has been verified to be RF-rack-valid (no tablets)", ks);
        return;
    }
    // Tablets can only be used with NetworkTopologyStrategy.
    SCYLLA_ASSERT(ars.get_type() == replication_strategy_type::network_topology);
    const auto& nts = *static_cast<const network_topology_strategy*>(std::addressof(ars));
    // Every DC known to the strategy must exist in the topology map.
    for (const auto& dc : nts.get_datacenters()) {
        if (!dc_rack_map.contains(dc)) {
            on_internal_error(tablet_logger, seastar::format("Precondition violated: DC '{}' is part of the passed replication strategy, but it is not "
                                                             "known by the passed locator::token_metadata_ptr.",
                    dc));
        }
    }
    for (const auto& [dc, rack_map] : dc_rack_map) {
        tablet_logger.debug("[assert_rf_rack_valid_keyspace]: Verifying for '{}' / '{}'", ks, dc);
        auto rf_data = nts.get_replication_factor_data(dc);
        if (!rf_data) {
            continue;
        }
        auto required_racks = rf_data->is_rack_based() ? std::flat_set<sstring>(std::from_range, rf_data->get_rack_list()) : std::flat_set<sstring>{};
        size_t normal_rack_count = 0;
        size_t transitioning_rack_count = 0;
        for (const auto& [rack, rack_nodes] : rack_map) {
            // We must ignore zero-token nodes because they don't take part in replication.
            // Verify that this rack has at least one normal node.
            const bool normal_rack = std::ranges::any_of(rack_nodes, is_normal_token_owner);
            if (normal_rack) {
                ++normal_rack_count;
                required_racks.erase(rack);
            } else {
                const bool is_transitioning_rack = std::ranges::any_of(rack_nodes, is_transitioning_token_owner);
                if (is_transitioning_rack) {
                    ++transitioning_rack_count;
                }
            }
        }
        if (required_racks.size() > 0) {
            const auto rack_list = required_racks | std::views::join_with(std::string_view(", ")) | std::ranges::to<std::string>();
            throw std::invalid_argument(std::format("The keyspace '{}' is required to be RF-rack-valid. "
                                                    "That condition is violated for DC '{}': the following racks are "
                                                    "required by the replication strategy, but they don't exist in the topology: {}.",
                    ks, std::string_view(dc), rack_list));
        }
        if (rf_data->is_rack_based()) {
            // Rack-based RF is fully checked by the required-racks test above.
            continue;
        }
        auto rf = rf_data->count();
        // We must not allow for a keyspace to become RF-rack-invalid. Any attempt at that must be rejected.
        // For more context, see: scylladb/scylladb#23276.
        // If some rack is undetermined because it has only transitioning nodes and no normal nodes, we cannot
        // use normal_rack_count to determine RF-rack-validity.
        const bool valid_rf = (rf == normal_rack_count && transitioning_rack_count == 0) || rf == 1 || rf == 0;
        // Edge case: the DC in question is an arbiter DC and does NOT take part in replication.
        // Any positive RF for that DC is invalid.
        const bool invalid_arbiter_dc = normal_rack_count == 0 && rf > 0;
        if (!valid_rf || invalid_arbiter_dc) {
            throw std::invalid_argument(std::format("The keyspace '{}' is required to be RF-rack-valid. "
                                                    "That condition is violated for DC '{}': RF={} vs. rack count={}.",
                    ks, std::string_view(dc), rf, normal_rack_count));
        }
    }
    tablet_logger.debug("[assert_rf_rack_valid_keyspace]: Keyspace '{}' has been verified to be RF-rack-valid", ks);
}
// A host counts as a normal token owner for RF-rack-validity only when
// it is neither leaving nor draining.
static bool is_normal_token_owner(const token_metadata& tm, locator::host_id host) {
    auto& node = tm.get_topology().get_node(host);
    return tm.is_normal_token_owner(host) && !tm.is_leaving(host) && !node.is_draining();
}
// A host is a transitioning token owner when it is bootstrapping, or is
// a normal token owner that is leaving or draining.
static bool is_transitioning_token_owner(const token_metadata& tm, locator::host_id host) {
    auto& node = tm.get_topology().get_node(host);
    // Use the already-fetched node instead of a second topology lookup.
    return node.is_bootstrapping() || (tm.is_normal_token_owner(host) && (tm.is_leaving(host) || node.is_draining()));
}
// Validates RF-rack-validity against the current topology, with no
// pending topology change applied.
void assert_rf_rack_valid_keyspace(std::string_view ks, const token_metadata_ptr tmptr, const abstract_replication_strategy& ars) {
    assert_rf_rack_valid_keyspace(
            ks, tmptr, ars, tmptr->get_topology().get_datacenter_racks(),
            [&tmptr](host_id host) {
                return is_normal_token_owner(*tmptr, host);
            },
            [&tmptr](host_id host) {
                return is_transitioning_token_owner(*tmptr, host);
            });
}
// Validates RF-rack-validity as if the given topology operation had
// already happened: an added node is inserted into the DC/rack map (and
// treated as a normal owner), a removed node is erased (pruning rack
// and DC entries that become empty).
void assert_rf_rack_valid_keyspace(
        std::string_view ks, const token_metadata_ptr tmptr, const abstract_replication_strategy& ars, rf_rack_topology_operation op) {
    auto dc_rack_map = tmptr->get_topology().get_datacenter_racks();
    switch (op.tag) {
    case rf_rack_topology_operation::type::add:
        // include the new node only if it's added to an existing DC.
        if (dc_rack_map.contains(op.dc)) {
            tablet_logger.debug("[assert_rf_rack_valid_keyspace]: Including new node {} in DC '{}' and rack '{}'", op.node_id, op.dc, op.rack);
            dc_rack_map[op.dc][op.rack].insert(op.node_id);
        }
        break;
    case rf_rack_topology_operation::type::remove:
        // remove the node from the map, if it exists.
        tablet_logger.debug("[assert_rf_rack_valid_keyspace]: Excluding removed node {} from DC '{}' and rack '{}'", op.node_id, op.dc, op.rack);
        if (dc_rack_map.contains(op.dc) && dc_rack_map[op.dc].contains(op.rack)) {
            dc_rack_map[op.dc][op.rack].erase(op.node_id);
            if (dc_rack_map[op.dc][op.rack].empty()) {
                dc_rack_map[op.dc].erase(op.rack);
                if (dc_rack_map[op.dc].empty()) {
                    dc_rack_map.erase(op.dc);
                }
            }
        }
        break;
    }
    assert_rf_rack_valid_keyspace(
            ks, tmptr, ars, dc_rack_map,
            [&tmptr, &op](host_id host) {
                // The node being added counts as a normal token owner.
                if (op.tag == rf_rack_topology_operation::type::add && host == op.node_id) {
                    return true;
                }
                return is_normal_token_owner(*tmptr, host);
            },
            [&tmptr, &op](host_id host) {
                // ... and correspondingly not as a transitioning one.
                if (op.tag == rf_rack_topology_operation::type::add && host == op.node_id) {
                    return false;
                }
                return is_transitioning_token_owner(*tmptr, host);
            });
}
// Racks in the given DC that are eligible as replica targets: racks
// owning tokens that contain at least one normal, non-draining node.
// Returns an empty list for an unknown DC.
rack_list get_allowed_racks(const locator::token_metadata& tm, const sstring& dc) {
    auto& topo = tm.get_topology();
    // Counts normal, non-draining nodes in the given rack of this DC.
    auto normal_nodes = [&](const sstring& rack) {
        int count = 0;
        for (auto n : topo.get_datacenter_rack_nodes().at(dc).at(rack)) {
            count += int(n.get().is_normal() && !n.get().is_draining());
        }
        return count;
    };
    const auto& all_dcs = tm.get_datacenter_racks_token_owners();
    auto it = all_dcs.find(dc);
    if (it != all_dcs.end()) {
        return it->second | std::views::keys | std::views::filter([&](const sstring& rack) {
            return normal_nodes(rack) > 0;
        }) | std::ranges::to<std::vector<sstring>>();
    }
    return {};
}
} // namespace locator
// Renders a resize_decision_way variant as its alternative's name.
auto fmt::formatter<locator::resize_decision_way>::format(const locator::resize_decision_way& way, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    // Indexed by the variant's alternative order; the static_assert keeps
    // this table in sync with the variant definition.
    static const std::array<sstring, 3> index_to_string = {
        "none",
        "split",
        "merge",
    };
    static_assert(std::variant_size_v<locator::resize_decision_way> == index_to_string.size());
    return fmt::format_to(ctx.out(), "{}", index_to_string[way.index()]);
}
// Renders a global tablet id as "<table>:<tablet>".
auto fmt::formatter<locator::global_tablet_id>::format(const locator::global_tablet_id& id, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    return fmt::format_to(ctx.out(), "{}:{}", id.table, id.tablet);
}
// Renders a transition stage by its persisted name.
auto fmt::formatter<locator::tablet_transition_stage>::format(const locator::tablet_transition_stage& stage, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    return fmt::format_to(ctx.out(), "{}", locator::tablet_transition_stage_to_string(stage));
}
// Renders a transition kind by its persisted name.
auto fmt::formatter<locator::tablet_transition_kind>::format(const locator::tablet_transition_kind& kind, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    return fmt::format_to(ctx.out(), "{}", locator::tablet_transition_kind_to_string(kind));
}
// Renders a tablet task type by its persisted name.
auto fmt::formatter<locator::tablet_task_type>::format(const locator::tablet_task_type& kind, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    return fmt::format_to(ctx.out(), "{}", locator::tablet_task_type_to_string(kind));
}
// Renders an incremental-repair mode by its persisted name.
auto fmt::formatter<locator::tablet_repair_incremental_mode>::format(const locator::tablet_repair_incremental_mode& mode, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    return fmt::format_to(ctx.out(), "{}", locator::tablet_repair_incremental_mode_to_string(mode));
}
// Renders a tablet map as one line per tablet: last token, replicas,
// optional transition info (stage/new replicas/pending/session) and
// optional raft group id.
auto fmt::formatter<locator::tablet_map>::format(const locator::tablet_map& r, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    auto out = ctx.out();
    if (r.tablet_count() == 0) {
        return fmt::format_to(out, "{{}}");
    }
    out = fmt::format_to(out, "{{");
    bool first = true;
    locator::tablet_id tid = r.first_tablet();
    for (auto&& tablet : r._tablets) {
        if (!first) {
            out = fmt::format_to(out, ",");
        }
        out = fmt::format_to(out, "\n    [{}]: last_token={}, replicas={}", tid, r.get_last_token(tid), tablet.replicas);
        if (auto tr = r.get_tablet_transition_info(tid)) {
            out = fmt::format_to(out, ", stage={}, new_replicas={}, pending={}", tr->stage, tr->next, tr->pending_replica);
            if (tr->session_id) {
                out = fmt::format_to(out, ", session={}", tr->session_id);
            }
        }
        if (r.has_raft_info()) {
            const auto& raft_info = r.get_tablet_raft_info(tid);
            if (raft_info.group_id) {
                out = fmt::format_to(out, ", group_id={}", raft_info.group_id);
            }
        }
        first = false;
        // Walk tablet ids in lockstep with the tablet vector.
        tid = *r.next_tablet(tid);
    }
    return fmt::format_to(out, "}}");
}
auto fmt::formatter<locator::tablet_metadata>::format(const locator::tablet_metadata& tm, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    // One "<table-id>: <tablet-map>" entry per line, comma separated,
    // wrapped in braces.
    auto out = fmt::format_to(ctx.out(), "{{");
    bool emitted_any = false;
    for (const auto& [table, map_ptr] : tm._tablets) {
        if (std::exchange(emitted_any, true)) {
            out = fmt::format_to(out, ",");
        }
        out = fmt::format_to(out, "\n {}: {}", table, *map_ptr);
    }
    return fmt::format_to(out, "\n}}");
}
auto fmt::formatter<locator::tablet_metadata_change_hint>::format(const locator::tablet_metadata_change_hint& hint, fmt::format_context& ctx) const
    -> decltype(ctx.out()) {
    // One "[<table-id>]: <tokens>" entry per line, comma separated,
    // wrapped in braces.
    auto out = fmt::format_to(ctx.out(), "{{");
    bool emitted_any = false;
    for (const auto& [table, table_hint] : hint.tables) {
        if (std::exchange(emitted_any, true)) {
            out = fmt::format_to(out, ",");
        }
        out = fmt::format_to(out, "\n [{}]: {}", table, table_hint.tokens);
    }
    return fmt::format_to(out, "\n}}");
}
auto fmt::formatter<locator::repair_scheduler_config>::format(const locator::repair_scheduler_config& config, fmt::format_context& ctx) const
    -> decltype(ctx.out()) {
    // Serialize the scheduler config as a small JSON object of string fields.
    std::map<sstring, sstring> fields;
    fields.emplace("auto_repair_enabled", config.auto_repair_enabled ? "true" : "false");
    fields.emplace("auto_repair_threshold", std::to_string(config.auto_repair_threshold.count()));
    return fmt::format_to(ctx.out(), "{}", rjson::print(rjson::from_string_map(fields)));
}
auto fmt::formatter<locator::tablet_task_info>::format(const locator::tablet_task_info& info, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    // Serialize the task info as a JSON object; times are epoch seconds.
    std::map<sstring, sstring> fields;
    fields.emplace("request_type", fmt::to_string(info.request_type));
    fields.emplace("tablet_task_id", fmt::to_string(info.tablet_task_id));
    fields.emplace("request_time", fmt::to_string(db_clock::to_time_t(info.request_time)));
    fields.emplace("sched_nr", fmt::to_string(info.sched_nr));
    fields.emplace("sched_time", fmt::to_string(db_clock::to_time_t(info.sched_time)));
    fields.emplace("repair_hosts_filter", locator::tablet_task_info::serialize_repair_hosts_filter(info.repair_hosts_filter));
    fields.emplace("repair_dcs_filter", locator::tablet_task_info::serialize_repair_dcs_filter(info.repair_dcs_filter));
    fields.emplace("repair_incremental_mode", fmt::to_string(info.repair_incremental_mode));
    return fmt::format_to(ctx.out(), "{}", rjson::print(rjson::from_string_map(fields)));
}
bool locator::tablet_task_info::is_valid() const {
    // A task is valid iff a concrete request type has been set.
    return locator::tablet_task_type::none != request_type;
}
bool locator::tablet_task_info::is_user_repair_request() const {
    // True only for repairs explicitly requested by a user.
    return locator::tablet_task_type::user_repair == request_type;
}
// Returns true if the given replica passes both the host filter and the
// DC filter. An empty filter matches everything.
bool locator::tablet_task_info::selected_by_filters(const tablet_replica& replica, const topology& topo) const {
    if (!repair_hosts_filter.empty() && !repair_hosts_filter.contains(replica.host)) {
        return false;
    }
    // Only resolve the replica's datacenter when a DC filter is actually
    // set: avoids a needless topology lookup (and value copy) on the
    // common no-filter path.
    if (!repair_dcs_filter.empty()) {
        const auto& dc = topo.get_datacenter(replica.host);
        if (!repair_dcs_filter.contains(dc)) {
            return false;
        }
    }
    return true;
}
// Builds a fresh auto-repair task: new time-UUID id, request time = now,
// never scheduled yet (sched_nr = 0, sched_time = epoch).
locator::tablet_task_info locator::tablet_task_info::make_auto_repair_request(
    std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter, tablet_repair_incremental_mode mode) {
    long sched_nr = 0;
    auto tablet_task_id = locator::tablet_task_id(utils::UUID_gen::get_time_UUID());
    // The filter sets are by-value sink parameters: move them into the
    // task instead of copying.
    return locator::tablet_task_info{
        locator::tablet_task_type::auto_repair, tablet_task_id, db_clock::now(), sched_nr, db_clock::time_point(),
        std::move(hosts_filter), std::move(dcs_filter), mode};
}
// Builds a fresh user-initiated repair task: new time-UUID id, request
// time = now, never scheduled yet (sched_nr = 0, sched_time = epoch).
locator::tablet_task_info locator::tablet_task_info::make_user_repair_request(
    std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter, tablet_repair_incremental_mode mode) {
    long sched_nr = 0;
    auto tablet_task_id = locator::tablet_task_id(utils::UUID_gen::get_time_UUID());
    // The filter sets are by-value sink parameters: move them into the
    // task instead of copying.
    return locator::tablet_task_info{
        locator::tablet_task_type::user_repair, tablet_task_id, db_clock::now(), sched_nr, db_clock::time_point(),
        std::move(hosts_filter), std::move(dcs_filter), mode};
}
// Attempts to merge the repair task info of two tablets (e.g. when tablets
// are merged). Returns the merged info, or nullopt when the two tasks are
// incompatible and must not be merged.
std::optional<locator::tablet_task_info> locator::tablet_task_info::merge_repair_tasks(
    const locator::tablet_task_info& t1, const locator::tablet_task_info& t2) {
    // When at most one side carries a valid task, merging is trivial:
    // keep the valid one; if neither is valid, either works — use t1.
    if (!t1.is_valid() || !t2.is_valid()) {
        if (t1.is_valid()) {
            return t1;
        }
        if (t2.is_valid()) {
            return t2;
        }
        return t1;
    }
    // In most cases, all tablets are requested to be repaired by a single
    // API request, so they share the same task_id, request_type and other
    // parameters. Only such matching tasks may be merged.
    const bool mergeable = t1.request_type == t2.request_type
        && t1.tablet_task_id == t2.tablet_task_id
        && t1.repair_incremental_mode == t2.repair_incremental_mode
        && t1.repair_dcs_filter == t2.repair_dcs_filter
        && t1.repair_hosts_filter == t2.repair_hosts_filter;
    if (!mergeable) {
        // Incompatible tasks: refuse the merge.
        return {};
    }
    // Combine the two: earliest request, latest schedule, summed attempts.
    tablet_task_info merged = t1;
    merged.request_time = std::min(t1.request_time, t2.request_time);
    merged.sched_nr = t1.sched_nr + t2.sched_nr;
    merged.sched_time = std::max(t1.sched_time, t2.sched_time);
    return merged;
}
sstring locator::tablet_task_info::serialize_repair_hosts_filter(std::unordered_set<locator::host_id> filter) {
    // Join the host ids with "," (no trailing separator); empty set -> "".
    sstring out;
    auto it = filter.begin();
    if (it != filter.end()) {
        out += it->to_sstring();
        for (++it; it != filter.end(); ++it) {
            out += ",";
            out += it->to_sstring();
        }
    }
    return out;
}
sstring locator::tablet_task_info::serialize_repair_dcs_filter(std::unordered_set<sstring> filter) {
    // Join the DC names with "," (no trailing separator); empty set -> "".
    sstring out;
    auto it = filter.begin();
    if (it != filter.end()) {
        out += *it;
        for (++it; it != filter.end(); ++it) {
            out += ",";
            out += *it;
        }
    }
    return out;
}
std::unordered_set<locator::host_id> locator::tablet_task_info::deserialize_repair_hosts_filter(sstring filter) {
    // Inverse of serialize_repair_hosts_filter: "," separated UUIDs.
    // An empty string encodes "no filter".
    std::unordered_set<locator::host_id> result;
    if (filter.empty()) {
        return result;
    }
    sstring delim = ",";
    for (auto&& part : std::ranges::views::split(filter, delim)) {
        result.insert(locator::host_id(utils::UUID(std::string_view{part})));
    }
    return result;
}
std::unordered_set<sstring> locator::tablet_task_info::deserialize_repair_dcs_filter(sstring filter) {
    // Inverse of serialize_repair_dcs_filter: "," separated DC names.
    // An empty string encodes "no filter".
    std::unordered_set<sstring> result;
    if (filter.empty()) {
        return result;
    }
    sstring delim = ",";
    for (auto&& part : std::ranges::views::split(filter, delim)) {
        result.insert(sstring{std::string_view{part}});
    }
    return result;
}
locator::tablet_task_info locator::tablet_task_info::make_migration_request() {
    // Fresh migration task: new time-UUID id, request time = now,
    // never scheduled yet (sched_nr = 0, sched_time = epoch).
    return locator::tablet_task_info{
        locator::tablet_task_type::migration,
        locator::tablet_task_id(utils::UUID_gen::get_time_UUID()),
        db_clock::now(),
        0,
        db_clock::time_point()};
}
locator::tablet_task_info locator::tablet_task_info::make_intranode_migration_request() {
    // Fresh intra-node migration task: new time-UUID id, request time = now,
    // never scheduled yet (sched_nr = 0, sched_time = epoch).
    return locator::tablet_task_info{
        locator::tablet_task_type::intranode_migration,
        locator::tablet_task_id(utils::UUID_gen::get_time_UUID()),
        db_clock::now(),
        0,
        db_clock::time_point()};
}
locator::tablet_task_info locator::tablet_task_info::make_split_request() {
    // Fresh split task: new time-UUID id, request time = now,
    // never scheduled yet (sched_nr = 0, sched_time = epoch).
    return locator::tablet_task_info{
        locator::tablet_task_type::split,
        locator::tablet_task_id(utils::UUID_gen::get_time_UUID()),
        db_clock::now(),
        0,
        db_clock::time_point()};
}
locator::tablet_task_info locator::tablet_task_info::make_merge_request() {
    // Fresh merge task: new time-UUID id, request time = now,
    // never scheduled yet (sched_nr = 0, sched_time = epoch).
    return locator::tablet_task_info{
        locator::tablet_task_type::merge,
        locator::tablet_task_id(utils::UUID_gen::get_time_UUID()),
        db_clock::now(),
        0,
        db_clock::time_point()};
}