Merge 'Get rid of fb_utilities' from Benny Halevy

utils::fb_utilities is a global in-memory registry for storing and retrieving broadcast_address and broadcast_rpc_address.
As part of the effort to get rid of all global state, this series gets rid of fb_utilities.
This will eventually allow e.g. cql_test_env to instantiate multiple scylla server nodes, each serving on its own address.

Closes scylladb/scylladb#16250

* github.com:scylladb/scylladb:
  treewide: get rid of now unused fb_utilities
  tracing: use locator::topology rather than fb_utilities
  streaming: use locator::topology rather than fb_utilities
  raft: use locator::topology/messaging rather than fb_utilities
  storage_service: use locator::topology rather than fb_utilities
  storage_proxy: use locator::topology rather than fb_utilities
  service_level_controller: use locator::topology rather than fb_utilities
  misc_services: use locator::topology rather than fb_utilities
  migration_manager: use messaging rather than fb_utilities
  forward_service: use messaging rather than fb_utilities
  messaging_service: accept broadcast_addr in config rather than via fb_utilities
  messaging_service: move listen_address and port getters inline
  test: manual: modernize message test
  table: use gossiper rather than fb_utilities
  repair: use locator::topology rather than fb_utilities
  dht/range_streamer: use locator::topology rather than fb_utilities
  db/view: use locator::topology rather than fb_utilities
  database: use locator::topology rather than fb_utilities
  db/system_keyspace: use topology via db rather than fb_utilities
  db/system_keyspace: save_local_info: get broadcast addresses from caller
  db/hints/manager: use locator::topology rather than fb_utilities
  db/consistency_level: use locator::topology rather than fb_utilities
  api: use locator::topology rather than fb_utilities
  alternator: ttl: use locator::topology rather than fb_utilities
  gossiper: use locator::topology rather than fb_utilities
  gossiper: add get_this_endpoint_state_ptr
  test: lib: cql_test_env: pass broadcast_address in cql_test_config
  init: get_seeds_from_db_config: accept broadcast_address
  locator: replication strategies: use locator::topology rather than fb_utilities
  locator: topology: add helpers to retrieve this host_id and address
  snitch: pass broadcast_address in snitch_config
  snitch: add optional get_broadcast_address method
  locator: ec2_multi_region_snitch: keep local public address as member
  ec2_multi_region_snitch: reindent load_config
  ec2_multi_region_snitch: coroutinize load_config
  ec2_snitch: reindent load_config
  ec2_snitch: coroutinize load_config
  thrift: thrift_validation: use std::numeric_limits rather than fb_utilities
This commit is contained in:
Avi Kivity
2023-12-05 19:40:14 +02:00
71 changed files with 478 additions and 449 deletions

View File

@@ -23,7 +23,6 @@
#include "service/storage_proxy.hh"
#include "gms/gossiper.hh"
#include "utils/overloaded_functor.hh"
#include "utils/fb_utilities.hh"
#include "utils/aws_sigv4.hh"
static logging::logger slogger("alternator-server");

View File

@@ -38,7 +38,6 @@
#include "types/map.hh"
#include "utils/rjson.hh"
#include "utils/big_decimal.hh"
#include "utils/fb_utilities.hh"
#include "cql3/selection/selection.hh"
#include "cql3/values.hh"
#include "cql3/query_options.hh"
@@ -417,6 +416,7 @@ class token_ranges_owned_by_this_shard {
};
schema_ptr _s;
locator::effective_replication_map_ptr _erm;
// _token_ranges will contain a list of token ranges owned by this node.
// We'll further need to split each such range to the pieces owned by
// the current shard, using _intersecter.
@@ -430,15 +430,14 @@ class token_ranges_owned_by_this_shard {
size_t _range_idx;
size_t _end_idx;
std::optional<dht::selective_token_range_sharder> _intersecter;
locator::effective_replication_map_ptr _erm;
public:
token_ranges_owned_by_this_shard(replica::database& db, gms::gossiper& g, schema_ptr s)
: _s(s)
, _erm(s->table().get_effective_replication_map())
, _token_ranges(db.find_keyspace(s->ks_name()).get_effective_replication_map(),
g, utils::fb_utilities::get_broadcast_address())
g, _erm->get_topology().my_address())
, _range_idx(random_offset(0, _token_ranges.size() - 1))
, _end_idx(_range_idx + _token_ranges.size())
, _erm(s->table().get_effective_replication_map())
{
tlogger.debug("Generating token ranges starting from base range {} of {}", _range_idx, _token_ranges.size());
}

View File

@@ -11,7 +11,6 @@
#include "endpoint_snitch.hh"
#include "api/api-doc/endpoint_snitch_info.json.hh"
#include "api/api-doc/storage_service.json.hh"
#include "utils/fb_utilities.hh"
namespace api {
using namespace seastar::httpd;

View File

@@ -616,7 +616,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
ss::get_current_generation_number.set(r, [&ss](std::unique_ptr<http::request> req) {
gms::inet_address ep(utils::fb_utilities::get_broadcast_address());
auto ep = ss.local().get_token_metadata().get_topology().my_address();
return ss.local().gossiper().get_current_generation_number(ep).then([](gms::generation_type res) {
return make_ready_future<json::json_return_type>(res.value());
});

View File

@@ -10,7 +10,6 @@
#include "api/api-doc/storage_service.json.hh"
#include "api/api-doc/endpoint_snitch_info.json.hh"
#include "locator/token_metadata.hh"
#include "utils/fb_utilities.hh"
using namespace seastar::httpd;
@@ -61,9 +60,9 @@ void set_token_metadata(http_context& ctx, routes& r, sharded<locator::shared_to
return map_to_key_value(tm.local().get()->get_endpoint_to_host_id_map_for_reading(), res);
});
static auto host_or_broadcast = [](const_req req) {
static auto host_or_broadcast = [&tm](const_req req) {
auto host = req.get_query_param("host");
return host.empty() ? gms::inet_address(utils::fb_utilities::get_broadcast_address()) : gms::inet_address(host);
return host.empty() ? tm.local().get()->get_topology().my_address() : gms::inet_address(host);
};
httpd::endpoint_snitch_info_json::get_datacenter.set(r, [&tm](const_req req) {

View File

@@ -161,6 +161,10 @@ public:
return *_role_manager;
}
const cql3::query_processor& query_processor() const noexcept {
return _qp;
}
private:
future<bool> has_existing_legacy_users() const;

View File

@@ -917,8 +917,7 @@ future<> generation_service::check_and_repair_cdc_streams() {
// Need to artificially update our STATUS so other nodes handle the generation ID change
// FIXME: after 0e0282cd nodes do not require a STATUS update to react to CDC generation changes.
// The artificial STATUS update here should eventually be removed (in a few releases).
auto status = _gossiper.get_application_state_ptr(
utils::fb_utilities::get_broadcast_address(), gms::application_state::STATUS);
auto status = _gossiper.get_this_endpoint_state_ptr()->get_application_state_ptr(gms::application_state::STATUS);
if (!status) {
cdc_log.error("Our STATUS is missing");
cdc_log.error("Aborting CDC generation repair due to missing STATUS");

View File

@@ -23,7 +23,6 @@
#include "sstables/sstable_directory.hh"
#include "locator/abstract_replication_strategy.hh"
#include "utils/error_injection.hh"
#include "utils/fb_utilities.hh"
#include "utils/UUID_gen.hh"
#include "db/system_keyspace.hh"
#include <cmath>

View File

@@ -160,6 +160,10 @@ public:
return _cql_config;
}
const service::storage_proxy& proxy() const noexcept {
return _proxy;
}
service::storage_proxy& proxy() {
return _proxy;
}

View File

@@ -22,7 +22,6 @@
#include "db/read_repair_decision.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/network_topology_strategy.hh"
#include "utils/fb_utilities.hh"
#include "heat_load_balance.hh"
namespace db {
@@ -334,7 +333,7 @@ filter_for_query(consistency_level cl,
if (!old_node && ht_max - ht_min > 0.01) { // if there is old node or hit rates are close skip calculations
// local node is always first if present (see storage_proxy::get_endpoints_for_reading)
unsigned local_idx = epi[0].first == utils::fb_utilities::get_broadcast_address() ? 0 : epi.size() + 1;
unsigned local_idx = erm.get_topology().is_me(epi[0].first) ? 0 : epi.size() + 1;
live_endpoints = boost::copy_range<inet_address_vector_replica_set>(miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra)));
}
}

View File

@@ -38,7 +38,6 @@
#include "utils/disk-error-handler.hh"
#include "utils/div_ceil.hh"
#include "utils/error_injection.hh"
#include "utils/fb_utilities.hh"
#include "utils/lister.hh"
#include "utils/runtime.hh"
#include "converting_mutation_partition_applier.hh"
@@ -363,13 +362,13 @@ bool manager::too_many_in_flight_hints_for(endpoint_id ep) const noexcept {
// There is no need to check the DC here because if there is an in-flight hint for this
// endpoint, then this means that its DC has already been checked and found to be ok.
return _stats.size_of_hints_in_progress > MAX_SIZE_OF_HINTS_IN_PROGRESS
&& !utils::fb_utilities::is_me(ep)
&& _proxy.local_db().get_token_metadata().get_topology().is_me(ep)
&& hints_in_progress_for(ep) > 0
&& local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us;
}
bool manager::can_hint_for(endpoint_id ep) const noexcept {
if (utils::fb_utilities::is_me(ep)) {
if (_proxy.local_db().get_token_metadata().get_topology().is_me(ep)) {
return false;
}
@@ -502,7 +501,7 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept {
std::exception_ptr eptr = nullptr;
if (utils::fb_utilities::is_me(endpoint)) {
if (_proxy.local_db().get_token_metadata().get_topology().is_me(endpoint)) {
set_draining_all();
try {

View File

@@ -14,7 +14,6 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include "system_keyspace.hh"
#include "cql3/untyped_result_set.hh"
#include "utils/fb_utilities.hh"
#include "utils/hash.hh"
#include "thrift/server.hh"
#include "exceptions/exceptions.hh"
@@ -1374,7 +1373,7 @@ future<system_keyspace::local_info> system_keyspace::load_local_info() {
co_return ret;
}
future<> system_keyspace::save_local_info(local_info sysinfo, locator::endpoint_dc_rack location) {
future<> system_keyspace::save_local_info(local_info sysinfo, locator::endpoint_dc_rack location, gms::inet_address broadcast_address, gms::inet_address broadcast_rpc_address) {
auto& cfg = _db.get_config();
sstring req = fmt::format("INSERT INTO system.{} (key, host_id, cluster_name, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
, db::system_keyspace::LOCAL);
@@ -1389,8 +1388,8 @@ future<> system_keyspace::save_local_info(local_info sysinfo, locator::endpoint_
location.dc,
location.rack,
sstring(cfg.partitioner()),
utils::fb_utilities::get_broadcast_rpc_address().addr(),
utils::fb_utilities::get_broadcast_address().addr(),
broadcast_rpc_address,
broadcast_address,
sysinfo.listen_address.addr()
).discard_result();
}
@@ -1563,7 +1562,7 @@ static std::vector<cdc::generation_id_v2> decode_cdc_generations_ids(const set_t
future<> system_keyspace::update_tokens(gms::inet_address ep, const std::unordered_set<dht::token>& tokens)
{
if (ep == utils::fb_utilities::get_broadcast_address()) {
if (_db.get_token_metadata().get_topology().is_me(ep)) {
co_return co_await remove_endpoint(ep);
}
@@ -1657,7 +1656,7 @@ future<> system_keyspace::update_cached_values(gms::inet_address ep, sstring col
template <typename Value>
future<> system_keyspace::update_peer_info(gms::inet_address ep, sstring column_name, Value value) {
if (ep == utils::fb_utilities::get_broadcast_address()) {
if (_db.get_token_metadata().get_topology().is_me(ep)) {
co_return;
}

View File

@@ -410,7 +410,7 @@ public:
};
future<local_info> load_local_info();
future<> save_local_info(local_info, locator::endpoint_dc_rack);
future<> save_local_info(local_info, locator::endpoint_dc_rack, gms::inet_address broadcast_address, gms::inet_address broadcast_rpc_address);
public:
static api::timestamp_type schema_creation_timestamp();
@@ -533,6 +533,10 @@ public:
friend future<column_mapping> db::schema_tables::get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version);
friend future<bool> db::schema_tables::column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
friend future<> db::schema_tables::drop_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
const replica::database& local_db() const noexcept {
return _db;
}
}; // class system_keyspace
} // namespace db

View File

@@ -59,7 +59,6 @@
#include "types/map.hh"
#include "utils/error_injection.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/fb_utilities.hh"
#include "query-result-writer.hh"
#include "readers/from_fragments_v2.hh"
#include "readers/evictable.hh"
@@ -1553,7 +1552,7 @@ get_view_natural_endpoint(
const dht::token& base_token,
const dht::token& view_token) {
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
auto my_address = utils::fb_utilities::get_broadcast_address();
auto my_address = topology.my_address();
auto my_datacenter = topology.get_datacenter();
std::vector<gms::inet_address> base_endpoints, view_endpoints;
for (auto&& base_endpoint : base_erm->get_natural_endpoints(base_token)) {
@@ -1655,7 +1654,7 @@ future<> view_update_generator::mutate_MV(
// First, find the local endpoint and ensure that if it exists,
// it will be the target endpoint. That way, all endpoints in the
// remote_endpoints list are guaranteed to be remote.
auto my_address = utils::fb_utilities::get_broadcast_address();
auto my_address = view_ermp->get_topology().my_address();
auto remote_it = std::find(remote_endpoints.begin(), remote_endpoints.end(), my_address);
if (remote_it != remote_endpoints.end()) {
if (!target_endpoint) {

View File

@@ -10,7 +10,6 @@
#include <seastar/core/sleep.hh>
#include "dht/range_streamer.hh"
#include "utils/fb_utilities.hh"
#include "replica/database.hh"
#include "gms/gossiper.hh"
#include "log.hh"
@@ -32,12 +31,13 @@ range_streamer::get_range_fetch_map(const std::unordered_map<dht::token_range, s
const std::unordered_set<std::unique_ptr<i_source_filter>>& source_filters,
const sstring& keyspace) {
std::unordered_map<inet_address, dht::token_range_vector> range_fetch_map_map;
const auto& topo = _token_metadata_ptr->get_topology();
for (const auto& x : ranges_with_sources) {
const dht::token_range& range_ = x.first;
const std::vector<inet_address>& addresses = x.second;
bool found_source = false;
for (const auto& address : addresses) {
if (address == utils::fb_utilities::get_broadcast_address()) {
if (topo.is_me(address)) {
// If localhost is a source, we have found one, but we don't add it to the map to avoid streaming locally
found_source = true;
continue;

View File

@@ -2200,7 +2200,7 @@ future<> gossiper::do_stop_gossiping() {
logger.info("gossip is already stopped");
co_return;
}
auto my_ep_state = get_endpoint_state_ptr(get_broadcast_address());
auto my_ep_state = get_this_endpoint_state_ptr();
if (my_ep_state) {
logger.info("My status = {}", get_gossip_status(*my_ep_state));
}

View File

@@ -19,7 +19,6 @@
#include <seastar/util/source_location-compat.hh>
#include "utils/atomic_vector.hh"
#include "utils/UUID.hh"
#include "utils/fb_utilities.hh"
#include "gms/generation-number.hh"
#include "gms/versioned_value.hh"
#include "gms/application_state.hh"
@@ -144,7 +143,7 @@ public:
}
inet_address get_broadcast_address() const noexcept {
return utils::fb_utilities::get_broadcast_address();
return get_token_metadata_ptr()->get_topology().my_address();
}
const std::set<inet_address>& get_seeds() const noexcept;
@@ -415,6 +414,11 @@ public:
// the endpoint_state_ptr is held.
endpoint_state_ptr get_endpoint_state_ptr(inet_address ep) const noexcept;
// Return this node's endpoint_state_ptr
endpoint_state_ptr get_this_endpoint_state_ptr() const noexcept {
return get_endpoint_state_ptr(get_broadcast_address());
}
const versioned_value* get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept;
sstring get_application_state_value(inet_address endpoint, application_state appstate) const;

View File

@@ -9,7 +9,6 @@
#include "init.hh"
#include "utils/to_string.hh"
#include "gms/inet_address.hh"
#include "utils/fb_utilities.hh"
#include "seastarx.hh"
#include "db/config.hh"
@@ -18,7 +17,7 @@
logging::logger startlog("init");
std::set<gms::inet_address> get_seeds_from_db_config(const db::config& cfg) {
std::set<gms::inet_address> get_seeds_from_db_config(const db::config& cfg, gms::inet_address broadcast_address) {
auto preferred = cfg.listen_interface_prefer_ipv6() ? std::make_optional(net::inet_address::family::INET6) : std::nullopt;
auto family = cfg.enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET);
const auto listen = gms::inet_address::lookup(cfg.listen_address(), family).get0();
@@ -43,7 +42,6 @@ std::set<gms::inet_address> get_seeds_from_db_config(const db::config& cfg) {
if (seeds.empty()) {
seeds.emplace(gms::inet_address("127.0.0.1"));
}
auto broadcast_address = utils::fb_utilities::get_broadcast_address();
startlog.info("seeds={{{}}}, listen_address={}, broadcast_address={}",
fmt::join(seeds, ", "), listen, broadcast_address);
if (broadcast_address != listen && seeds.contains(listen)) {

View File

@@ -35,7 +35,7 @@ extern logging::logger startlog;
class bad_configuration_error : public std::exception {};
std::set<gms::inet_address> get_seeds_from_db_config(const db::config& cfg);
std::set<gms::inet_address> get_seeds_from_db_config(const db::config& cfg, gms::inet_address broadcast_address);
class service_set {
public:

View File

@@ -20,76 +20,58 @@ static constexpr const char* PRIVATE_MAC_QUERY = "/latest/meta-data/network/inte
namespace locator {
ec2_multi_region_snitch::ec2_multi_region_snitch(const snitch_config& cfg)
: ec2_snitch(cfg)
, _broadcast_rpc_address_specified_by_user(cfg.broadcast_rpc_address_specified_by_user) {}
{}
future<> ec2_multi_region_snitch::start() {
_state = snitch_state::initializing;
return seastar::async([this] {
ec2_snitch::load_config(true).get();
if (this_shard_id() == io_cpu_id()) {
inet_address local_public_address;
co_await ec2_snitch::load_config(true);
if (this_shard_id() == io_cpu_id()) {
auto token = co_await aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, TOKEN_REQ_ENDPOINT, std::nullopt);
auto token = aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, TOKEN_REQ_ENDPOINT, std::nullopt).get0();
try {
auto broadcast = utils::fb_utilities::get_broadcast_address();
if (broadcast.addr().is_ipv6()) {
auto macs = aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, PRIVATE_MAC_QUERY, token).get0();
// we should just get a single line, ending in '/'. If there are more than one mac, we should
// maybe try to loop the addresses and exclude local/link-local etc, but these addresses typically
// are already filtered by aws, so probably does not help. For now, just warn and pick first address.
auto i = macs.find('/');
auto mac = macs.substr(0, i);
if (i != std::string::npos && ++i != macs.size()) {
logger().warn("Ec2MultiRegionSnitch (ipv6): more than one MAC address listed ({}). Will use first.", macs);
}
auto ipv6 = aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, format(PUBLIC_IPV6_QUERY_REQ, mac), token).get0();
local_public_address = inet_address(ipv6);
_local_private_address = ipv6;
} else {
auto pub_addr = aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, PUBLIC_IP_QUERY_REQ, token).get0();
local_public_address = inet_address(pub_addr);
try {
auto broadcast = _cfg.broadcast_address;
if (broadcast.addr().is_ipv6()) {
auto macs = co_await aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, PRIVATE_MAC_QUERY, token);
// we should just get a single line, ending in '/'. If there are more than one mac, we should
// maybe try to loop the addresses and exclude local/link-local etc, but these addresses typically
// are already filtered by aws, so probably does not help. For now, just warn and pick first address.
auto i = macs.find('/');
auto mac = macs.substr(0, i);
if (i != std::string::npos && ++i != macs.size()) {
logger().warn("Ec2MultiRegionSnitch (ipv6): more than one MAC address listed ({}). Will use first.", macs);
}
} catch (...) {
std::throw_with_nested(exceptions::configuration_exception("Failed to get a Public IP. Public IP is a requirement for Ec2MultiRegionSnitch. Consider using a different snitch if your instance doesn't have it"));
auto ipv6 = co_await aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, format(PUBLIC_IPV6_QUERY_REQ, mac), token);
_local_public_address = inet_address(ipv6);
_local_private_address = ipv6;
} else {
auto pub_addr = co_await aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, PUBLIC_IP_QUERY_REQ, token);
_local_public_address = inet_address(pub_addr);
}
logger().info("Ec2MultiRegionSnitch using publicIP as identifier: {}", local_public_address);
} catch (...) {
std::throw_with_nested(exceptions::configuration_exception("Failed to get a Public IP. Public IP is a requirement for Ec2MultiRegionSnitch. Consider using a different snitch if your instance doesn't have it"));
}
logger().info("Ec2MultiRegionSnitch using publicIP as identifier: {}", _local_public_address);
//
// Use the Public IP to broadcast Address to other nodes.
//
// Cassandra 2.1 manual explicitly instructs to set broadcast_address
// value to a public address in cassandra.yaml.
//
utils::fb_utilities::set_broadcast_address(local_public_address);
if (!_broadcast_rpc_address_specified_by_user) {
utils::fb_utilities::set_broadcast_rpc_address(local_public_address);
}
if (!local_public_address.addr().is_ipv6()) {
sstring priv_addr = aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, PRIVATE_IP_QUERY_REQ, token).get0();
_local_private_address = priv_addr;
}
//
// Gossiper main instance is currently running on CPU0 -
// therefore we need to make sure the _local_private_address is
// set on the shard0 so that it may be used when Gossiper is
// going to invoke gossiper_starting() method.
//
container().invoke_on(0, [this] (snitch_ptr& local_s) {
if (this_shard_id() != io_cpu_id()) {
local_s->set_local_private_addr(_local_private_address);
}
}).get();
set_snitch_ready();
return;
if (!_local_public_address.addr().is_ipv6()) {
sstring priv_addr = co_await aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, PRIVATE_IP_QUERY_REQ, token);
_local_private_address = priv_addr;
}
set_snitch_ready();
});
//
// Gossiper main instance is currently running on CPU0 -
// therefore we need to make sure the _local_private_address is
// set on the shard0 so that it may be used when Gossiper is
// going to invoke gossiper_starting() method.
//
co_await container().invoke_on(0, [this] (snitch_ptr& local_s) {
if (this_shard_id() != io_cpu_id()) {
local_s->set_local_private_addr(_local_private_address);
}
});
}
set_snitch_ready();
}
void ec2_multi_region_snitch::set_local_private_addr(const sstring& addr_str) {

View File

@@ -22,7 +22,11 @@ public:
virtual sstring get_name() const override {
return "org.apache.cassandra.locator.Ec2MultiRegionSnitch";
}
virtual std::optional<inet_address> get_public_address() const noexcept override {
return _local_public_address;
}
private:
inet_address _local_public_address;
sstring _local_private_address;
bool _broadcast_rpc_address_specified_by_user;
};

View File

@@ -24,36 +24,32 @@ future<> ec2_snitch::load_config(bool prefer_local) {
using namespace boost::algorithm;
if (this_shard_id() == io_cpu_id()) {
auto token = aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, TOKEN_REQ_ENDPOINT, std::nullopt).get0();
return aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, ZONE_NAME_QUERY_REQ, token).then([this, prefer_local](sstring az) {
assert(az.size());
auto token = co_await aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, TOKEN_REQ_ENDPOINT, std::nullopt);
auto az = co_await aws_api_call(AWS_QUERY_SERVER_ADDR, AWS_QUERY_SERVER_PORT, ZONE_NAME_QUERY_REQ, token);
assert(az.size());
std::vector<std::string> splits;
std::vector<std::string> splits;
// Split "us-east-1a" or "asia-1a" into "us-east"/"1a" and "asia"/"1a".
split(splits, az, is_any_of("-"));
assert(splits.size() > 1);
// Split "us-east-1a" or "asia-1a" into "us-east"/"1a" and "asia"/"1a".
split(splits, az, is_any_of("-"));
assert(splits.size() > 1);
sstring my_rack = splits[splits.size() - 1];
sstring my_rack = splits[splits.size() - 1];
// hack for CASSANDRA-4026
sstring my_dc = az.substr(0, az.size() - 1);
if (my_dc[my_dc.size() - 1] == '1') {
my_dc = az.substr(0, az.size() - 3);
}
// hack for CASSANDRA-4026
sstring my_dc = az.substr(0, az.size() - 1);
if (my_dc[my_dc.size() - 1] == '1') {
my_dc = az.substr(0, az.size() - 3);
}
return read_property_file().then([this, prefer_local, my_dc, my_rack] (sstring datacenter_suffix) mutable {
my_dc += datacenter_suffix;
logger().info("Ec2Snitch using region: {}, zone: {}.", my_dc, my_rack);
return container().invoke_on_all([prefer_local, my_dc, my_rack] (snitch_ptr& local_s) {
local_s->set_my_dc_and_rack(my_dc, my_rack);
local_s->set_prefer_local(prefer_local);
});
});
auto datacenter_suffix = co_await read_property_file();
my_dc += datacenter_suffix;
logger().info("Ec2Snitch using region: {}, zone: {}.", my_dc, my_rack);
co_await container().invoke_on_all([prefer_local, my_dc, my_rack] (snitch_ptr& local_s) {
local_s->set_my_dc_and_rack(my_dc, my_rack);
local_s->set_prefer_local(prefer_local);
});
}
return make_ready_future<>();
}
future<> ec2_snitch::start() {

View File

@@ -11,7 +11,6 @@
#include "locator/everywhere_replication_strategy.hh"
#include "utils/class_registrator.hh"
#include "utils/fb_utilities.hh"
#include "locator/token_metadata.hh"
namespace locator {
@@ -23,7 +22,7 @@ everywhere_replication_strategy::everywhere_replication_strategy(const replicati
future<endpoint_set> everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const {
if (tm.sorted_tokens().empty()) {
endpoint_set result{inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()})};
endpoint_set result{inet_address_vector_replica_set({tm.get_topology().my_address()})};
return make_ready_future<endpoint_set>(std::move(result));
}
const auto& all_endpoints = tm.get_all_endpoints();

View File

@@ -9,7 +9,6 @@
#include <algorithm>
#include "local_strategy.hh"
#include "utils/class_registrator.hh"
#include "utils/fb_utilities.hh"
namespace locator {
@@ -20,7 +19,7 @@ local_strategy::local_strategy(const replication_strategy_config_options& config
}
future<endpoint_set> local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const {
return make_ready_future<endpoint_set>(endpoint_set({utils::fb_utilities::get_broadcast_address()}));
return make_ready_future<endpoint_set>(endpoint_set({tm.get_topology().my_address()}));
}
void local_strategy::validate_options(const gms::feature_service&) const {

View File

@@ -15,7 +15,6 @@
#include "locator/network_topology_strategy.hh"
#include "locator/load_sketch.hh"
#include "utils/fb_utilities.hh"
#include <boost/algorithm/string.hpp>
#include "utils/hash.hh"

View File

@@ -10,7 +10,6 @@
#include "db/system_keyspace.hh"
#include "gms/gossiper.hh"
#include "message/messaging_service.hh"
#include "utils/fb_utilities.hh"
#include "db/config.hh"
#include <boost/algorithm/string/trim.hpp>
@@ -21,7 +20,8 @@
namespace locator {
production_snitch_base::production_snitch_base(snitch_config cfg)
: allowed_property_keys({ dc_property_key,
: snitch_base(cfg)
, allowed_property_keys({ dc_property_key,
rack_property_key,
prefer_local_property_key,
dc_suffix_property_key }) {

View File

@@ -13,7 +13,6 @@
#include <seastar/core/sstring.hh>
#include "gms/inet_address.hh"
#include "snitch_base.hh"
#include "utils/fb_utilities.hh"
namespace locator {
@@ -24,7 +23,9 @@ using inet_address = gms::inet_address;
* in the 2nd and 3rd octets of the ip address, respectively.
*/
struct rack_inferring_snitch : public snitch_base {
rack_inferring_snitch(const snitch_config& cfg) {
rack_inferring_snitch(const snitch_config& cfg)
: snitch_base(cfg)
{
_my_dc = get_datacenter();
_my_rack = get_rack();
@@ -33,12 +34,12 @@ struct rack_inferring_snitch : public snitch_base {
}
virtual sstring get_rack() const override {
auto endpoint = utils::fb_utilities::get_broadcast_address();
auto& endpoint = _cfg.broadcast_address;
return std::to_string(uint8_t(endpoint.bytes()[2]));
}
virtual sstring get_datacenter() const override {
auto endpoint = utils::fb_utilities::get_broadcast_address();
auto& endpoint = _cfg.broadcast_address;
return std::to_string(uint8_t(endpoint.bytes()[1]));
}

View File

@@ -10,7 +10,6 @@
#pragma once
#include "snitch_base.hh"
#include "utils/fb_utilities.hh"
#include <memory>
namespace locator {
@@ -21,7 +20,9 @@ namespace locator {
* which improves cache locality.
*/
struct simple_snitch : public snitch_base {
simple_snitch(const snitch_config& cfg) {
simple_snitch(const snitch_config& cfg)
: snitch_base(cfg)
{
_my_dc = get_datacenter();
_my_rack = get_rack();

View File

@@ -46,10 +46,10 @@ struct snitch_config {
sstring name = "SimpleSnitch";
sstring properties_file_name = "";
unsigned io_cpu_id = 0;
bool broadcast_rpc_address_specified_by_user = false;
// Gossiping-property-file specific
gms::inet_address listen_address;
gms::inet_address broadcast_address;
// GCE-specific
sstring gce_meta_server_url = "";
@@ -78,6 +78,8 @@ public:
};
}
virtual std::optional<inet_address> get_public_address() const noexcept { return std::nullopt; }
/**
* returns whatever info snitch wants to gossip
*/
@@ -285,6 +287,8 @@ inline future<> i_endpoint_snitch::reset_snitch(sharded<snitch_ptr>& snitch, sni
class snitch_base : public i_endpoint_snitch {
public:
snitch_base(const snitch_config& cfg) : _cfg(cfg) {}
//
// Sons have to implement:
// virtual sstring get_rack() = 0;
@@ -296,6 +300,7 @@ public:
protected:
sstring _my_dc;
sstring _my_rack;
snitch_config _cfg;
};
} // namespace locator

View File

@@ -22,7 +22,6 @@
#include <boost/range/adaptors.hpp>
#include <seastar/core/smp.hh>
#include "utils/stall_free.hh"
#include "utils/fb_utilities.hh"
namespace locator {

View File

@@ -15,7 +15,6 @@
#include "locator/topology.hh"
#include "locator/production_snitch_base.hh"
#include "utils/stall_free.hh"
#include "utils/fb_utilities.hh"
namespace locator {

View File

@@ -161,6 +161,7 @@ class topology {
public:
struct config {
inet_address this_endpoint;
inet_address this_cql_address; // corresponds to broadcast_rpc_address
host_id this_host_id;
endpoint_dc_rack local_dc_rack;
bool disable_proximity_sorting = false;
@@ -336,6 +337,26 @@ public:
void for_each_node(std::function<void(const node*)> func) const;
host_id my_host_id() const noexcept {
return _cfg.this_host_id;
}
inet_address my_address() const noexcept {
return _cfg.this_endpoint;
}
inet_address my_cql_address() const noexcept {
return _cfg.this_cql_address;
}
bool is_me(const locator::host_id& id) const noexcept {
return id == my_host_id();
}
bool is_me(const inet_address& addr) const noexcept {
return addr == my_address();
}
private:
bool is_configured_this_node(const node&) const;
const node* add_node(node_holder node);

34
main.cc
View File

@@ -12,6 +12,7 @@
#include <seastar/util/closeable.hh>
#include <seastar/core/abort_source.hh>
#include "gms/inet_address.hh"
#include "tasks/task_manager.hh"
#include "utils/build_id.hh"
#include "supervisor.hh"
@@ -531,7 +532,9 @@ sharded<service::storage_proxy> *the_storage_proxy;
static locator::host_id initialize_local_info_thread(sharded<db::system_keyspace>& sys_ks,
sharded<locator::snitch_ptr>& snitch,
const gms::inet_address& listen_address,
const db::config& cfg)
const db::config& cfg,
gms::inet_address broadcast_address,
gms::inet_address broadcast_rpc_address)
{
auto linfo = sys_ks.local().load_local_info().get0();
if (linfo.cluster_name.empty()) {
@@ -545,7 +548,7 @@ static locator::host_id initialize_local_info_thread(sharded<db::system_keyspace
}
linfo.listen_address = listen_address;
sys_ks.local().save_local_info(std::move(linfo), snitch.local()->get_location()).get();
sys_ks.local().save_local_info(std::move(linfo), snitch.local()->get_location(), broadcast_address, broadcast_rpc_address).get();
return linfo.host_id;
}
@@ -811,9 +814,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auto family = cfg->enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET);
auto broadcast_addr = utils::resolve(cfg->broadcast_address || cfg->listen_address, family, preferred).get0();
utils::fb_utilities::set_broadcast_address(broadcast_addr);
auto broadcast_rpc_addr = utils::resolve(cfg->broadcast_rpc_address || cfg->rpc_address, family, preferred).get0();
utils::fb_utilities::set_broadcast_rpc_address(broadcast_rpc_addr);
ctx.api_dir = cfg->api_ui_dir();
if (!ctx.api_dir.empty() && ctx.api_dir.back() != '/') {
@@ -873,8 +874,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
debug::the_snitch = &snitch;
snitch_config snitch_cfg;
snitch_cfg.name = cfg->endpoint_snitch();
snitch_cfg.broadcast_rpc_address_specified_by_user = !cfg->broadcast_rpc_address().empty();
snitch_cfg.listen_address = utils::resolve(cfg->listen_address, family).get0();
snitch_cfg.broadcast_address = broadcast_addr;
snitch.start(snitch_cfg).get();
auto stop_snitch = defer_verbose_shutdown("snitch", [&snitch] {
snitch.stop().get();
@@ -883,9 +884,23 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// #293 - do not stop anything (unless snitch.on_all(start) fails)
stop_snitch->cancel();
if (auto opt_public_address = snitch.local()->get_public_address()) {
// Use the Public IP as broadcast_address to other nodes
// and the broadcast_rpc_address (for client CQL connections).
//
// Cassandra 2.1 manual explicitly instructs to set broadcast_address
// value to a public address in cassandra.yaml.
//
broadcast_addr = *opt_public_address;
if (cfg->broadcast_rpc_address().empty()) {
broadcast_rpc_addr = *opt_public_address;
}
}
supervisor::notify("starting tokens manager");
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
tm_cfg.topo_cfg.this_endpoint = broadcast_addr;
tm_cfg.topo_cfg.this_cql_address = broadcast_rpc_addr;
tm_cfg.topo_cfg.local_dc_rack = snitch.local()->get_location();
if (snitch.local()->get_name() == "org.apache.cassandra.locator.SimpleSnitch") {
//
@@ -1189,9 +1204,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
sys_ks.local().build_bootstrap_info().get();
const auto listen_address = utils::resolve(cfg->listen_address, family).get0();
const auto host_id = initialize_local_info_thread(sys_ks, snitch, listen_address, *cfg);
const auto host_id = initialize_local_info_thread(sys_ks, snitch, listen_address, *cfg, broadcast_addr, broadcast_rpc_addr);
shared_token_metadata::mutate_on_all_shards(token_metadata, [host_id, endpoint = utils::fb_utilities::get_broadcast_address()] (locator::token_metadata& tm) {
shared_token_metadata::mutate_on_all_shards(token_metadata, [host_id, endpoint = broadcast_addr] (locator::token_metadata& tm) {
// Makes local host id available in topology cfg as soon as possible.
// Raft topology discard the endpoint-to-id map, so the local id can
// still be found in the config.
@@ -1204,6 +1219,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
mscfg.id = host_id;
mscfg.ip = listen_address;
mscfg.broadcast_address = broadcast_addr;
mscfg.port = cfg->storage_port();
mscfg.ssl_port = cfg->ssl_storage_port();
mscfg.listen_on_broadcast_address = cfg->listen_on_broadcast_address();
@@ -1271,7 +1287,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
supervisor::notify("starting gossiper");
gms::gossip_config gcfg;
gcfg.gossip_scheduling_group = dbcfg.gossip_scheduling_group;
gcfg.seeds = get_seeds_from_db_config(*cfg);
gcfg.seeds = get_seeds_from_db_config(*cfg, broadcast_addr);
gcfg.cluster_name = cfg->cluster_name();
gcfg.partitioner = cfg->partitioner();
gcfg.ring_delay_ms = cfg->ring_delay_ms();

View File

@@ -240,7 +240,7 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
}
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port)
: messaging_service(config{std::move(id), std::move(ip), port},
: messaging_service(config{std::move(id), ip, ip, port},
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr)
{}
@@ -336,7 +336,8 @@ bool messaging_service::is_host_banned(locator::host_id id) {
}
void messaging_service::do_start_listen() {
bool listen_to_bc = _cfg.listen_on_broadcast_address && _cfg.ip != utils::fb_utilities::get_broadcast_address();
auto broadcast_address = this->broadcast_address();
bool listen_to_bc = _cfg.listen_on_broadcast_address && _cfg.ip != broadcast_address;
rpc::server_options so;
if (_cfg.compress != compress_what::none) {
so.compressor_factory = &compressor_factory;
@@ -379,7 +380,7 @@ void messaging_service::do_start_listen() {
};
_server[0] = listen(_cfg.ip, rpc::streaming_domain_type(0x55AA));
if (listen_to_bc) {
_server[1] = listen(utils::fb_utilities::get_broadcast_address(), rpc::streaming_domain_type(0x66BB));
_server[1] = listen(broadcast_address, rpc::streaming_domain_type(0x66BB));
}
}
@@ -405,22 +406,22 @@ void messaging_service::do_start_listen() {
};
_server_tls[0] = listen(_cfg.ip, rpc::streaming_domain_type(0x77CC));
if (listen_to_bc) {
_server_tls[1] = listen(utils::fb_utilities::get_broadcast_address(), rpc::streaming_domain_type(0x88DD));
_server_tls[1] = listen(broadcast_address, rpc::streaming_domain_type(0x88DD));
}
}
// Do this on just cpu 0, to avoid duplicate logs.
if (this_shard_id() == 0) {
if (_server_tls[0]) {
mlogger.info("Starting Encrypted Messaging Service on SSL port {}", _cfg.ssl_port);
mlogger.info("Starting Encrypted Messaging Service on SSL address {} port {}", _cfg.ip, _cfg.ssl_port);
}
if (_server_tls[1]) {
mlogger.info("Starting Encrypted Messaging Service on SSL broadcast address {} port {}", utils::fb_utilities::get_broadcast_address(), _cfg.ssl_port);
mlogger.info("Starting Encrypted Messaging Service on SSL broadcast address {} port {}", broadcast_address, _cfg.ssl_port);
}
if (_server[0]) {
mlogger.info("Starting Messaging Service on port {}", _cfg.port);
mlogger.info("Starting Messaging Service on address {} port {}", _cfg.ip, _cfg.port);
}
if (_server[1]) {
mlogger.info("Starting Messaging Service on broadcast address {} port {}", utils::fb_utilities::get_broadcast_address(), _cfg.port);
mlogger.info("Starting Messaging Service on broadcast address {} port {}", broadcast_address, _cfg.port);
}
}
}
@@ -474,14 +475,6 @@ msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
messaging_service::~messaging_service() = default;
uint16_t messaging_service::port() {
return _cfg.port;
}
gms::inet_address messaging_service::listen_address() {
return _cfg.ip;
}
static future<> do_with_servers(std::string_view what, std::array<std::unique_ptr<messaging_service::rpc_protocol_server_wrapper>, 2>& servers, auto method) {
mlogger.info("{} server", what);
co_await coroutine::parallel_for_each(
@@ -824,7 +817,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
}
auto my_host_id = _cfg.id;
auto broadcast_address = utils::fb_utilities::get_broadcast_address();
auto broadcast_address = _cfg.broadcast_address;
bool listen_to_bc = _cfg.listen_on_broadcast_address && _cfg.ip != broadcast_address;
auto laddr = socket_address(listen_to_bc ? broadcast_address : _cfg.ip, 0);
@@ -929,7 +922,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
// No reply is received, nothing to wait for.
(void)_rpc->make_client<
rpc::no_wait_type(gms::inet_address, uint32_t, uint64_t, utils::UUID)>(messaging_verb::CLIENT_ID)(
*it->second.rpc_client, utils::fb_utilities::get_broadcast_address(), src_cpu_id,
*it->second.rpc_client, broadcast_address, src_cpu_id,
query::result_memory_limiter::maximum_result_size, my_host_id.uuid())
.handle_exception([ms = shared_from_this(), remote_addr, verb] (std::exception_ptr ep) {
mlogger.debug("Failed to send client id to {} for verb {}: {}", remote_addr, std::underlying_type_t<messaging_verb>(verb), ep);

View File

@@ -267,7 +267,8 @@ public:
struct config {
locator::host_id id;
gms::inet_address ip;
gms::inet_address ip; // a.k.a. listen_address - the address this node is listening on
gms::inet_address broadcast_address; // This node's address, as told to other nodes
uint16_t port;
uint16_t ssl_port = 0;
encrypt_what encrypt = encrypt_what::none;
@@ -340,8 +341,15 @@ public:
future<> start();
future<> start_listen(locator::shared_token_metadata& stm);
uint16_t port();
gms::inet_address listen_address();
// The (non-SSL) RPC port this service was configured with.
uint16_t port() const noexcept {
return _cfg.port;
}
// The address this node listens on (config::ip, a.k.a. listen_address).
gms::inet_address listen_address() const noexcept {
return _cfg.ip;
}
// This node's address as advertised to other nodes
// (config::broadcast_address; may differ from listen_address()).
gms::inet_address broadcast_address() const noexcept {
return _cfg.broadcast_address;
}
future<> shutdown();
future<> stop();
static rpc::no_wait_type no_wait();

View File

@@ -13,7 +13,6 @@
#include "dht/sharder.hh"
#include "streaming/stream_reason.hh"
#include "gms/inet_address.hh"
#include "utils/fb_utilities.hh"
#include "gms/gossiper.hh"
#include "message/messaging_service.hh"
#include "sstables/sstables.hh"
@@ -225,7 +224,8 @@ static std::vector<gms::inet_address> get_neighbors(
auto normal_nodes = erm.get_token_metadata().get_all_endpoints();
ret = inet_address_vector_replica_set(normal_nodes.begin(), normal_nodes.end());
}
remove_item(ret, utils::fb_utilities::get_broadcast_address());
auto my_address = erm.get_topology().my_address();
remove_item(ret, my_address);
if (!data_centers.empty()) {
auto dc_endpoints_map = erm.get_token_metadata().get_topology().get_datacenter_endpoints();
@@ -246,7 +246,7 @@ static std::vector<gms::inet_address> get_neighbors(
}
// We require, like Cassandra does, that the current host must also
// be part of the repair
if (!dc_endpoints.contains(utils::fb_utilities::get_broadcast_address())) {
if (!dc_endpoints.contains(my_address)) {
throw std::runtime_error("The current host must be part of the repair");
}
// The resulting list of nodes is the intersection of the nodes in the
@@ -269,7 +269,7 @@ static std::vector<gms::inet_address> get_neighbors(
} catch(...) {
throw std::runtime_error(format("Unknown host specified: {}", host));
}
if (endpoint == utils::fb_utilities::get_broadcast_address()) {
if (endpoint == my_address) {
found_me = true;
} else if (neighbor_set.contains(endpoint)) {
ret.push_back(endpoint);
@@ -290,7 +290,7 @@ static std::vector<gms::inet_address> get_neighbors(
throw std::runtime_error("The current host must be part of the repair");
}
if (ret.size() < 1) {
auto me = utils::fb_utilities::get_broadcast_address();
auto me = my_address;
auto others = erm.get_natural_endpoints(tok);
remove_item(others, me);
throw std::runtime_error(fmt::format("Repair requires at least two "
@@ -345,7 +345,8 @@ static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(
// Repair coordinator must participate in repair, but it is never
// returned by get_neighbors - add it here
participating_hosts.insert(utils::fb_utilities::get_broadcast_address());
auto my_address = erm.get_topology().my_address();
participating_hosts.insert(my_address);
co_await do_for_each(ranges, [&] (const dht::token_range& range) {
const auto nbs = get_neighbors(erm, ksname, range, data_centers, hosts, ignore_nodes);
@@ -1085,6 +1086,7 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
auto germs = make_lw_shared(co_await locator::make_global_effective_replication_map(sharded_db, keyspace));
auto& erm = germs->get();
auto& topology = erm.get_token_metadata().get_topology();
auto my_address = topology.my_address();
repair_options options(options_map);
@@ -1101,7 +1103,7 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
co_return id.id;
}
if (!_gossiper.local().is_normal(utils::fb_utilities::get_broadcast_address())) {
if (!_gossiper.local().is_normal(my_address)) {
throw std::runtime_error("Node is not in NORMAL status yet!");
}
@@ -1122,15 +1124,15 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
// but instead of each range being assigned just one primary owner
// across the entire cluster, here each range is assigned a primary
// owner in each of the DCs.
ranges = erm.get_primary_ranges_within_dc(utils::fb_utilities::get_broadcast_address());
ranges = erm.get_primary_ranges_within_dc(my_address);
} else if (options.data_centers.size() > 0 || options.hosts.size() > 0) {
throw std::runtime_error("You need to run primary range repair on all nodes in the cluster.");
} else {
ranges = erm.get_primary_ranges(utils::fb_utilities::get_broadcast_address());
ranges = erm.get_primary_ranges(my_address);
}
} else {
// get keyspace local ranges
ranges = erm.get_ranges(utils::fb_utilities::get_broadcast_address());
ranges = erm.get_ranges(my_address);
}
if (!options.data_centers.empty() && !options.hosts.empty()) {
@@ -1498,7 +1500,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
auto& topology = tmptr->get_topology();
auto myloc = topology.get_location();
auto myip = utils::fb_utilities::get_broadcast_address();
auto myip = topology.my_address();
auto reason = streaming::stream_reason::bootstrap;
// Calculate number of ranges to sync data
size_t nr_ranges_total = 0;
@@ -1672,9 +1674,9 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
using inet_address = gms::inet_address;
return seastar::async([this, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node), ops] () mutable {
auto& db = get_db().local();
auto myip = utils::fb_utilities::get_broadcast_address();
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
auto& topology = tmptr->get_topology();
auto myip = topology.my_address();
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
auto local_dc = topology.get_datacenter();
bool is_removenode = myip != leaving_node;
auto op = is_removenode ? "removenode_with_repair" : "decommission_with_repair";
@@ -1864,7 +1866,8 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
future<> repair_service::decommission_with_repair(locator::token_metadata_ptr tmptr) {
assert(this_shard_id() == 0);
return do_decommission_removenode_with_repair(std::move(tmptr), utils::fb_utilities::get_broadcast_address(), {});
auto my_address = tmptr->get_topology().my_address();
return do_decommission_removenode_with_repair(std::move(tmptr), my_address, {});
}
future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
@@ -1885,7 +1888,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
return seastar::async([this, tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes)] () mutable {
auto& db = get_db().local();
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
auto myip = utils::fb_utilities::get_broadcast_address();
auto myip = tmptr->get_topology().my_address();
size_t nr_ranges_total = 0;
for (const auto& [keyspace_name, erm] : ks_erms) {
if (!db.has_keyspace(keyspace_name)) {
@@ -1985,13 +1988,14 @@ future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr,
auto cloned_tm = co_await tmptr->clone_async();
auto op = sstring("replace_with_repair");
auto& topology = tmptr->get_topology();
auto myip = topology.my_address();
auto myloc = topology.get_location();
auto reason = streaming::stream_reason::replace;
// update a cloned version of tmptr
// no need to set the original version
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), myloc, locator::node::state::replacing);
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address());
cloned_tmptr->update_topology(myip, myloc, locator::node::state::replacing);
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, myip);
co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), myloc.dc, reason, std::move(ignore_nodes));
}

View File

@@ -669,7 +669,7 @@ void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_
bool do_small_table_optimization = erm && small_table_optimization;
auto* strat = do_small_table_optimization ? &erm->get_replication_strategy() : nullptr;
auto* tm = do_small_table_optimization ? &erm->get_token_metadata() : nullptr;
auto myip = do_small_table_optimization ? utils::fb_utilities::get_broadcast_address() : gms::inet_address();
auto myip = do_small_table_optimization ? erm->get_topology().my_address() : gms::inet_address();
for (auto& r : rows) {
thread::maybe_yield();
if (!r.dirty_on_master()) {
@@ -843,7 +843,7 @@ public:
, _max_row_buf_size(max_row_buf_size)
, _seed(seed)
, _repair_master(master)
, _myip(utils::fb_utilities::get_broadcast_address())
, _myip(_db.local().get_token_metadata().get_topology().my_address())
, _repair_meta_id(repair_meta_id)
, _reason(reason)
, _master_node_shard_config(std::move(master_node_shard_config))
@@ -872,7 +872,7 @@ public:
} else {
add_to_repair_meta_for_followers(*this);
}
_all_node_states.push_back(repair_node_state(utils::fb_utilities::get_broadcast_address()));
_all_node_states.push_back(repair_node_state(_myip));
for (auto& node : all_live_peer_nodes) {
_all_node_states.push_back(repair_node_state(node));
}
@@ -1576,7 +1576,7 @@ public:
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version, streaming::stream_reason reason,
gc_clock::time_point compaction_time, abort_source& as) {
rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}",
utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size);
repair.get_db().local().get_token_metadata().get_topology().my_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size);
return repair.insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason, compaction_time, as).then([] {
return repair_row_level_start_response{repair_row_level_start_status::ok};
}).handle_exception_type([] (replica::no_such_column_family&) {
@@ -1598,7 +1598,7 @@ public:
static future<>
repair_row_level_stop_handler(repair_service& rs, gms::inet_address from, uint32_t repair_meta_id, sstring ks_name, sstring cf_name, dht::token_range range) {
rlogger.debug("<<< Finished Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, range={}",
utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, range);
rs.get_db().local().get_token_metadata().get_topology().my_address(), from, repair_meta_id, ks_name, cf_name, range);
auto rm = rs.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::row_level_stop_started);
return rs.remove_repair_meta(from, repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range)).then([rm] {
@@ -2392,7 +2392,7 @@ future<> repair_service::init_ms_handlers() {
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, compaction_time, this] (repair_service& local_repair) mutable {
if (!local_repair._sys_dist_ks.local_is_initialized() || !local_repair._view_update_generator.local_is_initialized()) {
return make_exception_future<repair_row_level_start_response>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later",
utils::fb_utilities::get_broadcast_address())));
local_repair.get_db().local().get_token_metadata().get_topology().my_address())));
}
streaming::stream_reason r = reason ? *reason : streaming::stream_reason::repair;
const gc_clock::time_point ct = compaction_time ? *compaction_time : gc_clock::now();
@@ -2561,9 +2561,9 @@ private:
};
inet_address_vector_replica_set sort_peer_nodes(const std::vector<gms::inet_address>& nodes) {
auto myip = utils::fb_utilities::get_broadcast_address();
inet_address_vector_replica_set sorted_nodes(nodes.begin(), nodes.end());
_shard_task.db.local().get_token_metadata().get_topology().sort_by_proximity(myip, sorted_nodes);
auto& topology = _shard_task.db.local().get_token_metadata().get_topology();
topology.sort_by_proximity(topology.my_address(), sorted_nodes);
return sorted_nodes;
}
@@ -2833,11 +2833,12 @@ private:
if (_shard_task.reason() != streaming::stream_reason::repair) {
co_return;
}
auto my_address = get_erm()->get_topology().my_address();
// Update repair_history table only if all replicas have been repaired
size_t repaired_replicas = _all_live_peer_nodes.size() + 1;
if (_shard_task.total_rf != repaired_replicas){
rlogger.debug("repair[{}]: Skipped to update system.repair_history total_rf={}, repaired_replicas={}, local={}, peers={}",
_shard_task.global_repair_id.uuid(), _shard_task.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes);
_shard_task.global_repair_id.uuid(), _shard_task.total_rf, repaired_replicas, my_address, _all_live_peer_nodes);
co_return;
}
// Update repair_history table only if both hints and batchlog have been flushed.
@@ -2852,7 +2853,7 @@ private:
auto repair_time = repair_time_opt.value();
repair_update_system_table_request req{_shard_task.global_repair_id.uuid(), _table_id, _shard_task.get_keyspace(), _cf_name, _range, repair_time};
auto all_nodes = _all_live_peer_nodes;
all_nodes.push_back(utils::fb_utilities::get_broadcast_address());
all_nodes.push_back(my_address);
co_await coroutine::parallel_for_each(all_nodes, [this, req] (gms::inet_address node) -> future<> {
try {
auto& ms = _shard_task.messaging.local();

View File

@@ -51,7 +51,6 @@
#include "multishard_mutation_query.hh"
#include "utils/human_readable.hh"
#include "utils/fb_utilities.hh"
#include "utils/stall_free.hh"
#include "utils/fmt-compat.hh"
#include "utils/error_injection.hh"
@@ -2693,7 +2692,8 @@ const sstring& database::get_snitch_name() const {
}
dht::token_range_vector database::get_keyspace_local_ranges(sstring ks) {
return find_keyspace(ks).get_effective_replication_map()->get_ranges(utils::fb_utilities::get_broadcast_address());
auto my_address = get_token_metadata().get_topology().my_address();
return find_keyspace(ks).get_effective_replication_map()->get_ranges(my_address);
}
/*!

View File

@@ -38,7 +38,6 @@
#include <boost/range/adaptor/map.hpp>
#include "utils/error_injection.hh"
#include "utils/histogram_metrics_helper.hh"
#include "utils/fb_utilities.hh"
#include "mutation/mutation_source_metadata.hh"
#include "gms/gossiper.hh"
#include "gms/feature_service.hh"
@@ -2365,7 +2364,7 @@ table::cache_hit_rate table::get_my_hit_rate() const {
}
table::cache_hit_rate table::get_hit_rate(const gms::gossiper& gossiper, gms::inet_address addr) {
if (utils::fb_utilities::get_broadcast_address() == addr) {
if (gossiper.get_broadcast_address() == addr) {
return get_my_hit_rate();
}
auto it = _cluster_cache_hit_rates.find(addr);

View File

@@ -35,7 +35,6 @@
#include "tracing/trace_state.hh"
#include "tracing/tracing.hh"
#include "types/types.hh"
#include "utils/fb_utilities.hh"
#include "service/storage_proxy.hh"
#include "cql3/functions/aggregate_function.hh"
@@ -268,7 +267,8 @@ public:
{}
future<query::forward_result> dispatch_to_node(netw::msg_addr id, query::forward_request req) {
if (utils::fb_utilities::is_me(id.addr)) {
auto my_address = _forwarder._messaging.broadcast_address();
if (id.addr == my_address) {
return _forwarder.dispatch_to_shards(req, _tr_info);
}

View File

@@ -69,5 +69,9 @@ public:
void start_broadcasting();
future<> stop_broadcasting();
// Read-only access to the gossiper this broadcaster was constructed with.
// Used e.g. by load_meter to obtain the local broadcast address.
const gms::gossiper& gossiper() const noexcept {
return _gossiper;
}
};
}

View File

@@ -207,7 +207,7 @@ void migration_manager::schedule_schema_pull(const gms::inet_address& endpoint,
const auto* value = state.get_application_state_ptr(gms::application_state::SCHEMA);
if (endpoint != utils::fb_utilities::get_broadcast_address() && value) {
if (endpoint != _messaging.broadcast_address() && value) {
// FIXME: discarded future
(void)maybe_schedule_schema_pull(table_schema_version(utils::UUID{value->value()}), endpoint).handle_exception([endpoint] (auto ep) {
mlogger.warn("Fail to pull schema from {}: {}", endpoint, ep);
@@ -223,8 +223,8 @@ bool migration_manager::have_schema_agreement() {
auto our_version = _storage_proxy.get_db().local().get_version();
bool match = false;
static thread_local logger::rate_limit rate_limit{std::chrono::seconds{5}};
_gossiper.for_each_endpoint_state_until([&] (const gms::inet_address& endpoint, const gms::endpoint_state& eps) {
if (endpoint == utils::fb_utilities::get_broadcast_address() || !_gossiper.is_alive(endpoint)) {
_gossiper.for_each_endpoint_state_until([&, my_address = _messaging.broadcast_address()] (const gms::inet_address& endpoint, const gms::endpoint_state& eps) {
if (endpoint == my_address || !_gossiper.is_alive(endpoint)) {
return stop_iteration::no;
}
mlogger.debug("Checking schema state for {}.", endpoint);
@@ -950,9 +950,9 @@ future<> migration_manager::announce_without_raft(std::vector<mutation> schema,
try {
using namespace std::placeholders;
auto all_live = _gossiper.get_live_members();
auto live_members = all_live | boost::adaptors::filtered([this] (const gms::inet_address& endpoint) {
auto live_members = all_live | boost::adaptors::filtered([this, my_address = _messaging.broadcast_address()] (const gms::inet_address& endpoint) {
// only push schema to nodes with known and equal versions
return endpoint != utils::fb_utilities::get_broadcast_address() &&
return endpoint != my_address &&
_messaging.knows_version(endpoint) &&
_messaging.get_raw_version(endpoint) == netw::messaging_service::current_version;
});

View File

@@ -48,7 +48,7 @@ future<std::map<sstring, double>> load_meter::get_load_map() {
llogger.debug("load_broadcaster is not set yet!");
}
load_map.emplace(format("{}",
utils::fb_utilities::get_broadcast_address()), get_load());
_lb->gossiper().get_broadcast_address()), get_load());
return load_map;
});
}

View File

@@ -12,7 +12,8 @@
#include "service_level_controller.hh"
#include "message/messaging_service.hh"
#include "db/system_distributed_keyspace.hh"
#include "utils/fb_utilities.hh"
#include "cql3/query_processor.hh"
#include "service/storage_proxy.hh"
namespace qos {
static logging::logger sl_logger("service_level_controller");
@@ -432,7 +433,8 @@ future<> service_level_controller::do_remove_service_level(sstring name, bool re
void service_level_controller::on_join_cluster(const gms::inet_address& endpoint) { }
void service_level_controller::on_leave_cluster(const gms::inet_address& endpoint) {
if (this_shard_id() == global_controller && endpoint == utils::fb_utilities::get_broadcast_address()) {
auto my_address = _auth_service.local().query_processor().proxy().local_db().get_token_metadata().get_topology().my_address();
if (this_shard_id() == global_controller && endpoint == my_address) {
_global_controller_db->dist_data_update_aborter.request_abort();
}
}

View File

@@ -1301,7 +1301,7 @@ static future<bool> wait_for_peers_to_enter_synchronize_state(
// This is a work-around for boost tests where RPC module is not listening so we cannot contact ourselves.
// But really, except the (arguably broken) test code, we don't need to be treated as an edge case. All nodes are symmetric.
// For production code this line is unnecessary.
entered_synchronize->insert(utils::fb_utilities::get_broadcast_address());
entered_synchronize->insert(ms.broadcast_address());
for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) {
// We fetch the config again on every attempt to handle the possibility of removing failed nodes.

View File

@@ -20,6 +20,7 @@
#include "idl/group0_state_machine.dist.hh"
#include "idl/group0_state_machine.dist.impl.hh"
#include "service/raft/group0_state_machine.hh"
#include "replica/database.hh"
namespace service {
@@ -301,7 +302,7 @@ group0_command raft_group0_client::prepare_command(Command change, group0_guard&
.prev_state_id{guard.observed_group0_state_id()},
.new_state_id{guard.new_group0_state_id()},
.creator_addr{utils::fb_utilities::get_broadcast_address()},
.creator_addr{_sys_ks.local_db().get_token_metadata().get_topology().my_address()},
.creator_id{_raft_gr.group0().id()}
};
@@ -321,7 +322,7 @@ group0_command raft_group0_client::prepare_command(Command change, std::string_v
.prev_state_id{std::nullopt},
.new_state_id{new_group0_state_id},
.creator_addr{utils::fb_utilities::get_broadcast_address()},
.creator_addr{_sys_ks.local_db().get_token_metadata().get_topology().my_address()},
.creator_id{_raft_gr.group0().id()}
};

View File

@@ -24,7 +24,6 @@
#include "gc_clock.hh"
#include "service/raft/group0_state_machine.hh"
#include "db/system_keyspace.hh"
#include "utils/fb_utilities.hh"
namespace service {
// Obtaining this object means that all previously finished operations on group 0 are visible on this node.

View File

@@ -152,8 +152,16 @@ static future<ResultTuple> add_replica_exception_to_query_result(gms::feature_se
return encode_replica_exception_for_rpc<ResultTuple>(features, f.get_exception());
}
static bool only_me(const inet_address_vector_replica_set& replicas) {
return replicas.size() == 1 && replicas[0] == utils::fb_utilities::get_broadcast_address();
// This node's broadcast address, obtained from the local database's
// token-metadata topology (replaces the old utils::fb_utilities global).
gms::inet_address storage_proxy::my_address() const noexcept {
return local_db().get_token_metadata().get_topology().my_address();
}
// True iff the given address belongs to this node, per the local topology.
bool storage_proxy::is_me(gms::inet_address addr) const noexcept {
return local_db().get_token_metadata().get_topology().is_me(addr);
}
// True iff the replica set consists of exactly this node — i.e. the
// operation can be satisfied entirely locally.
bool storage_proxy::only_me(const inet_address_vector_replica_set& replicas) const noexcept {
return replicas.size() == 1 && is_me(replicas[0]);
}
// This class handles all communication with other nodes in `storage_proxy`:
@@ -901,7 +909,6 @@ private:
};
using namespace exceptions;
using fbu = utils::fb_utilities;
static inline
query::digest_algorithm digest_algorithm(service::storage_proxy& proxy) {
@@ -1030,7 +1037,7 @@ public:
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
fencing_token fence) override {
const auto my_ip = utils::fb_utilities::get_broadcast_address();
const auto my_ip = sp.my_address();
auto m = _mutations[my_ip];
if (m) {
tracing::trace(tr_state, "Executing a mutation locally");
@@ -1045,7 +1052,7 @@ public:
if (m) {
tracing::trace(tr_state, "Sending a mutation to /{}", ep);
return sp.remote().send_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, tracing::make_trace_info(tr_state),
*m, forward, utils::fb_utilities::get_broadcast_address(), this_shard_id(),
*m, forward, sp.my_address(), this_shard_id(),
response_id, rate_limit_info, fence);
}
sp.got_response(response_id, ep, std::nullopt);
@@ -1085,7 +1092,7 @@ public:
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
fencing_token fence) override {
tracing::trace(tr_state, "Executing a mutation locally");
return sp.apply_fence(sp.mutate_locally(_schema, *_mutation, std::move(tr_state), db::commitlog::force_sync::no, timeout, rate_limit_info), fence, utils::fb_utilities::get_broadcast_address());
return sp.apply_fence(sp.mutate_locally(_schema, *_mutation, std::move(tr_state), db::commitlog::force_sync::no, timeout, rate_limit_info), fence, sp.my_address());
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, const inet_address_vector_replica_set& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
@@ -1093,7 +1100,7 @@ public:
fencing_token fence) override {
tracing::trace(tr_state, "Sending a mutation to /{}", ep);
return sp.remote().send_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, tracing::make_trace_info(tr_state),
*_mutation, forward, utils::fb_utilities::get_broadcast_address(), this_shard_id(),
*_mutation, forward, sp.my_address(), this_shard_id(),
response_id, rate_limit_info, fence);
}
virtual bool is_shared() override {
@@ -1116,14 +1123,14 @@ public:
fencing_token fence) override {
// A hint will be sent to all relevant endpoints when the endpoint it was originally intended for
// becomes unavailable - this might include the current node
return sp.apply_fence(sp.mutate_hint(_schema, *_mutation, std::move(tr_state), timeout), fence, utils::fb_utilities::get_broadcast_address());
return sp.apply_fence(sp.mutate_hint(_schema, *_mutation, std::move(tr_state), timeout), fence, sp.my_address());
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, const inet_address_vector_replica_set& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, fencing_token fence) override {
return sp.remote().send_hint_mutation(
netw::messaging_service::msg_addr{ep, 0}, timeout, tr_state,
*_mutation, forward, utils::fb_utilities::get_broadcast_address(), this_shard_id(), response_id, rate_limit_info, fence);
*_mutation, forward, sp.my_address(), this_shard_id(), response_id, rate_limit_info, fence);
}
};
@@ -1250,7 +1257,7 @@ public:
// TODO: Enforce per partition rate limiting in paxos
return sp.remote().send_paxos_learn(
netw::messaging_service::msg_addr{ep, 0}, timeout, tracing::make_trace_info(tr_state),
*_proposal, forward, utils::fb_utilities::get_broadcast_address(), this_shard_id(), response_id);
*_proposal, forward, sp.my_address(), this_shard_id(), response_id);
}
virtual bool is_shared() override {
return true;
@@ -1583,7 +1590,7 @@ private:
class datacenter_write_response_handler : public abstract_write_response_handler {
bool waited_for(gms::inet_address from) override {
const auto& topo = _effective_replication_map_ptr->get_topology();
return fbu::is_me(from) || (topo.get_datacenter(from) == topo.get_datacenter());
return topo.is_me(from) || (topo.get_datacenter(from) == topo.get_datacenter());
}
public:
@@ -1925,7 +1932,8 @@ future<paxos::prepare_summary> paxos_response_handler::prepare_ballot(utils::UUI
// sends query result content while other replicas send digests needed to check consistency.
bool only_digest = peer != _live_endpoints[0];
auto da = digest_algorithm(*_proxy);
if (fbu::is_me(peer)) {
const auto& topo = _effective_replication_map_ptr->get_topology();
if (topo.is_me(peer)) {
tracing::trace(tr_state, "prepare_ballot: prepare {} locally", ballot);
response = co_await paxos::paxos_state::prepare(*_proxy, _proxy->remote().system_keyspace(), tr_state, _schema, *_cmd, _key.key(), ballot, only_digest, da, _timeout);
} else {
@@ -2082,9 +2090,10 @@ future<bool> paxos_response_handler::accept_proposal(lw_shared_ptr<paxos::propos
auto handle_one_msg = [this, &request_tracker, timeout_if_partially_accepted, proposal = std::move(proposal)] (gms::inet_address peer) mutable -> future<> {
bool is_timeout = false;
std::optional<bool> accepted;
const auto& topo = _effective_replication_map_ptr->get_topology();
try {
if (fbu::is_me(peer)) {
if (topo.is_me(peer)) {
tracing::trace(tr_state, "accept_proposal: accept {} locally", *proposal);
accepted = co_await paxos::paxos_state::accept(*_proxy, _proxy->remote().system_keyspace(), tr_state, _schema, proposal->update.decorated_key(*_schema).token(), *proposal, _timeout);
} else {
@@ -2236,11 +2245,13 @@ void paxos_response_handler::prune(utils::UUID ballot) {
}
_proxy->get_stats().cas_now_pruning++;
_proxy->get_stats().cas_prune++;
auto erm = _effective_replication_map_ptr;
auto my_address = _proxy->my_address();
// running in the background, but the amount of the bg job is limited by pruning_limit
// it is waited by holding shared pointer to storage_proxy which guaranties
// that storage_proxy::stop() will wait for this to complete
(void)parallel_for_each(_live_endpoints, [this, ballot] (gms::inet_address peer) mutable {
if (fbu::is_me(peer)) {
(void)parallel_for_each(_live_endpoints, [this, ballot, erm, my_address] (gms::inet_address peer) mutable {
if (peer == my_address) {
tracing::trace(tr_state, "prune: prune {} locally", ballot);
return paxos::paxos_state::prune(_proxy->remote().system_keyspace(), _schema, _key.key(), ballot, _timeout, tr_state);
} else {
@@ -2734,7 +2745,7 @@ void storage_proxy_stats::stats::register_stats() {
}
inline uint64_t& storage_proxy_stats::split_stats::get_ep_stat(const locator::topology& topo, gms::inet_address ep) noexcept {
if (fbu::is_me(ep)) {
if (topo.is_me(ep)) {
return _local.val;
}
@@ -3019,7 +3030,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
tracing::trace(tr_state, "Creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints ,pending_endpoints);
const bool coordinator_in_replica_set = std::find(natural_endpoints.begin(), natural_endpoints.end(),
utils::fb_utilities::get_broadcast_address()) != natural_endpoints.end();
my_address()) != natural_endpoints.end();
// Check if this node, which is serving as a coordinator for
// the mutation, is also a replica for the partition being
@@ -3271,7 +3282,7 @@ gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation&
throw exceptions::unavailable_exception(cl, block_for(erm, cl), 0);
}
const auto my_address = utils::fb_utilities::get_broadcast_address();
const auto my_address = this->my_address();
// Early return if coordinator can become the leader (so one extra internode message can be
// avoided). With token-aware drivers this is the expected case, so we are doing it ASAP.
if (boost::algorithm::any_of_equal(live_endpoints, my_address)) {
@@ -3320,7 +3331,7 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level
}
// Forward mutations to the leaders chosen for them
auto my_address = utils::fb_utilities::get_broadcast_address();
auto my_address = this->my_address();
co_await coroutine::parallel_for_each(leaders, [this, cl, timeout, tr_state = std::move(tr_state), permit = std::move(permit), my_address, fence] (auto& endpoint_and_mutations) -> future<> {
auto first_schema = endpoint_and_mutations.second[0].s;
@@ -3537,7 +3548,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
}
static inet_address_vector_replica_set endpoint_filter(
const noncopyable_function<bool(const gms::inet_address&)>& is_alive,
const noncopyable_function<bool(const gms::inet_address&)>& is_alive, gms::inet_address my_address,
const sstring& local_rack, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>& endpoints) {
// special case for single-node data centers
if (endpoints.size() == 1 && endpoints.begin()->second.size() == 1) {
@@ -3547,9 +3558,8 @@ static inet_address_vector_replica_set endpoint_filter(
// strip out dead endpoints and localhost
std::unordered_multimap<sstring, gms::inet_address> validated;
auto is_valid = [&is_alive] (gms::inet_address input) {
return input != utils::fb_utilities::get_broadcast_address()
&& is_alive(input);
auto is_valid = [&is_alive, my_address] (gms::inet_address input) {
return input != my_address && is_alive(input);
};
for (auto& e : endpoints) {
@@ -3640,12 +3650,12 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
, _batch_uuid(utils::UUID_gen::get_time_UUID())
, _batchlog_endpoints(
[this]() -> inet_address_vector_replica_set {
auto local_addr = utils::fb_utilities::get_broadcast_address();
auto local_addr = _p.my_address();
auto& topology = _ermp->get_topology();
auto local_dc = topology.get_datacenter();
auto& local_endpoints = topology.get_datacenter_racks().at(local_dc);
auto local_rack = topology.get_rack();
auto chosen_endpoints = endpoint_filter(std::bind_front(&storage_proxy::is_alive, &_p),
auto chosen_endpoints = endpoint_filter(std::bind_front(&storage_proxy::is_alive, &_p), local_addr,
local_rack, local_endpoints);
if (chosen_endpoints.empty()) {
@@ -3925,7 +3935,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
auto& stats = handler_ptr->stats();
auto& handler = *handler_ptr;
auto& global_stats = handler._proxy->_global_stats;
if (handler.get_targets().size() != 1 || !fbu::is_me(handler.get_targets()[0])) {
if (handler.get_targets().size() != 1 || !is_me(handler.get_targets()[0])) {
auto& topology = handler_ptr->_effective_replication_map_ptr->get_topology();
auto local_dc = topology.get_datacenter();
@@ -3954,7 +3964,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
}
auto all = boost::range::join(local, dc_groups);
auto my_address = utils::fb_utilities::get_broadcast_address();
auto my_address = this->my_address();
// lambda for applying mutation locally
auto lmutate = [handler_ptr, response_id, this, my_address, timeout] () mutable {
@@ -4223,7 +4233,7 @@ public:
private:
bool waiting_for(gms::inet_address ep) {
const auto& topo = _effective_replication_map_ptr->get_topology();
return db::is_datacenter_local(_cl) ? fbu::is_me(ep) || (topo.get_datacenter(ep) == topo.get_datacenter()) : true;
return db::is_datacenter_local(_cl) ? topo.is_me(ep) || (topo.get_datacenter(ep) == topo.get_datacenter()) : true;
}
void got_response(gms::inet_address ep) {
if (!_cl_reported) {
@@ -4843,9 +4853,9 @@ public:
protected:
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> make_mutation_data_request(lw_shared_ptr<query::read_command> cmd, gms::inet_address ep, clock_type::time_point timeout) {
++_proxy->get_stats().mutation_data_read_attempts.get_ep_stat(get_topology(), ep);
if (fbu::is_me(ep)) {
if (_proxy->is_me(ep)) {
tracing::trace(_trace_state, "read_mutation_data: querying locally");
return _proxy->apply_fence(_proxy->query_mutations_locally(_schema, cmd, _partition_range, timeout, _trace_state), get_fence(), utils::fb_utilities::get_broadcast_address());
return _proxy->apply_fence(_proxy->query_mutations_locally(_schema, cmd, _partition_range, timeout, _trace_state), get_fence(), _proxy->my_address());
} else {
return _proxy->remote().send_read_mutation_data(netw::messaging_service::msg_addr{ep, 0}, timeout,
_trace_state, *cmd, _partition_range,
@@ -4857,9 +4867,9 @@ protected:
auto opts = want_digest
? query::result_options{query::result_request::result_and_digest, digest_algorithm(*_proxy)}
: query::result_options{query::result_request::only_result, query::digest_algorithm::none};
if (fbu::is_me(ep)) {
if (_proxy->is_me(ep)) {
tracing::trace(_trace_state, "read_data: querying locally");
return _proxy->apply_fence(_proxy->query_result_local(_effective_replication_map_ptr, _schema, _cmd, _partition_range, opts, _trace_state, timeout, adjust_rate_limit_for_local_operation(_rate_limit_info)), get_fence(), utils::fb_utilities::get_broadcast_address());
return _proxy->apply_fence(_proxy->query_result_local(_effective_replication_map_ptr, _schema, _cmd, _partition_range, opts, _trace_state, timeout, adjust_rate_limit_for_local_operation(_rate_limit_info)), get_fence(), _proxy->my_address());
} else {
return _proxy->remote().send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout,
_trace_state, *_cmd, _partition_range, opts.digest_algo, _rate_limit_info,
@@ -4868,10 +4878,10 @@ protected:
}
future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature, std::optional<full_position>>> make_digest_request(gms::inet_address ep, clock_type::time_point timeout) {
++_proxy->get_stats().digest_read_attempts.get_ep_stat(get_topology(), ep);
if (fbu::is_me(ep)) {
if (_proxy->is_me(ep)) {
tracing::trace(_trace_state, "read_digest: querying locally");
return _proxy->apply_fence(_proxy->query_result_local_digest(_effective_replication_map_ptr, _schema, _cmd, _partition_range, _trace_state,
timeout, digest_algorithm(*_proxy), adjust_rate_limit_for_local_operation(_rate_limit_info)), get_fence(), utils::fb_utilities::get_broadcast_address());
timeout, digest_algorithm(*_proxy), adjust_rate_limit_for_local_operation(_rate_limit_info)), get_fence(), _proxy->my_address());
} else {
tracing::trace(_trace_state, "read_digest: sending a message to /{}", ep);
return _proxy->remote().send_read_digest(netw::messaging_service::msg_addr{ep, 0}, timeout,
@@ -5313,7 +5323,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
// reordering of endpoints happens. The local endpoint, if
// present, is always first in the list, as get_endpoints_for_reading()
// orders the list by proximity to the local endpoint.
is_read_non_local |= !all_replicas.empty() && all_replicas.front() != utils::fb_utilities::get_broadcast_address();
is_read_non_local |= !all_replicas.empty() && all_replicas.front() != erm->get_topology().my_address();
auto cf = _db.local().find_column_family(schema).shared_from_this();
inet_address_vector_replica_set target_replicas = filter_replicas_for_read(cl, *erm, all_replicas, preferred_endpoints, repair_decision,
@@ -6224,9 +6234,9 @@ inet_address_vector_replica_set storage_proxy::get_live_endpoints(const locator:
}
void storage_proxy::sort_endpoints_by_proximity(const locator::topology& topo, inet_address_vector_replica_set& eps) const {
topo.sort_by_proximity(utils::fb_utilities::get_broadcast_address(), eps);
topo.sort_by_proximity(my_address(), eps);
// FIXME: before dynamic snitch is implement put local address (if present) at the beginning
auto it = boost::range::find(eps, utils::fb_utilities::get_broadcast_address());
auto it = boost::range::find(eps, my_address());
if (it != eps.end() && it != eps.begin()) {
std::iter_swap(it, eps.begin());
}
@@ -6273,7 +6283,7 @@ storage_proxy::filter_replicas_for_read(
}
bool storage_proxy::is_alive(const gms::inet_address& ep) const {
return _remote ? _remote->is_alive(ep) : (ep == utils::fb_utilities::get_broadcast_address());
return _remote ? _remote->is_alive(ep) : is_me(ep);
}
inet_address_vector_replica_set storage_proxy::intersection(const inet_address_vector_replica_set& l1, const inet_address_vector_replica_set& l2) {

View File

@@ -495,7 +495,13 @@ public:
void start_remote(netw::messaging_service&, gms::gossiper&, migration_manager&, sharded<db::system_keyspace>& sys_ks);
future<> stop_remote();
gms::inet_address my_address() const noexcept;
bool is_me(gms::inet_address addr) const noexcept;
private:
bool only_me(const inet_address_vector_replica_set& replicas) const noexcept;
// Throws an error if remote is not initialized.
const struct remote& remote() const;
struct remote& remote();

View File

@@ -78,7 +78,6 @@
#include <seastar/coroutine/exception.hh>
#include "utils/stall_free.hh"
#include "utils/error_injection.hh"
#include "utils/fb_utilities.hh"
#include "locator/util.hh"
#include "idl/storage_service.dist.hh"
#include "service/storage_proxy.hh"
@@ -407,7 +406,7 @@ future<> storage_service::topology_state_load() {
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count);
// Save tokens, not needed for raft topology management, but needed by legacy
// Also ip -> id mapping is needed for address map recreation on reboot
if (!utils::fb_utilities::is_me(ip)) {
if (!is_me(ip)) {
// Some state that is used to fill in 'peeers' table is still propagated over gossiper.
// Populate the table with the state from the gossiper here since storage_service::on_change()
// (which is called each time gossiper state changes) may have skipped it because the tokens
@@ -465,7 +464,7 @@ future<> storage_service::topology_state_load() {
switch (rs.state) {
case node_state::bootstrapping:
if (rs.ring.has_value()) {
if (!utils::fb_utilities::is_me(ip)) {
if (!is_me(ip)) {
// Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
co_await _sys_ks.local().update_tokens(ip, {});
co_await _sys_ks.local().update_peer_info(ip, "host_id", id.uuid());
@@ -538,7 +537,7 @@ future<> storage_service::topology_state_load() {
// endpoints. We cannot rely on seeds alone, since it is not guaranteed that seeds
// will be up to date and reachable at the time of restart.
for (const auto& e: get_token_metadata_ptr()->get_all_endpoints()) {
if (!utils::fb_utilities::is_me(e) && !_gossiper.get_endpoint_state_ptr(e)) {
if (!is_me(e) && !_gossiper.get_endpoint_state_ptr(e)) {
co_await _gossiper.add_saved_endpoint(e);
}
}
@@ -1074,7 +1073,7 @@ class topology_coordinator {
}
slogger.trace("raft topology: send {} command with term {} and index {} to {}/{}",
cmd.cmd, _term, cmd_index, id, *ip);
auto result = utils::fb_utilities::is_me(*ip) ?
auto result = _db.get_token_metadata().get_topology().is_me(*ip) ?
co_await _raft_topology_cmd_handler(_term, cmd_index, cmd) :
co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd(
&_messaging, netw::msg_addr{*ip}, id, _term, cmd_index, cmd);
@@ -3082,7 +3081,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
co_await replicate_to_all_cores(std::move(tmptr));
tmlock.reset();
auto broadcast_rpc_address = utils::fb_utilities::get_broadcast_rpc_address();
auto broadcast_rpc_address = get_token_metadata_ptr()->get_topology().my_cql_address();
// Ensure we know our own actual Schema UUID in preparation for updates
co_await db::schema_tables::recalculate_schema_version(_sys_ks, proxy, _feature_service);
@@ -5661,7 +5660,7 @@ future<> storage_service::rebuild(sstring source_dc) {
}
auto ks_erms = ss._db.local().get_non_local_strategy_keyspaces_erms();
for (const auto& [keyspace_name, erm] : ks_erms) {
co_await streamer->add_ranges(keyspace_name, erm, ss.get_ranges_for_endpoint(erm, utils::fb_utilities::get_broadcast_address()), ss._gossiper, false);
co_await streamer->add_ranges(keyspace_name, erm, ss.get_ranges_for_endpoint(erm, ss.get_broadcast_address()), ss._gossiper, false);
}
try {
co_await streamer->stream_async();
@@ -6085,9 +6084,9 @@ future<> storage_service::load_tablet_metadata() {
future<> storage_service::snitch_reconfigured() {
assert(this_shard_id() == 0);
auto& snitch = _snitch.local();
co_await mutate_token_metadata([&snitch] (mutable_token_metadata_ptr tmptr) -> future<> {
co_await mutate_token_metadata([&] (mutable_token_metadata_ptr tmptr) -> future<> {
// re-read local rack and DC info
tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), snitch->get_location());
tmptr->update_topology(get_broadcast_address(), snitch->get_location());
return make_ready_future<>();
});
@@ -6340,7 +6339,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
}
auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
for (const auto& [keyspace_name, erm] : ks_erms) {
co_await streamer->add_ranges(keyspace_name, erm, get_ranges_for_endpoint(erm, utils::fb_utilities::get_broadcast_address()), _gossiper, false);
co_await streamer->add_ranges(keyspace_name, erm, get_ranges_for_endpoint(erm, get_broadcast_address()), _gossiper, false);
}
try {
co_await streamer->stream_async();

View File

@@ -25,7 +25,6 @@
#include "gms/application_state.hh"
#include <seastar/core/semaphore.hh>
#include <seastar/core/gate.hh>
#include "utils/fb_utilities.hh"
#include "replica/database_fwd.hh"
#include "streaming/stream_reason.hh"
#include <seastar/core/distributed.hh>
@@ -264,9 +263,13 @@ public:
}
private:
inet_address get_broadcast_address() const {
return utils::fb_utilities::get_broadcast_address();
inet_address get_broadcast_address() const noexcept {
return get_token_metadata_ptr()->get_topology().my_address();
}
bool is_me(inet_address addr) const noexcept {
return get_token_metadata_ptr()->get_topology().is_me(addr);
}
/* This abstraction maintains the token/endpoint metadata information */
shared_token_metadata& _shared_token_metadata;
locator::effective_replication_map_factory& _erm_factory;

View File

@@ -16,7 +16,6 @@
#include "streaming/stream_result_future.hh"
#include "streaming/stream_manager.hh"
#include "dht/i_partitioner.hh"
#include "utils/fb_utilities.hh"
#include "streaming/stream_plan.hh"
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
@@ -117,7 +116,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason));
if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) {
return make_exception_future<rpc::sink<int>>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later",
utils::fb_utilities::get_broadcast_address())));
_db.local().get_token_metadata().get_topology().my_address())));
}
return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason] (schema_ptr s) mutable {
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable {

View File

@@ -9,7 +9,6 @@
#include <boost/test/unit_test.hpp>
#include "locator/gossiping_property_file_snitch.hh"
#include "utils/fb_utilities.hh"
#include "test/lib/scylla_test_case.hh"
#include <seastar/util/std-compat.hh>
#include <seastar/core/reactor.hh>
@@ -32,14 +31,15 @@ future<> one_test(const std::string& property_fname, bool exp_result) {
path fname(test_files_subdir);
fname /= path(property_fname);
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
engine().set_strict_dma(false);
auto my_address = gms::inet_address("localhost");
snitch_config cfg;
cfg.name = "org.apache.cassandra.locator.GossipingPropertyFileSnitch";
cfg.properties_file_name = fname.string();
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
auto snitch_i = std::make_unique<sharded<locator::snitch_ptr>>();
auto& snitch = *snitch_i;

View File

@@ -82,7 +82,7 @@ SEASTAR_TEST_CASE(test_group0_cmd_merge) {
.history_append{db::system_keyspace::make_group0_history_state_id_mutation(
id, gc_clock::duration{0}, "test")},
.new_state_id = id,
.creator_addr{utils::fb_utilities::get_broadcast_address()},
.creator_addr{env.db().local().get_token_metadata().get_topology().my_address()},
.creator_id{group0.id()}
};
std::vector<canonical_mutation> cms;

View File

@@ -14,7 +14,6 @@
#include "locator/types.hh"
#include "test/lib/scylla_test_case.hh"
#include "utils/fb_utilities.hh"
#include "locator/host_id.hh"
#include "locator/topology.hh"
#include "locator/load_sketch.hh"
@@ -32,7 +31,6 @@ SEASTAR_THREAD_TEST_CASE(test_add_node) {
auto id3 = host_id::create_random_id();
auto ep3 = gms::inet_address("127.0.0.3");
utils::fb_utilities::set_broadcast_address(ep1);
topology::config cfg = {
.this_endpoint = ep1,
.local_dc_rack = endpoint_dc_rack::default_location,
@@ -70,7 +68,6 @@ SEASTAR_THREAD_TEST_CASE(test_moving) {
auto id1 = host_id::create_random_id();
auto ep1 = gms::inet_address("127.0.0.1");
utils::fb_utilities::set_broadcast_address(ep1);
topology::config cfg = {
.this_endpoint = ep1,
.local_dc_rack = endpoint_dc_rack::default_location,
@@ -100,7 +97,6 @@ SEASTAR_THREAD_TEST_CASE(test_update_node) {
auto ep2 = gms::inet_address("127.0.0.2");
auto ep3 = gms::inet_address("127.0.0.3");
utils::fb_utilities::set_broadcast_address(ep1);
topology::config cfg = {
.this_endpoint = ep1,
.local_dc_rack = endpoint_dc_rack::default_location,
@@ -193,7 +189,6 @@ SEASTAR_THREAD_TEST_CASE(test_remove_endpoint) {
.rack = "rack2"
};
utils::fb_utilities::set_broadcast_address(ep1);
topology::config cfg = {
.this_endpoint = ep1,
.local_dc_rack = dc_rack1

View File

@@ -11,7 +11,6 @@
#include "gms/inet_address.hh"
#include "locator/types.hh"
#include "utils/UUID_gen.hh"
#include "utils/fb_utilities.hh"
#include "utils/sequenced_set.hh"
#include "locator/network_topology_strategy.hh"
#include "test/lib/scylla_test_case.hh"
@@ -225,19 +224,20 @@ locator::endpoint_dc_rack make_endpoint_dc_rack(gms::inet_address endpoint) {
// Run in a seastar thread.
void simple_test() {
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
auto my_address = gms::inet_address("localhost");
// Create the RackInferringSnitch
snitch_config cfg;
cfg.name = "RackInferringSnitch";
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
sharded<snitch_ptr> snitch;
snitch.start(cfg).get();
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
snitch.invoke_on_all(&snitch_ptr::start).get();
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
@@ -307,12 +307,13 @@ void simple_test() {
// Run in a seastar thread.
void heavy_origin_test() {
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
auto my_address = gms::inet_address("localhost");
// Create the RackInferringSnitch
snitch_config cfg;
cfg.name = "RackInferringSnitch";
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
sharded<snitch_ptr> snitch;
snitch.start(cfg).get();
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
@@ -386,11 +387,12 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_heavy) {
}
SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
auto my_address = gms::inet_address("localhost");
// Create the RackInferringSnitch
snitch_config cfg;
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
cfg.name = "RackInferringSnitch";
sharded<snitch_ptr> snitch;
snitch.start(cfg).get();
@@ -398,7 +400,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
snitch.invoke_on_all(&snitch_ptr::start).get();
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
@@ -701,8 +703,10 @@ void generate_topology(topology& topo, const std::unordered_map<sstring, size_t>
}
SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
locator::token_metadata::config tm_cfg;
auto my_address = gms::inet_address("localhost");
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.this_cql_address = my_address;
constexpr size_t NODES = 100;
constexpr size_t VNODES = 64;
@@ -723,7 +727,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
for (size_t run = 0; run < RUNS; ++run) {
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{});
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
std::unordered_set<dht::token> random_tokens;
while (random_tokens.size() < nodes.size() * VNODES) {
@@ -810,8 +814,10 @@ void topology::test_compare_endpoints(const inet_address& address, const inet_ad
} // namespace locator
SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) {
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
locator::token_metadata::config tm_cfg;
auto my_address = gms::inet_address("localhost");
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.this_cql_address = my_address;
constexpr size_t NODES = 10;
@@ -833,7 +839,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) {
auto bogus_address = make_address(NODES + 1);
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{});
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
stm.mutate_token_metadata([&] (token_metadata& tm) {
auto& topo = tm.get_topology();
generate_topology(topo, datacenters, nodes);

View File

@@ -9,7 +9,6 @@
#include <boost/test/unit_test.hpp>
#include "locator/gossiping_property_file_snitch.hh"
#include "utils/fb_utilities.hh"
#include "test/lib/scylla_test_case.hh"
#include <seastar/util/std-compat.hh>
#include <vector>
@@ -26,9 +25,6 @@ future<> one_test(const std::string& property_fname1,
using namespace locator;
using namespace std::filesystem;
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
printf("Testing %s and %s property files. Expected result is %s\n",
property_fname1.c_str(), property_fname2.c_str(),
(exp_result ? "success" : "failure"));
@@ -39,6 +35,7 @@ future<> one_test(const std::string& property_fname1,
auto cpu0_dc_new = make_lw_shared<sstring>();
auto cpu0_rack_new = make_lw_shared<sstring>();
sharded<snitch_ptr> snitch;
auto my_address = gms::inet_address("localhost");
try {
path fname1(test_files_subdir);
@@ -51,6 +48,8 @@ future<> one_test(const std::string& property_fname1,
snitch_config cfg;
cfg.name = "org.apache.cassandra.locator.GossipingPropertyFileSnitch";
cfg.properties_file_name = fname1.string();
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
snitch.start(cfg).get();
snitch.invoke_on_all(&snitch_ptr::start).get();
} catch (std::exception& e) {

View File

@@ -23,7 +23,6 @@
#include "locator/tablet_sharder.hh"
#include "locator/load_sketch.hh"
#include "locator/tablet_replication_strategy.hh"
#include "utils/fb_utilities.hh"
#include "utils/UUID_gen.hh"
#include "utils/error_injection.hh"
@@ -434,7 +433,7 @@ SEASTAR_TEST_CASE(test_sharder) {
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1 } });
tokm.get_topology().add_or_update_endpoint(utils::fb_utilities::get_broadcast_address(), h1);
tokm.get_topology().add_or_update_endpoint(tokm.get_topology().my_address(), h1);
std::vector<tablet_id> tablet_ids;
{

View File

@@ -8,7 +8,6 @@
#include <boost/test/unit_test.hpp>
#include "test/lib/scylla_test_case.hh"
#include "utils/fb_utilities.hh"
#include "locator/token_metadata.hh"
#include "locator/simple_strategy.hh"
#include "locator/everywhere_replication_strategy.hh"
@@ -29,6 +28,7 @@ namespace {
return make_lw_shared<token_metadata>(token_metadata::config {
topology::config {
.this_endpoint = this_endpoint,
.this_cql_address = this_endpoint,
.local_dc_rack = get_dc_rack(this_endpoint)
}
});

View File

@@ -68,7 +68,6 @@
#include "service/raft/raft_group0.hh"
#include "sstables/sstables_manager.hh"
#include "init.hh"
#include "utils/fb_utilities.hh"
#include <sys/time.h>
#include <sys/resource.h>
@@ -413,9 +412,6 @@ public:
}).get();
});
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
single_node_cql_env env;
env.run_in_thread(std::move(func), std::move(cfg_in), std::move(init_configurables));
});
@@ -494,12 +490,18 @@ private:
_feature_service.start(fcfg).get();
auto stop_feature_service = defer([this] { _feature_service.stop().get(); });
_snitch.start(locator::snitch_config{}).get();
auto my_address = cfg_in.broadcast_address;
locator::snitch_config snitch_config;
snitch_config.listen_address = my_address;
snitch_config.broadcast_address = my_address;
_snitch.start(snitch_config).get();
auto stop_snitch = defer([this] { _snitch.stop().get(); });
_snitch.invoke_on_all(&locator::snitch_ptr::start).get();
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.this_cql_address = my_address;
tm_cfg.topo_cfg.local_dc_rack = { _snitch.local()->get_datacenter(), _snitch.local()->get_rack() };
_token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_token_metadata = defer([this] { _token_metadata.stop().get(); });
@@ -631,12 +633,12 @@ private:
linfo.host_id = locator::host_id::create_random_id();
}
host_id = linfo.host_id;
_sys_ks.local().save_local_info(std::move(linfo), _snitch.local()->get_location()).get();
_sys_ks.local().save_local_info(std::move(linfo), _snitch.local()->get_location(), my_address, my_address).get();
}
locator::shared_token_metadata::mutate_on_all_shards(_token_metadata, [hostid = host_id] (locator::token_metadata& tm) {
locator::shared_token_metadata::mutate_on_all_shards(_token_metadata, [hostid = host_id, &cfg_in] (locator::token_metadata& tm) {
auto& topo = tm.get_topology();
topo.set_host_id_cfg(hostid);
topo.add_or_update_endpoint(utils::fb_utilities::get_broadcast_address(),
topo.add_or_update_endpoint(cfg_in.broadcast_address,
hostid,
std::nullopt,
locator::node::state::normal,

View File

@@ -94,6 +94,7 @@ public:
bool need_remote_proxy = false;
std::optional<uint64_t> initial_tablets; // When engaged, the default keyspace will use tablets.
locator::host_id host_id;
gms::inet_address broadcast_address = gms::inet_address("localhost");
cql_test_config();
cql_test_config(const cql_test_config&);

View File

@@ -9,7 +9,6 @@
#include <boost/test/unit_test.hpp>
#include "locator/ec2_snitch.hh"
#include "utils/fb_utilities.hh"
#include <seastar/testing/test_case.hh>
#include <seastar/util/std-compat.hh>
#include <vector>
@@ -31,12 +30,13 @@ future<> one_test(const std::string& property_fname, bool exp_result) {
path fname(test_files_subdir);
fname /= path(property_fname);
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
auto my_address = gms::inet_address("localhost");
snitch_config cfg;
cfg.name = "Ec2Snitch";
cfg.properties_file_name = fname.string();
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
auto snitch_i = std::make_unique<sharded<locator::snitch_ptr>>();
auto& snitch = *snitch_i;
return snitch.start(cfg).then([&snitch] () {

View File

@@ -21,7 +21,6 @@
#include <boost/test/unit_test.hpp>
#include "locator/gce_snitch.hh"
#include "utils/fb_utilities.hh"
#include <seastar/testing/test_case.hh>
#include <seastar/http/httpd.hh>
#include <seastar/net/inet_address.hh>
@@ -54,8 +53,7 @@ future<> one_test(const std::string& property_fname, bool exp_result) {
fs::path fname(test_files_subdir);
fname /= fs::path(property_fname);
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
auto my_address = gms::inet_address("localhost");
char* meta_url_env = std::getenv(DUMMY_META_SERVER_IP);
char* use_gce_server = std::getenv(USE_GCE_META_SERVER);
@@ -84,6 +82,8 @@ future<> one_test(const std::string& property_fname, bool exp_result) {
cfg.name = "GoogleCloudSnitch";
cfg.properties_file_name = fname.string();
cfg.gce_meta_server_url = meta_url;
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
sharded<snitch_ptr> snitch;
snitch.start(cfg).get();
snitch.invoke_on_all(&snitch_ptr::start).get();

View File

@@ -16,7 +16,6 @@
#include "message/messaging_service.hh"
#include "gms/gossiper.hh"
#include "gms/application_state.hh"
#include "utils/fb_utilities.hh"
#include "log.hh"
#include <seastar/core/thread.hh>
#include <chrono>
@@ -55,8 +54,6 @@ int main(int ac, char ** av) {
auto config = app.configuration();
logging::logger_registry().set_logger_level("gossip", logging::log_level::trace);
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
utils::fb_utilities::set_broadcast_address(listen);
utils::fb_utilities::set_broadcast_rpc_address(listen);
auto cfg = std::make_unique<db::config>();
sharded<abort_source> abort_sources;
@@ -65,7 +62,12 @@ int main(int ac, char ** av) {
abort_sources.start().get();
auto stop_abort_source = defer([&] { abort_sources.stop().get(); });
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, locator::token_metadata::config{}).get();
locator::token_metadata::config tm_cfg;
auto my_address = gms::inet_address("localhost");
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.this_cql_address = my_address;
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_token_mgr = defer([&] { token_metadata.stop().get(); });
messaging.start(locator::host_id{}, listen, 7000).get();

View File

@@ -12,7 +12,9 @@
#include <seastar/core/reactor.hh>
#include <seastar/core/app-template.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/thread.hh>
#include <seastar/rpc/rpc_types.hh>
#include <seastar/util/closeable.hh>
#include "message/messaging_service.hh"
#include "gms/gossip_digest_syn.hh"
#include "gms/gossip_digest_ack.hh"
@@ -20,11 +22,15 @@
#include "gms/gossip_digest.hh"
#include "api/api.hh"
#include "utils/UUID.hh"
#include "utils/fb_utilities.hh"
#include "log.hh"
#include "locator/token_metadata.hh"
#include "db/schema_tables.hh"
using namespace std::chrono_literals;
using namespace netw;
logging::logger test_logger("message_test");
class tester {
private:
messaging_service& ms;
@@ -53,7 +59,7 @@ public:
public:
void init_handler() {
ms.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gms::gossip_digest_syn msg) {
fmt::print("Server got syn msg = {}\n", msg);
test_logger.info("Server got syn msg = {}", msg);
auto from = netw::messaging_service::get_source(cinfo);
auto ep1 = inet_address("1.1.1.1");
@@ -70,13 +76,13 @@ public:
gms::gossip_digest_ack ack(std::move(digests), std::move(eps));
// FIXME: discarded future.
(void)ms.send_gossip_digest_ack(from, std::move(ack)).handle_exception([] (auto ep) {
fmt::print("Fail to send ack : {}", ep);
test_logger.error("Fail to send ack : {}", ep);
});
return messaging_service::no_wait();
});
ms.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gms::gossip_digest_ack msg) {
fmt::print("Server got ack msg = {}\n", msg);
test_logger.info("Server got ack msg = {}", msg);
auto from = netw::messaging_service::get_source(cinfo);
// Prepare gossip_digest_ack2 message
auto ep1 = inet_address("3.3.3.3");
@@ -86,32 +92,41 @@ public:
gms::gossip_digest_ack2 ack2(std::move(eps));
// FIXME: discarded future.
(void)ms.send_gossip_digest_ack2(from, std::move(ack2)).handle_exception([] (auto ep) {
fmt::print("Fail to send ack2 : {}", ep);
test_logger.error("Fail to send ack2 : {}", ep);
});
digest_test_done.set_value();
return messaging_service::no_wait();
});
ms.register_gossip_digest_ack2([] (const rpc::client_info& cinfo, gms::gossip_digest_ack2 msg) {
fmt::print("Server got ack2 msg = {}\n", msg);
test_logger.info("Server got ack2 msg = {}", msg);
return messaging_service::no_wait();
});
ms.register_gossip_shutdown([] (inet_address from, rpc::optional<int64_t> generation_number_opt) {
fmt::print("Server got shutdown msg = {}\n", from);
test_logger.info("Server got shutdown msg = {}", from);
return messaging_service::no_wait();
});
ms.register_gossip_echo([] (const rpc::client_info& cinfo, rpc::optional<int64_t> gen_opt) {
fmt::print("Server got gossip echo msg\n");
test_logger.info("Server got gossip echo msg");
throw std::runtime_error("I'm throwing runtime_error exception");
return make_ready_future<>();
});
}
future<> deinit_handler() {
co_await ms.unregister_gossip_digest_syn();
co_await ms.unregister_gossip_digest_ack();
co_await ms.unregister_gossip_digest_ack2();
co_await ms.unregister_gossip_shutdown();
co_await ms.unregister_gossip_echo();
test_logger.info("tester deinit_handler done");
}
public:
future<> test_gossip_digest() {
fmt::print("=== {} ===\n", __func__);
test_logger.info("=== {} ===", __func__);
// Prepare gossip_digest_syn message
auto id = get_msg_addr();
auto ep1 = inet_address("1.1.1.1");
@@ -123,23 +138,24 @@ public:
digests.push_back(gms::gossip_digest(ep2, gen++, ver++));
gms::gossip_digest_syn syn("my_cluster", "my_partition", digests, utils::null_uuid());
return ms.send_gossip_digest_syn(id, std::move(syn)).then([this] {
test_logger.info("Sent gossip digest syn. Waiting for digest_test_done...");
return digest_test_done.get_future();
});
}
future<> test_gossip_shutdown() {
fmt::print("=== {} ===\n", __func__);
test_logger.info("=== {} ===", __func__);
auto id = get_msg_addr();
inet_address from("127.0.0.1");
int64_t gen = 0x1;
return ms.send_gossip_shutdown(id, from, gen).then([] () {
fmt::print("Client sent gossip_shutdown got reply = void\n");
test_logger.info("Client sent gossip_shutdown got reply = void");
return make_ready_future<>();
});
}
future<> test_echo() {
fmt::print("=== {} ===\n", __func__);
test_logger.info("=== {} ===", __func__);
auto id = get_msg_addr();
int64_t gen = 0x1;
return ms.send_gossip_echo(id, gen, std::chrono::seconds(10)).then_wrapped([] (auto&& f) {
@@ -147,7 +163,7 @@ public:
f.get();
return make_ready_future<>();
} catch (std::runtime_error& e) {
fmt::print("test_echo: {}\n", e.what());
test_logger.error("test_echo: {}", e.what());
}
return make_ready_future<>();
});
@@ -156,6 +172,7 @@ public:
namespace bpo = boost::program_options;
// Usage example: build/dev/test/manual/message --listen 127.0.0.1 --server 127.0.0.1
int main(int ac, char ** av) {
app_template app;
app.add_options()
@@ -168,45 +185,47 @@ int main(int ac, char ** av) {
distributed<replica::database> db;
return app.run_deprecated(ac, av, [&app] {
auto config = app.configuration();
bool stay_alive = config["stay-alive"].as<bool>();
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
utils::fb_utilities::set_broadcast_address(listen);
seastar::sharded<netw::messaging_service> messaging;
return messaging.start(locator::host_id{}, listen, 7000).then([config, stay_alive, &messaging] () {
auto testers = new distributed<tester>;
return testers->start(std::ref(messaging)).then([testers]{
auto port = testers->local().port();
std::cout << "Messaging server listening on port " << port << " ...\n";
return testers->invoke_on_all(&tester::init_handler);
}).then([testers, config, stay_alive, &messaging] {
auto t = &testers->local();
if (!config.contains("server")) {
return make_ready_future<>();
}
return seastar::async([&app] {
auto config = app.configuration();
bool stay_alive = config["stay-alive"].as<bool>();
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
auto my_address = listen != gms::inet_address("0.0.0.0") ? listen : gms::inet_address("localhost");
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
sharded<locator::shared_token_metadata> token_metadata;
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_tm = deferred_stop(token_metadata);
seastar::sharded<netw::messaging_service> messaging;
messaging.start(locator::host_id{}, listen, 7000).get();
auto stop_messaging = deferred_stop(messaging);
seastar::sharded<tester> testers;
testers.start(std::ref(messaging)).get();
auto stop_testers = deferred_stop(testers);
auto port = testers.local().port();
test_logger.info("Messaging server listening on {} port {}", listen, port);
testers.invoke_on_all(&tester::init_handler).get();
auto deinit_testers = deferred_action([&testers] {
testers.invoke_on_all(&tester::deinit_handler).get();
});
messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata)).get();
if (config.contains("server")) {
auto ip = config["server"].as<std::string>();
auto cpuid = config["cpuid"].as<uint32_t>();
auto t = &testers.local();
t->set_server_ip(ip);
t->set_server_cpuid(cpuid);
fmt::print("=============TEST START===========\n");
fmt::print("Sending to server ....\n");
return t->test_gossip_digest().then([t] {
return t->test_gossip_shutdown();
}).then([t] {
return t->test_echo();
}).then([testers, stay_alive, &messaging] {
if (stay_alive) {
return make_ready_future<>();
}
fmt::print("=============TEST DONE===========\n");
return testers->stop().then([testers, &messaging] {
delete testers;
return messaging.stop().then([]{
engine().exit(0);
});
});
});
});
test_logger.info("=============TEST START===========");
test_logger.info("Sending to server ....");
t->test_gossip_digest().get();
t->test_gossip_shutdown().get();
t->test_echo().get();
test_logger.info("=============TEST DONE===========");
}
while (stay_alive) {
seastar::sleep(1s).get();
}
}).finally([] {
exit(0);
});
});
}

View File

@@ -8,23 +8,26 @@
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#include <limits>
#include "thrift_validation.hh"
#include "thrift/utils.hh"
#include "db/system_keyspace.hh"
#include <boost/regex.hpp>
#include "utils/fb_utilities.hh"
using namespace thrift;
using namespace ::apache::thrift;
namespace thrift_validation {
static constexpr uint32_t MAX_UNSIGNED_SHORT = std::numeric_limits<uint16_t>::max();
void validate_key(const schema& s, const bytes_view& k) {
if (k.empty()) {
throw make_exception<InvalidRequestException>("Key may not be empty");
}
auto max = static_cast<uint32_t>(utils::fb_utilities::MAX_UNSIGNED_SHORT);
auto max = MAX_UNSIGNED_SHORT;
if (k.size() > max) {
throw make_exception<InvalidRequestException>("Key length of {} is longer than maximum of {}", k.size(), max);
}
@@ -64,7 +67,7 @@ void validate_cf_def(const CfDef& cf_def) {
}
void validate_column_name(const std::string& name) {
auto max_name_length = static_cast<uint32_t>(utils::fb_utilities::MAX_UNSIGNED_SHORT);
auto max_name_length = MAX_UNSIGNED_SHORT;
if (name.size() > max_name_length) {
throw make_exception<InvalidRequestException>("column name length must not be greater than {}", max_name_length);
}

View File

@@ -38,7 +38,6 @@
#include "gms/feature_service.hh"
#include "locator/abstract_replication_strategy.hh"
#include "tools/schema_loader.hh"
#include "utils/fb_utilities.hh"
namespace {
@@ -219,8 +218,11 @@ std::vector<schema_ptr> do_load_schemas(std::string_view schema_str) {
feature_service.enable(feature_service.supported_feature_set()).get();
sharded<locator::shared_token_metadata> token_metadata;
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, locator::token_metadata::config{}).get();
auto my_address = gms::inet_address("localhost");
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.this_cql_address = my_address;
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_token_metadata = deferred_stop(token_metadata);
data_dictionary_impl dd_impl;

View File

@@ -17,8 +17,8 @@
#include "types/set.hh"
#include "types/map.hh"
#include "utils/UUID_gen.hh"
#include "utils/fb_utilities.hh"
#include "utils/class_registrator.hh"
#include "service/storage_proxy.hh"
namespace tracing {
@@ -214,6 +214,10 @@ future<> trace_keyspace_helper::start(cql3::query_processor& qp, service::migrat
return table_helper::setup_keyspace(qp, mm, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx });
}
gms::inet_address trace_keyspace_helper::my_address() const noexcept {
return _qp_anchor->proxy().local_db().get_token_metadata().get_topology().my_address();
}
void trace_keyspace_helper::write_one_session_records(lw_shared_ptr<one_session_records> records) {
// Future is waited on indirectly in `stop()` (via `_pending_writes`).
(void)with_gate(_pending_writes, [this, records = std::move(records)] {
@@ -244,7 +248,7 @@ void trace_keyspace_helper::write_records_bulk(records_bulk& bulk) {
});
}
cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_session_records& session_records) {
cql3::query_options trace_keyspace_helper::make_session_mutation_data(gms::inet_address my_address, const one_session_records& session_records) {
const session_record& record = session_records.session_rec;
auto millis_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(record.started_at.time_since_epoch()).count();
std::vector<std::pair<data_value, data_value>> parameters_values_vector;
@@ -269,7 +273,7 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_
cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
cql3::raw_value::make_value(utf8_type->decompose(type_to_string(record.command))),
cql3::raw_value::make_value(inet_addr_type->decompose(record.client.addr())),
cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())),
cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
cql3::raw_value::make_value(int32_type->decompose(elapsed_to_micros(record.elapsed))),
cql3::raw_value::make_value(make_map_value(my_map_type, map_type_impl::native_type(std::move(parameters_values_vector))).serialize()),
cql3::raw_value::make_value(utf8_type->decompose(record.request)),
@@ -284,7 +288,7 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_
db::consistency_level::ANY, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) {
cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& session_records) {
auto started_at_duration = session_records.session_rec.started_at.time_since_epoch();
// timestamp in minutes when the query began
auto minutes_in_millis = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration_cast<std::chrono::minutes>(started_at_duration)).count();
@@ -302,7 +306,7 @@ cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(c
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(gms::inet_address my_address, const one_session_records& session_records, const utils::UUID& start_time_id) {
const session_record& record = session_records.session_rec;
auto millis_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(record.started_at.time_since_epoch()).count();
@@ -325,7 +329,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
auto my_set_type = set_type_impl::get_instance(utf8_type, true);
std::vector<cql3::raw_value> values({
cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())),
cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
cql3::raw_value::make_value(int32_type->decompose((int32_t)(this_shard_id()))),
cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
cql3::raw_value::make_value(timestamp_type->decompose(millis_since_epoch)),
@@ -345,7 +349,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& session_records, const utils::UUID& start_time_id) {
auto started_at_duration = session_records.session_rec.started_at.time_since_epoch();
// timestamp in minutes when the query began
auto minutes_in_millis = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration_cast<std::chrono::minutes>(started_at_duration)).count();
@@ -357,7 +361,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat
cql3::raw_value::make_value(timestamp_type->decompose(millis_since_epoch)),
cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
cql3::raw_value::make_value(timeuuid_type->decompose(start_time_id)),
cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())),
cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
cql3::raw_value::make_value(int32_type->decompose(int32_t(this_shard_id()))),
cql3::raw_value::make_value(int32_type->decompose(int32_t(session_records.session_rec.slow_query_record_ttl.count())))
});
@@ -366,14 +370,14 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) {
std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(gms::inet_address my_address, one_session_records& session_records, const event_record& record) {
auto backend_state_ptr = static_cast<trace_keyspace_backend_sesssion_state*>(session_records.backend_state_ptr.get());
std::vector<cql3::raw_value> values({
cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
cql3::raw_value::make_value(timeuuid_type->decompose(utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(backend_state_ptr->last_nanos, record.event_time_point)))),
cql3::raw_value::make_value(utf8_type->decompose(record.message)),
cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())),
cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
cql3::raw_value::make_value(int32_type->decompose(elapsed_to_micros(record.elapsed))),
cql3::raw_value::make_value(utf8_type->decompose(_local_tracing.get_thread_name())),
cql3::raw_value::make_value(long_type->decompose(int64_t(session_records.parent_id.get_id()))),
@@ -396,7 +400,7 @@ future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp,
std::vector<cql3::raw_value_vector_with_unset> values;
values.reserve(events_records.size());
std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); });
std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(my_address(), *all_records, one_event_record)); });
return do_with(
cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT), std::move(values)),
@@ -440,9 +444,9 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_se
// if session is finished - store a session and a session time index entries
tlogger.trace("{}: going to store a session event", records->session_id);
return _sessions.insert(qp, mm, _dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, &qp, &mm, records] {
return _sessions.insert(qp, mm, _dummy_query_state, make_session_mutation_data, my_address(), std::ref(*records)).then([this, &qp, &mm, records] {
tlogger.trace("{}: going to store a {} entry", records->session_id, _sessions_time_idx.name());
return _sessions_time_idx.insert(qp, mm, _dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records));
return _sessions_time_idx.insert(qp, mm, _dummy_query_state, make_session_time_idx_mutation_data, my_address(), std::ref(*records));
}).then([this, &qp, &mm, records] {
if (!records->do_log_slow_query) {
return now();
@@ -451,9 +455,9 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_se
// if slow query log is requested - store a slow query log and a slow query log time index entries
auto start_time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(_slow_query_last_nanos, records->session_rec.started_at));
tlogger.trace("{}: going to store a slow query event", records->session_id);
return _slow_query_log.insert(qp, mm, _dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, &qp, &mm, records, start_time_id] {
return _slow_query_log.insert(qp, mm, _dummy_query_state, make_slow_query_mutation_data, my_address(), std::ref(*records), start_time_id).then([this, &qp, &mm, records, start_time_id] {
tlogger.trace("{}: going to store a {} entry", records->session_id, _slow_query_log_time_idx.name());
return _slow_query_log_time_idx.insert(qp, mm, _dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id);
return _slow_query_log_time_idx.insert(qp, mm, _dummy_query_state, make_slow_query_time_idx_mutation_data, my_address(), std::ref(*records), start_time_id);
});
});
} else {

View File

@@ -73,6 +73,9 @@ public:
virtual std::unique_ptr<backend_session_state_base> allocate_session_state() const override;
private:
// Valid only after start() sets _qp_anchor
gms::inet_address my_address() const noexcept;
/**
* Write records of a single tracing session
*
@@ -115,7 +118,7 @@ private:
*
* @return the relevant cql3::query_options object with the mutation data
*/
static cql3::query_options make_session_mutation_data(const one_session_records& all_records_handle);
static cql3::query_options make_session_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle);
/**
* Create a mutation data for a new session_idx record
@@ -124,7 +127,7 @@ private:
*
* @return the relevant cql3::query_options object with the mutation data
*/
static cql3::query_options make_session_time_idx_mutation_data(const one_session_records& all_records_handle);
static cql3::query_options make_session_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle);
/**
* Create mutation for a new slow_query_log record
@@ -134,7 +137,7 @@ private:
*
* @return the relevant mutation
*/
static cql3::query_options make_slow_query_mutation_data(const one_session_records& all_records_handle, const utils::UUID& start_time_id);
static cql3::query_options make_slow_query_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle, const utils::UUID& start_time_id);
/**
* Create mutation for a new slow_query_log_time_idx record
@@ -144,7 +147,7 @@ private:
*
* @return the relevant mutation
*/
static cql3::query_options make_slow_query_time_idx_mutation_data(const one_session_records& all_records_handle, const utils::UUID& start_time_id);
static cql3::query_options make_slow_query_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle, const utils::UUID& start_time_id);
/**
* Create a mutation data for a new trace point record
@@ -156,7 +159,7 @@ private:
*
* @return a vector with the mutation data
*/
std::vector<cql3::raw_value> make_event_mutation_data(one_session_records& session_records, const event_record& record);
std::vector<cql3::raw_value> make_event_mutation_data(gms::inet_address my_address, one_session_records& session_records, const event_record& record);
/**
* Converts a @param elapsed to an int32_t value of microseconds.

View File

@@ -9,7 +9,6 @@
*/
#include "i_filter.hh"
#include "fb_utilities.hh"
#include "bytes.hh"
#include "utils/murmur_hash.hh"
#include <seastar/core/shared_ptr.hh>

View File

@@ -1,58 +0,0 @@
/*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#pragma once
#include <cassert>
#include <cstdint>
#include <optional>

#include "gms/inet_address.hh"
namespace utils {

using inet_address = gms::inet_address;

// Process-wide registry of this node's broadcast address and broadcast RPC
// address.
//
// NOTE(review): this is global mutable state. The setters must be called
// exactly once during startup, before any getter runs on any shard; the
// getters assert that initialization happened. This class is being phased
// out in favor of locator::topology / messaging_service configuration.
class fb_utilities {
private:
    // Meyers singletons: function-local statics avoid the need for an
    // out-of-line definition in a .cc file.
    static std::optional<inet_address>& broadcast_address() noexcept {
        static std::optional<inet_address> _broadcast_address;
        return _broadcast_address;
    }

    static std::optional<inet_address>& broadcast_rpc_address() noexcept {
        static std::optional<inet_address> _broadcast_rpc_address;
        return _broadcast_rpc_address;
    }

public:
    // Largest value representable in an unsigned 16-bit field
    // (used e.g. as the thrift key/column-name length limit).
    static constexpr int32_t MAX_UNSIGNED_SHORT = 0xFFFF;

    static void set_broadcast_address(inet_address addr) noexcept {
        broadcast_address() = addr;
    }

    static void set_broadcast_rpc_address(inet_address addr) noexcept {
        broadcast_rpc_address() = addr;
    }

    // Returns the broadcast address set via set_broadcast_address().
    // Asserts (aborts in debug builds) if called before initialization.
    // Returned by plain value: a top-level `const` on a by-value return
    // is meaningless and flagged by readability-const-return-type.
    static inet_address get_broadcast_address() noexcept {
        assert(broadcast_address());
        return *broadcast_address();
    }

    // Returns the broadcast RPC address set via set_broadcast_rpc_address().
    // Asserts if called before initialization.
    static inet_address get_broadcast_rpc_address() noexcept {
        assert(broadcast_rpc_address());
        return *broadcast_rpc_address();
    }

    // True iff `addr` equals this node's broadcast address.
    // Requires set_broadcast_address() to have been called.
    static bool is_me(gms::inet_address addr) noexcept {
        return addr == get_broadcast_address();
    }
};

}