mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge 'Cluster features on raft: add storage for supported and enabled features' from Piotr Dulikowski
This PR implements the storage part of the cluster features on raft functionality, as described in the "Cluster features on raft v2" doc. These changes will be useful for later PRs that will implement the remaining parts of the feature. Two new columns are added to `system.topology`: - `supported_features set<text>` is a new clustering column which holds the features that given node advertises as supported. It will be first initialized when the node joins the cluster, and then updated every time the node reboots and its supported features set changes. - `enabled_features set<text>` is a new static column which holds the features that are considered enabled by the cluster. Unlike in the current gossip-based implementation the features will not be enabled implicitly when all nodes support a feature, but rather via an explicit action of the topology coordinator. These columns are reflected in the `topology_state_machine` structure and are populated when the topology state is loaded. Appropriate methods are added to the `topology_mutation_builder` and `topology_node_mutation_builder` in order to allow setting/modifying those columns. During startup, nodes update their corresponding `supported_features` column to reflect their current feature set. For now it is done unconditionally, but in the future appropriate checks will be added which will prevent nodes from joining / starting their server for group 0 if they can't guarantee that they support all enabled features. Closes #14232 * github.com:scylladb/scylladb: storage_service: update supported cluster features in group0 on start storage_service: add methods for features to topology mutation builder storage_service: use explicit ::set overload instead of a template storage_service: reimplement mutation builder setters storage_service: introduce topology_mutation_builder_base topology_state_machine: include information about features system_keyspace: introduce deserialize_set_column db/system_keyspace: add storage for cluster features managed in group 0
This commit is contained in:
@@ -256,12 +256,14 @@ schema_ptr system_keyspace::topology() {
|
||||
.with_column("num_tokens", int32_type)
|
||||
.with_column("shard_count", int32_type)
|
||||
.with_column("ignore_msb", int32_type)
|
||||
.with_column("supported_features", set_type_impl::get_instance(utf8_type, true))
|
||||
.with_column("new_cdc_generation_data_uuid", uuid_type, column_kind::static_column)
|
||||
.with_column("version", long_type, column_kind::static_column)
|
||||
.with_column("transition_state", utf8_type, column_kind::static_column)
|
||||
.with_column("current_cdc_generation_uuid", uuid_type, column_kind::static_column)
|
||||
.with_column("current_cdc_generation_timestamp", timestamp_type, column_kind::static_column)
|
||||
.with_column("global_topology_request", utf8_type, column_kind::static_column)
|
||||
.with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column)
|
||||
.set_comment("Current state of topology change machine")
|
||||
.with_version(generate_schema_version(id))
|
||||
.build();
|
||||
@@ -1554,6 +1556,13 @@ future<db_clock::time_point> system_keyspace::get_truncated_at(table_id cf_id) {
|
||||
});
|
||||
}
|
||||
|
||||
static set_type_impl::native_type deserialize_set_column(const schema& s, const cql3::untyped_result_set_row& row, const char* name) {
|
||||
auto blob = row.get_blob(name);
|
||||
auto cdef = s.get_column_definition(name);
|
||||
auto deserialized = cdef->type->deserialize(blob);
|
||||
return value_cast<set_type_impl::native_type>(deserialized);
|
||||
}
|
||||
|
||||
static set_type_impl::native_type prepare_tokens(const std::unordered_set<dht::token>& tokens) {
|
||||
set_type_impl::native_type tset;
|
||||
for (auto& t: tokens) {
|
||||
@@ -1562,7 +1571,7 @@ static set_type_impl::native_type prepare_tokens(const std::unordered_set<dht::t
|
||||
return tset;
|
||||
}
|
||||
|
||||
std::unordered_set<dht::token> decode_tokens(set_type_impl::native_type& tokens) {
|
||||
std::unordered_set<dht::token> decode_tokens(const set_type_impl::native_type& tokens) {
|
||||
std::unordered_set<dht::token> tset;
|
||||
for (auto& t: tokens) {
|
||||
auto str = value_cast<sstring>(t);
|
||||
@@ -1593,11 +1602,7 @@ future<std::unordered_map<gms::inet_address, std::unordered_set<dht::token>>> sy
|
||||
for (auto& row : *cql_result) {
|
||||
auto peer = gms::inet_address(row.get_as<net::inet_address>("peer"));
|
||||
if (row.has("tokens")) {
|
||||
auto blob = row.get_blob("tokens");
|
||||
auto cdef = peers()->get_column_definition("tokens");
|
||||
auto deserialized = cdef->type->deserialize(blob);
|
||||
auto tokens = value_cast<set_type_impl::native_type>(deserialized);
|
||||
ret.emplace(peer, decode_tokens(tokens));
|
||||
ret.emplace(peer, decode_tokens(deserialize_set_column(*peers(), row, "tokens")));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@@ -1791,12 +1796,8 @@ future<std::unordered_set<dht::token>> system_keyspace::get_saved_tokens() {
|
||||
return make_ready_future<std::unordered_set<dht::token>>();
|
||||
}
|
||||
|
||||
auto blob = msg->one().get_blob("tokens");
|
||||
auto cdef = local()->get_column_definition("tokens");
|
||||
auto deserialized = cdef->type->deserialize(blob);
|
||||
auto tokens = value_cast<set_type_impl::native_type>(deserialized);
|
||||
|
||||
return make_ready_future<std::unordered_set<dht::token>>(decode_tokens(tokens));
|
||||
auto decoded_tokens = decode_tokens(deserialize_set_column(*local(), msg->one(), "tokens"));
|
||||
return make_ready_future<std::unordered_set<dht::token>>(std::move(decoded_tokens));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3461,6 +3462,14 @@ future<> system_keyspace::set_must_synchronize_topology(bool value) {
|
||||
return set_scylla_local_param_as<bool>(MUST_SYNCHRONIZE_TOPOLOGY_KEY, value);
|
||||
}
|
||||
|
||||
static std::set<sstring> decode_features(const set_type_impl::native_type& features) {
|
||||
std::set<sstring> fset;
|
||||
for (auto& f : features) {
|
||||
fset.insert(value_cast<sstring>(std::move(f)));
|
||||
}
|
||||
return fset;
|
||||
}
|
||||
|
||||
future<service::topology> system_keyspace::load_topology_state() {
|
||||
auto rs = co_await qctx->execute_cql(
|
||||
format("SELECT * FROM system.{} WHERE key = '{}'", TOPOLOGY, TOPOLOGY));
|
||||
@@ -3485,11 +3494,7 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
|
||||
std::optional<service::ring_slice> ring_slice;
|
||||
if (row.has("tokens")) {
|
||||
auto blob = row.get_blob("tokens");
|
||||
auto cdef = topology()->get_column_definition("tokens");
|
||||
auto deserialized = cdef->type->deserialize(blob);
|
||||
auto ts = value_cast<set_type_impl::native_type>(deserialized);
|
||||
auto tokens = decode_tokens(ts);
|
||||
auto tokens = decode_tokens(deserialize_set_column(*topology(), row, "tokens"));
|
||||
|
||||
if (tokens.empty()) {
|
||||
on_fatal_internal_error(slogger, format(
|
||||
@@ -3512,6 +3517,11 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
rebuild_option = row.get_as<sstring>("rebuild_option");
|
||||
}
|
||||
|
||||
std::set<sstring> supported_features;
|
||||
if (row.has("supported_features")) {
|
||||
supported_features = decode_features(deserialize_set_column(*topology(), row, "supported_features"));
|
||||
}
|
||||
|
||||
if (row.has("topology_request")) {
|
||||
auto req = service::topology_request_from_string(row.get_as<sstring>("topology_request"));
|
||||
ret.requests.emplace(host_id, req);
|
||||
@@ -3578,7 +3588,7 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
if (map) {
|
||||
map->emplace(host_id, service::replica_state{
|
||||
nstate, std::move(datacenter), std::move(rack), std::move(release_version),
|
||||
ring_slice, shard_count, ignore_msb});
|
||||
ring_slice, shard_count, ignore_msb, std::move(supported_features)});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3645,6 +3655,10 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
some_row.get_as<sstring>("global_topology_request"));
|
||||
ret.global_request.emplace(req);
|
||||
}
|
||||
|
||||
if (some_row.has("enabled_features")) {
|
||||
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
|
||||
}
|
||||
}
|
||||
|
||||
co_return ret;
|
||||
|
||||
@@ -501,29 +501,64 @@ future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) {
|
||||
co_await _db.local().apply(freeze(muts), db::no_timeout);
|
||||
}
|
||||
|
||||
template<typename Builder>
|
||||
class topology_mutation_builder_base {
|
||||
private:
|
||||
Builder& self() {
|
||||
return *static_cast<Builder*>(this);
|
||||
}
|
||||
|
||||
protected:
|
||||
enum class collection_apply_mode {
|
||||
overwrite,
|
||||
update,
|
||||
};
|
||||
|
||||
using builder_base = topology_mutation_builder_base<Builder>;
|
||||
|
||||
Builder& apply_atomic(const char* cell, const data_value& value);
|
||||
template<std::ranges::range C>
|
||||
requires std::convertible_to<std::ranges::range_value_t<C>, data_value>
|
||||
Builder& apply_set(const char* cell, collection_apply_mode apply_mode, const C& c);
|
||||
Builder& del(const char* cell);
|
||||
};
|
||||
|
||||
class topology_mutation_builder;
|
||||
|
||||
class topology_node_mutation_builder {
|
||||
class topology_node_mutation_builder
|
||||
: public topology_mutation_builder_base<topology_node_mutation_builder> {
|
||||
|
||||
friend builder_base;
|
||||
|
||||
topology_mutation_builder& _builder;
|
||||
deletable_row& _r;
|
||||
|
||||
private:
|
||||
row& row();
|
||||
api::timestamp_type timestamp() const;
|
||||
const schema& schema() const;
|
||||
|
||||
public:
|
||||
topology_node_mutation_builder(topology_mutation_builder&, raft::server_id);
|
||||
|
||||
template<typename T>
|
||||
topology_node_mutation_builder& set(const char* cell, const T& value) {
|
||||
return set(cell, sstring{::format("{}", value)});
|
||||
}
|
||||
topology_node_mutation_builder& set(const char* cell, node_state value);
|
||||
topology_node_mutation_builder& set(const char* cell, topology_request value);
|
||||
topology_node_mutation_builder& set(const char* cell, const sstring& value);
|
||||
topology_node_mutation_builder& set(const char* cell, const raft::server_id& value);
|
||||
topology_node_mutation_builder& set(const char* cell, const std::unordered_set<dht::token>& value);
|
||||
template<typename S>
|
||||
requires std::constructible_from<sstring, S>
|
||||
topology_node_mutation_builder& set(const char* cell, const std::set<S>& value);
|
||||
topology_node_mutation_builder& set(const char* cell, const uint32_t& value);
|
||||
topology_node_mutation_builder& set(const char* cell, const utils::UUID& value);
|
||||
topology_node_mutation_builder& del(const char* cell);
|
||||
canonical_mutation build();
|
||||
};
|
||||
|
||||
class topology_mutation_builder {
|
||||
class topology_mutation_builder
|
||||
: public topology_mutation_builder_base<topology_mutation_builder> {
|
||||
|
||||
friend builder_base;
|
||||
friend class topology_node_mutation_builder;
|
||||
|
||||
schema_ptr _s;
|
||||
@@ -531,6 +566,12 @@ class topology_mutation_builder {
|
||||
api::timestamp_type _ts;
|
||||
|
||||
std::optional<topology_node_mutation_builder> _node_builder;
|
||||
|
||||
private:
|
||||
row& row();
|
||||
api::timestamp_type timestamp() const;
|
||||
const schema& schema() const;
|
||||
|
||||
public:
|
||||
topology_mutation_builder(api::timestamp_type ts);
|
||||
topology_mutation_builder& set_transition_state(topology::transition_state);
|
||||
@@ -538,6 +579,9 @@ public:
|
||||
topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&);
|
||||
topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value);
|
||||
topology_mutation_builder& set_global_topology_request(global_topology_request);
|
||||
template<typename S>
|
||||
requires std::constructible_from<sstring, S>
|
||||
topology_mutation_builder& add_enabled_features(const std::set<S>& value);
|
||||
topology_mutation_builder& del_transition_state();
|
||||
topology_mutation_builder& del_global_topology_request();
|
||||
topology_node_mutation_builder& with_node(raft::server_id);
|
||||
@@ -556,76 +600,124 @@ topology_node_mutation_builder::topology_node_mutation_builder(topology_mutation
|
||||
_r.apply(row_marker(_builder._ts));
|
||||
}
|
||||
|
||||
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const sstring& value) {
|
||||
auto cdef = _builder._s->get_column_definition(cell);
|
||||
template<typename Builder>
|
||||
Builder& topology_mutation_builder_base<Builder>::apply_atomic(const char* cell, const data_value& value) {
|
||||
const column_definition* cdef = self().schema().get_column_definition(cell);
|
||||
assert(cdef);
|
||||
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(value)));
|
||||
return *this;
|
||||
self().row().apply(*cdef, atomic_cell::make_live(*cdef->type, self().timestamp(), cdef->type->decompose(value)));
|
||||
return self();
|
||||
}
|
||||
|
||||
template<typename Builder>
|
||||
template<std::ranges::range C>
|
||||
requires std::convertible_to<std::ranges::range_value_t<C>, data_value>
|
||||
Builder& topology_mutation_builder_base<Builder>::apply_set(const char* cell, collection_apply_mode apply_mode, const C& c) {
|
||||
const column_definition* cdef = self().schema().get_column_definition(cell);
|
||||
assert(cdef);
|
||||
auto vtype = static_pointer_cast<const set_type_impl>(cdef->type)->get_elements_type();
|
||||
|
||||
std::set<bytes, serialized_compare> cset(vtype->as_less_comparator());
|
||||
for (const auto& v : c) {
|
||||
cset.insert(vtype->decompose(data_value(v)));
|
||||
}
|
||||
|
||||
collection_mutation_description cm;
|
||||
cm.cells.reserve(cset.size());
|
||||
for (const bytes& raw : cset) {
|
||||
cm.cells.emplace_back(raw, atomic_cell::make_live(*bytes_type, self().timestamp(), bytes_view()));
|
||||
}
|
||||
|
||||
if (apply_mode == collection_apply_mode::overwrite) {
|
||||
cm.tomb = tombstone(self().timestamp() - 1, gc_clock::now());
|
||||
}
|
||||
|
||||
self().row().apply(*cdef, cm.serialize(*cdef->type));
|
||||
return self();
|
||||
}
|
||||
|
||||
template<typename Builder>
|
||||
Builder& topology_mutation_builder_base<Builder>::del(const char* cell) {
|
||||
auto cdef = self().schema().get_column_definition(cell);
|
||||
assert(cdef);
|
||||
if (!cdef->type->is_multi_cell()) {
|
||||
self().row().apply(*cdef, atomic_cell::make_dead(self().timestamp(), gc_clock::now()));
|
||||
} else {
|
||||
collection_mutation_description cm;
|
||||
cm.tomb = tombstone{self().timestamp(), gc_clock::now()};
|
||||
self().row().apply(*cdef, cm.serialize(*cdef->type));
|
||||
}
|
||||
return self();
|
||||
}
|
||||
|
||||
row& topology_node_mutation_builder::row() {
|
||||
return _r.cells();
|
||||
}
|
||||
|
||||
api::timestamp_type topology_node_mutation_builder::timestamp() const {
|
||||
return _builder._ts;
|
||||
}
|
||||
|
||||
const schema& topology_node_mutation_builder::schema() const {
|
||||
return *_builder._s;
|
||||
}
|
||||
|
||||
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, node_state value) {
|
||||
return apply_atomic(cell, sstring{::format("{}", value)});
|
||||
}
|
||||
|
||||
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, topology_request value) {
|
||||
return apply_atomic(cell, sstring{::format("{}", value)});
|
||||
}
|
||||
|
||||
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const sstring& value) {
|
||||
return apply_atomic(cell, value);
|
||||
}
|
||||
|
||||
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const raft::server_id& value) {
|
||||
auto cdef = _builder._s->get_column_definition(cell);
|
||||
assert(cdef);
|
||||
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(value.uuid())));
|
||||
return *this;
|
||||
return apply_atomic(cell, value.uuid());
|
||||
}
|
||||
|
||||
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const uint32_t& value) {
|
||||
auto cdef = _builder._s->get_column_definition(cell);
|
||||
assert(cdef);
|
||||
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(int32_t(value))));
|
||||
return *this;
|
||||
return apply_atomic(cell, int32_t(value));
|
||||
}
|
||||
|
||||
topology_node_mutation_builder& topology_node_mutation_builder::set(
|
||||
const char* cell, const utils::UUID& value) {
|
||||
auto cdef = _builder._s->get_column_definition(cell);
|
||||
assert(cdef);
|
||||
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(value)));
|
||||
return *this;
|
||||
return apply_atomic(cell, value);
|
||||
}
|
||||
|
||||
topology_node_mutation_builder& topology_node_mutation_builder::del(const char* cell) {
|
||||
auto cdef = _builder._s->get_column_definition(cell);
|
||||
assert(cdef);
|
||||
if (!cdef->type->is_multi_cell()) {
|
||||
_r.cells().apply(*cdef, atomic_cell::make_dead(_builder._ts, gc_clock::now()));
|
||||
} else {
|
||||
collection_mutation_description cm;
|
||||
cm.tomb = tombstone{_builder._ts, gc_clock::now()};
|
||||
_r.cells().apply(*cdef, cm.serialize(*cdef->type));
|
||||
}
|
||||
return *this;
|
||||
return builder_base::del(cell);
|
||||
}
|
||||
|
||||
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set<dht::token>& tokens) {
|
||||
auto cdef = _builder._s->get_column_definition(cell);
|
||||
assert(cdef);
|
||||
collection_mutation_description cm;
|
||||
if (tokens.size()) {
|
||||
auto vtype = static_pointer_cast<const set_type_impl>(cdef->type)->get_elements_type();
|
||||
return apply_set(cell, collection_apply_mode::overwrite, tokens | boost::adaptors::transformed([] (const auto& t) { return t.to_sstring(); }));
|
||||
}
|
||||
|
||||
cm.cells.reserve(tokens.size());
|
||||
|
||||
for (auto&& value : tokens) {
|
||||
cm.cells.emplace_back(vtype->decompose(value.to_sstring()), atomic_cell::make_live(*bytes_type, _builder._ts, bytes_view()));
|
||||
}
|
||||
|
||||
cm.tomb = tombstone(_builder._ts - 1, gc_clock::now());
|
||||
_r.cells().apply(*cdef, cm.serialize(*cdef->type));
|
||||
} else {
|
||||
del(cell);
|
||||
}
|
||||
return *this;
|
||||
template<typename S>
|
||||
requires std::constructible_from<sstring, S>
|
||||
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::set<S>& features) {
|
||||
return apply_set(cell, collection_apply_mode::overwrite, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); }));
|
||||
}
|
||||
|
||||
canonical_mutation topology_node_mutation_builder::build() {
|
||||
return canonical_mutation{std::move(_builder._m)};
|
||||
}
|
||||
|
||||
row& topology_mutation_builder::row() {
|
||||
return _m.partition().static_row().maybe_create();
|
||||
}
|
||||
|
||||
api::timestamp_type topology_mutation_builder::timestamp() const {
|
||||
return _ts;
|
||||
}
|
||||
|
||||
const schema& topology_mutation_builder::schema() const {
|
||||
return *_s;
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::set_transition_state(topology::transition_state value) {
|
||||
_m.set_static_cell("transition_state", ::format("{}", value), _ts);
|
||||
return *this;
|
||||
return apply_atomic("transition_state", ::format("{}", value));
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::set_version(topology::version_t value) {
|
||||
@@ -634,35 +726,33 @@ topology_mutation_builder& topology_mutation_builder::set_version(topology::vers
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::del_transition_state() {
|
||||
auto cdef = _s->get_column_definition("transition_state");
|
||||
assert(cdef);
|
||||
_m.partition().static_row().apply(*cdef, atomic_cell::make_dead(_ts, gc_clock::now()));
|
||||
return *this;
|
||||
return del("transition_state");
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id(
|
||||
const cdc::generation_id_v2& value) {
|
||||
_m.set_static_cell("current_cdc_generation_timestamp", value.ts, _ts);
|
||||
_m.set_static_cell("current_cdc_generation_uuid", value.id, _ts);
|
||||
apply_atomic("current_cdc_generation_timestamp", value.ts);
|
||||
apply_atomic("current_cdc_generation_uuid", value.id);
|
||||
return *this;
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_data_uuid(
|
||||
const utils::UUID& value) {
|
||||
_m.set_static_cell("new_cdc_generation_data_uuid", value, _ts);
|
||||
return *this;
|
||||
return apply_atomic("new_cdc_generation_data_uuid", value);
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::set_global_topology_request(global_topology_request value) {
|
||||
_m.set_static_cell("global_topology_request", ::format("{}", value), _ts);
|
||||
return *this;
|
||||
return apply_atomic("global_topology_request", ::format("{}", value));
|
||||
}
|
||||
|
||||
template<typename S>
|
||||
requires std::constructible_from<sstring, S>
|
||||
topology_mutation_builder& topology_mutation_builder::add_enabled_features(const std::set<S>& features) {
|
||||
return apply_set("enabled_features", collection_apply_mode::update, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); }));
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::del_global_topology_request() {
|
||||
auto cdef = _s->get_column_definition("global_topology_request");
|
||||
assert(cdef);
|
||||
_m.partition().static_row().apply(*cdef, atomic_cell::make_dead(_ts, gc_clock::now()));
|
||||
return *this;
|
||||
return del("global_topology_request");
|
||||
}
|
||||
|
||||
topology_node_mutation_builder& topology_mutation_builder::with_node(raft::server_id n) {
|
||||
@@ -1483,7 +1573,8 @@ future<> storage_service::raft_replace(raft::server& raft_server, raft::server_i
|
||||
.set("replaced_id", replaced_id)
|
||||
.set("num_tokens", _db.local().get_config().num_tokens())
|
||||
.set("shard_count", smp::count)
|
||||
.set("ignore_msb", _db.local().get_config().murmur3_partitioner_ignore_msb_bits());
|
||||
.set("ignore_msb", _db.local().get_config().murmur3_partitioner_ignore_msb_bits())
|
||||
.set("supported_features", _feature_service.supported_feature_set());
|
||||
topology_change change{{builder.build()}};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("replace {}/{}: add myself ({}) to topology", replaced_id, replaced_ip, raft_server.id()));
|
||||
try {
|
||||
@@ -1519,7 +1610,8 @@ future<> storage_service::raft_bootstrap(raft::server& raft_server) {
|
||||
.set("topology_request", topology_request::join)
|
||||
.set("num_tokens", _db.local().get_config().num_tokens())
|
||||
.set("shard_count", smp::count)
|
||||
.set("ignore_msb", _db.local().get_config().murmur3_partitioner_ignore_msb_bits());
|
||||
.set("ignore_msb", _db.local().get_config().murmur3_partitioner_ignore_msb_bits())
|
||||
.set("supported_features", _feature_service.supported_feature_set());
|
||||
topology_change change{{builder.build()}};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "bootstrap: add myself to topology");
|
||||
try {
|
||||
@@ -1535,6 +1627,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
|
||||
auto local_shard_count = smp::count;
|
||||
auto local_ignore_msb = _db.local().get_config().murmur3_partitioner_ignore_msb_bits();
|
||||
auto local_release_version = version::release();
|
||||
auto local_supported_features = boost::copy_range<std::set<sstring>>(_feature_service.supported_feature_set());
|
||||
|
||||
auto synchronized = [&] () {
|
||||
auto it = _topology_state_machine._topology.find(raft_server.id());
|
||||
@@ -1546,7 +1639,8 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
|
||||
|
||||
return replica_state.shard_count == local_shard_count
|
||||
&& replica_state.ignore_msb == local_ignore_msb
|
||||
&& replica_state.release_version == local_release_version;
|
||||
&& replica_state.release_version == local_release_version
|
||||
&& replica_state.supported_features == local_supported_features;
|
||||
};
|
||||
|
||||
// We avoid performing a read barrier if we're sure that our metadata stored in topology
|
||||
@@ -1582,7 +1676,8 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
|
||||
builder.with_node(raft_server.id())
|
||||
.set("shard_count", local_shard_count)
|
||||
.set("ignore_msb", local_ignore_msb)
|
||||
.set("release_version", local_release_version);
|
||||
.set("release_version", local_release_version)
|
||||
.set("supported_features", local_supported_features);
|
||||
topology_change change{{builder.build()}};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(
|
||||
std::move(change), guard, ::format("{}: update topology with local metadata", raft_server.id()));
|
||||
|
||||
@@ -63,6 +63,7 @@ struct replica_state {
|
||||
std::optional<ring_slice> ring; // if engaged contain the set of tokens the node owns together with their state
|
||||
size_t shard_count;
|
||||
uint8_t ignore_msb;
|
||||
std::set<sstring> supported_features;
|
||||
};
|
||||
|
||||
struct topology {
|
||||
@@ -107,6 +108,9 @@ struct topology {
|
||||
// It's used as partition key in CDC_GENERATIONS_V3 table.
|
||||
std::optional<utils::UUID> new_cdc_generation_data_uuid;
|
||||
|
||||
// Features that are considered enabled by the cluster
|
||||
std::set<sstring> enabled_features;
|
||||
|
||||
// Find only nodes in non 'left' state
|
||||
const std::pair<const raft::server_id, replica_state>* find(raft::server_id id) const;
|
||||
// Return true if node exists in any state including 'left' one
|
||||
|
||||
Reference in New Issue
Block a user