diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 1c1c9be21b..81c0b1fbe9 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -189,6 +189,7 @@ protected: std::vector _dead_endpoints; size_t _cl_acks = 0; bool _cl_achieved = false; + bool _timedout = false; bool _throttled = false; protected: size_t total_block_for() { @@ -218,6 +219,8 @@ public: _proxy->_stats.background_write_bytes -= _mutation_holder->size(); _proxy->unthrottle(); } + } else if (_timedout) { + _ready.set_exception(mutation_write_timeout_exception(get_schema()->ks_name(), get_schema()->cf_name(), _cl, _cl_acks, total_block_for(), _type)); } }; void unthrottle() { @@ -239,6 +242,12 @@ public: } } } + void on_timeout() { + if (_cl_achieved) { + logger.trace("Write is not acknowledged by {} replicas after achieving CL", get_targets()); + } + _timedout = true; + } // return true on last ack bool response(gms::inet_address from) { signal(from); @@ -360,13 +369,7 @@ storage_proxy::response_id_type storage_proxy::register_response_handler(shared_ } } - // _cl_achieved can be modified after previous check by call to signal() above if cl == ANY - if (!e.handler->_cl_achieved) { - // timeout happened before cl was achieved, throw exception - e.handler->_ready.set_exception(mutation_write_timeout_exception(e.handler->get_schema()->ks_name(), e.handler->get_schema()->cf_name(), e.handler->_cl, e.handler->_cl_acks, e.handler->total_block_for(), e.handler->_type)); - } else { - logger.trace("Write is not acknowledged by {} replicas after achieving CL", e.handler->get_targets()); - } + e.handler->on_timeout(); remove_response_handler(id); })); assert(e.second); @@ -1347,9 +1350,10 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo auto my_address = utils::fb_utilities::get_broadcast_address(); // lambda for applying mutation locally - auto lmutate = [&handler, response_id, this, my_address, timeout] (lw_shared_ptr m) { - tracing::trace(handler.get_trace_state(), "Executing a mutation locally"); - return mutate_locally(handler.get_schema(), *m, timeout).then([response_id, this, my_address, m, p = shared_from_this()] { + auto lmutate = [handler_ptr, response_id, this, my_address, timeout] (lw_shared_ptr m) mutable { + tracing::trace(handler_ptr->get_trace_state(), "Executing a mutation locally"); + auto s = handler_ptr->get_schema(); + return mutate_locally(std::move(s), *m, timeout).then([response_id, this, my_address, m, h = std::move(handler_ptr), p = shared_from_this()] { // make mutation alive until it is processed locally, otherwise it // may disappear if write timeouts before this future is ready got_response(response_id, my_address); @@ -1357,16 +1361,16 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo }; // lambda for applying mutation remotely - auto rmutate = [this, &handler, timeout, response_id, my_address] (gms::inet_address coordinator, std::vector&& forward, const frozen_mutation& m) { + auto rmutate = [this, handler_ptr, timeout, response_id, my_address] (gms::inet_address coordinator, std::vector&& forward, const frozen_mutation& m) { auto& ms = net::get_local_messaging_service(); auto msize = m.representation().size(); _stats.queued_write_bytes += msize; - auto& tr_state = handler.get_trace_state(); + auto& tr_state = handler_ptr->get_trace_state(); tracing::trace(tr_state, "Sending a mutation to /{}", coordinator); return ms.send_mutation(net::messaging_service::msg_addr{coordinator, 0}, timeout, m, - std::move(forward), my_address, engine().cpu_id(), response_id, tracing::make_trace_info(tr_state)).finally([this, p = shared_from_this(), msize] { + std::move(forward), my_address, engine().cpu_id(), response_id, tracing::make_trace_info(tr_state)).finally([this, p = shared_from_this(), h = std::move(handler_ptr), msize] { _stats.queued_write_bytes -= msize; unthrottle(); });