diff --git a/configure.py b/configure.py index b98135d7e7..b048d91c92 100755 --- a/configure.py +++ b/configure.py @@ -518,6 +518,7 @@ scylla_core = (['database.cc', 'db/commitlog/commitlog_replayer.cc', 'db/commitlog/commitlog_entry.cc', 'db/hints/manager.cc', + 'db/hints/resource_manager.cc', 'db/config.cc', 'db/extensions.cc', 'db/heat_load_balance.cc', diff --git a/db/hints/manager.cc b/db/hints/manager.cc index bec76f86d0..45722c42aa 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -42,26 +42,26 @@ const std::string manager::FILENAME_PREFIX("HintsLog" + commitlog::descriptor::S const std::chrono::seconds manager::hint_file_write_timeout = std::chrono::seconds(2); const std::chrono::seconds manager::hints_flush_period = std::chrono::seconds(10); -const std::chrono::seconds manager::space_watchdog::_watchdog_period = std::chrono::seconds(1); -// TODO: remove this when we switch to C++17 -constexpr size_t manager::_max_hints_send_queue_length; -size_t db::hints::manager::max_shard_disk_space_size; +size_t db::hints::resource_manager::max_shard_disk_space_size; -manager::manager(sstring hints_directory, std::vector hinted_dcs, int64_t max_hint_window_ms, distributed& db) +manager::manager(sstring hints_directory, std::vector hinted_dcs, int64_t max_hint_window_ms, resource_manager& res_manager, distributed& db) : _hints_dir(boost::filesystem::path(hints_directory) / format("{:d}", engine().cpu_id()).c_str()) , _hinted_dcs(hinted_dcs.begin(), hinted_dcs.end()) , _local_snitch_ptr(locator::i_endpoint_snitch::get_local_snitch_ptr()) , _max_hint_window_us(max_hint_window_ms * 1000) , _local_db(db.local()) - , _max_send_in_flight_memory(std::max(memory::stats().total_memory() / 10, _max_hints_send_queue_length)) - , _min_send_hint_budget(_max_send_in_flight_memory / _max_hints_send_queue_length) - , _send_limiter(_max_send_in_flight_memory) - , _space_watchdog(*this) -{ + , _resource_manager(res_manager) +{} + +manager::~manager() { + assert(_ep_managers.empty()); +} + +void manager::register_metrics(const sstring& group_name) { namespace sm = seastar::metrics; - _metrics.add_group("hints_manager", { + _metrics.add_group(group_name, { sm::make_gauge("size_of_hints_in_progress", _stats.size_of_hints_in_progress, sm::description("Size of hinted mutations that are scheduled to be written.")), @@ -79,10 +79,6 @@ manager::manager(sstring hints_directory, std::vector hinted_dcs, int64 }); } -manager::~manager() { - assert(_ep_managers.empty()); -} - future<> manager::start(shared_ptr proxy_ptr, shared_ptr gossiper_ptr, shared_ptr ss_ptr) { _proxy_anchor = std::move(proxy_ptr); _gossiper_anchor = std::move(gossiper_ptr); @@ -95,8 +91,6 @@ future<> manager::start(shared_ptr proxy_ptr, shared_ptr return get_ep_manager(ep).populate_segments_to_replay(); }).then([this] { _strorage_service_anchor->register_subscriber(this); - // we are ready to store new hints... - _space_watchdog.start(); }); } @@ -110,24 +104,42 @@ future<> manager::stop() { _stopping = true; return _draining_eps_gate.close().finally([this] { - return when_all( - parallel_for_each(_ep_managers, [] (auto& pair) { + return parallel_for_each(_ep_managers, [] (auto& pair) { return pair.second.stop(); - }), - _space_watchdog.stop() - ).finally([this] { + }).finally([this] { _ep_managers.clear(); manager_logger.info("Stopped"); }).discard_result(); }); } +void manager::allow_hints() { + boost::for_each(_ep_managers, [] (auto& pair) { pair.second.allow_hints(); }); +} + +void manager::forbid_hints() { + boost::for_each(_ep_managers, [] (auto& pair) { pair.second.forbid_hints(); }); +} + +void manager::forbid_hints_for_eps_with_pending_hints() { + manager_logger.trace("space_watchdog: Going to block hints to: {}", _eps_with_pending_hints); + boost::for_each(_ep_managers, [this] (auto& pair) { + end_point_hints_manager& ep_man = pair.second; + if (has_ep_with_pending_hints(ep_man.end_point_key())) { + ep_man.forbid_hints(); + } else { + ep_man.allow_hints(); + } + }); +} + bool manager::end_point_hints_manager::store_hint(schema_ptr s, lw_shared_ptr fm, tracing::trace_state_ptr tr_state) noexcept { try { with_gate(_store_gate, [this, s = std::move(s), fm = std::move(fm), tr_state] () mutable { ++_hints_in_progress; size_t mut_size = fm->representation().size(); shard_stats().size_of_hints_in_progress += mut_size; + shard_resource_manager().inc_size_of_hints_in_progress(mut_size); return with_shared(file_update_mutex(), [this, fm, s, tr_state] () mutable -> future<> { return get_or_load().then([this, fm = std::move(fm), s = std::move(s), tr_state] (hints_store_ptr log_ptr) mutable { @@ -148,6 +160,7 @@ bool manager::end_point_hints_manager::store_hint(schema_ptr s, lw_shared_ptr manager::end_point_hints_manager::add_store() noexcept { commitlog::config cfg; cfg.commit_log_location = _hints_dir.c_str(); - cfg.commitlog_segment_size_in_mb = _hint_segment_size_in_mb; - cfg.commitlog_total_space_in_mb = _max_hints_per_ep_size_mb; + cfg.commitlog_segment_size_in_mb = resource_manager::hint_segment_size_in_mb; + cfg.commitlog_total_space_in_mb = resource_manager::max_hints_per_ep_size_mb; cfg.fname_prefix = manager::FILENAME_PREFIX; cfg.extensions = &_shard_manager.local_db().get_config().extensions(); @@ -420,122 +433,10 @@ const column_mapping& manager::end_point_hints_manager::sender::get_column_mappi return cm_it->second; } -manager::space_watchdog::space_watchdog(manager& shard_manager) - : _shard_manager(shard_manager) - , _timer([this] { on_timer(); }) -{} - -void manager::space_watchdog::start() { - _timer.arm(timer_clock_type::now()); -} - -future<> manager::space_watchdog::stop() noexcept { - try { - return _gate.close().finally([this] { _timer.cancel(); }); - } catch (...) { - return make_exception_future<>(std::current_exception()); - } -} - -future<> manager::space_watchdog::scan_one_ep_dir(boost::filesystem::path path, ep_key_type ep_key) { - return lister::scan_dir(path, { directory_entry_type::regular }, [this, ep_key] (lister::path dir, directory_entry de) { - // Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory - if (_files_count == 1) { - _eps_with_pending_hints.emplace(ep_key); - } - ++_files_count; - - return io_check(file_size, (dir / de.name.c_str()).c_str()).then([this] (uint64_t fsize) { - _total_size += fsize; - }); - }); -} - -void manager::space_watchdog::on_timer() { - with_gate(_gate, [this] { - return futurize_apply([this] { - _eps_with_pending_hints.clear(); - _eps_with_pending_hints.reserve(_shard_manager._ep_managers.size()); - _total_size = 0; - - // The hints directories are organized as follows: - // - // |- - // | |- - // | |- - // | |- - // | |- ... - // | |- - // | |- ... - // | |-... - // |- - // | |- ... - // ... - // |- - // | |- ... - // - - // This is a top level shard hints directory, let's enumerate per-end-point sub-directories... - return lister::scan_dir(_shard_manager._hints_dir, {directory_entry_type::directory}, [this] (lister::path dir, directory_entry de) { - _files_count = 0; - // Let's scan per-end-point directories and enumerate hints files... - // - // Let's check if there is a corresponding end point manager (may not exist if the corresponding DC is - // not hintable). - // If exists - let's take a file update lock so that files are not changed under our feet. Otherwise, simply - // continue to enumeration - there is no one to change them. - auto it = _shard_manager.find_ep_manager(de.name); - if (it != _shard_manager.ep_managers_end()) { - return with_lock(it->second.file_update_mutex(), [this, dir = std::move(dir), ep_name = std::move(de.name)]() mutable { - return scan_one_ep_dir(dir / ep_name.c_str(), ep_key_type(ep_name)); - }); - } else { - return scan_one_ep_dir(dir / de.name.c_str(), ep_key_type(de.name)); - } - }).then([this] { - // Adjust the quota to take into account the space we guarantee to every end point manager - size_t adjusted_quota = 0; - size_t delta = _shard_manager._ep_managers.size() * _hint_segment_size_in_mb * 1024 * 1024; - if (max_shard_disk_space_size > delta) { - adjusted_quota = max_shard_disk_space_size - delta; - } - - bool can_hint = _total_size < adjusted_quota; - manager_logger.trace("space_watchdog: total_size ({}) {} max_shard_disk_space_size ({})", _total_size, can_hint ? "<" : ">=", adjusted_quota); - - if (!can_hint) { - manager_logger.trace("space_watchdog: Going to block hints to: {}", _eps_with_pending_hints); - std::for_each(_shard_manager._ep_managers.begin(), _shard_manager._ep_managers.end(), [this] (auto& pair) { - end_point_hints_manager& ep_man = pair.second; - auto it = _eps_with_pending_hints.find(ep_man.end_point_key()); - if (it != _eps_with_pending_hints.end()) { - ep_man.forbid_hints(); - } else { - ep_man.allow_hints(); - } - }); - } else { - std::for_each(_shard_manager._ep_managers.begin(), _shard_manager._ep_managers.end(), [] (auto& pair) { - pair.second.allow_hints(); - }); - } - }); - }).handle_exception([this] (auto eptr) { - manager_logger.trace("space_watchdog: unexpected exception - stop all hints generators"); - // Stop all hint generators if space_watchdog callback failed - std::for_each(_shard_manager._ep_managers.begin(), _shard_manager._ep_managers.end(), [this] (auto& pair) { - pair.second.forbid_hints(); - }); - }).finally([this] { - _timer.arm(_watchdog_period); - }); - }); -} - bool manager::too_many_in_flight_hints_for(ep_key_type ep) const noexcept { // There is no need to check the DC here because if there is an in-flight hint for this end point then this means that // its DC has already been checked and found to be ok. - return _stats.size_of_hints_in_progress > _max_size_of_hints_in_progress && !utils::fb_utilities::is_me(ep) && hints_in_progress_for(ep) > 0 && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us; + return _resource_manager.too_many_hints_in_progress() && !utils::fb_utilities::is_me(ep) && hints_in_progress_for(ep) > 0 && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us; } bool manager::can_hint_for(ep_key_type ep) const noexcept { @@ -552,8 +453,8 @@ bool manager::can_hint_for(ep_key_type ep) const noexcept { // hints is more than the maximum allowed value. // // In the worst case there's going to be (_max_size_of_hints_in_progress + N - 1) in-flight hints, where N is the total number Nodes in the cluster. - if (_stats.size_of_hints_in_progress > _max_size_of_hints_in_progress && hints_in_progress_for(ep) > 0) { - manager_logger.trace("size_of_hints_in_progress {} hints_in_progress_for({}) {}", _stats.size_of_hints_in_progress, ep, hints_in_progress_for(ep)); + if (_resource_manager.too_many_hints_in_progress() && hints_in_progress_for(ep) > 0) { + manager_logger.trace("size_of_hints_in_progress {} hints_in_progress_for({}) {}", _resource_manager.size_of_hints_in_progress(), ep, hints_in_progress_for(ep)); return false; } @@ -623,6 +524,7 @@ manager::end_point_hints_manager::sender::sender(end_point_hints_manager& parent , _ep_key(parent.end_point_key()) , _ep_manager(parent) , _shard_manager(_ep_manager._shard_manager) + , _resource_manager(_shard_manager._resource_manager) , _proxy(local_storage_proxy) , _db(local_db) , _gossiper(local_gossiper) @@ -634,6 +536,7 @@ manager::end_point_hints_manager::sender::sender(const sender& other, end_point_ , _ep_key(parent.end_point_key()) , _ep_manager(parent) , _shard_manager(_ep_manager._shard_manager) + , _resource_manager(_shard_manager._resource_manager) , _proxy(other._proxy) , _db(other._db) , _gossiper(other._gossiper) @@ -720,14 +623,7 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(mutation m) } future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr ctx_ptr, temporary_buffer buf, db::replay_position rp, gc_clock::duration secs_since_file_mod, const sstring& fname) { - // Let's approximate the memory size the mutation is going to consume by the size of its serialized form - size_t hint_memory_budget = std::max(_shard_manager._min_send_hint_budget, buf.size()); - // Allow a very big mutation to be sent out by consuming the whole shard budget - hint_memory_budget = std::min(hint_memory_budget, _shard_manager._max_send_in_flight_memory); - - manager_logger.trace("memory budget: need {} have {}", hint_memory_budget, _shard_manager._send_limiter.available_units()); - - return get_units(_shard_manager._send_limiter, hint_memory_budget).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable { + return _resource_manager.get_send_units_for(buf.size()).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable { with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable { try { try { diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 769ea3c087..e9f9d88463 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -32,9 +32,11 @@ #include #include #include "gms/gossiper.hh" +#include "locator/snitch_base.hh" #include "service/endpoint_lifecycle_subscriber.hh" #include "db/commitlog/commitlog.hh" #include "utils/loading_shared_values.hh" +#include "db/hints/resource_manager.hh" namespace service { class storage_service; @@ -61,6 +63,7 @@ private: class drain_tag {}; using drain = seastar::bool_class; +public: class end_point_hints_manager { public: using key_type = gms::inet_address; @@ -107,6 +110,7 @@ private: key_type _ep_key; end_point_hints_manager& _ep_manager; manager& _shard_manager; + resource_manager& _resource_manager; service::storage_proxy& _proxy; database& _db; gms::gossiper& _gossiper; @@ -377,66 +381,22 @@ private: struct stats& shard_stats() { return _shard_manager._stats; } + + resource_manager& shard_resource_manager() { + return _shard_manager._resource_manager; + } }; private: using ep_key_type = typename end_point_hints_manager::key_type; using ep_managers_map_type = std::unordered_map; - class space_watchdog { - private: - static const std::chrono::seconds _watchdog_period; - - private: - std::unordered_set _eps_with_pending_hints; - size_t _total_size = 0; - manager& _shard_manager; - seastar::gate _gate; - seastar::timer _timer; - int _files_count = 0; - - public: - space_watchdog(manager& shard_manager); - future<> stop() noexcept; - void start(); - - private: - /// \brief Check that hints don't occupy too much disk space. - /// - /// Verifies that the whole \ref manager::_hints_dir occupies less than \ref manager::max_shard_disk_space_size. - /// - /// If it does, stop all end point managers that have more than one hints file - we don't want some DOWN Node to - /// prevent hints to other Nodes from being generated (e.g. due to some temporary overload and timeout). - /// - /// This is a simplistic implementation of a manager for a limited shared resource with a minimum guarantied share for all - /// participants. - /// - /// This implementation guaranties at least a single hint share for all end point managers. - void on_timer(); - - /// \brief Scan files in a single end point directory. - /// - /// Add sizes of files in the directory to _total_size. If number of files is greater than 1 add this end point ID - /// to _eps_with_pending_hints so that we may block it if _total_size value becomes greater than the maximum allowed - /// value. - /// - /// \param path directory to scan - /// \param ep_name end point ID (as a string) - /// \return future that resolves when scanning is complete - future<> scan_one_ep_dir(boost::filesystem::path path, ep_key_type ep_name); - }; - public: static const std::string FILENAME_PREFIX; static const std::chrono::seconds hints_flush_period; static const std::chrono::seconds hint_file_write_timeout; - static size_t max_shard_disk_space_size; private: - static constexpr uint64_t _max_size_of_hints_in_progress = 10 * 1024 * 1024; // 10MB - static constexpr size_t _hint_segment_size_in_mb = 32; - static constexpr size_t _max_hints_per_ep_size_mb = 128; // 4 files 32MB each - static constexpr size_t _max_hints_send_queue_length = 128; const boost::filesystem::path _hints_dir; node_to_hint_store_factory_type _store_factory; @@ -450,20 +410,17 @@ private: bool _stopping = false; seastar::gate _draining_eps_gate; // gate used to control the progress of ep_managers stopping not in the context of manager::stop() call - // Limit the maximum size of in-flight (being sent) hints by 10% of the shard memory. - // Also don't allow more than 128 in-flight hints to limit the collateral memory consumption as well. - const size_t _max_send_in_flight_memory; - const size_t _min_send_hint_budget; - seastar::semaphore _send_limiter; + resource_manager& _resource_manager; - space_watchdog _space_watchdog; ep_managers_map_type _ep_managers; stats _stats; seastar::metrics::metric_groups _metrics; + std::unordered_set _eps_with_pending_hints; public: - manager(sstring hints_directory, std::vector hinted_dcs, int64_t max_hint_window_ms, distributed& db); + manager(sstring hints_directory, std::vector hinted_dcs, int64_t max_hint_window_ms, resource_manager&res_manager, distributed& db); virtual ~manager(); + void register_metrics(const sstring& group_name); future<> start(shared_ptr proxy_ptr, shared_ptr gossiper_ptr, shared_ptr ss_ptr); future<> stop(); bool store_hint(gms::inet_address ep, schema_ptr s, lw_shared_ptr fm, tracing::trace_state_ptr tr_state) noexcept; @@ -510,6 +467,32 @@ public: return it->second.hints_in_progress(); } + void add_ep_with_pending_hints(ep_key_type key) { + _eps_with_pending_hints.insert(key); + } + + void clear_eps_with_pending_hints() { + _eps_with_pending_hints.clear(); + _eps_with_pending_hints.reserve(_ep_managers.size()); + } + + bool has_ep_with_pending_hints(ep_key_type key) const { + return _eps_with_pending_hints.count(key); + } + + size_t ep_managers_size() const { + return _ep_managers.size(); + } + + const boost::filesystem::path& hints_dir() const { + return _hints_dir; + } + + void allow_hints(); + void forbid_hints(); + void forbid_hints_for_eps_with_pending_hints(); + + static future<> rebalance() { // TODO return make_ready_future<>(); @@ -528,10 +511,6 @@ private: return _store_factory; } - const boost::filesystem::path& hints_dir() const { - return _hints_dir; - } - service::storage_proxy& local_storage_proxy() const noexcept { return *_proxy_anchor; } @@ -558,7 +537,7 @@ private: /// \param endpoint node that left the cluster void drain_for(gms::inet_address endpoint); -private: +public: ep_managers_map_type::iterator find_ep_manager(ep_key_type ep_key) noexcept { return _ep_managers.find(ep_key); } diff --git a/db/hints/resource_manager.cc b/db/hints/resource_manager.cc new file mode 100644 index 0000000000..eda9523dc4 --- /dev/null +++ b/db/hints/resource_manager.cc @@ -0,0 +1,179 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "resource_manager.hh" +#include "manager.hh" +#include "log.hh" +#include +#include +#include "lister.hh" +#include "disk-error-handler.hh" +#include "seastarx.hh" + +namespace db { +namespace hints { + +static logging::logger resource_manager_logger("hints_resource_manager"); + +future> resource_manager::get_send_units_for(size_t buf_size) { + // Let's approximate the memory size the mutation is going to consume by the size of its serialized form + size_t hint_memory_budget = std::max(_min_send_hint_budget, buf_size); + // Allow a very big mutation to be sent out by consuming the whole shard budget + hint_memory_budget = std::min(hint_memory_budget, _max_send_in_flight_memory); + resource_manager_logger.trace("memory budget: need {} have {}", hint_memory_budget, _send_limiter.available_units()); + return get_units(_send_limiter, hint_memory_budget); +} + +const std::chrono::seconds space_watchdog::_watchdog_period = std::chrono::seconds(1); + +space_watchdog::space_watchdog(shard_managers_set& managers) + : _shard_managers(managers) + , _timer([this] { on_timer(); }) +{} + +void space_watchdog::start() { + _timer.arm(timer_clock_type::now()); +} + +future<> space_watchdog::stop() noexcept { + try { + return _gate.close().finally([this] { _timer.cancel(); }); + } catch (...) { + return make_exception_future<>(std::current_exception()); + } +} + +future<> space_watchdog::scan_one_ep_dir(boost::filesystem::path path, manager& shard_manager, ep_key_type ep_key) { + return lister::scan_dir(path, { directory_entry_type::regular }, [this, ep_key, &shard_manager] (lister::path dir, directory_entry de) { + // Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory + if (_files_count == 1) { + shard_manager.add_ep_with_pending_hints(ep_key); + } + ++_files_count; + + return io_check(file_size, (dir / de.name.c_str()).c_str()).then([this] (uint64_t fsize) { + _total_size += fsize; + }); + }); +} + +size_t space_watchdog::end_point_managers_count() const { + return boost::accumulate(_shard_managers, 0, [] (size_t sum, manager& shard_manager) { + return sum + shard_manager.ep_managers_size(); + }); +} + +void space_watchdog::on_timer() { + with_gate(_gate, [this] { + return futurize_apply([this] { + _total_size = 0; + + return do_for_each(_shard_managers, [this] (manager& shard_manager) { + shard_manager.clear_eps_with_pending_hints(); + + // The hints directories are organized as follows: + // + // |- + // | |- + // | |- + // | |- + // | |- ... + // | |- + // | |- ... + // | |-... + // |- + // | |- ... + // ... + // |- + // | |- ... + // + return lister::scan_dir(shard_manager.hints_dir(), {directory_entry_type::directory}, [this, &shard_manager] (lister::path dir, directory_entry de) { + _files_count = 0; + // Let's scan per-end-point directories and enumerate hints files... + // + // Let's check if there is a corresponding end point manager (may not exist if the corresponding DC is + // not hintable). + // If exists - let's take a file update lock so that files are not changed under our feet. Otherwise, simply + // continue to enumeration - there is no one to change them. + auto it = shard_manager.find_ep_manager(de.name); + if (it != shard_manager.ep_managers_end()) { + return with_lock(it->second.file_update_mutex(), [this, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)]() mutable { + return scan_one_ep_dir(dir / ep_name.c_str(), shard_manager, ep_key_type(ep_name)); + }); + } else { + return scan_one_ep_dir(dir / de.name.c_str(), shard_manager, ep_key_type(de.name)); + } + }); + }).then([this] { + // Adjust the quota to take into account the space we guarantee to every end point manager + size_t adjusted_quota = 0; + size_t delta = end_point_managers_count() * resource_manager::hint_segment_size_in_mb * 1024 * 1024; + if (resource_manager::max_shard_disk_space_size > delta) { + adjusted_quota = resource_manager::max_shard_disk_space_size - delta; + } + + bool can_hint = _total_size < adjusted_quota; + resource_manager_logger.trace("space_watchdog: total_size ({}) {} max_shard_disk_space_size ({})", _total_size, can_hint ? "<" : ">=", adjusted_quota); + + if (!can_hint) { + for (manager& shard_manager : _shard_managers) { + shard_manager.forbid_hints_for_eps_with_pending_hints(); + } + } else { + for (manager& shard_manager : _shard_managers) { + shard_manager.allow_hints(); + } + } + }); + }).handle_exception([this] (auto eptr) { + resource_manager_logger.trace("space_watchdog: unexpected exception - stop all hints generators"); + // Stop all hint generators if space_watchdog callback failed + for (manager& shard_manager : _shard_managers) { + shard_manager.forbid_hints(); + } + }).finally([this] { + _timer.arm(_watchdog_period); + }); + }); +} + +future<> resource_manager::start(shared_ptr proxy_ptr, shared_ptr gossiper_ptr, shared_ptr ss_ptr) { + return parallel_for_each(_shard_managers, [proxy_ptr, gossiper_ptr, ss_ptr](manager& m) { + return m.start(proxy_ptr, gossiper_ptr, ss_ptr); + }).then([this]() { + return _space_watchdog.start(); + }); +} + +future<> resource_manager::stop() noexcept { + return parallel_for_each(_shard_managers, [](manager& m) { + return m.stop(); + }).finally([this]() { + return _space_watchdog.stop(); + }); +} + +void resource_manager::register_manager(manager& m) { + _shard_managers.insert(m); +} + +} +} diff --git a/db/hints/resource_manager.hh b/db/hints/resource_manager.hh new file mode 100644 index 0000000000..9914b2efd2 --- /dev/null +++ b/db/hints/resource_manager.hh @@ -0,0 +1,153 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "seastarx.hh" +#include +#include +#include + + +namespace service { +class storage_proxy; +class storage_service; +} + +namespace db { +namespace hints { + +using timer_clock_type = seastar::lowres_clock; + +class manager; + +class space_watchdog { +private: + using ep_key_type = gms::inet_address; + static const std::chrono::seconds _watchdog_period; + + struct manager_hash { + size_t operator()(const manager& manager) const { + return reinterpret_cast(&manager); + } + }; + struct manager_comp { + bool operator()(const std::reference_wrapper& m1, const std::reference_wrapper& m2) const { + return std::addressof(m1.get()) == std::addressof(m2.get()); + } + }; + +public: + using shard_managers_set = std::unordered_set, manager_hash, manager_comp>; + +private: + size_t _total_size = 0; + shard_managers_set& _shard_managers; + seastar::gate _gate; + seastar::timer _timer; + int _files_count = 0; + +public: + space_watchdog(shard_managers_set& managers); + void start(); + future<> stop() noexcept; + size_t end_point_managers_count() const; + +private: + /// \brief Check that hints don't occupy too much disk space. + /// + /// Verifies that all \ref manager::_hints_dir dirs for all managers occupy less than \ref resource_manager::max_shard_disk_space_size. + /// + /// If they do, stop all end point managers that have more than one hints file - we don't want some DOWN Node to + /// prevent hints to other Nodes from being generated (e.g. due to some temporary overload and timeout). + /// + /// This is a simplistic implementation of a manager for a limited shared resource with a minimum guaranteed share for all + /// participants. + /// + /// This implementation guarantees at least a single hint share for all end point managers. + void on_timer(); + + /// \brief Scan files in a single end point directory. + /// + /// Add sizes of files in the directory to _total_size. If number of files is greater than 1 add this end point ID + /// to _eps_with_pending_hints so that we may block it if _total_size value becomes greater than the maximum allowed + /// value. + /// + /// \param path directory to scan + /// \param ep_name end point ID (as a string) + /// \return future that resolves when scanning is complete + future<> scan_one_ep_dir(boost::filesystem::path path, manager& shard_manager, ep_key_type ep_key); +}; + +class resource_manager { + const size_t _max_send_in_flight_memory; + const size_t _min_send_hint_budget; + seastar::semaphore _send_limiter; + + uint64_t _size_of_hints_in_progress = 0; + space_watchdog::shard_managers_set _shard_managers; + space_watchdog _space_watchdog; + +public: + static constexpr uint64_t max_size_of_hints_in_progress = 10 * 1024 * 1024; // 10MB + static constexpr size_t hint_segment_size_in_mb = 32; + static constexpr size_t max_hints_per_ep_size_mb = 128; // 4 files 32MB each + static constexpr size_t max_hints_send_queue_length = 128; + static size_t max_shard_disk_space_size; + +public: + resource_manager() + : _max_send_in_flight_memory(std::max(memory::stats().total_memory() / 10, max_hints_send_queue_length)) + , _min_send_hint_budget(_max_send_in_flight_memory / max_hints_send_queue_length) + , _send_limiter(_max_send_in_flight_memory) + , _space_watchdog(_shard_managers) + {} + + future> get_send_units_for(size_t buf_size); + + bool too_many_hints_in_progress() const { + return _size_of_hints_in_progress > max_size_of_hints_in_progress; + } + + uint64_t size_of_hints_in_progress() const { + return _size_of_hints_in_progress; + } + + inline void inc_size_of_hints_in_progress(int64_t delta) { + _size_of_hints_in_progress += delta; + } + + inline void dec_size_of_hints_in_progress(int64_t delta) { + _size_of_hints_in_progress -= delta; + } + + future<> start(shared_ptr proxy_ptr, shared_ptr gossiper_ptr, shared_ptr ss_ptr); + future<> stop() noexcept; + void register_manager(manager& m); +}; + +} +} diff --git a/main.cc b/main.cc index 518ee53e44..bebd3ef03b 100644 --- a/main.cc +++ b/main.cc @@ -518,12 +518,22 @@ int main(int ac, char** av) { directories.insert(db.local().get_config().commitlog_directory()); supervisor::notify("creating hints directories"); - - boost::filesystem::path hints_base_dir(db.local().get_config().hints_directory()); - dirs.touch_and_lock(db.local().get_config().hints_directory()).get(); - directories.insert(db.local().get_config().hints_directory()); + if (hinted_handoff_enabled) { + boost::filesystem::path hints_base_dir(db.local().get_config().hints_directory()); + dirs.touch_and_lock(db.local().get_config().hints_directory()).get(); + directories.insert(db.local().get_config().hints_directory()); + for (unsigned i = 0; i < smp::count; ++i) { + sstring shard_dir((hints_base_dir / seastar::to_sstring(i).c_str()).native()); + dirs.touch_and_lock(shard_dir).get(); + directories.insert(std::move(shard_dir)); + } + } + boost::filesystem::path view_pending_updates_base_dir = boost::filesystem::path(db.local().get_config().data_file_directories()[0]) / "view_pending_updates"; + sstring view_pending_updates_base_dir_str = view_pending_updates_base_dir.native(); + dirs.touch_and_lock(view_pending_updates_base_dir_str).get(); + directories.insert(view_pending_updates_base_dir_str); for (unsigned i = 0; i < smp::count; ++i) { - sstring shard_dir((hints_base_dir / seastar::to_sstring(i).c_str()).native()); + sstring shard_dir((view_pending_updates_base_dir / seastar::to_sstring(i).c_str()).native()); dirs.touch_and_lock(shard_dir).get(); directories.insert(std::move(shard_dir)); } diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 3ca890dc51..03c0755c11 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -633,7 +633,9 @@ void storage_proxy_stats::split_stats::register_metrics_for(gms::inet_address ep } storage_proxy::~storage_proxy() {} -storage_proxy::storage_proxy(distributed& db, stdx::optional> hinted_handoff_enabled) : _db(db) { +storage_proxy::storage_proxy(distributed& db, stdx::optional> hinted_handoff_enabled) + : _db(db) + , _hints_for_views_manager(_db.local().get_config().data_file_directories()[0] + "/view_pending_updates", {}, _db.local().get_config().max_hint_window_in_ms(), _hints_resource_manager, _db) { namespace sm = seastar::metrics; _metrics.add_group(COORDINATOR_STATS_CATEGORY, { sm::make_histogram("read_latency", sm::description("The general read latency histogram"), [this]{ return _stats.estimated_read.get_histogram(16, 20);}), @@ -724,17 +726,20 @@ storage_proxy::storage_proxy(distributed& db, stdx::optionalregister_metrics("hints_manager"); + _hints_resource_manager.register_manager(*_hints_manager); + } + + _hints_for_views_manager.register_metrics("hints_for_views_manager"); + _hints_resource_manager.register_manager(_hints_for_views_manager); } storage_proxy::rh_entry::rh_entry(shared_ptr&& h, std::function&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {} @@ -1789,8 +1794,9 @@ template size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr& mh, const Range& targets, db::write_type type, tracing::trace_state_ptr tr_state) noexcept { if (hints_enabled(type)) { - return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state)] (gms::inet_address target) mutable -> bool { - return _hints_manager->store_hint(target, mh->schema(), mh->get_mutation_for(target), tr_state); + db::hints::manager& hints_manager = hints_manager_for(type); + return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool { + return hints_manager.store_hint(target, mh->schema(), mh->get_mutation_for(target), tr_state); }); } else { return 0; @@ -3639,7 +3645,11 @@ get_restricted_ranges(locator::token_metadata& tm, const schema& s, dht::partiti } bool storage_proxy::hints_enabled(db::write_type type) noexcept { - return _hints_enabled_for_user_writes || (type == db::write_type::VIEW && bool(_hints_manager)); + return bool(_hints_manager) || type == db::write_type::VIEW; +} + +db::hints::manager& storage_proxy::hints_manager_for(db::write_type type) { + return type == db::write_type::VIEW ? _hints_for_views_manager : *_hints_manager; } future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname) { @@ -4431,17 +4441,11 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, } future<> storage_proxy::start_hints_manager(shared_ptr gossiper_ptr, shared_ptr ss_ptr) { - if (_hints_manager) { - return _hints_manager->start(shared_from_this(), std::move(gossiper_ptr), std::move(ss_ptr)); - } - return make_ready_future<>(); + return _hints_resource_manager.start(shared_from_this(), gossiper_ptr, ss_ptr); } future<> storage_proxy::stop_hints_manager() { - if (_hints_manager) { - return _hints_manager->stop(); - } - return make_ready_future<>(); + return _hints_resource_manager.stop(); } future<> diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index a6011123d6..313c98138c 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -149,8 +149,9 @@ private: // not remove request from the buffer), but this is fine since request ids are unique, so we // just skip an entry if request no longer exists. circular_buffer _throttled_writes; + db::hints::resource_manager _hints_resource_manager; stdx::optional _hints_manager; - bool _hints_enabled_for_user_writes = false; + db::hints::manager _hints_for_views_manager; stats _stats; static constexpr float CONCURRENT_SUBREQUESTS_MARGIN = 0.10; // for read repair chance calculation @@ -180,6 +181,7 @@ private: template bool cannot_hint(const Range& targets, db::write_type type); bool hints_enabled(db::write_type type) noexcept; + db::hints::manager& hints_manager_for(db::write_type type); std::vector get_live_endpoints(keyspace& ks, const dht::token& token); std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token); db::read_repair_decision new_read_repair_decision(const schema& s);