diff --git a/db/consistency_level.cc b/db/consistency_level.cc index 465b440ee0..fd46d8ea23 100644 --- a/db/consistency_level.cc +++ b/db/consistency_level.cc @@ -59,18 +59,18 @@ namespace db { logging::logger cl_logger("consistency"); -size_t quorum_for(keyspace& ks) { +size_t quorum_for(const keyspace& ks) { return (ks.get_replication_strategy().get_replication_factor() / 2) + 1; } -size_t local_quorum_for(keyspace& ks, const sstring& dc) { +size_t local_quorum_for(const keyspace& ks, const sstring& dc) { using namespace locator; auto& rs = ks.get_replication_strategy(); if (rs.get_type() == replication_strategy_type::network_topology) { - network_topology_strategy* nrs = - static_cast(&rs); + const network_topology_strategy* nrs = + static_cast(&rs); return (nrs->get_replication_factor(dc) / 2) + 1; } diff --git a/db/consistency_level.hh b/db/consistency_level.hh index 2156b2fba0..fedf437ef0 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -56,9 +56,9 @@ namespace db { extern logging::logger cl_logger; -size_t quorum_for(keyspace& ks); +size_t quorum_for(const keyspace& ks); -size_t local_quorum_for(keyspace& ks, const sstring& dc); +size_t local_quorum_for(const keyspace& ks, const sstring& dc); size_t block_for_local_serial(keyspace& ks); diff --git a/exceptions/exceptions.hh b/exceptions/exceptions.hh index 7e34bc1cc8..5bb1778b55 100644 --- a/exceptions/exceptions.hh +++ b/exceptions/exceptions.hh @@ -62,7 +62,9 @@ enum class exception_code : int32_t { IS_BOOTSTRAPPING= 0x1002, TRUNCATE_ERROR = 0x1003, WRITE_TIMEOUT = 0x1100, + WRITE_FAILURE = 0x1500, READ_TIMEOUT = 0x1200, + READ_FAILURE = 0x1300, // 2xx: problem validating the request SYNTAX_ERROR = 0x2000, @@ -161,6 +163,40 @@ struct mutation_write_timeout_exception : public request_timeout_exception { { } }; +class request_failure_exception : public cassandra_exception { +public: + db::consistency_level consistency; + int32_t received; + int32_t failures; + int32_t block_for; + 
+protected: + request_failure_exception(exception_code code, const sstring& ks, const sstring& cf, db::consistency_level consistency_, int32_t received_, int32_t failures_, int32_t block_for_) noexcept + : cassandra_exception{code, prepare_message("Operation failed for %s.%s - received %d responses and %d failures from %d CL=%s.", ks, cf, received_, failures_, block_for_, consistency_)} + , consistency{consistency_} + , received{received_} + , failures{failures_} + , block_for{block_for_} + {} +}; + +struct mutation_write_failure_exception : public request_failure_exception { + db::write_type type; + mutation_write_failure_exception(const sstring& ks, const sstring& cf, db::consistency_level consistency_, int32_t received_, int32_t failures_, int32_t block_for_, db::write_type type_) noexcept : + request_failure_exception(exception_code::WRITE_FAILURE, ks, cf, consistency_, received_, failures_, block_for_) + , type{std::move(type_)} + { } +}; + +struct read_failure_exception : public request_failure_exception { + bool data_present; + + read_failure_exception(const sstring& ks, const sstring& cf, db::consistency_level consistency_, int32_t received_, int32_t failures_, int32_t block_for_, bool data_present_) noexcept + : request_failure_exception{exception_code::READ_FAILURE, ks, cf, consistency_, received_, failures_, block_for_} + , data_present{data_present_} + { } +}; + struct overloaded_exception : public cassandra_exception { overloaded_exception(size_t c) noexcept : cassandra_exception(exception_code::OVERLOADED, prepare_message("Too many in flight hints: %lu", c)) {} diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 162481513a..b1920b3cd6 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1930,6 +1930,14 @@ std::set gossiper::get_supported_features(inet_address endpoint) const return to_feature_set(app_state->value); } +bool gossiper::node_has_feature(inet_address endpoint, const feature& f) const { + auto app_state = 
get_application_state_ptr(endpoint, application_state::SUPPORTED_FEATURES); + if (!app_state) { + return {}; + } + return app_state->value.find(f.name()) != sstring::npos; +} + std::set gossiper::get_supported_features() const { std::unordered_map> features_map; std::set common_features; diff --git a/gms/gossiper.hh b/gms/gossiper.hh index a032190e5c..cf7ef5c816 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -562,6 +562,8 @@ public: void check_knows_remote_features(sstring local_features_string) const; void check_knows_remote_features(sstring local_features_string, std::unordered_map peer_features_string) const; void maybe_enable_features(); + // Return true if the feature is present on the node + bool node_has_feature(inet_address endpoint, const feature& f) const; private: void register_feature(feature* f); void unregister_feature(feature* f); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 75be7b52aa..a1c609dc05 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -401,7 +401,7 @@ static unsigned get_rpc_client_idx(messaging_verb verb) { verb == messaging_verb::STREAM_MUTATION_DONE || verb == messaging_verb::COMPLETE_MESSAGE) { idx = 2; - } else if (verb == messaging_verb::MUTATION_DONE) { + } else if (verb == messaging_verb::MUTATION_DONE || verb == messaging_verb::MUTATION_FAILED) { idx = 3; } return idx; @@ -811,6 +811,16 @@ future<> messaging_service::send_mutation_done(msg_addr id, unsigned shard, resp return send_message_oneway(this, messaging_verb::MUTATION_DONE, std::move(id), std::move(shard), std::move(response_id)); } +void messaging_service::register_mutation_failed(std::function (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id, size_t num_failed)>&& func) { + register_handler(this, netw::messaging_verb::MUTATION_FAILED, std::move(func)); +} +void messaging_service::unregister_mutation_failed() { + 
_rpc->unregister_handler(netw::messaging_verb::MUTATION_FAILED); +} +future<> messaging_service::send_mutation_failed(msg_addr id, unsigned shard, response_id_type response_id, size_t num_failed) { + return send_message_oneway(this, messaging_verb::MUTATION_FAILED, std::move(id), std::move(shard), std::move(response_id), num_failed); +} + void messaging_service::register_read_data(std::function>, cache_temperature> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr, rpc::optional oda)>&& func) { register_handler(this, netw::messaging_verb::READ_DATA, std::move(func)); } diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 0fa2c0c420..2989aa85d2 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -108,7 +108,8 @@ enum class messaging_verb : int32_t { GET_SCHEMA_VERSION = 21, SCHEMA_CHECK = 22, COUNTER_MUTATION = 23, - LAST = 24, + MUTATION_FAILED = 24, + LAST = 25, }; } // namespace netw @@ -311,6 +312,11 @@ public: void unregister_mutation_done(); future<> send_mutation_done(msg_addr id, unsigned shard, response_id_type response_id); + // Wrapper for MUTATION_FAILED + void register_mutation_failed(std::function (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id, size_t num_failed)>&& func); + void unregister_mutation_failed(); + future<> send_mutation_failed(msg_addr id, unsigned shard, response_id_type response_id, size_t num_failed); + // Wrapper for READ_DATA // Note: WTH is future> void register_read_data(std::function>, cache_temperature> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr, rpc::optional digest)>&& func); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index d392048c2e..34029443cd 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -180,7 +180,7 @@ public: } }; -class abstract_write_response_handler : public enable_shared_from_this { +class 
abstract_write_response_handler { protected: storage_proxy::response_id_type _id; promise<> _ready; // available when cl is achieved @@ -191,29 +191,40 @@ protected: db::write_type _type; std::unique_ptr _mutation_holder; std::unordered_set _targets; // who we sent this mutation to - size_t _pending_endpoints; // how many endpoints in bootstrap state there is // added dead_endpoints as a memeber here as well. This to be able to carry the info across // calls in helper methods in a convinient way. Since we hope this will be empty most of the time // it should not be a huge burden. (flw) std::vector _dead_endpoints; size_t _cl_acks = 0; bool _cl_achieved = false; - bool _timedout = false; bool _throttled = false; + enum class error : uint8_t { + NONE, + TIMEOUT, + FAILURE, + }; + error _error = error::NONE; + size_t _failed = 0; + size_t _total_endpoints = 0; + protected: + virtual bool waited_for(gms::inet_address from) = 0; virtual void signal(gms::inet_address from) { - signal(); + if (waited_for(from)) { + signal(); + } } + public: abstract_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, tracing::trace_state_ptr trace_state, size_t pending_endpoints = 0, std::vector dead_endpoints = {}) : _id(p->_next_response_id++), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), - _pending_endpoints(pending_endpoints), _dead_endpoints(std::move(dead_endpoints)) { + _dead_endpoints(std::move(dead_endpoints)) { // original comment from cassandra: // during bootstrap, include pending endpoints in the count // or we may fail the consistency level guarantees (see #833, #8058) - _total_block_for = db::block_for(ks, _cl) + _pending_endpoints; + _total_block_for = db::block_for(ks, _cl) + pending_endpoints; ++_proxy->_stats.writes; } virtual ~abstract_write_response_handler() { @@ -226,8 +237,10 @@ 
public: _proxy->_stats.background_write_bytes -= _mutation_holder->size(); _proxy->unthrottle(); } - } else if (_timedout) { + } else if (_error == error::TIMEOUT) { _ready.set_exception(mutation_write_timeout_exception(get_schema()->ks_name(), get_schema()->cf_name(), _cl, _cl_acks, _total_block_for, _type)); + } else if (_error == error::FAILURE) { + _ready.set_exception(mutation_write_failure_exception(get_schema()->ks_name(), get_schema()->cf_name(), _cl, _cl_acks, _failed, _total_block_for, _type)); } }; bool is_counter() const { @@ -252,11 +265,21 @@ public: } } } + virtual bool failure(gms::inet_address from, size_t count) { + if (waited_for(from)) { + _failed += count; + if (_total_block_for + _failed > _total_endpoints) { + _error = error::FAILURE; + return true; + } + } + return false; + } void on_timeout() { if (_cl_achieved) { slogger.trace("Write is not acknowledged by {} replicas after achieving CL", get_targets()); } - _timedout = true; + _error = error::TIMEOUT; } // return true on last ack bool response(gms::inet_address from) { @@ -294,39 +317,52 @@ public: }; class datacenter_write_response_handler : public abstract_write_response_handler { - void signal(gms::inet_address from) override { - if (is_me(from) || db::is_local(from)) { - abstract_write_response_handler::signal(); - } + bool waited_for(gms::inet_address from) override { + return is_me(from) || db::is_local(from); } + public: datacenter_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), - std::move(targets), std::move(tr_state), db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) {} + std::move(targets), std::move(tr_state), db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) { 
+ _total_endpoints = db::count_local_endpoints(_targets); + } }; class write_response_handler : public abstract_write_response_handler { + bool waited_for(gms::inet_address from) override { + return true; + } public: write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), - std::move(targets), std::move(tr_state), pending_endpoints.size(), std::move(dead_endpoints)) {} + std::move(targets), std::move(tr_state), pending_endpoints.size(), std::move(dead_endpoints)) { + _total_endpoints = _targets.size(); + } }; class datacenter_sync_write_response_handler : public abstract_write_response_handler { - std::unordered_map _dc_responses; - void signal(gms::inet_address from) override { + struct dc_info { + size_t acks; + size_t total_block_for; + size_t total_endpoints; + size_t failures; + }; + std::unordered_map _dc_responses; + bool waited_for(gms::inet_address from) override { auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); sstring data_center = snitch_ptr->get_datacenter(from); auto dc_resp = _dc_responses.find(data_center); - if (dc_resp->second > 0) { - --dc_resp->second; - abstract_write_response_handler::signal(); + if (dc_resp->second.acks < dc_resp->second.total_block_for) { + ++dc_resp->second.acks; + return true; } + return false; } public: datacenter_sync_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, @@ -342,11 +378,27 @@ public: auto pending_for_dc = boost::range::count_if(pending_endpoints, [&snitch_ptr, &dc] (const gms::inet_address& ep){ return snitch_ptr->get_datacenter(ep) == dc; }); - _dc_responses.emplace(dc, db::local_quorum_for(ks, dc) + pending_for_dc); - _pending_endpoints += pending_for_dc; + size_t 
total_endpoints_for_dc = boost::range::count_if(targets, [&snitch_ptr, &dc] (const gms::inet_address& ep){ + return snitch_ptr->get_datacenter(ep) == dc; + }); + _dc_responses.emplace(dc, dc_info{0, db::local_quorum_for(ks, dc) + pending_for_dc, total_endpoints_for_dc, 0}); + _total_block_for += pending_for_dc; } } } + bool failure(gms::inet_address from, size_t count) override { + auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); + const sstring& dc = snitch_ptr->get_datacenter(from); + auto dc_resp = _dc_responses.find(dc); + + dc_resp->second.failures += count; + _failed += count; + if (dc_resp->second.total_block_for + dc_resp->second.failures > dc_resp->second.total_endpoints) { + _error = error::FAILURE; + return true; + } + return false; + } }; bool storage_proxy::need_throttle_writes() const { @@ -400,6 +452,16 @@ void storage_proxy::got_response(storage_proxy::response_id_type id, gms::inet_a } } +void storage_proxy::got_failure_response(storage_proxy::response_id_type id, gms::inet_address from, size_t count) { + auto it = _response_handlers.find(id); + if (it != _response_handlers.end()) { + tracing::trace(it->second.handler->get_trace_state(), "Got {} failures from /{}", count, from); + if (it->second.handler->failure(from, count)) { + remove_response_handler(id); // last one, remove entry. Will cancel expiration timer too. 
+ } + } +} + future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_type::time_point timeout) { auto& e = _response_handlers.find(id)->second; @@ -1539,6 +1601,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo auto coordinator = forward.back(); forward.pop_back(); + size_t forward_size = forward.size(); future<> f = make_ready_future<>(); @@ -1560,8 +1623,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo } } - f.handle_exception([coordinator, p = shared_from_this()] (std::exception_ptr eptr) { + f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this()] (std::exception_ptr eptr) { ++p->_stats.writes_errors.get_ep_stat(coordinator); + p->got_failure_response(response_id, coordinator, forward_size + 1); try { std::rethrow_exception(eptr); } catch(rpc::closed_error&) { @@ -1790,13 +1854,20 @@ protected: db::consistency_level _cl; size_t _targets_count; promise<> _done_promise; // all target responded - bool _timedout = false; // will be true if request timeouts + bool _request_failed = false; // will be true if request fails or times out timer _timeout; - size_t _responses = 0; schema_ptr _schema; + size_t _failed = 0; - virtual void on_timeout() {} + virtual void on_failure(std::exception_ptr ex) = 0; + virtual void on_timeout() = 0; virtual size_t response_count() const = 0; + virtual void fail_request(std::exception_ptr ex) { + _request_failed = true; + _done_promise.set_exception(ex); + _timeout.cancel(); + on_failure(ex); + } public: abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, storage_proxy::clock_type::time_point timeout) : _cl(cl) @@ -1804,17 +1875,16 @@ public: , _schema(std::move(schema)) { _timeout.set_callback([this] { - _timedout = true; - _done_promise.set_exception(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, response_count(), _targets_count, _responses != 
0)); on_timeout(); }); _timeout.arm(timeout); } virtual ~abstract_read_resolver() {}; + virtual void on_error(gms::inet_address ep) = 0; future<> done() { return _done_promise.get_future(); } - virtual void error(gms::inet_address ep, std::exception_ptr eptr) { + void error(gms::inet_address ep, std::exception_ptr eptr) { sstring why; try { std::rethrow_exception(eptr); @@ -1828,7 +1898,10 @@ public: why = "Unknown exception"; } - // do nothing other than log for now, request will timeout eventually + if (!_request_failed) { // request may fail only once. + on_error(ep); + } + slogger.error("Exception when communicating with {}: {}", ep, why); } }; @@ -1841,10 +1914,14 @@ class digest_read_resolver : public abstract_read_resolver { foreign_ptr> _data_result; std::vector _digest_results; api::timestamp_type _last_modified = api::missing_timestamp; + size_t _target_count_for_cl; // _target_count_for_cl < _targets_count if CL=LOCAL and RRD.GLOBAL - virtual void on_timeout() override { + void on_timeout() override { + fail_request(std::make_exception_ptr(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, _cl_responses, _block_for, _data_result))); + } + void on_failure(std::exception_ptr ex) override { if (!_cl_reported) { - _cl_promise.set_exception(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, _cl_responses, _block_for, _data_result)); + _cl_promise.set_exception(ex); } // we will not need them any more _data_result = foreign_ptr>(); @@ -1854,9 +1931,10 @@ class digest_read_resolver : public abstract_read_resolver { return _digest_results.size(); } public: - digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {} + digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, size_t target_count_for_cl, storage_proxy::clock_type::time_point timeout) 
: abstract_read_resolver(std::move(schema), cl, 0, timeout), + _block_for(block_for), _target_count_for_cl(target_count_for_cl) {} void add_data(gms::inet_address from, foreign_ptr> result) { - if (!_timedout) { + if (!_request_failed) { // if only one target was queried digest_check() will be skipped so we can also skip digest calculation _digest_results.emplace_back(_targets_count == 1 ? query::result_digest() : *result->digest()); _last_modified = std::max(_last_modified, result->last_modified()); @@ -1867,7 +1945,7 @@ public: } } void add_digest(gms::inet_address from, query::result_digest digest, api::timestamp_type last_modified) { - if (!_timedout) { + if (!_request_failed) { _digest_results.emplace_back(std::move(digest)); _last_modified = std::max(_last_modified, last_modified); got_response(from); @@ -1899,6 +1977,14 @@ public: _done_promise.set_value(); } } + void on_error(gms::inet_address ep) override { + if (waiting_for(ep)) { + _failed++; + } + if (_block_for + _failed > _target_count_for_cl) { + fail_request(std::make_exception_ptr(read_failure_exception(_schema->ks_name(), _schema->cf_name(), _cl, _cl_responses, _failed, _block_for, _data_result))); + } + } future>, bool> has_cl() { return _cl_promise.get_future(); } @@ -1991,10 +2077,14 @@ class data_read_resolver : public abstract_read_resolver { std::vector _data_results; std::unordered_map>> _diffs; private: - virtual void on_timeout() override { + void on_timeout() override { + fail_request(std::make_exception_ptr(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, response_count(), _targets_count, response_count() != 0))); + } + void on_failure(std::exception_ptr ex) override { // we will not need them any more _data_results.clear(); } + virtual size_t response_count() const override { return _data_results.size(); } @@ -2207,7 +2297,7 @@ public: _data_results.reserve(targets_count); } void add_mutate_data(gms::inet_address from, foreign_ptr> result) { - if (!_timedout) { + if 
(!_request_failed) { _max_live_count = std::max(result->row_count(), _max_live_count); _data_results.emplace_back(std::move(from), std::move(result)); if (_data_results.size() == _targets_count) { @@ -2216,6 +2306,9 @@ public: } } } + void on_error(gms::inet_address ep) override { + fail_request(std::make_exception_ptr(read_failure_exception(_schema->ks_name(), _schema->cf_name(), _cl, response_count(), 1, _targets_count, response_count() != 0))); + } uint32_t max_live_count() const { return _max_live_count; } @@ -2620,7 +2713,8 @@ protected: public: virtual future>> execute(storage_proxy::clock_type::time_point timeout) { - digest_resolver_ptr digest_resolver = ::make_shared(_schema, _cl, _block_for, timeout); + digest_resolver_ptr digest_resolver = ::make_shared(_schema, _cl, _block_for, + db::is_datacenter_local(_cl) ? db::count_local_endpoints(_targets): _targets.size(), timeout); auto exec = shared_from_this(); make_requests(digest_resolver, timeout).finally([exec]() { @@ -3675,7 +3769,8 @@ void storage_proxy::init_messaging_service() { timeout = *t; } - return do_with(std::move(in), get_local_shared_storage_proxy(), [src_addr = std::move(src_addr), &cinfo, forward = std::move(forward), reply_to, shard, response_id, trace_state_ptr, timeout] (const frozen_mutation& m, shared_ptr& p) mutable { + return do_with(std::move(in), get_local_shared_storage_proxy(), size_t(0), + [src_addr = std::move(src_addr), &cinfo, forward = std::move(forward), reply_to, shard, response_id, trace_state_ptr, timeout] (const frozen_mutation& m, shared_ptr& p, size_t& errors) mutable { ++p->_stats.received_mutations; p->_stats.forwarded_mutations += forward.size(); return when_all( @@ -3696,7 +3791,7 @@ void storage_proxy::init_messaging_service() { return ms.send_mutation_done(netw::messaging_service::msg_addr{reply_to, shard}, shard, response_id).then_wrapped([] (future<> f) { f.ignore_ready_future(); }); - }).handle_exception([reply_to, shard, &p] (std::exception_ptr eptr) { + 
}).handle_exception([reply_to, shard, &p, &errors] (std::exception_ptr eptr) { seastar::log_level l = seastar::log_level::warn; try { std::rethrow_exception(eptr); @@ -3708,21 +3803,35 @@ void storage_proxy::init_messaging_service() { // ignore } slogger.log(l, "Failed to apply mutation from {}#{}: {}", reply_to, shard, eptr); + errors++; }), - parallel_for_each(forward.begin(), forward.end(), [reply_to, shard, response_id, &m, &p, trace_state_ptr, timeout] (gms::inet_address forward) { + parallel_for_each(forward.begin(), forward.end(), [reply_to, shard, response_id, &m, &p, trace_state_ptr, timeout, &errors] (gms::inet_address forward) { auto& ms = netw::get_local_messaging_service(); tracing::trace(trace_state_ptr, "Forwarding a mutation to /{}", forward); - return ms.send_mutation(netw::messaging_service::msg_addr{forward, 0}, timeout, m, {}, reply_to, shard, response_id, tracing::make_trace_info(trace_state_ptr)).then_wrapped([&p] (future<> f) { + return ms.send_mutation(netw::messaging_service::msg_addr{forward, 0}, timeout, m, {}, reply_to, shard, response_id, tracing::make_trace_info(trace_state_ptr)).then_wrapped([&p, &errors] (future<> f) { if (f.failed()) { ++p->_stats.forwarding_errors; + errors++; }; f.ignore_ready_future(); }); }) - ).then_wrapped([trace_state_ptr] (future, future<>>>&& f) { - // ignore ressult, since we'll be returning them via MUTATION_DONE verbs - tracing::trace(trace_state_ptr, "Mutation handling is done"); - return netw::messaging_service::no_wait(); + ).then_wrapped([trace_state_ptr, reply_to, shard, response_id, &errors] (future, future<>>>&& f) { + // ignore results, since we'll be returning them via MUTATION_DONE/MUTATION_FAILED verbs + auto fut = make_ready_future(netw::messaging_service::no_wait()); + if (errors) { + if (get_local_storage_service().node_supports_write_failure_reply(reply_to)) { + tracing::trace(trace_state_ptr, "Sending mutation_failure with {} failures to /{}", errors, reply_to); + auto& ms = 
netw::get_local_messaging_service(); + fut = ms.send_mutation_failed(netw::messaging_service::msg_addr{reply_to, shard}, shard, response_id, errors).then_wrapped([] (future<> f) { + f.ignore_ready_future(); + return netw::messaging_service::no_wait(); + }); + } + } + return fut.finally([trace_state_ptr] { + tracing::trace(trace_state_ptr, "Mutation handling is done"); + }); }); }); }); @@ -3733,6 +3842,13 @@ void storage_proxy::init_messaging_service() { return netw::messaging_service::no_wait(); }); }); + ms.register_mutation_failed([] (const rpc::client_info& cinfo, unsigned shard, storage_proxy::response_id_type response_id, size_t num_failed) { + auto& from = cinfo.retrieve_auxiliary("baddr"); + return get_storage_proxy().invoke_on(shard, [from, response_id, num_failed] (storage_proxy& sp) { + sp.got_failure_response(response_id, from, num_failed); + return netw::messaging_service::no_wait(); + }); + }); ms.register_read_data([] (const rpc::client_info& cinfo, query::read_command cmd, compat::wrapping_partition_range pr, rpc::optional oda) { tracing::trace_state_ptr trace_state_ptr; auto src_addr = netw::messaging_service::get_source(cinfo); @@ -3840,6 +3956,7 @@ void storage_proxy::uninit_messaging_service() { auto& ms = netw::get_local_messaging_service(); ms.unregister_mutation(); ms.unregister_mutation_done(); + ms.unregister_mutation_failed(); ms.unregister_read_data(); ms.unregister_read_mutation_data(); ms.unregister_read_digest(); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 0bb4f50332..15aff58f9d 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -229,6 +229,7 @@ private: response_id_type register_response_handler(shared_ptr&& h); void remove_response_handler(response_id_type id); void got_response(response_id_type id, gms::inet_address from); + void got_failure_response(response_id_type id, gms::inet_address from, size_t count); future<> response_wait(response_id_type id, clock_type::time_point timeout); 
::shared_ptr& get_write_response_handler(storage_proxy::response_id_type id); response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, std::unordered_set targets, diff --git a/service/storage_service.cc b/service/storage_service.cc index 4116809530..88af2ba710 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -90,6 +90,7 @@ static const sstring DIGEST_MULTIPARTITION_READ_FEATURE = "DIGEST_MULTIPARTITION static const sstring CORRECT_COUNTER_ORDER_FEATURE = "CORRECT_COUNTER_ORDER"; static const sstring SCHEMA_TABLES_V3 = "SCHEMA_TABLES_V3"; static const sstring CORRECT_NON_COMPOUND_RANGE_TOMBSTONES = "CORRECT_NON_COMPOUND_RANGE_TOMBSTONES"; +static const sstring WRITE_FAILURE_REPLY_FEATURE = "WRITE_FAILURE_REPLY"; distributed _the_storage_service; @@ -136,6 +137,7 @@ sstring storage_service::get_config_supported_features() { CORRECT_COUNTER_ORDER_FEATURE, SCHEMA_TABLES_V3, CORRECT_NON_COMPOUND_RANGE_TOMBSTONES, + WRITE_FAILURE_REPLY_FEATURE, }; if (service::get_local_storage_service()._db.local().get_config().experimental()) { features.push_back(MATERIALIZED_VIEWS_FEATURE); @@ -347,6 +349,7 @@ void storage_service::register_features() { _correct_counter_order_feature = gms::feature(CORRECT_COUNTER_ORDER_FEATURE); _schema_tables_v3 = gms::feature(SCHEMA_TABLES_V3); _correct_non_compound_range_tombstones = gms::feature(CORRECT_NON_COMPOUND_RANGE_TOMBSTONES); + _write_failure_reply_feature = gms::feature(WRITE_FAILURE_REPLY_FEATURE); if (_db.local().get_config().experimental()) { _materialized_views_feature = gms::feature(MATERIALIZED_VIEWS_FEATURE); diff --git a/service/storage_service.hh b/service/storage_service.hh index 4ddfa485cc..c5d85edcdd 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -270,6 +270,7 @@ private: gms::feature _correct_counter_order_feature; gms::feature _schema_tables_v3; gms::feature _correct_non_compound_range_tombstones; + 
gms::feature _write_failure_reply_feature; public: void enable_all_features() { _range_tombstones_feature.enable(); @@ -281,6 +282,7 @@ public: _correct_counter_order_feature.enable(); _schema_tables_v3.enable(); _correct_non_compound_range_tombstones.enable(); + _write_failure_reply_feature.enable(); } void finish_bootstrapping() { @@ -2249,6 +2251,10 @@ public: bool cluster_supports_reading_correctly_serialized_range_tombstones() const { return bool(_correct_non_compound_range_tombstones); } + + bool node_supports_write_failure_reply(gms::inet_address ep) const { + return gms::get_local_gossiper().node_has_feature(ep, _write_failure_reply_feature); + } }; inline future<> init_storage_service(distributed& db, sharded& auth_service) { diff --git a/transport/server.cc b/transport/server.cc index 19bd48deba..417d44d8ad 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -539,8 +539,12 @@ future return make_ready_future(std::make_pair(make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, client_state.get_trace_state()), client_state)); } catch (const exceptions::read_timeout_exception& ex) { return make_ready_future(std::make_pair(make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, client_state.get_trace_state()), client_state)); + } catch (const exceptions::read_failure_exception& ex) { + return make_ready_future(std::make_pair(make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, client_state.get_trace_state()), client_state)); } catch (const exceptions::mutation_write_timeout_exception& ex) { return make_ready_future(std::make_pair(make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, client_state.get_trace_state()), client_state)); + } catch (const exceptions::mutation_write_failure_exception& ex) { + return 
make_ready_future(std::make_pair(make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, client_state.get_trace_state()), client_state)); } catch (const exceptions::already_exists_exception& ex) { return make_ready_future(std::make_pair(make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, client_state.get_trace_state()), client_state)); } catch (const exceptions::prepared_query_not_found_exception& ex) { @@ -1029,6 +1033,22 @@ shared_ptr cql_server::connection::make_read_timeout_error return response; } +shared_ptr cql_server::connection::make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) +{ + if (_version < 4) { + return make_read_timeout_error(stream, err, std::move(msg), cl, received, blockfor, data_present, tr_state); + } + auto response = make_shared(stream, cql_binary_opcode::ERROR, tr_state); + response->write_int(static_cast(err)); + response->write_string(msg); + response->write_consistency(cl); + response->write_int(received); + response->write_int(blockfor); + response->write_int(numfailures); + response->write_byte(data_present); + return response; +} + shared_ptr cql_server::connection::make_mutation_write_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) { auto response = make_shared(stream, cql_binary_opcode::ERROR, tr_state); @@ -1041,6 +1061,22 @@ shared_ptr cql_server::connection::make_mutation_write_tim return response; } +shared_ptr cql_server::connection::make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t 
blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) +{ + if (_version < 4) { + return make_mutation_write_timeout_error(stream, err, std::move(msg), cl, received, blockfor, type, tr_state); + } + auto response = make_shared(stream, cql_binary_opcode::ERROR, tr_state); + response->write_int(static_cast(err)); + response->write_string(msg); + response->write_consistency(cl); + response->write_int(received); + response->write_int(blockfor); + response->write_int(numfailures); + response->write_string(sprint("%s", type)); + return response; +} + shared_ptr cql_server::connection::make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state) { auto response = make_shared(stream, cql_binary_opcode::ERROR, tr_state); diff --git a/transport/server.hh b/transport/server.hh index 7cfcbcb1e5..d2a2ef5f98 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -185,7 +185,9 @@ private: shared_ptr make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state); shared_ptr make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state); + shared_ptr make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state); shared_ptr make_mutation_write_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state); + shared_ptr make_mutation_write_failure_error(int16_t stream, exceptions::exception_code 
err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state); shared_ptr make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state); shared_ptr make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state); shared_ptr make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state);