cluster: consolidate mutation clustering timers
Currently mutation clustering uses two timers: one expires when the wait for the consistency level (cl) times out and is canceled when cl is achieved; the other expires if some endpoints do not answer for a long time (cl may already have been achieved at this point, in which case the first timer will have been canceled). This is too complicated, especially since both timers can expire simultaneously. Simplify it by having only one timer and checking in its callback whether cl was achieved.
This commit is contained in:
@@ -74,6 +74,7 @@ protected:
|
||||
frozen_mutation _mutation;
|
||||
std::unordered_set<gms::inet_address> _targets; // who we sent this mutation to
|
||||
size_t _pending_endpoints; // how many endpoints in bootstrap state there is
|
||||
size_t _cl_acks = 0;
|
||||
virtual size_t total_block_for() {
|
||||
// original comment from cassandra:
|
||||
// during bootstrap, include pending endpoints in the count
|
||||
@@ -88,6 +89,7 @@ public:
|
||||
_ready(0), _cl(cl), _ks(ks), _mutation(std::move(mutation)), _targets(targets), _pending_endpoints(pending_endpoints) {}
|
||||
virtual ~abstract_write_response_handler() {};
|
||||
void signal(size_t nr = 1) {
|
||||
_cl_acks += nr;
|
||||
_ready.signal(nr);
|
||||
}
|
||||
// return true on last ack
|
||||
@@ -101,14 +103,12 @@ public:
|
||||
future<> wait() {
|
||||
size_t block_for = total_block_for();
|
||||
//FIXME: timeout is from DatabaseDescriptor.getWriteRpcTimeout()
|
||||
return _ready.wait(std::chrono::seconds(2), block_for).then_wrapped([h = shared_from_this(), block_for] (future<>&& f) {
|
||||
return _ready.wait(block_for).then_wrapped([h = shared_from_this(), block_for] (future<>&& f) {
|
||||
// hold reference for a handler in h. If timeout happens simultaneously with arrival of
|
||||
// a last response the handler can be deleted before this continuation has a chance to run
|
||||
try {
|
||||
f.get();
|
||||
return make_ready_future<>();
|
||||
} catch(semaphore_timed_out& ex) {
|
||||
throw mutation_write_timeout_error(block_for, block_for + h->_ready.current());
|
||||
} catch(...) {
|
||||
throw;
|
||||
}
|
||||
@@ -120,6 +120,7 @@ public:
|
||||
const frozen_mutation& get_mutation() {
|
||||
return _mutation;
|
||||
}
|
||||
friend storage_proxy;
|
||||
};
|
||||
|
||||
class datacenter_write_response_handler : public abstract_write_response_handler {
|
||||
@@ -160,12 +161,26 @@ storage_proxy::storage_proxy::response_id_type storage_proxy::register_response_
|
||||
auto id = _next_response_id++;
|
||||
auto e = _response_handlers.emplace(id, rh_entry(std::move(h), [this, id] {
|
||||
auto& e = _response_handlers.find(id)->second;
|
||||
// targets left in the handler are not responding. Write a hint.
|
||||
hint_to_dead_endpoints(e.handler->get_mutation(), e.handler->get_targets());
|
||||
remove_response_handler(id);
|
||||
auto block_for = e.handler->total_block_for();
|
||||
auto left_for_cl = block_for - e.handler->_cl_acks;
|
||||
if (left_for_cl <= 0 || e.handler->_cl == db::consistency_level::ANY) {
|
||||
// we are here because either cl was achieved, but targets left in the handler are not
|
||||
// responding, so a hint should be written for them, or cl == any in which case
|
||||
// hints are counted towards consistency, so we need to write hints and count how much was written
|
||||
e.handler->signal(hint_to_dead_endpoints(e.handler->get_mutation(), e.handler->get_targets()));
|
||||
// Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
|
||||
// check cl status after hints are written (can change for cl == any)
|
||||
left_for_cl = block_for - e.handler->_cl_acks;
|
||||
}
|
||||
|
||||
if (left_for_cl > 0) {
|
||||
// timeout happened before cl was achieved, throw exception
|
||||
e.handler->_ready.broken(mutation_write_timeout_error(block_for, e.handler->_cl_acks));
|
||||
} else {
|
||||
remove_response_handler(id);
|
||||
}
|
||||
}));
|
||||
assert(e.second);
|
||||
e.first->second.expire_timer.arm(std::chrono::seconds(1)/*getWriteRpcTimeout()*/);
|
||||
return id;
|
||||
}
|
||||
|
||||
@@ -183,7 +198,12 @@ void storage_proxy::got_response(storage_proxy::storage_proxy::response_id_type
|
||||
}
|
||||
|
||||
future<> storage_proxy::response_wait(storage_proxy::response_id_type id) {
|
||||
return _response_handlers.find(id)->second.handler->wait();
|
||||
auto& e = _response_handlers.find(id)->second;
|
||||
|
||||
//FIXME: timeout is from DatabaseDescriptor.getWriteRpcTimeout()
|
||||
e.expire_timer.arm(std::chrono::seconds(2));
|
||||
|
||||
return e.handler->wait();
|
||||
}
|
||||
|
||||
abstract_write_response_handler& storage_proxy::get_write_response_handler(storage_proxy::response_id_type id) {
|
||||
@@ -740,20 +760,10 @@ storage_proxy::mutate(std::vector<mutation> mutations, db::consistency_level cl)
|
||||
return;
|
||||
} catch(mutation_write_timeout_error& ex) {
|
||||
// timeout
|
||||
if (cl == db::consistency_level::ANY) {
|
||||
warn(unimplemented::cause::HINT);
|
||||
// for cl==any hint counts towards consistency
|
||||
have_cl->signal();
|
||||
auto& h = get_write_response_handler(response_id);
|
||||
// unlike origin we hint only to those endpoints who did not respond
|
||||
hint_to_dead_endpoints(h.get_mutation(), h.get_targets());
|
||||
// Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
|
||||
} else {
|
||||
// writeMetrics.timeouts.mark();
|
||||
// ClientRequestMetrics.writeTimeouts.inc();
|
||||
// Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
|
||||
have_cl->broken(ex);
|
||||
}
|
||||
// writeMetrics.timeouts.mark();
|
||||
// ClientRequestMetrics.writeTimeouts.inc();
|
||||
// Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
|
||||
have_cl->broken(ex);
|
||||
} catch(...) {
|
||||
have_cl->broken(std::current_exception());
|
||||
}
|
||||
@@ -1098,6 +1108,7 @@ size_t storage_proxy::get_hints_in_progress_for(gms::inet_address target) {
|
||||
|
||||
bool storage_proxy::submit_hint(const frozen_mutation& m, gms::inet_address target)
|
||||
{
|
||||
warn(unimplemented::cause::HINT);
|
||||
// local write that time out should be handled by LocalMutationRunnable
|
||||
assert(is_me(target));
|
||||
return false;
|
||||
|
||||
Reference in New Issue
Block a user