Files
scylladb/sstables/sstables_manager.hh
Pavel Emelyanov 9cb776dee8 sstables_manager: Drop db::config from sstables_manager
Now it has all it needs via its own specific config.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2025-11-14 19:31:50 +03:00

308 lines
13 KiB
C++

/*
* Copyright (C) 2019-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/metrics.hh>
#include "utils/assert.hh"
#include "utils/disk-error-handler.hh"
#include "db_clock.hh"
#include "sstables/sstables.hh"
#include "sstables/shareable_components.hh"
#include "sstables/shared_sstable.hh"
#include "sstables/version.hh"
#include "db/cache_tracker.hh"
#include "db/object_storage_endpoint_param.hh"
#include "locator/host_id.hh"
#include "reader_concurrency_semaphore.hh"
#include "utils/s3/creds.hh"
#include <boost/intrusive/list.hpp>
#include "sstable_compressor_factory.hh"
#include "sstables/sstables_manager_subscription.hh"
// Forward declarations of types this header refers to without needing their
// complete definitions, so the corresponding headers need not be included here.
namespace gms {
class feature_service;
} // namespace gms
namespace s3 {
class client;
} // namespace s3
namespace db {
class config;
class corrupt_data_handler;
class large_data_handler;
} // namespace db
namespace sstables {
// lw_shared_ptr aliases used by the declarations in this header.
using shareable_components_ptr = lw_shared_ptr<shareable_components>;
using schema_ptr = lw_shared_ptr<const schema>;
// Types defined elsewhere; only referenced by pointer/reference here.
class directory_semaphore;
class object_storage_client;
// Default I/O buffer size (128 KiB), used as the default `buffer_size` argument
// of make_sstable(). Declared `inline constexpr` (C++17) so every translation
// unit including this header shares one definition instead of each getting its
// own internal-linkage copy, as `static` would give.
inline constexpr size_t default_sstable_buffer_size = 128 * 1024;
// Sharded service (it derives from peering_sharded_service) that holds the
// object-storage endpoints configured for this node, each paired with the
// client used to reach it. It is constructed from db::config plus its own
// storage_manager::config tunables.
class storage_manager : public peering_sharded_service<storage_manager> {
// Pairs a serialized_action with an observer on the configured list of
// object-storage endpoint parameters. Presumably the observer fires `action`
// to re-apply the endpoint configuration (update_config()) when the option
// changes. TODO confirm in the .cc.
struct config_updater {
serialized_action action;
utils::observer<std::vector<db::object_storage_endpoint_param>> observer;
config_updater(const db::config& cfg, storage_manager&);
};
// One configured endpoint: its configuration parameters plus the client
// used to talk to it.
struct object_storage_endpoint {
db::object_storage_endpoint_param cfg;
shared_ptr<object_storage_client> client;
object_storage_endpoint(db::object_storage_endpoint_param);
};
// Memory budget for object-storage clients. Presumably sized from
// config::object_storage_clients_memory. TODO confirm.
semaphore _object_storage_clients_memory;
// Configured endpoints. Presumably keyed by the endpoint name passed to
// get_endpoint_client() / is_known_endpoint().
std::unordered_map<sstring, object_storage_endpoint> _object_storage_endpoints;
std::unique_ptr<config_updater> _config_updater;
seastar::metrics::metric_groups metrics;
// Applies endpoint settings from `cfg`. Presumably invoked through
// config_updater when the configuration changes.
future<> update_config(const db::config&);
public:
// Tunables specific to storage_manager (as opposed to db::config).
struct config {
// Memory budget for object-storage clients (16 << 20 bytes = 16 MiB).
size_t object_storage_clients_memory = 16 << 20; // 16M by default
// When true, metrics registration is skipped (presumably for tools or
// tests that create extra instances; assumption).
bool skip_metrics_registration = false;
};
storage_manager(const db::config&, config cfg);
// Returns the client for `endpoint`. NOTE(review): what happens for an
// endpoint that is not configured is decided in the .cc, not visible here.
shared_ptr<object_storage_client> get_endpoint_client(sstring endpoint);
// Whether `endpoint` is one of the configured object-storage endpoints.
bool is_known_endpoint(sstring endpoint) const;
future<> stop();
// Names of the configured endpoints. `type` presumably filters by endpoint
// type, with the empty default meaning "all" (TODO confirm).
// NOTE(review): declared noexcept although it returns a std::vector by
// value; an allocation failure while building it would std::terminate.
std::vector<sstring> endpoints(sstring type = "") const noexcept;
};
// Creates sstables (see make_sstable()) and keeps track of every sstable it
// created until that sstable has been disposed of (see close()). Along the way:
//  - created sstables live on the _active / _undergoing_close intrusive lists;
//  - memory held by reclaimable sstable components is accounted for, reclaimed
//    when usage exceeds a threshold, and reloaded when memory is available;
//  - concurrent sstable metadata reads are bounded by a dedicated semaphore;
//  - subscribed sstables_manager_event_handler instances are notified of
//    events (currently only deletion).
class sstables_manager {
    using list_type = boost::intrusive::list<sstable,
            boost::intrusive::member_hook<sstable, sstable::manager_list_link_type, &sstable::_manager_list_link>,
            boost::intrusive::constant_time_size<false>>;
    using set_type = boost::intrusive::set<sstable,
            boost::intrusive::member_hook<sstable, sstable::manager_set_link_type, &sstable::_manager_set_link>,
            boost::intrusive::constant_time_size<false>,
            boost::intrusive::compare<sstable::lesser_reclaimed_memory>>;
public:
    // Construction-time settings. This is an aggregate that must be
    // aggregate-initialized: the data_file_directories reference member has
    // no default, so there is no usable default constructor.
    struct config {
        // Memory budget for this manager (zero if omitted from aggregate
        // initialization). Presumably the input to
        // max_memory_sstable_metadata_concurrent_reads() and to the
        // components-reclaim threshold. TODO confirm in the .cc.
        size_t available_memory;
        bool enable_sstable_key_validation = false;
        bool enable_data_integrity_check = false;
        double sstable_summary_ratio = 0.0005;
        size_t column_index_size = 64 << 10; // 64 KiB
        utils::updateable_value<uint32_t> column_index_auto_scale_threshold_in_kb = utils::updateable_value<uint32_t>(10240);
        // Presumably a fraction of available_memory used by
        // get_components_memory_reclaim_threshold(). TODO confirm.
        utils::updateable_value<double> memory_reclaim_threshold = utils::updateable_value<double>(0.2);
        // Held by reference: the referenced vector must outlive this config
        // and every copy of it (the manager keeps a copy in _config).
        const std::vector<sstring>& data_file_directories;
        // Configured sstable format name; defaults to the `me` version.
        utils::updateable_value<sstring> format = utils::updateable_value<sstring>(fmt::to_string(sstable_version_types::me));
    };
private:
    enum class notification_event_type {
        // Note: other event types like "added" may be needed in the future
        deleted
    };
    // dummy_mutex disables Boost.Signals2's internal locking for this signal.
    using signal_type = boost::signals2::signal_type<void (sstables::generation_type, notification_event_type), boost::signals2::keywords::mutex_type<boost::signals2::dummy_mutex>>::type;
    // Optional: may be null (the constructor defaults `shared` to nullptr).
    // get_endpoint_client() and is_known_endpoint() assert it is set.
    storage_manager* _storage;
    db::large_data_handler& _large_data_handler;
    db::corrupt_data_handler& _corrupt_data_handler;
    config _config;
    std::vector<sstables::file_io_extension*> _file_io_extensions;
    gms::feature_service& _features;
    // _active and _undergoing_close are used in scylla-gdb.py to fetch all sstables
    // on current shard using "scylla sstables" command. If those fields are renamed,
    // update scylla-gdb.py as well.
    list_type _active;
    list_type _undergoing_close;
    // Total reclaimable memory used by components of sstables in _active list
    size_t _total_reclaimable_memory{0};
    // Total memory reclaimed so far across all sstables
    size_t _total_memory_reclaimed{0};
    // Set of sstables from which memory has been reclaimed
    set_type _reclaimed;
    // Condition variable that needs to be notified when an sstable is created or deleted
    seastar::condition_variable _components_memory_change_event;
    future<> _components_reloader_status = make_ready_future<>();
    bool _closing = false;
    promise<> _done;
    cache_tracker& _cache_tracker;
    reader_concurrency_semaphore _sstable_metadata_concurrency_sem;
    directory_semaphore& _dir_semaphore;
    std::unique_ptr<sstables::sstables_registry> _sstables_registry;
    // This function is bound to token_metadata.get_my_id() in the database constructor,
    // it can return unset value (bool(host_id) == false) until host_id is loaded
    // after system_keyspace initialization.
    noncopyable_function<locator::host_id()> _resolve_host_id;
    scheduling_group _maintenance_sg;
    sstable_compressor_factory& _compressor_factory;
    const abort_source& _abort;
    named_gate _signal_gate;
    signal_type _signal_source;
public:
    // `shared` is optional (defaults to null) and presumably becomes _storage.
    // Without a storage_manager, get_endpoint_client() / is_known_endpoint()
    // trip their assertions.
    explicit sstables_manager(
        sstring name,
        db::large_data_handler& large_data_handler,
        db::corrupt_data_handler& corrupt_data_handler,
        config cfg,
        gms::feature_service& feat,
        cache_tracker&,
        directory_semaphore& dir_sem,
        noncopyable_function<locator::host_id()>&& resolve_host_id,
        sstable_compressor_factory&,
        const abort_source& abort,
        // Plural name so the parameter does not shadow the file_io_extension type.
        std::vector<file_io_extension*> file_io_extensions = {},
        scheduling_group maintenance_sg = current_scheduling_group(),
        storage_manager* shared = nullptr);
    virtual ~sstables_manager();
    // Creates a new sstable object that is managed (tracked) by this instance.
    shared_sstable make_sstable(schema_ptr schema,
            const data_dictionary::storage_options& storage,
            generation_type generation,
            sstable_state state = sstable_state::normal,
            sstable_version_types v = get_highest_sstable_version(),
            sstable_format_types f = sstable_format_types::big,
            db_clock::time_point now = db_clock::now(),
            io_error_handler_gen error_handler_gen = default_io_error_handler_gen(),
            size_t buffer_size = default_sstable_buffer_size);
    // Forwards to the storage_manager; requires one to have been supplied.
    shared_ptr<object_storage_client> get_endpoint_client(sstring endpoint) const {
        SCYLLA_ASSERT(_storage != nullptr);
        return _storage->get_endpoint_client(std::move(endpoint));
    }
    // Forwards to the storage_manager; requires one to have been supplied.
    bool is_known_endpoint(sstring endpoint) const {
        SCYLLA_ASSERT(_storage != nullptr);
        return _storage->is_known_endpoint(std::move(endpoint));
    }
    virtual sstable_writer_config configure_writer(sstring origin) const;
    const config& get_config() const noexcept { return _config; }
    cache_tracker& get_cache_tracker() { return _cache_tracker; }
    const std::vector<sstables::file_io_extension*>& file_io_extensions() const { return _file_io_extensions; }
    // Get the highest supported sstable version, according to cluster features.
    sstables::sstable::version_types get_highest_supported_format() const noexcept;
    // Get the preferred sstable version for writing new sstables,
    // according to cluster features and database config.
    //
    // 1. The choice must be new enough to support existing data.
    //    (For example, range tombstones with infinite endpoints were added in version "mc",
    //    so if the enabled cluster features permit such tombstones to exist, we must
    //    pick at least "mc").
    // 2. The choice must be old enough to be supported by cluster features.
    // 3. The choice should respect the config, as long as it doesn't contradict (1) and (2).
    //    The user might wish to use an older format, and we should respect that if possible.
    sstables::sstable::version_types get_preferred_sstable_version() const;
    // Like get_preferred_sstable_version(), but additionally assume that
    // all features implied by `existing_version` are enabled.
    //
    // This is used when rewriting (reshaping or resharding) system sstables
    // during startup. At this point cluster features aren't known to `feature_service` yet.
    // But we must still pick some format compatible with the existing data.
    // So use existing sstables to infer the set of enabled features.
    sstables::sstable::version_types get_safe_sstable_version_for_rewrites(sstable_version_types existing_version) const;
    locator::host_id get_local_host_id() const;
    reader_concurrency_semaphore& sstable_metadata_concurrency_sem() noexcept { return _sstable_metadata_concurrency_sem; }
    // Wait until all sstables managed by this sstables_manager instance
    // (previously created by make_sstable()) have been disposed of:
    // - if they were marked for deletion, the files are deleted
    // - in any case, the open file handles are closed
    // - all memory resources are freed
    //
    // Note that close() will not complete until all references to all
    // sstables have been destroyed.
    future<> close();
    directory_semaphore& dir_semaphore() noexcept { return _dir_semaphore; }
    void plug_sstables_registry(std::unique_ptr<sstables_registry>) noexcept;
    void unplug_sstables_registry() noexcept;
    // Only for sstable::storage usage
    sstables::sstables_registry& sstables_registry() const noexcept {
        SCYLLA_ASSERT(_sstables_registry && "sstables_registry is not plugged");
        return *_sstables_registry;
    }
    future<> delete_atomically(std::vector<shared_sstable> ssts);
    future<lw_shared_ptr<const data_dictionary::storage_options>> init_table_storage(const schema& s, const data_dictionary::storage_options& so);
    future<> destroy_table_storage(const data_dictionary::storage_options& so);
    future<> init_keyspace_storage(const data_dictionary::storage_options& so, sstring dir);
    void validate_new_keyspace_storage_options(const data_dictionary::storage_options&);
    const abort_source& get_abort_source() const noexcept { return _abort; }
    // To be called by the sstable to signal its unlinking
    void on_unlink(sstable* sst);
    std::vector<std::filesystem::path> get_local_directories(const data_dictionary::storage_options::local& so) const;
    sstable_compressor_factory& get_compressor_factory() const { return _compressor_factory; }
    // unsubscribe happens automatically when the handler is destroyed
    void subscribe(sstables_manager_event_handler& handler);
private:
    void add(sstable* sst);
    // Transition the sstable to the "inactive" state. It has no
    // visible references at this point, and only waits for its
    // files to be deleted (if necessary) and closed.
    void deactivate(sstable* sst);
    void remove(sstable* sst);
    void maybe_done();
    static constexpr size_t max_count_sstable_metadata_concurrent_reads{10};
    // Allow at most 10% of memory to be filled with such reads.
    // Pure function of its argument, hence static; the truncating
    // double->size_t conversion is spelled out explicitly.
    static size_t max_memory_sstable_metadata_concurrent_reads(size_t available_memory) noexcept {
        return static_cast<size_t>(available_memory * 0.1);
    }
    // Increment the _total_reclaimable_memory with the new SSTable's reclaimable memory
    void increment_total_reclaimable_memory(sstable* sst);
    // Fiber to reload reclaimed components back into memory when memory becomes available.
    future<> components_reclaim_reload_fiber();
    // Reclaims components from SSTables if total memory usage exceeds the threshold.
    future<> maybe_reclaim_components();
    // Reloads components from reclaimed SSTables if memory is available.
    future<> maybe_reload_components();
    size_t get_components_memory_reclaim_threshold() const;
    size_t get_memory_available_for_reclaimable_components() const;
    // Reclaim memory from the SSTable and remove it from the memory tracking metrics.
    // The method is idempotent and for an sstable that is deleted, it is called both
    // during unlink and during deactivation.
    void reclaim_memory_and_stop_tracking_sstable(sstable* sst);
private:
    db::large_data_handler& get_large_data_handler() const {
        return _large_data_handler;
    }
    db::corrupt_data_handler& get_corrupt_data_handler() const {
        return _corrupt_data_handler;
    }
    friend class sstable;
    // Allow testing private methods/variables via test_env_sstables_manager
    friend class test_env_sstables_manager;
};
} // namespace sstables