From 23159378545eecd0b286a9c05e48295f3db099af Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Wed, 30 May 2018 10:39:18 +0200 Subject: [PATCH 1/7] hints: move constants to resource_manager Constants related to managing resources are moved to newly created resource_manager class. Later, this class will be used to manage (potentially shared) resources of hints managers. --- db/hints/manager.cc | 22 ++++++++--------- db/hints/manager.hh | 6 +---- db/hints/resource_manager.hh | 47 ++++++++++++++++++++++++++++++++++++ service/storage_proxy.cc | 2 +- 4 files changed, 59 insertions(+), 18 deletions(-) create mode 100644 db/hints/resource_manager.hh diff --git a/db/hints/manager.cc b/db/hints/manager.cc index bec76f86d0..d3f67ee03b 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -43,10 +43,8 @@ const std::string manager::FILENAME_PREFIX("HintsLog" + commitlog::descriptor::S const std::chrono::seconds manager::hint_file_write_timeout = std::chrono::seconds(2); const std::chrono::seconds manager::hints_flush_period = std::chrono::seconds(10); const std::chrono::seconds manager::space_watchdog::_watchdog_period = std::chrono::seconds(1); -// TODO: remove this when we switch to C++17 -constexpr size_t manager::_max_hints_send_queue_length; -size_t db::hints::manager::max_shard_disk_space_size; +size_t db::hints::resource_manager::max_shard_disk_space_size; manager::manager(sstring hints_directory, std::vector hinted_dcs, int64_t max_hint_window_ms, distributed& db) : _hints_dir(boost::filesystem::path(hints_directory) / format("{:d}", engine().cpu_id()).c_str()) @@ -54,8 +52,8 @@ manager::manager(sstring hints_directory, std::vector hinted_dcs, int64 , _local_snitch_ptr(locator::i_endpoint_snitch::get_local_snitch_ptr()) , _max_hint_window_us(max_hint_window_ms * 1000) , _local_db(db.local()) - , _max_send_in_flight_memory(std::max(memory::stats().total_memory() / 10, _max_hints_send_queue_length)) - , _min_send_hint_budget(_max_send_in_flight_memory / _max_hints_send_queue_length) + , _max_send_in_flight_memory(std::max(memory::stats().total_memory() / 10, resource_manager::max_hints_send_queue_length)) + , _min_send_hint_budget(_max_send_in_flight_memory / resource_manager::max_hints_send_queue_length) , _send_limiter(_max_send_in_flight_memory) , _space_watchdog(*this) { @@ -280,8 +278,8 @@ future manager::end_point_hints_manager::add_store() noexcept { commitlog::config cfg; cfg.commit_log_location = _hints_dir.c_str(); - cfg.commitlog_segment_size_in_mb = _hint_segment_size_in_mb; - cfg.commitlog_total_space_in_mb = _max_hints_per_ep_size_mb; + cfg.commitlog_segment_size_in_mb = resource_manager::hint_segment_size_in_mb; + cfg.commitlog_total_space_in_mb = resource_manager::max_hints_per_ep_size_mb; cfg.fname_prefix = manager::FILENAME_PREFIX; cfg.extensions = &_shard_manager.local_db().get_config().extensions(); @@ -495,9 +493,9 @@ void manager::space_watchdog::on_timer() { }).then([this] { // Adjust the quota to take into account the space we guarantee to every end point manager size_t adjusted_quota = 0; - size_t delta = _shard_manager._ep_managers.size() * _hint_segment_size_in_mb * 1024 * 1024; - if (max_shard_disk_space_size > delta) { - adjusted_quota = max_shard_disk_space_size - delta; + size_t delta = _shard_manager._ep_managers.size() * resource_manager::hint_segment_size_in_mb * 1024 * 1024; + if (resource_manager::max_shard_disk_space_size > delta) { + adjusted_quota = resource_manager::max_shard_disk_space_size - delta; } bool can_hint = _total_size < adjusted_quota; @@ -535,7 +533,7 @@ void manager::space_watchdog::on_timer() { bool manager::too_many_in_flight_hints_for(ep_key_type ep) const noexcept { // There is no need to check the DC here because if there is an in-flight hint for this end point then this means that // its DC has already been checked and found to be ok. - return _stats.size_of_hints_in_progress > _max_size_of_hints_in_progress && !utils::fb_utilities::is_me(ep) && hints_in_progress_for(ep) > 0 && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us; + return _stats.size_of_hints_in_progress > resource_manager::max_size_of_hints_in_progress && !utils::fb_utilities::is_me(ep) && hints_in_progress_for(ep) > 0 && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us; } bool manager::can_hint_for(ep_key_type ep) const noexcept { @@ -552,7 +550,7 @@ bool manager::can_hint_for(ep_key_type ep) const noexcept { // hints is more than the maximum allowed value. // // In the worst case there's going to be (_max_size_of_hints_in_progress + N - 1) in-flight hints, where N is the total number Nodes in the cluster. - if (_stats.size_of_hints_in_progress > _max_size_of_hints_in_progress && hints_in_progress_for(ep) > 0) { + if (_stats.size_of_hints_in_progress > resource_manager::max_size_of_hints_in_progress && hints_in_progress_for(ep) > 0) { manager_logger.trace("size_of_hints_in_progress {} hints_in_progress_for({}) {}", _stats.size_of_hints_in_progress, ep, hints_in_progress_for(ep)); return false; } diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 769ea3c087..57a33c4420 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -35,6 +35,7 @@ #include "service/endpoint_lifecycle_subscriber.hh" #include "db/commitlog/commitlog.hh" #include "utils/loading_shared_values.hh" +#include "db/hints/resource_manager.hh" namespace service { class storage_service; @@ -430,13 +431,8 @@ public: static const std::string FILENAME_PREFIX; static const std::chrono::seconds hints_flush_period; static const std::chrono::seconds hint_file_write_timeout; - static size_t max_shard_disk_space_size; private: - static constexpr uint64_t _max_size_of_hints_in_progress = 10 * 1024 * 1024; // 10MB - static constexpr size_t _hint_segment_size_in_mb = 32; - static constexpr size_t _max_hints_per_ep_size_mb = 128; // 4 files 32MB each - static constexpr size_t _max_hints_send_queue_length = 128; const boost::filesystem::path _hints_dir; node_to_hint_store_factory_type _store_factory; diff --git a/db/hints/resource_manager.hh b/db/hints/resource_manager.hh new file mode 100644 index 0000000000..251b39bc21 --- /dev/null +++ b/db/hints/resource_manager.hh @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "seastarx.hh" +#include +#include +#include + +namespace db { +namespace hints { + +class resource_manager { +public: + static constexpr uint64_t max_size_of_hints_in_progress = 10 * 1024 * 1024; // 10MB + static constexpr size_t hint_segment_size_in_mb = 32; + static constexpr size_t max_hints_per_ep_size_mb = 128; // 4 files 32MB each + static constexpr size_t max_hints_send_queue_length = 128; + static size_t max_shard_disk_space_size; +}; + +} +} diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 3ca890dc51..62b949aca8 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -733,7 +733,7 @@ storage_proxy::storage_proxy(distributed& db, stdx::optional Date: Wed, 30 May 2018 11:23:56 +0200 Subject: [PATCH 2/7] hints: move send limiter to resource manager Send limiting semaphore is moved from hints manager to resource manager. In consequence, hints manager now keeps a reference to its resource manager. --- configure.py | 1 + db/hints/manager.cc | 25 +++++++++-------------- db/hints/manager.hh | 13 ++++++------ db/hints/resource_manager.cc | 39 ++++++++++++++++++++++++++++++++++++ db/hints/resource_manager.hh | 31 ++++++++++++++++++++++++++++ service/storage_proxy.cc | 2 +- service/storage_proxy.hh | 1 + 7 files changed, 90 insertions(+), 22 deletions(-) create mode 100644 db/hints/resource_manager.cc diff --git a/configure.py b/configure.py index b98135d7e7..b048d91c92 100755 --- a/configure.py +++ b/configure.py @@ -518,6 +518,7 @@ scylla_core = (['database.cc', 'db/commitlog/commitlog_replayer.cc', 'db/commitlog/commitlog_entry.cc', 'db/hints/manager.cc', + 'db/hints/resource_manager.cc', 'db/config.cc', 'db/extensions.cc', 'db/heat_load_balance.cc', diff --git a/db/hints/manager.cc b/db/hints/manager.cc index d3f67ee03b..4b7813f2c1 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -46,15 +46,13 @@ const std::chrono::seconds manager::space_watchdog::_watchdog_period = std::chro size_t db::hints::resource_manager::max_shard_disk_space_size; -manager::manager(sstring hints_directory, std::vector hinted_dcs, int64_t max_hint_window_ms, distributed& db) +manager::manager(sstring hints_directory, std::vector hinted_dcs, int64_t max_hint_window_ms, resource_manager& res_manager, distributed& db) : _hints_dir(boost::filesystem::path(hints_directory) / format("{:d}", engine().cpu_id()).c_str()) , _hinted_dcs(hinted_dcs.begin(), hinted_dcs.end()) , _local_snitch_ptr(locator::i_endpoint_snitch::get_local_snitch_ptr()) , _max_hint_window_us(max_hint_window_ms * 1000) , _local_db(db.local()) - , _max_send_in_flight_memory(std::max(memory::stats().total_memory() / 10, resource_manager::max_hints_send_queue_length)) - , _min_send_hint_budget(_max_send_in_flight_memory / resource_manager::max_hints_send_queue_length) - , _send_limiter(_max_send_in_flight_memory) + , _resource_manager(res_manager) , _space_watchdog(*this) { namespace sm = seastar::metrics; @@ -126,6 +124,7 @@ bool manager::end_point_hints_manager::store_hint(schema_ptr s, lw_shared_ptrrepresentation().size(); shard_stats().size_of_hints_in_progress += mut_size; + shard_resource_manager().inc_size_of_hints_in_progress(mut_size); return with_shared(file_update_mutex(), [this, fm, s, tr_state] () mutable -> future<> { return get_or_load().then([this, fm = std::move(fm), s = std::move(s), tr_state] (hints_store_ptr log_ptr) mutable { @@ -146,6 +145,7 @@ bool manager::end_point_hints_manager::store_hint(schema_ptr s, lw_shared_ptr resource_manager::max_size_of_hints_in_progress && !utils::fb_utilities::is_me(ep) && hints_in_progress_for(ep) > 0 && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us; + return _resource_manager.too_many_hints_in_progress() && !utils::fb_utilities::is_me(ep) && hints_in_progress_for(ep) > 0 && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us; } bool manager::can_hint_for(ep_key_type ep) const noexcept { @@ -550,8 +550,8 @@ bool manager::can_hint_for(ep_key_type ep) const noexcept { // hints is more than the maximum allowed value. // // In the worst case there's going to be (_max_size_of_hints_in_progress + N - 1) in-flight hints, where N is the total number Nodes in the cluster. - if (_stats.size_of_hints_in_progress > resource_manager::max_size_of_hints_in_progress && hints_in_progress_for(ep) > 0) { - manager_logger.trace("size_of_hints_in_progress {} hints_in_progress_for({}) {}", _stats.size_of_hints_in_progress, ep, hints_in_progress_for(ep)); + if (_resource_manager.too_many_hints_in_progress() && hints_in_progress_for(ep) > 0) { + manager_logger.trace("size_of_hints_in_progress {} hints_in_progress_for({}) {}", _resource_manager.size_of_hints_in_progress(), ep, hints_in_progress_for(ep)); return false; } @@ -621,6 +621,7 @@ manager::end_point_hints_manager::sender::sender(end_point_hints_manager& parent , _ep_key(parent.end_point_key()) , _ep_manager(parent) , _shard_manager(_ep_manager._shard_manager) + , _resource_manager(_shard_manager._resource_manager) , _proxy(local_storage_proxy) , _db(local_db) , _gossiper(local_gossiper) @@ -632,6 +633,7 @@ manager::end_point_hints_manager::sender::sender(const sender& other, end_point_ , _ep_key(parent.end_point_key()) , _ep_manager(parent) , _shard_manager(_ep_manager._shard_manager) + , _resource_manager(_shard_manager._resource_manager) , _proxy(other._proxy) , _db(other._db) , _gossiper(other._gossiper) @@ -718,14 +720,7 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(mutation m) } future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr ctx_ptr, temporary_buffer buf, db::replay_position rp, gc_clock::duration secs_since_file_mod, const sstring& fname) { - // Let's approximate the memory size the mutation is going to consume by the size of its serialized form - size_t hint_memory_budget = std::max(_shard_manager._min_send_hint_budget, buf.size()); - // Allow a very big mutation to be sent out by consuming the whole shard budget - hint_memory_budget = std::min(hint_memory_budget, _shard_manager._max_send_in_flight_memory); - - manager_logger.trace("memory budget: need {} have {}", hint_memory_budget, _shard_manager._send_limiter.available_units()); - - return get_units(_shard_manager._send_limiter, hint_memory_budget).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable { + return _resource_manager.get_send_units_for(buf.size()).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable { with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable { try { try { diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 57a33c4420..4de97fa44a 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -108,6 +108,7 @@ private: key_type _ep_key; end_point_hints_manager& _ep_manager; manager& _shard_manager; + resource_manager& _resource_manager; service::storage_proxy& _proxy; database& _db; gms::gossiper& _gossiper; @@ -378,6 +379,10 @@ private: struct stats& shard_stats() { return _shard_manager._stats; } + + resource_manager& shard_resource_manager() { + return _shard_manager._resource_manager; + } }; private: @@ -446,11 +451,7 @@ private: bool _stopping = false; seastar::gate _draining_eps_gate; // gate used to control the progress of ep_managers stopping not in the context of manager::stop() call - // Limit the maximum size of in-flight (being sent) hints by 10% of the shard memory. - // Also don't allow more than 128 in-flight hints to limit the collateral memory consumption as well. - const size_t _max_send_in_flight_memory; - const size_t _min_send_hint_budget; - seastar::semaphore _send_limiter; + resource_manager& _resource_manager; space_watchdog _space_watchdog; ep_managers_map_type _ep_managers; @@ -458,7 +459,7 @@ private: seastar::metrics::metric_groups _metrics; public: - manager(sstring hints_directory, std::vector hinted_dcs, int64_t max_hint_window_ms, distributed& db); + manager(sstring hints_directory, std::vector hinted_dcs, int64_t max_hint_window_ms, resource_manager&res_manager, distributed& db); virtual ~manager(); future<> start(shared_ptr proxy_ptr, shared_ptr gossiper_ptr, shared_ptr ss_ptr); future<> stop(); diff --git a/db/hints/resource_manager.cc b/db/hints/resource_manager.cc new file mode 100644 index 0000000000..cb698358e8 --- /dev/null +++ b/db/hints/resource_manager.cc @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "resource_manager.hh" + +namespace db { +namespace hints { + +static logging::logger resource_manager_logger("hints_resource_manager"); + +future> resource_manager::get_send_units_for(size_t buf_size) { + // Let's approximate the memory size the mutation is going to consume by the size of its serialized form + size_t hint_memory_budget = std::max(_min_send_hint_budget, buf_size); + // Allow a very big mutation to be sent out by consuming the whole shard budget + hint_memory_budget = std::min(hint_memory_budget, _max_send_in_flight_memory); + resource_manager_logger.trace("memory budget: need {} have {}", hint_memory_budget, _send_limiter.available_units()); + return get_units(_send_limiter, hint_memory_budget); +} + +} +} diff --git a/db/hints/resource_manager.hh b/db/hints/resource_manager.hh index 251b39bc21..795057d401 100644 --- a/db/hints/resource_manager.hh +++ b/db/hints/resource_manager.hh @@ -35,12 +35,43 @@ namespace db { namespace hints { class resource_manager { + const size_t _max_send_in_flight_memory; + const size_t _min_send_hint_budget; + seastar::semaphore _send_limiter; + + uint64_t _size_of_hints_in_progress = 0; + public: static constexpr uint64_t max_size_of_hints_in_progress = 10 * 1024 * 1024; // 10MB static constexpr size_t hint_segment_size_in_mb = 32; static constexpr size_t max_hints_per_ep_size_mb = 128; // 4 files 32MB each static constexpr size_t max_hints_send_queue_length = 128; static size_t max_shard_disk_space_size; + +public: + resource_manager() + : _max_send_in_flight_memory(std::max(memory::stats().total_memory() / 10, max_hints_send_queue_length)) + , _min_send_hint_budget(_max_send_in_flight_memory / max_hints_send_queue_length) + , _send_limiter(_max_send_in_flight_memory) + {} + + future> get_send_units_for(size_t buf_size); + + bool too_many_hints_in_progress() const { + return _size_of_hints_in_progress > max_size_of_hints_in_progress; + } + + uint64_t size_of_hints_in_progress() const { + return _size_of_hints_in_progress; + } + + inline void inc_size_of_hints_in_progress(int64_t delta) { + _size_of_hints_in_progress += delta; + } + + inline void dec_size_of_hints_in_progress(int64_t delta) { + _size_of_hints_in_progress -= delta; + } }; } diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 62b949aca8..5d162ab35c 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -734,7 +734,7 @@ storage_proxy::storage_proxy(distributed& db, stdx::optional&& h, std::function&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {} diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index a6011123d6..a73aba75c0 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -149,6 +149,7 @@ private: // not remove request from the buffer), but this is fine since request ids are unique, so we // just skip an entry if request no longer exists. circular_buffer _throttled_writes; + db::hints::resource_manager _hints_resource_manager; stdx::optional _hints_manager; bool _hints_enabled_for_user_writes = false; stats _stats; From f345efc79ae791b0076c316a244b8abd6508c27d Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Wed, 30 May 2018 12:11:10 +0200 Subject: [PATCH 3/7] hints: move space_watchdog to resource manager Space watchdog is decoupled from hints manager and moved to resource manager, so it can be shared among different hints manager instances. --- db/hints/manager.cc | 143 ++++++----------------------------- db/hints/manager.hh | 79 ++++++++----------- db/hints/resource_manager.cc | 140 ++++++++++++++++++++++++++++++++++ db/hints/resource_manager.hh | 77 ++++++++++++++++++- 4 files changed, 268 insertions(+), 171 deletions(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 4b7813f2c1..228bace506 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -42,7 +42,6 @@ const std::string manager::FILENAME_PREFIX("HintsLog" + commitlog::descriptor::S const std::chrono::seconds manager::hint_file_write_timeout = std::chrono::seconds(2); const std::chrono::seconds manager::hints_flush_period = std::chrono::seconds(10); -const std::chrono::seconds manager::space_watchdog::_watchdog_period = std::chrono::seconds(1); size_t db::hints::resource_manager::max_shard_disk_space_size; @@ -53,7 +52,6 @@ manager::manager(sstring hints_directory, std::vector hinted_dcs, int64 , _max_hint_window_us(max_hint_window_ms * 1000) , _local_db(db.local()) , _resource_manager(res_manager) - , _space_watchdog(*this) { namespace sm = seastar::metrics; @@ -91,8 +89,6 @@ future<> manager::start(shared_ptr proxy_ptr, shared_ptr return get_ep_manager(ep).populate_segments_to_replay(); }).then([this] { _strorage_service_anchor->register_subscriber(this); - // we are ready to store new hints... - _space_watchdog.start(); }); } @@ -106,18 +102,35 @@ future<> manager::stop() { _stopping = true; return _draining_eps_gate.close().finally([this] { - return when_all( - parallel_for_each(_ep_managers, [] (auto& pair) { + return parallel_for_each(_ep_managers, [] (auto& pair) { return pair.second.stop(); - }), - _space_watchdog.stop() - ).finally([this] { + }).finally([this] { _ep_managers.clear(); manager_logger.info("Stopped"); }).discard_result(); }); } +void manager::allow_hints() { + boost::for_each(_ep_managers, [] (auto& pair) { pair.second.allow_hints(); }); +} + +void manager::forbid_hints() { + boost::for_each(_ep_managers, [] (auto& pair) { pair.second.forbid_hints(); }); +} + +void manager::forbid_hints_for_eps_with_pending_hints() { + manager_logger.trace("space_watchdog: Going to block hints to: {}", _eps_with_pending_hints); + boost::for_each(_ep_managers, [this] (auto& pair) { + end_point_hints_manager& ep_man = pair.second; + if (has_ep_with_pending_hints(ep_man.end_point_key())) { + ep_man.forbid_hints(); + } else { + ep_man.allow_hints(); + } + }); +} + bool manager::end_point_hints_manager::store_hint(schema_ptr s, lw_shared_ptr fm, tracing::trace_state_ptr tr_state) noexcept { try { with_gate(_store_gate, [this, s = std::move(s), fm = std::move(fm), tr_state] () mutable { @@ -418,118 +431,6 @@ const column_mapping& manager::end_point_hints_manager::sender::get_column_mappi return cm_it->second; } -manager::space_watchdog::space_watchdog(manager& shard_manager) - : _shard_manager(shard_manager) - , _timer([this] { on_timer(); }) -{} - -void manager::space_watchdog::start() { - _timer.arm(timer_clock_type::now()); -} - -future<> manager::space_watchdog::stop() noexcept { - try { - return _gate.close().finally([this] { _timer.cancel(); }); - } catch (...) { - return make_exception_future<>(std::current_exception()); - } -} - -future<> manager::space_watchdog::scan_one_ep_dir(boost::filesystem::path path, ep_key_type ep_key) { - return lister::scan_dir(path, { directory_entry_type::regular }, [this, ep_key] (lister::path dir, directory_entry de) { - // Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory - if (_files_count == 1) { - _eps_with_pending_hints.emplace(ep_key); - } - ++_files_count; - - return io_check(file_size, (dir / de.name.c_str()).c_str()).then([this] (uint64_t fsize) { - _total_size += fsize; - }); - }); -} - -void manager::space_watchdog::on_timer() { - with_gate(_gate, [this] { - return futurize_apply([this] { - _eps_with_pending_hints.clear(); - _eps_with_pending_hints.reserve(_shard_manager._ep_managers.size()); - _total_size = 0; - - // The hints directories are organized as follows: - // - // |- - // | |- - // | |- - // | |- - // | |- ... - // | |- - // | |- ... - // | |-... - // |- - // | |- ... - // ... - // |- - // | |- ... - // - - // This is a top level shard hints directory, let's enumerate per-end-point sub-directories... - return lister::scan_dir(_shard_manager._hints_dir, {directory_entry_type::directory}, [this] (lister::path dir, directory_entry de) { - _files_count = 0; - // Let's scan per-end-point directories and enumerate hints files... - // - // Let's check if there is a corresponding end point manager (may not exist if the corresponding DC is - // not hintable). - // If exists - let's take a file update lock so that files are not changed under our feet. Otherwise, simply - // continue to enumeration - there is no one to change them. - auto it = _shard_manager.find_ep_manager(de.name); - if (it != _shard_manager.ep_managers_end()) { - return with_lock(it->second.file_update_mutex(), [this, dir = std::move(dir), ep_name = std::move(de.name)]() mutable { - return scan_one_ep_dir(dir / ep_name.c_str(), ep_key_type(ep_name)); - }); - } else { - return scan_one_ep_dir(dir / de.name.c_str(), ep_key_type(de.name)); - } - }).then([this] { - // Adjust the quota to take into account the space we guarantee to every end point manager - size_t adjusted_quota = 0; - size_t delta = _shard_manager._ep_managers.size() * resource_manager::hint_segment_size_in_mb * 1024 * 1024; - if (resource_manager::max_shard_disk_space_size > delta) { - adjusted_quota = resource_manager::max_shard_disk_space_size - delta; - } - - bool can_hint = _total_size < adjusted_quota; - manager_logger.trace("space_watchdog: total_size ({}) {} max_shard_disk_space_size ({})", _total_size, can_hint ? "<" : ">=", adjusted_quota); - - if (!can_hint) { - manager_logger.trace("space_watchdog: Going to block hints to: {}", _eps_with_pending_hints); - std::for_each(_shard_manager._ep_managers.begin(), _shard_manager._ep_managers.end(), [this] (auto& pair) { - end_point_hints_manager& ep_man = pair.second; - auto it = _eps_with_pending_hints.find(ep_man.end_point_key()); - if (it != _eps_with_pending_hints.end()) { - ep_man.forbid_hints(); - } else { - ep_man.allow_hints(); - } - }); - } else { - std::for_each(_shard_manager._ep_managers.begin(), _shard_manager._ep_managers.end(), [] (auto& pair) { - pair.second.allow_hints(); - }); - } - }); - }).handle_exception([this] (auto eptr) { - manager_logger.trace("space_watchdog: unexpected exception - stop all hints generators"); - // Stop all hint generators if space_watchdog callback failed - std::for_each(_shard_manager._ep_managers.begin(), _shard_manager._ep_managers.end(), [this] (auto& pair) { - pair.second.forbid_hints(); - }); - }).finally([this] { - _timer.arm(_watchdog_period); - }); - }); -} - bool manager::too_many_in_flight_hints_for(ep_key_type ep) const noexcept { // There is no need to check the DC here because if there is an in-flight hint for this end point then this means that // its DC has already been checked and found to be ok. diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 4de97fa44a..f9d861ff1d 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -32,6 +32,7 @@ #include #include #include "gms/gossiper.hh" +#include "locator/snitch_base.hh" #include "service/endpoint_lifecycle_subscriber.hh" #include "db/commitlog/commitlog.hh" #include "utils/loading_shared_values.hh" @@ -62,6 +63,7 @@ private: class drain_tag {}; using drain = seastar::bool_class; +public: class end_point_hints_manager { public: using key_type = gms::inet_address; @@ -389,49 +391,6 @@ private: using ep_key_type = typename end_point_hints_manager::key_type; using ep_managers_map_type = std::unordered_map; - class space_watchdog { - private: - static const std::chrono::seconds _watchdog_period; - - private: - std::unordered_set _eps_with_pending_hints; - size_t _total_size = 0; - manager& _shard_manager; - seastar::gate _gate; - seastar::timer _timer; - int _files_count = 0; - - public: - space_watchdog(manager& shard_manager); - future<> stop() noexcept; - void start(); - - private: - /// \brief Check that hints don't occupy too much disk space. - /// - /// Verifies that the whole \ref manager::_hints_dir occupies less than \ref manager::max_shard_disk_space_size. - /// - /// If it does, stop all end point managers that have more than one hints file - we don't want some DOWN Node to - /// prevent hints to other Nodes from being generated (e.g. due to some temporary overload and timeout). - /// - /// This is a simplistic implementation of a manager for a limited shared resource with a minimum guarantied share for all - /// participants. - /// - /// This implementation guaranties at least a single hint share for all end point managers. - void on_timer(); - - /// \brief Scan files in a single end point directory. - /// - /// Add sizes of files in the directory to _total_size. If number of files is greater than 1 add this end point ID - /// to _eps_with_pending_hints so that we may block it if _total_size value becomes greater than the maximum allowed - /// value. - /// - /// \param path directory to scan - /// \param ep_name end point ID (as a string) - /// \return future that resolves when scanning is complete - future<> scan_one_ep_dir(boost::filesystem::path path, ep_key_type ep_name); - }; - public: static const std::string FILENAME_PREFIX; static const std::chrono::seconds hints_flush_period; @@ -453,10 +412,10 @@ private: resource_manager& _resource_manager; - space_watchdog _space_watchdog; ep_managers_map_type _ep_managers; stats _stats; seastar::metrics::metric_groups _metrics; + std::unordered_set _eps_with_pending_hints; public: manager(sstring hints_directory, std::vector hinted_dcs, int64_t max_hint_window_ms, resource_manager&res_manager, distributed& db); @@ -507,6 +466,32 @@ public: return it->second.hints_in_progress(); } + void add_ep_with_pending_hints(ep_key_type key) { + _eps_with_pending_hints.insert(key); + } + + void clear_eps_with_pending_hints() { + _eps_with_pending_hints.clear(); + _eps_with_pending_hints.reserve(_ep_managers.size()); + } + + bool has_ep_with_pending_hints(ep_key_type key) const { + return _eps_with_pending_hints.count(key); + } + + size_t ep_managers_size() const { + return _ep_managers.size(); + } + + const boost::filesystem::path& hints_dir() const { + return _hints_dir; + } + + void allow_hints(); + void forbid_hints(); + void forbid_hints_for_eps_with_pending_hints(); + + static future<> rebalance() { // TODO return make_ready_future<>(); @@ -525,10 +510,6 @@ private: return _store_factory; } - const boost::filesystem::path& hints_dir() const { - return _hints_dir; - } - service::storage_proxy& local_storage_proxy() const noexcept { return *_proxy_anchor; } @@ -555,7 +536,7 @@ private: /// \param endpoint node that left the cluster void drain_for(gms::inet_address endpoint); -private: +public: ep_managers_map_type::iterator find_ep_manager(ep_key_type ep_key) noexcept { return _ep_managers.find(ep_key); } diff --git a/db/hints/resource_manager.cc b/db/hints/resource_manager.cc index cb698358e8..eda9523dc4 100644 --- a/db/hints/resource_manager.cc +++ b/db/hints/resource_manager.cc @@ -20,6 +20,13 @@ */ #include "resource_manager.hh" +#include "manager.hh" +#include "log.hh" +#include +#include +#include "lister.hh" +#include "disk-error-handler.hh" +#include "seastarx.hh" namespace db { namespace hints { @@ -35,5 +42,138 @@ future> resource_manager::g return get_units(_send_limiter, hint_memory_budget); } +const std::chrono::seconds space_watchdog::_watchdog_period = std::chrono::seconds(1); + +space_watchdog::space_watchdog(shard_managers_set& managers) + : _shard_managers(managers) + , _timer([this] { on_timer(); }) +{} + +void space_watchdog::start() { + _timer.arm(timer_clock_type::now()); +} + +future<> space_watchdog::stop() noexcept { + try { + return _gate.close().finally([this] { _timer.cancel(); }); + } catch (...) { + return make_exception_future<>(std::current_exception()); + } +} + +future<> space_watchdog::scan_one_ep_dir(boost::filesystem::path path, manager& shard_manager, ep_key_type ep_key) { + return lister::scan_dir(path, { directory_entry_type::regular }, [this, ep_key, &shard_manager] (lister::path dir, directory_entry de) { + // Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory + if (_files_count == 1) { + shard_manager.add_ep_with_pending_hints(ep_key); + } + ++_files_count; + + return io_check(file_size, (dir / de.name.c_str()).c_str()).then([this] (uint64_t fsize) { + _total_size += fsize; + }); + }); +} + +size_t space_watchdog::end_point_managers_count() const { + return boost::accumulate(_shard_managers, 0, [] (size_t sum, manager& shard_manager) { + return sum + shard_manager.ep_managers_size(); + }); +} + +void space_watchdog::on_timer() { + with_gate(_gate, [this] { + return futurize_apply([this] { + _total_size = 0; + + return do_for_each(_shard_managers, [this] (manager& shard_manager) { + shard_manager.clear_eps_with_pending_hints(); + + // The hints directories are organized as follows: + // + // |- + // | |- + // | |- + // | |- + // | |- ... + // | |- + // | |- ... + // | |-... + // |- + // | |- ... + // ... + // |- + // | |- ... + // + return lister::scan_dir(shard_manager.hints_dir(), {directory_entry_type::directory}, [this, &shard_manager] (lister::path dir, directory_entry de) { + _files_count = 0; + // Let's scan per-end-point directories and enumerate hints files... + // + // Let's check if there is a corresponding end point manager (may not exist if the corresponding DC is + // not hintable). + // If exists - let's take a file update lock so that files are not changed under our feet. Otherwise, simply + // continue to enumeration - there is no one to change them. + auto it = shard_manager.find_ep_manager(de.name); + if (it != shard_manager.ep_managers_end()) { + return with_lock(it->second.file_update_mutex(), [this, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)]() mutable { + return scan_one_ep_dir(dir / ep_name.c_str(), shard_manager, ep_key_type(ep_name)); + }); + } else { + return scan_one_ep_dir(dir / de.name.c_str(), shard_manager, ep_key_type(de.name)); + } + }); + }).then([this] { + // Adjust the quota to take into account the space we guarantee to every end point manager + size_t adjusted_quota = 0; + size_t delta = end_point_managers_count() * resource_manager::hint_segment_size_in_mb * 1024 * 1024; + if (resource_manager::max_shard_disk_space_size > delta) { + adjusted_quota = resource_manager::max_shard_disk_space_size - delta; + } + + bool can_hint = _total_size < adjusted_quota; + resource_manager_logger.trace("space_watchdog: total_size ({}) {} max_shard_disk_space_size ({})", _total_size, can_hint ? "<" : ">=", adjusted_quota); + + if (!can_hint) { + for (manager& shard_manager : _shard_managers) { + shard_manager.forbid_hints_for_eps_with_pending_hints(); + } + } else { + for (manager& shard_manager : _shard_managers) { + shard_manager.allow_hints(); + } + } + }); + }).handle_exception([this] (auto eptr) { + resource_manager_logger.trace("space_watchdog: unexpected exception - stop all hints generators"); + // Stop all hint generators if space_watchdog callback failed + for (manager& shard_manager : _shard_managers) { + shard_manager.forbid_hints(); + } + }).finally([this] { + _timer.arm(_watchdog_period); + }); + }); +} + +future<> resource_manager::start(shared_ptr proxy_ptr, shared_ptr gossiper_ptr, shared_ptr ss_ptr) { + return parallel_for_each(_shard_managers, [proxy_ptr, gossiper_ptr, ss_ptr](manager& m) { + return m.start(proxy_ptr, gossiper_ptr, ss_ptr); + }).then([this]() { + return _space_watchdog.start(); + }); +} + +future<> resource_manager::stop() noexcept { + return parallel_for_each(_shard_managers, [](manager& m) { + return m.stop(); + }).finally([this]() { + return _space_watchdog.stop(); + }); +} + +void resource_manager::register_manager(manager& m) { + _shard_managers.insert(m); +} + } } diff --git a/db/hints/resource_manager.hh b/db/hints/resource_manager.hh index 795057d401..9664070787 100644 --- a/db/hints/resource_manager.hh +++ b/db/hints/resource_manager.hh @@ -29,17 +29,87 @@ #include "seastarx.hh" #include #include -#include +#include + + +namespace service { +class storage_proxy; +class storage_service; +} namespace db { namespace hints { +using timer_clock_type = seastar::lowres_clock; + +class manager; + +class space_watchdog { +private: + using ep_key_type = gms::inet_address; + static const std::chrono::seconds _watchdog_period; + + struct manager_hash { + size_t operator()(const manager& manager) const { + return reinterpret_cast(&manager); + } + }; + struct manager_comp { + bool operator()(const std::reference_wrapper& m1, const std::reference_wrapper& m2) const { + return std::addressof(m1.get()) == std::addressof(m2.get()); + } + }; + +public: + using shard_managers_set = std::unordered_set, manager_hash, manager_comp>; + +private: + size_t _total_size = 0; + shard_managers_set& _shard_managers; + seastar::gate _gate; + seastar::timer _timer; + int _files_count = 0; + +public: + space_watchdog(shard_managers_set& managers); + void start(); + future<> stop() noexcept; + size_t end_point_managers_count() const; + +private: + /// \brief Check that hints don't occupy too much disk space. + /// + /// Verifies that all \ref manager::_hints_dir dirs for all managers occupy less than \ref resource_limits::max_shard_disk_space_size. + /// + /// If they do, stop all end point managers that have more than one hints file - we don't want some DOWN Node to + /// prevent hints to other Nodes from being generated (e.g. due to some temporary overload and timeout). + /// + /// This is a simplistic implementation of a manager for a limited shared resource with a minimum guaranteed share for all + /// participants. + /// + /// This implementation guarantees at least a single hint share for all end point managers. + void on_timer(); + + /// \brief Scan files in a single end point directory. + /// + /// Add sizes of files in the directory to _total_size. If number of files is greater than 1 add this end point ID + /// to _eps_with_pending_hints so that we may block it if _total_size value becomes greater than the maximum allowed + /// value. + /// + /// \param path directory to scan + /// \param ep_name end point ID (as a string) + /// \return future that resolves when scanning is complete + future<> scan_one_ep_dir(boost::filesystem::path path, manager& shard_manager, ep_key_type ep_key); +}; + class resource_manager { const size_t _max_send_in_flight_memory; const size_t _min_send_hint_budget; seastar::semaphore _send_limiter; uint64_t _size_of_hints_in_progress = 0; + space_watchdog::shard_managers_set _shard_managers; + space_watchdog _space_watchdog; public: static constexpr uint64_t max_size_of_hints_in_progress = 10 * 1024 * 1024; // 10MB @@ -53,6 +123,7 @@ public: : _max_send_in_flight_memory(std::max(memory::stats().total_memory() / 10, max_hints_send_queue_length)) , _min_send_hint_budget(_max_send_in_flight_memory / max_hints_send_queue_length) , _send_limiter(_max_send_in_flight_memory) + , _space_watchdog(_shard_managers) {} future> get_send_units_for(size_t buf_size); @@ -72,6 +143,10 @@ public: inline void dec_size_of_hints_in_progress(int64_t delta) { _size_of_hints_in_progress -= delta; } + + future<> start(shared_ptr proxy_ptr, shared_ptr gossiper_ptr, shared_ptr ss_ptr); + future<> stop() noexcept; + void register_manager(manager& m); }; } From a791dce0ae33c10b4c59f34c76bf873eacce2fd0 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Fri, 25 May 2018 17:10:51 +0200 Subject: [PATCH 4/7] db, config: add view_pending_updates directory Hints for materialized view updates need to be kept somewhere, because their dedicated hints manager has to have a root directory. view_pending_updates directory resides in /data and is used for that purpose. --- main.cc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/main.cc b/main.cc index 518ee53e44..df7107ff0d 100644 --- a/main.cc +++ b/main.cc @@ -527,6 +527,15 @@ int main(int ac, char** av) { dirs.touch_and_lock(shard_dir).get(); directories.insert(std::move(shard_dir)); } + boost::filesystem::path view_pending_updates_base_dir = boost::filesystem::path(db.local().get_config().data_file_directories()[0]) / "view_pending_updates"; + sstring view_pending_updates_base_dir_str = view_pending_updates_base_dir.native(); + dirs.touch_and_lock(view_pending_updates_base_dir_str).get(); + directories.insert(view_pending_updates_base_dir_str); + for (unsigned i = 0; i < smp::count; ++i) { + sstring shard_dir((view_pending_updates_base_dir / seastar::to_sstring(i).c_str()).native()); + dirs.touch_and_lock(shard_dir).get(); + directories.insert(std::move(shard_dir)); + } supervisor::notify("verifying directories"); parallel_for_each(directories, [&db] (sstring pathname) { From 204bc17bd7274cc0e0623645eb98a4ed0ba7c384 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 28 May 2018 10:36:57 +0200 Subject: [PATCH 5/7] hints: decouple hints manager metrics from constructor Now that more than one instance of hints manager can be present at the same time, registering metrics is moved out of the constructor to prevent 'registering metrics twice' errors. --- db/hints/manager.cc | 14 ++++++++------ db/hints/manager.hh | 1 + db/hints/resource_manager.hh | 2 +- service/storage_proxy.cc | 2 ++ 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 228bace506..45722c42aa 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -52,10 +52,16 @@ manager::manager(sstring hints_directory, std::vector hinted_dcs, int64 , _max_hint_window_us(max_hint_window_ms * 1000) , _local_db(db.local()) , _resource_manager(res_manager) -{ +{} + +manager::~manager() { + assert(_ep_managers.empty()); +} + +void manager::register_metrics(const sstring& group_name) { namespace sm = seastar::metrics; - _metrics.add_group("hints_manager", { + _metrics.add_group(group_name, { sm::make_gauge("size_of_hints_in_progress", _stats.size_of_hints_in_progress, sm::description("Size of hinted mutations that are scheduled to be written.")), @@ -73,10 +79,6 @@ manager::manager(sstring hints_directory, std::vector hinted_dcs, int64 }); } -manager::~manager() { - assert(_ep_managers.empty()); -} - future<> manager::start(shared_ptr proxy_ptr, shared_ptr gossiper_ptr, shared_ptr ss_ptr) { _proxy_anchor = std::move(proxy_ptr); _gossiper_anchor = std::move(gossiper_ptr); diff --git a/db/hints/manager.hh b/db/hints/manager.hh index f9d861ff1d..e9f9d88463 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -420,6 +420,7 @@ private: public: manager(sstring hints_directory, std::vector hinted_dcs, int64_t max_hint_window_ms, resource_manager&res_manager, distributed& db); virtual ~manager(); + void register_metrics(const sstring& group_name); future<> start(shared_ptr proxy_ptr, shared_ptr gossiper_ptr, shared_ptr ss_ptr); future<> stop(); bool store_hint(gms::inet_address ep, schema_ptr s, lw_shared_ptr fm, tracing::trace_state_ptr tr_state) noexcept; diff --git a/db/hints/resource_manager.hh b/db/hints/resource_manager.hh index 9664070787..9914b2efd2 100644 --- a/db/hints/resource_manager.hh +++ b/db/hints/resource_manager.hh @@ -79,7 +79,7 @@ public: private: /// \brief Check that hints don't occupy too much disk space. /// - /// Verifies that all \ref manager::_hints_dir dirs for all managers occupy less than \ref resource_limits::max_shard_disk_space_size. + /// Verifies that all \ref manager::_hints_dir dirs for all managers occupy less than \ref resource_manager::max_shard_disk_space_size. /// /// If they do, stop all end point managers that have more than one hints file - we don't want some DOWN Node to /// prevent hints to other Nodes from being generated (e.g. due to some temporary overload and timeout). diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 5d162ab35c..e4b0059ebf 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -735,6 +735,8 @@ storage_proxy::storage_proxy(distributed& db, stdx::optionalregister_metrics("hints_manager"); } storage_proxy::rh_entry::rh_entry(shared_ptr&& h, std::function&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {} From a6aae369da1e69a5ba9fa5ebebcbf5b53d855a37 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Fri, 25 May 2018 17:26:09 +0200 Subject: [PATCH 6/7] storage_proxy: add hints manager for views This commit adds a separate hints manager that serves only failed materialized view updates. --- service/storage_proxy.cc | 26 +++++++++++++++----------- service/storage_proxy.hh | 2 ++ 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index e4b0059ebf..5ed5a7744c 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -633,7 +633,9 @@ void storage_proxy_stats::split_stats::register_metrics_for(gms::inet_address ep } storage_proxy::~storage_proxy() {} -storage_proxy::storage_proxy(distributed& db, stdx::optional> hinted_handoff_enabled) : _db(db) { +storage_proxy::storage_proxy(distributed& db, stdx::optional> hinted_handoff_enabled) + : _db(db) + , _hints_for_views_manager(_db.local().get_config().data_file_directories()[0] + "/view_pending_updates", {}, _db.local().get_config().max_hint_window_in_ms(), _hints_resource_manager, _db) { namespace sm = seastar::metrics; _metrics.add_group(COORDINATOR_STATS_CATEGORY, { sm::make_histogram("read_latency", sm::description("The general read latency histogram"), [this]{ return _stats.estimated_read.get_histogram(16, 20);}), @@ -737,6 +739,9 @@ storage_proxy::storage_proxy(distributed& db, stdx::optionalregister_metrics("hints_manager"); + _hints_for_views_manager.register_metrics("hints_for_views_manager"); + _hints_resource_manager.register_manager(*_hints_manager); + _hints_resource_manager.register_manager(_hints_for_views_manager); } storage_proxy::rh_entry::rh_entry(shared_ptr&& h, std::function&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {} @@ -1791,8 +1796,9 @@ template size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr& mh, const Range& targets, db::write_type type, tracing::trace_state_ptr tr_state) noexcept { if (hints_enabled(type)) { - return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state)] (gms::inet_address target) mutable -> bool { - return _hints_manager->store_hint(target, mh->schema(), mh->get_mutation_for(target), tr_state); + db::hints::manager& hints_manager = hints_manager_for(type); + return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool { + return hints_manager.store_hint(target, mh->schema(), mh->get_mutation_for(target), tr_state); }); } else { return 0; @@ -3644,6 +3650,10 @@ bool storage_proxy::hints_enabled(db::write_type type) noexcept { return _hints_enabled_for_user_writes || (type == db::write_type::VIEW && bool(_hints_manager)); } +db::hints::manager& storage_proxy::hints_manager_for(db::write_type type) { + return type == db::write_type::VIEW ? _hints_for_views_manager : *_hints_manager; +} + future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname) { slogger.debug("Starting a blocking truncate operation on keyspace {}, CF {}", keyspace, cfname); @@ -4433,17 +4443,11 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, } future<> storage_proxy::start_hints_manager(shared_ptr gossiper_ptr, shared_ptr ss_ptr) { - if (_hints_manager) { - return _hints_manager->start(shared_from_this(), std::move(gossiper_ptr), std::move(ss_ptr)); - } - return make_ready_future<>(); + return _hints_resource_manager.start(shared_from_this(), gossiper_ptr, ss_ptr); } future<> storage_proxy::stop_hints_manager() { - if (_hints_manager) { - return _hints_manager->stop(); - } - return make_ready_future<>(); + return _hints_resource_manager.stop(); } future<> diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index a73aba75c0..2fa2e2fa93 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -151,6 +151,7 @@ private: circular_buffer _throttled_writes; db::hints::resource_manager _hints_resource_manager; stdx::optional _hints_manager; + db::hints::manager _hints_for_views_manager; bool _hints_enabled_for_user_writes = false; stats _stats; static constexpr float CONCURRENT_SUBREQUESTS_MARGIN = 0.10; @@ -181,6 +182,7 @@ private: template bool cannot_hint(const Range& targets, db::write_type type); bool hints_enabled(db::write_type type) noexcept; + db::hints::manager& hints_manager_for(db::write_type type); std::vector get_live_endpoints(keyspace& ks, const dht::token& token); std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token); db::read_repair_decision new_read_repair_decision(const schema& s); From f12fdcffdb5e610d64a8201b275d37daa63996f0 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 28 May 2018 11:32:09 +0200 Subject: [PATCH 7/7] storage_proxy: restore optional hinted handoff Since hinted handoff for materialized views is now a separate entity, regular hinted handoff can go back to being optional. --- main.cc | 17 +++++++++-------- service/storage_proxy.cc | 18 ++++++++---------- service/storage_proxy.hh | 1 - 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/main.cc b/main.cc index df7107ff0d..bebd3ef03b 100644 --- a/main.cc +++ b/main.cc @@ -518,14 +518,15 @@ int main(int ac, char** av) { directories.insert(db.local().get_config().commitlog_directory()); supervisor::notify("creating hints directories"); - - boost::filesystem::path hints_base_dir(db.local().get_config().hints_directory()); - dirs.touch_and_lock(db.local().get_config().hints_directory()).get(); - directories.insert(db.local().get_config().hints_directory()); - for (unsigned i = 0; i < smp::count; ++i) { - sstring shard_dir((hints_base_dir / seastar::to_sstring(i).c_str()).native()); - dirs.touch_and_lock(shard_dir).get(); - directories.insert(std::move(shard_dir)); + if (hinted_handoff_enabled) { + boost::filesystem::path hints_base_dir(db.local().get_config().hints_directory()); + dirs.touch_and_lock(db.local().get_config().hints_directory()).get(); + directories.insert(db.local().get_config().hints_directory()); + for (unsigned i = 0; i < smp::count; ++i) { + sstring shard_dir((hints_base_dir / seastar::to_sstring(i).c_str()).native()); + dirs.touch_and_lock(shard_dir).get(); + directories.insert(std::move(shard_dir)); + } } boost::filesystem::path view_pending_updates_base_dir = boost::filesystem::path(db.local().get_config().data_file_directories()[0]) / "view_pending_updates"; sstring view_pending_updates_base_dir_str = view_pending_updates_base_dir.native(); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 5ed5a7744c..03c0755c11 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -726,21 +726,19 @@ storage_proxy::storage_proxy(distributed& db, stdx::optionalregister_metrics("hints_manager"); + _hints_resource_manager.register_manager(*_hints_manager); + } - _hints_manager->register_metrics("hints_manager"); _hints_for_views_manager.register_metrics("hints_for_views_manager"); - _hints_resource_manager.register_manager(*_hints_manager); _hints_resource_manager.register_manager(_hints_for_views_manager); } @@ -3647,7 +3645,7 @@ get_restricted_ranges(locator::token_metadata& tm, const schema& s, dht::partiti } bool storage_proxy::hints_enabled(db::write_type type) noexcept { - return _hints_enabled_for_user_writes || (type == db::write_type::VIEW && bool(_hints_manager)); + return bool(_hints_manager) || type == db::write_type::VIEW; } db::hints::manager& storage_proxy::hints_manager_for(db::write_type type) { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 2fa2e2fa93..313c98138c 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -152,7 +152,6 @@ private: db::hints::resource_manager _hints_resource_manager; stdx::optional _hints_manager; db::hints::manager _hints_for_views_manager; - bool _hints_enabled_for_user_writes = false; stats _stats; static constexpr float CONCURRENT_SUBREQUESTS_MARGIN = 0.10; // for read repair chance calculation