Merge 'Remove gossiper argument from storage_service::join_cluster()' from Pavel Emelyanov
It's only needed to start hints via proxy, but proxy can do it without gossiper argument Closes scylladb/scylladb#19894 * github.com:scylladb/scylladb: storage_service: Remove gossiper argument from join_cluster() proxy: Use remote gossiper to start hints resource manager hints: Const-ify gossiper references and anchor pointers
This commit is contained in:
@@ -158,7 +158,7 @@ const column_mapping& hint_sender::get_column_mapping(lw_shared_ptr<send_one_fil
|
||||
return cm_it->second;
|
||||
}
|
||||
|
||||
hint_sender::hint_sender(hint_endpoint_manager& parent, service::storage_proxy& local_storage_proxy,replica::database& local_db, gms::gossiper& local_gossiper) noexcept
|
||||
hint_sender::hint_sender(hint_endpoint_manager& parent, service::storage_proxy& local_storage_proxy,replica::database& local_db, const gms::gossiper& local_gossiper) noexcept
|
||||
: _stopped(make_ready_future<>())
|
||||
, _ep_key(parent.end_point_key())
|
||||
, _ep_manager(parent)
|
||||
|
||||
@@ -112,13 +112,13 @@ private:
|
||||
service::storage_proxy& _proxy;
|
||||
replica::database& _db;
|
||||
seastar::scheduling_group _hints_cpu_sched_group;
|
||||
gms::gossiper& _gossiper;
|
||||
const gms::gossiper& _gossiper;
|
||||
seastar::shared_mutex& _file_update_mutex;
|
||||
|
||||
std::multimap<db::replay_position, lw_shared_ptr<std::optional<promise<>>>> _replay_waiters;
|
||||
|
||||
public:
|
||||
hint_sender(hint_endpoint_manager& parent, service::storage_proxy& local_storage_proxy, replica::database& local_db, gms::gossiper& local_gossiper) noexcept;
|
||||
hint_sender(hint_endpoint_manager& parent, service::storage_proxy& local_storage_proxy, replica::database& local_db, const gms::gossiper& local_gossiper) noexcept;
|
||||
~hint_sender();
|
||||
|
||||
/// \brief A constructor that should be called from the copy/move-constructor of hint_endpoint_manager.
|
||||
|
||||
@@ -196,7 +196,7 @@ void manager::register_metrics(const sstring& group_name) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> manager::start(shared_ptr<gms::gossiper> gossiper_ptr) {
|
||||
future<> manager::start(shared_ptr<const gms::gossiper> gossiper_ptr) {
|
||||
_gossiper_anchor = std::move(gossiper_ptr);
|
||||
|
||||
if (_proxy.features().host_id_based_hinted_handoff) {
|
||||
|
||||
@@ -115,7 +115,7 @@ private:
|
||||
node_to_hint_store_factory_type _store_factory;
|
||||
host_filter _host_filter;
|
||||
service::storage_proxy& _proxy;
|
||||
shared_ptr<gms::gossiper> _gossiper_anchor;
|
||||
shared_ptr<const gms::gossiper> _gossiper_anchor;
|
||||
int64_t _max_hint_window_us = 0;
|
||||
replica::database& _local_db;
|
||||
|
||||
@@ -172,7 +172,7 @@ public:
|
||||
|
||||
public:
|
||||
void register_metrics(const sstring& group_name);
|
||||
future<> start(shared_ptr<gms::gossiper> gossiper_ptr);
|
||||
future<> start(shared_ptr<const gms::gossiper> gossiper_ptr);
|
||||
future<> stop();
|
||||
bool store_hint(endpoint_id host_id, gms::inet_address ip, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
|
||||
tracing::trace_state_ptr tr_state) noexcept;
|
||||
@@ -294,7 +294,7 @@ private:
|
||||
return _proxy;
|
||||
}
|
||||
|
||||
gms::gossiper& local_gossiper() const noexcept {
|
||||
const gms::gossiper& local_gossiper() const noexcept {
|
||||
return *_gossiper_anchor;
|
||||
}
|
||||
|
||||
|
||||
@@ -202,7 +202,7 @@ void space_watchdog::on_timer() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> resource_manager::start(shared_ptr<gms::gossiper> gossiper_ptr) {
|
||||
future<> resource_manager::start(shared_ptr<const gms::gossiper> gossiper_ptr) {
|
||||
_gossiper_ptr = std::move(gossiper_ptr);
|
||||
|
||||
return with_semaphore(_operation_lock, 1, [this] () {
|
||||
|
||||
@@ -130,7 +130,7 @@ class resource_manager {
|
||||
space_watchdog _space_watchdog;
|
||||
|
||||
service::storage_proxy& _proxy;
|
||||
shared_ptr<gms::gossiper> _gossiper_ptr;
|
||||
shared_ptr<const gms::gossiper> _gossiper_ptr;
|
||||
|
||||
enum class state {
|
||||
running,
|
||||
@@ -186,7 +186,7 @@ public:
|
||||
future<semaphore_units<named_semaphore::exception_factory>> get_send_units_for(size_t buf_size);
|
||||
size_t sending_queue_length() const;
|
||||
|
||||
future<> start(shared_ptr<gms::gossiper> gossiper_ptr);
|
||||
future<> start(shared_ptr<const gms::gossiper> gossiper_ptr);
|
||||
future<> stop() noexcept;
|
||||
|
||||
/// \brief Allows replaying hints for managers which are registered now or will be in the future.
|
||||
|
||||
2
main.cc
2
main.cc
@@ -1914,7 +1914,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}).get();
|
||||
|
||||
with_scheduling_group(maintenance_scheduling_group, [&] {
|
||||
return ss.local().join_cluster(sys_dist_ks, proxy, gossiper, service::start_hint_manager::yes, generation_number);
|
||||
return ss.local().join_cluster(sys_dist_ks, proxy, service::start_hint_manager::yes, generation_number);
|
||||
}).get();
|
||||
|
||||
supervisor::notify("starting tracing");
|
||||
|
||||
@@ -6542,12 +6542,12 @@ storage_proxy::query_nonsingular_data_locally(schema_ptr s, lw_shared_ptr<query:
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
future<> storage_proxy::start_hints_manager(shared_ptr<gms::gossiper> g) {
|
||||
future<> storage_proxy::start_hints_manager() {
|
||||
if (!_hints_manager.is_disabled_for_all()) {
|
||||
co_await _hints_resource_manager.register_manager(_hints_manager);
|
||||
}
|
||||
co_await _hints_resource_manager.register_manager(_hints_for_views_manager);
|
||||
co_await _hints_resource_manager.start(std::move(g));
|
||||
co_await _hints_resource_manager.start(remote().gossiper().shared_from_this());
|
||||
}
|
||||
|
||||
void storage_proxy::allow_replaying_hints() noexcept {
|
||||
|
||||
@@ -694,7 +694,7 @@ public:
|
||||
mutation get_batchlog_mutation_for(const std::vector<mutation>& mutations, const utils::UUID& id, int32_t version, db_clock::time_point now);
|
||||
|
||||
future<> stop();
|
||||
future<> start_hints_manager(shared_ptr<gms::gossiper>);
|
||||
future<> start_hints_manager();
|
||||
void allow_replaying_hints() noexcept;
|
||||
future<> abort_view_writes();
|
||||
|
||||
|
||||
@@ -1485,7 +1485,6 @@ future<> storage_service::await_tablets_rebuilt(raft::server_id replaced_id) {
|
||||
|
||||
future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
std::unordered_map<locator::host_id, gms::loaded_endpoint_state> loaded_endpoints,
|
||||
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
|
||||
@@ -1783,8 +1782,8 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
|
||||
// in the cluster -- either through gossiper, or by loading it from disk -- so it's safe
|
||||
// to start the hint managers.
|
||||
if (start_hm) {
|
||||
co_await proxy.invoke_on_all([&gossiper] (storage_proxy& local_proxy) {
|
||||
return local_proxy.start_hints_manager(gossiper.local().shared_from_this());
|
||||
co_await proxy.invoke_on_all([] (storage_proxy& local_proxy) {
|
||||
return local_proxy.start_hints_manager();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2874,7 +2873,7 @@ bool storage_service::is_topology_coordinator_enabled() const {
|
||||
}
|
||||
|
||||
future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy,
|
||||
sharded<gms::gossiper>& gossiper, start_hint_manager start_hm, gms::generation_type new_generation) {
|
||||
start_hint_manager start_hm, gms::generation_type new_generation) {
|
||||
assert(this_shard_id() == 0);
|
||||
|
||||
if (_sys_ks.local().was_decommissioned()) {
|
||||
@@ -2989,7 +2988,7 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
|
||||
}
|
||||
}
|
||||
|
||||
co_return co_await join_token_ring(sys_dist_ks, proxy, gossiper, std::move(initial_contact_nodes),
|
||||
co_return co_await join_token_ring(sys_dist_ks, proxy, std::move(initial_contact_nodes),
|
||||
std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay(), start_hm, new_generation);
|
||||
}
|
||||
|
||||
|
||||
@@ -375,7 +375,7 @@ public:
|
||||
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);
|
||||
|
||||
future<> join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy,
|
||||
sharded<gms::gossiper>& gossiper_ptr, start_hint_manager start_hm, gms::generation_type new_generation);
|
||||
start_hint_manager start_hm, gms::generation_type new_generation);
|
||||
|
||||
void set_group0(service::raft_group0&);
|
||||
|
||||
@@ -396,7 +396,6 @@ private:
|
||||
bool is_first_node();
|
||||
future<> join_token_ring(sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
std::unordered_map<locator::host_id, gms::loaded_endpoint_state> loaded_endpoints,
|
||||
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
|
||||
|
||||
@@ -937,7 +937,7 @@ private:
|
||||
group0_service.setup_group0_if_exist(_sys_ks.local(), _ss.local(), _qp.local(), _mm.local()).get();
|
||||
|
||||
try {
|
||||
_ss.local().join_cluster(_sys_dist_ks, _proxy, _gossiper, service::start_hint_manager::no, generation_number).get();
|
||||
_ss.local().join_cluster(_sys_dist_ks, _proxy, service::start_hint_manager::no, generation_number).get();
|
||||
} catch (std::exception& e) {
|
||||
// if any of the defers crashes too, we'll never see
|
||||
// the error
|
||||
|
||||
Reference in New Issue
Block a user