394 lines
15 KiB
C++
394 lines
15 KiB
C++
/*
|
|
* Modified by ScyllaDB
|
|
* Copyright (C) 2017-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
// Seastar features.
|
|
#include "utils/assert.hh"
|
|
#include <seastar/core/abort_source.hh>
|
|
#include <seastar/core/gate.hh>
|
|
#include <seastar/core/sharded.hh>
|
|
#include <seastar/core/shared_mutex.hh>
|
|
#include <seastar/util/noncopyable_function.hh>
|
|
|
|
// Scylla includes.
|
|
#include "db/commitlog/commitlog.hh"
|
|
#include "db/hints/internal/common.hh"
|
|
#include "db/hints/internal/hint_storage.hh"
|
|
#include "db/hints/internal/hint_endpoint_manager.hh"
|
|
#include "db/hints/resource_manager.hh"
|
|
#include "db/hints/host_filter.hh"
|
|
#include "db/hints/sync_point.hh"
|
|
#include "gms/inet_address.hh"
|
|
#include "locator/abstract_replication_strategy.hh"
|
|
|
|
// STD.
|
|
#include <chrono>
|
|
#include <span>
|
|
#include <unordered_map>
|
|
|
|
namespace utils {
|
|
class directories;
|
|
} // namespace utils
|
|
|
|
namespace gms {
|
|
class gossiper;
|
|
} // namespace gms
|
|
|
|
namespace db::hints {
|
|
|
|
/// A helper class which tracks hints directory creation
|
|
/// and allows to perform hints directory initialization lazily.
|
|
class directory_initializer {
|
|
private:
|
|
class impl;
|
|
::std::shared_ptr<impl> _impl;
|
|
|
|
directory_initializer(::std::shared_ptr<impl> impl);
|
|
|
|
public:
|
|
/// Creates an initializer that does nothing. Useful in tests.
|
|
static directory_initializer make_dummy() noexcept {
|
|
return {nullptr};
|
|
}
|
|
static future<directory_initializer> make(utils::directories& dirs, sstring hints_directory);
|
|
|
|
future<> ensure_created_and_verified();
|
|
future<> ensure_rebalanced();
|
|
};
|
|
|
|
class manager {
|
|
public:
|
|
using endpoint_id = internal::endpoint_id;
|
|
private:
|
|
using hint_stats = internal::hint_stats;
|
|
using drain = internal::drain;
|
|
|
|
friend class internal::hint_endpoint_manager;
|
|
friend class internal::hint_sender;
|
|
|
|
using hint_endpoint_manager = internal::hint_endpoint_manager;
|
|
using node_to_hint_store_factory_type = internal::node_to_hint_store_factory_type;
|
|
|
|
using hint_directory_manager = internal::hint_directory_manager;
|
|
|
|
enum class state {
|
|
started, // Hinting is currently allowed (start() has completed).
|
|
migrating, // The hint manager is being migrated from using IPs to name
|
|
// hint directories to using host IDs for that purpose. No new
|
|
// incoming hints will be accepted as long as this is the state.
|
|
replay_allowed, // Replaying (sending) hints is allowed.
|
|
draining_all, // Accepting new hints is not allowed. All endpoint managers
|
|
// are being drained because the node is leaving the cluster.
|
|
stopping // Accepting new hints is not allowed. Stopping this manager
|
|
// is in progress (stop() has been called).
|
|
};
|
|
|
|
using state_set = enum_set<super_enum<state,
|
|
state::started,
|
|
state::migrating,
|
|
state::replay_allowed,
|
|
state::draining_all,
|
|
state::stopping>>;
|
|
|
|
public:
|
|
static inline const std::string FILENAME_PREFIX{"HintsLog" + commitlog::descriptor::SEPARATOR};
|
|
// Non-const - can be modified with an error injection.
|
|
static inline std::chrono::seconds hints_flush_period = std::chrono::seconds(10);
|
|
private:
|
|
static constexpr uint64_t MAX_SIZE_OF_HINTS_IN_PROGRESS = 10 * 1024 * 1024; // 10MB
|
|
|
|
private:
|
|
state_set _state;
|
|
const fs::path _hints_dir;
|
|
dev_t _hints_dir_device_id = 0;
|
|
|
|
node_to_hint_store_factory_type _store_factory;
|
|
host_filter _host_filter;
|
|
service::storage_proxy& _proxy;
|
|
shared_ptr<const gms::gossiper> _gossiper_anchor;
|
|
int64_t _max_hint_window_us = 0;
|
|
replica::database& _local_db;
|
|
|
|
seastar::named_gate _draining_eps_gate; // gate used to control the progress of endpoint_managers stopping not in the context of manager::stop() call
|
|
|
|
resource_manager& _resource_manager;
|
|
|
|
std::unordered_map<endpoint_id, hint_endpoint_manager> _ep_managers;
|
|
|
|
// This is ONLY used when `_uses_host_id` is false. Otherwise, this map should stay EMPTY.
|
|
//
|
|
// Invariants:
|
|
// (1) there is an endpoint manager in `_ep_managers` identified by host ID `H` if an only if
|
|
// there is a mapping corresponding to `H` in `_hint_directory_manager`,
|
|
// (2) a hint directory representing an IP address `I` is managed by an endpoint manager
|
|
// if and only if there is a mapping corresponding to `I` in `_hint_directory_manager`.
|
|
hint_directory_manager _hint_directory_manager;
|
|
|
|
hint_stats _stats;
|
|
seastar::metrics::metric_groups _metrics;
|
|
|
|
// We need to keep a variant here. Before migrating hinted handoff to using host ID, hint directories will
|
|
// still represent IP addresses. But after the migration, they will start representing host IDs.
|
|
// We need to handle either case.
|
|
//
|
|
// It's especially important when dealing with the scenario when there is an IP directory, but there is
|
|
// no mapping for in locator::token_metadata. Since we sometimes have to save a directory like that
|
|
// in this set as well, this variant is necessary.
|
|
std::unordered_set<std::variant<locator::host_id, gms::inet_address>> _eps_with_pending_hints;
|
|
|
|
seastar::named_semaphore _drain_lock = {1, named_semaphore_exception_factory{"drain lock"}};
|
|
|
|
bool _uses_host_id = false;
|
|
std::any _migration_callback = std::nullopt;
|
|
future<> _migrating_done = make_ready_future();
|
|
|
|
// Unique lock if and only if there is an ongoing migration to the host-ID-based hinted handoff.
|
|
// Shared lock if and only if there is a fiber already executing `manager::wait_for_sync_point`.
|
|
seastar::shared_mutex _migration_mutex{};
|
|
|
|
public:
|
|
manager(service::storage_proxy& proxy, sstring hints_directory, host_filter filter,
|
|
int64_t max_hint_window_ms, resource_manager& res_manager, sharded<replica::database>& db);
|
|
|
|
manager(const manager&) = delete;
|
|
manager& operator=(const manager&) = delete;
|
|
|
|
manager(manager&&) = delete;
|
|
manager& operator=(manager&&) = delete;
|
|
|
|
~manager() noexcept {
|
|
SCYLLA_ASSERT(_ep_managers.empty());
|
|
}
|
|
|
|
public:
|
|
void register_metrics(const sstring& group_name);
|
|
future<> start(shared_ptr<const gms::gossiper> gossiper_ptr);
|
|
future<> stop();
|
|
bool store_hint(endpoint_id host_id, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
|
|
tracing::trace_state_ptr tr_state) noexcept;
|
|
|
|
/// \brief Changes the host_filter currently used, stopping and starting endpoint_managers relevant to the new host_filter.
|
|
/// \param filter the new host_filter
|
|
/// \return A future that resolves when the operation is complete.
|
|
future<> change_host_filter(host_filter filter);
|
|
|
|
const host_filter& get_host_filter() const noexcept {
|
|
return _host_filter;
|
|
}
|
|
|
|
/// \brief Check if a hint may be generated to the give end point
|
|
/// \param ep end point to check
|
|
/// \return true if we should generate the hint to the given end point if it becomes unavailable
|
|
bool can_hint_for(endpoint_id ep) const noexcept;
|
|
|
|
/// \brief Check if there aren't too many in-flight hints
|
|
///
|
|
/// This function checks if there are too many "in-flight" hints on the current shard - hints that are being stored
|
|
/// and which storing is not complete yet. This is meant to stabilize the memory consumption of the hints storing path
|
|
/// which is initialed from the storage_proxy WRITE flow. storage_proxy is going to check this condition and if it
|
|
/// returns TRUE it won't attempt any new WRITEs thus eliminating the possibility of new hints generation. If new hints
|
|
/// are not generated the amount of in-flight hints amount and thus the memory they are consuming is going to drop eventually
|
|
/// because the hints are going to be either stored or dropped. After that the things are going to get back to normal again.
|
|
///
|
|
/// Note that we can't consider the disk usage consumption here because the disk usage is not promised to drop down shortly
|
|
/// because it requires the remote node to be UP.
|
|
///
|
|
/// \param ep end point to check
|
|
/// \return TRUE if we are allowed to generate hint to the given end point but there are too many in-flight hints
|
|
bool too_many_in_flight_hints_for(endpoint_id ep) const noexcept;
|
|
|
|
/// \brief Check if DC \param ep belongs to is "hintable"
|
|
/// \param ep End point identificator
|
|
/// \return TRUE if hints are allowed to be generated to \param ep.
|
|
bool check_dc_for(endpoint_id ep) const noexcept;
|
|
|
|
/// Execute a given functor while having an endpoint's file update mutex locked.
|
|
///
|
|
/// The caller must ensure that the passed endpoint_id is valid, i.e. this manager instance
|
|
/// really manages an endpoint manager corresponding to it. See @ref have_ep_manager.
|
|
///
|
|
/// \param ep endpoint whose file update mutex should be locked
|
|
/// \param func functor to be executed
|
|
future<> with_file_update_mutex_for(const std::variant<locator::host_id, gms::inet_address>& ep,
|
|
noncopyable_function<future<> ()> func);
|
|
|
|
/// \brief Checks if hints are disabled for all endpoints
|
|
/// \return TRUE if hints are disabled.
|
|
bool is_disabled_for_all() const noexcept {
|
|
return _host_filter.is_disabled_for_all();
|
|
}
|
|
|
|
/// \return Size of mutations of hints in-flight (to the disk) at the moment.
|
|
uint64_t size_of_hints_in_progress() const noexcept {
|
|
return _stats.size_of_hints_in_progress;
|
|
}
|
|
|
|
/// \brief Get the number of in-flight (to the disk) hints to a given end point.
|
|
/// \param ep End point identificator
|
|
/// \return Number of hints in-flight to \param ep.
|
|
uint64_t hints_in_progress_for(endpoint_id ep) const noexcept {
|
|
auto it = _ep_managers.find(ep);
|
|
if (it == _ep_managers.end()) {
|
|
return 0;
|
|
}
|
|
return it->second.hints_in_progress();
|
|
}
|
|
|
|
void add_ep_with_pending_hints(const std::variant<locator::host_id, gms::inet_address>& key) {
|
|
_eps_with_pending_hints.insert(key);
|
|
}
|
|
|
|
void clear_eps_with_pending_hints() {
|
|
_eps_with_pending_hints.clear();
|
|
_eps_with_pending_hints.reserve(_ep_managers.size());
|
|
}
|
|
|
|
bool has_ep_with_pending_hints(const std::variant<locator::host_id, gms::inet_address>& key) const {
|
|
return _eps_with_pending_hints.contains(key);
|
|
}
|
|
|
|
size_t ep_managers_size() const {
|
|
return _ep_managers.size();
|
|
}
|
|
|
|
const fs::path& hints_dir() const {
|
|
return _hints_dir;
|
|
}
|
|
|
|
dev_t hints_dir_device_id() const {
|
|
return _hints_dir_device_id;
|
|
}
|
|
|
|
void allow_hints();
|
|
void forbid_hints();
|
|
void forbid_hints_for_eps_with_pending_hints();
|
|
|
|
void allow_replaying() noexcept {
|
|
_state.set(state::replay_allowed);
|
|
}
|
|
|
|
/// \brief Returns a set of replay positions for hint queues towards endpoints from the `target_eps`.
|
|
///
|
|
/// \param target_eps The list of endpoints the sync point should correspond to. When empty, the function assumes all endpoints.
|
|
/// \return Sync point corresponding to the specified endpoints.
|
|
sync_point::shard_rps calculate_current_sync_point(std::span<const locator::host_id> target_eps) const;
|
|
|
|
/// \brief Waits until hint replay reach replay positions described in `rps`.
|
|
future<> wait_for_sync_point(abort_source& as, const sync_point::shard_rps& rps);
|
|
|
|
private:
|
|
future<> compute_hints_dir_device_id();
|
|
|
|
node_to_hint_store_factory_type& store_factory() noexcept {
|
|
return _store_factory;
|
|
}
|
|
|
|
service::storage_proxy& local_storage_proxy() const noexcept {
|
|
return _proxy;
|
|
}
|
|
|
|
const gms::gossiper& local_gossiper() const noexcept {
|
|
return *_gossiper_anchor;
|
|
}
|
|
|
|
replica::database& local_db() noexcept {
|
|
return _local_db;
|
|
}
|
|
|
|
hint_endpoint_manager& get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip);
|
|
|
|
uint64_t max_size_of_hints_in_progress() const noexcept;
|
|
|
|
public:
|
|
bool have_ep_manager(const std::variant<locator::host_id, gms::inet_address>& ep) const noexcept;
|
|
|
|
public:
|
|
/// \brief Initiate the draining when we detect that the node has left the cluster.
|
|
///
|
|
/// If the node that has left is the current node - drains all pending hints to all nodes.
|
|
/// Otherwise drains hints to the node that has left.
|
|
///
|
|
/// In both cases - removes the corresponding hints' directories after all hints have been drained and erases the
|
|
/// corresponding hint_endpoint_manager objects.
|
|
///
|
|
/// \param host_id host ID of the node that left the cluster
|
|
/// \param ip the IP of the node that left the cluster
|
|
future<> drain_for(endpoint_id host_id, gms::inet_address ip) noexcept;
|
|
|
|
void update_backlog(size_t backlog, size_t max_backlog);
|
|
|
|
bool uses_host_id() const noexcept {
|
|
return _uses_host_id;
|
|
}
|
|
|
|
private:
|
|
bool stopping() const noexcept {
|
|
return _state.contains(state::stopping);
|
|
}
|
|
|
|
void set_stopping() noexcept {
|
|
_state.set(state::stopping);
|
|
}
|
|
|
|
public:
|
|
bool started() const noexcept {
|
|
return _state.contains(state::started);
|
|
}
|
|
|
|
private:
|
|
void set_started() noexcept {
|
|
_state.set(state::started);
|
|
}
|
|
|
|
bool replay_allowed() const noexcept {
|
|
return _state.contains(state::replay_allowed);
|
|
}
|
|
|
|
void set_draining_all() noexcept {
|
|
_state.set(state::draining_all);
|
|
}
|
|
|
|
bool draining_all() noexcept {
|
|
return _state.contains(state::draining_all);
|
|
}
|
|
|
|
/// Iterates over existing hint directories and for each, if the corresponding endpoint is present
|
|
/// in locator::topology, creates an endpoint manager.
|
|
future<> initialize_endpoint_managers();
|
|
|
|
/// Renames host directories named after IPs to host IDs.
|
|
///
|
|
/// In the past, hosts were identified by their IPs. Now we use host IDs for that purpose,
|
|
/// but we want to ensure that old hints don't get lost if possible. This function serves
|
|
/// this purpose. It's only necessary when upgrading Scylla.
|
|
///
|
|
/// This function should ONLY be called by `manager::start()` and `manager::perform_migration()`.
|
|
///
|
|
/// Calling this function again while the previous call has not yet finished
|
|
/// is undefined behavior.
|
|
future<> migrate_ip_directories();
|
|
|
|
/// Migrates this hint manager to using host IDs, i.e. when a call to this function ends,
|
|
/// the names of hint directories will start being represented by host IDs instead of IPs.
|
|
///
|
|
/// This function suspends hinted handoff throughout its execution. Among other consequences,
|
|
/// ALL requested sync points will be canceled, i.e. an exception will be issued
|
|
/// in the corresponding futures.
|
|
future<> perform_migration();
|
|
|
|
public:
|
|
/// Performs draining for all nodes that have already left the cluster.
|
|
/// This should only be called when the hint endpoint managers have been initialized
|
|
/// and the hint manager has started.
|
|
future<> drain_left_nodes();
|
|
};
|
|
|
|
} // namespace db::hints
|