diff --git a/api/collectd.cc b/api/collectd.cc index a7963bde1a..8dce92c8dc 100644 --- a/api/collectd.cc +++ b/api/collectd.cc @@ -40,13 +40,13 @@ static auto transformer(const std::vector& values) { for (auto v: values) { switch (v._type) { case scollectd::data_type::GAUGE: - collected_value.values.push(v.u._d); + collected_value.values.push(v.d()); break; case scollectd::data_type::DERIVE: - collected_value.values.push(v.u._i); + collected_value.values.push(v.i()); break; default: - collected_value.values.push(v.u._ui); + collected_value.values.push(v.ui()); break; } } diff --git a/database.hh b/database.hh index 41359bb4f5..131db4052c 100644 --- a/database.hh +++ b/database.hh @@ -1055,7 +1055,7 @@ public: class database { public: - using timeout_clock = std::chrono::steady_clock; + using timeout_clock = lowres_clock; private: ::cf_stats _cf_stats; static constexpr size_t max_concurrent_reads() { return 100; } diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index ebd9e28696..f0864e2fbf 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -183,8 +183,8 @@ public: using time_point = clock_type::time_point; using sseg_ptr = lw_shared_ptr; - using request_controller_type = basic_semaphore; - using request_controller_units = semaphore_units; + using request_controller_type = basic_semaphore; + using request_controller_units = semaphore_units; request_controller_type _request_controller; stdx::optional>> _segment_allocating; @@ -382,7 +382,7 @@ class db::commitlog::segment: public enable_lw_shared_from_this { time_point _sync_time; seastar::gate _gate; uint64_t _write_waiters = 0; - utils::flush_queue _pending_ops; + utils::flush_queue, clock_type> _pending_ops; uint64_t _num_allocs = 0; diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index d5e7132a42..a58c4e6ba1 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -94,7 +94,7 @@ using cf_id_type = utils::UUID; */ class commitlog { public: - using timeout_clock = std::chrono::steady_clock; + using timeout_clock = lowres_clock; class segment_manager; class segment; diff --git a/message/messaging_service.hh b/message/messaging_service.hh index fbba13e147..4d5f76c453 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -200,7 +200,7 @@ private: uint64_t _dropped_messages[static_cast(messaging_verb::LAST)] = {}; bool _stopping = false; public: - using clock_type = std::chrono::steady_clock; + using clock_type = lowres_clock; public: messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), uint16_t port = 7000, bool listen_now = true); diff --git a/seastar b/seastar index 397685c5f0..c1dbd89896 160000 --- a/seastar +++ b/seastar @@ -1 +1 @@ -Subproject commit 397685c5f0cda0216177f2a287ef4f8ead73adeb +Subproject commit c1dbd89896ec120fd4347e15f8ec46955a582a48 diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index c27599be9e..d916a4d13d 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1622,14 +1622,14 @@ protected: size_t _targets_count; promise<> _done_promise; // all target responded bool _timedout = false; // will be true if request timeouts - timer<> _timeout; + timer _timeout; size_t _responses = 0; schema_ptr _schema; virtual void on_timeout() {} virtual size_t response_count() const = 0; public: - abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, std::chrono::steady_clock::time_point timeout) + abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, lowres_clock::time_point timeout) : _cl(cl) , _targets_count(target_count) , _schema(std::move(schema)) @@ -1685,7 +1685,7 @@ class digest_read_resolver : public abstract_read_resolver { return _digest_results.size(); } public: - digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {} + digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {} void add_data(gms::inet_address from, foreign_ptr> result) { if (!_timedout) { // if only one target was queried digest_check() will be skipped so we can also skip digest calculation @@ -2032,7 +2032,7 @@ private: return false; } public: - data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) { + data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) { _data_results.reserve(targets_count); } void add_mutate_data(gms::inet_address from, foreign_ptr> result) { @@ -2219,7 +2219,7 @@ protected: using targets_iterator = std::vector::iterator; using digest_resolver_ptr = ::shared_ptr; using data_resolver_ptr = ::shared_ptr; - using clock_type = std::chrono::steady_clock; + using clock_type = lowres_clock; schema_ptr _schema; shared_ptr _proxy; @@ -2343,7 +2343,7 @@ protected: uint32_t original_partition_limit() const { return _cmd->partition_limit; } - void reconcile(db::consistency_level cl, std::chrono::steady_clock::time_point timeout, lw_shared_ptr cmd) { + void reconcile(db::consistency_level cl, lowres_clock::time_point timeout, lw_shared_ptr cmd) { data_resolver_ptr data_resolver = ::make_shared(_schema, cl, _targets.size(), timeout); auto exec = shared_from_this(); @@ -2418,12 +2418,12 @@ protected: } }); } - void reconcile(db::consistency_level cl, std::chrono::steady_clock::time_point timeout) { + void reconcile(db::consistency_level cl, lowres_clock::time_point timeout) { reconcile(cl, timeout, _cmd); } public: - virtual future>> execute(std::chrono::steady_clock::time_point timeout) { + virtual future>> execute(lowres_clock::time_point timeout) { digest_resolver_ptr digest_resolver = ::make_shared(_schema, _cl, _block_for, timeout); auto exec = shared_from_this(); @@ -2493,7 +2493,7 @@ public: class always_speculating_read_executor : public abstract_read_executor { public: using abstract_read_executor::abstract_read_executor; - virtual future<> make_requests(digest_resolver_ptr resolver, std::chrono::steady_clock::time_point timeout) { + virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) { resolver->add_wait_targets(_targets.size()); // FIXME: consider disabling for CL=*ONE bool want_digest = true; @@ -2507,7 +2507,7 @@ class speculating_read_executor : public abstract_read_executor { timer<> _speculate_timer; public: using abstract_read_executor::abstract_read_executor; - virtual future<> make_requests(digest_resolver_ptr resolver, std::chrono::steady_clock::time_point timeout) { + virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) { _speculate_timer.set_callback([this, resolver, timeout] { if (!resolver->is_completed()) { // at the time the callback runs request may be completed already resolver->add_wait_targets(1); // we send one more request so wait for it too @@ -2553,7 +2553,7 @@ class range_slice_read_executor : public abstract_read_executor { public: range_slice_read_executor(schema_ptr s, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, std::vector targets, tracing::trace_state_ptr trace_state) : abstract_read_executor(std::move(s), std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets), std::move(trace_state)) {} - virtual future>> execute(std::chrono::steady_clock::time_point timeout) override { + virtual future>> execute(lowres_clock::time_point timeout) override { reconcile(_cl, timeout); return _result_promise.get_future(); } @@ -2684,7 +2684,7 @@ future>> storage_proxy::query_singular(lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state) { std::vector<::shared_ptr> exec; exec.reserve(partition_ranges.size()); - auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); + auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); for (auto&& pr: partition_ranges) { if (!pr.is_singular()) { @@ -2708,7 +2708,7 @@ storage_proxy::query_singular(lw_shared_ptr cmd, dht::parti } future>>> -storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector>>&& results, +storage_proxy::query_partition_key_range_concurrent(lowres_clock::time_point timeout, std::vector>>&& results, lw_shared_ptr cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i, dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state, uint32_t remaining_row_count, uint32_t remaining_partition_count) { @@ -2812,7 +2812,7 @@ storage_proxy::query_partition_key_range(lw_shared_ptr cmd, schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); dht::partition_range_vector ranges; - auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); + auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be // expensive in clusters with vnodes) diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index b4eed4c39a..9bde9742c0 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -67,11 +67,11 @@ class mutation_holder; class storage_proxy : public seastar::async_sharded_service /*implements StorageProxyMBean*/ { public: - using clock_type = std::chrono::steady_clock; + using clock_type = lowres_clock; private: struct rh_entry { ::shared_ptr handler; - timer<> expire_timer; + timer expire_timer; rh_entry(::shared_ptr&& h, std::function&& cb); }; @@ -252,7 +252,7 @@ private: dht::partition_range_vector get_restricted_ranges(keyspace& ks, const schema& s, dht::partition_range range); float estimate_result_rows_per_range(lw_shared_ptr cmd, keyspace& ks); static std::vector intersection(const std::vector& l1, const std::vector& l2); - future>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, + future>>> query_partition_key_range_concurrent(lowres_clock::time_point timeout, std::vector>>&& results, lw_shared_ptr cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i, dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state, uint32_t remaining_row_count, uint32_t remaining_partition_count); diff --git a/utils/flush_queue.hh b/utils/flush_queue.hh index 7902256693..56855cb0b8 100644 --- a/utils/flush_queue.hh +++ b/utils/flush_queue.hh @@ -35,10 +35,10 @@ namespace utils { * when all func+post-ops for lower valued keys (T) are * completed. */ -template> +template, typename Clock = steady_clock_type> class flush_queue { public: - using timeout_clock = steady_clock_type; + using timeout_clock = Clock; using time_point = typename timeout_clock::time_point; using promise_type = shared_promise>; private: diff --git a/utils/logalloc.hh b/utils/logalloc.hh index dfa0fb4e43..b947ee27ac 100644 --- a/utils/logalloc.hh +++ b/utils/logalloc.hh @@ -122,7 +122,7 @@ public: // Groups regions for the purpose of statistics. Can be nested. class region_group { - using timeout_clock = std::chrono::steady_clock; + using timeout_clock = lowres_clock; static region_group_reclaimer no_reclaimer;