From 9e6cc5b024fc045c0423c25f00749eb092ae9c5d Mon Sep 17 00:00:00 2001
From: Duarte Nunes <duarte@scylladb.com>
Date: Thu, 13 Dec 2018 11:18:18 +0000
Subject: [PATCH] service/storage_proxy: Embed the expire timer in the response
 handler

Embedding the expire timer for a write response in the
abstract_write_response_handler simplifies the code as it allows removing
the rh_entry type. It will also make the timeout easily accessible inside
the handler, for future patches.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <20181213111818.39983-1-duarte@scylladb.com>
(cherry picked from commit f8878238ed2a4c7bcb4c514edb2cefa190930456)
---
 service/storage_proxy.cc | 39 ++++++++++++++++++++++-----------------
 service/storage_proxy.hh |  8 +-------
 2 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index cafe499c43..084a2dc88c 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -214,6 +214,7 @@ protected:
     size_t _all_failures = 0; // total amount of failures
     size_t _total_endpoints = 0;
     storage_proxy::write_stats& _stats;
+    timer<storage_proxy::clock_type> _expire_timer;
 
 protected:
     virtual bool waited_for(gms::inet_address from) = 0;
@@ -228,7 +229,7 @@ public:
             std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets, tracing::trace_state_ptr trace_state,
             storage_proxy::write_stats& stats, size_t pending_endpoints = 0, std::vector<gms::inet_address> dead_endpoints = {}) :
         _id(p->get_next_response_id()), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)),
-        _dead_endpoints(std::move(dead_endpoints)), _stats(stats) {
+        _dead_endpoints(std::move(dead_endpoints)), _stats(stats), _expire_timer([this] { timeout_cb(); }) {
         // original comment from cassandra:
         // during bootstrap, include pending endpoints in the count
         // or we may fail the consistency level guarantees (see #833, #8058)
@@ -323,6 +324,12 @@ public:
             timeout_cb();
         }
     }
+    void expire_at(storage_proxy::clock_type::time_point timeout) {
+        _expire_timer.arm(timeout);
+    }
+    void on_released() {
+        _expire_timer.cancel();
+    }
     void timeout_cb() {
         if (_cl_achieved || _cl == db::consistency_level::ANY) {
             // we are here because either cl was achieved, but targets left in the handler are not
@@ -503,7 +510,7 @@ void storage_proxy::unthrottle() {
         _throttled_writes.pop_front();
         auto it = _response_handlers.find(id);
         if (it != _response_handlers.end()) {
-            it->second.handler->unthrottle();
+            it->second->unthrottle();
         }
     }
 }
@@ -516,17 +523,19 @@ storage_proxy::response_id_type storage_proxy::register_response_handler(shared_
 }
 
 void storage_proxy::remove_response_handler(storage_proxy::response_id_type id) {
-    _response_handlers.erase(id);
+    auto entry = _response_handlers.find(id);
+    entry->second->on_released();
+    _response_handlers.erase(std::move(entry));
 }
 
 void storage_proxy::got_response(storage_proxy::response_id_type id, gms::inet_address from) {
     auto it = _response_handlers.find(id);
     if (it != _response_handlers.end()) {
-        tracing::trace(it->second.handler->get_trace_state(), "Got a response from /{}", from);
-        if (it->second.handler->response(from)) {
+        tracing::trace(it->second->get_trace_state(), "Got a response from /{}", from);
+        if (it->second->response(from)) {
             remove_response_handler(id); // last one, remove entry. Will cancel expiration timer too.
         } else {
-            it->second.handler->check_for_early_completion();
+            it->second->check_for_early_completion();
         }
     }
 }
@@ -534,25 +543,23 @@ void storage_proxy::got_response(storage_proxy::response_id_type id, gms::inet_a
 void storage_proxy::got_failure_response(storage_proxy::response_id_type id, gms::inet_address from, size_t count) {
     auto it = _response_handlers.find(id);
     if (it != _response_handlers.end()) {
-        tracing::trace(it->second.handler->get_trace_state(), "Got {} failures from /{}", count, from);
-        if (it->second.handler->failure_response(from, count)) {
+        tracing::trace(it->second->get_trace_state(), "Got {} failures from /{}", count, from);
+        if (it->second->failure_response(from, count)) {
             remove_response_handler(id);
         } else {
-            it->second.handler->check_for_early_completion();
+            it->second->check_for_early_completion();
         }
     }
 }
 
 future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_type::time_point timeout) {
-    auto& e = _response_handlers.find(id)->second;
-
-    e.expire_timer.arm(timeout);
-
-    return e.handler->wait();
+    auto& handler = _response_handlers.find(id)->second;
+    handler->expire_at(timeout);
+    return handler->wait();
 }
 
 ::shared_ptr<abstract_write_response_handler>& storage_proxy::get_write_response_handler(storage_proxy::response_id_type id) {
-    return _response_handlers.find(id)->second.handler;
+    return _response_handlers.find(id)->second;
 }
 
 storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
@@ -801,8 +808,6 @@ storage_proxy::storage_proxy(distributed<database>& db, storage_proxy::config cf
     _hints_resource_manager.register_manager(_hints_for_views_manager);
 }
 
-storage_proxy::rh_entry::rh_entry(shared_ptr<abstract_write_response_handler>&& h) : handler(std::move(h)), expire_timer([this] { handler->timeout_cb(); }) {}
-
 storage_proxy::unique_response_handler::unique_response_handler(storage_proxy& p_, response_id_type id_) : id(id_), p(p_) {}
 storage_proxy::unique_response_handler::unique_response_handler(unique_response_handler&& x) : id(x.id), p(x.p) { x.id = 0; };
 storage_proxy::unique_response_handler::~unique_response_handler() {
diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh
index 2b522ca935..f21dd19bde 100644
--- a/service/storage_proxy.hh
+++ b/service/storage_proxy.hh
@@ -80,12 +80,6 @@ public:
         size_t available_memory;
     };
 private:
-    struct rh_entry {
-        ::shared_ptr<abstract_write_response_handler> handler;
-        timer<clock_type> expire_timer;
-        rh_entry(::shared_ptr<abstract_write_response_handler>&& h);
-        rh_entry(rh_entry&&) = delete;
-    };
     using response_id_type = uint64_t;
 
     struct unique_response_handler {
@@ -145,7 +139,7 @@ public:
 private:
     distributed<database>& _db;
     response_id_type _next_response_id;
-    std::unordered_map<response_id_type, rh_entry> _response_handlers;
+    std::unordered_map<response_id_type, ::shared_ptr<abstract_write_response_handler>> _response_handlers;
     // This buffer hold ids of throttled writes in case resource consumption goes
     // below the threshold and we want to unthrottle some of them. Without this throttled
     // request with dead or slow replica may wait for up to timeout ms before replying