Merge seastar upstream

* seastar 397685c...c1dbd89 (13):
  > lowres_clock: drop cache-line alignment for _timer
  > net/packet: add missing include
  > Merge "Adding histogram and description support" from Amnon
  > reactor: Fix the error: cannot bind 'std::unique_ptr' lvalue to 'std::unique_ptr&&'
  > Set the option '--server' of tests/tcp_sctp_client to be required
  > core/memory: Remove superfluous assignment
  > core/memory: Remove dead code
  > core/reactor: Use logger instead of cerr
  > fix inverted logic in overprovision parameter
  > rpc: fix timeout checking condition
  > rpc: use lowres_clock instead of high resolution one
  > semaphore: make semaphore's clock configurable
  > rpc: detect timedout outgoing packets earlier

Includes treewide change to accommodate rpc changing its timeout clock
to lowres_clock.

Includes fixup from Amnon:

collectd api should use the metrics getters

As part of the preparation for the change in the metrics layer, this changes
the way the collectd api uses the metrics values: it now uses the getters
instead of calling the member directly.

This will be important when the internal implementation is changed
from union to variant.

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
Message-Id: <1485457657-17634-1-git-send-email-amnon@scylladb.com>
This commit is contained in:
Amnon Heiman
2017-01-26 21:07:37 +02:00
committed by Avi Kivity
parent facb0aa6d9
commit 45b6070832
10 changed files with 30 additions and 30 deletions

View File

@@ -40,13 +40,13 @@ static auto transformer(const std::vector<collectd_value>& values) {
for (auto v: values) {
switch (v._type) {
case scollectd::data_type::GAUGE:
collected_value.values.push(v.u._d);
collected_value.values.push(v.d());
break;
case scollectd::data_type::DERIVE:
collected_value.values.push(v.u._i);
collected_value.values.push(v.i());
break;
default:
collected_value.values.push(v.u._ui);
collected_value.values.push(v.ui());
break;
}
}

View File

@@ -1055,7 +1055,7 @@ public:
class database {
public:
using timeout_clock = std::chrono::steady_clock;
using timeout_clock = lowres_clock;
private:
::cf_stats _cf_stats;
static constexpr size_t max_concurrent_reads() { return 100; }

View File

@@ -183,8 +183,8 @@ public:
using time_point = clock_type::time_point;
using sseg_ptr = lw_shared_ptr<segment>;
using request_controller_type = basic_semaphore<timeout_exception_factory>;
using request_controller_units = semaphore_units<timeout_exception_factory>;
using request_controller_type = basic_semaphore<timeout_exception_factory, commitlog::timeout_clock>;
using request_controller_units = semaphore_units<timeout_exception_factory, commitlog::timeout_clock>;
request_controller_type _request_controller;
stdx::optional<shared_future<with_clock<commitlog::timeout_clock>>> _segment_allocating;
@@ -382,7 +382,7 @@ class db::commitlog::segment: public enable_lw_shared_from_this<segment> {
time_point _sync_time;
seastar::gate _gate;
uint64_t _write_waiters = 0;
utils::flush_queue<replay_position> _pending_ops;
utils::flush_queue<replay_position, std::less<replay_position>, clock_type> _pending_ops;
uint64_t _num_allocs = 0;

View File

@@ -94,7 +94,7 @@ using cf_id_type = utils::UUID;
*/
class commitlog {
public:
using timeout_clock = std::chrono::steady_clock;
using timeout_clock = lowres_clock;
class segment_manager;
class segment;

View File

@@ -200,7 +200,7 @@ private:
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::LAST)] = {};
bool _stopping = false;
public:
using clock_type = std::chrono::steady_clock;
using clock_type = lowres_clock;
public:
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"),
uint16_t port = 7000, bool listen_now = true);

Submodule seastar updated: 397685c5f0...c1dbd89896

View File

@@ -1622,14 +1622,14 @@ protected:
size_t _targets_count;
promise<> _done_promise; // all target responded
bool _timedout = false; // will be true if request timeouts
timer<> _timeout;
timer<lowres_clock> _timeout;
size_t _responses = 0;
schema_ptr _schema;
virtual void on_timeout() {}
virtual size_t response_count() const = 0;
public:
abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, std::chrono::steady_clock::time_point timeout)
abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, lowres_clock::time_point timeout)
: _cl(cl)
, _targets_count(target_count)
, _schema(std::move(schema))
@@ -1685,7 +1685,7 @@ class digest_read_resolver : public abstract_read_resolver {
return _digest_results.size();
}
public:
digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {}
digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {}
void add_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<query::result>> result) {
if (!_timedout) {
// if only one target was queried digest_check() will be skipped so we can also skip digest calculation
@@ -2032,7 +2032,7 @@ private:
return false;
}
public:
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) {
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) {
_data_results.reserve(targets_count);
}
void add_mutate_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<reconcilable_result>> result) {
@@ -2219,7 +2219,7 @@ protected:
using targets_iterator = std::vector<gms::inet_address>::iterator;
using digest_resolver_ptr = ::shared_ptr<digest_read_resolver>;
using data_resolver_ptr = ::shared_ptr<data_read_resolver>;
using clock_type = std::chrono::steady_clock;
using clock_type = lowres_clock;
schema_ptr _schema;
shared_ptr<storage_proxy> _proxy;
@@ -2343,7 +2343,7 @@ protected:
uint32_t original_partition_limit() const {
return _cmd->partition_limit;
}
void reconcile(db::consistency_level cl, std::chrono::steady_clock::time_point timeout, lw_shared_ptr<query::read_command> cmd) {
void reconcile(db::consistency_level cl, lowres_clock::time_point timeout, lw_shared_ptr<query::read_command> cmd) {
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
auto exec = shared_from_this();
@@ -2418,12 +2418,12 @@ protected:
}
});
}
void reconcile(db::consistency_level cl, std::chrono::steady_clock::time_point timeout) {
void reconcile(db::consistency_level cl, lowres_clock::time_point timeout) {
reconcile(cl, timeout, _cmd);
}
public:
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(std::chrono::steady_clock::time_point timeout) {
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(lowres_clock::time_point timeout) {
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_schema, _cl, _block_for, timeout);
auto exec = shared_from_this();
@@ -2493,7 +2493,7 @@ public:
class always_speculating_read_executor : public abstract_read_executor {
public:
using abstract_read_executor::abstract_read_executor;
virtual future<> make_requests(digest_resolver_ptr resolver, std::chrono::steady_clock::time_point timeout) {
virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) {
resolver->add_wait_targets(_targets.size());
// FIXME: consider disabling for CL=*ONE
bool want_digest = true;
@@ -2507,7 +2507,7 @@ class speculating_read_executor : public abstract_read_executor {
timer<> _speculate_timer;
public:
using abstract_read_executor::abstract_read_executor;
virtual future<> make_requests(digest_resolver_ptr resolver, std::chrono::steady_clock::time_point timeout) {
virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) {
_speculate_timer.set_callback([this, resolver, timeout] {
if (!resolver->is_completed()) { // at the time the callback runs request may be completed already
resolver->add_wait_targets(1); // we send one more request so wait for it too
@@ -2553,7 +2553,7 @@ class range_slice_read_executor : public abstract_read_executor {
public:
range_slice_read_executor(schema_ptr s, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets, tracing::trace_state_ptr trace_state) :
abstract_read_executor(std::move(s), std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets), std::move(trace_state)) {}
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(std::chrono::steady_clock::time_point timeout) override {
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(lowres_clock::time_point timeout) override {
reconcile(_cl, timeout);
return _result_promise.get_future();
}
@@ -2684,7 +2684,7 @@ future<foreign_ptr<lw_shared_ptr<query::result>>>
storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state) {
std::vector<::shared_ptr<abstract_read_executor>> exec;
exec.reserve(partition_ranges.size());
auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
for (auto&& pr: partition_ranges) {
if (!pr.is_singular()) {
@@ -2708,7 +2708,7 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, dht::parti
}
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
storage_proxy::query_partition_key_range_concurrent(lowres_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i,
dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state,
uint32_t remaining_row_count, uint32_t remaining_partition_count) {
@@ -2812,7 +2812,7 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
dht::partition_range_vector ranges;
auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
// when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
// expensive in clusters with vnodes)

View File

@@ -67,11 +67,11 @@ class mutation_holder;
class storage_proxy : public seastar::async_sharded_service<storage_proxy> /*implements StorageProxyMBean*/ {
public:
using clock_type = std::chrono::steady_clock;
using clock_type = lowres_clock;
private:
struct rh_entry {
::shared_ptr<abstract_write_response_handler> handler;
timer<> expire_timer;
timer<lowres_clock> expire_timer;
rh_entry(::shared_ptr<abstract_write_response_handler>&& h, std::function<void()>&& cb);
};
@@ -252,7 +252,7 @@ private:
dht::partition_range_vector get_restricted_ranges(keyspace& ks, const schema& s, dht::partition_range range);
float estimate_result_rows_per_range(lw_shared_ptr<query::read_command> cmd, keyspace& ks);
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout,
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(lowres_clock::time_point timeout,
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i,
dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state,
uint32_t remaining_row_count, uint32_t remaining_partition_count);

View File

@@ -35,10 +35,10 @@ namespace utils {
* when all func+post-ops for lower valued keys (T) are
* completed.
*/
template<typename T, typename Comp = std::less<T>>
template<typename T, typename Comp = std::less<T>, typename Clock = steady_clock_type>
class flush_queue {
public:
using timeout_clock = steady_clock_type;
using timeout_clock = Clock;
using time_point = typename timeout_clock::time_point;
using promise_type = shared_promise<with_clock<timeout_clock>>;
private:

View File

@@ -122,7 +122,7 @@ public:
// Groups regions for the purpose of statistics. Can be nested.
class region_group {
using timeout_clock = std::chrono::steady_clock;
using timeout_clock = lowres_clock;
static region_group_reclaimer no_reclaimer;