service/storage_proxy: Embed the expire timer in the response handler

Embedding the expire timer for a write response in the
abstract_write_response_handler simplifies the code as it allows
removing the rh_entry type.

It will also make the timeout easily accessible inside the handler,
for future patches.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <20181213111818.39983-1-duarte@scylladb.com>
(cherry picked from commit f8878238ed)
This commit is contained in:
Duarte Nunes
2018-12-13 11:18:18 +00:00
parent 13b72c7b92
commit 9e6cc5b024
2 changed files with 23 additions and 24 deletions

View File

@@ -214,6 +214,7 @@ protected:
size_t _all_failures = 0; // total amount of failures
size_t _total_endpoints = 0;
storage_proxy::write_stats& _stats;
timer<storage_proxy::clock_type> _expire_timer;
protected:
virtual bool waited_for(gms::inet_address from) = 0;
@@ -228,7 +229,7 @@ public:
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets, tracing::trace_state_ptr trace_state,
storage_proxy::write_stats& stats, size_t pending_endpoints = 0, std::vector<gms::inet_address> dead_endpoints = {})
: _id(p->get_next_response_id()), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)),
_dead_endpoints(std::move(dead_endpoints)), _stats(stats) {
_dead_endpoints(std::move(dead_endpoints)), _stats(stats), _expire_timer([this] { timeout_cb(); }) {
// original comment from cassandra:
// during bootstrap, include pending endpoints in the count
// or we may fail the consistency level guarantees (see #833, #8058)
@@ -323,6 +324,12 @@ public:
timeout_cb();
}
}
// Arm the embedded expiration timer so that timeout_cb() runs at `timeout`.
// Re-arming an already-armed timer is the caller's responsibility to avoid.
void expire_at(storage_proxy::clock_type::time_point timeout) {
_expire_timer.arm(timeout);
}
// Called when the handler is removed from the proxy's response-handler map;
// cancels the expiration timer so timeout_cb() cannot fire on a handler
// that is going away.
void on_released() {
_expire_timer.cancel();
}
void timeout_cb() {
if (_cl_achieved || _cl == db::consistency_level::ANY) {
// we are here because either cl was achieved, but targets left in the handler are not
@@ -503,7 +510,7 @@ void storage_proxy::unthrottle() {
_throttled_writes.pop_front();
auto it = _response_handlers.find(id);
if (it != _response_handlers.end()) {
it->second.handler->unthrottle();
it->second->unthrottle();
}
}
}
@@ -516,17 +523,19 @@ storage_proxy::response_id_type storage_proxy::register_response_handler(shared_
}
// Remove the response handler registered under `id`.
// We first notify the handler via on_released() — which cancels its embedded
// expiration timer — and only then erase it, so the timer callback can never
// run against a destroyed handler.
//
// Fixes in this revision:
//  - removed a stale duplicate `erase(id)` *before* the find(): erasing first
//    made the subsequent find() return end(), and dereferencing it was UB;
//  - guard against `id` not being present instead of dereferencing end();
//  - dropped a no-op std::move on the iterator passed to erase().
void storage_proxy::remove_response_handler(storage_proxy::response_id_type id) {
    auto entry = _response_handlers.find(id);
    if (entry != _response_handlers.end()) {
        entry->second->on_released(); // cancels the handler's _expire_timer
        _response_handlers.erase(entry);
    }
}
// Record a successful write response from replica `from` for the write
// tracked by `id`. If the handler reports that this was the last awaited
// response, the whole entry is removed (which also cancels its expiration
// timer); otherwise we check whether the write can already be considered
// complete early.
//
// Fix: the block contained duplicated, contradictory statements mixing the
// old `it->second.handler->` access with the new direct `it->second->`
// access; the map's mapped type is ::shared_ptr<abstract_write_response_handler>,
// so only the direct form is valid — the stale duplicates are removed.
void storage_proxy::got_response(storage_proxy::response_id_type id, gms::inet_address from) {
    auto it = _response_handlers.find(id);
    if (it != _response_handlers.end()) {
        tracing::trace(it->second->get_trace_state(), "Got a response from /{}", from);
        if (it->second->response(from)) {
            remove_response_handler(id); // last one, remove entry. Will cancel expiration timer too.
        } else {
            it->second->check_for_early_completion();
        }
    }
}
@@ -534,25 +543,23 @@ void storage_proxy::got_response(storage_proxy::response_id_type id, gms::inet_a
// Record `count` failure responses from replica `from` for the write tracked
// by `id`. Mirrors got_response(): if the handler decides the write is done
// (failed past its tolerance), the entry is removed — cancelling the
// expiration timer — otherwise we probe for early completion.
//
// Fix: removed stale duplicate statements that still used the old
// `it->second.handler->` access; the map now stores the handler shared_ptr
// directly, so only `it->second->` is valid.
void storage_proxy::got_failure_response(storage_proxy::response_id_type id, gms::inet_address from, size_t count) {
    auto it = _response_handlers.find(id);
    if (it != _response_handlers.end()) {
        tracing::trace(it->second->get_trace_state(), "Got {} failures from /{}", count, from);
        if (it->second->failure_response(from, count)) {
            remove_response_handler(id);
        } else {
            it->second->check_for_early_completion();
        }
    }
}
// Arm the handler's embedded expiration timer and return a future that
// resolves when the write achieves its consistency level (or fails / times
// out via timeout_cb()).
//
// Precondition: `id` refers to a registered handler — find(id)->second is
// dereferenced unconditionally, matching get_write_response_handler().
//
// Fix: removed unreachable duplicate statements left after the first
// `return` — stale pre-refactor code that used the removed rh_entry members
// (`e.expire_timer`, `e.handler`).
future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_type::time_point timeout) {
    auto& handler = _response_handlers.find(id)->second;
    handler->expire_at(timeout);
    return handler->wait();
}
// Look up the write response handler registered under `id`.
//
// Precondition: `id` refers to a registered handler — the find() result is
// dereferenced unconditionally.
//
// Fix: removed a dead second `return` statement that still accessed the
// removed rh_entry `.handler` member; the map's mapped type is the handler
// shared_ptr itself.
::shared_ptr<abstract_write_response_handler>& storage_proxy::get_write_response_handler(storage_proxy::response_id_type id) {
    return _response_handlers.find(id)->second;
}
storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
@@ -801,8 +808,6 @@ storage_proxy::storage_proxy(distributed<database>& db, storage_proxy::config cf
_hints_resource_manager.register_manager(_hints_for_views_manager);
}
// rh_entry pairs a handler with its expiration timer; the timer lambda
// captures `this` and forwards to the handler's timeout_cb().
// NOTE(review): made redundant once the timer is embedded in
// abstract_write_response_handler itself (see commit message).
storage_proxy::rh_entry::rh_entry(shared_ptr<abstract_write_response_handler>&& h) : handler(std::move(h)), expire_timer([this] { handler->timeout_cb(); }) {}
// RAII guard tying a response id to the proxy that registered it.
storage_proxy::unique_response_handler::unique_response_handler(storage_proxy& p_, response_id_type id_) : id(id_), p(p_) {}
// Move leaves the source with id 0 so only one guard releases the handler.
// NOTE(review): the trailing `;` after the body is a harmless extra
// declaration (legal since C++11).
storage_proxy::unique_response_handler::unique_response_handler(unique_response_handler&& x) : id(x.id), p(x.p) { x.id = 0; };
storage_proxy::unique_response_handler::~unique_response_handler() {

View File

@@ -80,12 +80,6 @@ public:
size_t available_memory;
};
private:
// Value stored per in-flight write: the response handler plus the timer
// that expires it. The timer's callback captures `this` (see the rh_entry
// constructor definition), so an entry must never be relocated — hence the
// deleted move constructor.
struct rh_entry {
::shared_ptr<abstract_write_response_handler> handler;
timer<clock_type> expire_timer;
rh_entry(::shared_ptr<abstract_write_response_handler>&& h);
rh_entry(rh_entry&&) = delete;
};
using response_id_type = uint64_t;
struct unique_response_handler {
@@ -145,7 +139,7 @@ public:
private:
distributed<database>& _db;
response_id_type _next_response_id;
std::unordered_map<response_id_type, rh_entry> _response_handlers;
std::unordered_map<response_id_type, ::shared_ptr<abstract_write_response_handler>> _response_handlers;
// This buffer hold ids of throttled writes in case resource consumption goes
// below the threshold and we want to unthrottle some of them. Without this throttled
// request with dead or slow replica may wait for up to timeout ms before replying