storage_proxy: Propagate timeout to local writes

This commit is contained in:
Tomasz Grabiec
2016-11-15 13:46:12 +01:00
parent 6d195a1538
commit ba3779802f
2 changed files with 38 additions and 22 deletions

View File

@@ -959,26 +959,26 @@ storage_proxy::response_id_type storage_proxy::unique_response_handler::release(
future<>
storage_proxy::mutate_locally(const mutation& m) {
storage_proxy::mutate_locally(const mutation& m, clock_type::time_point timeout) {
auto shard = _db.local().shard_of(m);
return _db.invoke_on(shard, [s = global_schema_ptr(m.schema()), m = freeze(m)] (database& db) -> future<> {
return db.apply(s, m);
return _db.invoke_on(shard, [s = global_schema_ptr(m.schema()), m = freeze(m), timeout] (database& db) -> future<> {
return db.apply(s, m, timeout);
});
}
future<>
storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m) {
storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m, clock_type::time_point timeout) {
auto shard = _db.local().shard_of(m);
return _db.invoke_on(shard, [&m, gs = global_schema_ptr(s)] (database& db) -> future<> {
return db.apply(gs, m);
return _db.invoke_on(shard, [&m, gs = global_schema_ptr(s), timeout] (database& db) -> future<> {
return db.apply(gs, m, timeout);
});
}
future<>
storage_proxy::mutate_locally(std::vector<mutation> mutations) {
return do_with(std::move(mutations), [this] (std::vector<mutation>& pmut){
return parallel_for_each(pmut.begin(), pmut.end(), [this] (const mutation& m) {
return mutate_locally(m);
storage_proxy::mutate_locally(std::vector<mutation> mutations, clock_type::time_point timeout) {
return do_with(std::move(mutations), [this, timeout] (std::vector<mutation>& pmut){
return parallel_for_each(pmut.begin(), pmut.end(), [this, timeout] (const mutation& m) {
return mutate_locally(m, timeout);
});
});
}
@@ -1347,9 +1347,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
auto my_address = utils::fb_utilities::get_broadcast_address();
// lambda for applying mutation locally
auto lmutate = [&handler, response_id, this, my_address] (lw_shared_ptr<const frozen_mutation> m) {
auto lmutate = [&handler, response_id, this, my_address, timeout] (lw_shared_ptr<const frozen_mutation> m) {
tracing::trace(handler.get_trace_state(), "Executing a mutation locally");
return mutate_locally(handler.get_schema(), *m).then([response_id, this, my_address, m, p = shared_from_this()] {
return mutate_locally(handler.get_schema(), *m, timeout).then([response_id, this, my_address, m, p = shared_from_this()] {
// make mutation alive until it is processed locally, otherwise it
// may disappear if write timeouts before this future is ready
got_response(response_id, my_address);
@@ -3257,14 +3257,23 @@ void storage_proxy::init_messaging_service() {
tracing::trace(trace_state_ptr, "Message received from /{}", src_addr.addr);
}
return do_with(std::move(in), get_local_shared_storage_proxy(), [src_addr = std::move(src_addr), &cinfo, forward = std::move(forward), reply_to, shard, response_id, trace_state_ptr, t] (const frozen_mutation& m, shared_ptr<storage_proxy>& p) mutable {
storage_proxy::clock_type::time_point timeout;
if (!t) {
auto timeout_in_ms = get_local_shared_storage_proxy()->_db.local().get_config().write_request_timeout_in_ms();
timeout = clock_type::now() + std::chrono::milliseconds(timeout_in_ms);
} else {
timeout = *t;
}
return do_with(std::move(in), get_local_shared_storage_proxy(), [src_addr = std::move(src_addr), &cinfo, forward = std::move(forward), reply_to, shard, response_id, trace_state_ptr, timeout] (const frozen_mutation& m, shared_ptr<storage_proxy>& p) mutable {
++p->_stats.received_mutations;
p->_stats.forwarded_mutations += forward.size();
return when_all(
// mutate_locally() may throw, putting it into apply() converts exception to a future.
futurize<void>::apply([&p, &m, reply_to, src_addr = std::move(src_addr)] () mutable {
return get_schema_for_write(m.schema_version(), std::move(src_addr)).then([&m, &p] (schema_ptr s) {
return p->mutate_locally(std::move(s), m);
futurize<void>::apply([timeout, &p, &m, reply_to, src_addr = std::move(src_addr)] () mutable {
// FIXME: get_schema_for_write() doesn't timeout
return get_schema_for_write(m.schema_version(), std::move(src_addr)).then([&m, &p, timeout] (schema_ptr s) {
return p->mutate_locally(std::move(s), m, timeout);
});
}).then([reply_to, shard, response_id, trace_state_ptr] () {
auto& ms = net::get_local_messaging_service();
@@ -3280,11 +3289,10 @@ void storage_proxy::init_messaging_service() {
}).handle_exception([reply_to, shard] (std::exception_ptr eptr) {
logger.warn("Failed to apply mutation from {}#{}: {}", reply_to, shard, eptr);
}),
parallel_for_each(forward.begin(), forward.end(), [reply_to, shard, response_id, &m, &p, trace_state_ptr, t] (gms::inet_address forward) {
parallel_for_each(forward.begin(), forward.end(), [reply_to, shard, response_id, &m, &p, trace_state_ptr, timeout] (gms::inet_address forward) {
auto& ms = net::get_local_messaging_service();
auto timeout = clock_type::now() + std::chrono::milliseconds(p->_db.local().get_config().write_request_timeout_in_ms());
tracing::trace(trace_state_ptr, "Forwarding a mutation to /{}", forward);
return ms.send_mutation(net::messaging_service::msg_addr{forward, 0}, t.value_or(timeout), m, {}, reply_to, shard, response_id, tracing::make_trace_info(trace_state_ptr)).then_wrapped([&p] (future<> f) {
return ms.send_mutation(net::messaging_service::msg_addr{forward, 0}, timeout, m, {}, reply_to, shard, response_id, tracing::make_trace_info(trace_state_ptr)).then_wrapped([&p] (future<> f) {
if (f.failed()) {
++p->_stats.forwarding_errors;
};

View File

@@ -65,7 +65,9 @@ class abstract_read_executor;
class mutation_holder;
class storage_proxy : public seastar::async_sharded_service<storage_proxy> /*implements StorageProxyMBean*/ {
public:
using clock_type = std::chrono::steady_clock;
private:
struct rh_entry {
::shared_ptr<abstract_write_response_handler> handler;
timer<> expire_timer;
@@ -262,9 +264,15 @@ public:
void init_messaging_service();
future<> mutate_locally(const mutation& m);
future<> mutate_locally(const schema_ptr&, const frozen_mutation& m);
future<> mutate_locally(std::vector<mutation> mutations);
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(const mutation& m, clock_type::time_point timeout = clock_type::time_point::max());
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(const schema_ptr&, const frozen_mutation& m, clock_type::time_point timeout = clock_type::time_point::max());
// Applies mutations on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(std::vector<mutation> mutation, clock_type::time_point timeout = clock_type::time_point::max());
future<> mutate_streaming_mutation(const schema_ptr&, utils::UUID plan_id, const frozen_mutation& m, bool fragmented);