diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index b2fa862c59..532663d282 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -256,12 +256,14 @@ schema_ptr system_keyspace::topology() { .with_column("num_tokens", int32_type) .with_column("shard_count", int32_type) .with_column("ignore_msb", int32_type) + .with_column("supported_features", set_type_impl::get_instance(utf8_type, true)) .with_column("new_cdc_generation_data_uuid", uuid_type, column_kind::static_column) .with_column("version", long_type, column_kind::static_column) .with_column("transition_state", utf8_type, column_kind::static_column) .with_column("current_cdc_generation_uuid", uuid_type, column_kind::static_column) .with_column("current_cdc_generation_timestamp", timestamp_type, column_kind::static_column) .with_column("global_topology_request", utf8_type, column_kind::static_column) + .with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column) .set_comment("Current state of topology change machine") .with_version(generate_schema_version(id)) .build(); @@ -1554,6 +1556,13 @@ future system_keyspace::get_truncated_at(table_id cf_id) { }); } +static set_type_impl::native_type deserialize_set_column(const schema& s, const cql3::untyped_result_set_row& row, const char* name) { + auto blob = row.get_blob(name); + auto cdef = s.get_column_definition(name); + auto deserialized = cdef->type->deserialize(blob); + return value_cast(deserialized); +} + static set_type_impl::native_type prepare_tokens(const std::unordered_set& tokens) { set_type_impl::native_type tset; for (auto& t: tokens) { @@ -1562,7 +1571,7 @@ static set_type_impl::native_type prepare_tokens(const std::unordered_set decode_tokens(set_type_impl::native_type& tokens) { +std::unordered_set decode_tokens(const set_type_impl::native_type& tokens) { std::unordered_set tset; for (auto& t: tokens) { auto str = value_cast(t); @@ -1593,11 +1602,7 @@ future>> sy for (auto& row : *cql_result) { auto peer = gms::inet_address(row.get_as("peer")); if (row.has("tokens")) { - auto blob = row.get_blob("tokens"); - auto cdef = peers()->get_column_definition("tokens"); - auto deserialized = cdef->type->deserialize(blob); - auto tokens = value_cast(deserialized); - ret.emplace(peer, decode_tokens(tokens)); + ret.emplace(peer, decode_tokens(deserialize_set_column(*peers(), row, "tokens"))); } } return ret; @@ -1791,12 +1796,8 @@ future> system_keyspace::get_saved_tokens() { return make_ready_future>(); } - auto blob = msg->one().get_blob("tokens"); - auto cdef = local()->get_column_definition("tokens"); - auto deserialized = cdef->type->deserialize(blob); - auto tokens = value_cast(deserialized); - - return make_ready_future>(decode_tokens(tokens)); + auto decoded_tokens = decode_tokens(deserialize_set_column(*local(), msg->one(), "tokens")); + return make_ready_future>(std::move(decoded_tokens)); }); } @@ -3461,6 +3462,14 @@ future<> system_keyspace::set_must_synchronize_topology(bool value) { return set_scylla_local_param_as(MUST_SYNCHRONIZE_TOPOLOGY_KEY, value); } +static std::set decode_features(const set_type_impl::native_type& features) { + std::set fset; + for (auto& f : features) { + fset.insert(value_cast(std::move(f))); + } + return fset; +} + future system_keyspace::load_topology_state() { auto rs = co_await qctx->execute_cql( format("SELECT * FROM system.{} WHERE key = '{}'", TOPOLOGY, TOPOLOGY)); @@ -3485,11 +3494,7 @@ future system_keyspace::load_topology_state() { std::optional ring_slice; if (row.has("tokens")) { - auto blob = row.get_blob("tokens"); - auto cdef = topology()->get_column_definition("tokens"); - auto deserialized = cdef->type->deserialize(blob); - auto ts = value_cast(deserialized); - auto tokens = decode_tokens(ts); + auto tokens = decode_tokens(deserialize_set_column(*topology(), row, "tokens")); if (tokens.empty()) { on_fatal_internal_error(slogger, format( @@ -3512,6 +3517,11 @@ future system_keyspace::load_topology_state() { rebuild_option = row.get_as("rebuild_option"); } + std::set supported_features; + if (row.has("supported_features")) { + supported_features = decode_features(deserialize_set_column(*topology(), row, "supported_features")); + } + if (row.has("topology_request")) { auto req = service::topology_request_from_string(row.get_as("topology_request")); ret.requests.emplace(host_id, req); @@ -3578,7 +3588,7 @@ future system_keyspace::load_topology_state() { if (map) { map->emplace(host_id, service::replica_state{ nstate, std::move(datacenter), std::move(rack), std::move(release_version), - ring_slice, shard_count, ignore_msb}); + ring_slice, shard_count, ignore_msb, std::move(supported_features)}); } } @@ -3645,6 +3655,10 @@ future system_keyspace::load_topology_state() { some_row.get_as("global_topology_request")); ret.global_request.emplace(req); } + + if (some_row.has("enabled_features")) { + ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features")); + } } co_return ret; diff --git a/service/storage_service.cc b/service/storage_service.cc index 5ea79e4a6b..9db2fdc27a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -501,29 +501,64 @@ future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) { co_await _db.local().apply(freeze(muts), db::no_timeout); } +template +class topology_mutation_builder_base { +private: + Builder& self() { + return *static_cast(this); + } + +protected: + enum class collection_apply_mode { + overwrite, + update, + }; + + using builder_base = topology_mutation_builder_base; + + Builder& apply_atomic(const char* cell, const data_value& value); + template + requires std::convertible_to, data_value> + Builder& apply_set(const char* cell, collection_apply_mode apply_mode, const C& c); + Builder& del(const char* cell); +}; + class topology_mutation_builder; -class topology_node_mutation_builder { +class topology_node_mutation_builder + : public topology_mutation_builder_base { + + friend builder_base; + topology_mutation_builder& _builder; deletable_row& _r; +private: + row& row(); + api::timestamp_type timestamp() const; + const schema& schema() const; + public: topology_node_mutation_builder(topology_mutation_builder&, raft::server_id); - template - topology_node_mutation_builder& set(const char* cell, const T& value) { - return set(cell, sstring{::format("{}", value)}); - } + topology_node_mutation_builder& set(const char* cell, node_state value); + topology_node_mutation_builder& set(const char* cell, topology_request value); topology_node_mutation_builder& set(const char* cell, const sstring& value); topology_node_mutation_builder& set(const char* cell, const raft::server_id& value); topology_node_mutation_builder& set(const char* cell, const std::unordered_set& value); + template + requires std::constructible_from + topology_node_mutation_builder& set(const char* cell, const std::set& value); topology_node_mutation_builder& set(const char* cell, const uint32_t& value); topology_node_mutation_builder& set(const char* cell, const utils::UUID& value); topology_node_mutation_builder& del(const char* cell); canonical_mutation build(); }; -class topology_mutation_builder { +class topology_mutation_builder + : public topology_mutation_builder_base { + + friend builder_base; friend class topology_node_mutation_builder; schema_ptr _s; @@ -531,6 +566,12 @@ class topology_mutation_builder { api::timestamp_type _ts; std::optional _node_builder; + +private: + row& row(); + api::timestamp_type timestamp() const; + const schema& schema() const; + public: topology_mutation_builder(api::timestamp_type ts); topology_mutation_builder& set_transition_state(topology::transition_state); @@ -538,6 +579,9 @@ public: topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&); topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value); topology_mutation_builder& set_global_topology_request(global_topology_request); + template + requires std::constructible_from + topology_mutation_builder& add_enabled_features(const std::set& value); topology_mutation_builder& del_transition_state(); topology_mutation_builder& del_global_topology_request(); topology_node_mutation_builder& with_node(raft::server_id); @@ -556,76 +600,124 @@ topology_node_mutation_builder::topology_node_mutation_builder(topology_mutation _r.apply(row_marker(_builder._ts)); } -topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const sstring& value) { - auto cdef = _builder._s->get_column_definition(cell); +template +Builder& topology_mutation_builder_base::apply_atomic(const char* cell, const data_value& value) { + const column_definition* cdef = self().schema().get_column_definition(cell); assert(cdef); - _r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(value))); - return *this; + self().row().apply(*cdef, atomic_cell::make_live(*cdef->type, self().timestamp(), cdef->type->decompose(value))); + return self(); +} + +template +template +requires std::convertible_to, data_value> +Builder& topology_mutation_builder_base::apply_set(const char* cell, collection_apply_mode apply_mode, const C& c) { + const column_definition* cdef = self().schema().get_column_definition(cell); + assert(cdef); + auto vtype = static_pointer_cast(cdef->type)->get_elements_type(); + + std::set cset(vtype->as_less_comparator()); + for (const auto& v : c) { + cset.insert(vtype->decompose(data_value(v))); + } + + collection_mutation_description cm; + cm.cells.reserve(cset.size()); + for (const bytes& raw : cset) { + cm.cells.emplace_back(raw, atomic_cell::make_live(*bytes_type, self().timestamp(), bytes_view())); + } + + if (apply_mode == collection_apply_mode::overwrite) { + cm.tomb = tombstone(self().timestamp() - 1, gc_clock::now()); + } + + self().row().apply(*cdef, cm.serialize(*cdef->type)); + return self(); +} + +template +Builder& topology_mutation_builder_base::del(const char* cell) { + auto cdef = self().schema().get_column_definition(cell); + assert(cdef); + if (!cdef->type->is_multi_cell()) { + self().row().apply(*cdef, atomic_cell::make_dead(self().timestamp(), gc_clock::now())); + } else { + collection_mutation_description cm; + cm.tomb = tombstone{self().timestamp(), gc_clock::now()}; + self().row().apply(*cdef, cm.serialize(*cdef->type)); + } + return self(); +} + +row& topology_node_mutation_builder::row() { + return _r.cells(); +} + +api::timestamp_type topology_node_mutation_builder::timestamp() const { + return _builder._ts; +} + +const schema& topology_node_mutation_builder::schema() const { + return *_builder._s; +} + +topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, node_state value) { + return apply_atomic(cell, sstring{::format("{}", value)}); +} + +topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, topology_request value) { + return apply_atomic(cell, sstring{::format("{}", value)}); +} + +topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const sstring& value) { + return apply_atomic(cell, value); } topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const raft::server_id& value) { - auto cdef = _builder._s->get_column_definition(cell); - assert(cdef); - _r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(value.uuid()))); - return *this; + return apply_atomic(cell, value.uuid()); } topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const uint32_t& value) { - auto cdef = _builder._s->get_column_definition(cell); - assert(cdef); - _r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(int32_t(value)))); - return *this; + return apply_atomic(cell, int32_t(value)); } topology_node_mutation_builder& topology_node_mutation_builder::set( const char* cell, const utils::UUID& value) { - auto cdef = _builder._s->get_column_definition(cell); - assert(cdef); - _r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(value))); - return *this; + return apply_atomic(cell, value); } topology_node_mutation_builder& topology_node_mutation_builder::del(const char* cell) { - auto cdef = _builder._s->get_column_definition(cell); - assert(cdef); - if (!cdef->type->is_multi_cell()) { - _r.cells().apply(*cdef, atomic_cell::make_dead(_builder._ts, gc_clock::now())); - } else { - collection_mutation_description cm; - cm.tomb = tombstone{_builder._ts, gc_clock::now()}; - _r.cells().apply(*cdef, cm.serialize(*cdef->type)); - } - return *this; + return builder_base::del(cell); } topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set& tokens) { - auto cdef = _builder._s->get_column_definition(cell); - assert(cdef); - collection_mutation_description cm; - if (tokens.size()) { - auto vtype = static_pointer_cast(cdef->type)->get_elements_type(); + return apply_set(cell, collection_apply_mode::overwrite, tokens | boost::adaptors::transformed([] (const auto& t) { return t.to_sstring(); })); +} - cm.cells.reserve(tokens.size()); - - for (auto&& value : tokens) { - cm.cells.emplace_back(vtype->decompose(value.to_sstring()), atomic_cell::make_live(*bytes_type, _builder._ts, bytes_view())); - } - - cm.tomb = tombstone(_builder._ts - 1, gc_clock::now()); - _r.cells().apply(*cdef, cm.serialize(*cdef->type)); - } else { - del(cell); - } - return *this; +template +requires std::constructible_from +topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::set& features) { + return apply_set(cell, collection_apply_mode::overwrite, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); })); } canonical_mutation topology_node_mutation_builder::build() { return canonical_mutation{std::move(_builder._m)}; } +row& topology_mutation_builder::row() { + return _m.partition().static_row().maybe_create(); +} + +api::timestamp_type topology_mutation_builder::timestamp() const { + return _ts; +} + +const schema& topology_mutation_builder::schema() const { + return *_s; +} + topology_mutation_builder& topology_mutation_builder::set_transition_state(topology::transition_state value) { - _m.set_static_cell("transition_state", ::format("{}", value), _ts); - return *this; + return apply_atomic("transition_state", ::format("{}", value)); } topology_mutation_builder& topology_mutation_builder::set_version(topology::version_t value) { @@ -634,35 +726,33 @@ topology_mutation_builder& topology_mutation_builder::set_version(topology::vers } topology_mutation_builder& topology_mutation_builder::del_transition_state() { - auto cdef = _s->get_column_definition("transition_state"); - assert(cdef); - _m.partition().static_row().apply(*cdef, atomic_cell::make_dead(_ts, gc_clock::now())); - return *this; + return del("transition_state"); } topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id( const cdc::generation_id_v2& value) { - _m.set_static_cell("current_cdc_generation_timestamp", value.ts, _ts); - _m.set_static_cell("current_cdc_generation_uuid", value.id, _ts); + apply_atomic("current_cdc_generation_timestamp", value.ts); + apply_atomic("current_cdc_generation_uuid", value.id); return *this; } topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_data_uuid( const utils::UUID& value) { - _m.set_static_cell("new_cdc_generation_data_uuid", value, _ts); - return *this; + return apply_atomic("new_cdc_generation_data_uuid", value); } topology_mutation_builder& topology_mutation_builder::set_global_topology_request(global_topology_request value) { - _m.set_static_cell("global_topology_request", ::format("{}", value), _ts); - return *this; + return apply_atomic("global_topology_request", ::format("{}", value)); +} + +template +requires std::constructible_from +topology_mutation_builder& topology_mutation_builder::add_enabled_features(const std::set& features) { + return apply_set("enabled_features", collection_apply_mode::update, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); })); } topology_mutation_builder& topology_mutation_builder::del_global_topology_request() { - auto cdef = _s->get_column_definition("global_topology_request"); - assert(cdef); - _m.partition().static_row().apply(*cdef, atomic_cell::make_dead(_ts, gc_clock::now())); - return *this; + return del("global_topology_request"); } topology_node_mutation_builder& topology_mutation_builder::with_node(raft::server_id n) { @@ -1483,7 +1573,8 @@ future<> storage_service::raft_replace(raft::server& raft_server, raft::server_i .set("replaced_id", replaced_id) .set("num_tokens", _db.local().get_config().num_tokens()) .set("shard_count", smp::count) - .set("ignore_msb", _db.local().get_config().murmur3_partitioner_ignore_msb_bits()); + .set("ignore_msb", _db.local().get_config().murmur3_partitioner_ignore_msb_bits()) + .set("supported_features", _feature_service.supported_feature_set()); topology_change change{{builder.build()}}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("replace {}/{}: add myself ({}) to topology", replaced_id, replaced_ip, raft_server.id())); try { @@ -1519,7 +1610,8 @@ future<> storage_service::raft_bootstrap(raft::server& raft_server) { .set("topology_request", topology_request::join) .set("num_tokens", _db.local().get_config().num_tokens()) .set("shard_count", smp::count) - .set("ignore_msb", _db.local().get_config().murmur3_partitioner_ignore_msb_bits()); + .set("ignore_msb", _db.local().get_config().murmur3_partitioner_ignore_msb_bits()) + .set("supported_features", _feature_service.supported_feature_set()); topology_change change{{builder.build()}}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "bootstrap: add myself to topology"); try { @@ -1535,6 +1627,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft auto local_shard_count = smp::count; auto local_ignore_msb = _db.local().get_config().murmur3_partitioner_ignore_msb_bits(); auto local_release_version = version::release(); + auto local_supported_features = boost::copy_range>(_feature_service.supported_feature_set()); auto synchronized = [&] () { auto it = _topology_state_machine._topology.find(raft_server.id()); @@ -1546,7 +1639,8 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft return replica_state.shard_count == local_shard_count && replica_state.ignore_msb == local_ignore_msb - && replica_state.release_version == local_release_version; + && replica_state.release_version == local_release_version + && replica_state.supported_features == local_supported_features; }; // We avoid performing a read barrier if we're sure that our metadata stored in topology @@ -1582,7 +1676,8 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft builder.with_node(raft_server.id()) .set("shard_count", local_shard_count) .set("ignore_msb", local_ignore_msb) - .set("release_version", local_release_version); + .set("release_version", local_release_version) + .set("supported_features", local_supported_features); topology_change change{{builder.build()}}; group0_command g0_cmd = _group0->client().prepare_command( std::move(change), guard, ::format("{}: update topology with local metadata", raft_server.id())); diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 77b060abcb..1fa3b93e1f 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -63,6 +63,7 @@ struct replica_state { std::optional ring; // if engaged contain the set of tokens the node owns together with their state size_t shard_count; uint8_t ignore_msb; + std::set supported_features; }; struct topology { @@ -107,6 +108,9 @@ struct topology { // It's used as partition key in CDC_GENERATIONS_V3 table. std::optional new_cdc_generation_data_uuid; + // Features that are considered enabled by the cluster + std::set enabled_features; + // Find only nodes in non 'left' state const std::pair* find(raft::server_id id) const; // Return true if node exists in any state including 'left' one