From a4225fa7d06dcaa709bb0b608ef2d5fd68ddc87f Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Mon, 27 Jul 2015 17:40:06 +0300 Subject: [PATCH] service/storage_proxy: Use proper read_timeout_exception Use the shiny new read_timeout_exception that handles CQL transport protocol encoding properly. Signed-off-by: Pekka Enberg --- service/storage_proxy.cc | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index a53f5b53b1..b5684b7aa2 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -38,6 +38,7 @@ #include "core/future-util.hh" #include "db/read_repair_decision.hh" #include "db/config.hh" +#include "exceptions/request_timeout_exception.hh" #include #include #include @@ -1375,14 +1376,9 @@ public: digest_mismatch_exception() : std::runtime_error("Digest mismatch") {} }; -class read_timeout_exception : public cassandra_exception { -public: - read_timeout_exception(size_t block_for, size_t got) : - cassandra_exception(exception_code::READ_TIMEOUT, sprint("Read operation timed out: waited for %lu got %lu", block_for, got)) {} -}; - class abstract_read_resolver { protected: + db::consistency_level _cl; size_t _targets_count; promise<> _done_promise; // all target responded bool _timedout = false; // will be true if request timeouts @@ -1392,10 +1388,13 @@ protected: virtual void on_timeout() {} virtual size_t response_count() const = 0; public: - abstract_read_resolver(size_t target_count, std::chrono::high_resolution_clock::time_point timeout) : _targets_count(target_count) { + abstract_read_resolver(db::consistency_level cl, size_t target_count, std::chrono::high_resolution_clock::time_point timeout) + : _cl(cl) + , _targets_count(target_count) + { _timeout.set_callback([this] { _timedout = true; - _done_promise.set_exception(read_timeout_exception(_targets_count, response_count())); + _done_promise.set_exception(read_timeout_exception(_cl, response_count(), _targets_count, false /* FIXME */)); on_timeout(); }); _timeout.arm(timeout); @@ -1410,7 +1409,6 @@ public: }; class digest_read_resolver : public abstract_read_resolver { - db::consistency_level _cl; size_t _block_for; size_t _cl_responses = 0; promise<> _cl_promise; // cl is reached @@ -1418,7 +1416,7 @@ class digest_read_resolver : public abstract_read_resolver { std::vector _digest_results; virtual void on_timeout() override { - _cl_promise.set_exception(read_timeout_exception(_block_for, _cl_responses)); + _cl_promise.set_exception(read_timeout_exception(_cl, _cl_responses, _block_for, false /* FIXME */)); // we will not need them any more _data_results.clear(); _digest_results.clear(); @@ -1432,7 +1430,7 @@ class digest_read_resolver : public abstract_read_resolver { return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end(); } public: - digest_read_resolver(db::consistency_level cl, size_t block_for, size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(targets_count, timeout), _cl(cl), _block_for(block_for) {} + digest_read_resolver(db::consistency_level cl, size_t block_for, size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(cl, targets_count, timeout), _block_for(block_for) {} void add_data(gms::inet_address from, foreign_ptr> result) { if (!_timedout) { _digest_results.emplace_back(result->digest()); @@ -1497,7 +1495,7 @@ private: } public: - data_read_resolver(size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(targets_count, timeout) { + data_read_resolver(db::consistency_level cl, size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(cl, targets_count, timeout) { _data_results.reserve(targets_count); } void add_mutate_data(gms::inet_address from, foreign_ptr> result) { @@ -1655,8 +1653,8 @@ protected: return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 1), _targets.size() > 1 ? make_digest_requests(resolver, _targets.begin() + 1, _targets.end()) : make_ready_future()).discard_result(); } - void reconciliate(std::chrono::high_resolution_clock::time_point timeout) { - data_resolver_ptr data_resolver = ::make_shared(_targets.size(), timeout); + void reconciliate(db::consistency_level cl, std::chrono::high_resolution_clock::time_point timeout) { + data_resolver_ptr data_resolver = ::make_shared(cl, _targets.size(), timeout); auto exec = shared_from_this(); make_mutation_data_requests(data_resolver, _targets.begin(), _targets.end()).finally([exec]{}); @@ -1682,7 +1680,7 @@ public: // hold on to executor until all queries are complete }); - digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<> f) { + digest_resolver->has_cl().then_wrapped([this, exec, digest_resolver, timeout] (future<> f) { try { f.get(); exec->_result_promise.set_value(digest_resolver->resolve()); // can throw digest missmatch exception @@ -1702,7 +1700,7 @@ public: done.discard_result(); // no need for background check, discard done future explicitly } } catch (digest_mismatch_exception& ex) { - exec->reconciliate(timeout); + exec->reconciliate(_cl, timeout); } catch (read_timeout_exception& ex) { exec->_result_promise.set_exception(ex); } @@ -1735,7 +1733,7 @@ public: range_slice_read_executor(lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, std::vector targets) : abstract_read_executor(std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets)) {} virtual future>> execute(std::chrono::high_resolution_clock::time_point timeout) override { - reconciliate(timeout); + reconciliate(_cl, timeout); return _result_promise.get_future(); } };