storage_proxy: get rid of semaphore in mutation write response handler

Acks are counted in _cl_acks, so there is no need to count them once
more in semaphore, just make promise available when there is enough of
them.
This commit is contained in:
Gleb Natapov
2015-11-02 09:53:12 +02:00
parent 9381ad0741
commit 4838de008f

View File

@@ -105,7 +105,7 @@ sstring get_local_dc() {
class abstract_write_response_handler {
protected:
semaphore _ready; // available when cl is achieved
promise<> _ready; // available when cl is achieved
db::consistency_level _cl;
keyspace& _ks;
db::write_type _type;
@@ -117,6 +117,7 @@ protected:
// it should not be a huge burden. (flw)
std::vector<gms::inet_address> _dead_endpoints;
size_t _cl_acks = 0;
bool _cl_achieved = false;
virtual size_t total_block_for() {
// original comment from cassandra:
// during bootstrap, include pending endpoints in the count
@@ -132,14 +133,16 @@ public:
std::unordered_set<gms::inet_address> targets,
size_t pending_endpoints = 0,
std::vector<gms::inet_address> dead_endpoints = {})
: _ready(0), _cl(cl), _ks(ks), _type(type), _mutation(std::move(mutation)), _targets(
std::move(targets)), _pending_endpoints(pending_endpoints), _dead_endpoints(
std::move(dead_endpoints)) {
: _cl(cl), _ks(ks), _type(type), _mutation(std::move(mutation)), _targets(std::move(targets)),
_pending_endpoints(pending_endpoints), _dead_endpoints(std::move(dead_endpoints)) {
}
virtual ~abstract_write_response_handler() {};
void signal(size_t nr = 1) {
_cl_acks += nr;
_ready.signal(nr);
if (!_cl_achieved && _cl_acks >= total_block_for()) {
_ready.set_value();
_cl_achieved = true;
}
}
// return true on last ack
bool response(gms::inet_address from) {
@@ -150,7 +153,7 @@ public:
return _targets.size() == 0;
}
future<> wait() {
return _ready.wait(total_block_for());
return _ready.get_future();
}
const std::unordered_set<gms::inet_address>& get_targets() const {
return _targets;
@@ -224,7 +227,7 @@ storage_proxy::response_id_type storage_proxy::register_response_handler(std::un
if (left_for_cl > 0) {
// timeout happened before cl was achieved, throw exception
e.handler->_ready.broken(mutation_write_timeout_exception(e.handler->_cl, e.handler->_cl_acks, block_for, e.handler->_type));
e.handler->_ready.set_exception(mutation_write_timeout_exception(e.handler->_cl, e.handler->_cl_acks, block_for, e.handler->_type));
} else {
remove_response_handler(id);
}