From 37fc0e684045a5dc0cdbcbcf0ffcc68c604f24bd Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 6 Feb 2017 11:52:37 +0200 Subject: [PATCH] storage_proxy: use storage_proxy clock instead of explicit lowres_clock Merge commit 45b607083290e7899 used butchered version of storage_proxy patch to adjust to rpc timer change instead the one I've sent. This patch fixes the differences. Message-Id: <20170206095237.GA7691@scylladb.com> (cherry picked from commit 3c372525edaaec061cfdf7b3a68be20e7eb745cf) --- service/storage_proxy.cc | 30 +++++++++++++++--------------- service/storage_proxy.hh | 4 ++-- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index e720c82c3b..23770132f6 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1733,14 +1733,14 @@ protected: size_t _targets_count; promise<> _done_promise; // all target responded bool _timedout = false; // will be true if request timeouts - timer _timeout; + timer _timeout; size_t _responses = 0; schema_ptr _schema; virtual void on_timeout() {} virtual size_t response_count() const = 0; public: - abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, lowres_clock::time_point timeout) + abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, storage_proxy::clock_type::time_point timeout) : _cl(cl) , _targets_count(target_count) , _schema(std::move(schema)) @@ -1796,7 +1796,7 @@ class digest_read_resolver : public abstract_read_resolver { return _digest_results.size(); } public: - digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {} + digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {} void add_data(gms::inet_address from, foreign_ptr> result) { if (!_timedout) { // if only one target was queried digest_check() will be skipped so we can also skip digest calculation @@ -2143,7 +2143,7 @@ private: return false; } public: - data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) { + data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) { _data_results.reserve(targets_count); } void add_mutate_data(gms::inet_address from, foreign_ptr> result) { @@ -2330,7 +2330,7 @@ protected: using targets_iterator = std::vector::iterator; using digest_resolver_ptr = ::shared_ptr; using data_resolver_ptr = ::shared_ptr; - using clock_type = lowres_clock; + using clock_type = storage_proxy::clock_type; schema_ptr _schema; shared_ptr _proxy; @@ -2454,7 +2454,7 @@ protected: uint32_t original_partition_limit() const { return _cmd->partition_limit; } - void reconcile(db::consistency_level cl, lowres_clock::time_point timeout, lw_shared_ptr cmd) { + void reconcile(db::consistency_level cl, storage_proxy::clock_type::time_point timeout, lw_shared_ptr cmd) { data_resolver_ptr data_resolver = ::make_shared(_schema, cl, _targets.size(), timeout); auto exec = shared_from_this(); @@ -2529,12 +2529,12 @@ protected: } }); } - void reconcile(db::consistency_level cl, lowres_clock::time_point timeout) { + void reconcile(db::consistency_level cl, storage_proxy::clock_type::time_point timeout) { reconcile(cl, timeout, _cmd); } public: - virtual future>> execute(lowres_clock::time_point timeout) { + virtual future>> execute(storage_proxy::clock_type::time_point timeout) { digest_resolver_ptr digest_resolver = ::make_shared(_schema, _cl, _block_for, timeout); auto exec = shared_from_this(); @@ -2604,7 +2604,7 @@ public: class always_speculating_read_executor : public abstract_read_executor { public: using abstract_read_executor::abstract_read_executor; - virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) { + virtual future<> make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) { resolver->add_wait_targets(_targets.size()); // FIXME: consider disabling for CL=*ONE bool want_digest = true; @@ -2615,10 +2615,10 @@ public: // this executor sends request to an additional replica after some time below timeout class speculating_read_executor : public abstract_read_executor { - timer<> _speculate_timer; + timer _speculate_timer; public: using abstract_read_executor::abstract_read_executor; - virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) { + virtual future<> make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) { _speculate_timer.set_callback([this, resolver, timeout] { if (!resolver->is_completed()) { // at the time the callback runs request may be completed already resolver->add_wait_targets(1); // we send one more request so wait for it too @@ -2664,7 +2664,7 @@ class range_slice_read_executor : public abstract_read_executor { public: range_slice_read_executor(schema_ptr s, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, std::vector targets, tracing::trace_state_ptr trace_state) : abstract_read_executor(std::move(s), std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets), std::move(trace_state)) {} - virtual future>> execute(lowres_clock::time_point timeout) override { + virtual future>> execute(storage_proxy::clock_type::time_point timeout) override { reconcile(_cl, timeout); return _result_promise.get_future(); } @@ -2795,7 +2795,7 @@ future>> storage_proxy::query_singular(lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state) { std::vector<::shared_ptr> exec; exec.reserve(partition_ranges.size()); - auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); + auto timeout = storage_proxy::clock_type::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); for (auto&& pr: partition_ranges) { if (!pr.is_singular()) { @@ -2819,7 +2819,7 @@ storage_proxy::query_singular(lw_shared_ptr cmd, dht::parti } future>>> -storage_proxy::query_partition_key_range_concurrent(lowres_clock::time_point timeout, std::vector>>&& results, +storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::time_point timeout, std::vector>>&& results, lw_shared_ptr cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i, dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state, uint32_t remaining_row_count, uint32_t remaining_partition_count) { @@ -2923,7 +2923,7 @@ storage_proxy::query_partition_key_range(lw_shared_ptr cmd, schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); dht::partition_range_vector ranges; - auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); + auto timeout = storage_proxy::clock_type::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be // expensive in clusters with vnodes) diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 2eaad88cec..a9e6400466 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -71,7 +71,7 @@ public: private: struct rh_entry { ::shared_ptr handler; - timer expire_timer; + timer expire_timer; rh_entry(::shared_ptr&& h, std::function&& cb); }; @@ -253,7 +253,7 @@ private: dht::partition_range_vector get_restricted_ranges(keyspace& ks, const schema& s, dht::partition_range range); float estimate_result_rows_per_range(lw_shared_ptr cmd, keyspace& ks); static std::vector intersection(const std::vector& l1, const std::vector& l2); - future>>> query_partition_key_range_concurrent(lowres_clock::time_point timeout, + future>>> query_partition_key_range_concurrent(clock_type::time_point timeout, std::vector>>&& results, lw_shared_ptr cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i, dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state, uint32_t remaining_row_count, uint32_t remaining_partition_count);