Merge "storage_proxy: allow fail request earlier if CL cannot be reached due to errors" from Gleb

"This is CASSANDRA-7886 and CASSANDRA-8592. The patch series detects
when the CL of a request can no longer be reached due to errors and fails
the request early. New error types are reported: read/write failures,
which were introduced in the CQL v4 protocol. For compatibility, if an older
protocol is used, the error is translated to a timeout error."

* 'gleb/request-failure_v2' of github.com:scylladb/seastar-dev:
  storage_proxy: fail read/write requests early if it cannot be completed due to errors
  storage_service: add WRITE_FAILURE_REPLY_FEATURE feature
  gossiper: add node_has_feature() function
  cql: add read/write failure exceptions
  storage_proxy: fix data presence reporting in read timeout error during
  storage_proxy: remove inheritance from enable_shared_from_this for abstract_write_response_handler
  storage_proxy: remove unneeded field in abstract_write_response_handler
  storage_proxy: fix pending endpoint accounting for EACH_QUORUM
  consistency_level: constify quorum_for() and local_quorum_for()
This commit is contained in:
Avi Kivity
2017-12-06 16:17:19 +02:00
13 changed files with 280 additions and 53 deletions

View File

@@ -59,18 +59,18 @@ namespace db {
logging::logger cl_logger("consistency");
size_t quorum_for(keyspace& ks) {
size_t quorum_for(const keyspace& ks) {
return (ks.get_replication_strategy().get_replication_factor() / 2) + 1;
}
size_t local_quorum_for(keyspace& ks, const sstring& dc) {
size_t local_quorum_for(const keyspace& ks, const sstring& dc) {
using namespace locator;
auto& rs = ks.get_replication_strategy();
if (rs.get_type() == replication_strategy_type::network_topology) {
network_topology_strategy* nrs =
static_cast<network_topology_strategy*>(&rs);
const network_topology_strategy* nrs =
static_cast<const network_topology_strategy*>(&rs);
return (nrs->get_replication_factor(dc) / 2) + 1;
}

View File

@@ -56,9 +56,9 @@ namespace db {
extern logging::logger cl_logger;
size_t quorum_for(keyspace& ks);
size_t quorum_for(const keyspace& ks);
size_t local_quorum_for(keyspace& ks, const sstring& dc);
size_t local_quorum_for(const keyspace& ks, const sstring& dc);
size_t block_for_local_serial(keyspace& ks);

View File

@@ -62,7 +62,9 @@ enum class exception_code : int32_t {
IS_BOOTSTRAPPING= 0x1002,
TRUNCATE_ERROR = 0x1003,
WRITE_TIMEOUT = 0x1100,
WRITE_FAILURE = 0x1500,
READ_TIMEOUT = 0x1200,
READ_FAILURE = 0x1300,
// 2xx: problem validating the request
SYNTAX_ERROR = 0x2000,
@@ -161,6 +163,40 @@ struct mutation_write_timeout_exception : public request_timeout_exception {
{ }
};
// Common base for the read/write "failure" exceptions. It stores the
// consistency level and the received/failures/block_for counters and formats
// them, together with the keyspace and table names, into the exception message.
class request_failure_exception : public cassandra_exception {
public:
// Consistency level the failed request was executed with.
db::consistency_level consistency;
// Number of responses received (reported as "received %d responses").
int32_t received;
// Number of failures (reported as "%d failures").
int32_t failures;
// Number of responses required (reported as "from %d").
int32_t block_for;
protected:
// Protected constructor: derived exception types supply the concrete
// exception_code; `ks` and `cf` are only used to build the message text.
request_failure_exception(exception_code code, const sstring& ks, const sstring& cf, db::consistency_level consistency_, int32_t received_, int32_t failures_, int32_t block_for_) noexcept
: cassandra_exception{code, prepare_message("Operation failed for %s.%s - received %d responses and %d failures from %d CL=%s.", ks, cf, received_, failures_, block_for_, consistency_)}
, consistency{consistency_}
, received{received_}
, failures{failures_}
, block_for{block_for_}
{}
};
// Write-path failure exception: a request_failure_exception constructed with
// exception_code::WRITE_FAILURE that additionally records the write type.
struct mutation_write_failure_exception : public request_failure_exception {
// Kind of write that failed.
db::write_type type;
mutation_write_failure_exception(const sstring& ks, const sstring& cf, db::consistency_level consistency_, int32_t received_, int32_t failures_, int32_t block_for_, db::write_type type_) noexcept :
request_failure_exception(exception_code::WRITE_FAILURE, ks, cf, consistency_, received_, failures_, block_for_)
, type{std::move(type_)}
{ }
};
// Read-path failure exception: a request_failure_exception constructed with
// exception_code::READ_FAILURE that additionally records the data_present flag.
struct read_failure_exception : public request_failure_exception {
// Flag supplied by the caller as `data_present_`.
bool data_present;
read_failure_exception(const sstring& ks, const sstring& cf, db::consistency_level consistency_, int32_t received_, int32_t failures_, int32_t block_for_, bool data_present_) noexcept
: request_failure_exception{exception_code::READ_FAILURE, ks, cf, consistency_, received_, failures_, block_for_}
, data_present{data_present_}
{ }
};
struct overloaded_exception : public cassandra_exception {
overloaded_exception(size_t c) noexcept :
cassandra_exception(exception_code::OVERLOADED, prepare_message("Too many in flight hints: %lu", c)) {}

View File

@@ -1930,6 +1930,14 @@ std::set<sstring> gossiper::get_supported_features(inet_address endpoint) const
return to_feature_set(app_state->value);
}
// Returns true if `endpoint` lists feature `f` in its SUPPORTED_FEATURES
// application state, and false if it does not or if the endpoint has no such
// state.
bool gossiper::node_has_feature(inet_address endpoint, const feature& f) const {
    auto app_state = get_application_state_ptr(endpoint, application_state::SUPPORTED_FEATURES);
    if (!app_state) {
        return false;
    }
    // Compare whole feature names. A plain substring search on the raw value
    // would report a feature as present whenever its name is contained in the
    // name of another advertised feature (e.g. a name that is a suffix of
    // CORRECT_NON_COMPOUND_RANGE_TOMBSTONES). to_feature_set() is the same
    // parser get_supported_features(endpoint) applies to this value.
    return to_feature_set(app_state->value).count(f.name()) != 0;
}
std::set<sstring> gossiper::get_supported_features() const {
std::unordered_map<inet_address, std::set<sstring>> features_map;
std::set<sstring> common_features;

View File

@@ -562,6 +562,8 @@ public:
void check_knows_remote_features(sstring local_features_string) const;
void check_knows_remote_features(sstring local_features_string, std::unordered_map<inet_address, sstring> peer_features_string) const;
void maybe_enable_features();
// Return true if the feature is present on the node
bool node_has_feature(inet_address endpoint, const feature& f) const;
private:
void register_feature(feature* f);
void unregister_feature(feature* f);

View File

@@ -401,7 +401,7 @@ static unsigned get_rpc_client_idx(messaging_verb verb) {
verb == messaging_verb::STREAM_MUTATION_DONE ||
verb == messaging_verb::COMPLETE_MESSAGE) {
idx = 2;
} else if (verb == messaging_verb::MUTATION_DONE) {
} else if (verb == messaging_verb::MUTATION_DONE || verb == messaging_verb::MUTATION_FAILED) {
idx = 3;
}
return idx;
@@ -811,6 +811,16 @@ future<> messaging_service::send_mutation_done(msg_addr id, unsigned shard, resp
return send_message_oneway(this, messaging_verb::MUTATION_DONE, std::move(id), std::move(shard), std::move(response_id));
}
// Installs `func` as the handler for the MUTATION_FAILED verb. The handler
// receives the client info, a shard, a response id and a failure count.
void messaging_service::register_mutation_failed(std::function<future<rpc::no_wait_type> (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id, size_t num_failed)>&& func) {
register_handler(this, netw::messaging_verb::MUTATION_FAILED, std::move(func));
}
// Removes the MUTATION_FAILED handler from the underlying rpc object.
void messaging_service::unregister_mutation_failed() {
_rpc->unregister_handler(netw::messaging_verb::MUTATION_FAILED);
}
// Sends a one-way MUTATION_FAILED message to `id`, carrying the shard, the
// response id and the number of failures being reported.
future<> messaging_service::send_mutation_failed(msg_addr id, unsigned shard, response_id_type response_id, size_t num_failed) {
return send_message_oneway(this, messaging_verb::MUTATION_FAILED, std::move(id), std::move(shard), std::move(response_id), num_failed);
}
void messaging_service::register_read_data(std::function<future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda)>&& func) {
register_handler(this, netw::messaging_verb::READ_DATA, std::move(func));
}

View File

@@ -108,7 +108,8 @@ enum class messaging_verb : int32_t {
GET_SCHEMA_VERSION = 21,
SCHEMA_CHECK = 22,
COUNTER_MUTATION = 23,
LAST = 24,
MUTATION_FAILED = 24,
LAST = 25,
};
} // namespace netw
@@ -311,6 +312,11 @@ public:
void unregister_mutation_done();
future<> send_mutation_done(msg_addr id, unsigned shard, response_id_type response_id);
// Wrapper for MUTATION_FAILED
void register_mutation_failed(std::function<future<rpc::no_wait_type> (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id, size_t num_failed)>&& func);
void unregister_mutation_failed();
future<> send_mutation_failed(msg_addr id, unsigned shard, response_id_type response_id, size_t num_failed);
// Wrapper for READ_DATA
// Note: WTH is future<foreign_ptr<lw_shared_ptr<query::result>>
void register_read_data(std::function<future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> digest)>&& func);

View File

@@ -180,7 +180,7 @@ public:
}
};
class abstract_write_response_handler : public enable_shared_from_this<abstract_write_response_handler> {
class abstract_write_response_handler {
protected:
storage_proxy::response_id_type _id;
promise<> _ready; // available when cl is achieved
@@ -191,29 +191,40 @@ protected:
db::write_type _type;
std::unique_ptr<mutation_holder> _mutation_holder;
std::unordered_set<gms::inet_address> _targets; // who we sent this mutation to
size_t _pending_endpoints; // how many endpoints in bootstrap state there is
// added dead_endpoints as a member here as well. This is to be able to carry the info across
// calls in helper methods in a convenient way. Since we hope this will be empty most of the time
// it should not be a huge burden. (flw)
std::vector<gms::inet_address> _dead_endpoints;
size_t _cl_acks = 0;
bool _cl_achieved = false;
bool _timedout = false;
bool _throttled = false;
enum class error : uint8_t {
NONE,
TIMEOUT,
FAILURE,
};
error _error = error::NONE;
size_t _failed = 0;
size_t _total_endpoints = 0;
protected:
virtual bool waited_for(gms::inet_address from) = 0;
virtual void signal(gms::inet_address from) {
signal();
if (waited_for(from)) {
signal();
}
}
public:
abstract_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets, tracing::trace_state_ptr trace_state,
size_t pending_endpoints = 0, std::vector<gms::inet_address> dead_endpoints = {})
: _id(p->_next_response_id++), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)),
_pending_endpoints(pending_endpoints), _dead_endpoints(std::move(dead_endpoints)) {
_dead_endpoints(std::move(dead_endpoints)) {
// original comment from cassandra:
// during bootstrap, include pending endpoints in the count
// or we may fail the consistency level guarantees (see #833, #8058)
_total_block_for = db::block_for(ks, _cl) + _pending_endpoints;
_total_block_for = db::block_for(ks, _cl) + pending_endpoints;
++_proxy->_stats.writes;
}
virtual ~abstract_write_response_handler() {
@@ -226,8 +237,10 @@ public:
_proxy->_stats.background_write_bytes -= _mutation_holder->size();
_proxy->unthrottle();
}
} else if (_timedout) {
} else if (_error == error::TIMEOUT) {
_ready.set_exception(mutation_write_timeout_exception(get_schema()->ks_name(), get_schema()->cf_name(), _cl, _cl_acks, _total_block_for, _type));
} else if (_error == error::FAILURE) {
_ready.set_exception(mutation_write_failure_exception(get_schema()->ks_name(), get_schema()->cf_name(), _cl, _cl_acks, _failed, _total_block_for, _type));
}
};
bool is_counter() const {
@@ -252,11 +265,21 @@ public:
}
}
}
// Accounts for `count` failures reported by `from`. Failures from endpoints
// this handler does not wait for are ignored. Returns true (and marks the
// handler as FAILURE) once block_for plus the failures seen so far exceeds
// the total number of endpoints.
virtual bool failure(gms::inet_address from, size_t count) {
    if (!waited_for(from)) {
        return false;
    }
    _failed += count;
    const bool cl_unreachable = _total_block_for + _failed > _total_endpoints;
    if (cl_unreachable) {
        _error = error::FAILURE;
    }
    return cl_unreachable;
}
void on_timeout() {
if (_cl_achieved) {
slogger.trace("Write is not acknowledged by {} replicas after achieving CL", get_targets());
}
_timedout = true;
_error = error::TIMEOUT;
}
// return true on last ack
bool response(gms::inet_address from) {
@@ -294,39 +317,52 @@ public:
};
class datacenter_write_response_handler : public abstract_write_response_handler {
void signal(gms::inet_address from) override {
if (is_me(from) || db::is_local(from)) {
abstract_write_response_handler::signal();
}
bool waited_for(gms::inet_address from) override {
return is_me(from) || db::is_local(from);
}
public:
datacenter_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
std::move(targets), std::move(tr_state), db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) {}
std::move(targets), std::move(tr_state), db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) {
_total_endpoints = db::count_local_endpoints(_targets);
}
};
class write_response_handler : public abstract_write_response_handler {
bool waited_for(gms::inet_address from) override {
return true;
}
public:
write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
std::move(targets), std::move(tr_state), pending_endpoints.size(), std::move(dead_endpoints)) {}
std::move(targets), std::move(tr_state), pending_endpoints.size(), std::move(dead_endpoints)) {
_total_endpoints = _targets.size();
}
};
class datacenter_sync_write_response_handler : public abstract_write_response_handler {
std::unordered_map<sstring, size_t> _dc_responses;
void signal(gms::inet_address from) override {
struct dc_info {
size_t acks;
size_t total_block_for;
size_t total_endpoints;
size_t failures;
};
std::unordered_map<sstring, dc_info> _dc_responses;
bool waited_for(gms::inet_address from) override {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
sstring data_center = snitch_ptr->get_datacenter(from);
auto dc_resp = _dc_responses.find(data_center);
if (dc_resp->second > 0) {
--dc_resp->second;
abstract_write_response_handler::signal();
if (dc_resp->second.acks < dc_resp->second.total_block_for) {
++dc_resp->second.acks;
return true;
}
return false;
}
public:
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
@@ -342,11 +378,27 @@ public:
auto pending_for_dc = boost::range::count_if(pending_endpoints, [&snitch_ptr, &dc] (const gms::inet_address& ep){
return snitch_ptr->get_datacenter(ep) == dc;
});
_dc_responses.emplace(dc, db::local_quorum_for(ks, dc) + pending_for_dc);
_pending_endpoints += pending_for_dc;
size_t total_endpoints_for_dc = boost::range::count_if(targets, [&snitch_ptr, &dc] (const gms::inet_address& ep){
return snitch_ptr->get_datacenter(ep) == dc;
});
_dc_responses.emplace(dc, dc_info{0, db::local_quorum_for(ks, dc) + pending_for_dc, total_endpoints_for_dc, 0});
_total_block_for += pending_for_dc;
}
}
}
// Accounts for `count` failures reported by `from`, tracked per datacenter.
// Returns true (and marks the handler as FAILURE) once the datacenter's
// block_for plus its failures exceeds its number of endpoints. Returns false
// otherwise, including when the sender's datacenter has no entry.
bool failure(gms::inet_address from, size_t count) override {
    auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
    const sstring& dc = snitch_ptr->get_datacenter(from);
    auto dc_resp = _dc_responses.find(dc);
    if (dc_resp == _dc_responses.end()) {
        // No bookkeeping exists for this datacenter; dereferencing end()
        // would be undefined behaviour, so ignore the report.
        return false;
    }
    auto& info = dc_resp->second;
    info.failures += count;
    _failed += count;
    if (info.total_block_for + info.failures > info.total_endpoints) {
        _error = error::FAILURE;
        return true;
    }
    return false;
}
};
bool storage_proxy::need_throttle_writes() const {
@@ -400,6 +452,16 @@ void storage_proxy::got_response(storage_proxy::response_id_type id, gms::inet_a
}
}
// Delivers a failure report (`count` failures from `from`) to the write
// response handler registered under `id`, if one still exists. The handler
// is removed when it reports that the request has failed.
void storage_proxy::got_failure_response(storage_proxy::response_id_type id, gms::inet_address from, size_t count) {
    auto entry = _response_handlers.find(id);
    if (entry == _response_handlers.end()) {
        return;
    }
    auto& handler = entry->second.handler;
    tracing::trace(handler->get_trace_state(), "Got {} failures from /{}", count, from);
    const bool request_failed = handler->failure(from, count);
    if (request_failed) {
        // last one, remove entry. Will cancel expiration timer too.
        remove_response_handler(id);
    }
}
future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_type::time_point timeout) {
auto& e = _response_handlers.find(id)->second;
@@ -1539,6 +1601,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
auto coordinator = forward.back();
forward.pop_back();
size_t forward_size = forward.size();
future<> f = make_ready_future<>();
@@ -1560,8 +1623,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
}
}
f.handle_exception([coordinator, p = shared_from_this()] (std::exception_ptr eptr) {
f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this()] (std::exception_ptr eptr) {
++p->_stats.writes_errors.get_ep_stat(coordinator);
p->got_failure_response(response_id, coordinator, forward_size + 1);
try {
std::rethrow_exception(eptr);
} catch(rpc::closed_error&) {
@@ -1790,13 +1854,20 @@ protected:
db::consistency_level _cl;
size_t _targets_count;
promise<> _done_promise; // all target responded
bool _timedout = false; // will be true if request timeouts
bool _request_failed = false; // will be true if request fails or timeouts
timer<storage_proxy::clock_type> _timeout;
size_t _responses = 0;
schema_ptr _schema;
size_t _failed = 0;
virtual void on_timeout() {}
virtual void on_failure(std::exception_ptr ex) = 0;
virtual void on_timeout() = 0;
virtual size_t response_count() const = 0;
virtual void fail_request(std::exception_ptr ex) {
_request_failed = true;
_done_promise.set_exception(ex);
_timeout.cancel();
on_failure(ex);
}
public:
abstract_read_resolver(schema_ptr schema, db::consistency_level cl, size_t target_count, storage_proxy::clock_type::time_point timeout)
: _cl(cl)
@@ -1804,17 +1875,16 @@ public:
, _schema(std::move(schema))
{
_timeout.set_callback([this] {
_timedout = true;
_done_promise.set_exception(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, response_count(), _targets_count, _responses != 0));
on_timeout();
});
_timeout.arm(timeout);
}
virtual ~abstract_read_resolver() {};
virtual void on_error(gms::inet_address ep) = 0;
future<> done() {
return _done_promise.get_future();
}
virtual void error(gms::inet_address ep, std::exception_ptr eptr) {
void error(gms::inet_address ep, std::exception_ptr eptr) {
sstring why;
try {
std::rethrow_exception(eptr);
@@ -1828,7 +1898,10 @@ public:
why = "Unknown exception";
}
// do nothing other than log for now, request will timeout eventually
if (!_request_failed) { // request may fail only once.
on_error(ep);
}
slogger.error("Exception when communicating with {}: {}", ep, why);
}
};
@@ -1841,10 +1914,14 @@ class digest_read_resolver : public abstract_read_resolver {
foreign_ptr<lw_shared_ptr<query::result>> _data_result;
std::vector<query::result_digest> _digest_results;
api::timestamp_type _last_modified = api::missing_timestamp;
size_t _target_count_for_cl; // _target_count_for_cl < _targets_count if CL=LOCAL and RRD.GLOBAL
virtual void on_timeout() override {
void on_timeout() override {
fail_request(std::make_exception_ptr(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, _cl_responses, _block_for, _data_result)));
}
void on_failure(std::exception_ptr ex) override {
if (!_cl_reported) {
_cl_promise.set_exception(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, _cl_responses, _block_for, _data_result));
_cl_promise.set_exception(ex);
}
// we will not need them any more
_data_result = foreign_ptr<lw_shared_ptr<query::result>>();
@@ -1854,9 +1931,10 @@ class digest_read_resolver : public abstract_read_resolver {
return _digest_results.size();
}
public:
digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout), _block_for(block_for) {}
digest_read_resolver(schema_ptr schema, db::consistency_level cl, size_t block_for, size_t target_count_for_cl, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout),
_block_for(block_for), _target_count_for_cl(target_count_for_cl) {}
void add_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<query::result>> result) {
if (!_timedout) {
if (!_request_failed) {
// if only one target was queried digest_check() will be skipped so we can also skip digest calculation
_digest_results.emplace_back(_targets_count == 1 ? query::result_digest() : *result->digest());
_last_modified = std::max(_last_modified, result->last_modified());
@@ -1867,7 +1945,7 @@ public:
}
}
void add_digest(gms::inet_address from, query::result_digest digest, api::timestamp_type last_modified) {
if (!_timedout) {
if (!_request_failed) {
_digest_results.emplace_back(std::move(digest));
_last_modified = std::max(_last_modified, last_modified);
got_response(from);
@@ -1899,6 +1977,14 @@ public:
_done_promise.set_value();
}
}
// Called when communication with `ep` failed. Counts the failure if `ep` is
// one of the endpoints being waited for, then fails the request with a
// read_failure_exception if block_for plus the failures so far exceeds the
// number of targets that count towards the consistency level.
void on_error(gms::inet_address ep) override {
    if (waiting_for(ep)) {
        ++_failed;
    }
    const bool cl_still_reachable = _block_for + _failed <= _target_count_for_cl;
    if (cl_still_reachable) {
        return;
    }
    auto failure = read_failure_exception(_schema->ks_name(), _schema->cf_name(), _cl, _cl_responses, _failed, _block_for, _data_result);
    fail_request(std::make_exception_ptr(failure));
}
future<foreign_ptr<lw_shared_ptr<query::result>>, bool> has_cl() {
return _cl_promise.get_future();
}
@@ -1991,10 +2077,14 @@ class data_read_resolver : public abstract_read_resolver {
std::vector<reply> _data_results;
std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::experimental::optional<mutation>>> _diffs;
private:
virtual void on_timeout() override {
void on_timeout() override {
fail_request(std::make_exception_ptr(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, response_count(), _targets_count, response_count() != 0)));
}
void on_failure(std::exception_ptr ex) override {
// we will not need them any more
_data_results.clear();
}
virtual size_t response_count() const override {
return _data_results.size();
}
@@ -2207,7 +2297,7 @@ public:
_data_results.reserve(targets_count);
}
void add_mutate_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<reconcilable_result>> result) {
if (!_timedout) {
if (!_request_failed) {
_max_live_count = std::max(result->row_count(), _max_live_count);
_data_results.emplace_back(std::move(from), std::move(result));
if (_data_results.size() == _targets_count) {
@@ -2216,6 +2306,9 @@ public:
}
}
}
// Any error fails the request immediately with a read_failure_exception that
// reports a single failure, the responses collected so far, and whether any
// response was received.
void on_error(gms::inet_address ep) override {
    const auto received = response_count();
    const bool got_any = received != 0;
    auto failure = read_failure_exception(_schema->ks_name(), _schema->cf_name(), _cl, received, 1, _targets_count, got_any);
    fail_request(std::make_exception_ptr(failure));
}
uint32_t max_live_count() const {
return _max_live_count;
}
@@ -2620,7 +2713,8 @@ protected:
public:
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(storage_proxy::clock_type::time_point timeout) {
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_schema, _cl, _block_for, timeout);
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_schema, _cl, _block_for,
db::is_datacenter_local(_cl) ? db::count_local_endpoints(_targets): _targets.size(), timeout);
auto exec = shared_from_this();
make_requests(digest_resolver, timeout).finally([exec]() {
@@ -3675,7 +3769,8 @@ void storage_proxy::init_messaging_service() {
timeout = *t;
}
return do_with(std::move(in), get_local_shared_storage_proxy(), [src_addr = std::move(src_addr), &cinfo, forward = std::move(forward), reply_to, shard, response_id, trace_state_ptr, timeout] (const frozen_mutation& m, shared_ptr<storage_proxy>& p) mutable {
return do_with(std::move(in), get_local_shared_storage_proxy(), size_t(0),
[src_addr = std::move(src_addr), &cinfo, forward = std::move(forward), reply_to, shard, response_id, trace_state_ptr, timeout] (const frozen_mutation& m, shared_ptr<storage_proxy>& p, size_t& errors) mutable {
++p->_stats.received_mutations;
p->_stats.forwarded_mutations += forward.size();
return when_all(
@@ -3696,7 +3791,7 @@ void storage_proxy::init_messaging_service() {
return ms.send_mutation_done(netw::messaging_service::msg_addr{reply_to, shard}, shard, response_id).then_wrapped([] (future<> f) {
f.ignore_ready_future();
});
}).handle_exception([reply_to, shard, &p] (std::exception_ptr eptr) {
}).handle_exception([reply_to, shard, &p, &errors] (std::exception_ptr eptr) {
seastar::log_level l = seastar::log_level::warn;
try {
std::rethrow_exception(eptr);
@@ -3708,21 +3803,35 @@ void storage_proxy::init_messaging_service() {
// ignore
}
slogger.log(l, "Failed to apply mutation from {}#{}: {}", reply_to, shard, eptr);
errors++;
}),
parallel_for_each(forward.begin(), forward.end(), [reply_to, shard, response_id, &m, &p, trace_state_ptr, timeout] (gms::inet_address forward) {
parallel_for_each(forward.begin(), forward.end(), [reply_to, shard, response_id, &m, &p, trace_state_ptr, timeout, &errors] (gms::inet_address forward) {
auto& ms = netw::get_local_messaging_service();
tracing::trace(trace_state_ptr, "Forwarding a mutation to /{}", forward);
return ms.send_mutation(netw::messaging_service::msg_addr{forward, 0}, timeout, m, {}, reply_to, shard, response_id, tracing::make_trace_info(trace_state_ptr)).then_wrapped([&p] (future<> f) {
return ms.send_mutation(netw::messaging_service::msg_addr{forward, 0}, timeout, m, {}, reply_to, shard, response_id, tracing::make_trace_info(trace_state_ptr)).then_wrapped([&p, &errors] (future<> f) {
if (f.failed()) {
++p->_stats.forwarding_errors;
errors++;
};
f.ignore_ready_future();
});
})
).then_wrapped([trace_state_ptr] (future<std::tuple<future<>, future<>>>&& f) {
// ignore ressult, since we'll be returning them via MUTATION_DONE verbs
tracing::trace(trace_state_ptr, "Mutation handling is done");
return netw::messaging_service::no_wait();
).then_wrapped([trace_state_ptr, reply_to, shard, response_id, &errors] (future<std::tuple<future<>, future<>>>&& f) {
// ignore results, since we'll be returning them via MUTATION_DONE/MUTATION_FAILED verbs
auto fut = make_ready_future<seastar::rpc::no_wait_type>(netw::messaging_service::no_wait());
if (errors) {
if (get_local_storage_service().node_supports_write_failure_reply(reply_to)) {
tracing::trace(trace_state_ptr, "Sending mutation_failure with {} failures to /{}", errors, reply_to);
auto& ms = netw::get_local_messaging_service();
fut = ms.send_mutation_failed(netw::messaging_service::msg_addr{reply_to, shard}, shard, response_id, errors).then_wrapped([] (future<> f) {
f.ignore_ready_future();
return netw::messaging_service::no_wait();
});
}
}
return fut.finally([trace_state_ptr] {
tracing::trace(trace_state_ptr, "Mutation handling is done");
});
});
});
});
@@ -3733,6 +3842,13 @@ void storage_proxy::init_messaging_service() {
return netw::messaging_service::no_wait();
});
});
ms.register_mutation_failed([] (const rpc::client_info& cinfo, unsigned shard, storage_proxy::response_id_type response_id, size_t num_failed) {
auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return get_storage_proxy().invoke_on(shard, [from, response_id, num_failed] (storage_proxy& sp) {
sp.got_failure_response(response_id, from, num_failed);
return netw::messaging_service::no_wait();
});
});
ms.register_read_data([] (const rpc::client_info& cinfo, query::read_command cmd, compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda) {
tracing::trace_state_ptr trace_state_ptr;
auto src_addr = netw::messaging_service::get_source(cinfo);
@@ -3840,6 +3956,7 @@ void storage_proxy::uninit_messaging_service() {
auto& ms = netw::get_local_messaging_service();
ms.unregister_mutation();
ms.unregister_mutation_done();
ms.unregister_mutation_failed();
ms.unregister_read_data();
ms.unregister_read_mutation_data();
ms.unregister_read_digest();

View File

@@ -229,6 +229,7 @@ private:
response_id_type register_response_handler(shared_ptr<abstract_write_response_handler>&& h);
void remove_response_handler(response_id_type id);
void got_response(response_id_type id, gms::inet_address from);
void got_failure_response(response_id_type id, gms::inet_address from, size_t count);
future<> response_wait(response_id_type id, clock_type::time_point timeout);
::shared_ptr<abstract_write_response_handler>& get_write_response_handler(storage_proxy::response_id_type id);
response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, std::unordered_set<gms::inet_address> targets,

View File

@@ -90,6 +90,7 @@ static const sstring DIGEST_MULTIPARTITION_READ_FEATURE = "DIGEST_MULTIPARTITION
static const sstring CORRECT_COUNTER_ORDER_FEATURE = "CORRECT_COUNTER_ORDER";
static const sstring SCHEMA_TABLES_V3 = "SCHEMA_TABLES_V3";
static const sstring CORRECT_NON_COMPOUND_RANGE_TOMBSTONES = "CORRECT_NON_COMPOUND_RANGE_TOMBSTONES";
static const sstring WRITE_FAILURE_REPLY_FEATURE = "WRITE_FAILURE_REPLY";
distributed<storage_service> _the_storage_service;
@@ -136,6 +137,7 @@ sstring storage_service::get_config_supported_features() {
CORRECT_COUNTER_ORDER_FEATURE,
SCHEMA_TABLES_V3,
CORRECT_NON_COMPOUND_RANGE_TOMBSTONES,
WRITE_FAILURE_REPLY_FEATURE,
};
if (service::get_local_storage_service()._db.local().get_config().experimental()) {
features.push_back(MATERIALIZED_VIEWS_FEATURE);
@@ -347,6 +349,7 @@ void storage_service::register_features() {
_correct_counter_order_feature = gms::feature(CORRECT_COUNTER_ORDER_FEATURE);
_schema_tables_v3 = gms::feature(SCHEMA_TABLES_V3);
_correct_non_compound_range_tombstones = gms::feature(CORRECT_NON_COMPOUND_RANGE_TOMBSTONES);
_write_failure_reply_feature = gms::feature(WRITE_FAILURE_REPLY_FEATURE);
if (_db.local().get_config().experimental()) {
_materialized_views_feature = gms::feature(MATERIALIZED_VIEWS_FEATURE);

View File

@@ -270,6 +270,7 @@ private:
gms::feature _correct_counter_order_feature;
gms::feature _schema_tables_v3;
gms::feature _correct_non_compound_range_tombstones;
gms::feature _write_failure_reply_feature;
public:
void enable_all_features() {
_range_tombstones_feature.enable();
@@ -281,6 +282,7 @@ public:
_correct_counter_order_feature.enable();
_schema_tables_v3.enable();
_correct_non_compound_range_tombstones.enable();
_write_failure_reply_feature.enable();
}
void finish_bootstrapping() {
@@ -2249,6 +2251,10 @@ public:
bool cluster_supports_reading_correctly_serialized_range_tombstones() const {
return bool(_correct_non_compound_range_tombstones);
}
// Asks the local gossiper whether endpoint `ep` has the write-failure-reply
// feature.
bool node_supports_write_failure_reply(gms::inet_address ep) const {
    const auto& gossiper = gms::get_local_gossiper();
    return gossiper.node_has_feature(ep, _write_failure_reply_feature);
}
};
inline future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service) {

View File

@@ -539,8 +539,12 @@ future<response_type>
return make_ready_future<response_type>(std::make_pair(make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, client_state.get_trace_state()), client_state));
} catch (const exceptions::read_timeout_exception& ex) {
return make_ready_future<response_type>(std::make_pair(make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, client_state.get_trace_state()), client_state));
} catch (const exceptions::read_failure_exception& ex) {
return make_ready_future<response_type>(std::make_pair(make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, client_state.get_trace_state()), client_state));
} catch (const exceptions::mutation_write_timeout_exception& ex) {
return make_ready_future<response_type>(std::make_pair(make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, client_state.get_trace_state()), client_state));
} catch (const exceptions::mutation_write_failure_exception& ex) {
return make_ready_future<response_type>(std::make_pair(make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, client_state.get_trace_state()), client_state));
} catch (const exceptions::already_exists_exception& ex) {
return make_ready_future<response_type>(std::make_pair(make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, client_state.get_trace_state()), client_state));
} catch (const exceptions::prepared_query_not_found_exception& ex) {
@@ -1029,6 +1033,22 @@ shared_ptr<cql_server::response> cql_server::connection::make_read_timeout_error
return response;
}
// Builds the ERROR response reporting that a read failed.
//
//  stream        - stream id the response is created for
//  err           - error code written into the (v4+) response
//  msg           - error message written after the code
//  cl            - consistency level of the failed request
//  received      - written as the "received" counter
//  numfailures   - written as the failure counter (v4+ body only)
//  blockfor      - written as the "block for" counter
//  data_present  - written as the trailing byte of the body
//  tr_state      - tracing state handed to the response constructor
//
// Connections using a protocol version below 4 get a read *timeout* error
// instead, built by make_read_timeout_error(); numfailures is dropped there.
shared_ptr<cql_server::response> cql_server::connection::make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state)
{
    if (_version < 4) {
        // The downgrade must also replace the error code: forwarding `err`
        // would send the read-failure code with a timeout-shaped body, which
        // a pre-v4 client cannot interpret.
        return make_read_timeout_error(stream, exceptions::exception_code::READ_TIMEOUT, std::move(msg), cl, received, blockfor, data_present, tr_state);
    }
    auto response = make_shared<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
    response->write_int(static_cast<int32_t>(err));
    response->write_string(msg);
    response->write_consistency(cl);
    // Body order is received, blockfor, numfailures - note that this differs
    // from the order of this function's parameters.
    response->write_int(received);
    response->write_int(blockfor);
    response->write_int(numfailures);
    response->write_byte(data_present);
    return response;
}
shared_ptr<cql_server::response> cql_server::connection::make_mutation_write_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state)
{
auto response = make_shared<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
@@ -1041,6 +1061,22 @@ shared_ptr<cql_server::response> cql_server::connection::make_mutation_write_tim
return response;
}
// Builds the ERROR response reporting that a mutation write failed.
//
//  stream        - stream id the response is created for
//  err           - error code written into the (v4+) response
//  msg           - error message written after the code
//  cl            - consistency level of the failed request
//  received      - written as the "received" counter
//  numfailures   - written as the failure counter (v4+ body only)
//  blockfor      - written as the "block for" counter
//  type          - write type, serialized as its "%s"-formatted string
//  tr_state      - tracing state handed to the response constructor
//
// Connections using a protocol version below 4 get a write *timeout* error
// instead, built by make_mutation_write_timeout_error(); numfailures is
// dropped there.
shared_ptr<cql_server::response> cql_server::connection::make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state)
{
    if (_version < 4) {
        // The downgrade must also replace the error code: forwarding `err`
        // would send the write-failure code with a timeout-shaped body, which
        // a pre-v4 client cannot interpret.
        return make_mutation_write_timeout_error(stream, exceptions::exception_code::WRITE_TIMEOUT, std::move(msg), cl, received, blockfor, type, tr_state);
    }
    auto response = make_shared<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
    response->write_int(static_cast<int32_t>(err));
    response->write_string(msg);
    response->write_consistency(cl);
    // Body order is received, blockfor, numfailures - note that this differs
    // from the order of this function's parameters.
    response->write_int(received);
    response->write_int(blockfor);
    response->write_int(numfailures);
    response->write_string(sprint("%s", type));
    return response;
}
shared_ptr<cql_server::response> cql_server::connection::make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state)
{
auto response = make_shared<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);

View File

@@ -185,7 +185,9 @@ private:
// Builders for error responses. Each takes the stream id, an error code, a
// message and the tracing state, plus the fields specific to that error, and
// returns the resulting response object.
shared_ptr<cql_server::response> make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state);
shared_ptr<cql_server::response> make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state);
// Read failure; for protocol versions below 4 the response is produced by
// make_read_timeout_error() and numfailures is not sent.
shared_ptr<cql_server::response> make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state);
shared_ptr<cql_server::response> make_mutation_write_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state);
// Write failure; for protocol versions below 4 the response is produced by
// make_mutation_write_timeout_error() and numfailures is not sent.
shared_ptr<cql_server::response> make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state);
shared_ptr<cql_server::response> make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state);
shared_ptr<cql_server::response> make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state);
shared_ptr<cql_server::response> make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state);