service/storage_proxy: Use proper read_timeout_exception

Use the shiny new read_timeout_exception that handles CQL transport
protocol encoding properly.

Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
This commit is contained in:
Pekka Enberg
2015-07-27 17:40:06 +03:00
parent bd55f31982
commit a4225fa7d0

View File

@@ -38,6 +38,7 @@
#include "core/future-util.hh"
#include "db/read_repair_decision.hh"
#include "db/config.hh"
#include "exceptions/request_timeout_exception.hh"
#include <boost/range/algorithm_ext/push_back.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/iterator/counting_iterator.hpp>
@@ -1375,14 +1376,9 @@ public:
digest_mismatch_exception() : std::runtime_error("Digest mismatch") {}
};
class read_timeout_exception : public cassandra_exception {
public:
read_timeout_exception(size_t block_for, size_t got) :
cassandra_exception(exception_code::READ_TIMEOUT, sprint("Read operation timed out: waited for %lu got %lu", block_for, got)) {}
};
class abstract_read_resolver {
protected:
db::consistency_level _cl;
size_t _targets_count;
promise<> _done_promise; // all target responded
bool _timedout = false; // will be true if request timeouts
@@ -1392,10 +1388,13 @@ protected:
virtual void on_timeout() {}
virtual size_t response_count() const = 0;
public:
abstract_read_resolver(size_t target_count, std::chrono::high_resolution_clock::time_point timeout) : _targets_count(target_count) {
abstract_read_resolver(db::consistency_level cl, size_t target_count, std::chrono::high_resolution_clock::time_point timeout)
: _cl(cl)
, _targets_count(target_count)
{
_timeout.set_callback([this] {
_timedout = true;
_done_promise.set_exception(read_timeout_exception(_targets_count, response_count()));
_done_promise.set_exception(read_timeout_exception(_cl, response_count(), _targets_count, false /* FIXME */));
on_timeout();
});
_timeout.arm(timeout);
@@ -1410,7 +1409,6 @@ public:
};
class digest_read_resolver : public abstract_read_resolver {
db::consistency_level _cl;
size_t _block_for;
size_t _cl_responses = 0;
promise<> _cl_promise; // cl is reached
@@ -1418,7 +1416,7 @@ class digest_read_resolver : public abstract_read_resolver {
std::vector<query::result_digest> _digest_results;
virtual void on_timeout() override {
_cl_promise.set_exception(read_timeout_exception(_block_for, _cl_responses));
_cl_promise.set_exception(read_timeout_exception(_cl, _cl_responses, _block_for, false /* FIXME */));
// we will not need them any more
_data_results.clear();
_digest_results.clear();
@@ -1432,7 +1430,7 @@ class digest_read_resolver : public abstract_read_resolver {
return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end();
}
public:
digest_read_resolver(db::consistency_level cl, size_t block_for, size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(targets_count, timeout), _cl(cl), _block_for(block_for) {}
digest_read_resolver(db::consistency_level cl, size_t block_for, size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(cl, targets_count, timeout), _block_for(block_for) {}
void add_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<query::result>> result) {
if (!_timedout) {
_digest_results.emplace_back(result->digest());
@@ -1497,7 +1495,7 @@ private:
}
public:
data_read_resolver(size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(targets_count, timeout) {
data_read_resolver(db::consistency_level cl, size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(cl, targets_count, timeout) {
_data_results.reserve(targets_count);
}
void add_mutate_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<reconcilable_result>> result) {
@@ -1655,8 +1653,8 @@ protected:
return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 1),
_targets.size() > 1 ? make_digest_requests(resolver, _targets.begin() + 1, _targets.end()) : make_ready_future()).discard_result();
}
void reconciliate(std::chrono::high_resolution_clock::time_point timeout) {
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_targets.size(), timeout);
void reconciliate(db::consistency_level cl, std::chrono::high_resolution_clock::time_point timeout) {
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(cl, _targets.size(), timeout);
auto exec = shared_from_this();
make_mutation_data_requests(data_resolver, _targets.begin(), _targets.end()).finally([exec]{});
@@ -1682,7 +1680,7 @@ public:
// hold on to executor until all queries are complete
});
digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<> f) {
digest_resolver->has_cl().then_wrapped([this, exec, digest_resolver, timeout] (future<> f) {
try {
f.get();
exec->_result_promise.set_value(digest_resolver->resolve()); // can throw digest missmatch exception
@@ -1702,7 +1700,7 @@ public:
done.discard_result(); // no need for background check, discard done future explicitly
}
} catch (digest_mismatch_exception& ex) {
exec->reconciliate(timeout);
exec->reconciliate(_cl, timeout);
} catch (read_timeout_exception& ex) {
exec->_result_promise.set_exception(ex);
}
@@ -1735,7 +1733,7 @@ public:
range_slice_read_executor(lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets) :
abstract_read_executor(std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets)) {}
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(std::chrono::high_resolution_clock::time_point timeout) override {
reconciliate(timeout);
reconciliate(_cl, timeout);
return _result_promise.get_future();
}
};