From 45b607083290e7899bc3c6b9394463aba2707211 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Thu, 26 Jan 2017 21:07:37 +0200 Subject: [PATCH] Merge seastar upstream * seastar 397685c...c1dbd89 (13): > lowres_clock: drop cache-line alignment for _timer > net/packet: add missing include > Merge "Adding histogram and description support" from Amnon > reactor: Fix the error: cannot bind 'std::unique_ptr' lvalue to 'std::unique_ptr&&' > Set the option '--server' of tests/tcp_sctp_client to be required > core/memory: Remove superfluous assignment > core/memory: Remove dead code > core/reactor: Use logger instead of cerr > fix inverted logic in overprovision parameter > rpc: fix timeout checking condition > rpc: use lowres_clock instead of high resolution one > semaphore: make semaphore's clock configurable > rpc: detect timedout outgoing packets earlier Includes treewide change to accomodate rpc changing its timeout clock to lowres_clock. Includes fixup from Amnon: collectd api should use the metrics getters As part of a preperation of the change in the metrics layer, this change the way the collectd api uses the metrics value to use the getters instead of calling the member directly. This will be important when the internal implementation will changed from union to variant. Signed-off-by: Amnon Heiman Message-Id: <1485457657-17634-1-git-send-email-amnon@scylladb.com> --- api/collectd.cc | 6 +++--- database.hh | 2 +- db/commitlog/commitlog.cc | 6 +++--- db/commitlog/commitlog.hh | 2 +- message/messaging_service.hh | 2 +- seastar | 2 +- service/storage_proxy.cc | 28 ++++++++++++++-------------- service/storage_proxy.hh | 6 +++--- utils/flush_queue.hh | 4 ++-- utils/logalloc.hh | 2 +- 10 files changed, 30 insertions(+), 30 deletions(-) diff --git a/api/collectd.cc b/api/collectd.cc index a7963bde1a..8dce92c8dc 100644 --- a/api/collectd.cc +++ b/api/collectd.cc @@ -40,13 +40,13 @@ static auto transformer(const std::vector& values) { for (auto v: values) { switch (v._type) { case scollectd::data_type::GAUGE: - collected_value.values.push(v.u._d); + collected_value.values.push(v.d()); break; case scollectd::data_type::DERIVE: - collected_value.values.push(v.u._i); + collected_value.values.push(v.i()); break; default: - collected_value.values.push(v.u._ui); + collected_value.values.push(v.ui()); break; } } diff --git a/database.hh b/database.hh index 41359bb4f5..131db4052c 100644 --- a/database.hh +++ b/database.hh @@ -1055,7 +1055,7 @@ public: class database { public: - using timeout_clock = std::chrono::steady_clock; + using timeout_clock = lowres_clock; private: ::cf_stats _cf_stats; static constexpr size_t max_concurrent_reads() { return 100; } diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index ebd9e28696..f0864e2fbf 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -183,8 +183,8 @@ public: using time_point = clock_type::time_point; using sseg_ptr = lw_shared_ptr; - using request_controller_type = basic_semaphore; - using request_controller_units = semaphore_units; + using request_controller_type = basic_semaphore; + using request_controller_units = semaphore_units; request_controller_type _request_controller; stdx::optional>> _segment_allocating; @@ -382,7 +382,7 @@ class db::commitlog::segment: public enable_lw_shared_from_this { time_point _sync_time; seastar::gate _gate; uint64_t _write_waiters = 0; - utils::flush_queue _pending_ops; + utils::flush_queue, clock_type> _pending_ops; uint64_t _num_allocs = 0; diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index d5e7132a42..a58c4e6ba1 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -94,7 +94,7 @@ using cf_id_type = utils::UUID; */ class commitlog { public: - using timeout_clock = std::chrono::steady_clock; + using timeout_clock = lowres_clock; class segment_manager; class segment; diff --git a/message/messaging_service.hh b/message/messaging_service.hh index fbba13e147..4d5f76c453 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -200,7 +200,7 @@ private: uint64_t _dropped_messages[static_cast(messaging_verb::LAST)] = {}; bool _stopping = false; public: - using clock_type = std::chrono::steady_clock; + using clock_type = lowres_clock; public: messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), uint16_t port = 7000, bool listen_now = true); diff --git a/seastar b/seastar index 397685c5f0..c1dbd89896 160000 --- a/seastar +++ b/seastar @@ -1 +1 @@ -Subproject commit 397685c5f0cda0216177f2a287ef4f8ead73adeb +Subproject commit c1dbd89896ec120fd4347e15f8ec46955a582a48 diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index c27599be9e..d916a4d13d 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1622,14 +1622,14 @@ protected: size_t _targets_count; promise<> _done_promise; // all target responded bool _timedout = false; // will be true if request timeouts - timer<> _timeout; + timer _timeout; size_t _responses = 0; schema_ptr _schema; virtual void on_timeout() {} virtual size_t response_count() const = 0; public: - abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, std::chrono::steady_clock::time_point timeout) + abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, lowres_clock::time_point timeout) : _cl(cl) , _targets_count(target_count) , _schema(std::move(schema)) @@ -1685,7 +1685,7 @@ class digest_read_resolver : public abstract_read_resolver { return _digest_results.size(); } public: - digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {} + digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {} void add_data(gms::inet_address from, foreign_ptr> result) { if (!_timedout) { // if only one target was queried digest_check() will be skipped so we can also skip digest calculation @@ -2032,7 +2032,7 @@ private: return false; } public: - data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) { + data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) { _data_results.reserve(targets_count); } void add_mutate_data(gms::inet_address from, foreign_ptr> result) { @@ -2219,7 +2219,7 @@ protected: using targets_iterator = std::vector::iterator; using digest_resolver_ptr = ::shared_ptr; using data_resolver_ptr = ::shared_ptr; - using clock_type = std::chrono::steady_clock; + using clock_type = lowres_clock; schema_ptr _schema; shared_ptr _proxy; @@ -2343,7 +2343,7 @@ protected: uint32_t original_partition_limit() const { return _cmd->partition_limit; } - void reconcile(db::consistency_level cl, std::chrono::steady_clock::time_point timeout, lw_shared_ptr cmd) { + void reconcile(db::consistency_level cl, lowres_clock::time_point timeout, lw_shared_ptr cmd) { data_resolver_ptr data_resolver = ::make_shared(_schema, cl, _targets.size(), timeout); auto exec = shared_from_this(); @@ -2418,12 +2418,12 @@ protected: } }); } - void reconcile(db::consistency_level cl, std::chrono::steady_clock::time_point timeout) { + void reconcile(db::consistency_level cl, lowres_clock::time_point timeout) { reconcile(cl, timeout, _cmd); } public: - virtual future>> execute(std::chrono::steady_clock::time_point timeout) { + virtual future>> execute(lowres_clock::time_point timeout) { digest_resolver_ptr digest_resolver = ::make_shared(_schema, _cl, _block_for, timeout); auto exec = shared_from_this(); @@ -2493,7 +2493,7 @@ public: class always_speculating_read_executor : public abstract_read_executor { public: using abstract_read_executor::abstract_read_executor; - virtual future<> make_requests(digest_resolver_ptr resolver, std::chrono::steady_clock::time_point timeout) { + virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) { resolver->add_wait_targets(_targets.size()); // FIXME: consider disabling for CL=*ONE bool want_digest = true; @@ -2507,7 +2507,7 @@ class speculating_read_executor : public abstract_read_executor { timer<> _speculate_timer; public: using abstract_read_executor::abstract_read_executor; - virtual future<> make_requests(digest_resolver_ptr resolver, std::chrono::steady_clock::time_point timeout) { + virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) { _speculate_timer.set_callback([this, resolver, timeout] { if (!resolver->is_completed()) { // at the time the callback runs request may be completed already resolver->add_wait_targets(1); // we send one more request so wait for it too @@ -2553,7 +2553,7 @@ class range_slice_read_executor : public abstract_read_executor { public: range_slice_read_executor(schema_ptr s, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, std::vector targets, tracing::trace_state_ptr trace_state) : abstract_read_executor(std::move(s), std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets), std::move(trace_state)) {} - virtual future>> execute(std::chrono::steady_clock::time_point timeout) override { + virtual future>> execute(lowres_clock::time_point timeout) override { reconcile(_cl, timeout); return _result_promise.get_future(); } @@ -2684,7 +2684,7 @@ future>> storage_proxy::query_singular(lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state) { std::vector<::shared_ptr> exec; exec.reserve(partition_ranges.size()); - auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); + auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); for (auto&& pr: partition_ranges) { if (!pr.is_singular()) { @@ -2708,7 +2708,7 @@ storage_proxy::query_singular(lw_shared_ptr cmd, dht::parti } future>>> -storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector>>&& results, +storage_proxy::query_partition_key_range_concurrent(lowres_clock::time_point timeout, std::vector>>&& results, lw_shared_ptr cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i, dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state, uint32_t remaining_row_count, uint32_t remaining_partition_count) { @@ -2812,7 +2812,7 @@ storage_proxy::query_partition_key_range(lw_shared_ptr cmd, schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); dht::partition_range_vector ranges; - auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); + auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be // expensive in clusters with vnodes) diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index b4eed4c39a..9bde9742c0 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -67,11 +67,11 @@ class mutation_holder; class storage_proxy : public seastar::async_sharded_service /*implements StorageProxyMBean*/ { public: - using clock_type = std::chrono::steady_clock; + using clock_type = lowres_clock; private: struct rh_entry { ::shared_ptr handler; - timer<> expire_timer; + timer expire_timer; rh_entry(::shared_ptr&& h, std::function&& cb); }; @@ -252,7 +252,7 @@ private: dht::partition_range_vector get_restricted_ranges(keyspace& ks, const schema& s, dht::partition_range range); float estimate_result_rows_per_range(lw_shared_ptr cmd, keyspace& ks); static std::vector intersection(const std::vector& l1, const std::vector& l2); - future>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, + future>>> query_partition_key_range_concurrent(lowres_clock::time_point timeout, std::vector>>&& results, lw_shared_ptr cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i, dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state, uint32_t remaining_row_count, uint32_t remaining_partition_count); diff --git a/utils/flush_queue.hh b/utils/flush_queue.hh index 7902256693..56855cb0b8 100644 --- a/utils/flush_queue.hh +++ b/utils/flush_queue.hh @@ -35,10 +35,10 @@ namespace utils { * when all func+post-ops for lower valued keys (T) are * completed. */ -template> +template, typename Clock = steady_clock_type> class flush_queue { public: - using timeout_clock = steady_clock_type; + using timeout_clock = Clock; using time_point = typename timeout_clock::time_point; using promise_type = shared_promise>; private: diff --git a/utils/logalloc.hh b/utils/logalloc.hh index dfa0fb4e43..b947ee27ac 100644 --- a/utils/logalloc.hh +++ b/utils/logalloc.hh @@ -122,7 +122,7 @@ public: // Groups regions for the purpose of statistics. Can be nested. class region_group { - using timeout_clock = std::chrono::steady_clock; + using timeout_clock = lowres_clock; static region_group_reclaimer no_reclaimer;