|
|
|
|
@@ -1095,6 +1095,7 @@ protected:
|
|
|
|
|
storage_proxy::response_id_type _id;
|
|
|
|
|
promise<result<>> _ready; // available when cl is achieved
|
|
|
|
|
shared_ptr<storage_proxy> _proxy;
|
|
|
|
|
locator::effective_replication_map_ptr _effective_replication_map_ptr;
|
|
|
|
|
tracing::trace_state_ptr _trace_state;
|
|
|
|
|
db::consistency_level _cl;
|
|
|
|
|
size_t _total_block_for = 0;
|
|
|
|
|
@@ -1129,17 +1130,21 @@ protected:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
abstract_write_response_handler(shared_ptr<storage_proxy> p, replica::keyspace& ks, db::consistency_level cl, db::write_type type,
|
|
|
|
|
abstract_write_response_handler(shared_ptr<storage_proxy> p,
|
|
|
|
|
locator::effective_replication_map_ptr erm,
|
|
|
|
|
db::consistency_level cl, db::write_type type,
|
|
|
|
|
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state,
|
|
|
|
|
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, size_t pending_endpoints = 0,
|
|
|
|
|
inet_address_vector_topology_change dead_endpoints = {})
|
|
|
|
|
: _id(p->get_next_response_id()), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)),
|
|
|
|
|
: _id(p->get_next_response_id()), _proxy(std::move(p))
|
|
|
|
|
, _effective_replication_map_ptr(std::move(erm))
|
|
|
|
|
, _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)),
|
|
|
|
|
_dead_endpoints(std::move(dead_endpoints)), _stats(stats), _expire_timer([this] { timeout_cb(); }), _permit(std::move(permit)),
|
|
|
|
|
_rate_limit_info(rate_limit_info) {
|
|
|
|
|
// original comment from cassandra:
|
|
|
|
|
// during bootstrap, include pending endpoints in the count
|
|
|
|
|
// or we may fail the consistency level guarantees (see #833, #8058)
|
|
|
|
|
_total_block_for = db::block_for(ks, _cl) + pending_endpoints;
|
|
|
|
|
_total_block_for = db::block_for(*_effective_replication_map_ptr, _cl) + pending_endpoints;
|
|
|
|
|
++_stats.writes;
|
|
|
|
|
}
|
|
|
|
|
virtual ~abstract_write_response_handler() {
|
|
|
|
|
@@ -1280,7 +1285,7 @@ public:
|
|
|
|
|
}
|
|
|
|
|
if (_cl_achieved) { // For CL=ANY this can still be false
|
|
|
|
|
for (auto&& ep : get_targets()) {
|
|
|
|
|
++stats().background_replica_writes_failed.get_ep_stat(_proxy->get_token_metadata_ptr()->get_topology(), ep);
|
|
|
|
|
++stats().background_replica_writes_failed.get_ep_stat(_effective_replication_map_ptr->get_topology(), ep);
|
|
|
|
|
}
|
|
|
|
|
stats().background_writes_failed += int(!_targets.empty());
|
|
|
|
|
}
|
|
|
|
|
@@ -1379,19 +1384,21 @@ public:
|
|
|
|
|
|
|
|
|
|
class datacenter_write_response_handler : public abstract_write_response_handler {
|
|
|
|
|
bool waited_for(gms::inet_address from) override {
|
|
|
|
|
const auto& topo = _proxy->get_token_metadata_ptr()->get_topology();
|
|
|
|
|
const auto& topo = _effective_replication_map_ptr->get_topology();
|
|
|
|
|
return fbu::is_me(from) || (topo.get_datacenter(from) == topo.get_datacenter());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
datacenter_write_response_handler(shared_ptr<storage_proxy> p, replica::keyspace& ks, db::consistency_level cl, db::write_type type,
|
|
|
|
|
datacenter_write_response_handler(shared_ptr<storage_proxy> p,
|
|
|
|
|
locator::effective_replication_map_ptr ermp,
|
|
|
|
|
db::consistency_level cl, db::write_type type,
|
|
|
|
|
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets,
|
|
|
|
|
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
|
|
|
|
|
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) :
|
|
|
|
|
abstract_write_response_handler(p, ks, cl, type, std::move(mh),
|
|
|
|
|
abstract_write_response_handler(p, ermp, cl, type, std::move(mh), // can't move ermp, it's used below
|
|
|
|
|
std::move(targets), std::move(tr_state), stats, std::move(permit), rate_limit_info,
|
|
|
|
|
p->get_token_metadata_ptr()->get_topology().count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) {
|
|
|
|
|
_total_endpoints = p->get_token_metadata_ptr()->get_topology().count_local_endpoints(_targets);
|
|
|
|
|
ermp->get_topology().count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) {
|
|
|
|
|
_total_endpoints = _effective_replication_map_ptr->get_topology().count_local_endpoints(_targets);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
@@ -1400,11 +1407,13 @@ class write_response_handler : public abstract_write_response_handler {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
public:
|
|
|
|
|
write_response_handler(shared_ptr<storage_proxy> p, replica::keyspace& ks, db::consistency_level cl, db::write_type type,
|
|
|
|
|
write_response_handler(shared_ptr<storage_proxy> p,
|
|
|
|
|
locator::effective_replication_map_ptr ermp,
|
|
|
|
|
db::consistency_level cl, db::write_type type,
|
|
|
|
|
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets,
|
|
|
|
|
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
|
|
|
|
|
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) :
|
|
|
|
|
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
|
|
|
|
|
abstract_write_response_handler(std::move(p), std::move(ermp), cl, type, std::move(mh),
|
|
|
|
|
std::move(targets), std::move(tr_state), stats, std::move(permit), rate_limit_info, pending_endpoints.size(), std::move(dead_endpoints)) {
|
|
|
|
|
_total_endpoints = _targets.size();
|
|
|
|
|
}
|
|
|
|
|
@@ -1412,11 +1421,13 @@ public:
|
|
|
|
|
|
|
|
|
|
class view_update_write_response_handler : public write_response_handler, public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
|
|
|
|
|
public:
|
|
|
|
|
view_update_write_response_handler(shared_ptr<storage_proxy> p, replica::keyspace& ks, db::consistency_level cl,
|
|
|
|
|
view_update_write_response_handler(shared_ptr<storage_proxy> p,
|
|
|
|
|
locator::effective_replication_map_ptr ermp,
|
|
|
|
|
db::consistency_level cl,
|
|
|
|
|
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets,
|
|
|
|
|
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
|
|
|
|
|
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info):
|
|
|
|
|
write_response_handler(p, ks, cl, db::write_type::VIEW, std::move(mh),
|
|
|
|
|
write_response_handler(p, std::move(ermp), cl, db::write_type::VIEW, std::move(mh),
|
|
|
|
|
std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info) {
|
|
|
|
|
register_in_intrusive_list(*p);
|
|
|
|
|
}
|
|
|
|
|
@@ -1481,7 +1492,7 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
|
|
|
|
|
};
|
|
|
|
|
std::unordered_map<sstring, dc_info> _dc_responses;
|
|
|
|
|
bool waited_for(gms::inet_address from) override {
|
|
|
|
|
auto& topology = _proxy->get_token_metadata_ptr()->get_topology();
|
|
|
|
|
auto& topology = _effective_replication_map_ptr->get_topology();
|
|
|
|
|
sstring data_center = topology.get_datacenter(from);
|
|
|
|
|
auto dc_resp = _dc_responses.find(data_center);
|
|
|
|
|
|
|
|
|
|
@@ -1492,12 +1503,13 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
public:
|
|
|
|
|
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, replica::keyspace& ks, db::consistency_level cl, db::write_type type,
|
|
|
|
|
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type,
|
|
|
|
|
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints,
|
|
|
|
|
inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit,
|
|
|
|
|
db::per_partition_rate_limit::info rate_limit_info) :
|
|
|
|
|
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), rate_limit_info, 0, dead_endpoints) {
|
|
|
|
|
auto& topology = _proxy->get_token_metadata_ptr()->get_topology();
|
|
|
|
|
abstract_write_response_handler(std::move(p), std::move(ermp), cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), rate_limit_info, 0, dead_endpoints) {
|
|
|
|
|
auto* erm = _effective_replication_map_ptr.get();
|
|
|
|
|
auto& topology = erm->get_topology();
|
|
|
|
|
|
|
|
|
|
for (auto& target : targets) {
|
|
|
|
|
auto dc = topology.get_datacenter(target);
|
|
|
|
|
@@ -1509,13 +1521,13 @@ public:
|
|
|
|
|
size_t total_endpoints_for_dc = boost::range::count_if(targets, [&topology, &dc] (const gms::inet_address& ep){
|
|
|
|
|
return topology.get_datacenter(ep) == dc;
|
|
|
|
|
});
|
|
|
|
|
_dc_responses.emplace(dc, dc_info{0, db::local_quorum_for(ks, dc) + pending_for_dc, total_endpoints_for_dc, 0});
|
|
|
|
|
_dc_responses.emplace(dc, dc_info{0, db::local_quorum_for(*erm, dc) + pending_for_dc, total_endpoints_for_dc, 0});
|
|
|
|
|
_total_block_for += pending_for_dc;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
bool failure(gms::inet_address from, size_t count, error err) override {
|
|
|
|
|
auto& topology = _proxy->get_token_metadata_ptr()->get_topology();
|
|
|
|
|
auto& topology = _effective_replication_map_ptr->get_topology();
|
|
|
|
|
const sstring& dc = topology.get_datacenter(from);
|
|
|
|
|
auto dc_resp = _dc_responses.find(dc);
|
|
|
|
|
|
|
|
|
|
@@ -2185,21 +2197,22 @@ future<result<>> storage_proxy::response_wait(storage_proxy::response_id_type id
|
|
|
|
|
return _response_handlers.find(id)->second;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result<storage_proxy::response_id_type> storage_proxy::create_write_response_handler(replica::keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
|
|
|
|
|
result<storage_proxy::response_id_type> storage_proxy::create_write_response_handler(locator::effective_replication_map_ptr ermp,
|
|
|
|
|
db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
|
|
|
|
|
inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
|
|
|
|
|
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info)
|
|
|
|
|
{
|
|
|
|
|
shared_ptr<abstract_write_response_handler> h;
|
|
|
|
|
auto& rs = ks.get_replication_strategy();
|
|
|
|
|
auto& rs = ermp->get_replication_strategy();
|
|
|
|
|
|
|
|
|
|
if (db::is_datacenter_local(cl)) {
|
|
|
|
|
h = ::make_shared<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
|
|
|
|
|
h = ::make_shared<datacenter_write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
|
|
|
|
|
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
|
|
|
|
|
h = ::make_shared<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
|
|
|
|
|
h = ::make_shared<datacenter_sync_write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
|
|
|
|
|
} else if (type == db::write_type::VIEW) {
|
|
|
|
|
h = ::make_shared<view_update_write_response_handler>(shared_from_this(), ks, cl, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
|
|
|
|
|
h = ::make_shared<view_update_write_response_handler>(shared_from_this(), std::move(ermp), cl, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
|
|
|
|
|
} else {
|
|
|
|
|
h = ::make_shared<write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
|
|
|
|
|
h = ::make_shared<write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
|
|
|
|
|
}
|
|
|
|
|
return bo::success(register_response_handler(std::move(h)));
|
|
|
|
|
}
|
|
|
|
|
@@ -2769,9 +2782,9 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
|
|
|
|
|
slogger.trace("creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints);
|
|
|
|
|
tracing::trace(tr_state, "Creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints);
|
|
|
|
|
|
|
|
|
|
db::assure_sufficient_live_nodes(cl, ks, live_endpoints, pending_endpoints);
|
|
|
|
|
db::assure_sufficient_live_nodes(cl, *erm, live_endpoints, pending_endpoints);
|
|
|
|
|
|
|
|
|
|
return create_write_response_handler(ks, cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints,
|
|
|
|
|
return create_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints,
|
|
|
|
|
std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -2806,9 +2819,10 @@ storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_
|
|
|
|
|
|
|
|
|
|
auto keyspace_name = mh->schema()->ks_name();
|
|
|
|
|
replica::keyspace& ks = _db.local().find_keyspace(keyspace_name);
|
|
|
|
|
auto ermp = ks.get_effective_replication_map();
|
|
|
|
|
|
|
|
|
|
// No rate limiting for read repair
|
|
|
|
|
return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
|
|
|
|
|
return create_write_response_handler(std::move(ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result<storage_proxy::response_id_type>
|
|
|
|
|
@@ -2830,9 +2844,10 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo
|
|
|
|
|
|
|
|
|
|
auto keyspace_name = s->ks_name();
|
|
|
|
|
replica::keyspace& ks = _db.local().find_keyspace(keyspace_name);
|
|
|
|
|
auto ermp = ks.get_effective_replication_map();
|
|
|
|
|
|
|
|
|
|
// No rate limiting for paxos (yet)
|
|
|
|
|
return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
|
|
|
|
|
return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
|
|
|
|
|
inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -2968,10 +2983,11 @@ future<result<>> storage_proxy::mutate_end(future<result<>> mutate_result, utils
|
|
|
|
|
|
|
|
|
|
gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation& m, db::consistency_level cl) {
|
|
|
|
|
auto& ks = _db.local().find_keyspace(m.schema()->ks_name());
|
|
|
|
|
auto live_endpoints = get_live_endpoints(ks, m.token());
|
|
|
|
|
auto erm = ks.get_effective_replication_map();
|
|
|
|
|
auto live_endpoints = get_live_endpoints(*erm, m.token());
|
|
|
|
|
|
|
|
|
|
if (live_endpoints.empty()) {
|
|
|
|
|
throw exceptions::unavailable_exception(cl, block_for(ks, cl), 0);
|
|
|
|
|
throw exceptions::unavailable_exception(cl, block_for(*erm, cl), 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const auto my_address = utils::fb_utilities::get_broadcast_address();
|
|
|
|
|
@@ -2982,7 +2998,7 @@ gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation&
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const auto local_endpoints = boost::copy_range<inet_address_vector_replica_set>(live_endpoints
|
|
|
|
|
| boost::adaptors::filtered(get_token_metadata_ptr()->get_topology().get_local_dc_filter()));
|
|
|
|
|
| boost::adaptors::filtered(erm->get_topology().get_local_dc_filter()));
|
|
|
|
|
|
|
|
|
|
if (local_endpoints.empty()) {
|
|
|
|
|
// FIXME: O(n log n) to get maximum
|
|
|
|
|
@@ -3025,14 +3041,15 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level
|
|
|
|
|
// Let's just use the schema of the first mutation in a vector.
|
|
|
|
|
auto handle_error = [this, sp = this->shared_from_this(), s = endpoint_and_mutations.second[0].s, cl, permit] (std::exception_ptr exp) {
|
|
|
|
|
auto& ks = _db.local().find_keyspace(s->ks_name());
|
|
|
|
|
auto erm = ks.get_effective_replication_map();
|
|
|
|
|
try {
|
|
|
|
|
std::rethrow_exception(std::move(exp));
|
|
|
|
|
} catch (rpc::timeout_error&) {
|
|
|
|
|
return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(ks, cl), db::write_type::COUNTER));
|
|
|
|
|
return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(*erm, cl), db::write_type::COUNTER));
|
|
|
|
|
} catch (timed_out_error&) {
|
|
|
|
|
return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(ks, cl), db::write_type::COUNTER));
|
|
|
|
|
return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(*erm, cl), db::write_type::COUNTER));
|
|
|
|
|
} catch (rpc::closed_error&) {
|
|
|
|
|
return make_exception_future<>(mutation_write_failure_exception(s->ks_name(), s->cf_name(), cl, 0, 1, db::block_for(ks, cl), db::write_type::COUNTER));
|
|
|
|
|
return make_exception_future<>(mutation_write_failure_exception(s->ks_name(), s->cf_name(), cl, 0, 1, db::block_for(*erm, cl), db::write_type::COUNTER));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
@@ -3065,7 +3082,7 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
|
|
|
|
|
inet_address_vector_topology_change pending_endpoints = erm->get_token_metadata_ptr()->pending_endpoints_for(token, ks_name);
|
|
|
|
|
|
|
|
|
|
if (cl_for_paxos == db::consistency_level::LOCAL_SERIAL) {
|
|
|
|
|
auto local_dc_filter = get_token_metadata_ptr()->get_topology().get_local_dc_filter();
|
|
|
|
|
auto local_dc_filter = erm->get_topology().get_local_dc_filter();
|
|
|
|
|
auto itend = boost::range::remove_if(natural_endpoints, std::not_fn(std::cref(local_dc_filter)));
|
|
|
|
|
natural_endpoints.erase(itend, natural_endpoints.end());
|
|
|
|
|
itend = boost::range::remove_if(pending_endpoints, std::not_fn(std::cref(local_dc_filter)));
|
|
|
|
|
@@ -3346,7 +3363,8 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
|
|
|
|
|
future<result<>> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) {
|
|
|
|
|
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) {
|
|
|
|
|
auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name());
|
|
|
|
|
return _p.create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate());
|
|
|
|
|
auto ermp = ks.get_effective_replication_map();
|
|
|
|
|
return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate());
|
|
|
|
|
}).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) {
|
|
|
|
|
_p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
|
|
|
|
|
return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout);
|
|
|
|
|
@@ -3476,10 +3494,11 @@ future<> storage_proxy::send_to_endpoint(
|
|
|
|
|
std::back_inserter(dead_endpoints),
|
|
|
|
|
std::bind_front(&storage_proxy::is_alive, this));
|
|
|
|
|
auto& ks = _db.local().find_keyspace(m->schema()->ks_name());
|
|
|
|
|
auto erm = ks.get_effective_replication_map();
|
|
|
|
|
slogger.trace("Creating write handler with live: {}; dead: {}", targets, dead_endpoints);
|
|
|
|
|
db::assure_sufficient_live_nodes(cl, ks, targets, pending_endpoints);
|
|
|
|
|
db::assure_sufficient_live_nodes(cl, *erm, targets, pending_endpoints);
|
|
|
|
|
return create_write_response_handler(
|
|
|
|
|
ks,
|
|
|
|
|
std::move(erm),
|
|
|
|
|
cl,
|
|
|
|
|
type,
|
|
|
|
|
std::move(m),
|
|
|
|
|
@@ -3592,7 +3611,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
|
|
|
|
|
auto& stats = handler_ptr->stats();
|
|
|
|
|
auto& handler = *handler_ptr;
|
|
|
|
|
auto& global_stats = handler._proxy->_global_stats;
|
|
|
|
|
auto& topology = get_token_metadata_ptr()->get_topology();
|
|
|
|
|
auto& topology = handler_ptr->_effective_replication_map_ptr->get_topology();
|
|
|
|
|
auto local_dc = topology.get_datacenter();
|
|
|
|
|
|
|
|
|
|
for(auto dest: handler.get_targets()) {
|
|
|
|
|
@@ -3645,9 +3664,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
|
|
|
|
|
got_response(response_id, coordinator, std::nullopt);
|
|
|
|
|
} else {
|
|
|
|
|
if (!handler.read_repair_write()) {
|
|
|
|
|
++stats.writes_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator);
|
|
|
|
|
++stats.writes_attempts.get_ep_stat(handler_ptr->_effective_replication_map_ptr->get_topology(), coordinator);
|
|
|
|
|
} else {
|
|
|
|
|
++stats.read_repair_write_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator);
|
|
|
|
|
++stats.read_repair_write_attempts.get_ep_stat(handler_ptr->_effective_replication_map_ptr->get_topology(), coordinator);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (coordinator == my_address) {
|
|
|
|
|
@@ -3659,7 +3678,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
|
|
|
|
|
|
|
|
|
|
// Waited on indirectly.
|
|
|
|
|
(void)f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) {
|
|
|
|
|
++stats.writes_errors.get_ep_stat(p->get_token_metadata_ptr()->get_topology(), coordinator);
|
|
|
|
|
++stats.writes_errors.get_ep_stat(handler_ptr->_effective_replication_map_ptr->get_topology(), coordinator);
|
|
|
|
|
error err = error::FAILURE;
|
|
|
|
|
std::optional<sstring> msg;
|
|
|
|
|
if (try_catch<replica::rate_limit_exception>(eptr)) {
|
|
|
|
|
@@ -3801,6 +3820,7 @@ class digest_read_resolver : public abstract_read_resolver {
|
|
|
|
|
};
|
|
|
|
|
private:
|
|
|
|
|
shared_ptr<storage_proxy> _proxy;
|
|
|
|
|
locator::effective_replication_map_ptr _effective_replication_map_ptr;
|
|
|
|
|
size_t _block_for;
|
|
|
|
|
size_t _cl_responses = 0;
|
|
|
|
|
promise<result<digest_read_result>> _cl_promise; // cl is reached
|
|
|
|
|
@@ -3822,9 +3842,12 @@ private:
|
|
|
|
|
_digest_results.clear();
|
|
|
|
|
}
|
|
|
|
|
public:
|
|
|
|
|
digest_read_resolver(shared_ptr<storage_proxy> proxy, schema_ptr schema, db::consistency_level cl, size_t block_for, size_t target_count_for_cl, storage_proxy::clock_type::time_point timeout)
|
|
|
|
|
digest_read_resolver(shared_ptr<storage_proxy> proxy,
|
|
|
|
|
locator::effective_replication_map_ptr ermp,
|
|
|
|
|
schema_ptr schema, db::consistency_level cl, size_t block_for, size_t target_count_for_cl, storage_proxy::clock_type::time_point timeout)
|
|
|
|
|
: abstract_read_resolver(std::move(schema), cl, 0, timeout)
|
|
|
|
|
, _proxy(std::move(proxy))
|
|
|
|
|
, _effective_replication_map_ptr(std::move(ermp))
|
|
|
|
|
, _block_for(block_for)
|
|
|
|
|
, _target_count_for_cl(target_count_for_cl)
|
|
|
|
|
{}
|
|
|
|
|
@@ -3868,7 +3891,7 @@ public:
|
|
|
|
|
}
|
|
|
|
|
private:
|
|
|
|
|
bool waiting_for(gms::inet_address ep) {
|
|
|
|
|
const auto& topo = _proxy->get_token_metadata_ptr()->get_topology();
|
|
|
|
|
const auto& topo = _effective_replication_map_ptr->get_topology();
|
|
|
|
|
return db::is_datacenter_local(_cl) ? fbu::is_me(ep) || (topo.get_datacenter(ep) == topo.get_datacenter()) : true;
|
|
|
|
|
}
|
|
|
|
|
void got_response(gms::inet_address ep) {
|
|
|
|
|
@@ -4430,6 +4453,7 @@ protected:
|
|
|
|
|
|
|
|
|
|
schema_ptr _schema;
|
|
|
|
|
shared_ptr<storage_proxy> _proxy;
|
|
|
|
|
locator::effective_replication_map_ptr _effective_replication_map_ptr;
|
|
|
|
|
lw_shared_ptr<query::read_command> _cmd;
|
|
|
|
|
lw_shared_ptr<query::read_command> _retry_cmd;
|
|
|
|
|
dht::partition_range _partition_range;
|
|
|
|
|
@@ -4453,13 +4477,17 @@ private:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const locator::topology& get_topology() const noexcept {
|
|
|
|
|
return _proxy->get_token_metadata_ptr()->get_topology();
|
|
|
|
|
return _effective_replication_map_ptr->get_topology();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
abstract_read_executor(schema_ptr s, lw_shared_ptr<replica::column_family> cf, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for,
|
|
|
|
|
abstract_read_executor(schema_ptr s, lw_shared_ptr<replica::column_family> cf, shared_ptr<storage_proxy> proxy,
|
|
|
|
|
locator::effective_replication_map_ptr ermp,
|
|
|
|
|
lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for,
|
|
|
|
|
inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) :
|
|
|
|
|
_schema(std::move(s)), _proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)), _trace_state(std::move(trace_state)),
|
|
|
|
|
_schema(std::move(s)), _proxy(std::move(proxy))
|
|
|
|
|
, _effective_replication_map_ptr(std::move(ermp))
|
|
|
|
|
, _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)), _trace_state(std::move(trace_state)),
|
|
|
|
|
_cf(std::move(cf)), _permit(std::move(permit)), _rate_limit_info(rate_limit_info) {
|
|
|
|
|
_proxy->get_stats().reads++;
|
|
|
|
|
_proxy->get_stats().foreground_reads++;
|
|
|
|
|
@@ -4723,8 +4751,8 @@ public:
|
|
|
|
|
// Return an empty result in this case
|
|
|
|
|
return make_ready_future<result<foreign_ptr<lw_shared_ptr<query::result>>>>(make_foreign(make_lw_shared(query::result())));
|
|
|
|
|
}
|
|
|
|
|
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_proxy, _schema, _cl, _block_for,
|
|
|
|
|
db::is_datacenter_local(_cl) ? _proxy->get_token_metadata_ptr()->get_topology().count_local_endpoints(_targets): _targets.size(), timeout);
|
|
|
|
|
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_proxy, _effective_replication_map_ptr, _schema, _cl, _block_for,
|
|
|
|
|
db::is_datacenter_local(_cl) ? _effective_replication_map_ptr->get_topology().count_local_endpoints(_targets): _targets.size(), timeout);
|
|
|
|
|
auto exec = shared_from_this();
|
|
|
|
|
|
|
|
|
|
make_requests(digest_resolver, timeout);
|
|
|
|
|
@@ -4765,7 +4793,7 @@ public:
|
|
|
|
|
if (std::abs(delta) <= write_timeout) {
|
|
|
|
|
exec->_proxy->get_stats().global_read_repairs_canceled_due_to_concurrent_write++;
|
|
|
|
|
// if CL is local and non matching data is modified less then write_timeout ms ago do only local repair
|
|
|
|
|
auto local_dc_filter = exec->_proxy->get_token_metadata_ptr()->get_topology().get_local_dc_filter();
|
|
|
|
|
auto local_dc_filter = exec->_effective_replication_map_ptr->get_topology().get_local_dc_filter();
|
|
|
|
|
auto i = boost::range::remove_if(exec->_targets, std::not_fn(std::cref(local_dc_filter)));
|
|
|
|
|
exec->_targets.erase(i, exec->_targets.end());
|
|
|
|
|
}
|
|
|
|
|
@@ -4823,9 +4851,11 @@ private:
|
|
|
|
|
|
|
|
|
|
class never_speculating_read_executor : public abstract_read_executor {
|
|
|
|
|
public:
|
|
|
|
|
never_speculating_read_executor(schema_ptr s, lw_shared_ptr<replica::column_family> cf, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit,
|
|
|
|
|
never_speculating_read_executor(schema_ptr s, lw_shared_ptr<replica::column_family> cf, shared_ptr<storage_proxy> proxy,
|
|
|
|
|
locator::effective_replication_map_ptr ermp,
|
|
|
|
|
lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit,
|
|
|
|
|
db::per_partition_rate_limit::info rate_limit_info) :
|
|
|
|
|
abstract_read_executor(std::move(s), std::move(cf), std::move(proxy), std::move(cmd), std::move(pr), cl, 0, std::move(targets), std::move(trace_state), std::move(permit), rate_limit_info) {
|
|
|
|
|
abstract_read_executor(std::move(s), std::move(cf), std::move(proxy), std::move(ermp), std::move(cmd), std::move(pr), cl, 0, std::move(targets), std::move(trace_state), std::move(permit), rate_limit_info) {
|
|
|
|
|
_block_for = _targets.size();
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
@@ -4923,10 +4953,11 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
|
|
|
|
service_permit permit) {
|
|
|
|
|
const dht::token& token = pr.start()->value().token();
|
|
|
|
|
replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name());
|
|
|
|
|
auto erm = ks.get_effective_replication_map();
|
|
|
|
|
speculative_retry::type retry_type = schema->speculative_retry().get_type();
|
|
|
|
|
gms::inet_address extra_replica;
|
|
|
|
|
|
|
|
|
|
inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(ks, token);
|
|
|
|
|
inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(*erm, token);
|
|
|
|
|
// Check for a non-local read before heat-weighted load balancing
|
|
|
|
|
// reordering of endpoints happens. The local endpoint, if
|
|
|
|
|
// present, is always first in the list, as get_live_sorted_endpoints()
|
|
|
|
|
@@ -4935,7 +4966,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
|
|
|
|
|
|
|
|
|
auto cf = _db.local().find_column_family(schema).shared_from_this();
|
|
|
|
|
auto& gossiper = _remote->gossiper();
|
|
|
|
|
inet_address_vector_replica_set target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision,
|
|
|
|
|
inet_address_vector_replica_set target_replicas = db::filter_for_query(cl, *erm, all_replicas, preferred_endpoints, repair_decision,
|
|
|
|
|
gossiper,
|
|
|
|
|
retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica,
|
|
|
|
|
_db.local().get_config().cache_hit_rate_read_balancing() ? &*cf : nullptr);
|
|
|
|
|
@@ -4945,7 +4976,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
|
|
|
|
|
|
|
|
|
// Throw UAE early if we don't have enough replicas.
|
|
|
|
|
try {
|
|
|
|
|
db::assure_sufficient_live_nodes(cl, ks, target_replicas);
|
|
|
|
|
db::assure_sufficient_live_nodes(cl, *erm, target_replicas);
|
|
|
|
|
} catch (exceptions::unavailable_exception& ex) {
|
|
|
|
|
slogger.debug("Read unavailable: cl={} required {} alive {}", ex.consistency, ex.required, ex.alive);
|
|
|
|
|
get_stats().read_unavailables.mark();
|
|
|
|
|
@@ -4956,7 +4987,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
|
|
|
|
get_stats().read_repair_attempts++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t block_for = db::block_for(ks, cl);
|
|
|
|
|
size_t block_for = db::block_for(*erm, cl);
|
|
|
|
|
auto p = shared_from_this();
|
|
|
|
|
|
|
|
|
|
db::per_partition_rate_limit::info rate_limit_info;
|
|
|
|
|
@@ -4973,22 +5004,22 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
|
|
|
|
// Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
|
|
|
|
|
if (retry_type == speculative_retry::type::NONE || block_for == all_replicas.size()
|
|
|
|
|
|| (repair_decision == db::read_repair_decision::DC_LOCAL && is_datacenter_local(cl) && block_for == target_replicas.size())) {
|
|
|
|
|
return ::make_shared<never_speculating_read_executor>(schema, cf, p, cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
|
|
|
|
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (target_replicas.size() == all_replicas.size()) {
|
|
|
|
|
// CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
|
|
|
|
|
// We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
|
|
|
|
|
// (same amount of requests in total, but we turn 1 digest request into a full blown data request).
|
|
|
|
|
return ::make_shared<always_speculating_read_executor>(schema, cf, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
|
|
|
|
return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
|
|
|
|
|
if (target_replicas.size() == block_for) { // If RRD.DC_LOCAL extra replica may already be present
|
|
|
|
|
auto local_dc_filter = get_token_metadata_ptr()->get_topology().get_local_dc_filter();
|
|
|
|
|
auto local_dc_filter = erm->get_topology().get_local_dc_filter();
|
|
|
|
|
if (is_datacenter_local(cl) && !local_dc_filter(extra_replica)) {
|
|
|
|
|
slogger.trace("read executor no extra target to speculate");
|
|
|
|
|
return ::make_shared<never_speculating_read_executor>(schema, cf, p, cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
|
|
|
|
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
|
|
|
|
} else {
|
|
|
|
|
target_replicas.push_back(extra_replica);
|
|
|
|
|
slogger.trace("creating read executor with extra target {}", extra_replica);
|
|
|
|
|
@@ -4996,9 +5027,9 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (retry_type == speculative_retry::type::ALWAYS) {
|
|
|
|
|
return ::make_shared<always_speculating_read_executor>(schema, cf, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
|
|
|
|
return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
|
|
|
|
} else {// PERCENTILE or CUSTOM.
|
|
|
|
|
return ::make_shared<speculating_read_executor>(schema, cf, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
|
|
|
|
return ::make_shared<speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -5180,6 +5211,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
|
|
|
|
service_permit permit) {
|
|
|
|
|
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
|
|
|
|
|
replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name());
|
|
|
|
|
auto erm = ks.get_effective_replication_map();
|
|
|
|
|
std::vector<::shared_ptr<abstract_read_executor>> exec;
|
|
|
|
|
auto p = shared_from_this();
|
|
|
|
|
auto& cf= _db.local().find_column_family(schema);
|
|
|
|
|
@@ -5209,9 +5241,9 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
|
|
|
|
auto& gossiper = _remote->gossiper();
|
|
|
|
|
while (i != ranges.end()) {
|
|
|
|
|
dht::partition_range& range = *i;
|
|
|
|
|
inet_address_vector_replica_set live_endpoints = get_live_sorted_endpoints(ks, end_token(range));
|
|
|
|
|
inet_address_vector_replica_set live_endpoints = get_live_sorted_endpoints(*erm, end_token(range));
|
|
|
|
|
inet_address_vector_replica_set merged_preferred_replicas = preferred_replicas_for_range(*i);
|
|
|
|
|
inet_address_vector_replica_set filtered_endpoints = filter_for_query(cl, ks, live_endpoints, merged_preferred_replicas, gossiper, pcf);
|
|
|
|
|
inet_address_vector_replica_set filtered_endpoints = filter_for_query(cl, *erm, live_endpoints, merged_preferred_replicas, gossiper, pcf);
|
|
|
|
|
std::vector<dht::token_range> merged_ranges{to_token_range(range)};
|
|
|
|
|
++i;
|
|
|
|
|
|
|
|
|
|
@@ -5222,8 +5254,8 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
|
|
|
|
{
|
|
|
|
|
const auto current_range_preferred_replicas = preferred_replicas_for_range(*i);
|
|
|
|
|
dht::partition_range& next_range = *i;
|
|
|
|
|
inet_address_vector_replica_set next_endpoints = get_live_sorted_endpoints(ks, end_token(next_range));
|
|
|
|
|
inet_address_vector_replica_set next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints, current_range_preferred_replicas, gossiper, pcf);
|
|
|
|
|
inet_address_vector_replica_set next_endpoints = get_live_sorted_endpoints(*erm, end_token(next_range));
|
|
|
|
|
inet_address_vector_replica_set next_filtered_endpoints = filter_for_query(cl, *erm, next_endpoints, current_range_preferred_replicas, gossiper, pcf);
|
|
|
|
|
|
|
|
|
|
// Origin has this to say here:
|
|
|
|
|
// * If the current range right is the min token, we should stop merging because CFS.getRangeSlice
|
|
|
|
|
@@ -5264,11 +5296,11 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
|
|
|
|
inet_address_vector_replica_set current_merged_preferred_replicas = intersection(merged_preferred_replicas, current_range_preferred_replicas);
|
|
|
|
|
|
|
|
|
|
// Check if there is enough endpoint for the merge to be possible.
|
|
|
|
|
if (!is_sufficient_live_nodes(cl, ks, merged)) {
|
|
|
|
|
if (!is_sufficient_live_nodes(cl, *erm, merged)) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
inet_address_vector_replica_set filtered_merged = filter_for_query(cl, ks, merged, current_merged_preferred_replicas, gossiper, pcf);
|
|
|
|
|
inet_address_vector_replica_set filtered_merged = filter_for_query(cl, *erm, merged, current_merged_preferred_replicas, gossiper, pcf);
|
|
|
|
|
|
|
|
|
|
// Estimate whether merging will be a win or not
|
|
|
|
|
if (!locator::i_endpoint_snitch::get_local_snitch_ptr()->is_worth_merging_for_range_query(filtered_merged, filtered_endpoints, next_filtered_endpoints)) {
|
|
|
|
|
@@ -5305,14 +5337,14 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
|
|
|
|
}
|
|
|
|
|
slogger.trace("creating range read executor with targets {}", filtered_endpoints);
|
|
|
|
|
try {
|
|
|
|
|
db::assure_sufficient_live_nodes(cl, ks, filtered_endpoints);
|
|
|
|
|
db::assure_sufficient_live_nodes(cl, *erm, filtered_endpoints);
|
|
|
|
|
} catch(exceptions::unavailable_exception& ex) {
|
|
|
|
|
slogger.debug("Read unavailable: cl={} required {} alive {}", ex.consistency, ex.required, ex.alive);
|
|
|
|
|
get_stats().range_slice_unavailables.mark();
|
|
|
|
|
throw;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
exec.push_back(::make_shared<never_speculating_read_executor>(schema, cf.shared_from_this(), p, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state, permit, std::monostate()));
|
|
|
|
|
exec.push_back(::make_shared<never_speculating_read_executor>(schema, cf.shared_from_this(), p, erm, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state, permit, std::monostate()));
|
|
|
|
|
ranges_per_exec.emplace(exec.back().get(), std::move(merged_ranges));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -5789,9 +5821,8 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
|
|
|
|
|
co_return condition_met;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
inet_address_vector_replica_set storage_proxy::get_live_endpoints(replica::keyspace& ks, const dht::token& token) const {
|
|
|
|
|
auto erm = ks.get_effective_replication_map();
|
|
|
|
|
inet_address_vector_replica_set eps = erm->get_natural_endpoints_without_node_being_replaced(token);
|
|
|
|
|
inet_address_vector_replica_set storage_proxy::get_live_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const {
|
|
|
|
|
inet_address_vector_replica_set eps = erm.get_natural_endpoints_without_node_being_replaced(token);
|
|
|
|
|
auto itend = boost::range::remove_if(eps, std::not_fn(std::bind_front(&storage_proxy::is_alive, this)));
|
|
|
|
|
eps.erase(itend, eps.end());
|
|
|
|
|
return eps;
|
|
|
|
|
@@ -5806,8 +5837,8 @@ void storage_proxy::sort_endpoints_by_proximity(inet_address_vector_replica_set&
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
inet_address_vector_replica_set storage_proxy::get_live_sorted_endpoints(replica::keyspace& ks, const dht::token& token) const {
|
|
|
|
|
auto eps = get_live_endpoints(ks, token);
|
|
|
|
|
inet_address_vector_replica_set storage_proxy::get_live_sorted_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const {
|
|
|
|
|
auto eps = get_live_endpoints(erm, token);
|
|
|
|
|
sort_endpoints_by_proximity(eps);
|
|
|
|
|
return eps;
|
|
|
|
|
}
|
|
|
|
|
|