Merge seastar upstream
* seastar 397685c...c1dbd89 (13): > lowres_clock: drop cache-line alignment for _timer > net/packet: add missing include > Merge "Adding histogram and description support" from Amnon > reactor: Fix the error: cannot bind 'std::unique_ptr' lvalue to 'std::unique_ptr&&' > Set the option '--server' of tests/tcp_sctp_client to be required > core/memory: Remove superfluous assignment > core/memory: Remove dead code > core/reactor: Use logger instead of cerr > fix inverted logic in overprovision parameter > rpc: fix timeout checking condition > rpc: use lowres_clock instead of high resolution one > semaphore: make semaphore's clock configurable > rpc: detect timedout outgoing packets earlier Includes treewide change to accomodate rpc changing its timeout clock to lowres_clock. Includes fixup from Amnon: collectd api should use the metrics getters As part of a preperation of the change in the metrics layer, this change the way the collectd api uses the metrics value to use the getters instead of calling the member directly. This will be important when the internal implementation will changed from union to variant. Signed-off-by: Amnon Heiman <amnon@scylladb.com> Message-Id: <1485457657-17634-1-git-send-email-amnon@scylladb.com>
This commit is contained in:
@@ -40,13 +40,13 @@ static auto transformer(const std::vector<collectd_value>& values) {
|
||||
for (auto v: values) {
|
||||
switch (v._type) {
|
||||
case scollectd::data_type::GAUGE:
|
||||
collected_value.values.push(v.u._d);
|
||||
collected_value.values.push(v.d());
|
||||
break;
|
||||
case scollectd::data_type::DERIVE:
|
||||
collected_value.values.push(v.u._i);
|
||||
collected_value.values.push(v.i());
|
||||
break;
|
||||
default:
|
||||
collected_value.values.push(v.u._ui);
|
||||
collected_value.values.push(v.ui());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1055,7 +1055,7 @@ public:
|
||||
|
||||
class database {
|
||||
public:
|
||||
using timeout_clock = std::chrono::steady_clock;
|
||||
using timeout_clock = lowres_clock;
|
||||
private:
|
||||
::cf_stats _cf_stats;
|
||||
static constexpr size_t max_concurrent_reads() { return 100; }
|
||||
|
||||
@@ -183,8 +183,8 @@ public:
|
||||
using time_point = clock_type::time_point;
|
||||
using sseg_ptr = lw_shared_ptr<segment>;
|
||||
|
||||
using request_controller_type = basic_semaphore<timeout_exception_factory>;
|
||||
using request_controller_units = semaphore_units<timeout_exception_factory>;
|
||||
using request_controller_type = basic_semaphore<timeout_exception_factory, commitlog::timeout_clock>;
|
||||
using request_controller_units = semaphore_units<timeout_exception_factory, commitlog::timeout_clock>;
|
||||
request_controller_type _request_controller;
|
||||
|
||||
stdx::optional<shared_future<with_clock<commitlog::timeout_clock>>> _segment_allocating;
|
||||
@@ -382,7 +382,7 @@ class db::commitlog::segment: public enable_lw_shared_from_this<segment> {
|
||||
time_point _sync_time;
|
||||
seastar::gate _gate;
|
||||
uint64_t _write_waiters = 0;
|
||||
utils::flush_queue<replay_position> _pending_ops;
|
||||
utils::flush_queue<replay_position, std::less<replay_position>, clock_type> _pending_ops;
|
||||
|
||||
uint64_t _num_allocs = 0;
|
||||
|
||||
|
||||
@@ -94,7 +94,7 @@ using cf_id_type = utils::UUID;
|
||||
*/
|
||||
class commitlog {
|
||||
public:
|
||||
using timeout_clock = std::chrono::steady_clock;
|
||||
using timeout_clock = lowres_clock;
|
||||
|
||||
class segment_manager;
|
||||
class segment;
|
||||
|
||||
@@ -200,7 +200,7 @@ private:
|
||||
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::LAST)] = {};
|
||||
bool _stopping = false;
|
||||
public:
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
using clock_type = lowres_clock;
|
||||
public:
|
||||
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"),
|
||||
uint16_t port = 7000, bool listen_now = true);
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 397685c5f0...c1dbd89896
@@ -1622,14 +1622,14 @@ protected:
|
||||
size_t _targets_count;
|
||||
promise<> _done_promise; // all target responded
|
||||
bool _timedout = false; // will be true if request timeouts
|
||||
timer<> _timeout;
|
||||
timer<lowres_clock> _timeout;
|
||||
size_t _responses = 0;
|
||||
schema_ptr _schema;
|
||||
|
||||
virtual void on_timeout() {}
|
||||
virtual size_t response_count() const = 0;
|
||||
public:
|
||||
abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, std::chrono::steady_clock::time_point timeout)
|
||||
abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, lowres_clock::time_point timeout)
|
||||
: _cl(cl)
|
||||
, _targets_count(target_count)
|
||||
, _schema(std::move(schema))
|
||||
@@ -1685,7 +1685,7 @@ class digest_read_resolver : public abstract_read_resolver {
|
||||
return _digest_results.size();
|
||||
}
|
||||
public:
|
||||
digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {}
|
||||
digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {}
|
||||
void add_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<query::result>> result) {
|
||||
if (!_timedout) {
|
||||
// if only one target was queried digest_check() will be skipped so we can also skip digest calculation
|
||||
@@ -2032,7 +2032,7 @@ private:
|
||||
return false;
|
||||
}
|
||||
public:
|
||||
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) {
|
||||
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) {
|
||||
_data_results.reserve(targets_count);
|
||||
}
|
||||
void add_mutate_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<reconcilable_result>> result) {
|
||||
@@ -2219,7 +2219,7 @@ protected:
|
||||
using targets_iterator = std::vector<gms::inet_address>::iterator;
|
||||
using digest_resolver_ptr = ::shared_ptr<digest_read_resolver>;
|
||||
using data_resolver_ptr = ::shared_ptr<data_read_resolver>;
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
using clock_type = lowres_clock;
|
||||
|
||||
schema_ptr _schema;
|
||||
shared_ptr<storage_proxy> _proxy;
|
||||
@@ -2343,7 +2343,7 @@ protected:
|
||||
uint32_t original_partition_limit() const {
|
||||
return _cmd->partition_limit;
|
||||
}
|
||||
void reconcile(db::consistency_level cl, std::chrono::steady_clock::time_point timeout, lw_shared_ptr<query::read_command> cmd) {
|
||||
void reconcile(db::consistency_level cl, lowres_clock::time_point timeout, lw_shared_ptr<query::read_command> cmd) {
|
||||
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
|
||||
auto exec = shared_from_this();
|
||||
|
||||
@@ -2418,12 +2418,12 @@ protected:
|
||||
}
|
||||
});
|
||||
}
|
||||
void reconcile(db::consistency_level cl, std::chrono::steady_clock::time_point timeout) {
|
||||
void reconcile(db::consistency_level cl, lowres_clock::time_point timeout) {
|
||||
reconcile(cl, timeout, _cmd);
|
||||
}
|
||||
|
||||
public:
|
||||
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(std::chrono::steady_clock::time_point timeout) {
|
||||
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(lowres_clock::time_point timeout) {
|
||||
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_schema, _cl, _block_for, timeout);
|
||||
auto exec = shared_from_this();
|
||||
|
||||
@@ -2493,7 +2493,7 @@ public:
|
||||
class always_speculating_read_executor : public abstract_read_executor {
|
||||
public:
|
||||
using abstract_read_executor::abstract_read_executor;
|
||||
virtual future<> make_requests(digest_resolver_ptr resolver, std::chrono::steady_clock::time_point timeout) {
|
||||
virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) {
|
||||
resolver->add_wait_targets(_targets.size());
|
||||
// FIXME: consider disabling for CL=*ONE
|
||||
bool want_digest = true;
|
||||
@@ -2507,7 +2507,7 @@ class speculating_read_executor : public abstract_read_executor {
|
||||
timer<> _speculate_timer;
|
||||
public:
|
||||
using abstract_read_executor::abstract_read_executor;
|
||||
virtual future<> make_requests(digest_resolver_ptr resolver, std::chrono::steady_clock::time_point timeout) {
|
||||
virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) {
|
||||
_speculate_timer.set_callback([this, resolver, timeout] {
|
||||
if (!resolver->is_completed()) { // at the time the callback runs request may be completed already
|
||||
resolver->add_wait_targets(1); // we send one more request so wait for it too
|
||||
@@ -2553,7 +2553,7 @@ class range_slice_read_executor : public abstract_read_executor {
|
||||
public:
|
||||
range_slice_read_executor(schema_ptr s, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets, tracing::trace_state_ptr trace_state) :
|
||||
abstract_read_executor(std::move(s), std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets), std::move(trace_state)) {}
|
||||
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(std::chrono::steady_clock::time_point timeout) override {
|
||||
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(lowres_clock::time_point timeout) override {
|
||||
reconcile(_cl, timeout);
|
||||
return _result_promise.get_future();
|
||||
}
|
||||
@@ -2684,7 +2684,7 @@ future<foreign_ptr<lw_shared_ptr<query::result>>>
|
||||
storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state) {
|
||||
std::vector<::shared_ptr<abstract_read_executor>> exec;
|
||||
exec.reserve(partition_ranges.size());
|
||||
auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
|
||||
auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
|
||||
|
||||
for (auto&& pr: partition_ranges) {
|
||||
if (!pr.is_singular()) {
|
||||
@@ -2708,7 +2708,7 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, dht::parti
|
||||
}
|
||||
|
||||
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
|
||||
storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
|
||||
storage_proxy::query_partition_key_range_concurrent(lowres_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
|
||||
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i,
|
||||
dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state,
|
||||
uint32_t remaining_row_count, uint32_t remaining_partition_count) {
|
||||
@@ -2812,7 +2812,7 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
|
||||
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
|
||||
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
|
||||
dht::partition_range_vector ranges;
|
||||
auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
|
||||
auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
|
||||
|
||||
// when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
|
||||
// expensive in clusters with vnodes)
|
||||
|
||||
@@ -67,11 +67,11 @@ class mutation_holder;
|
||||
|
||||
class storage_proxy : public seastar::async_sharded_service<storage_proxy> /*implements StorageProxyMBean*/ {
|
||||
public:
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
using clock_type = lowres_clock;
|
||||
private:
|
||||
struct rh_entry {
|
||||
::shared_ptr<abstract_write_response_handler> handler;
|
||||
timer<> expire_timer;
|
||||
timer<lowres_clock> expire_timer;
|
||||
rh_entry(::shared_ptr<abstract_write_response_handler>&& h, std::function<void()>&& cb);
|
||||
};
|
||||
|
||||
@@ -252,7 +252,7 @@ private:
|
||||
dht::partition_range_vector get_restricted_ranges(keyspace& ks, const schema& s, dht::partition_range range);
|
||||
float estimate_result_rows_per_range(lw_shared_ptr<query::read_command> cmd, keyspace& ks);
|
||||
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
|
||||
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout,
|
||||
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(lowres_clock::time_point timeout,
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i,
|
||||
dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state,
|
||||
uint32_t remaining_row_count, uint32_t remaining_partition_count);
|
||||
|
||||
@@ -35,10 +35,10 @@ namespace utils {
|
||||
* when all func+post-ops for lower valued keys (T) are
|
||||
* completed.
|
||||
*/
|
||||
template<typename T, typename Comp = std::less<T>>
|
||||
template<typename T, typename Comp = std::less<T>, typename Clock = steady_clock_type>
|
||||
class flush_queue {
|
||||
public:
|
||||
using timeout_clock = steady_clock_type;
|
||||
using timeout_clock = Clock;
|
||||
using time_point = typename timeout_clock::time_point;
|
||||
using promise_type = shared_promise<with_clock<timeout_clock>>;
|
||||
private:
|
||||
|
||||
@@ -122,7 +122,7 @@ public:
|
||||
|
||||
// Groups regions for the purpose of statistics. Can be nested.
|
||||
class region_group {
|
||||
using timeout_clock = std::chrono::steady_clock;
|
||||
using timeout_clock = lowres_clock;
|
||||
|
||||
static region_group_reclaimer no_reclaimer;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user