storage_proxy: use storage_proxy clock instead of explicit lowres_clock

Merge commit 45b6070832 used butchered version of storage_proxy
patch to adjust to rpc timer change instead the one I've sent. This
patch fixes the differences.

Message-Id: <20170206095237.GA7691@scylladb.com>
(cherry picked from commit 3c372525ed)
This commit is contained in:
Gleb Natapov
2017-02-06 11:52:37 +02:00
committed by Avi Kivity
parent 0429e5d8ea
commit 37fc0e6840
2 changed files with 17 additions and 17 deletions

View File

@@ -1733,14 +1733,14 @@ protected:
size_t _targets_count;
promise<> _done_promise; // all target responded
bool _timedout = false; // will be true if request timeouts
timer<lowres_clock> _timeout;
timer<storage_proxy::clock_type> _timeout;
size_t _responses = 0;
schema_ptr _schema;
virtual void on_timeout() {}
virtual size_t response_count() const = 0;
public:
abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, lowres_clock::time_point timeout)
abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, storage_proxy::clock_type::time_point timeout)
: _cl(cl)
, _targets_count(target_count)
, _schema(std::move(schema))
@@ -1796,7 +1796,7 @@ class digest_read_resolver : public abstract_read_resolver {
return _digest_results.size();
}
public:
digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {}
digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {}
void add_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<query::result>> result) {
if (!_timedout) {
// if only one target was queried digest_check() will be skipped so we can also skip digest calculation
@@ -2143,7 +2143,7 @@ private:
return false;
}
public:
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, lowres_clock::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) {
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) {
_data_results.reserve(targets_count);
}
void add_mutate_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<reconcilable_result>> result) {
@@ -2330,7 +2330,7 @@ protected:
using targets_iterator = std::vector<gms::inet_address>::iterator;
using digest_resolver_ptr = ::shared_ptr<digest_read_resolver>;
using data_resolver_ptr = ::shared_ptr<data_read_resolver>;
using clock_type = lowres_clock;
using clock_type = storage_proxy::clock_type;
schema_ptr _schema;
shared_ptr<storage_proxy> _proxy;
@@ -2454,7 +2454,7 @@ protected:
uint32_t original_partition_limit() const {
return _cmd->partition_limit;
}
void reconcile(db::consistency_level cl, lowres_clock::time_point timeout, lw_shared_ptr<query::read_command> cmd) {
void reconcile(db::consistency_level cl, storage_proxy::clock_type::time_point timeout, lw_shared_ptr<query::read_command> cmd) {
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
auto exec = shared_from_this();
@@ -2529,12 +2529,12 @@ protected:
}
});
}
void reconcile(db::consistency_level cl, lowres_clock::time_point timeout) {
void reconcile(db::consistency_level cl, storage_proxy::clock_type::time_point timeout) {
reconcile(cl, timeout, _cmd);
}
public:
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(lowres_clock::time_point timeout) {
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(storage_proxy::clock_type::time_point timeout) {
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_schema, _cl, _block_for, timeout);
auto exec = shared_from_this();
@@ -2604,7 +2604,7 @@ public:
class always_speculating_read_executor : public abstract_read_executor {
public:
using abstract_read_executor::abstract_read_executor;
virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) {
virtual future<> make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) {
resolver->add_wait_targets(_targets.size());
// FIXME: consider disabling for CL=*ONE
bool want_digest = true;
@@ -2615,10 +2615,10 @@ public:
// this executor sends request to an additional replica after some time below timeout
class speculating_read_executor : public abstract_read_executor {
timer<> _speculate_timer;
timer<storage_proxy::clock_type> _speculate_timer;
public:
using abstract_read_executor::abstract_read_executor;
virtual future<> make_requests(digest_resolver_ptr resolver, lowres_clock::time_point timeout) {
virtual future<> make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) {
_speculate_timer.set_callback([this, resolver, timeout] {
if (!resolver->is_completed()) { // at the time the callback runs request may be completed already
resolver->add_wait_targets(1); // we send one more request so wait for it too
@@ -2664,7 +2664,7 @@ class range_slice_read_executor : public abstract_read_executor {
public:
range_slice_read_executor(schema_ptr s, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets, tracing::trace_state_ptr trace_state) :
abstract_read_executor(std::move(s), std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets), std::move(trace_state)) {}
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(lowres_clock::time_point timeout) override {
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(storage_proxy::clock_type::time_point timeout) override {
reconcile(_cl, timeout);
return _result_promise.get_future();
}
@@ -2795,7 +2795,7 @@ future<foreign_ptr<lw_shared_ptr<query::result>>>
storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state) {
std::vector<::shared_ptr<abstract_read_executor>> exec;
exec.reserve(partition_ranges.size());
auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
auto timeout = storage_proxy::clock_type::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
for (auto&& pr: partition_ranges) {
if (!pr.is_singular()) {
@@ -2819,7 +2819,7 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, dht::parti
}
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
storage_proxy::query_partition_key_range_concurrent(lowres_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i,
dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state,
uint32_t remaining_row_count, uint32_t remaining_partition_count) {
@@ -2923,7 +2923,7 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
dht::partition_range_vector ranges;
auto timeout = lowres_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
auto timeout = storage_proxy::clock_type::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
// when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
// expensive in clusters with vnodes)

View File

@@ -71,7 +71,7 @@ public:
private:
struct rh_entry {
::shared_ptr<abstract_write_response_handler> handler;
timer<lowres_clock> expire_timer;
timer<clock_type> expire_timer;
rh_entry(::shared_ptr<abstract_write_response_handler>&& h, std::function<void()>&& cb);
};
@@ -253,7 +253,7 @@ private:
dht::partition_range_vector get_restricted_ranges(keyspace& ks, const schema& s, dht::partition_range range);
float estimate_result_rows_per_range(lw_shared_ptr<query::read_command> cmd, keyspace& ks);
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(lowres_clock::time_point timeout,
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(clock_type::time_point timeout,
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, dht::partition_range_vector::iterator&& i,
dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state,
uint32_t remaining_row_count, uint32_t remaining_partition_count);