main: Postpone start-up of hint manager

In this commit, we postpone the start-up
of the hint manager until we obtain information
about other nodes in the cluster.

When we start the hint managers, one of the
things that happen is creating endpoint
managers -- structures managed by
db::hints::manager. Whether we create
an instance of endpoint manager depends on
the value returned by host_filter::can_hint_for,
which, in turn, may depend on the current state
of locator::topology.

If locator::topology is incomplete, some endpoint
managers may not be started even though they
should be (because the target node IS part of the
cluster and we SHOULD send hints to it if there
are some).

A situation like that can happen because we
start the hint managers too early. This commit
aims to solve that problem. We only start
the hint managers when we've gathered information
about the other nodes in the cluster and created
the locator::topology using it.

Hinted Handoff is not negatively affected by these
changes since in between the previous point of
starting the hint managers and the current one,
all of the mutations performed by
service::storage_proxy target the local node, so
no hints would need to be generated anyway.

Fixes scylladb/scylladb#11870
Closes scylladb/scylladb#16511
This commit is contained in:
Dawid Medrek
2023-12-21 15:35:44 +01:00
committed by Kamil Braun
parent c6fd4dffbb
commit b92fb3537a
6 changed files with 34 additions and 10 deletions

View File

@@ -295,10 +295,12 @@ private:
_state.set(state::stopping);
}
public:
bool started() const noexcept {
return _state.contains(state::started);
}
private:
void set_started() noexcept {
_state.set(state::started);
}

View File

@@ -1559,9 +1559,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}
view_hints_dir_initializer.ensure_rebalanced().get();
proxy.invoke_on_all([&lifecycle_notifier, &gossiper] (service::storage_proxy& local_proxy) {
proxy.invoke_on_all([&lifecycle_notifier] (service::storage_proxy& local_proxy) {
lifecycle_notifier.local().register_subscriber(&local_proxy);
return local_proxy.start_hints_manager(gossiper.local().shared_from_this());
}).get();
auto drain_proxy = defer_verbose_shutdown("drain storage proxy", [&proxy, &lifecycle_notifier] {
@@ -1792,7 +1791,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}).get();
with_scheduling_group(maintenance_scheduling_group, [&] {
return ss.local().join_cluster(sys_dist_ks, proxy);
return ss.local().join_cluster(sys_dist_ks, proxy, gossiper, service::start_hint_manager::yes);
}).get();
sl_controller.invoke_on_all([&lifecycle_notifier] (qos::service_level_controller& controller) {

View File

@@ -3069,6 +3069,8 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
auto all = boost::range::join(natural_endpoints, pending_endpoints);
// If the manager hasn't started yet, no mutation will be performed to another node.
// No hint will need to be stored.
if (cannot_hint(all, type)) {
get_stats().writes_failed_due_to_too_many_in_flight_hints++;
// avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can
@@ -3795,7 +3797,9 @@ mutation storage_proxy::do_get_batchlog_mutation_for(schema_ptr schema, const st
template<typename Range>
bool storage_proxy::cannot_hint(const Range& targets, db::write_type type) const {
// if hints are disabled we "can always hint" since there's going to be no hint generated in this case
return hints_enabled(type) && boost::algorithm::any_of(targets, std::bind(&db::hints::manager::too_many_in_flight_hints_for, &_hints_manager, std::placeholders::_1));
return hints_enabled(type) &&
_hints_manager.started() &&
boost::algorithm::any_of(targets, std::bind(&db::hints::manager::too_many_in_flight_hints_for, &_hints_manager, std::placeholders::_1));
}
future<> storage_proxy::send_to_endpoint(

View File

@@ -1107,10 +1107,12 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<service::storage_proxy>& proxy,
sharded<gms::gossiper>& gossiper,
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_set<gms::inet_address> loaded_endpoints,
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
std::chrono::milliseconds delay) {
std::chrono::milliseconds delay,
start_hint_manager start_hm) {
std::unordered_set<token> bootstrap_tokens;
gms::application_state_map app_states;
/* The timestamp of the CDC streams generation that this node has proposed when joining.
@@ -1378,6 +1380,16 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
} ();
co_await _gossiper.wait_for_gossip_to_settle();
// This is the moment when the locator::topology has gathered information about other nodes
// in the cluster -- either through gossiper, or by loading it from disk -- so it's safe
// to start the hint managers.
if (start_hm) {
co_await proxy.invoke_on_all([&gossiper] (storage_proxy& local_proxy) {
return local_proxy.start_hints_manager(gossiper.local().shared_from_this());
});
}
// TODO: Look at the group 0 upgrade state and use it to decide whether to attach or not
if (!_raft_topology_change_enabled) {
co_await _feature_service.enable_features_on_join(_gossiper, _sys_ks.local());
@@ -2359,7 +2371,8 @@ bool storage_service::is_topology_coordinator_enabled() const {
return _raft_topology_change_enabled;
}
future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy) {
future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy,
sharded<gms::gossiper>& gossiper, start_hint_manager start_hm) {
assert(this_shard_id() == 0);
set_mode(mode::STARTING);
@@ -2429,7 +2442,8 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
for (auto& x : loaded_peer_features) {
slogger.info("peer={}, supported_features={}", x.first, x.second);
}
co_return co_await join_token_ring(sys_dist_ks, proxy, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay());
co_return co_await join_token_ring(sys_dist_ks, proxy, gossiper, std::move(initial_contact_nodes),
std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay(), start_hm);
}
future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept {

View File

@@ -102,6 +102,8 @@ enum class disk_error { regular, commit };
class node_ops_meta_data;
using start_hint_manager = seastar::bool_class<class start_hint_manager_tag>;
/**
* This abstraction contains the token/identifier of this node
* on the identifier space. This token gets gossiped around.
@@ -333,7 +335,8 @@ public:
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);
future<> join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy);
future<> join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy,
sharded<gms::gossiper>& gossiper_ptr, start_hint_manager start_hm);
void set_group0(service::raft_group0&, bool raft_topology_change_enabled);
@@ -354,10 +357,12 @@ private:
bool is_first_node();
future<> join_token_ring(sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<service::storage_proxy>& proxy,
sharded<gms::gossiper>& gossiper,
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_set<gms::inet_address> loaded_endpoints,
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
std::chrono::milliseconds);
std::chrono::milliseconds,
start_hint_manager start_hm);
future<> start_sys_dist_ks();
public:

View File

@@ -903,7 +903,7 @@ private:
});
try {
_ss.local().join_cluster(_sys_dist_ks, _proxy).get();
_ss.local().join_cluster(_sys_dist_ks, _proxy, _gossiper, service::start_hint_manager::no).get();
} catch (std::exception& e) {
// if any of the defers crashes too, we'll never see
// the error