storage_proxy: Delay timeout response until background work ceases

Write requests which timed out may still occupy memory for a while
because of their local write. The local write should time out soon as
well, but there is a time window in which it has not done so yet. If we
don't delay the timeout response, the request would be seen as not
consuming any memory too early. This in turn would cause the CQL server
to allow more requests than we want, in some cases causing OOM, or
exceeding memory limits and causing excessive cache eviction.

Fixes #1756.
This commit is contained in:
Tomasz Grabiec
2016-11-04 15:30:37 +01:00
parent ba3779802f
commit 14cb31f69a

View File

@@ -189,6 +189,7 @@ protected:
std::vector<gms::inet_address> _dead_endpoints;
size_t _cl_acks = 0;
bool _cl_achieved = false;
bool _timedout = false;
bool _throttled = false;
protected:
size_t total_block_for() {
@@ -218,6 +219,8 @@ public:
_proxy->_stats.background_write_bytes -= _mutation_holder->size();
_proxy->unthrottle();
}
} else if (_timedout) {
_ready.set_exception(mutation_write_timeout_exception(get_schema()->ks_name(), get_schema()->cf_name(), _cl, _cl_acks, total_block_for(), _type));
}
};
void unthrottle() {
@@ -239,6 +242,12 @@ public:
}
}
}
// Marks this handler as timed out.
//
// If the consistency level had already been reached, the timeout only means
// that some replicas never acknowledged the write, which is traced here.
// This method only records the timeout in _timedout; it does not complete
// _ready itself.
void on_timeout() {
    const bool cl_was_reached = _cl_achieved;
    if (cl_was_reached) {
        logger.trace("Write is not acknowledged by {} replicas after achieving CL", get_targets());
    }
    // Set the flag last, after the trace message has been emitted.
    _timedout = true;
}
// return true on last ack
bool response(gms::inet_address from) {
signal(from);
@@ -360,13 +369,7 @@ storage_proxy::response_id_type storage_proxy::register_response_handler(shared_
}
}
// _cl_achieved can be modified after previous check by call to signal() above if cl == ANY
if (!e.handler->_cl_achieved) {
// timeout happened before cl was achieved, throw exception
e.handler->_ready.set_exception(mutation_write_timeout_exception(e.handler->get_schema()->ks_name(), e.handler->get_schema()->cf_name(), e.handler->_cl, e.handler->_cl_acks, e.handler->total_block_for(), e.handler->_type));
} else {
logger.trace("Write is not acknowledged by {} replicas after achieving CL", e.handler->get_targets());
}
e.handler->on_timeout();
remove_response_handler(id);
}));
assert(e.second);
@@ -1347,9 +1350,10 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
auto my_address = utils::fb_utilities::get_broadcast_address();
// lambda for applying mutation locally
auto lmutate = [&handler, response_id, this, my_address, timeout] (lw_shared_ptr<const frozen_mutation> m) {
tracing::trace(handler.get_trace_state(), "Executing a mutation locally");
return mutate_locally(handler.get_schema(), *m, timeout).then([response_id, this, my_address, m, p = shared_from_this()] {
auto lmutate = [handler_ptr, response_id, this, my_address, timeout] (lw_shared_ptr<const frozen_mutation> m) mutable {
tracing::trace(handler_ptr->get_trace_state(), "Executing a mutation locally");
auto s = handler_ptr->get_schema();
return mutate_locally(std::move(s), *m, timeout).then([response_id, this, my_address, m, h = std::move(handler_ptr), p = shared_from_this()] {
// keep the mutation alive until it is processed locally, otherwise it
// may disappear if the write times out before this future is ready
got_response(response_id, my_address);
@@ -1357,16 +1361,16 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
};
// lambda for applying mutation remotely
auto rmutate = [this, &handler, timeout, response_id, my_address] (gms::inet_address coordinator, std::vector<gms::inet_address>&& forward, const frozen_mutation& m) {
auto rmutate = [this, handler_ptr, timeout, response_id, my_address] (gms::inet_address coordinator, std::vector<gms::inet_address>&& forward, const frozen_mutation& m) {
auto& ms = net::get_local_messaging_service();
auto msize = m.representation().size();
_stats.queued_write_bytes += msize;
auto& tr_state = handler.get_trace_state();
auto& tr_state = handler_ptr->get_trace_state();
tracing::trace(tr_state, "Sending a mutation to /{}", coordinator);
return ms.send_mutation(net::messaging_service::msg_addr{coordinator, 0}, timeout, m,
std::move(forward), my_address, engine().cpu_id(), response_id, tracing::make_trace_info(tr_state)).finally([this, p = shared_from_this(), msize] {
std::move(forward), my_address, engine().cpu_id(), response_id, tracing::make_trace_info(tr_state)).finally([this, p = shared_from_this(), h = std::move(handler_ptr), msize] {
_stats.queued_write_bytes -= msize;
unthrottle();
});