Merge 'storage_proxy: use consistent topology, prepare for fencing' from Avi Kivity

Replication is a mix of several inputs: tokens and token->node mappings (topology),
the replication strategy, and the replication strategy's parameters. These are all captured
in effective_replication_map.

However, if we use effective_replication_maps captured at different times in a single
query, then different uses may see different inputs to effective_replication_map.

This series protects against that by capturing an effective_replication_map just
once in a query, and then using it. Furthermore, the captured effective_replication_map
is held until the query completes, so topology code can know when a topology is no
longer in use (although this isn't exploited in this series).

Only the simple read and write paths are covered. Counters and paxos are left for
later.

I don't think the series fixes any bugs - as far as I could tell everything was happening
in the same continuation. But this series ensures it.

Closes #11259

* github.com:scylladb/scylladb:
  storage_proxy: use consistent topology
  storage_proxy: use consistent replication map on read path
  storage_proxy: use consistent replication map on write path
  storage_proxy: convert get_live{,_sorted}_endpoints() to accept an effective_replication_map
  consistency_level: accept effective_replication_map as parameter, rather than keyspace
  consistency_level: be more const when using replication_strategy
This commit is contained in:
Botond Dénes
2022-08-12 06:00:30 +03:00
5 changed files with 174 additions and 138 deletions

View File

@@ -29,15 +29,15 @@ namespace db {
logging::logger cl_logger("consistency");
size_t quorum_for(const replica::keyspace& ks) {
size_t replication_factor = ks.get_effective_replication_map()->get_replication_factor();
size_t quorum_for(const locator::effective_replication_map& erm) {
size_t replication_factor = erm.get_replication_factor();
return replication_factor ? (replication_factor / 2) + 1 : 0;
}
size_t local_quorum_for(const replica::keyspace& ks, const sstring& dc) {
size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc) {
using namespace locator;
auto& rs = ks.get_replication_strategy();
auto& rs = erm.get_replication_strategy();
if (rs.get_type() == replication_strategy_type::network_topology) {
const network_topology_strategy* nrs =
@@ -46,10 +46,10 @@ size_t local_quorum_for(const replica::keyspace& ks, const sstring& dc) {
return replication_factor ? (replication_factor / 2) + 1 : 0;
}
return quorum_for(ks);
return quorum_for(erm);
}
size_t block_for_local_serial(replica::keyspace& ks) {
size_t block_for_local_serial(const locator::effective_replication_map& erm) {
using namespace locator;
//
@@ -59,31 +59,31 @@ size_t block_for_local_serial(replica::keyspace& ks) {
// and the snitch itself (and thus its output) may change dynamically.
//
const auto& topo = ks.get_effective_replication_map()->get_topology();
return local_quorum_for(ks, topo.get_datacenter());
const auto& topo = erm.get_topology();
return local_quorum_for(erm, topo.get_datacenter());
}
size_t block_for_each_quorum(replica::keyspace& ks) {
size_t block_for_each_quorum(const locator::effective_replication_map& erm) {
using namespace locator;
auto& rs = ks.get_replication_strategy();
auto& rs = erm.get_replication_strategy();
if (rs.get_type() == replication_strategy_type::network_topology) {
network_topology_strategy* nrs =
static_cast<network_topology_strategy*>(&rs);
const network_topology_strategy* nrs =
static_cast<const network_topology_strategy*>(&rs);
size_t n = 0;
for (auto& dc : nrs->get_datacenters()) {
n += local_quorum_for(ks, dc);
n += local_quorum_for(erm, dc);
}
return n;
} else {
return quorum_for(ks);
return quorum_for(erm);
}
}
size_t block_for(replica::keyspace& ks, consistency_level cl) {
size_t block_for(const locator::effective_replication_map& erm, consistency_level cl) {
switch (cl) {
case consistency_level::ONE:
case consistency_level::LOCAL_ONE:
@@ -96,14 +96,14 @@ size_t block_for(replica::keyspace& ks, consistency_level cl) {
return 3;
case consistency_level::QUORUM:
case consistency_level::SERIAL:
return quorum_for(ks);
return quorum_for(erm);
case consistency_level::ALL:
return ks.get_effective_replication_map()->get_replication_factor();
return erm.get_replication_factor();
case consistency_level::LOCAL_QUORUM:
case consistency_level::LOCAL_SERIAL:
return block_for_local_serial(ks);
return block_for_local_serial(erm);
case consistency_level::EACH_QUORUM:
return block_for_each_quorum(ks);
return block_for_each_quorum(erm);
default:
abort();
}
@@ -115,16 +115,16 @@ bool is_datacenter_local(consistency_level l) {
template <typename Range, typename PendingRange = std::array<gms::inet_address, 0>>
std::unordered_map<sstring, dc_node_count> count_per_dc_endpoints(
replica::keyspace& ks,
const locator::effective_replication_map& erm,
const Range& live_endpoints,
const PendingRange& pending_endpoints = std::array<gms::inet_address, 0>()) {
using namespace locator;
auto& rs = ks.get_replication_strategy();
const auto& topo = ks.get_effective_replication_map()->get_topology();
auto& rs = erm.get_replication_strategy();
const auto& topo = erm.get_topology();
network_topology_strategy* nrs =
static_cast<network_topology_strategy*>(&rs);
const network_topology_strategy* nrs =
static_cast<const network_topology_strategy*>(&rs);
std::unordered_map<sstring, dc_node_count> dc_endpoints;
for (auto& dc : nrs->get_datacenters()) {
@@ -151,16 +151,16 @@ std::unordered_map<sstring, dc_node_count> count_per_dc_endpoints(
template<typename Range, typename PendingRange>
bool assure_sufficient_live_nodes_each_quorum(
consistency_level cl,
replica::keyspace& ks,
const locator::effective_replication_map& erm,
const Range& live_endpoints,
const PendingRange& pending_endpoints) {
using namespace locator;
auto& rs = ks.get_replication_strategy();
auto& rs = erm.get_replication_strategy();
if (rs.get_type() == replication_strategy_type::network_topology) {
for (auto& entry : count_per_dc_endpoints(ks, live_endpoints, pending_endpoints)) {
auto dc_block_for = local_quorum_for(ks, entry.first);
for (auto& entry : count_per_dc_endpoints(erm, live_endpoints, pending_endpoints)) {
auto dc_block_for = local_quorum_for(erm, entry.first);
auto dc_live = entry.second.live;
auto dc_pending = entry.second.pending;
@@ -178,10 +178,10 @@ bool assure_sufficient_live_nodes_each_quorum(
template<typename Range, typename PendingRange>
void assure_sufficient_live_nodes(
consistency_level cl,
replica::keyspace& ks,
const locator::effective_replication_map& erm,
const Range& live_endpoints,
const PendingRange& pending_endpoints) {
size_t need = block_for(ks, cl);
size_t need = block_for(erm, cl);
auto adjust_live_for_error = [] (size_t live, size_t pending) {
// DowngradingConsistencyRetryPolicy uses alive replicas count from Unavailable
@@ -191,7 +191,7 @@ void assure_sufficient_live_nodes(
return pending <= live ? live - pending : 0;
};
const auto& topo = ks.get_effective_replication_map()->get_topology();
const auto& topo = erm.get_topology();
switch (cl) {
case consistency_level::ANY:
@@ -212,7 +212,7 @@ void assure_sufficient_live_nodes(
break;
}
case consistency_level::EACH_QUORUM:
if (assure_sufficient_live_nodes_each_quorum(cl, ks, live_endpoints, pending_endpoints)) {
if (assure_sufficient_live_nodes_each_quorum(cl, erm, live_endpoints, pending_endpoints)) {
break;
}
// Fallthough on purpose for SimpleStrategy
@@ -227,12 +227,12 @@ void assure_sufficient_live_nodes(
}
}
template void assure_sufficient_live_nodes(consistency_level, replica::keyspace&, const inet_address_vector_replica_set&, const std::array<gms::inet_address, 0>&);
template void assure_sufficient_live_nodes(db::consistency_level, replica::keyspace&, const inet_address_vector_replica_set&, const utils::small_vector<gms::inet_address, 1ul>&);
template void assure_sufficient_live_nodes(consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const std::array<gms::inet_address, 0>&);
template void assure_sufficient_live_nodes(db::consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const utils::small_vector<gms::inet_address, 1ul>&);
inet_address_vector_replica_set
filter_for_query(consistency_level cl,
replica::keyspace& ks,
const locator::effective_replication_map& erm,
inet_address_vector_replica_set live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
read_repair_decision read_repair,
@@ -246,7 +246,7 @@ filter_for_query(consistency_level cl,
}
if (read_repair == read_repair_decision::DC_LOCAL || is_datacenter_local(cl)) {
const auto& topo = ks.get_effective_replication_map()->get_topology();
const auto& topo = erm.get_topology();
auto it = boost::range::stable_partition(live_endpoints, topo.get_local_dc_filter());
local_count = std::distance(live_endpoints.begin(), it);
if (is_datacenter_local(cl)) {
@@ -254,10 +254,10 @@ filter_for_query(consistency_level cl,
}
}
size_t bf = block_for(ks, cl);
size_t bf = block_for(erm, cl);
if (read_repair == read_repair_decision::DC_LOCAL) {
bf = std::max(block_for(ks, cl), local_count);
bf = std::max(block_for(erm, cl), local_count);
}
if (bf >= live_endpoints.size()) { // RRD.DC_LOCAL + CL.LOCAL or CL.ALL
@@ -345,20 +345,20 @@ filter_for_query(consistency_level cl,
}
inet_address_vector_replica_set filter_for_query(consistency_level cl,
replica::keyspace& ks,
const locator::effective_replication_map& erm,
inet_address_vector_replica_set& live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
const gms::gossiper& g,
replica::column_family* cf) {
return filter_for_query(cl, ks, live_endpoints, preferred_endpoints, read_repair_decision::NONE, g, nullptr, cf);
return filter_for_query(cl, erm, live_endpoints, preferred_endpoints, read_repair_decision::NONE, g, nullptr, cf);
}
bool
is_sufficient_live_nodes(consistency_level cl,
replica::keyspace& ks,
const locator::effective_replication_map& erm,
const inet_address_vector_replica_set& live_endpoints) {
using namespace locator;
const auto& topo = ks.get_effective_replication_map()->get_topology();
const auto& topo = erm.get_topology();
switch (cl) {
case consistency_level::ANY:
@@ -367,14 +367,14 @@ is_sufficient_live_nodes(consistency_level cl,
case consistency_level::LOCAL_ONE:
return topo.count_local_endpoints(live_endpoints) >= 1;
case consistency_level::LOCAL_QUORUM:
return topo.count_local_endpoints(live_endpoints) >= block_for(ks, cl);
return topo.count_local_endpoints(live_endpoints) >= block_for(erm, cl);
case consistency_level::EACH_QUORUM:
{
auto& rs = ks.get_replication_strategy();
auto& rs = erm.get_replication_strategy();
if (rs.get_type() == replication_strategy_type::network_topology) {
for (auto& entry : count_per_dc_endpoints(ks, live_endpoints)) {
if (entry.second.live < local_quorum_for(ks, entry.first)) {
for (auto& entry : count_per_dc_endpoints(erm, live_endpoints)) {
if (entry.second.live < local_quorum_for(erm, entry.first)) {
return false;
}
}
@@ -384,7 +384,7 @@ is_sufficient_live_nodes(consistency_level cl,
}
// Fallthough on purpose for SimpleStrategy
default:
return live_endpoints.size() >= block_for(ks, cl);
return live_endpoints.size() >= block_for(erm, cl);
}
}

View File

@@ -26,25 +26,29 @@ namespace gms {
class gossiper;
};
namespace locator {
class effective_replication_map;
}
namespace db {
extern logging::logger cl_logger;
size_t quorum_for(const replica::keyspace& ks);
size_t quorum_for(const locator::effective_replication_map& erm);
size_t local_quorum_for(const replica::keyspace& ks, const sstring& dc);
size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc);
size_t block_for_local_serial(replica::keyspace& ks);
size_t block_for_local_serial(const locator::effective_replication_map& erm);
size_t block_for_each_quorum(replica::keyspace& ks);
size_t block_for_each_quorum(const locator::effective_replication_map& erm);
size_t block_for(replica::keyspace& ks, consistency_level cl);
size_t block_for(const locator::effective_replication_map& erm, consistency_level cl);
bool is_datacenter_local(consistency_level l);
inet_address_vector_replica_set
filter_for_query(consistency_level cl,
replica::keyspace& ks,
const locator::effective_replication_map& erm,
inet_address_vector_replica_set live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
read_repair_decision read_repair,
@@ -53,7 +57,7 @@ filter_for_query(consistency_level cl,
replica::column_family* cf);
inet_address_vector_replica_set filter_for_query(consistency_level cl,
replica::keyspace& ks,
const locator::effective_replication_map& erm,
inet_address_vector_replica_set& live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
const gms::gossiper& g,
@@ -66,17 +70,17 @@ struct dc_node_count {
bool
is_sufficient_live_nodes(consistency_level cl,
replica::keyspace& ks,
const locator::effective_replication_map& erm,
const inet_address_vector_replica_set& live_endpoints);
template<typename Range, typename PendingRange = std::array<gms::inet_address, 0>>
void assure_sufficient_live_nodes(
consistency_level cl,
replica::keyspace& ks,
const locator::effective_replication_map& erm,
const Range& live_endpoints,
const PendingRange& pending_endpoints = std::array<gms::inet_address, 0>());
extern template void assure_sufficient_live_nodes(consistency_level, replica::keyspace&, const inet_address_vector_replica_set&, const std::array<gms::inet_address, 0>&);
extern template void assure_sufficient_live_nodes(db::consistency_level, replica::keyspace&, const inet_address_vector_replica_set&, const utils::small_vector<gms::inet_address, 1ul>&);
extern template void assure_sufficient_live_nodes(consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const std::array<gms::inet_address, 0>&);
extern template void assure_sufficient_live_nodes(db::consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const utils::small_vector<gms::inet_address, 1ul>&);
}

View File

@@ -493,7 +493,8 @@ future<query::forward_result> forward_service::dispatch(query::forward_request r
std::map<netw::messaging_service::msg_addr, dht::partition_range_vector> vnodes_per_addr;
const auto& topo = get_token_metadata_ptr()->get_topology();
while (std::optional<dht::partition_range> vnode = next_vnode()) {
inet_address_vector_replica_set live_endpoints = _proxy.get_live_endpoints(ks, end_token(*vnode));
auto erm = ks.get_effective_replication_map();
inet_address_vector_replica_set live_endpoints = _proxy.get_live_endpoints(*erm, end_token(*vnode));
// Do not choose an endpoint outside the current datacenter if a request has a local consistency
if (db::is_datacenter_local(req.cl)) {
retain_local_endpoints(topo, live_endpoints);

View File

@@ -1095,6 +1095,7 @@ protected:
storage_proxy::response_id_type _id;
promise<result<>> _ready; // available when cl is achieved
shared_ptr<storage_proxy> _proxy;
locator::effective_replication_map_ptr _effective_replication_map_ptr;
tracing::trace_state_ptr _trace_state;
db::consistency_level _cl;
size_t _total_block_for = 0;
@@ -1129,17 +1130,21 @@ protected:
}
public:
abstract_write_response_handler(shared_ptr<storage_proxy> p, replica::keyspace& ks, db::consistency_level cl, db::write_type type,
abstract_write_response_handler(shared_ptr<storage_proxy> p,
locator::effective_replication_map_ptr erm,
db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state,
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, size_t pending_endpoints = 0,
inet_address_vector_topology_change dead_endpoints = {})
: _id(p->get_next_response_id()), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)),
: _id(p->get_next_response_id()), _proxy(std::move(p))
, _effective_replication_map_ptr(std::move(erm))
, _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)),
_dead_endpoints(std::move(dead_endpoints)), _stats(stats), _expire_timer([this] { timeout_cb(); }), _permit(std::move(permit)),
_rate_limit_info(rate_limit_info) {
// original comment from cassandra:
// during bootstrap, include pending endpoints in the count
// or we may fail the consistency level guarantees (see #833, #8058)
_total_block_for = db::block_for(ks, _cl) + pending_endpoints;
_total_block_for = db::block_for(*_effective_replication_map_ptr, _cl) + pending_endpoints;
++_stats.writes;
}
virtual ~abstract_write_response_handler() {
@@ -1280,7 +1285,7 @@ public:
}
if (_cl_achieved) { // For CL=ANY this can still be false
for (auto&& ep : get_targets()) {
++stats().background_replica_writes_failed.get_ep_stat(_proxy->get_token_metadata_ptr()->get_topology(), ep);
++stats().background_replica_writes_failed.get_ep_stat(_effective_replication_map_ptr->get_topology(), ep);
}
stats().background_writes_failed += int(!_targets.empty());
}
@@ -1379,19 +1384,21 @@ public:
class datacenter_write_response_handler : public abstract_write_response_handler {
bool waited_for(gms::inet_address from) override {
const auto& topo = _proxy->get_token_metadata_ptr()->get_topology();
const auto& topo = _effective_replication_map_ptr->get_topology();
return fbu::is_me(from) || (topo.get_datacenter(from) == topo.get_datacenter());
}
public:
datacenter_write_response_handler(shared_ptr<storage_proxy> p, replica::keyspace& ks, db::consistency_level cl, db::write_type type,
datacenter_write_response_handler(shared_ptr<storage_proxy> p,
locator::effective_replication_map_ptr ermp,
db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) :
abstract_write_response_handler(p, ks, cl, type, std::move(mh),
abstract_write_response_handler(p, ermp, cl, type, std::move(mh), // can't move ermp, it's used below
std::move(targets), std::move(tr_state), stats, std::move(permit), rate_limit_info,
p->get_token_metadata_ptr()->get_topology().count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) {
_total_endpoints = p->get_token_metadata_ptr()->get_topology().count_local_endpoints(_targets);
ermp->get_topology().count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) {
_total_endpoints = _effective_replication_map_ptr->get_topology().count_local_endpoints(_targets);
}
};
@@ -1400,11 +1407,13 @@ class write_response_handler : public abstract_write_response_handler {
return true;
}
public:
write_response_handler(shared_ptr<storage_proxy> p, replica::keyspace& ks, db::consistency_level cl, db::write_type type,
write_response_handler(shared_ptr<storage_proxy> p,
locator::effective_replication_map_ptr ermp,
db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
abstract_write_response_handler(std::move(p), std::move(ermp), cl, type, std::move(mh),
std::move(targets), std::move(tr_state), stats, std::move(permit), rate_limit_info, pending_endpoints.size(), std::move(dead_endpoints)) {
_total_endpoints = _targets.size();
}
@@ -1412,11 +1421,13 @@ public:
class view_update_write_response_handler : public write_response_handler, public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
public:
view_update_write_response_handler(shared_ptr<storage_proxy> p, replica::keyspace& ks, db::consistency_level cl,
view_update_write_response_handler(shared_ptr<storage_proxy> p,
locator::effective_replication_map_ptr ermp,
db::consistency_level cl,
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info):
write_response_handler(p, ks, cl, db::write_type::VIEW, std::move(mh),
write_response_handler(p, std::move(ermp), cl, db::write_type::VIEW, std::move(mh),
std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info) {
register_in_intrusive_list(*p);
}
@@ -1481,7 +1492,7 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
};
std::unordered_map<sstring, dc_info> _dc_responses;
bool waited_for(gms::inet_address from) override {
auto& topology = _proxy->get_token_metadata_ptr()->get_topology();
auto& topology = _effective_replication_map_ptr->get_topology();
sstring data_center = topology.get_datacenter(from);
auto dc_resp = _dc_responses.find(data_center);
@@ -1492,12 +1503,13 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
return false;
}
public:
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, replica::keyspace& ks, db::consistency_level cl, db::write_type type,
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints,
inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit,
db::per_partition_rate_limit::info rate_limit_info) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), rate_limit_info, 0, dead_endpoints) {
auto& topology = _proxy->get_token_metadata_ptr()->get_topology();
abstract_write_response_handler(std::move(p), std::move(ermp), cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), rate_limit_info, 0, dead_endpoints) {
auto* erm = _effective_replication_map_ptr.get();
auto& topology = erm->get_topology();
for (auto& target : targets) {
auto dc = topology.get_datacenter(target);
@@ -1509,13 +1521,13 @@ public:
size_t total_endpoints_for_dc = boost::range::count_if(targets, [&topology, &dc] (const gms::inet_address& ep){
return topology.get_datacenter(ep) == dc;
});
_dc_responses.emplace(dc, dc_info{0, db::local_quorum_for(ks, dc) + pending_for_dc, total_endpoints_for_dc, 0});
_dc_responses.emplace(dc, dc_info{0, db::local_quorum_for(*erm, dc) + pending_for_dc, total_endpoints_for_dc, 0});
_total_block_for += pending_for_dc;
}
}
}
bool failure(gms::inet_address from, size_t count, error err) override {
auto& topology = _proxy->get_token_metadata_ptr()->get_topology();
auto& topology = _effective_replication_map_ptr->get_topology();
const sstring& dc = topology.get_datacenter(from);
auto dc_resp = _dc_responses.find(dc);
@@ -2185,21 +2197,22 @@ future<result<>> storage_proxy::response_wait(storage_proxy::response_id_type id
return _response_handlers.find(id)->second;
}
result<storage_proxy::response_id_type> storage_proxy::create_write_response_handler(replica::keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
result<storage_proxy::response_id_type> storage_proxy::create_write_response_handler(locator::effective_replication_map_ptr ermp,
db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info)
{
shared_ptr<abstract_write_response_handler> h;
auto& rs = ks.get_replication_strategy();
auto& rs = ermp->get_replication_strategy();
if (db::is_datacenter_local(cl)) {
h = ::make_shared<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
h = ::make_shared<datacenter_write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
h = ::make_shared<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
h = ::make_shared<datacenter_sync_write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
} else if (type == db::write_type::VIEW) {
h = ::make_shared<view_update_write_response_handler>(shared_from_this(), ks, cl, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
h = ::make_shared<view_update_write_response_handler>(shared_from_this(), std::move(ermp), cl, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
} else {
h = ::make_shared<write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
h = ::make_shared<write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
}
return bo::success(register_response_handler(std::move(h)));
}
@@ -2769,9 +2782,9 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
slogger.trace("creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints);
tracing::trace(tr_state, "Creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints);
db::assure_sufficient_live_nodes(cl, ks, live_endpoints, pending_endpoints);
db::assure_sufficient_live_nodes(cl, *erm, live_endpoints, pending_endpoints);
return create_write_response_handler(ks, cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints,
return create_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints,
std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info);
}
@@ -2806,9 +2819,10 @@ storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_
auto keyspace_name = mh->schema()->ks_name();
replica::keyspace& ks = _db.local().find_keyspace(keyspace_name);
auto ermp = ks.get_effective_replication_map();
// No rate limiting for read repair
return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
return create_write_response_handler(std::move(ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
}
result<storage_proxy::response_id_type>
@@ -2830,9 +2844,10 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo
auto keyspace_name = s->ks_name();
replica::keyspace& ks = _db.local().find_keyspace(keyspace_name);
auto ermp = ks.get_effective_replication_map();
// No rate limiting for paxos (yet)
return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
}
@@ -2968,10 +2983,11 @@ future<result<>> storage_proxy::mutate_end(future<result<>> mutate_result, utils
gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation& m, db::consistency_level cl) {
auto& ks = _db.local().find_keyspace(m.schema()->ks_name());
auto live_endpoints = get_live_endpoints(ks, m.token());
auto erm = ks.get_effective_replication_map();
auto live_endpoints = get_live_endpoints(*erm, m.token());
if (live_endpoints.empty()) {
throw exceptions::unavailable_exception(cl, block_for(ks, cl), 0);
throw exceptions::unavailable_exception(cl, block_for(*erm, cl), 0);
}
const auto my_address = utils::fb_utilities::get_broadcast_address();
@@ -2982,7 +2998,7 @@ gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation&
}
const auto local_endpoints = boost::copy_range<inet_address_vector_replica_set>(live_endpoints
| boost::adaptors::filtered(get_token_metadata_ptr()->get_topology().get_local_dc_filter()));
| boost::adaptors::filtered(erm->get_topology().get_local_dc_filter()));
if (local_endpoints.empty()) {
// FIXME: O(n log n) to get maximum
@@ -3025,14 +3041,15 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level
// Let's just use the schema of the first mutation in a vector.
auto handle_error = [this, sp = this->shared_from_this(), s = endpoint_and_mutations.second[0].s, cl, permit] (std::exception_ptr exp) {
auto& ks = _db.local().find_keyspace(s->ks_name());
auto erm = ks.get_effective_replication_map();
try {
std::rethrow_exception(std::move(exp));
} catch (rpc::timeout_error&) {
return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(ks, cl), db::write_type::COUNTER));
return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(*erm, cl), db::write_type::COUNTER));
} catch (timed_out_error&) {
return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(ks, cl), db::write_type::COUNTER));
return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(*erm, cl), db::write_type::COUNTER));
} catch (rpc::closed_error&) {
return make_exception_future<>(mutation_write_failure_exception(s->ks_name(), s->cf_name(), cl, 0, 1, db::block_for(ks, cl), db::write_type::COUNTER));
return make_exception_future<>(mutation_write_failure_exception(s->ks_name(), s->cf_name(), cl, 0, 1, db::block_for(*erm, cl), db::write_type::COUNTER));
}
};
@@ -3065,7 +3082,7 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
inet_address_vector_topology_change pending_endpoints = erm->get_token_metadata_ptr()->pending_endpoints_for(token, ks_name);
if (cl_for_paxos == db::consistency_level::LOCAL_SERIAL) {
auto local_dc_filter = get_token_metadata_ptr()->get_topology().get_local_dc_filter();
auto local_dc_filter = erm->get_topology().get_local_dc_filter();
auto itend = boost::range::remove_if(natural_endpoints, std::not_fn(std::cref(local_dc_filter)));
natural_endpoints.erase(itend, natural_endpoints.end());
itend = boost::range::remove_if(pending_endpoints, std::not_fn(std::cref(local_dc_filter)));
@@ -3346,7 +3363,8 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
future<result<>> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) {
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) {
auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name());
return _p.create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate());
auto ermp = ks.get_effective_replication_map();
return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate());
}).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) {
_p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout);
@@ -3476,10 +3494,11 @@ future<> storage_proxy::send_to_endpoint(
std::back_inserter(dead_endpoints),
std::bind_front(&storage_proxy::is_alive, this));
auto& ks = _db.local().find_keyspace(m->schema()->ks_name());
auto erm = ks.get_effective_replication_map();
slogger.trace("Creating write handler with live: {}; dead: {}", targets, dead_endpoints);
db::assure_sufficient_live_nodes(cl, ks, targets, pending_endpoints);
db::assure_sufficient_live_nodes(cl, *erm, targets, pending_endpoints);
return create_write_response_handler(
ks,
std::move(erm),
cl,
type,
std::move(m),
@@ -3592,7 +3611,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
auto& stats = handler_ptr->stats();
auto& handler = *handler_ptr;
auto& global_stats = handler._proxy->_global_stats;
auto& topology = get_token_metadata_ptr()->get_topology();
auto& topology = handler_ptr->_effective_replication_map_ptr->get_topology();
auto local_dc = topology.get_datacenter();
for(auto dest: handler.get_targets()) {
@@ -3645,9 +3664,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
got_response(response_id, coordinator, std::nullopt);
} else {
if (!handler.read_repair_write()) {
++stats.writes_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator);
++stats.writes_attempts.get_ep_stat(handler_ptr->_effective_replication_map_ptr->get_topology(), coordinator);
} else {
++stats.read_repair_write_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator);
++stats.read_repair_write_attempts.get_ep_stat(handler_ptr->_effective_replication_map_ptr->get_topology(), coordinator);
}
if (coordinator == my_address) {
@@ -3659,7 +3678,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
// Waited on indirectly.
(void)f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) {
++stats.writes_errors.get_ep_stat(p->get_token_metadata_ptr()->get_topology(), coordinator);
++stats.writes_errors.get_ep_stat(handler_ptr->_effective_replication_map_ptr->get_topology(), coordinator);
error err = error::FAILURE;
std::optional<sstring> msg;
if (try_catch<replica::rate_limit_exception>(eptr)) {
@@ -3801,6 +3820,7 @@ class digest_read_resolver : public abstract_read_resolver {
};
private:
shared_ptr<storage_proxy> _proxy;
locator::effective_replication_map_ptr _effective_replication_map_ptr;
size_t _block_for;
size_t _cl_responses = 0;
promise<result<digest_read_result>> _cl_promise; // cl is reached
@@ -3822,9 +3842,12 @@ private:
_digest_results.clear();
}
public:
digest_read_resolver(shared_ptr<storage_proxy> proxy, schema_ptr schema, db::consistency_level cl, size_t block_for, size_t target_count_for_cl, storage_proxy::clock_type::time_point timeout)
// Constructs a resolver for a digest read: one replica returns full data,
// the rest return digests, and _cl_promise is fulfilled once enough
// responses arrive to satisfy the consistency level (see "cl is reached"
// on _cl_promise above).
//
// `ermp` pins the effective_replication_map captured once for the whole
// query, so topology lookups made by this resolver (e.g. the datacenter
// check in waiting_for()) stay consistent with the rest of the query
// instead of re-reading possibly newer token metadata.
digest_read_resolver(shared_ptr<storage_proxy> proxy,
        locator::effective_replication_map_ptr ermp,
        schema_ptr schema, db::consistency_level cl, size_t block_for, size_t target_count_for_cl, storage_proxy::clock_type::time_point timeout)
    : abstract_read_resolver(std::move(schema), cl, 0, timeout)
    , _proxy(std::move(proxy))
    , _effective_replication_map_ptr(std::move(ermp))
    , _block_for(block_for)
    , _target_count_for_cl(target_count_for_cl)
{}
@@ -3868,7 +3891,7 @@ public:
}
private:
bool waiting_for(gms::inet_address ep) {
const auto& topo = _proxy->get_token_metadata_ptr()->get_topology();
const auto& topo = _effective_replication_map_ptr->get_topology();
return db::is_datacenter_local(_cl) ? fbu::is_me(ep) || (topo.get_datacenter(ep) == topo.get_datacenter()) : true;
}
void got_response(gms::inet_address ep) {
@@ -4430,6 +4453,7 @@ protected:
schema_ptr _schema;
shared_ptr<storage_proxy> _proxy;
locator::effective_replication_map_ptr _effective_replication_map_ptr;
lw_shared_ptr<query::read_command> _cmd;
lw_shared_ptr<query::read_command> _retry_cmd;
dht::partition_range _partition_range;
@@ -4453,13 +4477,17 @@ private:
}
const locator::topology& get_topology() const noexcept {
return _proxy->get_token_metadata_ptr()->get_topology();
return _effective_replication_map_ptr->get_topology();
}
public:
abstract_read_executor(schema_ptr s, lw_shared_ptr<replica::column_family> cf, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for,
abstract_read_executor(schema_ptr s, lw_shared_ptr<replica::column_family> cf, shared_ptr<storage_proxy> proxy,
locator::effective_replication_map_ptr ermp,
lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for,
inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) :
_schema(std::move(s)), _proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)), _trace_state(std::move(trace_state)),
_schema(std::move(s)), _proxy(std::move(proxy))
, _effective_replication_map_ptr(std::move(ermp))
, _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)), _trace_state(std::move(trace_state)),
_cf(std::move(cf)), _permit(std::move(permit)), _rate_limit_info(rate_limit_info) {
_proxy->get_stats().reads++;
_proxy->get_stats().foreground_reads++;
@@ -4723,8 +4751,8 @@ public:
// Return an empty result in this case
return make_ready_future<result<foreign_ptr<lw_shared_ptr<query::result>>>>(make_foreign(make_lw_shared(query::result())));
}
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_proxy, _schema, _cl, _block_for,
db::is_datacenter_local(_cl) ? _proxy->get_token_metadata_ptr()->get_topology().count_local_endpoints(_targets): _targets.size(), timeout);
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_proxy, _effective_replication_map_ptr, _schema, _cl, _block_for,
db::is_datacenter_local(_cl) ? _effective_replication_map_ptr->get_topology().count_local_endpoints(_targets): _targets.size(), timeout);
auto exec = shared_from_this();
make_requests(digest_resolver, timeout);
@@ -4765,7 +4793,7 @@ public:
if (std::abs(delta) <= write_timeout) {
exec->_proxy->get_stats().global_read_repairs_canceled_due_to_concurrent_write++;
// if CL is local and non matching data is modified less then write_timeout ms ago do only local repair
auto local_dc_filter = exec->_proxy->get_token_metadata_ptr()->get_topology().get_local_dc_filter();
auto local_dc_filter = exec->_effective_replication_map_ptr->get_topology().get_local_dc_filter();
auto i = boost::range::remove_if(exec->_targets, std::not_fn(std::cref(local_dc_filter)));
exec->_targets.erase(i, exec->_targets.end());
}
@@ -4823,9 +4851,11 @@ private:
class never_speculating_read_executor : public abstract_read_executor {
public:
never_speculating_read_executor(schema_ptr s, lw_shared_ptr<replica::column_family> cf, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit,
never_speculating_read_executor(schema_ptr s, lw_shared_ptr<replica::column_family> cf, shared_ptr<storage_proxy> proxy,
locator::effective_replication_map_ptr ermp,
lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit,
db::per_partition_rate_limit::info rate_limit_info) :
abstract_read_executor(std::move(s), std::move(cf), std::move(proxy), std::move(cmd), std::move(pr), cl, 0, std::move(targets), std::move(trace_state), std::move(permit), rate_limit_info) {
abstract_read_executor(std::move(s), std::move(cf), std::move(proxy), std::move(ermp), std::move(cmd), std::move(pr), cl, 0, std::move(targets), std::move(trace_state), std::move(permit), rate_limit_info) {
_block_for = _targets.size();
}
};
@@ -4923,10 +4953,11 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
service_permit permit) {
const dht::token& token = pr.start()->value().token();
replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name());
auto erm = ks.get_effective_replication_map();
speculative_retry::type retry_type = schema->speculative_retry().get_type();
gms::inet_address extra_replica;
inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(ks, token);
inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(*erm, token);
// Check for a non-local read before heat-weighted load balancing
// reordering of endpoints happens. The local endpoint, if
// present, is always first in the list, as get_live_sorted_endpoints()
@@ -4935,7 +4966,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
auto cf = _db.local().find_column_family(schema).shared_from_this();
auto& gossiper = _remote->gossiper();
inet_address_vector_replica_set target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision,
inet_address_vector_replica_set target_replicas = db::filter_for_query(cl, *erm, all_replicas, preferred_endpoints, repair_decision,
gossiper,
retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica,
_db.local().get_config().cache_hit_rate_read_balancing() ? &*cf : nullptr);
@@ -4945,7 +4976,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
// Throw UAE early if we don't have enough replicas.
try {
db::assure_sufficient_live_nodes(cl, ks, target_replicas);
db::assure_sufficient_live_nodes(cl, *erm, target_replicas);
} catch (exceptions::unavailable_exception& ex) {
slogger.debug("Read unavailable: cl={} required {} alive {}", ex.consistency, ex.required, ex.alive);
get_stats().read_unavailables.mark();
@@ -4956,7 +4987,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
get_stats().read_repair_attempts++;
}
size_t block_for = db::block_for(ks, cl);
size_t block_for = db::block_for(*erm, cl);
auto p = shared_from_this();
db::per_partition_rate_limit::info rate_limit_info;
@@ -4973,22 +5004,22 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
// Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
if (retry_type == speculative_retry::type::NONE || block_for == all_replicas.size()
|| (repair_decision == db::read_repair_decision::DC_LOCAL && is_datacenter_local(cl) && block_for == target_replicas.size())) {
return ::make_shared<never_speculating_read_executor>(schema, cf, p, cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
}
if (target_replicas.size() == all_replicas.size()) {
// CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
// We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
// (same amount of requests in total, but we turn 1 digest request into a full blown data request).
return ::make_shared<always_speculating_read_executor>(schema, cf, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
}
// RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
if (target_replicas.size() == block_for) { // If RRD.DC_LOCAL extra replica may already be present
auto local_dc_filter = get_token_metadata_ptr()->get_topology().get_local_dc_filter();
auto local_dc_filter = erm->get_topology().get_local_dc_filter();
if (is_datacenter_local(cl) && !local_dc_filter(extra_replica)) {
slogger.trace("read executor no extra target to speculate");
return ::make_shared<never_speculating_read_executor>(schema, cf, p, cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
} else {
target_replicas.push_back(extra_replica);
slogger.trace("creating read executor with extra target {}", extra_replica);
@@ -4996,9 +5027,9 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
}
if (retry_type == speculative_retry::type::ALWAYS) {
return ::make_shared<always_speculating_read_executor>(schema, cf, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
} else {// PERCENTILE or CUSTOM.
return ::make_shared<speculating_read_executor>(schema, cf, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
return ::make_shared<speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
}
}
@@ -5180,6 +5211,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
service_permit permit) {
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name());
auto erm = ks.get_effective_replication_map();
std::vector<::shared_ptr<abstract_read_executor>> exec;
auto p = shared_from_this();
auto& cf= _db.local().find_column_family(schema);
@@ -5209,9 +5241,9 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
auto& gossiper = _remote->gossiper();
while (i != ranges.end()) {
dht::partition_range& range = *i;
inet_address_vector_replica_set live_endpoints = get_live_sorted_endpoints(ks, end_token(range));
inet_address_vector_replica_set live_endpoints = get_live_sorted_endpoints(*erm, end_token(range));
inet_address_vector_replica_set merged_preferred_replicas = preferred_replicas_for_range(*i);
inet_address_vector_replica_set filtered_endpoints = filter_for_query(cl, ks, live_endpoints, merged_preferred_replicas, gossiper, pcf);
inet_address_vector_replica_set filtered_endpoints = filter_for_query(cl, *erm, live_endpoints, merged_preferred_replicas, gossiper, pcf);
std::vector<dht::token_range> merged_ranges{to_token_range(range)};
++i;
@@ -5222,8 +5254,8 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
{
const auto current_range_preferred_replicas = preferred_replicas_for_range(*i);
dht::partition_range& next_range = *i;
inet_address_vector_replica_set next_endpoints = get_live_sorted_endpoints(ks, end_token(next_range));
inet_address_vector_replica_set next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints, current_range_preferred_replicas, gossiper, pcf);
inet_address_vector_replica_set next_endpoints = get_live_sorted_endpoints(*erm, end_token(next_range));
inet_address_vector_replica_set next_filtered_endpoints = filter_for_query(cl, *erm, next_endpoints, current_range_preferred_replicas, gossiper, pcf);
// Origin has this to say here:
// * If the current range right is the min token, we should stop merging because CFS.getRangeSlice
@@ -5264,11 +5296,11 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
inet_address_vector_replica_set current_merged_preferred_replicas = intersection(merged_preferred_replicas, current_range_preferred_replicas);
// Check if there is enough endpoint for the merge to be possible.
if (!is_sufficient_live_nodes(cl, ks, merged)) {
if (!is_sufficient_live_nodes(cl, *erm, merged)) {
break;
}
inet_address_vector_replica_set filtered_merged = filter_for_query(cl, ks, merged, current_merged_preferred_replicas, gossiper, pcf);
inet_address_vector_replica_set filtered_merged = filter_for_query(cl, *erm, merged, current_merged_preferred_replicas, gossiper, pcf);
// Estimate whether merging will be a win or not
if (!locator::i_endpoint_snitch::get_local_snitch_ptr()->is_worth_merging_for_range_query(filtered_merged, filtered_endpoints, next_filtered_endpoints)) {
@@ -5305,14 +5337,14 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
}
slogger.trace("creating range read executor with targets {}", filtered_endpoints);
try {
db::assure_sufficient_live_nodes(cl, ks, filtered_endpoints);
db::assure_sufficient_live_nodes(cl, *erm, filtered_endpoints);
} catch(exceptions::unavailable_exception& ex) {
slogger.debug("Read unavailable: cl={} required {} alive {}", ex.consistency, ex.required, ex.alive);
get_stats().range_slice_unavailables.mark();
throw;
}
exec.push_back(::make_shared<never_speculating_read_executor>(schema, cf.shared_from_this(), p, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state, permit, std::monostate()));
exec.push_back(::make_shared<never_speculating_read_executor>(schema, cf.shared_from_this(), p, erm, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state, permit, std::monostate()));
ranges_per_exec.emplace(exec.back().get(), std::move(merged_ranges));
}
@@ -5789,9 +5821,8 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
co_return condition_met;
}
inet_address_vector_replica_set storage_proxy::get_live_endpoints(replica::keyspace& ks, const dht::token& token) const {
auto erm = ks.get_effective_replication_map();
inet_address_vector_replica_set eps = erm->get_natural_endpoints_without_node_being_replaced(token);
inet_address_vector_replica_set storage_proxy::get_live_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const {
inet_address_vector_replica_set eps = erm.get_natural_endpoints_without_node_being_replaced(token);
auto itend = boost::range::remove_if(eps, std::not_fn(std::bind_front(&storage_proxy::is_alive, this)));
eps.erase(itend, eps.end());
return eps;
@@ -5806,8 +5837,8 @@ void storage_proxy::sort_endpoints_by_proximity(inet_address_vector_replica_set&
}
}
inet_address_vector_replica_set storage_proxy::get_live_sorted_endpoints(replica::keyspace& ks, const dht::token& token) const {
auto eps = get_live_endpoints(ks, token);
// Returns the natural endpoints for `token` taken from the caller-supplied
// effective_replication_map (so the whole query sees one consistent
// topology), filtered down to nodes this proxy considers alive
// (see get_live_endpoints()), then ordered by sort_endpoints_by_proximity()
// — presumably nearest-first, with the local endpoint, if present, first;
// confirm against sort_endpoints_by_proximity().
inet_address_vector_replica_set storage_proxy::get_live_sorted_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const {
    auto eps = get_live_endpoints(erm, token);
    sort_endpoints_by_proximity(eps);
    return eps;
}

View File

@@ -226,7 +226,7 @@ public:
query::max_result_size get_max_result_size(const query::partition_slice& slice) const;
query::tombstone_limit get_tombstone_limit() const;
inet_address_vector_replica_set get_live_endpoints(replica::keyspace& ks, const dht::token& token) const;
inet_address_vector_replica_set get_live_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const;
private:
distributed<replica::database>& _db;
@@ -313,7 +313,7 @@ private:
result<response_id_type> create_write_response_handler_helper(schema_ptr s, const dht::token& token,
std::unique_ptr<mutation_holder> mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state,
service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(replica::keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, inet_address_vector_replica_set targets,
result<response_id_type> create_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, inet_address_vector_replica_set targets,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info);
result<response_id_type> create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
@@ -332,7 +332,7 @@ private:
bool hints_enabled(db::write_type type) const noexcept;
db::hints::manager& hints_manager_for(db::write_type type);
static void sort_endpoints_by_proximity(inet_address_vector_replica_set& eps);
inet_address_vector_replica_set get_live_sorted_endpoints(replica::keyspace& ks, const dht::token& token) const;
inet_address_vector_replica_set get_live_sorted_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const;
bool is_alive(const gms::inet_address&) const;
db::read_repair_decision new_read_repair_decision(const schema& s);
result<::shared_ptr<abstract_read_executor>> get_read_executor(lw_shared_ptr<query::read_command> cmd,