diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index f9b79eccfc..1c1c9be21b 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -959,26 +959,26 @@ storage_proxy::response_id_type storage_proxy::unique_response_handler::release( future<> -storage_proxy::mutate_locally(const mutation& m) { +storage_proxy::mutate_locally(const mutation& m, clock_type::time_point timeout) { auto shard = _db.local().shard_of(m); - return _db.invoke_on(shard, [s = global_schema_ptr(m.schema()), m = freeze(m)] (database& db) -> future<> { - return db.apply(s, m); + return _db.invoke_on(shard, [s = global_schema_ptr(m.schema()), m = freeze(m), timeout] (database& db) -> future<> { + return db.apply(s, m, timeout); }); } future<> -storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m) { +storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m, clock_type::time_point timeout) { auto shard = _db.local().shard_of(m); - return _db.invoke_on(shard, [&m, gs = global_schema_ptr(s)] (database& db) -> future<> { - return db.apply(gs, m); + return _db.invoke_on(shard, [&m, gs = global_schema_ptr(s), timeout] (database& db) -> future<> { + return db.apply(gs, m, timeout); }); } future<> -storage_proxy::mutate_locally(std::vector mutations) { - return do_with(std::move(mutations), [this] (std::vector& pmut){ - return parallel_for_each(pmut.begin(), pmut.end(), [this] (const mutation& m) { - return mutate_locally(m); +storage_proxy::mutate_locally(std::vector mutations, clock_type::time_point timeout) { + return do_with(std::move(mutations), [this, timeout] (std::vector& pmut){ + return parallel_for_each(pmut.begin(), pmut.end(), [this, timeout] (const mutation& m) { + return mutate_locally(m, timeout); }); }); } @@ -1347,9 +1347,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo auto my_address = utils::fb_utilities::get_broadcast_address(); // lambda for applying mutation locally - auto lmutate = [&handler, response_id, this, my_address] (lw_shared_ptr m) { + auto lmutate = [&handler, response_id, this, my_address, timeout] (lw_shared_ptr m) { tracing::trace(handler.get_trace_state(), "Executing a mutation locally"); - return mutate_locally(handler.get_schema(), *m).then([response_id, this, my_address, m, p = shared_from_this()] { + return mutate_locally(handler.get_schema(), *m, timeout).then([response_id, this, my_address, m, p = shared_from_this()] { // make mutation alive until it is processed locally, otherwise it // may disappear if write timeouts before this future is ready got_response(response_id, my_address); @@ -3257,14 +3257,23 @@ void storage_proxy::init_messaging_service() { tracing::trace(trace_state_ptr, "Message received from /{}", src_addr.addr); } - return do_with(std::move(in), get_local_shared_storage_proxy(), [src_addr = std::move(src_addr), &cinfo, forward = std::move(forward), reply_to, shard, response_id, trace_state_ptr, t] (const frozen_mutation& m, shared_ptr& p) mutable { + storage_proxy::clock_type::time_point timeout; + if (!t) { + auto timeout_in_ms = get_local_shared_storage_proxy()->_db.local().get_config().write_request_timeout_in_ms(); + timeout = clock_type::now() + std::chrono::milliseconds(timeout_in_ms); + } else { + timeout = *t; + } + + return do_with(std::move(in), get_local_shared_storage_proxy(), [src_addr = std::move(src_addr), &cinfo, forward = std::move(forward), reply_to, shard, response_id, trace_state_ptr, timeout] (const frozen_mutation& m, shared_ptr& p) mutable { ++p->_stats.received_mutations; p->_stats.forwarded_mutations += forward.size(); return when_all( // mutate_locally() may throw, putting it into apply() converts exception to a future. - futurize::apply([&p, &m, reply_to, src_addr = std::move(src_addr)] () mutable { - return get_schema_for_write(m.schema_version(), std::move(src_addr)).then([&m, &p] (schema_ptr s) { - return p->mutate_locally(std::move(s), m); + futurize::apply([timeout, &p, &m, reply_to, src_addr = std::move(src_addr)] () mutable { + // FIXME: get_schema_for_write() doesn't timeout + return get_schema_for_write(m.schema_version(), std::move(src_addr)).then([&m, &p, timeout] (schema_ptr s) { + return p->mutate_locally(std::move(s), m, timeout); }); }).then([reply_to, shard, response_id, trace_state_ptr] () { auto& ms = net::get_local_messaging_service(); @@ -3280,11 +3289,10 @@ void storage_proxy::init_messaging_service() { }).handle_exception([reply_to, shard] (std::exception_ptr eptr) { logger.warn("Failed to apply mutation from {}#{}: {}", reply_to, shard, eptr); }), - parallel_for_each(forward.begin(), forward.end(), [reply_to, shard, response_id, &m, &p, trace_state_ptr, t] (gms::inet_address forward) { + parallel_for_each(forward.begin(), forward.end(), [reply_to, shard, response_id, &m, &p, trace_state_ptr, timeout] (gms::inet_address forward) { auto& ms = net::get_local_messaging_service(); - auto timeout = clock_type::now() + std::chrono::milliseconds(p->_db.local().get_config().write_request_timeout_in_ms()); tracing::trace(trace_state_ptr, "Forwarding a mutation to /{}", forward); - return ms.send_mutation(net::messaging_service::msg_addr{forward, 0}, t.value_or(timeout), m, {}, reply_to, shard, response_id, tracing::make_trace_info(trace_state_ptr)).then_wrapped([&p] (future<> f) { + return ms.send_mutation(net::messaging_service::msg_addr{forward, 0}, timeout, m, {}, reply_to, shard, response_id, tracing::make_trace_info(trace_state_ptr)).then_wrapped([&p] (future<> f) { if (f.failed()) { ++p->_stats.forwarding_errors; }; diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 8c6e246818..6a9fd3ea7d 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -65,7 +65,9 @@ class abstract_read_executor; class mutation_holder; class storage_proxy : public seastar::async_sharded_service /*implements StorageProxyMBean*/ { +public: using clock_type = std::chrono::steady_clock; +private: struct rh_entry { ::shared_ptr handler; timer<> expire_timer; @@ -262,9 +264,15 @@ public: void init_messaging_service(); - future<> mutate_locally(const mutation& m); - future<> mutate_locally(const schema_ptr&, const frozen_mutation& m); - future<> mutate_locally(std::vector mutations); + // Applies mutation on this node. + // Resolves with timed_out_error when timeout is reached. + future<> mutate_locally(const mutation& m, clock_type::time_point timeout = clock_type::time_point::max()); + // Applies mutation on this node. + // Resolves with timed_out_error when timeout is reached. + future<> mutate_locally(const schema_ptr&, const frozen_mutation& m, clock_type::time_point timeout = clock_type::time_point::max()); + // Applies mutations on this node. + // Resolves with timed_out_error when timeout is reached. + future<> mutate_locally(std::vector mutation, clock_type::time_point timeout = clock_type::time_point::max()); future<> mutate_streaming_mutation(const schema_ptr&, utils::UUID plan_id, const frozen_mutation& m, bool fragmented);