Files
scylladb/db/hints/manager.hh
Dawid Mędrek 77a934e5b9 db/hints: Prevent draining hints before hint replay is allowed
Context
-------
The procedure of hint draining boils down to the following steps:

1. Drain a hint sender. That should get rid of all hints stored
   for the corresponding endpoint.
2. Remove the hint directory corresponding to that endpoint.

In practice, it's more complex than this high-level view suggests.
Without going into unnecessary detail, the relevant point is that
step 1 of the procedure above may not actually be executed.

Breaking it down, step 1 consists of two calls to
`hint_sender::send_hints_maybe()`. The function is responsible for
sending out hints, but it is conditional: no hints will be sent if
either of the following holds:

* `hint_sender::replay_allowed()` is not `true`. This can happen when
  hint replay hasn't been turned on yet.
* `hint_sender::can_send()` is not `true`. This can happen if the
  corresponding endpoint is not alive AND it hasn't left the cluster
  AND it's still a normal token owner.

There is one more relevant point: sending hints can be stopped if
replaying them fails, in which case `hint_sender::send_hints_maybe()`
returns `false`. However, that's not possible in the case of draining:
if Scylla comes across any failure there, it simply deletes the
corresponding hint segment. Because of that, we ignore this case and
focus only on the two bullets above.

---

Why is it a problem?
--------------------
If a hint directory is not purged of all hint segments in it,
any attempt to remove it will fail and we'll observe an error like this:

```
Exception when draining <host ID>: std::filesystem::__cxx11::filesystem_error
(error system:39, filesystem error: remove failed: Directory not empty [<path>])
```

The folder with the remaining hints will also stay on disk, which is, of
course, undesired.

---

When can it happen?
-------------------
As highlighted in the Context section of this commit message, the
key part of the code that can lead to a dangerous situation like that
is `hint_sender::send_hints_maybe()`. The function is called twice when
draining a hint endpoint manager: once to purge all of the existing
hints, and a second time after flushing the hints still stored in
commitlog instances but not yet listed by `hint_sender`. If either of those calls
misbehaves, we may end up with a problem. That's why it's crucial to
ensure that the function always goes through ALL of the hints.

Dangerous situations:

1. We try to drain hints before hint replay is allowed. That will
   violate the first bullet above.
2. The node we're draining is dead, but it hasn't left the cluster,
   and it still possesses some tokens.

---

How do we solve that?
---------------------
Hint replay is turned on in `main.cc`. Once enabled, it cannot be
disabled. So to address the first bullet above, it suffices to ensure
that no draining occurs beforehand. It's perfectly fine to prevent it.
Soon after hint replay is allowed, `main.cc` also asks the hint manager
to drain all of the endpoint managers whose endpoints are no longer
normal token owners (cf. `db::hints::manager::drain_left_nodes()`).

The other bullet is trickier. It's important to know here that
draining is only initiated in three situations:

1. As part of the call to `storage_service::notify_left()`.
2. As part of the call to `storage_service::notify_released()`.
3. As part of the call to `db::hints::manager::drain_left_nodes()`.

The last one is trivially non-problematic. The nodes that it'll try to
drain are no longer normal token owners, so `can_send()` must always
return `true`.

The second situation is similar. As we read in the commit message of
scylladb/scylladb@eb92f50413, which
introduced the notion of released nodes, the nodes are no longer
normal token owners:

> In this patch we postpone the hint draining for the "left" nodes to
> the time when we know that the target nodes no longer hold ownership
> of any tokens - so they're no longer referenced in topology. I'm
> calling such nodes "released".

I suggest reading the full commit message there because the problems
it describes are similar to the ones these changes try to solve.

Finally, the first situation is, unfortunately, the trickiest. The same
commit message says:

> When a node is being replaced, it enters a "left" state while still
> owning tokens. Before this patch, this is also the time when we start
> draining hints targeted to this node, so the hints may get sent before
> the token ownership gets migrated to another replica, and these hints
> may get lost.

This suggests that `storage_service::notify_left()` may be called when
the corresponding node still has some tokens! That's something that may
prevent properly draining hints.

Fortunately, no hope is lost. We only drain hints via `notify_left()`
when hinted handoff hasn't been upgraded to being host-ID-based yet.
If it has, draining always happens via `notify_released()`.

As of writing this commit message, all supported versions of Scylla
(2025.1 and later) use host-ID-based hinted handoff. That means the
problem can only arise when upgrading from an older version of Scylla
(2024.1 and earlier). Because of that, we don't cover that case here;
doing so would most likely require more extensive changes.

---

Non-issues
----------
There are notions that are closely related to sending hints. One of them
is the host filter that hinted handoff uses. It decides which endpoints
are eligible for receiving hints, and which are not. Fortunately, all
endpoints rejected by the host filter lose their hint endpoint managers
-- they're stopped as part of that procedure. What's more, draining
hints and changing the host filter cannot happen at the same time,
so this cannot lead to any problems.

The solution
------------
To solve the described issue, we simply prevent draining hints before
hint replay is allowed. No reproducer test is attached because it's not
feasible to write one.

Fixes scylladb/scylladb#27693

Closes scylladb/scylladb#27713
2026-01-04 16:54:05 +02:00


/*
* Modified by ScyllaDB
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
// Seastar features.
#include "utils/assert.hh"
#include <seastar/core/abort_source.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_mutex.hh>
#include <seastar/util/noncopyable_function.hh>
// Scylla includes.
#include "db/commitlog/commitlog.hh"
#include "db/hints/internal/common.hh"
#include "db/hints/internal/hint_storage.hh"
#include "db/hints/internal/hint_endpoint_manager.hh"
#include "db/hints/resource_manager.hh"
#include "db/hints/host_filter.hh"
#include "db/hints/sync_point.hh"
#include "gms/inet_address.hh"
#include "locator/abstract_replication_strategy.hh"
// STD.
#include <chrono>
#include <span>
#include <unordered_map>
namespace utils {
class directories;
} // namespace utils
namespace gms {
class gossiper;
} // namespace gms
namespace db::hints {
/// A helper class which tracks hints directory creation
/// and allows performing hints directory initialization lazily.
class directory_initializer {
private:
class impl;
::std::shared_ptr<impl> _impl;
directory_initializer(::std::shared_ptr<impl> impl);
public:
/// Creates an initializer that does nothing. Useful in tests.
static directory_initializer make_dummy() noexcept {
return {nullptr};
}
static future<directory_initializer> make(utils::directories& dirs, sstring hints_directory);
future<> ensure_created_and_verified();
future<> ensure_rebalanced();
};
class manager {
public:
using endpoint_id = internal::endpoint_id;
private:
using hint_stats = internal::hint_stats;
using drain = internal::drain;
friend class internal::hint_endpoint_manager;
friend class internal::hint_sender;
using hint_endpoint_manager = internal::hint_endpoint_manager;
using node_to_hint_store_factory_type = internal::node_to_hint_store_factory_type;
using hint_directory_manager = internal::hint_directory_manager;
enum class state {
started, // Hinting is currently allowed (start() has completed).
migrating, // The hint manager is being migrated from using IPs to name
// hint directories to using host IDs for that purpose. No new
// incoming hints will be accepted as long as this is the state.
replay_allowed, // Replaying (sending) hints is allowed.
draining_all, // Accepting new hints is not allowed. All endpoint managers
// are being drained because the node is leaving the cluster.
stopping // Accepting new hints is not allowed. Stopping this manager
// is in progress (stop() has been called).
};
using state_set = enum_set<super_enum<state,
state::started,
state::migrating,
state::replay_allowed,
state::draining_all,
state::stopping>>;
public:
static inline const std::string FILENAME_PREFIX{"HintsLog" + commitlog::descriptor::SEPARATOR};
// Non-const - can be modified with an error injection.
static inline std::chrono::seconds hints_flush_period = std::chrono::seconds(10);
private:
static constexpr uint64_t MAX_SIZE_OF_HINTS_IN_PROGRESS = 10 * 1024 * 1024; // 10MB
private:
state_set _state;
const fs::path _hints_dir;
dev_t _hints_dir_device_id = 0;
node_to_hint_store_factory_type _store_factory;
host_filter _host_filter;
service::storage_proxy& _proxy;
shared_ptr<const gms::gossiper> _gossiper_anchor;
int64_t _max_hint_window_us = 0;
replica::database& _local_db;
seastar::named_gate _draining_eps_gate; // gate used to control the progress of endpoint_managers stopping not in the context of manager::stop() call
resource_manager& _resource_manager;
std::unordered_map<endpoint_id, hint_endpoint_manager> _ep_managers;
// This is ONLY used when `_uses_host_id` is false. Otherwise, this map should stay EMPTY.
//
// Invariants:
// (1) there is an endpoint manager in `_ep_managers` identified by host ID `H` if and only if
// there is a mapping corresponding to `H` in `_hint_directory_manager`,
// (2) a hint directory representing an IP address `I` is managed by an endpoint manager
// if and only if there is a mapping corresponding to `I` in `_hint_directory_manager`.
hint_directory_manager _hint_directory_manager;
hint_stats _stats;
seastar::metrics::metric_groups _metrics;
// We need to keep a variant here. Before migrating hinted handoff to using host ID, hint directories will
// still represent IP addresses. But after the migration, they will start representing host IDs.
// We need to handle either case.
//
// It's especially important when dealing with the scenario where there is an IP directory, but there is
// no mapping for it in locator::token_metadata. Since we sometimes have to save a directory like that
// in this set as well, this variant is necessary.
std::unordered_set<std::variant<locator::host_id, gms::inet_address>> _eps_with_pending_hints;
seastar::named_semaphore _drain_lock = {1, named_semaphore_exception_factory{"drain lock"}};
bool _uses_host_id = false;
std::any _migration_callback = std::nullopt;
future<> _migrating_done = make_ready_future();
// Unique lock if and only if there is an ongoing migration to the host-ID-based hinted handoff.
// Shared lock if and only if there is a fiber already executing `manager::wait_for_sync_point`.
seastar::shared_mutex _migration_mutex{};
public:
manager(service::storage_proxy& proxy, sstring hints_directory, host_filter filter,
int64_t max_hint_window_ms, resource_manager& res_manager, sharded<replica::database>& db);
manager(const manager&) = delete;
manager& operator=(const manager&) = delete;
manager(manager&&) = delete;
manager& operator=(manager&&) = delete;
~manager() noexcept {
SCYLLA_ASSERT(_ep_managers.empty());
}
public:
void register_metrics(const sstring& group_name);
future<> start(shared_ptr<const gms::gossiper> gossiper_ptr);
future<> stop();
bool store_hint(endpoint_id host_id, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
tracing::trace_state_ptr tr_state) noexcept;
/// \brief Changes the host_filter currently used, stopping and starting endpoint_managers relevant to the new host_filter.
/// \param filter the new host_filter
/// \return A future that resolves when the operation is complete.
future<> change_host_filter(host_filter filter);
const host_filter& get_host_filter() const noexcept {
return _host_filter;
}
/// \brief Check if a hint may be generated to the given endpoint
/// \param ep endpoint to check
/// \return true if we should generate the hint to the given endpoint if it becomes unavailable
bool can_hint_for(endpoint_id ep) const noexcept;
/// \brief Check if there aren't too many in-flight hints
///
/// This function checks if there are too many "in-flight" hints on the current shard - hints that are being stored
/// and whose storing is not complete yet. This is meant to stabilize the memory consumption of the hint storing path,
/// which is initiated from the storage_proxy WRITE flow. storage_proxy is going to check this condition and, if it
/// returns TRUE, won't attempt any new WRITEs, thus eliminating the possibility of new hints being generated. If new
/// hints are not generated, the number of in-flight hints, and thus the memory they consume, will eventually drop,
/// because the hints are going to be either stored or dropped. After that, things go back to normal again.
///
/// Note that we can't consider the disk usage consumption here because the disk usage is not promised to drop down shortly
/// because it requires the remote node to be UP.
///
/// \param ep endpoint to check
/// \return TRUE if we are allowed to generate a hint to the given endpoint but there are too many in-flight hints
bool too_many_in_flight_hints_for(endpoint_id ep) const noexcept;
/// \brief Check if the DC that \param ep belongs to is "hintable"
/// \param ep Endpoint identifier
/// \return TRUE if hints are allowed to be generated to \param ep.
bool check_dc_for(endpoint_id ep) const noexcept;
/// Execute a given functor while having an endpoint's file update mutex locked.
///
/// The caller must ensure that the passed endpoint_id is valid, i.e. this manager instance
/// really manages an endpoint manager corresponding to it. See @ref have_ep_manager.
///
/// \param ep endpoint whose file update mutex should be locked
/// \param func functor to be executed
future<> with_file_update_mutex_for(const std::variant<locator::host_id, gms::inet_address>& ep,
noncopyable_function<future<> ()> func);
/// \brief Checks if hints are disabled for all endpoints
/// \return TRUE if hints are disabled.
bool is_disabled_for_all() const noexcept {
return _host_filter.is_disabled_for_all();
}
/// \return Size of mutations of hints in-flight (to the disk) at the moment.
uint64_t size_of_hints_in_progress() const noexcept {
return _stats.size_of_hints_in_progress;
}
/// \brief Get the number of in-flight (to the disk) hints to a given endpoint.
/// \param ep Endpoint identifier
/// \return Number of hints in-flight to \param ep.
uint64_t hints_in_progress_for(endpoint_id ep) const noexcept {
auto it = _ep_managers.find(ep);
if (it == _ep_managers.end()) {
return 0;
}
return it->second.hints_in_progress();
}
void add_ep_with_pending_hints(const std::variant<locator::host_id, gms::inet_address>& key) {
_eps_with_pending_hints.insert(key);
}
void clear_eps_with_pending_hints() {
_eps_with_pending_hints.clear();
_eps_with_pending_hints.reserve(_ep_managers.size());
}
bool has_ep_with_pending_hints(const std::variant<locator::host_id, gms::inet_address>& key) const {
return _eps_with_pending_hints.contains(key);
}
size_t ep_managers_size() const {
return _ep_managers.size();
}
const fs::path& hints_dir() const {
return _hints_dir;
}
dev_t hints_dir_device_id() const {
return _hints_dir_device_id;
}
void allow_hints();
void forbid_hints();
void forbid_hints_for_eps_with_pending_hints();
void allow_replaying() noexcept {
_state.set(state::replay_allowed);
}
/// \brief Returns a set of replay positions for hint queues towards endpoints from the `target_eps`.
///
/// \param target_eps The list of endpoints the sync point should correspond to. When empty, the function assumes all endpoints.
/// \return Sync point corresponding to the specified endpoints.
sync_point::shard_rps calculate_current_sync_point(std::span<const locator::host_id> target_eps) const;
/// \brief Waits until hint replay reaches the replay positions described in `rps`.
future<> wait_for_sync_point(abort_source& as, const sync_point::shard_rps& rps);
private:
future<> compute_hints_dir_device_id();
node_to_hint_store_factory_type& store_factory() noexcept {
return _store_factory;
}
service::storage_proxy& local_storage_proxy() const noexcept {
return _proxy;
}
const gms::gossiper& local_gossiper() const noexcept {
return *_gossiper_anchor;
}
replica::database& local_db() noexcept {
return _local_db;
}
hint_endpoint_manager& get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip);
uint64_t max_size_of_hints_in_progress() const noexcept;
public:
bool have_ep_manager(const std::variant<locator::host_id, gms::inet_address>& ep) const noexcept;
public:
/// \brief Initiate the draining when we detect that the node has left the cluster.
///
/// If the node that has left is the current node - drains all pending hints to all nodes.
/// Otherwise drains hints to the node that has left.
///
/// In both cases - removes the corresponding hints' directories after all hints have been drained and erases the
/// corresponding hint_endpoint_manager objects.
///
/// Preconditions:
/// * Hint replay must be allowed (i.e. `replay_allowed()` must be true) throughout
/// the execution of this function.
///
/// \param host_id host ID of the node that left the cluster
/// \param ip the IP of the node that left the cluster
future<> drain_for(endpoint_id host_id, gms::inet_address ip) noexcept;
void update_backlog(size_t backlog, size_t max_backlog);
bool uses_host_id() const noexcept {
return _uses_host_id;
}
private:
bool stopping() const noexcept {
return _state.contains(state::stopping);
}
void set_stopping() noexcept {
_state.set(state::stopping);
}
public:
bool started() const noexcept {
return _state.contains(state::started);
}
bool replay_allowed() const noexcept {
return _state.contains(state::replay_allowed);
}
private:
void set_started() noexcept {
_state.set(state::started);
}
void set_draining_all() noexcept {
_state.set(state::draining_all);
}
bool draining_all() noexcept {
return _state.contains(state::draining_all);
}
/// Iterates over existing hint directories and for each, if the corresponding endpoint is present
/// in locator::topology, creates an endpoint manager.
future<> initialize_endpoint_managers();
/// Renames host directories named after IPs to host IDs.
///
/// In the past, hosts were identified by their IPs. Now we use host IDs for that purpose,
/// but we want to ensure that old hints don't get lost if possible. This function serves
/// this purpose. It's only necessary when upgrading Scylla.
///
/// This function should ONLY be called by `manager::start()` and `manager::perform_migration()`.
///
/// Calling this function again while the previous call has not yet finished
/// is undefined behavior.
future<> migrate_ip_directories();
/// Migrates this hint manager to using host IDs, i.e. when a call to this function ends,
/// the names of hint directories will start being represented by host IDs instead of IPs.
///
/// This function suspends hinted handoff throughout its execution. Among other consequences,
/// ALL requested sync points will be canceled, i.e. an exception will be issued
/// in the corresponding futures.
future<> perform_migration();
public:
/// Performs draining for all nodes that have already left the cluster.
/// This should only be called when the hint endpoint managers have been initialized
/// and the hint manager has started.
future<> drain_left_nodes();
};
} // namespace db::hints