main: Postpone start-up of hint manager
In this commit, we postpone the start-up of the hint manager until we obtain information about other nodes in the cluster. When we start the hint managers, one of the things that happen is creating endpoint managers -- structures managed by db::hints::manager. Whether we create an instance of endpoint manager depends on the value returned by host_filter::can_hint_for, which, in turn, may depend on the current state of locator::topology. If locator::topology is incomplete, some endpoint managers may not be started even though they should (because the target node IS part of the cluster and we SHOULD send hints to it if there are some). The situation like that can happen because we start the hint managers too early. This commit aims to solve that problem. We only start the hint managers when we've gathered information about the other nodes in the cluster and created the locator::topology using it. Hinted Handoff is not negatively affected by these changes since in between the previous point of starting the hint managers and the current one, all of the mutations performed by service::storage_proxy target the local node, so no hints would need to be generated anyway. Fixes scylladb/scylladb#11870 Closes scylladb/scylladb#16511
This commit is contained in:
committed by
Kamil Braun
parent
c6fd4dffbb
commit
b92fb3537a
@@ -295,10 +295,12 @@ private:
|
||||
_state.set(state::stopping);
|
||||
}
|
||||
|
||||
public:
|
||||
bool started() const noexcept {
|
||||
return _state.contains(state::started);
|
||||
}
|
||||
|
||||
private:
|
||||
void set_started() noexcept {
|
||||
_state.set(state::started);
|
||||
}
|
||||
|
||||
5
main.cc
5
main.cc
@@ -1559,9 +1559,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}
|
||||
view_hints_dir_initializer.ensure_rebalanced().get();
|
||||
|
||||
proxy.invoke_on_all([&lifecycle_notifier, &gossiper] (service::storage_proxy& local_proxy) {
|
||||
proxy.invoke_on_all([&lifecycle_notifier] (service::storage_proxy& local_proxy) {
|
||||
lifecycle_notifier.local().register_subscriber(&local_proxy);
|
||||
return local_proxy.start_hints_manager(gossiper.local().shared_from_this());
|
||||
}).get();
|
||||
|
||||
auto drain_proxy = defer_verbose_shutdown("drain storage proxy", [&proxy, &lifecycle_notifier] {
|
||||
@@ -1792,7 +1791,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}).get();
|
||||
|
||||
with_scheduling_group(maintenance_scheduling_group, [&] {
|
||||
return ss.local().join_cluster(sys_dist_ks, proxy);
|
||||
return ss.local().join_cluster(sys_dist_ks, proxy, gossiper, service::start_hint_manager::yes);
|
||||
}).get();
|
||||
|
||||
sl_controller.invoke_on_all([&lifecycle_notifier] (qos::service_level_controller& controller) {
|
||||
|
||||
@@ -3069,6 +3069,8 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
|
||||
|
||||
auto all = boost::range::join(natural_endpoints, pending_endpoints);
|
||||
|
||||
// If the manager hasn't started yet, no mutation will be performed to another node.
|
||||
// No hint will need to be stored.
|
||||
if (cannot_hint(all, type)) {
|
||||
get_stats().writes_failed_due_to_too_many_in_flight_hints++;
|
||||
// avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can
|
||||
@@ -3795,7 +3797,9 @@ mutation storage_proxy::do_get_batchlog_mutation_for(schema_ptr schema, const st
|
||||
template<typename Range>
|
||||
bool storage_proxy::cannot_hint(const Range& targets, db::write_type type) const {
|
||||
// if hints are disabled we "can always hint" since there's going to be no hint generated in this case
|
||||
return hints_enabled(type) && boost::algorithm::any_of(targets, std::bind(&db::hints::manager::too_many_in_flight_hints_for, &_hints_manager, std::placeholders::_1));
|
||||
return hints_enabled(type) &&
|
||||
_hints_manager.started() &&
|
||||
boost::algorithm::any_of(targets, std::bind(&db::hints::manager::too_many_in_flight_hints_for, &_hints_manager, std::placeholders::_1));
|
||||
}
|
||||
|
||||
future<> storage_proxy::send_to_endpoint(
|
||||
|
||||
@@ -1107,10 +1107,12 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
|
||||
|
||||
future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
std::unordered_set<gms::inet_address> loaded_endpoints,
|
||||
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
|
||||
std::chrono::milliseconds delay) {
|
||||
std::chrono::milliseconds delay,
|
||||
start_hint_manager start_hm) {
|
||||
std::unordered_set<token> bootstrap_tokens;
|
||||
gms::application_state_map app_states;
|
||||
/* The timestamp of the CDC streams generation that this node has proposed when joining.
|
||||
@@ -1378,6 +1380,16 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
|
||||
} ();
|
||||
|
||||
co_await _gossiper.wait_for_gossip_to_settle();
|
||||
|
||||
// This is the moment when the locator::topology has gathered information about other nodes
|
||||
// in the cluster -- either through gossiper, or by loading it from disk -- so it's safe
|
||||
// to start the hint managers.
|
||||
if (start_hm) {
|
||||
co_await proxy.invoke_on_all([&gossiper] (storage_proxy& local_proxy) {
|
||||
return local_proxy.start_hints_manager(gossiper.local().shared_from_this());
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: Look at the group 0 upgrade state and use it to decide whether to attach or not
|
||||
if (!_raft_topology_change_enabled) {
|
||||
co_await _feature_service.enable_features_on_join(_gossiper, _sys_ks.local());
|
||||
@@ -2359,7 +2371,8 @@ bool storage_service::is_topology_coordinator_enabled() const {
|
||||
return _raft_topology_change_enabled;
|
||||
}
|
||||
|
||||
future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy) {
|
||||
future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy,
|
||||
sharded<gms::gossiper>& gossiper, start_hint_manager start_hm) {
|
||||
assert(this_shard_id() == 0);
|
||||
|
||||
set_mode(mode::STARTING);
|
||||
@@ -2429,7 +2442,8 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
|
||||
for (auto& x : loaded_peer_features) {
|
||||
slogger.info("peer={}, supported_features={}", x.first, x.second);
|
||||
}
|
||||
co_return co_await join_token_ring(sys_dist_ks, proxy, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay());
|
||||
co_return co_await join_token_ring(sys_dist_ks, proxy, gossiper, std::move(initial_contact_nodes),
|
||||
std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay(), start_hm);
|
||||
}
|
||||
|
||||
future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept {
|
||||
|
||||
@@ -102,6 +102,8 @@ enum class disk_error { regular, commit };
|
||||
|
||||
class node_ops_meta_data;
|
||||
|
||||
using start_hint_manager = seastar::bool_class<class start_hint_manager_tag>;
|
||||
|
||||
/**
|
||||
* This abstraction contains the token/identifier of this node
|
||||
* on the identifier space. This token gets gossiped around.
|
||||
@@ -333,7 +335,8 @@ public:
|
||||
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);
|
||||
|
||||
future<> join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy);
|
||||
future<> join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy,
|
||||
sharded<gms::gossiper>& gossiper_ptr, start_hint_manager start_hm);
|
||||
|
||||
void set_group0(service::raft_group0&, bool raft_topology_change_enabled);
|
||||
|
||||
@@ -354,10 +357,12 @@ private:
|
||||
bool is_first_node();
|
||||
future<> join_token_ring(sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
std::unordered_set<gms::inet_address> loaded_endpoints,
|
||||
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
|
||||
std::chrono::milliseconds);
|
||||
std::chrono::milliseconds,
|
||||
start_hint_manager start_hm);
|
||||
future<> start_sys_dist_ks();
|
||||
public:
|
||||
|
||||
|
||||
@@ -903,7 +903,7 @@ private:
|
||||
});
|
||||
|
||||
try {
|
||||
_ss.local().join_cluster(_sys_dist_ks, _proxy).get();
|
||||
_ss.local().join_cluster(_sys_dist_ks, _proxy, _gossiper, service::start_hint_manager::no).get();
|
||||
} catch (std::exception& e) {
|
||||
// if any of the defers crashes too, we'll never see
|
||||
// the error
|
||||
|
||||
Reference in New Issue
Block a user