From 59e771be5c21d84ee82c1b31c1596b314379d978 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 23 Feb 2023 08:37:03 +0200 Subject: [PATCH 01/11] gms: gossip_digest: delete unused compare_to function Not only it is unused, it is wrong since it doesn't compare the digest endpoint member. Signed-off-by: Benny Halevy --- gms/gossip_digest.hh | 7 ------- 1 file changed, 7 deletions(-) diff --git a/gms/gossip_digest.hh b/gms/gossip_digest.hh index 7300ea5ed2..8762e3faf8 100644 --- a/gms/gossip_digest.hh +++ b/gms/gossip_digest.hh @@ -51,13 +51,6 @@ public: return _max_version; } - int32_t compare_to(gossip_digest d) const { - if (_generation != d.get_generation()) { - return (_generation - d.get_generation()); - } - return (_max_version - d.get_max_version()); - } - friend bool operator<(const gossip_digest& x, const gossip_digest& y) { if (x._generation != y._generation) { return x._generation < y._generation; From 44a8db016a5fd36262ca0f66ed4ef7f837f2fc1b Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 23 Feb 2023 08:37:03 +0200 Subject: [PATCH 02/11] gms: versioned_value: delete unused compare_to function Not only it is unused, it is wrong since it doesn't compare the value, only its version. Signed-off-by: Benny Halevy --- gms/versioned_value.hh | 4 ---- 1 file changed, 4 deletions(-) diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index 0b44aec760..169a62bcba 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -86,10 +86,6 @@ public: : version(-1) { } - int compare_to(const versioned_value &value) const noexcept { - return version - value.version; - } - friend inline std::ostream& operator<<(std::ostream& os, const versioned_value& x) { return os << "Value(" << x.value << "," << x.version << ")"; } From 5aaec736125bfc0565ce872b4fcf4b20120d98f6 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 24 Feb 2023 18:49:03 +0200 Subject: [PATCH 03/11] scylla-gdb: add get_gms_versioned_value Prepare for next patch that makes gms::versioned_value members private, and provides methods by the same name as the current members. Signed-off-by: Benny Halevy --- scylla-gdb.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/scylla-gdb.py b/scylla-gdb.py index 65d5695cb9..9c841ea852 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -4028,6 +4028,19 @@ class scylla_netw(gdb.Command): gdb.write(' %s\n' % (conn['_stats'])) +def get_gms_versioned_value(vv): + try: + return { + 'version': vv['_version'], + 'value': vv['_value'] + } + except gdb.error: + return { + 'version': vv['version'], + 'value': vv['value'] + } + + class scylla_gms(gdb.Command): def __init__(self): gdb.Command.__init__(self, 'scylla gms', gdb.COMMAND_USER, gdb.COMPLETE_NONE, True) @@ -4042,7 +4055,8 @@ class scylla_gms(gdb.Command): for (endpoint, state) in unordered_map(state_map): ip = ip_to_str(int(get_ip(endpoint)), byteorder=sys.byteorder) gdb.write('%s: (gms::endpoint_state*) %s (%s)\n' % (ip, state.address, state['_heart_beat_state'])) - for app_state, value in std_map(state['_application_state']): + for app_state, vv in std_map(state['_application_state']): + value = get_gms_versioned_value(vv) gdb.write(' %s: {version=%d, value=%s}\n' % (app_state, value['version'], value['value'])) From c5d819ce60b4d26111c871baea712c4411661297 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 23 Feb 2023 08:44:08 +0200 Subject: [PATCH 04/11] gms: versioned_value: make members private and provide accessor functions to get them. 1. So they can't be modified by mistake, as the versioned value is immutable. A new value must have a higher version. 2. Before making the version a strong gms::version_type. Signed-off-by: Benny Halevy --- api/failure_detector.cc | 4 ++-- cdc/generation.cc | 6 ++--- gms/endpoint_state.cc | 2 +- gms/endpoint_state.hh | 2 +- gms/gossiper.cc | 30 ++++++++++++------------- gms/versioned_value.hh | 20 +++++++++-------- idl/gossip_digest.idl.hh | 4 ++-- replica/table.cc | 4 ++-- service/load_broadcaster.hh | 2 +- service/migration_manager.cc | 8 +++---- service/misc_services.cc | 4 ++-- service/raft/raft_group0.cc | 2 +- service/raft/raft_group_registry.cc | 2 +- service/storage_service.cc | 34 ++++++++++++++--------------- 14 files changed, 63 insertions(+), 61 deletions(-) diff --git a/api/failure_detector.cc b/api/failure_detector.cc index cfac9f0d4a..a336296f11 100644 --- a/api/failure_detector.cc +++ b/api/failure_detector.cc @@ -31,8 +31,8 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) { // We return the enum index and not it's name to stay compatible to origin // method that the state index are static but the name can be changed. version_val.application_state = static_cast::type>(a.first); - version_val.value = a.second.value; - version_val.version = a.second.version; + version_val.value = a.second.value(); + version_val.version = a.second.version(); val.application_state.push(version_val); } res.push_back(val); diff --git a/cdc/generation.cc b/cdc/generation.cc index 85f56fbd67..a4f8397766 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -35,12 +35,12 @@ extern logging::logger cdc_log; static int get_shard_count(const gms::inet_address& endpoint, const gms::gossiper& g) { auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::SHARD_COUNT); - return ep_state ? std::stoi(ep_state->value) : -1; + return ep_state ? std::stoi(ep_state->value()) : -1; } static unsigned get_sharding_ignore_msb(const gms::inet_address& endpoint, const gms::gossiper& g) { auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::IGNORE_MSB_BITS); - return ep_state ? std::stoi(ep_state->value) : 0; + return ep_state ? std::stoi(ep_state->value()) : 0; } namespace db { @@ -775,7 +775,7 @@ future<> generation_service::on_change(gms::inet_address ep, gms::application_st return make_ready_future(); } - auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value); + auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value()); cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id); return legacy_handle_cdc_generation(gen_id); diff --git a/gms/endpoint_state.cc b/gms/endpoint_state.cc index ad6140d132..e4c59b1e97 100644 --- a/gms/endpoint_state.cc +++ b/gms/endpoint_state.cc @@ -48,7 +48,7 @@ bool endpoint_state::is_cql_ready() const noexcept { return false; } try { - return boost::lexical_cast(app_state->value); + return boost::lexical_cast(app_state->value()); } catch (...) { return false; } diff --git a/gms/endpoint_state.hh b/gms/endpoint_state.hh index 43a921b3ea..6dc3d5ad70 100644 --- a/gms/endpoint_state.hh +++ b/gms/endpoint_state.hh @@ -140,7 +140,7 @@ public: if (!app_state) { return empty; } - const auto& value = app_state->value; + const auto& value = app_state->value(); if (value.empty()) { return empty; } diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 072a42ef6b..e8b0ff3a92 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -599,7 +599,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint const auto& remote_key = item.first; const auto& remote_value = item.second; const versioned_value* local_value = local_state.get_application_state_ptr(remote_key); - if (!local_value || remote_value.version > local_value->version) { + if (!local_value || remote_value.version() > local_value->version()) { logger.debug("Applying remote_state for node {} (remote generation = local generation), key={}, value={}", node, remote_key, remote_value); local_state.add_application_state(remote_key, remote_value); @@ -1096,7 +1096,7 @@ int gossiper::get_max_endpoint_state_version(endpoint_state state) const noexcep int max_version = state.get_heart_beat_state().get_heart_beat_version(); for (auto& entry : state.get_application_state_map()) { auto& value = entry.second; - max_version = std::max(max_version, value.version); + max_version = std::max(max_version, value.version()); } return max_version; } @@ -1403,7 +1403,7 @@ locator::host_id gossiper::get_host_id(inet_address endpoint) const { if (!app_state) { throw std::runtime_error(format("Host {} does not have HOST_ID application_state", endpoint)); } - return locator::host_id(utils::UUID(app_state->value)); + return locator::host_id(utils::UUID(app_state->value())); } std::set gossiper::get_nodes_with_host_id(locator::host_id host_id) const { @@ -1411,7 +1411,7 @@ std::set gossiper::get_nodes_with_host_id(locator::host_id ho for (auto& x : get_endpoint_states()) { auto node = x.first; auto app_state = get_application_state_ptr(node, application_state::HOST_ID); - if (app_state && host_id == locator::host_id(utils::UUID(app_state->value))) { + if (app_state && host_id == locator::host_id(utils::UUID(app_state->value()))) { nodes.insert(node); } } @@ -1439,12 +1439,12 @@ std::optional gossiper::get_state_for_version_bigger_than(inet_a /* Accumulate all application states whose versions are greater than "version" variable */ for (auto& entry : eps.get_application_state_map()) { auto& value = entry.second; - if (value.version > version) { + if (value.version() > version) { if (!reqd_endpoint_state) { reqd_endpoint_state.emplace(eps.get_heart_beat_state()); } auto& key = entry.first; - logger.trace("Adding state of {}, {}: {}" , for_endpoint, key, value.value); + logger.trace("Adding state of {}, {}: {}" , for_endpoint, key, value.value()); reqd_endpoint_state->add_application_state(key, value); } } @@ -1467,7 +1467,7 @@ sstring gossiper::get_rpc_address(const inet_address& endpoint) const { if (endpoint != get_broadcast_address()) { auto* v = get_application_state_ptr(endpoint, gms::application_state::RPC_ADDRESS); if (v) { - return v->value; + return v->value(); } } return fmt::to_string(endpoint); @@ -1718,7 +1718,7 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state& local_sta } const versioned_value* local_val = local_state.get_application_state_ptr(remote_key); - if (!local_val || remote_value.version > local_val->version) { + if (!local_val || remote_value.version() > local_val->version()) { changed.push_back(remote_key); local_state.add_application_state(remote_key, remote_value); } @@ -2275,7 +2275,7 @@ sstring gossiper::get_application_state_value(inet_address endpoint, application if (!v) { return {}; } - return v->value; + return v->value(); } /** @@ -2305,7 +2305,7 @@ static std::string_view do_get_gossip_status(const gms::versioned_value* app_sta if (!app_state) { return gms::versioned_value::STATUS_UNKNOWN; } - const auto& value = app_state->value; + const auto& value = app_state->value(); auto pos = value.find(','); if (!value.size() || !pos) { return gms::versioned_value::STATUS_UNKNOWN; @@ -2438,7 +2438,7 @@ std::set gossiper::get_supported_features(inet_address endpoint) const if (!app_state) { return {}; } - return feature_service::to_feature_set(app_state->value); + return feature_service::to_feature_set(app_state->value()); } std::set gossiper::get_supported_features(const std::unordered_map& loaded_peer_features, ignore_features_of_local_node ignore_local_node) const { @@ -2512,8 +2512,8 @@ void gossiper::check_snitch_name_matches(sstring local_snitch_name) const { continue; } - if (remote_snitch_name->value != local_snitch_name) { - throw std::runtime_error(format("Snitch check failed. This node cannot join the cluster because it uses {} and not {}", local_snitch_name, remote_snitch_name->value)); + if (remote_snitch_name->value() != local_snitch_name) { + throw std::runtime_error(format("Snitch check failed. This node cannot join the cluster because it uses {} and not {}", local_snitch_name, remote_snitch_name->value())); } } } @@ -2535,11 +2535,11 @@ void gossiper::append_endpoint_state(std::stringstream& ss, const endpoint_state if (app_state == application_state::TOKENS) { continue; } - ss << " " << app_state << ":" << versioned_val.version << ":" << versioned_val.value << "\n"; + ss << " " << app_state << ":" << versioned_val.version() << ":" << versioned_val.value() << "\n"; } const auto& app_state_map = state.get_application_state_map(); if (app_state_map.contains(application_state::TOKENS)) { - ss << " TOKENS:" << app_state_map.at(application_state::TOKENS).version << ":\n"; + ss << " TOKENS:" << app_state_map.at(application_state::TOKENS).version() << ":\n"; } else { ss << " TOKENS: not present" << "\n"; } diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index 169a62bcba..1136899993 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -36,6 +36,8 @@ namespace gms { */ class versioned_value { + int _version; + sstring _value; public: // this must be a char that cannot be present in any token static constexpr char DELIMITER = ','; @@ -58,17 +60,17 @@ public: // values for ApplicationState.REMOVAL_COORDINATOR static constexpr const char* REMOVAL_COORDINATOR = "REMOVER"; - int version; - sstring value; + int version() const noexcept { return _version; }; + const sstring& value() const noexcept { return _value; }; public: bool operator==(const versioned_value& other) const noexcept { - return version == other.version && - value == other.value; + return _version == other._version && + _value == other._value; } public: versioned_value(const sstring& value, int version = version_generator::get_next_version()) - : version(version), value(value) { + : _version(version), _value(value) { #if 0 // blindly interning everything is somewhat suboptimal -- lots of VersionedValues are unique -- // but harmless, and interning the non-unique ones saves significant memory. (Unfortunately, @@ -79,15 +81,15 @@ public: } versioned_value(sstring&& value, int version = version_generator::get_next_version()) noexcept - : version(version), value(std::move(value)) { + : _version(version), _value(std::move(value)) { } versioned_value() noexcept - : version(-1) { + : _version(-1) { } friend inline std::ostream& operator<<(std::ostream& os, const versioned_value& x) { - return os << "Value(" << x.value << "," << x.version << ")"; + return os << "Value(" << x.value() << "," << x.version() << ")"; } static sstring version_string(const std::initializer_list& args) { @@ -105,7 +107,7 @@ public: static std::optional cdc_generation_id_from_string(const sstring&); static versioned_value clone_with_higher_version(const versioned_value& value) noexcept { - return versioned_value(value.value); + return versioned_value(value.value()); } static versioned_value bootstrapping(const std::unordered_set& tokens) { diff --git a/idl/gossip_digest.idl.hh b/idl/gossip_digest.idl.hh index c74596bc49..bde6c5d099 100644 --- a/idl/gossip_digest.idl.hh +++ b/idl/gossip_digest.idl.hh @@ -28,8 +28,8 @@ enum class application_state:int { }; class versioned_value { - sstring value; - int version; + sstring value(); + int version(); }; class heart_beat_state { diff --git a/replica/table.cc b/replica/table.cc index 5ecc3c1a74..c5bc045fac 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2210,9 +2210,9 @@ table::cache_hit_rate table::get_hit_rate(const gms::gossiper& gossiper, gms::in float f = -1.0f; // missing state means old node if (state) { sstring me = format("{}.{}", _schema->ks_name(), _schema->cf_name()); - auto i = state->value.find(me); + auto i = state->value().find(me); if (i != sstring::npos) { - f = strtof(&state->value[i + me.size() + 1], nullptr); + f = strtof(&state->value()[i + me.size() + 1], nullptr); } else { f = 0.0f; // empty state means that node has rebooted } diff --git a/service/load_broadcaster.hh b/service/load_broadcaster.hh index e00c62ecdf..f1cadec8d2 100644 --- a/service/load_broadcaster.hh +++ b/service/load_broadcaster.hh @@ -37,7 +37,7 @@ public: virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override { if (state == gms::application_state::LOAD) { - _load_info[endpoint] = std::stod(value.value); + _load_info[endpoint] = std::stod(value.value()); } return make_ready_future(); } diff --git a/service/migration_manager.cc b/service/migration_manager.cc index b43abe5083..4e062d611b 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -179,7 +179,7 @@ void migration_manager::schedule_schema_pull(const gms::inet_address& endpoint, if (endpoint != utils::fb_utilities::get_broadcast_address() && value) { // FIXME: discarded future - (void)maybe_schedule_schema_pull(table_schema_version(utils::UUID{value->value}), endpoint).handle_exception([endpoint] (auto ep) { + (void)maybe_schedule_schema_pull(table_schema_version(utils::UUID{value->value()}), endpoint).handle_exception([endpoint] (auto ep) { mlogger.warn("Fail to pull schema from {}: {}", endpoint, ep); }); } @@ -205,7 +205,7 @@ bool migration_manager::have_schema_agreement() { mlogger.debug("Schema state not yet available for {}.", endpoint); return false; } - auto remote_version = table_schema_version(utils::UUID{schema->value}); + auto remote_version = table_schema_version(utils::UUID{schema->value()}); if (our_version != remote_version) { mlogger.debug("Schema mismatch for {} ({} != {}).", endpoint, our_version, remote_version); return false; @@ -251,7 +251,7 @@ future<> migration_manager::maybe_schedule_schema_pull(const table_schema_versio mlogger.debug("application_state::SCHEMA does not exist for {}, not submitting migration task", endpoint); return make_ready_future<>(); } - auto current_version = table_schema_version(utils::UUID{value->value}); + auto current_version = table_schema_version(utils::UUID{value->value()}); if (db.get_version() == current_version) { mlogger.debug("not submitting migration task for {} because our versions match", endpoint); return make_ready_future<>(); @@ -366,7 +366,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr bool migration_manager::has_compatible_schema_tables_version(const gms::inet_address& endpoint) { auto* version = _gossiper.get_application_state_ptr(endpoint, gms::application_state::SCHEMA_TABLES_VERSION); - return version && version->value == db::schema_tables::version; + return version && version->value() == db::schema_tables::version; } bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoint) { diff --git a/service/misc_services.cc b/service/misc_services.cc index d20d822944..02f7701a52 100644 --- a/service/misc_services.cc +++ b/service/misc_services.cc @@ -275,7 +275,7 @@ future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, gms:: size_t current; size_t max; api::timestamp_type ticks; - const char* start_bound = value.value.data(); + const char* start_bound = value.value().data(); char* end_bound; for (auto* ptr : {¤t, &max}) { *ptr = std::strtoull(start_bound, &end_bound, 10); @@ -288,7 +288,7 @@ future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, gms:: return make_ready_future(); } ticks = std::strtoll(start_bound, &end_bound, 10); - if (ticks == 0 || ticks == LLONG_MAX || end_bound != value.value.data() + value.value.size()) { + if (ticks == 0 || ticks == LLONG_MAX || end_bound != value.value().data() + value.value().size()) { return make_ready_future(); } auto backlog = view_update_backlog_timestamped{db::view::update_backlog{current, max}, ticks}; diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index 661e232b7e..1523b7d26d 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -617,7 +617,7 @@ void raft_group0::load_initial_raft_address_map() { if (value == nullptr) { continue; } - auto server_id = utils::UUID(value->value); + auto server_id = utils::UUID(value->value()); if (server_id == utils::UUID{}) { upgrade_log.error("empty Host ID for host {} ", ip_addr); continue; diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc index f31d2386ea..a94b767d37 100644 --- a/service/raft/raft_group_registry.cc +++ b/service/raft/raft_group_registry.cc @@ -77,7 +77,7 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang on_endpoint_change(gms::inet_address endpoint, gms::endpoint_state ep_state) { auto app_state_ptr = ep_state.get_application_state_ptr(gms::application_state::HOST_ID); if (app_state_ptr) { - raft::server_id id(utils::UUID(app_state_ptr->value)); + raft::server_id id(utils::UUID(app_state_ptr->value())); rslog.debug("gossiper_state_change_subscriber_proxy::on_endpoint_change() {} {}", endpoint, id); _address_map.add_or_update_entry(id, endpoint); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 4ff68a077c..d48f3716a2 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1816,7 +1816,7 @@ storage_service::get_rpc_address(const inet_address& endpoint) const { if (endpoint != get_broadcast_address()) { auto* v = _gossiper.get_application_state_ptr(endpoint, gms::application_state::RPC_ADDRESS); if (v) { - return v->value; + return v->value(); } } return fmt::to_string(endpoint); @@ -2160,9 +2160,9 @@ future<> storage_service::handle_state_removing(inet_address endpoint, std::vect throw std::runtime_error(err); } std::vector coordinator; - boost::split(coordinator, value->value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); + boost::split(coordinator, value->value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); if (coordinator.size() != 2) { - auto err = format("Can not split REMOVAL_COORDINATOR for endpoint={}, value={}", endpoint, value->value); + auto err = format("Can not split REMOVAL_COORDINATOR for endpoint={}, value={}", endpoint, value->value()); slogger.warn("{}", err); throw std::runtime_error(err); } @@ -2236,7 +2236,7 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta slogger.debug("endpoint={} on_change: app_state={}, versioned_value={}", endpoint, state, value); if (state == application_state::STATUS) { std::vector pieces; - boost::split(pieces, value.value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); + boost::split(pieces, value.value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); if (pieces.empty()) { slogger.warn("Fail to split status in on_change: endpoint={}, app_state={}, value={}", endpoint, state, value); co_return; @@ -2274,7 +2274,7 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta slogger.debug("Got application_state::RPC_READY for node {}, is_cql_ready={}", endpoint, ep_state->is_cql_ready()); co_await notify_cql_change(endpoint, ep_state->is_cql_ready()); } else if (state == application_state::INTERNAL_IP) { - co_await maybe_reconnect_to_preferred_ip(endpoint, inet_address(value.value)); + co_await maybe_reconnect_to_preferred_ip(endpoint, inet_address(value.value())); } } } @@ -2330,18 +2330,18 @@ future<> storage_service::update_table(gms::inet_address endpoint, sstring col, future<> storage_service::do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value) { slogger.debug("Update system.peers table: endpoint={}, app_state={}, versioned_value={}", endpoint, state, value); if (state == application_state::RELEASE_VERSION) { - co_await update_table(endpoint, "release_version", value.value); + co_await update_table(endpoint, "release_version", value.value()); } else if (state == application_state::DC) { - co_await update_table(endpoint, "data_center", value.value); + co_await update_table(endpoint, "data_center", value.value()); } else if (state == application_state::RACK) { - co_await update_table(endpoint, "rack", value.value); + co_await update_table(endpoint, "rack", value.value()); } else if (state == application_state::INTERNAL_IP) { auto col = sstring("preferred_ip"); inet_address ep; try { - ep = gms::inet_address(value.value); + ep = gms::inet_address(value.value()); } catch (...) { - slogger.error("fail to update {} for {}: invalid address {}", col, endpoint, value.value); + slogger.error("fail to update {} for {}: invalid address {}", col, endpoint, value.value()); co_return; } co_await update_table(endpoint, col, ep.addr()); @@ -2349,18 +2349,18 @@ future<> storage_service::do_update_system_peers_table(gms::inet_address endpoin auto col = sstring("rpc_address"); inet_address ep; try { - ep = gms::inet_address(value.value); + ep = gms::inet_address(value.value()); } catch (...) { - slogger.error("fail to update {} for {}: invalid rcpaddr {}", col, endpoint, value.value); + slogger.error("fail to update {} for {}: invalid rcpaddr {}", col, endpoint, value.value()); co_return; } co_await update_table(endpoint, col, ep.addr()); } else if (state == application_state::SCHEMA) { - co_await update_table(endpoint, "schema_version", utils::UUID(value.value)); + co_await update_table(endpoint, "schema_version", utils::UUID(value.value())); } else if (state == application_state::HOST_ID) { - co_await update_table(endpoint, "host_id", utils::UUID(value.value)); + co_await update_table(endpoint, "host_id", utils::UUID(value.value())); } else if (state == application_state::SUPPORTED_FEATURES) { - co_await update_table(endpoint, "supported_features", value.value); + co_await update_table(endpoint, "supported_features", value.value()); } } @@ -2390,8 +2390,8 @@ locator::endpoint_dc_rack storage_service::get_dc_rack_for(inet_address endpoint auto* dc = _gossiper.get_application_state_ptr(endpoint, gms::application_state::DC); auto* rack = _gossiper.get_application_state_ptr(endpoint, gms::application_state::RACK); return locator::endpoint_dc_rack{ - .dc = dc ? dc->value : locator::endpoint_dc_rack::default_location.dc, - .rack = rack ? rack->value : locator::endpoint_dc_rack::default_location.rack, + .dc = dc ? dc->value() : locator::endpoint_dc_rack::default_location.dc, + .rack = rack ? rack->value() : locator::endpoint_dc_rack::default_location.rack, }; } From f5f566bdd8b2fbffd104593dba033175f51b26ae Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 24 Feb 2023 14:10:52 +0200 Subject: [PATCH 05/11] utils: add tagged_integer A generic template for defining strongly typed integer types. Use it here to replace raft::internal::tagged_uint64. Will be used for defining gms generation and version as strong and distinguishable types in following patches. Signed-off-by: Benny Halevy --- configure.py | 1 + idl/CMakeLists.txt | 4 +- idl/raft.idl.hh | 4 ++ idl/raft_storage.idl.hh | 6 +- idl/utils.idl.hh | 18 ++++++ raft/fsm.cc | 4 +- raft/internal.hh | 58 ----------------- raft/raft.hh | 7 ++- test/raft/randomized_nemesis_test.cc | 2 +- utils/tagged_integer.hh | 93 ++++++++++++++++++++++++++++ 10 files changed, 127 insertions(+), 70 deletions(-) create mode 100644 idl/utils.idl.hh create mode 100644 utils/tagged_integer.hh diff --git a/configure.py b/configure.py index 75da9a447c..091d896bd7 100755 --- a/configure.py +++ b/configure.py @@ -1155,6 +1155,7 @@ idls = ['idl/gossip_digest.idl.hh', 'idl/position_in_partition.idl.hh', 'idl/experimental/broadcast_tables_lang.idl.hh', 'idl/storage_service.idl.hh', + 'idl/utils.idl.hh', ] headers = find_headers('.', excluded_dirs=['idl', 'build', 'seastar', '.git']) diff --git a/idl/CMakeLists.txt b/idl/CMakeLists.txt index 48450e7dfd..ab69df70c5 100644 --- a/idl/CMakeLists.txt +++ b/idl/CMakeLists.txt @@ -59,7 +59,9 @@ set(idl_headers replica_exception.idl.hh per_partition_rate_limit_info.idl.hh position_in_partition.idl.hh - experimental/broadcast_tables_lang.idl.hh) + experimental/broadcast_tables_lang.idl.hh + utils.idl.hh + ) foreach(idl_header ${idl_headers}) compile_idl(${idl_header} diff --git a/idl/raft.idl.hh b/idl/raft.idl.hh index d54e2f7823..dbea21381d 100644 --- a/idl/raft.idl.hh +++ b/idl/raft.idl.hh @@ -6,6 +6,10 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include "raft/raft.hh" + +#include "idl/utils.idl.hh" + namespace raft { struct snapshot_descriptor { diff --git a/idl/raft_storage.idl.hh b/idl/raft_storage.idl.hh index 023776a93e..f7aa18e667 100644 --- a/idl/raft_storage.idl.hh +++ b/idl/raft_storage.idl.hh @@ -9,6 +9,7 @@ #include "raft/raft.hh" #include "idl/uuid.idl.hh" +#include "idl/utils.idl.hh" namespace raft { @@ -19,11 +20,6 @@ struct tagged_id { utils::UUID id; }; -template -struct tagged_uint64 { - uint64_t get_value(); -}; - } // namespace internal struct server_address { diff --git a/idl/utils.idl.hh b/idl/utils.idl.hh new file mode 100644 index 0000000000..bfe7a3c98d --- /dev/null +++ b/idl/utils.idl.hh @@ -0,0 +1,18 @@ +/* + * Copyright 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include "utils/tagged_integer.hh" + +namespace utils { + +template +struct tagged_integer final { + ValueType value(); +}; + +} // namespace utils diff --git a/raft/fsm.cc b/raft/fsm.cc index 55e7cf0014..9a8852f6ba 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -98,7 +98,7 @@ const log_entry& fsm::add_entry(T command) { tmp.enter_joint(command.current); command = std::move(tmp); - logger.trace("[{}] appending joint config entry at {}: {}", _my_id, _log.next_idx().get_value(), command); + logger.trace("[{}] appending joint config entry at {}: {}", _my_id, _log.next_idx(), command); } utils::get_local_injector().inject("fsm::add_entry/test-failure", @@ -462,7 +462,7 @@ void fsm::maybe_commit() { // system then transitions to the new configuration. configuration cfg(_log.get_configuration()); cfg.leave_joint(); - logger.trace("[{}] appending non-joint config entry at {}: {}", _my_id, _log.next_idx().get_value(), cfg); + logger.trace("[{}] appending non-joint config entry at {}: {}", _my_id, _log.next_idx(), cfg); _log.emplace_back(seastar::make_lw_shared({_current_term, _log.next_idx(), std::move(cfg)})); leader_state().tracker.set_configuration(_log.get_configuration(), _log.last_idx()); // Leaving joint configuration may commit more entries diff --git a/raft/internal.hh b/raft/internal.hh index 8462736bde..da9a65b34b 100644 --- a/raft/internal.hh +++ b/raft/internal.hh @@ -14,66 +14,8 @@ namespace raft { namespace internal { -template -class tagged_uint64 { - uint64_t _val; -public: - tagged_uint64() : _val(0) {} - explicit tagged_uint64(uint64_t v) : _val(v) {} - tagged_uint64(const tagged_uint64&) = default; - tagged_uint64(tagged_uint64&&) = default; - tagged_uint64& operator=(const tagged_uint64&) = default; - auto operator<=>(const tagged_uint64&) const = default; - explicit operator bool() const { return _val != 0; } - - uint64_t get_value() const { - return _val; - } - operator uint64_t() const { - return get_value(); - } - tagged_uint64& operator++() { // pre increment - ++_val; - return *this; - } - tagged_uint64 operator++(int) { // post increment - uint64_t v = _val++; - return tagged_uint64(v); - } - tagged_uint64& operator--() { // pre decrement - --_val; - return *this; - } - tagged_uint64 operator--(int) { // post decrement - uint64_t v = _val--; - return tagged_uint64(v); - } - tagged_uint64 operator+(const tagged_uint64& o) const { - return tagged_uint64(_val + o._val); - } - tagged_uint64 operator-(const tagged_uint64& o) const { - return tagged_uint64(_val - o._val); - } - friend std::ostream& operator<<(std::ostream& os, const tagged_uint64& u) { - os << u._val; - return os; - } -}; - template using tagged_id = utils::tagged_uuid; } // end of namespace internal } // end of namespace raft - -namespace std { - -template -struct hash> { - size_t operator()(const raft::internal::tagged_uint64& val) const { - return hash()(val); - } -}; - -} // end of namespace std - diff --git a/raft/raft.hh b/raft/raft.hh index ff15c08148..5391b1e456 100644 --- a/raft/raft.hh +++ b/raft/raft.hh @@ -19,6 +19,7 @@ #include #include "bytes_ostream.hh" #include "utils/UUID.hh" +#include "utils/tagged_integer.hh" #include "internal.hh" #include "logical_clock.hh" @@ -39,11 +40,11 @@ using server_id = internal::tagged_id; using group_id = raft::internal::tagged_id; // This type represents the raft term -using term_t = internal::tagged_uint64; +using term_t = utils::tagged_integer; // This type represensts the index into the raft log -using index_t = internal::tagged_uint64; +using index_t = utils::tagged_integer; // Identifier for a read barrier request -using read_id = internal::tagged_uint64; +using read_id = utils::tagged_integer; // Opaque connection properties. May contain ip:port pair for instance. // This value is disseminated between cluster member diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc index 9a6c8f32d7..38fedb9a39 100644 --- a/test/raft/randomized_nemesis_test.cc +++ b/test/raft/randomized_nemesis_test.cc @@ -893,7 +893,7 @@ class persistence { if (b == _stored_entries.end() || (*b)->idx >= idx) { return b; } - return b + std::min((idx - (*b)->idx).get_value(), _stored_entries.size()); + return b + std::min(size_t(idx - (*b)->idx), _stored_entries.size()); } public: diff --git a/utils/tagged_integer.hh b/utils/tagged_integer.hh new file mode 100644 index 0000000000..56c7e6299a --- /dev/null +++ b/utils/tagged_integer.hh @@ -0,0 +1,93 @@ +/* + * Copyright 2020-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include +#include +#include +#include + +namespace utils { + +template +class tagged_integer { +public: + using value_type = ValueType; +private: + value_type _value; +public: + tagged_integer() noexcept : _value(0) {} + explicit tagged_integer(value_type v) noexcept : _value(v) {} + + tagged_integer& operator=(value_type v) noexcept { + _value = v; + return *this; + } + + value_type value() const noexcept { return _value; } + operator value_type() const noexcept { return _value; } + + explicit operator bool() const { return _value != 0; } + + auto operator<=>(const tagged_integer& o) const = default; + + tagged_integer& operator++() noexcept { + ++_value; + return *this; + } + tagged_integer& operator--() noexcept { + --_value; + return *this; + } + + tagged_integer operator++(int) noexcept { + auto ret = *this; + ++_value; + return ret; + } + tagged_integer operator--(int) noexcept { + auto ret = *this; + --_value; + return ret; + } + + tagged_integer operator+(const tagged_integer& o) const { + return tagged_integer(_value + o._value); + } + tagged_integer operator-(const tagged_integer& o) const { + return tagged_integer(_value - o._value); + } + + tagged_integer& operator+=(const tagged_integer& o) const { + _value += o._value; + return *this; + } + tagged_integer& operator-=(const tagged_integer& o) const { + _value -= o._value; + return *this; + } +}; + +} // namespace utils + +namespace std { + +template +struct hash> { + size_t operator()(const utils::tagged_integer& x) const noexcept { + return hash{}(x.value()); + } +}; + +template +[[maybe_unused]] ostream& operator<<(ostream& s, const utils::tagged_integer& x) { + return s << x.value(); +} + +} // namespace std From d1817e9e1b88bb912508e53011a7083e35f20cb3 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 2 Apr 2023 12:41:20 +0300 Subject: [PATCH 06/11] utils: move generation-number to gms Although get_generation_number implementation is completely generic, it is used exclusively to seed the gossip generation number. Following patches will define a strong gms::generation_id type and this function should return it. Signed-off-by: Benny Halevy --- configure.py | 2 +- db/system_keyspace.cc | 6 +++--- {utils => gms}/generation-number.cc | 2 +- {utils => gms}/generation-number.hh | 2 +- gms/gossiper.cc | 4 ++-- service/storage_service.cc | 4 ++-- utils/CMakeLists.txt | 1 - 7 files changed, 10 insertions(+), 11 deletions(-) rename {utils => gms}/generation-number.cc (95%) rename {utils => gms}/generation-number.hh (88%) diff --git a/configure.py b/configure.py index 091d896bd7..26ecdfff8b 100755 --- a/configure.py +++ b/configure.py @@ -700,7 +700,7 @@ scylla_core = (['message/messaging_service.cc', 'utils/limiting_data_source.cc', 'utils/updateable_value.cc', 'utils/directories.cc', - 'utils/generation-number.cc', + 'gms/generation-number.cc', 'utils/rjson.cc', 'utils/human_readable.cc', 'utils/histogram_metrics_helper.cc', diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 33f056d2bf..13c522f3d5 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -51,7 +51,7 @@ #include "db/view/build_progress_virtual_reader.hh" #include "db/schema_tables.hh" #include "index/built_indexes_virtual_reader.hh" -#include "utils/generation-number.hh" +#include "gms/generation-number.hh" #include "db/virtual_table.hh" #include "service/storage_service.hh" #include "protocol_server.hh" @@ -3125,11 +3125,11 @@ future system_keyspace::increment_and_get_generation() { // seconds-since-epoch isn't a foolproof new generation // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"), // but it's as close as sanely possible - generation = utils::get_generation_number(); + generation = gms::get_generation_number(); } else { // Other nodes will ignore gossip messages about a node that have a lower generation than previously seen. int stored_generation = rs->one().template get_as("gossip_generation") + 1; - int now = utils::get_generation_number(); + int now = gms::get_generation_number(); if (stored_generation >= now) { slogger.warn("Using stored Gossip Generation {} as it is greater than current system time {}." "See CASSANDRA-3654 if you experience problems", stored_generation, now); diff --git a/utils/generation-number.cc b/gms/generation-number.cc similarity index 95% rename from utils/generation-number.cc rename to gms/generation-number.cc index 34daee0ab6..3c8a6cd05a 100644 --- a/utils/generation-number.cc +++ b/gms/generation-number.cc @@ -9,7 +9,7 @@ #include #include "generation-number.hh" -namespace utils { +namespace gms { int get_generation_number() { using namespace std::chrono; diff --git a/utils/generation-number.hh b/gms/generation-number.hh similarity index 88% rename from utils/generation-number.hh rename to gms/generation-number.hh index deef890bd4..f2444ec1ad 100644 --- a/utils/generation-number.hh +++ b/gms/generation-number.hh @@ -8,7 +8,7 @@ #pragma once -namespace utils { +namespace gms { int get_generation_number(); diff --git a/gms/gossiper.cc b/gms/gossiper.cc index e8b0ff3a92..6e22c618e8 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -38,7 +38,7 @@ #include #include #include -#include "utils/generation-number.hh" +#include "gms/generation-number.hh" #include "locator/token_metadata.hh" #include "utils/exceptions.hh" @@ -567,7 +567,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint int local_generation = local_state.get_heart_beat_state().get_generation(); int remote_generation = remote_state.get_heart_beat_state().get_generation(); logger.trace("{} local generation {}, remote generation {}", node, local_generation, remote_generation); - if (remote_generation > utils::get_generation_number() + MAX_GENERATION_DIFFERENCE) { + if (remote_generation > get_generation_number() + MAX_GENERATION_DIFFERENCE) { // assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself) logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}", node, local_generation, remote_generation); diff --git a/service/storage_service.cc b/service/storage_service.cc index d48f3716a2..85b24a9c60 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -59,7 +59,7 @@ #include "repair/repair.hh" #include "repair/row_level.hh" #include "service/priority_manager.hh" -#include "utils/generation-number.hh" +#include "gms/generation-number.hh" #include #include #include @@ -2948,7 +2948,7 @@ future<> storage_service::start_gossiping() { co_await ss._sys_ks.local().get_local_tokens(), cdc_gen_ts); ss._gossiper.force_newer_generation(); - co_await ss._gossiper.start_gossiping(utils::get_generation_number()); + co_await ss._gossiper.start_gossiping(gms::get_generation_number()); } catch (...) { should_stop_gossiper = true; } diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt index a2ecbd1b82..185a9787e2 100644 --- a/utils/CMakeLists.txt +++ b/utils/CMakeLists.txt @@ -21,7 +21,6 @@ target_sources(utils error_injection.cc exceptions.cc file_lock.cc - generation-number.cc gz/crc_combine.cc gz/crc_combine_table.cc hashers.cc From 2d20ee7d61cb69328658dfba49a3938451f976c0 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 22 Feb 2023 11:04:33 +0200 Subject: [PATCH 07/11] gms: version_generator: define version_type and generation_type strong types Derived from utils::tagged_integer, using different tags, the types are incompatible with each other and require explicit typecasting to- and from- their value type. Signed-off-by: Benny Halevy --- db/system_keyspace.cc | 8 ++++---- gms/generation-number.cc | 8 ++++++-- gms/generation-number.hh | 6 +++++- gms/gossiper.cc | 4 ++-- gms/gossiper.hh | 3 ++- gms/heart_beat_state.hh | 2 +- gms/version_generator.cc | 4 ++-- gms/version_generator.hh | 6 +++++- gms/versioned_value.hh | 4 ++-- service/storage_service.cc | 2 +- test/manual/gossip.cc | 2 +- 11 files changed, 31 insertions(+), 18 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 13c522f3d5..f28ab877c0 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -3120,7 +3120,7 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history future system_keyspace::increment_and_get_generation() { auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL); auto rs = co_await _qp.local().execute_internal(req, cql3::query_processor::cache_internal::yes); - int generation; + gms::generation_type generation; if (rs->empty() || !rs->one().has("gossip_generation")) { // seconds-since-epoch isn't a foolproof new generation // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"), @@ -3128,8 +3128,8 @@ future system_keyspace::increment_and_get_generation() { generation = gms::get_generation_number(); } else { // Other nodes will ignore gossip messages about a node that have a lower generation than previously seen. - int stored_generation = rs->one().template get_as("gossip_generation") + 1; - int now = gms::get_generation_number(); + auto stored_generation = gms::generation_type(rs->one().template get_as("gossip_generation") + 1); + auto now = gms::get_generation_number(); if (stored_generation >= now) { slogger.warn("Using stored Gossip Generation {} as it is greater than current system time {}." "See CASSANDRA-3654 if you experience problems", stored_generation, now); @@ -3139,7 +3139,7 @@ future system_keyspace::increment_and_get_generation() { } } req = format("INSERT INTO system.{} (key, gossip_generation) VALUES ('{}', ?)", LOCAL, LOCAL); - co_await _qp.local().execute_internal(req, {generation}, cql3::query_processor::cache_internal::yes); + co_await _qp.local().execute_internal(req, {generation.value()}, cql3::query_processor::cache_internal::yes); co_await force_blocking_flush(LOCAL); co_return generation; } diff --git a/gms/generation-number.cc b/gms/generation-number.cc index 3c8a6cd05a..e6919156d9 100644 --- a/gms/generation-number.cc +++ b/gms/generation-number.cc @@ -6,16 +6,20 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include #include #include "generation-number.hh" namespace gms { -int get_generation_number() { +generation_type get_generation_number() { using namespace std::chrono; auto now = high_resolution_clock::now().time_since_epoch(); int generation_number = duration_cast(now).count(); - return generation_number; + auto ret = generation_type(generation_number); + // Make sure the clock didn't overflow the 32 bits value + assert(ret.value() == generation_number); + return ret; } } diff --git a/gms/generation-number.hh b/gms/generation-number.hh index f2444ec1ad..e9d1baa8f9 100644 --- a/gms/generation-number.hh +++ b/gms/generation-number.hh @@ -8,8 +8,12 @@ #pragma once +#include "utils/tagged_integer.hh" + namespace gms { -int get_generation_number(); +using generation_type = utils::tagged_integer; + +generation_type get_generation_number(); } diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 6e22c618e8..b6baa4a081 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1842,7 +1842,7 @@ void gossiper::examine_gossiper(utils::chunked_vector& g_digest_l } } -future<> gossiper::start_gossiping(int generation_nbr, std::map preload_local_states, gms::advertise_myself advertise) { +future<> gossiper::start_gossiping(gms::generation_type generation_nbr, std::map preload_local_states, gms::advertise_myself advertise) { co_await container().invoke_on_all([advertise] (gossiper& g) { if (!advertise) { g._advertise_myself = false; @@ -1851,7 +1851,7 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map 0) { - generation_nbr = _force_gossip_generation(); + generation_nbr = gms::generation_type(_force_gossip_generation()); logger.warn("Use the generation number provided by user: generation = {}", generation_nbr); } endpoint_state& local_state = _endpoint_state_map[get_broadcast_address()]; diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 8b9e05d762..9179bfff82 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -19,6 +19,7 @@ #include "utils/atomic_vector.hh" #include "utils/UUID.hh" #include "utils/fb_utilities.hh" +#include "gms/generation-number.hh" #include "gms/versioned_value.hh" #include "gms/application_state.hh" #include "gms/endpoint_state.hh" @@ -509,7 +510,7 @@ public: * existing nodes can talk to the replacing node. So the probability of * replacing node being talked to is pretty high. */ - future<> start_gossiping(int generation_nbr, std::map preload_local_states = {}, + future<> start_gossiping(gms::generation_type generation_nbr, std::map preload_local_states = {}, gms::advertise_myself advertise = gms::advertise_myself::yes); public: diff --git a/gms/heart_beat_state.hh b/gms/heart_beat_state.hh index a379a59bfe..794448df05 100644 --- a/gms/heart_beat_state.hh +++ b/gms/heart_beat_state.hh @@ -43,7 +43,7 @@ public: } void update_heart_beat() noexcept { - _version = version_generator::get_next_version(); + _version = version_generator::get_next_version().value(); } int32_t get_heart_beat_version() const noexcept { diff --git a/gms/version_generator.cc b/gms/version_generator.cc index b599595221..586e2761e3 100644 --- a/gms/version_generator.cc +++ b/gms/version_generator.cc @@ -14,9 +14,9 @@ namespace gms { namespace version_generator { // In the original Cassandra code, version was an AtomicInteger. // For us, we run the gossiper on a single CPU, and don't need to use atomics. -static int version = 0; +static version_type version; -int get_next_version() noexcept +version_type get_next_version() noexcept { return ++version; } diff --git a/gms/version_generator.hh b/gms/version_generator.hh index e16ae20412..b0d4312f3b 100644 --- a/gms/version_generator.hh +++ b/gms/version_generator.hh @@ -10,8 +10,12 @@ #pragma once +#include "utils/tagged_integer.hh" + namespace gms { +using version_type = utils::tagged_integer; + /** * A unique version number generator for any state that is generated by the * local node. @@ -19,7 +23,7 @@ namespace gms { namespace version_generator { - int get_next_version() noexcept; + version_type get_next_version() noexcept; } } // namespace gms diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index 1136899993..fabdfe454f 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -69,7 +69,7 @@ public: } public: - versioned_value(const sstring& value, int version = version_generator::get_next_version()) + versioned_value(const sstring& value, int version = version_generator::get_next_version().value()) : _version(version), _value(value) { #if 0 // blindly interning everything is somewhat suboptimal -- lots of VersionedValues are unique -- @@ -80,7 +80,7 @@ public: #endif } - versioned_value(sstring&& value, int version = version_generator::get_next_version()) noexcept + versioned_value(sstring&& value, int version = version_generator::get_next_version().value()) noexcept : _version(version), _value(std::move(value)) { } diff --git a/service/storage_service.cc b/service/storage_service.cc index 85b24a9c60..a5be0c4a22 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1408,7 +1408,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi slogger.info("Starting up server gossip"); - auto generation_number = co_await _sys_ks.local().increment_and_get_generation(); + auto generation_number = gms::generation_type(co_await _sys_ks.local().increment_and_get_generation()); auto advertise = gms::advertise_myself(!replacing_a_node_with_same_ip); co_await _gossiper.start_gossiping(generation_number, app_states, advertise); diff --git a/test/manual/gossip.cc b/test/manual/gossip.cc index 45c74b1e4c..d31418185d 100644 --- a/test/manual/gossip.cc +++ b/test/manual/gossip.cc @@ -98,7 +98,7 @@ int main(int ac, char ** av) { using namespace std::chrono; auto now = high_resolution_clock::now().time_since_epoch(); - int generation_number = duration_cast(now).count(); + auto generation_number = gms::generation_type(duration_cast(now).count()); gossiper.local().start_gossiping(generation_number, app_states).get(); static double load = 0.5; for (;;) { From b638571cb0dd1303f8f7ca853d79afe758751ebe Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 22 Feb 2023 14:53:35 +0200 Subject: [PATCH 08/11] gms: versioned_value: use version_type Adjust scylla-gdb.get_gms_version_value to get the versioned_value version as version_type (utils::tagged_integer). Signed-off-by: Benny Halevy --- api/failure_detector.cc | 2 +- gms/gossiper.cc | 44 ++++++++++++++++++++-------------------- gms/gossiper.hh | 8 ++++---- gms/versioned_value.hh | 8 ++++---- idl/gossip_digest.idl.hh | 3 ++- scylla-gdb.py | 17 +++++++++++++--- 6 files changed, 47 insertions(+), 35 deletions(-) diff --git a/api/failure_detector.cc b/api/failure_detector.cc index a336296f11..3de451c0c4 100644 --- a/api/failure_detector.cc +++ b/api/failure_detector.cc @@ -32,7 +32,7 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) { // method that the state index are static but the name can be changed. version_val.application_state = static_cast::type>(a.first); version_val.value = a.second.value(); - version_val.version = a.second.version(); + version_val.version = a.second.version().value(); val.application_state.push(version_val); } res.push_back(val); diff --git a/gms/gossiper.cc b/gms/gossiper.cc index b6baa4a081..5a8d7c8507 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -154,8 +154,8 @@ void gossiper::do_sort(utils::chunked_vector& g_digest_list) { for (auto g_digest : g_digest_list) { auto ep = g_digest.get_endpoint(); auto* ep_state = this->get_endpoint_state_for_endpoint_ptr(ep); - int version = ep_state ? this->get_max_endpoint_state_version(*ep_state) : 0; - int diff_version = ::abs(version - g_digest.get_max_version()); + version_type version = ep_state ? this->get_max_endpoint_state_version(*ep_state) : version_type(); + int32_t diff_version = ::abs(version.value() - g_digest.get_max_version()); diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), diff_version)); } @@ -363,7 +363,7 @@ future<> gossiper::do_send_ack2_msg(msg_addr from, utils::chunked_vector delta_ep_state_map; for (auto g_digest : ack_msg_digest) { inet_address addr = g_digest.get_endpoint(); - auto local_ep_state_ptr = this->get_state_for_version_bigger_than(addr, g_digest.get_max_version()); + auto local_ep_state_ptr = this->get_state_for_version_bigger_than(addr, version_type(g_digest.get_max_version())); if (local_ep_state_ptr) { delta_ep_state_map.emplace(addr, *local_ep_state_ptr); } @@ -583,8 +583,8 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint } else if (remote_generation == local_generation) { if (listener_notification) { // find maximum state - int local_max_version = this->get_max_endpoint_state_version(local_state); - int remote_max_version = this->get_max_endpoint_state_version(remote_state); + auto local_max_version = this->get_max_endpoint_state_version(local_state); + auto remote_max_version = this->get_max_endpoint_state_version(remote_state); if (remote_max_version > local_max_version) { // apply states, but do not notify since there is no major change co_await this->apply_new_states(node, local_state, remote_state); @@ -1092,8 +1092,8 @@ std::set gossiper::get_unreachable_members() const { return ret; } -int gossiper::get_max_endpoint_state_version(endpoint_state state) const noexcept { - int max_version = state.get_heart_beat_state().get_heart_beat_version(); +version_type gossiper::get_max_endpoint_state_version(endpoint_state state) const noexcept { + auto max_version = version_type(state.get_heart_beat_state().get_heart_beat_version()); for (auto& entry : state.get_application_state_map()) { auto& value = entry.second; max_version = std::max(max_version, value.version()); @@ -1122,7 +1122,7 @@ void gossiper::quarantine_endpoint(inet_address endpoint, clk::time_point quaran void gossiper::make_random_gossip_digest(utils::chunked_vector& g_digests) { int generation = 0; - int max_version = 0; + version_type max_version; // local epstate will be part of _endpoint_state_map utils::chunked_vector endpoints; @@ -1137,7 +1137,7 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector& g generation = eps.get_heart_beat_state().get_generation(); max_version = get_max_endpoint_state_version(eps); } - g_digests.push_back(gossip_digest(endpoint, generation, max_version)); + g_digests.push_back(gossip_digest(endpoint, generation, max_version.value())); } #if 0 if (logger.isTraceEnabled()) { @@ -1418,7 +1418,7 @@ std::set gossiper::get_nodes_with_host_id(locator::host_id ho return nodes; } -std::optional gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, int version) { +std::optional gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) { std::optional reqd_endpoint_state; auto es = get_endpoint_state_for_endpoint_ptr(for_endpoint); if (es) { @@ -1431,7 +1431,7 @@ std::optional gossiper::get_state_for_version_bigger_than(inet_a * than the version passed in. In this case we also send the old * heart beat and throw it away on the receiver if it is redundant. */ - int local_hb_version = eps.get_heart_beat_state().get_heart_beat_version(); + auto local_hb_version = version_type(eps.get_heart_beat_state().get_heart_beat_version()); if (local_hb_version > version) { reqd_endpoint_state.emplace(eps.get_heart_beat_state()); logger.trace("local heartbeat version {} greater than {} for {}", local_hb_version, version, for_endpoint); @@ -1485,9 +1485,9 @@ void gossiper::update_timestamp_for_nodes(const std::map local_generation) { update = true; } else if (remote_generation == local_generation) { - int local_version = get_max_endpoint_state_version(*local_endpoint_state); + auto local_version = get_max_endpoint_state_version(*local_endpoint_state); int remote_version = remote_endpoint_state.get_heart_beat_state().get_heart_beat_version(); - if (remote_version > local_version) { + if (version_type(remote_version) > local_version) { update = true; } } @@ -1757,15 +1757,15 @@ future<> gossiper::do_on_change_notifications(inet_address addr, const applicati } void gossiper::request_all(gossip_digest& g_digest, - utils::chunked_vector& delta_gossip_digest_list, int remote_generation) { + utils::chunked_vector& delta_gossip_digest_list, generation_type remote_generation) { /* We are here since we have no data for this endpoint locally so request everthing. */ - delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation, 0); + delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation.value(), 0); logger.trace("request_all for {}", g_digest.get_endpoint()); } void gossiper::send_all(gossip_digest& g_digest, std::map& delta_ep_state_map, - int max_remote_version) { + version_type max_remote_version) { auto ep = g_digest.get_endpoint(); logger.trace("send_all(): ep={}, version > {}", ep, max_remote_version); auto local_ep_state_ptr = get_state_for_version_bigger_than(ep, max_remote_version); @@ -1789,8 +1789,8 @@ void gossiper::examine_gossiper(utils::chunked_vector& g_digest_l } } for (gossip_digest& g_digest : g_digest_list) { - int remote_generation = g_digest.get_generation(); - int max_remote_version = g_digest.get_max_version(); + auto remote_generation = generation_type(g_digest.get_generation()); + auto max_remote_version = version_type(g_digest.get_max_version()); /* Get state associated with the end point in digest */ auto&& ep = g_digest.get_endpoint(); auto es = get_endpoint_state_for_endpoint_ptr(ep); @@ -1801,9 +1801,9 @@ void gossiper::examine_gossiper(utils::chunked_vector& g_digest_l */ if (es) { endpoint_state& ep_state_ptr = *es; - int local_generation = ep_state_ptr.get_heart_beat_state().get_generation(); + auto local_generation = generation_type(ep_state_ptr.get_heart_beat_state().get_generation()); /* get the max version of all keys in the state associated with this endpoint */ - int max_local_version = get_max_endpoint_state_version(ep_state_ptr); + auto max_local_version = get_max_endpoint_state_version(ep_state_ptr); logger.trace("examine_gossiper(): ep={}, remote={}.{}, local={}.{}", ep, remote_generation, max_remote_version, local_generation, max_local_version); if (remote_generation == local_generation && max_remote_version == max_local_version) { @@ -1815,7 +1815,7 @@ void gossiper::examine_gossiper(utils::chunked_vector& g_digest_l request_all(g_digest, delta_gossip_digest_list, remote_generation); } else if (remote_generation < local_generation) { /* send all data with generation = localgeneration and version > 0 */ - send_all(g_digest, delta_ep_state_map, 0); + send_all(g_digest, delta_ep_state_map, version_type()); } else if (remote_generation == local_generation) { /* * If the max remote version is greater then we request the @@ -1829,7 +1829,7 @@ void gossiper::examine_gossiper(utils::chunked_vector& g_digest_l */ if (max_remote_version > max_local_version) { logger.trace("examine_gossiper(): requesting version > {} from {}", max_local_version, g_digest.get_endpoint()); - delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation, max_local_version); + delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation.value(), max_local_version.value()); } else if (max_remote_version < max_local_version) { /* send all data with generation = localgeneration and version > max_remote_version */ send_all(g_digest, delta_ep_state_map, max_remote_version); diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 9179bfff82..a4c2f84a2e 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -284,7 +284,7 @@ public: * @param ep_state * @return */ - int get_max_endpoint_state_version(endpoint_state state) const noexcept; + version_type get_max_endpoint_state_version(endpoint_state state) const noexcept; private: @@ -404,7 +404,7 @@ public: std::set get_nodes_with_host_id(locator::host_id host_id) const; - std::optional get_state_for_version_bigger_than(inet_address for_endpoint, int version); + std::optional get_state_for_version_bigger_than(inet_address for_endpoint, version_type version); /** * determine which endpoint started up earlier @@ -458,10 +458,10 @@ private: future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value); /* Request all the state for the endpoint in the g_digest */ - void request_all(gossip_digest& g_digest, utils::chunked_vector& delta_gossip_digest_list, int remote_generation); + void request_all(gossip_digest& g_digest, utils::chunked_vector& delta_gossip_digest_list, generation_type remote_generation); /* Send all the data with version greater than max_remote_version */ - void send_all(gossip_digest& g_digest, std::map& delta_ep_state_map, int max_remote_version); + void send_all(gossip_digest& g_digest, std::map& delta_ep_state_map, version_type max_remote_version); public: /* diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index fabdfe454f..2763d378b2 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -36,7 +36,7 @@ namespace gms { */ class versioned_value { - int _version; + version_type _version; sstring _value; public: // this must be a char that cannot be present in any token @@ -60,7 +60,7 @@ public: // values for ApplicationState.REMOVAL_COORDINATOR static constexpr const char* REMOVAL_COORDINATOR = "REMOVER"; - int version() const noexcept { return _version; }; + version_type version() const noexcept { return _version; }; const sstring& value() const noexcept { return _value; }; public: bool operator==(const versioned_value& other) const noexcept { @@ -69,7 +69,7 @@ public: } public: - versioned_value(const sstring& value, int version = version_generator::get_next_version().value()) + versioned_value(const sstring& value, version_type version = version_generator::get_next_version()) : _version(version), _value(value) { #if 0 // blindly interning everything is somewhat suboptimal -- lots of VersionedValues are unique -- @@ -80,7 +80,7 @@ public: #endif } - versioned_value(sstring&& value, int version = version_generator::get_next_version().value()) noexcept + versioned_value(sstring&& value, version_type version = version_generator::get_next_version()) noexcept : _version(version), _value(std::move(value)) { } diff --git a/idl/gossip_digest.idl.hh b/idl/gossip_digest.idl.hh index bde6c5d099..b4dee6ff86 100644 --- a/idl/gossip_digest.idl.hh +++ b/idl/gossip_digest.idl.hh @@ -7,6 +7,7 @@ */ #include "gms/inet_address_serializer.hh" +#include "gms/version_generator.hh" namespace gms { enum class application_state:int { @@ -29,7 +30,7 @@ enum class application_state:int { class versioned_value { sstring value(); - int version(); + gms::version_type version(); }; class heart_beat_state { diff --git a/scylla-gdb.py b/scylla-gdb.py index 9c841ea852..59658d9c80 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -4028,16 +4028,27 @@ class scylla_netw(gdb.Command): gdb.write(' %s\n' % (conn['_stats'])) +def get_tagged_integer_type(i): + try: + return i['_value'] + except gdb.error: + return i + + +def get_gms_generation_or_version(i): + return get_tagged_integer_type(i) + + def get_gms_versioned_value(vv): try: return { - 'version': vv['_version'], - 'value': vv['_value'] + 'version': get_gms_generation_or_version(vv['_version']), + 'value': vv['_value'], } except gdb.error: return { 'version': vv['version'], - 'value': vv['value'] + 'value': vv['value'], } From 4cdad8bc8bc68cfcf1b2677da78e2b87aef77631 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 22 Feb 2023 14:58:49 +0200 Subject: [PATCH 09/11] gms: heart_beat_state: use generation_type and version_type Define default constructor as heart_beat_state(gms::generation_type(0)) Signed-off-by: Benny Halevy --- api/failure_detector.cc | 4 +- api/gossiper.cc | 8 ++-- api/storage_service.cc | 4 +- gms/endpoint_state.cc | 2 +- gms/endpoint_state.hh | 2 +- gms/gossiper.cc | 82 ++++++++++++++++++++-------------------- gms/gossiper.hh | 17 +++++---- gms/heart_beat_state.hh | 23 ++++++----- idl/gossip_digest.idl.hh | 4 +- 9 files changed, 75 insertions(+), 71 deletions(-) diff --git a/api/failure_detector.cc b/api/failure_detector.cc index 3de451c0c4..93d38b3d47 100644 --- a/api/failure_detector.cc +++ b/api/failure_detector.cc @@ -23,8 +23,8 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) { fd::endpoint_state val; val.addrs = fmt::to_string(i.first); val.is_alive = i.second.is_alive(); - val.generation = i.second.get_heart_beat_state().get_generation(); - val.version = i.second.get_heart_beat_state().get_heart_beat_version(); + val.generation = i.second.get_heart_beat_state().get_generation().value(); + val.version = i.second.get_heart_beat_state().get_heart_beat_version().value(); val.update_time = i.second.get_update_timestamp().time_since_epoch().count(); for (auto a : i.second.get_application_state_map()) { fd::version_value version_val; diff --git a/api/gossiper.cc b/api/gossiper.cc index 76225df62f..a109ae4dc4 100644 --- a/api/gossiper.cc +++ b/api/gossiper.cc @@ -34,15 +34,15 @@ void set_gossiper(http_context& ctx, routes& r, gms::gossiper& g) { httpd::gossiper_json::get_current_generation_number.set(r, [&g] (std::unique_ptr req) { gms::inet_address ep(req->param["addr"]); - return g.get_current_generation_number(ep).then([] (int res) { - return make_ready_future(res); + return g.get_current_generation_number(ep).then([] (gms::generation_type res) { + return make_ready_future(res.value()); }); }); httpd::gossiper_json::get_current_heart_beat_version.set(r, [&g] (std::unique_ptr req) { gms::inet_address ep(req->param["addr"]); - return g.get_current_heart_beat_version(ep).then([] (int res) { - return make_ready_future(res); + return g.get_current_heart_beat_version(ep).then([] (gms::version_type res) { + return make_ready_future(res.value()); }); }); diff --git a/api/storage_service.cc b/api/storage_service.cc index a53158870c..9583959b61 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -655,8 +655,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded req) { gms::inet_address ep(utils::fb_utilities::get_broadcast_address()); - return g.get_current_generation_number(ep).then([](int res) { - return make_ready_future(res); + return g.get_current_generation_number(ep).then([](gms::generation_type res) { + return make_ready_future(res.value()); }); }); diff --git a/gms/endpoint_state.cc b/gms/endpoint_state.cc index e4c59b1e97..878f90367a 100644 --- a/gms/endpoint_state.cc +++ b/gms/endpoint_state.cc @@ -14,7 +14,7 @@ namespace gms { -static_assert(!std::is_default_constructible_v); +static_assert(std::is_default_constructible_v); static_assert(std::is_nothrow_copy_constructible_v); static_assert(std::is_nothrow_move_constructible_v); diff --git a/gms/endpoint_state.hh b/gms/endpoint_state.hh index 6dc3d5ad70..c33f077eeb 100644 --- a/gms/endpoint_state.hh +++ b/gms/endpoint_state.hh @@ -43,7 +43,7 @@ public: } endpoint_state() noexcept - : _heart_beat_state(0) + : _heart_beat_state() , _update_timestamp(clk::now()) , _is_alive(true) { update_is_normal(); diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 5a8d7c8507..d7942c9b3b 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -50,7 +50,7 @@ static logging::logger logger("gossip"); constexpr std::chrono::milliseconds gossiper::INTERVAL; constexpr std::chrono::hours gossiper::A_VERY_LONG_TIME; -constexpr int64_t gossiper::MAX_GENERATION_DIFFERENCE; +constexpr generation_type::value_type gossiper::MAX_GENERATION_DIFFERENCE; netw::msg_addr gossiper::get_msg_addr(inet_address to) const noexcept { return msg_addr{to, _default_cpuid}; @@ -116,7 +116,7 @@ gossiper::gossiper(abort_source& as, feature_service& features, const locator::s [ep, this] { auto es = get_endpoint_state_for_endpoint_ptr(ep); if (es) { - return es->get_heart_beat_state().get_heart_beat_version(); + return es->get_heart_beat_state().get_heart_beat_version().value(); } else { return 0; } @@ -415,9 +415,9 @@ future<> gossiper::handle_echo_msg(gms::inet_address from, std::optionalsecond; - int64_t current_generation_number = generation_number_opt ? - generation_number_opt.value() : es->get_heart_beat_state().get_generation(); + auto saved_generation_number = it->second; + auto current_generation_number = generation_number_opt ? + generation_type(generation_number_opt.value()) : es->get_heart_beat_state().get_generation(); respond = saved_generation_number == current_generation_number; logger.debug("handle_echo_msg: from={}, saved_generation_number={}, current_generation_number={}", from, saved_generation_number, current_generation_number); @@ -443,10 +443,10 @@ future<> gossiper::handle_shutdown_msg(inet_address from, std::optional if (generation_number_opt) { auto es = this->get_endpoint_state_for_endpoint_ptr(from); if (es) { - int local_generation = es->get_heart_beat_state().get_generation(); + auto local_generation = es->get_heart_beat_state().get_generation(); logger.info("Got shutdown message from {}, received_generation={}, local_generation={}", from, generation_number_opt.value(), local_generation); - if (local_generation != generation_number_opt.value()) { + if (local_generation.value() != generation_number_opt.value()) { logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation={}", from, generation_number_opt.value(), local_generation); co_return; @@ -564,10 +564,10 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint auto es = this->get_endpoint_state_for_endpoint_ptr(node); if (es) { endpoint_state& local_state = *es; - int local_generation = local_state.get_heart_beat_state().get_generation(); - int remote_generation = remote_state.get_heart_beat_state().get_generation(); + auto local_generation = local_state.get_heart_beat_state().get_generation(); + auto remote_generation = remote_state.get_heart_beat_state().get_generation(); logger.trace("{} local generation {}, remote generation {}", node, local_generation, remote_generation); - if (remote_generation > get_generation_number() + MAX_GENERATION_DIFFERENCE) { + if (remote_generation > generation_type(get_generation_number().value() + MAX_GENERATION_DIFFERENCE)) { // assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself) logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}", node, local_generation, remote_generation); @@ -765,7 +765,7 @@ future> gossiper::get_live_members_synchronized() { }); } -future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, int64_t gossip_generation, uint64_t live_endpoints_version) { +future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, generation_type gossip_generation, uint64_t live_endpoints_version) { auto last = gossiper::clk::now(); auto diff = gossiper::clk::duration(0); auto echo_interval = std::chrono::milliseconds(2000); @@ -774,7 +774,7 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, int64_ bool failed = false; try { logger.debug("failure_detector_loop: Send echo to node {}, status = started", node); - co_await _messaging.send_gossip_echo(netw::msg_addr(node), gossip_generation, max_duration); + co_await _messaging.send_gossip_echo(netw::msg_addr(node), gossip_generation.value(), max_duration); logger.debug("failure_detector_loop: Send echo to node {}, status = ok", node); } catch (...) { failed = true; @@ -1093,7 +1093,7 @@ std::set gossiper::get_unreachable_members() const { } version_type gossiper::get_max_endpoint_state_version(endpoint_state state) const noexcept { - auto max_version = version_type(state.get_heart_beat_state().get_heart_beat_version()); + auto max_version = state.get_heart_beat_state().get_heart_beat_version(); for (auto& entry : state.get_application_state_map()) { auto& value = entry.second; max_version = std::max(max_version, value.version()); @@ -1121,7 +1121,7 @@ void gossiper::quarantine_endpoint(inet_address endpoint, clk::time_point quaran } void gossiper::make_random_gossip_digest(utils::chunked_vector& g_digests) { - int generation = 0; + generation_type generation; version_type max_version; // local epstate will be part of _endpoint_state_map @@ -1137,7 +1137,7 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector& g generation = eps.get_heart_beat_state().get_generation(); max_version = get_max_endpoint_state_version(eps); } - g_digests.push_back(gossip_digest(endpoint, generation, max_version.value())); + g_digests.push_back(gossip_digest(endpoint, generation.value(), max_version.value())); } #if 0 if (logger.isTraceEnabled()) { @@ -1181,7 +1181,7 @@ future<> gossiper::replicate(inet_address ep, application_state key, const versi future<> gossiper::advertise_removing(inet_address endpoint, locator::host_id host_id, locator::host_id local_host_id) { auto& state = get_endpoint_state(endpoint); // remember this node's generation - int generation = state.get_heart_beat_state().get_generation(); + auto generation = state.get_heart_beat_state().get_generation(); logger.info("Removing host: {}", host_id); auto ring_delay = std::chrono::milliseconds(_gcfg.ring_delay_ms); logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint); @@ -1228,8 +1228,8 @@ future<> gossiper::assassinate_endpoint(sstring address) { auto permit = gossiper.lock_endpoint(endpoint).get0(); auto es = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint); auto now = gossiper.now(); - int gen = std::chrono::duration_cast((now + std::chrono::seconds(60)).time_since_epoch()).count(); - int ver = 9999; + generation_type gen(std::chrono::duration_cast((now + std::chrono::seconds(60)).time_since_epoch()).count()); + version_type ver(9999); endpoint_state ep_state = es ? *es : endpoint_state(heart_beat_state(gen, ver)); std::vector tokens; logger.warn("Assassinating {} via gossip", endpoint); @@ -1240,8 +1240,8 @@ future<> gossiper::assassinate_endpoint(sstring address) { throw std::runtime_error(format("Unable to calculate tokens for {}", endpoint)); } - int generation = ep_state.get_heart_beat_state().get_generation(); - int heartbeat = ep_state.get_heart_beat_state().get_heart_beat_version(); + auto generation = ep_state.get_heart_beat_state().get_generation(); + auto heartbeat = ep_state.get_heart_beat_state().get_heart_beat_version(); auto ring_delay = std::chrono::milliseconds(gossiper._gcfg.ring_delay_ms); logger.info("Sleeping for {} ms to ensure {} does not change", ring_delay.count(), endpoint); // make sure it did not change @@ -1273,13 +1273,13 @@ future<> gossiper::assassinate_endpoint(sstring address) { }); } -future gossiper::get_current_generation_number(inet_address endpoint) { +future gossiper::get_current_generation_number(inet_address endpoint) { return container().invoke_on(0, [endpoint] (auto&& gossiper) { return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_generation(); }); } -future gossiper::get_current_heart_beat_version(inet_address endpoint) { +future gossiper::get_current_heart_beat_version(inet_address endpoint) { return container().invoke_on(0, [endpoint] (auto&& gossiper) { return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_heart_beat_version(); }); @@ -1431,7 +1431,7 @@ std::optional gossiper::get_state_for_version_bigger_than(inet_a * than the version passed in. In this case we also send the old * heart beat and throw it away on the receiver if it is redundant. */ - auto local_hb_version = version_type(eps.get_heart_beat_state().get_heart_beat_version()); + auto local_hb_version = eps.get_heart_beat_state().get_heart_beat_version(); if (local_hb_version > version) { reqd_endpoint_state.emplace(eps.get_heart_beat_state()); logger.trace("local heartbeat version {} greater than {} for {}", local_hb_version, version, for_endpoint); @@ -1452,7 +1452,7 @@ std::optional gossiper::get_state_for_version_bigger_than(inet_a return reqd_endpoint_state; } -int gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) { +generation_type::value_type gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) { auto* ep1 = get_endpoint_state_for_endpoint_ptr(addr1); auto* ep2 = get_endpoint_state_for_endpoint_ptr(addr2); if (!ep1 || !ep2) { @@ -1480,14 +1480,14 @@ void gossiper::update_timestamp_for_nodes(const std::mapget_heart_beat_state().get_generation(); - int remote_generation = remote_endpoint_state.get_heart_beat_state().get_generation(); + auto local_generation = local_endpoint_state->get_heart_beat_state().get_generation(); + auto remote_generation = remote_endpoint_state.get_heart_beat_state().get_generation(); if (remote_generation > local_generation) { update = true; } else if (remote_generation == local_generation) { auto local_version = get_max_endpoint_state_version(*local_endpoint_state); - int remote_version = remote_endpoint_state.get_heart_beat_state().get_heart_beat_version(); - if (version_type(remote_version) > local_version) { + auto remote_version = remote_endpoint_state.get_heart_beat_state().get_heart_beat_version(); + if (remote_version > local_version) { update = true; } } @@ -1516,10 +1516,10 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) { local_state.mark_dead(); msg_addr id = get_msg_addr(addr); - int64_t generation = _endpoint_state_map[get_broadcast_address()].get_heart_beat_state().get_generation(); + auto generation = _endpoint_state_map[get_broadcast_address()].get_heart_beat_state().get_generation(); logger.debug("Sending a EchoMessage to {}, with generation_number={}", id, generation); // Do it in the background. - (void)_messaging.send_gossip_echo(id, generation, std::chrono::milliseconds(15000)).then([this, addr] { + (void)_messaging.send_gossip_echo(id, generation.value(), std::chrono::milliseconds(15000)).then([this, addr] { logger.trace("Got EchoMessage Reply"); // After sending echo message, the Node might not be in the // _endpoint_state_map anymore, use the reference of local_state @@ -1801,7 +1801,7 @@ void gossiper::examine_gossiper(utils::chunked_vector& g_digest_l */ if (es) { endpoint_state& ep_state_ptr = *es; - auto local_generation = generation_type(ep_state_ptr.get_heart_beat_state().get_generation()); + auto local_generation = ep_state_ptr.get_heart_beat_state().get_generation(); /* get the max version of all keys in the state associated with this endpoint */ auto max_local_version = get_max_endpoint_state_version(ep_state_ptr); logger.trace("examine_gossiper(): ep={}, remote={}.{}, local={}.{}", ep, @@ -1882,24 +1882,24 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, std::map }); } -future> +future gossiper::get_generation_for_nodes(std::unordered_set nodes) { - std::unordered_map ret; + generation_for_nodes ret; for (const auto& node : nodes) { auto es = get_endpoint_state_for_endpoint_ptr(node); if (es) { auto current_generation_number = es->get_heart_beat_state().get_generation(); ret.emplace(node, current_generation_number); } else { - return make_exception_future>( + return make_exception_future( std::runtime_error(format("Can not find generation number for node={}", node))); } } - return make_ready_future>(std::move(ret)); + return make_ready_future(std::move(ret)); } -future<> gossiper::advertise_to_nodes(std::unordered_map advertise_to_nodes) { - return container().invoke_on_all([advertise_to_nodes] (auto& g) { +future<> gossiper::advertise_to_nodes(generation_for_nodes advertise_to_nodes) { + return container().invoke_on_all([advertise_to_nodes = std::move(advertise_to_nodes)] (auto& g) { g._advertise_to_nodes = advertise_to_nodes; g._advertise_myself = true; }); @@ -2006,12 +2006,12 @@ future<> gossiper::add_saved_endpoint(inet_address ep) { } //preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on) - auto ep_state = endpoint_state(heart_beat_state(0)); + auto ep_state = endpoint_state(); auto es = get_endpoint_state_for_endpoint_ptr(ep); if (es) { ep_state = *es; logger.debug("not replacing a previous ep_state for {}, but reusing it: {}", ep, ep_state); - ep_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(0)); + ep_state.set_heart_beat_state_and_update_timestamp(heart_beat_state()); } const auto tmptr = get_token_metadata_ptr(); auto tokens = tmptr->get_tokens(ep); @@ -2126,14 +2126,14 @@ future<> gossiper::do_stop_gossiping() { logger.info("My status = {}", get_gossip_status(*my_ep_state)); } if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) { - int local_generation = my_ep_state->get_heart_beat_state().get_generation(); + auto local_generation = my_ep_state->get_heart_beat_state().get_generation(); logger.info("Announcing shutdown"); add_local_application_state(application_state::STATUS, versioned_value::shutdown(true)).get(); auto live_endpoints = _live_endpoints; for (inet_address addr : live_endpoints) { msg_addr id = get_msg_addr(addr); logger.info("Sending a GossipShutdown to {} with generation {}", id.addr, local_generation); - _messaging.send_gossip_shutdown(id, get_broadcast_address(), local_generation).then_wrapped([id] (auto&&f) { + _messaging.send_gossip_shutdown(id, get_broadcast_address(), local_generation.value()).then_wrapped([id] (auto&&f) { try { f.get(); logger.trace("Got GossipShutdown Reply"); diff --git a/gms/gossiper.hh b/gms/gossiper.hh index a4c2f84a2e..c775d866cf 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -97,6 +97,7 @@ class gossiper : public seastar::async_sharded_service, public seastar public: using clk = seastar::lowres_system_clock; using ignore_features_of_local_node = bool_class; + using generation_for_nodes = std::unordered_map; private: using messaging_verb = netw::messaging_verb; using messaging_service = netw::messaging_service; @@ -124,17 +125,17 @@ private: std::unordered_map _ack_handlers; bool _advertise_myself = true; // Map ip address and generation number - std::unordered_map _advertise_to_nodes; + generation_for_nodes _advertise_to_nodes; future<> _failure_detector_loop_done{make_ready_future<>()} ; rpc::no_wait_type background_msg(sstring type, noncopyable_function(gossiper&)> fn); public: // Get current generation number for the given nodes - future> + future get_generation_for_nodes(std::unordered_set nodes); // Only respond echo message listed in nodes with the generation number - future<> advertise_to_nodes(std::unordered_map advertise_to_nodes = {}); + future<> advertise_to_nodes(generation_for_nodes advertise_to_nodes = {}); const sstring& get_cluster_name() const noexcept; const sstring& get_partitioner_name() const noexcept { @@ -184,7 +185,7 @@ public: // Maximimum difference between remote generation value and generation // value this node would get if this node were restarted that we are // willing to accept about a peer. - static constexpr int64_t MAX_GENERATION_DIFFERENCE = 86400 * 365; + static constexpr generation_type::value_type MAX_GENERATION_DIFFERENCE = 86400 * 365; std::chrono::milliseconds fat_client_timeout; std::chrono::milliseconds quarantine_delay() const noexcept; @@ -358,8 +359,8 @@ public: future<> assassinate_endpoint(sstring address); public: - future get_current_generation_number(inet_address endpoint); - future get_current_heart_beat_version(inet_address endpoint); + future get_current_generation_number(inet_address endpoint); + future get_current_heart_beat_version(inet_address endpoint); bool is_gossip_only_member(inet_address endpoint); bool is_safe_for_bootstrap(inet_address endpoint); @@ -409,7 +410,7 @@ public: /** * determine which endpoint started up earlier */ - int compare_endpoint_startup(inet_address addr1, inet_address addr2); + generation_type::value_type compare_endpoint_startup(inet_address addr1, inet_address addr2); /** * Return the rpc address associated with an endpoint as a string. @@ -616,7 +617,7 @@ public: int get_up_endpoint_count() const noexcept; private: future<> failure_detector_loop(); - future<> failure_detector_loop_for_node(gms::inet_address node, int64_t gossip_generation, uint64_t live_endpoints_version); + future<> failure_detector_loop_for_node(gms::inet_address node, generation_type gossip_generation, uint64_t live_endpoints_version); future<> update_live_endpoints_version(); }; diff --git a/gms/heart_beat_state.hh b/gms/heart_beat_state.hh index 794448df05..70df7efa69 100644 --- a/gms/heart_beat_state.hh +++ b/gms/heart_beat_state.hh @@ -10,6 +10,7 @@ #pragma once +#include "gms/generation-number.hh" #include "gms/version_generator.hh" #include "utils/serialization.hh" #include @@ -21,41 +22,43 @@ namespace gms { */ class heart_beat_state { private: - int32_t _generation; - int32_t _version; + generation_type _generation; + version_type _version; public: bool operator==(const heart_beat_state& other) const noexcept { return _generation == other._generation && _version == other._version; } - heart_beat_state(int32_t gen) noexcept + heart_beat_state() noexcept : heart_beat_state(generation_type(0)) {} + + explicit heart_beat_state(generation_type gen) noexcept : _generation(gen) - , _version(0) { + { } - heart_beat_state(int32_t gen, int32_t ver) noexcept + heart_beat_state(generation_type gen, version_type ver) noexcept : _generation(gen) , _version(ver) { } - int32_t get_generation() const noexcept { + generation_type get_generation() const noexcept { return _generation; } void update_heart_beat() noexcept { - _version = version_generator::get_next_version().value(); + _version = version_generator::get_next_version(); } - int32_t get_heart_beat_version() const noexcept { + version_type get_heart_beat_version() const noexcept { return _version; } void force_newer_generation_unsafe() noexcept { - _generation += 1; + ++_generation; } void force_highest_possible_version_unsafe() noexcept { - _version = std::numeric_limits::max(); + _version = std::numeric_limits::max(); } friend inline std::ostream& operator<<(std::ostream& os, const heart_beat_state& h) { diff --git a/idl/gossip_digest.idl.hh b/idl/gossip_digest.idl.hh index b4dee6ff86..d516dc0b1e 100644 --- a/idl/gossip_digest.idl.hh +++ b/idl/gossip_digest.idl.hh @@ -34,8 +34,8 @@ class versioned_value { }; class heart_beat_state { - int32_t get_generation(); - int32_t get_heart_beat_version(); + gms::generation_type get_generation(); + gms::version_type get_heart_beat_version(); }; class endpoint_state { From 5dc7b7811ca9f7d7ff7b094929601686d9750f91 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 22 Feb 2023 15:16:28 +0200 Subject: [PATCH 10/11] gms: gossip_digest: use generation_type and version_type Signed-off-by: Benny Halevy --- gms/gossip_digest.hh | 18 ++++++++---------- gms/gossiper.cc | 16 ++++++++-------- idl/gossip_digest.idl.hh | 4 ++-- test/manual/message.cc | 8 ++++---- 4 files changed, 22 insertions(+), 24 deletions(-) diff --git a/gms/gossip_digest.hh b/gms/gossip_digest.hh index 8762e3faf8..4efe92a8d4 100644 --- a/gms/gossip_digest.hh +++ b/gms/gossip_digest.hh @@ -13,6 +13,8 @@ #include #include "utils/serialization.hh" #include "gms/inet_address.hh" +#include "gms/generation-number.hh" +#include "gms/version_generator.hh" namespace gms { @@ -24,16 +26,12 @@ class gossip_digest { // implements Comparable private: using inet_address = gms::inet_address; inet_address _endpoint; - int32_t _generation; - int32_t _max_version; + generation_type _generation; + version_type _max_version; public: - gossip_digest() - : _endpoint(0) - , _generation(0) - , _max_version(0) { - } + gossip_digest() = default; - gossip_digest(inet_address ep, int32_t gen, int32_t version) + explicit gossip_digest(inet_address ep, generation_type gen = {}, version_type version = {}) noexcept : _endpoint(ep) , _generation(gen) , _max_version(version) { @@ -43,11 +41,11 @@ public: return _endpoint; } - int32_t get_generation() const { + generation_type get_generation() const { return _generation; } - int32_t get_max_version() const { + version_type get_max_version() const { return _max_version; } diff --git a/gms/gossiper.cc b/gms/gossiper.cc index d7942c9b3b..1d9183f7b4 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -155,8 +155,8 @@ void gossiper::do_sort(utils::chunked_vector& g_digest_list) { auto ep = g_digest.get_endpoint(); auto* ep_state = this->get_endpoint_state_for_endpoint_ptr(ep); version_type version = ep_state ? this->get_max_endpoint_state_version(*ep_state) : version_type(); - int32_t diff_version = ::abs(version.value() - g_digest.get_max_version()); - diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), diff_version)); + int32_t diff_version = ::abs(version - g_digest.get_max_version()); + diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), version_type(diff_version))); } g_digest_list.clear(); @@ -1137,7 +1137,7 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector& g generation = eps.get_heart_beat_state().get_generation(); max_version = get_max_endpoint_state_version(eps); } - g_digests.push_back(gossip_digest(endpoint, generation.value(), max_version.value())); + g_digests.push_back(gossip_digest(endpoint, generation, max_version)); } #if 0 if (logger.isTraceEnabled()) { @@ -1759,7 +1759,7 @@ future<> gossiper::do_on_change_notifications(inet_address addr, const applicati void gossiper::request_all(gossip_digest& g_digest, utils::chunked_vector& delta_gossip_digest_list, generation_type remote_generation) { /* We are here since we have no data for this endpoint locally so request everthing. */ - delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation.value(), 0); + delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation); logger.trace("request_all for {}", g_digest.get_endpoint()); } @@ -1785,12 +1785,12 @@ void gossiper::examine_gossiper(utils::chunked_vector& g_digest_l */ logger.debug("Shadow request received, adding all states"); for (auto& entry : _endpoint_state_map) { - g_digest_list.emplace_back(entry.first, 0, 0); + g_digest_list.emplace_back(entry.first); } } for (gossip_digest& g_digest : g_digest_list) { - auto remote_generation = generation_type(g_digest.get_generation()); - auto max_remote_version = version_type(g_digest.get_max_version()); + auto remote_generation = g_digest.get_generation(); + auto max_remote_version = g_digest.get_max_version(); /* Get state associated with the end point in digest */ auto&& ep = g_digest.get_endpoint(); auto es = get_endpoint_state_for_endpoint_ptr(ep); @@ -1829,7 +1829,7 @@ void gossiper::examine_gossiper(utils::chunked_vector& g_digest_l */ if (max_remote_version > max_local_version) { logger.trace("examine_gossiper(): requesting version > {} from {}", max_local_version, g_digest.get_endpoint()); - delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation.value(), max_local_version.value()); + delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation, max_local_version); } else if (max_remote_version < max_local_version) { /* send all data with generation = localgeneration and version > max_remote_version */ send_all(g_digest, delta_ep_state_map, max_remote_version); diff --git a/idl/gossip_digest.idl.hh b/idl/gossip_digest.idl.hh index d516dc0b1e..7f7bc196a3 100644 --- a/idl/gossip_digest.idl.hh +++ b/idl/gossip_digest.idl.hh @@ -45,8 +45,8 @@ class endpoint_state { class gossip_digest { gms::inet_address get_endpoint(); - int32_t get_generation(); - int32_t get_max_version(); + gms::generation_type get_generation(); + gms::version_type get_max_version(); }; class gossip_digest_syn { diff --git a/test/manual/message.cc b/test/manual/message.cc index f3aa6d4a2f..a75735467a 100644 --- a/test/manual/message.cc +++ b/test/manual/message.cc @@ -56,8 +56,8 @@ public: auto from = netw::messaging_service::get_source(cinfo); auto ep1 = inet_address("1.1.1.1"); auto ep2 = inet_address("2.2.2.2"); - int32_t gen = 800; - int32_t ver = 900; + gms::generation_type gen(800); + gms::version_type ver(900); utils::chunked_vector digests; digests.push_back(gms::gossip_digest(ep1, gen++, ver++)); digests.push_back(gms::gossip_digest(ep2, gen++, ver++)); @@ -114,8 +114,8 @@ public: auto id = get_msg_addr(); auto ep1 = inet_address("1.1.1.1"); auto ep2 = inet_address("2.2.2.2"); - int32_t gen = 100; - int32_t ver = 900; + gms::generation_type gen(100); + gms::version_type ver(900); utils::chunked_vector digests; digests.push_back(gms::gossip_digest(ep1, gen++, ver++)); digests.push_back(gms::gossip_digest(ep2, gen++, ver++)); From 5520d3a8e3e66933eb2ed098b8d39b7600dcb139 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 22 Feb 2023 11:37:14 +0200 Subject: [PATCH 11/11] gossiper: version_generator: add {debug_,}validate_gossip_generation Make sure that the int64_t generation we get over rpc fits in the int32_t generation_type we keep locally. Restrict this assertion to non-release builds. Signed-off-by: Benny Halevy --- gms/generation-number.cc | 11 +++++++++++ gms/generation-number.hh | 7 +++++++ gms/gossiper.cc | 1 + message/messaging_service.cc | 3 +++ 4 files changed, 22 insertions(+) diff --git a/gms/generation-number.cc b/gms/generation-number.cc index e6919156d9..14ddcd726f 100644 --- a/gms/generation-number.cc +++ b/gms/generation-number.cc @@ -8,6 +8,11 @@ #include #include +#include +#include + +#include + #include "generation-number.hh" namespace gms { @@ -22,4 +27,10 @@ generation_type get_generation_number() { return ret; } +void validate_gossip_generation(int64_t generation_number) { + if (!std::in_range(generation_number)) { + throw std::out_of_range(fmt::format("gossip generation {} is out of range", generation_number)); + } +} + } diff --git a/gms/generation-number.hh b/gms/generation-number.hh index e9d1baa8f9..af0e4afa1f 100644 --- a/gms/generation-number.hh +++ b/gms/generation-number.hh @@ -16,4 +16,11 @@ using generation_type = utils::tagged_integer gossiper::handle_shutdown_msg(inet_address from, std::optional auto permit = co_await this->lock_endpoint(from); if (generation_number_opt) { + debug_validate_gossip_generation(*generation_number_opt); auto es = this->get_endpoint_state_for_endpoint_ptr(from); if (es) { auto local_generation = es->get_heart_beat_state().get_generation(); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index c1fcdb3ef2..0e7d9c238b 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1066,9 +1066,11 @@ future<> messaging_service::unregister_gossip_echo() { return unregister_handler(netw::messaging_verb::GOSSIP_ECHO); } future<> messaging_service::send_gossip_echo(msg_addr id, int64_t generation_number, std::chrono::milliseconds timeout) { + gms::debug_validate_gossip_generation(generation_number); return send_message_timeout(this, messaging_verb::GOSSIP_ECHO, std::move(id), timeout, generation_number); } future<> messaging_service::send_gossip_echo(msg_addr id, int64_t generation_number, abort_source& as) { + gms::debug_validate_gossip_generation(generation_number); return send_message_cancellable(this, messaging_verb::GOSSIP_ECHO, std::move(id), as, generation_number); } @@ -1079,6 +1081,7 @@ future<> messaging_service::unregister_gossip_shutdown() { return unregister_handler(netw::messaging_verb::GOSSIP_SHUTDOWN); } future<> messaging_service::send_gossip_shutdown(msg_addr id, inet_address from, int64_t generation_number) { + gms::debug_validate_gossip_generation(generation_number); return send_message_oneway(this, messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(from), generation_number); }