Rename start_cleanup -> start_vnodes_cleanup for clarity. Pass topology_request and server_id in start_vnodes_cleanup, we will need them for better logging later.
357 lines
14 KiB
C++
357 lines
14 KiB
C++
/*
|
|
* Copyright (C) 2024-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "utils/assert.hh"
|
|
#include "db/system_keyspace.hh"
|
|
#include "topology_mutation.hh"
|
|
#include "types/tuple.hh"
|
|
#include "types/types.hh"
|
|
#include "types/set.hh"
|
|
#include "types/map.hh"
|
|
|
|
namespace db {
|
|
extern thread_local data_type cdc_generation_ts_id_type;
|
|
}
|
|
|
|
namespace service {
|
|
|
|
topology_mutation_builder::topology_mutation_builder(api::timestamp_type ts) :
|
|
_s(db::system_keyspace::topology()),
|
|
_m(_s, partition_key::from_singular(*_s, db::system_keyspace::TOPOLOGY)),
|
|
_ts(ts) {
|
|
}
|
|
|
|
topology_node_mutation_builder::topology_node_mutation_builder(topology_mutation_builder& builder, raft::server_id id) :
|
|
_builder(builder),
|
|
_r(_builder._m.partition().clustered_row(*_builder._s, clustering_key::from_singular(*_builder._s, id.uuid()))) {
|
|
_r.apply(row_marker(_builder._ts));
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::apply_atomic(const char* cell, const data_value& value) {
|
|
const column_definition* cdef = self().schema().get_column_definition(cell);
|
|
SCYLLA_ASSERT(cdef);
|
|
self().row().apply(*cdef, atomic_cell::make_live(*cdef->type, self().timestamp(), cdef->type->decompose(value), self().ttl()));
|
|
return self();
|
|
}
|
|
|
|
template<typename Builder>
|
|
template<std::ranges::range C>
|
|
requires std::convertible_to<std::ranges::range_value_t<C>, data_value>
|
|
Builder& topology_mutation_builder_base<Builder>::apply_set(const char* cell, collection_apply_mode apply_mode, const C& c) {
|
|
const column_definition* cdef = self().schema().get_column_definition(cell);
|
|
SCYLLA_ASSERT(cdef);
|
|
auto vtype = static_pointer_cast<const set_type_impl>(cdef->type)->get_elements_type();
|
|
|
|
std::set<bytes, serialized_compare> cset(vtype->as_less_comparator());
|
|
for (const auto& v : c) {
|
|
cset.insert(vtype->decompose(data_value(v)));
|
|
}
|
|
|
|
collection_mutation_description cm;
|
|
cm.cells.reserve(cset.size());
|
|
for (const bytes& raw : cset) {
|
|
cm.cells.emplace_back(raw, atomic_cell::make_live(*bytes_type, self().timestamp(), bytes_view(), self().ttl()));
|
|
}
|
|
|
|
if (apply_mode == collection_apply_mode::overwrite) {
|
|
cm.tomb = tombstone(self().timestamp() - 1, gc_clock::now());
|
|
}
|
|
|
|
self().row().apply(*cdef, cm.serialize(*cdef->type));
|
|
return self();
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::del(const char* cell) {
|
|
auto cdef = self().schema().get_column_definition(cell);
|
|
SCYLLA_ASSERT(cdef);
|
|
if (!cdef->type->is_multi_cell()) {
|
|
self().row().apply(*cdef, atomic_cell::make_dead(self().timestamp(), gc_clock::now()));
|
|
} else {
|
|
collection_mutation_description cm;
|
|
cm.tomb = tombstone{self().timestamp(), gc_clock::now()};
|
|
self().row().apply(*cdef, cm.serialize(*cdef->type));
|
|
}
|
|
return self();
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, node_state value) {
|
|
return apply_atomic(cell, sstring{::format("{}", value)});
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, topology_request value) {
|
|
return apply_atomic(cell, sstring{::format("{}", value)});
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, global_topology_request value) {
|
|
return apply_atomic(cell, sstring{::format("{}", value)});
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const sstring& value) {
|
|
return apply_atomic(cell, value);
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const raft::server_id& value) {
|
|
return apply_atomic(cell, value.uuid());
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const uint32_t& value) {
|
|
return apply_atomic(cell, int32_t(value));
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, cleanup_status value) {
|
|
return apply_atomic(cell, sstring{::format("{}", value)});
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const utils::UUID& value) {
|
|
return apply_atomic(cell, value);
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, bool value) {
|
|
return apply_atomic(cell, value);
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const char* value) {
|
|
return apply_atomic(cell, value);
|
|
}
|
|
|
|
template<typename Builder>
|
|
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const db_clock::time_point& value) {
|
|
return apply_atomic(cell, value);
|
|
}
|
|
|
|
row& topology_node_mutation_builder::row() {
|
|
return _r.cells();
|
|
}
|
|
|
|
api::timestamp_type topology_node_mutation_builder::timestamp() const {
|
|
return _builder._ts;
|
|
}
|
|
|
|
const schema& topology_node_mutation_builder::schema() const {
|
|
return *_builder._s;
|
|
}
|
|
|
|
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set<raft::server_id>& nodes_ids) {
|
|
return apply_set(cell, collection_apply_mode::overwrite, nodes_ids | std::views::transform([] (const auto& node_id) { return node_id.id; }));
|
|
}
|
|
|
|
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set<dht::token>& tokens) {
|
|
return apply_set(cell, collection_apply_mode::overwrite, tokens | std::views::transform([] (const auto& t) { return t.to_sstring(); }));
|
|
}
|
|
|
|
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::set<sstring>& features) {
|
|
return apply_set(cell, collection_apply_mode::overwrite, features | std::views::transform([] (const auto& f) { return sstring(f); }));
|
|
}
|
|
|
|
canonical_mutation topology_node_mutation_builder::build() {
|
|
return canonical_mutation{std::move(_builder._m)};
|
|
}
|
|
|
|
row& topology_mutation_builder::row() {
|
|
return _m.partition().static_row().maybe_create();
|
|
}
|
|
|
|
api::timestamp_type topology_mutation_builder::timestamp() const {
|
|
return _ts;
|
|
}
|
|
|
|
const schema& topology_mutation_builder::schema() const {
|
|
return *_s;
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_transition_state(topology::transition_state value) {
|
|
return apply_atomic("transition_state", ::format("{}", value));
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_version(topology::version_t value) {
|
|
_m.set_static_cell("version", value, _ts);
|
|
return *this;
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_fence_version(topology::version_t value) {
|
|
_m.set_static_cell("fence_version", value, _ts);
|
|
return *this;
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_session(session_id value) {
|
|
_m.set_static_cell("session", value.uuid(), _ts);
|
|
return *this;
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_tablet_balancing_enabled(bool value) {
|
|
_m.set_static_cell("tablet_balancing_enabled", value, _ts);
|
|
return *this;
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::del_transition_state() {
|
|
return del("transition_state");
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::del_session() {
|
|
return del("session");
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_data_uuid(
|
|
const utils::UUID& value) {
|
|
return apply_atomic("new_cdc_generation_data_uuid", value);
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_committed_cdc_generations(const std::vector<cdc::generation_id_v2>& values) {
|
|
auto dv = values | std::views::transform([&] (const auto& v) {
|
|
return make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({v.ts, timeuuid_native_type{v.id}}));
|
|
});
|
|
return apply_set("committed_cdc_generations", collection_apply_mode::overwrite, std::move(dv));
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_new_keyspace_rf_change_data(
|
|
const sstring& ks_name, const std::map<sstring, sstring>& rf_per_dc) {
|
|
apply_atomic("new_keyspace_rf_change_ks_name", ks_name);
|
|
apply_atomic("new_keyspace_rf_change_data",
|
|
make_map_value(schema().get_column_definition("new_keyspace_rf_change_data")->type,
|
|
map_type_impl::native_type(rf_per_dc.begin(), rf_per_dc.end())));
|
|
return *this;
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_unpublished_cdc_generations(const std::vector<cdc::generation_id_v2>& values) {
|
|
auto dv = values | std::views::transform([&] (const auto& v) {
|
|
return make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({v.ts, timeuuid_native_type{v.id}}));
|
|
});
|
|
return apply_set("unpublished_cdc_generations", collection_apply_mode::overwrite, std::move(dv));
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_global_topology_request(global_topology_request value) {
|
|
return apply_atomic("global_topology_request", ::format("{}", value));
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_global_topology_request_id(const utils::UUID& value) {
|
|
return apply_atomic("global_topology_request_id", value);
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::queue_global_topology_request_id(const utils::UUID& value) {
|
|
return apply_set("global_requests", collection_apply_mode::update, std::vector<data_value>{value});
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::drop_first_global_topology_request_id(const std::vector<utils::UUID>& values, const utils::UUID& id) {
|
|
if (!values.empty() && values[0] == id) {
|
|
return apply_set("global_requests", collection_apply_mode::overwrite, std::span(values.begin() + 1, values.size() - 1));
|
|
} else {
|
|
return *this;
|
|
}
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_upgrade_state(topology::upgrade_state_type value) {
|
|
return apply_atomic("upgrade_state", ::format("{}", value));
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::add_enabled_features(const std::set<sstring>& features) {
|
|
return apply_set("enabled_features", collection_apply_mode::update, features | std::views::transform([] (const auto& f) { return sstring(f); }));
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::add_new_committed_cdc_generation(const cdc::generation_id_v2& value) {
|
|
auto dv = make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({value.ts, timeuuid_native_type{value.id}}));
|
|
apply_set("committed_cdc_generations", collection_apply_mode::update, std::vector<data_value>{dv});
|
|
apply_set("unpublished_cdc_generations", collection_apply_mode::update, std::vector<data_value>{std::move(dv)});
|
|
return *this;
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::add_ignored_nodes(const std::unordered_set<raft::server_id>& value) {
|
|
return apply_set("ignore_nodes", collection_apply_mode::update, value | std::views::transform([] (const auto& id) { return id.uuid(); }));
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::set_ignored_nodes(const std::unordered_set<raft::server_id>& value) {
|
|
return apply_set("ignore_nodes", collection_apply_mode::overwrite, value | std::views::transform([] (const auto& id) { return id.uuid(); }));
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::del_global_topology_request() {
|
|
return del("global_topology_request");
|
|
}
|
|
|
|
topology_mutation_builder& topology_mutation_builder::del_global_topology_request_id() {
|
|
return del("global_topology_request_id");
|
|
}
|
|
|
|
topology_node_mutation_builder& topology_mutation_builder::with_node(raft::server_id n) {
|
|
_node_builder.emplace(*this, n);
|
|
return *_node_builder;
|
|
}
|
|
|
|
topology_request_tracking_mutation_builder::topology_request_tracking_mutation_builder(utils::UUID id, bool set_type) :
|
|
_s(db::system_keyspace::topology_requests()),
|
|
_m(_s, partition_key::from_singular(*_s, id)),
|
|
_ts(utils::UUID_gen::micros_timestamp(id)),
|
|
_r(_m.partition().clustered_row(*_s, clustering_key::make_empty())),
|
|
_set_type(set_type) {
|
|
_r.apply(row_marker(_ts, *ttl(), gc_clock::now() + *ttl()));
|
|
}
|
|
|
|
ttl_opt topology_request_tracking_mutation_builder::ttl() const {
|
|
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::microseconds(_ts)) + std::chrono::months(1)
|
|
- std::chrono::duration_cast<std::chrono::seconds>(gc_clock::now().time_since_epoch());
|
|
}
|
|
|
|
const schema& topology_request_tracking_mutation_builder::schema() const {
|
|
return *_s;
|
|
}
|
|
|
|
row& topology_request_tracking_mutation_builder::row() {
|
|
return _r.cells();
|
|
}
|
|
|
|
api::timestamp_type topology_request_tracking_mutation_builder::timestamp() const {
|
|
return _ts;
|
|
}
|
|
|
|
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set(const char* cell, topology_request value) {
|
|
return _set_type ? builder_base::set(cell, value) : *this;
|
|
}
|
|
|
|
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set(const char* cell, global_topology_request value) {
|
|
return _set_type ? builder_base::set(cell, value) : *this;
|
|
}
|
|
|
|
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::done(std::optional<sstring> error) {
|
|
set("end_time", db_clock::now());
|
|
if (error) {
|
|
set("error", *error);
|
|
}
|
|
return set("done", true);
|
|
}
|
|
|
|
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set_truncate_table_data(const table_id& table_id) {
|
|
apply_atomic("truncate_table_id", table_id.uuid());
|
|
return *this;
|
|
}
|
|
|
|
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set_new_keyspace_rf_change_data(
|
|
const sstring& ks_name, const std::map<sstring, sstring>& rf_per_dc) {
|
|
apply_atomic("new_keyspace_rf_change_ks_name", ks_name);
|
|
apply_atomic("new_keyspace_rf_change_data",
|
|
make_map_value(schema().get_column_definition("new_keyspace_rf_change_data")->type,
|
|
map_type_impl::native_type(rf_per_dc.begin(), rf_per_dc.end())));
|
|
return *this;
|
|
}
|
|
|
|
|
|
template class topology_mutation_builder_base<topology_mutation_builder>;
|
|
template class topology_mutation_builder_base<topology_node_mutation_builder>;
|
|
template class topology_mutation_builder_base<topology_request_tracking_mutation_builder>;
|
|
|
|
} // namespace service
|