diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index c89d075d1e..ce67065cb3 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -74,6 +74,7 @@ protected: frozen_mutation _mutation; std::unordered_set _targets; // who we sent this mutation to size_t _pending_endpoints; // how many endpoints in bootstrap state there is + size_t _cl_acks = 0; virtual size_t total_block_for() { // original comment from cassandra: // during bootstrap, include pending endpoints in the count @@ -88,6 +89,7 @@ public: _ready(0), _cl(cl), _ks(ks), _mutation(std::move(mutation)), _targets(targets), _pending_endpoints(pending_endpoints) {} virtual ~abstract_write_response_handler() {}; void signal(size_t nr = 1) { + _cl_acks += nr; _ready.signal(nr); } // return true on last ack @@ -101,14 +103,12 @@ public: future<> wait() { size_t block_for = total_block_for(); //FIXME: timeout is from DatabaseDescriptor.getWriteRpcTimeout() - return _ready.wait(std::chrono::seconds(2), block_for).then_wrapped([h = shared_from_this(), block_for] (future<>&& f) { + return _ready.wait(block_for).then_wrapped([h = shared_from_this(), block_for] (future<>&& f) { // hold reference for a handler in h. If timeout happens simultaneously with arrival of // a last response the handler can be deleted before this continuation has a chance to run try { f.get(); return make_ready_future<>(); - } catch(semaphore_timed_out& ex) { - throw mutation_write_timeout_error(block_for, block_for + h->_ready.current()); } catch(...) { throw; } @@ -120,6 +120,7 @@ public: const frozen_mutation& get_mutation() { return _mutation; } + friend storage_proxy; }; class datacenter_write_response_handler : public abstract_write_response_handler { @@ -160,12 +161,26 @@ storage_proxy::storage_proxy::response_id_type storage_proxy::register_response_ auto id = _next_response_id++; auto e = _response_handlers.emplace(id, rh_entry(std::move(h), [this, id] { auto& e = _response_handlers.find(id)->second; - // targets left in the handler are not responding. Write a hint. - hint_to_dead_endpoints(e.handler->get_mutation(), e.handler->get_targets()); - remove_response_handler(id); + auto block_for = e.handler->total_block_for(); + auto left_for_cl = block_for - e.handler->_cl_acks; + if (left_for_cl <= 0 || e.handler->_cl == db::consistency_level::ANY) { + // we are here because either cl was achieved, but targets left in the handler are not + // responding, so a hint should be written for them, or cl == any in which case + // hints are counted towards consistency, so we need to write hints and count how much was written + e.handler->signal(hint_to_dead_endpoints(e.handler->get_mutation(), e.handler->get_targets())); +// Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); + // check cl status after hints are written (can change for cl == any) + left_for_cl = block_for - e.handler->_cl_acks; + } + + if (left_for_cl > 0) { + // timeout happened before cl was achieved, throw exception + e.handler->_ready.broken(mutation_write_timeout_error(block_for, e.handler->_cl_acks)); + } else { + remove_response_handler(id); + } })); assert(e.second); - e.first->second.expire_timer.arm(std::chrono::seconds(1)/*getWriteRpcTimeout()*/); return id; } @@ -183,7 +198,12 @@ void storage_proxy::got_response(storage_proxy::storage_proxy::response_id_type } future<> storage_proxy::response_wait(storage_proxy::response_id_type id) { - return _response_handlers.find(id)->second.handler->wait(); + auto& e = _response_handlers.find(id)->second; + + //FIXME: timeout is from DatabaseDescriptor.getWriteRpcTimeout() + e.expire_timer.arm(std::chrono::seconds(2)); + + return e.handler->wait(); } abstract_write_response_handler& storage_proxy::get_write_response_handler(storage_proxy::response_id_type id) { @@ -740,20 +760,10 @@ storage_proxy::mutate(std::vector mutations, db::consistency_level cl) return; } catch(mutation_write_timeout_error& ex) { // timeout - if (cl == db::consistency_level::ANY) { - warn(unimplemented::cause::HINT); - // for cl==any hint counts towards consistency - have_cl->signal(); - auto& h = get_write_response_handler(response_id); - // unlike origin we hint only to those endpoints who did not respond - hint_to_dead_endpoints(h.get_mutation(), h.get_targets()); -// Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); - } else { -// writeMetrics.timeouts.mark(); -// ClientRequestMetrics.writeTimeouts.inc(); -// Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); - have_cl->broken(ex); - } +// writeMetrics.timeouts.mark(); +// ClientRequestMetrics.writeTimeouts.inc(); +// Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); + have_cl->broken(ex); } catch(...) { have_cl->broken(std::current_exception()); } @@ -1098,6 +1108,7 @@ size_t storage_proxy::get_hints_in_progress_for(gms::inet_address target) { bool storage_proxy::submit_hint(const frozen_mutation& m, gms::inet_address target) { + warn(unimplemented::cause::HINT); // local write that time out should be handled by LocalMutationRunnable assert(is_me(target)); return false;