diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index df4bd0864c..d49cfe7493 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -105,7 +105,7 @@ sstring get_local_dc() { class abstract_write_response_handler { protected: - semaphore _ready; // available when cl is achieved + promise<> _ready; // available when cl is achieved db::consistency_level _cl; keyspace& _ks; db::write_type _type; @@ -117,6 +117,7 @@ protected: // it should not be a huge burden. (flw) std::vector _dead_endpoints; size_t _cl_acks = 0; + bool _cl_achieved = false; virtual size_t total_block_for() { // original comment from cassandra: // during bootstrap, include pending endpoints in the count @@ -132,14 +133,16 @@ public: std::unordered_set targets, size_t pending_endpoints = 0, std::vector dead_endpoints = {}) - : _ready(0), _cl(cl), _ks(ks), _type(type), _mutation(std::move(mutation)), _targets( - std::move(targets)), _pending_endpoints(pending_endpoints), _dead_endpoints( - std::move(dead_endpoints)) { + : _cl(cl), _ks(ks), _type(type), _mutation(std::move(mutation)), _targets(std::move(targets)), + _pending_endpoints(pending_endpoints), _dead_endpoints(std::move(dead_endpoints)) { } virtual ~abstract_write_response_handler() {}; void signal(size_t nr = 1) { _cl_acks += nr; - _ready.signal(nr); + if (!_cl_achieved && _cl_acks >= total_block_for()) { + _ready.set_value(); + _cl_achieved = true; + } } // return true on last ack bool response(gms::inet_address from) { @@ -150,7 +153,7 @@ public: return _targets.size() == 0; } future<> wait() { - return _ready.wait(total_block_for()); + return _ready.get_future(); } const std::unordered_set& get_targets() const { return _targets; @@ -224,7 +227,7 @@ storage_proxy::response_id_type storage_proxy::register_response_handler(std::un if (left_for_cl > 0) { // timeout happened before cl was achieved, throw exception - e.handler->_ready.broken(mutation_write_timeout_exception(e.handler->_cl, e.handler->_cl_acks, block_for, e.handler->_type)); + e.handler->_ready.set_exception(mutation_write_timeout_exception(e.handler->_cl, e.handler->_cl_acks, block_for, e.handler->_type)); } else { remove_response_handler(id); }