From a6aae369da1e69a5ba9fa5ebebcbf5b53d855a37 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Fri, 25 May 2018 17:26:09 +0200 Subject: [PATCH] storage_proxy: add hints manager for views This commit adds a separate hints manager that serves only failed materialized view updates. --- service/storage_proxy.cc | 26 +++++++++++++++----------- service/storage_proxy.hh | 2 ++ 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index e4b0059ebf..5ed5a7744c 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -633,7 +633,9 @@ void storage_proxy_stats::split_stats::register_metrics_for(gms::inet_address ep } storage_proxy::~storage_proxy() {} -storage_proxy::storage_proxy(distributed& db, stdx::optional> hinted_handoff_enabled) : _db(db) { +storage_proxy::storage_proxy(distributed& db, stdx::optional> hinted_handoff_enabled) + : _db(db) + , _hints_for_views_manager(_db.local().get_config().data_file_directories()[0] + "/view_pending_updates", {}, _db.local().get_config().max_hint_window_in_ms(), _hints_resource_manager, _db) { namespace sm = seastar::metrics; _metrics.add_group(COORDINATOR_STATS_CATEGORY, { sm::make_histogram("read_latency", sm::description("The general read latency histogram"), [this]{ return _stats.estimated_read.get_histogram(16, 20);}), @@ -737,6 +739,9 @@ storage_proxy::storage_proxy(distributed& db, stdx::optionalregister_metrics("hints_manager"); + _hints_for_views_manager.register_metrics("hints_for_views_manager"); + _hints_resource_manager.register_manager(*_hints_manager); + _hints_resource_manager.register_manager(_hints_for_views_manager); } storage_proxy::rh_entry::rh_entry(shared_ptr&& h, std::function&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {} @@ -1791,8 +1796,9 @@ template size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr& mh, const Range& targets, db::write_type type, tracing::trace_state_ptr tr_state) noexcept { if (hints_enabled(type)) { - return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state)] (gms::inet_address target) mutable -> bool { - return _hints_manager->store_hint(target, mh->schema(), mh->get_mutation_for(target), tr_state); + db::hints::manager& hints_manager = hints_manager_for(type); + return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool { + return hints_manager.store_hint(target, mh->schema(), mh->get_mutation_for(target), tr_state); }); } else { return 0; @@ -3644,6 +3650,10 @@ bool storage_proxy::hints_enabled(db::write_type type) noexcept { return _hints_enabled_for_user_writes || (type == db::write_type::VIEW && bool(_hints_manager)); } +db::hints::manager& storage_proxy::hints_manager_for(db::write_type type) { + return type == db::write_type::VIEW ? _hints_for_views_manager : *_hints_manager; +} + future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname) { slogger.debug("Starting a blocking truncate operation on keyspace {}, CF {}", keyspace, cfname); @@ -4433,17 +4443,11 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, } future<> storage_proxy::start_hints_manager(shared_ptr gossiper_ptr, shared_ptr ss_ptr) { - if (_hints_manager) { - return _hints_manager->start(shared_from_this(), std::move(gossiper_ptr), std::move(ss_ptr)); - } - return make_ready_future<>(); + return _hints_resource_manager.start(shared_from_this(), gossiper_ptr, ss_ptr); } future<> storage_proxy::stop_hints_manager() { - if (_hints_manager) { - return _hints_manager->stop(); - } - return make_ready_future<>(); + return _hints_resource_manager.stop(); } future<> diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index a73aba75c0..2fa2e2fa93 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -151,6 +151,7 @@ private: circular_buffer _throttled_writes; db::hints::resource_manager _hints_resource_manager; stdx::optional _hints_manager; + db::hints::manager _hints_for_views_manager; bool _hints_enabled_for_user_writes = false; stats _stats; static constexpr float CONCURRENT_SUBREQUESTS_MARGIN = 0.10; @@ -181,6 +182,7 @@ private: template bool cannot_hint(const Range& targets, db::write_type type); bool hints_enabled(db::write_type type) noexcept; + db::hints::manager& hints_manager_for(db::write_type type); std::vector get_live_endpoints(keyspace& ks, const dht::token& token); std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token); db::read_repair_decision new_read_repair_decision(const schema& s);