Files
scylladb/locator/snitch_base.hh
Pavel Emelyanov a1ea553fe1 code: Replace distributed<> with sharded<>
The latter is recommended in seastar, and the former was left as a
compatibility alias. The latest seastar explicitly marks it as deprecated,
so once the submodule is updated, compilation logs will explode.

Most of the patch is generated with

    for f in $(git grep -l '\<distributed<[A-Za-z0-9:_]*>') ; do sed -e 's/\<distributed<\([A-Za-z0-9:_]*\)>/sharded<\1>/g' -i $f; done
    for f in $(git grep -l distributed.hh); do sed -e 's/distributed.hh/sharded.hh/' -i $f ; done

and a small manual change in test/perf/perf.hh

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#26136
2025-09-19 12:22:51 +02:00

303 lines
8.0 KiB
C++

/*
*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "utils/assert.hh"
#include <boost/signals2/signal_type.hpp>
#include <boost/signals2/dummy_mutex.hpp>
#include "gms/endpoint_state.hh"
#include "locator/types.hh"
#include "gms/inet_address.hh"
#include <seastar/core/thread.hh>
#include <seastar/core/sharded.hh>
#include "utils/log.hh"
namespace gms {
class gossiper;
enum class application_state;
}
namespace locator {
using snitch_signal_t = boost::signals2::signal_type<void (), boost::signals2::keywords::mutex_type<boost::signals2::dummy_mutex>>::type;
using snitch_signal_slot_t = std::function<future<>()>;
using snitch_signal_connection_t = boost::signals2::scoped_connection;
struct snitch_ptr;
typedef gms::inet_address inet_address;
// Configuration bundle used to construct a snitch instance.
// Only the fields relevant to the selected snitch implementation are used.
struct snitch_config {
// Snitch implementation to instantiate (class name).
sstring name = "SimpleSnitch";
// Path to the snitch properties file; empty means the default location.
sstring properties_file_name = "";
// Shard that performs the snitch's I/O work.
unsigned io_cpu_id = 0;
// Gossiping-property-file specific
gms::inet_address listen_address;
gms::inet_address broadcast_address;
// GCE-specific
sstring gce_meta_server_url = "";
};
// Abstract interface of an endpoint snitch: supplies the local node's
// datacenter/rack topology information and related gossip state.
// Lifecycle: initializing -> running (start) -> io_paused (pause_io)
// -> running (resume_io) -> stopped (stop); the destructor asserts that
// stop() ran first.
struct i_endpoint_snitch {
public:
    using ptr_type = std::unique_ptr<i_endpoint_snitch>;

    // Replaces the global per-shard snitch instances in `snitch` with a new
    // snitch built from `cfg`. Defined inline at the bottom of this header.
    static future<> reset_snitch(sharded<snitch_ptr>& snitch, snitch_config cfg);

    /**
     * returns a String representing the rack local node belongs to
     */
    virtual sstring get_rack() const = 0;

    /**
     * returns a String representing the datacenter local node belongs to
     */
    virtual sstring get_datacenter() const = 0;

    // Convenience accessor combining get_datacenter() and get_rack().
    locator::endpoint_dc_rack get_location() const {
        return locator::endpoint_dc_rack{
            .dc = get_datacenter(),
            .rack = get_rack(),
        };
    }

    // Snitches that distinguish a public address may override; none by default.
    virtual std::optional<inet_address> get_public_address() const noexcept { return std::nullopt; }

    /**
     * returns whatever info snitch wants to gossip
     */
    virtual gms::application_state_map get_app_states() const = 0;

    // A snitch must be stop()ed before destruction.
    virtual ~i_endpoint_snitch() { SCYLLA_ASSERT(_state == snitch_state::stopped); }

    // noop by default
    virtual future<> stop() {
        _state = snitch_state::stopped;
        return make_ready_future<>();
    }

    // noop by default
    virtual future<> pause_io() {
        _state = snitch_state::io_paused;
        return make_ready_future<>();
    }

    // noop by default
    virtual void resume_io() {
        _state = snitch_state::running;
    }

    // noop by default
    virtual future<> start() {
        _state = snitch_state::running;
        return make_ready_future<>();
    }

    // noop by default
    virtual void set_my_dc_and_rack(const sstring& new_dc, const sstring& new_rack) {}
    virtual void set_prefer_local(bool prefer_local) {}
    virtual void set_local_private_addr(const sstring& addr_str) {}

    // Marks the snitch as fully initialized and usable.
    void set_snitch_ready() {
        _state = snitch_state::running;
    }

    virtual sstring get_name() const = 0;

    // should be called for production snitches before calling start()
    virtual void set_backreference(snitch_ptr& d) {
        // noop by default
    }

    virtual future<> reload_gossiper_state() {
        // noop by default
        return make_ready_future<>();
    }

    // Subscribes to reconfiguration notifications; no updates by default.
    virtual snitch_signal_connection_t when_reconfigured(snitch_signal_slot_t& slot) {
        return snitch_signal_connection_t();
    }

    // tells whether the INTERNAL_IP address should be preferred over endpoint address
    virtual bool prefer_local() const noexcept {
        return false;
    }

    // Shared logger for all snitch implementations.
    static logging::logger& logger() {
        static logging::logger snitch_logger("snitch_logger");
        return snitch_logger;
    }

protected:
    // Shard on which snitch I/O runs; set once at configuration time.
    static unsigned& io_cpu_id() {
        static unsigned id = 0;
        return id;
    }

protected:
    // Lifecycle state; checked in the destructor to catch a missing stop().
    enum class snitch_state {
        initializing,
        running,
        io_pausing,
        io_paused,
        stopping,
        stopped
    } _state = snitch_state::initializing;
};
// Per-shard holder of the active snitch instance. Users keep references to
// the snitch_ptr (not the underlying object), which lets reset_snitch()
// atomically swap the implementation on each shard.
struct snitch_ptr : public peering_sharded_service<snitch_ptr> {
    using ptr_type = i_endpoint_snitch::ptr_type;

    // Forwards stop() to the held snitch; trivially ready when empty.
    future<> stop() {
        return _ptr ? _ptr->stop() : make_ready_future<>();
    }

    // Forwards start() to the held snitch; trivially ready when empty.
    future<> start() {
        return _ptr ? _ptr->start() : make_ready_future<>();
    }

    // Smart-pointer-style access to the held snitch.
    i_endpoint_snitch* operator->() {
        return _ptr.get();
    }

    const i_endpoint_snitch* operator->() const {
        return _ptr.get();
    }

    // Takes ownership of a concrete snitch instance.
    snitch_ptr& operator=(ptr_type&& new_val) {
        _ptr = std::move(new_val);
        return *this;
    }

    // Steals the held instance from another holder.
    snitch_ptr& operator=(snitch_ptr&& new_val) {
        _ptr = std::move(new_val._ptr);
        return *this;
    }

    // True when a snitch instance is held.
    operator bool() const {
        return static_cast<bool>(_ptr);
    }

    snitch_ptr(const snitch_config cfg);

private:
    ptr_type _ptr;
};
/**
* Resets the global snitch instance with the new value
*
* @param snitch_name Name of a new snitch
* @param A optional parameters for a new snitch constructor
*
* @return ready future when the transition is complete
*
* The flow goes as follows:
* 1) Create a new sharded<snitch_ptr> and initialize it with the new
* snitch.
* 2) Start the new snitches above - this will initialize the snitches objects
* and will make them ready to be used.
* 3) Stop() the current global per-shard snitch objects.
* 4) Pause the per-shard snitch objects from (1) - this will stop the async
* I/O parts of the snitches if any.
* 5) Assign the per-shard snitch_ptr's from the new sharded<snitch_ptr> from
* (1) to the global one and update the sharded<> pointer in the new snitch
* instances.
* 6) Resume I/O on the new snitches.
* 7) Stop() the temporary sharded<snitch_ptr> from (1).
*/
// Swaps the cluster-global per-shard snitch instances for freshly created
// ones; see the step-by-step flow description above. Runs in a seastar
// thread so the sharded<> operations can block with .get().
inline future<> i_endpoint_snitch::reset_snitch(sharded<snitch_ptr>& snitch, snitch_config cfg) {
return seastar::async([cfg = std::move(cfg), &snitch] {
// (1) create a new snitch
sharded<snitch_ptr> tmp_snitch;
try {
tmp_snitch.start(cfg).get();
// (2) start the local instances of the new snitch
tmp_snitch.invoke_on_all([] (snitch_ptr& local_inst) {
return local_inst.start();
}).get();
} catch (...) {
// Creating/starting the new snitch failed: dispose of the
// half-initialized instances and propagate the error. The old
// snitch has not been touched yet.
tmp_snitch.stop().get();
throw;
}
// If we've got here then we may not fail
// (3) stop the current snitch instances on all CPUs
snitch.invoke_on_all([] (snitch_ptr& s) {
return s->stop();
}).get();
//
// (4) If we've got here - the new snitch has been successfully created
// and initialized. We may pause its I/O it now and start moving
// pointers...
//
tmp_snitch.invoke_on_all([] (snitch_ptr& local_inst) {
return local_inst->pause_io();
}).get();
//
// (5) move the pointers - this would ensure the atomicity on a
// per-shard level (since users are holding snitch_ptr objects only)
//
tmp_snitch.invoke_on_all([&snitch] (snitch_ptr& local_inst) {
// Point the new instance back at the global holder before the move
// so it keeps a valid backreference after the swap.
local_inst->set_backreference(snitch.local());
snitch.local() = std::move(local_inst);
return make_ready_future<>();
}).get();
// (6) re-start I/O on the new snitches
snitch.invoke_on_all([] (snitch_ptr& local_inst) {
local_inst->resume_io();
}).get();
// (7) stop the temporary from (1)
tmp_snitch.stop().get();
});
}
// Common base for concrete snitch implementations: stores the configuration
// and the resolved local DC/rack names.
class snitch_base : public i_endpoint_snitch {
public:
snitch_base(const snitch_config& cfg) : _cfg(cfg) {}
//
// Subclasses have to implement:
// virtual sstring get_rack() = 0;
// virtual sstring get_datacenter() = 0;
//
virtual gms::application_state_map get_app_states() const override;
protected:
// Local node's datacenter/rack; filled in by the concrete snitch.
sstring _my_dc;
sstring _my_rack;
snitch_config _cfg;
};
} // namespace locator