diff --git a/db/consistency_level.cc b/db/consistency_level.cc index 2e5715d630..4eaec70d5d 100644 --- a/db/consistency_level.cc +++ b/db/consistency_level.cc @@ -29,15 +29,15 @@ namespace db { logging::logger cl_logger("consistency"); -size_t quorum_for(const replica::keyspace& ks) { - size_t replication_factor = ks.get_effective_replication_map()->get_replication_factor(); +size_t quorum_for(const locator::effective_replication_map& erm) { + size_t replication_factor = erm.get_replication_factor(); return replication_factor ? (replication_factor / 2) + 1 : 0; } -size_t local_quorum_for(const replica::keyspace& ks, const sstring& dc) { +size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc) { using namespace locator; - auto& rs = ks.get_replication_strategy(); + auto& rs = erm.get_replication_strategy(); if (rs.get_type() == replication_strategy_type::network_topology) { const network_topology_strategy* nrs = @@ -46,10 +46,10 @@ size_t local_quorum_for(const replica::keyspace& ks, const sstring& dc) { return replication_factor ? (replication_factor / 2) + 1 : 0; } - return quorum_for(ks); + return quorum_for(erm); } -size_t block_for_local_serial(replica::keyspace& ks) { +size_t block_for_local_serial(const locator::effective_replication_map& erm) { using namespace locator; // @@ -59,31 +59,31 @@ size_t block_for_local_serial(replica::keyspace& ks) { // and the snitch itself (and thus its output) may change dynamically. // - const auto& topo = ks.get_effective_replication_map()->get_topology(); - return local_quorum_for(ks, topo.get_datacenter()); + const auto& topo = erm.get_topology(); + return local_quorum_for(erm, topo.get_datacenter()); } -size_t block_for_each_quorum(replica::keyspace& ks) { +size_t block_for_each_quorum(const locator::effective_replication_map& erm) { using namespace locator; - auto& rs = ks.get_replication_strategy(); + auto& rs = erm.get_replication_strategy(); if (rs.get_type() == replication_strategy_type::network_topology) { - network_topology_strategy* nrs = - static_cast(&rs); + const network_topology_strategy* nrs = + static_cast(&rs); size_t n = 0; for (auto& dc : nrs->get_datacenters()) { - n += local_quorum_for(ks, dc); + n += local_quorum_for(erm, dc); } return n; } else { - return quorum_for(ks); + return quorum_for(erm); } } -size_t block_for(replica::keyspace& ks, consistency_level cl) { +size_t block_for(const locator::effective_replication_map& erm, consistency_level cl) { switch (cl) { case consistency_level::ONE: case consistency_level::LOCAL_ONE: @@ -96,14 +96,14 @@ size_t block_for(replica::keyspace& ks, consistency_level cl) { return 3; case consistency_level::QUORUM: case consistency_level::SERIAL: - return quorum_for(ks); + return quorum_for(erm); case consistency_level::ALL: - return ks.get_effective_replication_map()->get_replication_factor(); + return erm.get_replication_factor(); case consistency_level::LOCAL_QUORUM: case consistency_level::LOCAL_SERIAL: - return block_for_local_serial(ks); + return block_for_local_serial(erm); case consistency_level::EACH_QUORUM: - return block_for_each_quorum(ks); + return block_for_each_quorum(erm); default: abort(); } @@ -115,16 +115,16 @@ bool is_datacenter_local(consistency_level l) { template > std::unordered_map count_per_dc_endpoints( - replica::keyspace& ks, + const locator::effective_replication_map& erm, const Range& live_endpoints, const PendingRange& pending_endpoints = std::array()) { using namespace locator; - auto& rs = ks.get_replication_strategy(); - const auto& topo = ks.get_effective_replication_map()->get_topology(); + auto& rs = erm.get_replication_strategy(); + const auto& topo = erm.get_topology(); - network_topology_strategy* nrs = - static_cast(&rs); + const network_topology_strategy* nrs = + static_cast(&rs); std::unordered_map dc_endpoints; for (auto& dc : nrs->get_datacenters()) { @@ -151,16 +151,16 @@ std::unordered_map count_per_dc_endpoints( template bool assure_sufficient_live_nodes_each_quorum( consistency_level cl, - replica::keyspace& ks, + const locator::effective_replication_map& erm, const Range& live_endpoints, const PendingRange& pending_endpoints) { using namespace locator; - auto& rs = ks.get_replication_strategy(); + auto& rs = erm.get_replication_strategy(); if (rs.get_type() == replication_strategy_type::network_topology) { - for (auto& entry : count_per_dc_endpoints(ks, live_endpoints, pending_endpoints)) { - auto dc_block_for = local_quorum_for(ks, entry.first); + for (auto& entry : count_per_dc_endpoints(erm, live_endpoints, pending_endpoints)) { + auto dc_block_for = local_quorum_for(erm, entry.first); auto dc_live = entry.second.live; auto dc_pending = entry.second.pending; @@ -178,10 +178,10 @@ bool assure_sufficient_live_nodes_each_quorum( template void assure_sufficient_live_nodes( consistency_level cl, - replica::keyspace& ks, + const locator::effective_replication_map& erm, const Range& live_endpoints, const PendingRange& pending_endpoints) { - size_t need = block_for(ks, cl); + size_t need = block_for(erm, cl); auto adjust_live_for_error = [] (size_t live, size_t pending) { // DowngradingConsistencyRetryPolicy uses alive replicas count from Unavailable @@ -191,7 +191,7 @@ void assure_sufficient_live_nodes( return pending <= live ? live - pending : 0; }; - const auto& topo = ks.get_effective_replication_map()->get_topology(); + const auto& topo = erm.get_topology(); switch (cl) { case consistency_level::ANY: @@ -212,7 +212,7 @@ void assure_sufficient_live_nodes( break; } case consistency_level::EACH_QUORUM: - if (assure_sufficient_live_nodes_each_quorum(cl, ks, live_endpoints, pending_endpoints)) { + if (assure_sufficient_live_nodes_each_quorum(cl, erm, live_endpoints, pending_endpoints)) { break; } // Fallthough on purpose for SimpleStrategy @@ -227,12 +227,12 @@ void assure_sufficient_live_nodes( } } -template void assure_sufficient_live_nodes(consistency_level, replica::keyspace&, const inet_address_vector_replica_set&, const std::array&); -template void assure_sufficient_live_nodes(db::consistency_level, replica::keyspace&, const inet_address_vector_replica_set&, const utils::small_vector&); +template void assure_sufficient_live_nodes(consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const std::array&); +template void assure_sufficient_live_nodes(db::consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const utils::small_vector&); inet_address_vector_replica_set filter_for_query(consistency_level cl, - replica::keyspace& ks, + const locator::effective_replication_map& erm, inet_address_vector_replica_set live_endpoints, const inet_address_vector_replica_set& preferred_endpoints, read_repair_decision read_repair, @@ -246,7 +246,7 @@ filter_for_query(consistency_level cl, } if (read_repair == read_repair_decision::DC_LOCAL || is_datacenter_local(cl)) { - const auto& topo = ks.get_effective_replication_map()->get_topology(); + const auto& topo = erm.get_topology(); auto it = boost::range::stable_partition(live_endpoints, topo.get_local_dc_filter()); local_count = std::distance(live_endpoints.begin(), it); if (is_datacenter_local(cl)) { @@ -254,10 +254,10 @@ filter_for_query(consistency_level cl, } } - size_t bf = block_for(ks, cl); + size_t bf = block_for(erm, cl); if (read_repair == read_repair_decision::DC_LOCAL) { - bf = std::max(block_for(ks, cl), local_count); + bf = std::max(block_for(erm, cl), local_count); } if (bf >= live_endpoints.size()) { // RRD.DC_LOCAL + CL.LOCAL or CL.ALL @@ -345,20 +345,20 @@ filter_for_query(consistency_level cl, } inet_address_vector_replica_set filter_for_query(consistency_level cl, - replica::keyspace& ks, + const locator::effective_replication_map& erm, inet_address_vector_replica_set& live_endpoints, const inet_address_vector_replica_set& preferred_endpoints, const gms::gossiper& g, replica::column_family* cf) { - return filter_for_query(cl, ks, live_endpoints, preferred_endpoints, read_repair_decision::NONE, g, nullptr, cf); + return filter_for_query(cl, erm, live_endpoints, preferred_endpoints, read_repair_decision::NONE, g, nullptr, cf); } bool is_sufficient_live_nodes(consistency_level cl, - replica::keyspace& ks, + const locator::effective_replication_map& erm, const inet_address_vector_replica_set& live_endpoints) { using namespace locator; - const auto& topo = ks.get_effective_replication_map()->get_topology(); + const auto& topo = erm.get_topology(); switch (cl) { case consistency_level::ANY: @@ -367,14 +367,14 @@ is_sufficient_live_nodes(consistency_level cl, case consistency_level::LOCAL_ONE: return topo.count_local_endpoints(live_endpoints) >= 1; case consistency_level::LOCAL_QUORUM: - return topo.count_local_endpoints(live_endpoints) >= block_for(ks, cl); + return topo.count_local_endpoints(live_endpoints) >= block_for(erm, cl); case consistency_level::EACH_QUORUM: { - auto& rs = ks.get_replication_strategy(); + auto& rs = erm.get_replication_strategy(); if (rs.get_type() == replication_strategy_type::network_topology) { - for (auto& entry : count_per_dc_endpoints(ks, live_endpoints)) { - if (entry.second.live < local_quorum_for(ks, entry.first)) { + for (auto& entry : count_per_dc_endpoints(erm, live_endpoints)) { + if (entry.second.live < local_quorum_for(erm, entry.first)) { return false; } } @@ -384,7 +384,7 @@ is_sufficient_live_nodes(consistency_level cl, } // Fallthough on purpose for SimpleStrategy default: - return live_endpoints.size() >= block_for(ks, cl); + return live_endpoints.size() >= block_for(erm, cl); } } diff --git a/db/consistency_level.hh b/db/consistency_level.hh index 1dedac7684..cba1af8446 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -26,25 +26,29 @@ namespace gms { class gossiper; }; +namespace locator { +class effective_replication_map; +} + namespace db { extern logging::logger cl_logger; -size_t quorum_for(const replica::keyspace& ks); +size_t quorum_for(const locator::effective_replication_map& erm); -size_t local_quorum_for(const replica::keyspace& ks, const sstring& dc); +size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc); -size_t block_for_local_serial(replica::keyspace& ks); +size_t block_for_local_serial(const locator::effective_replication_map& erm); -size_t block_for_each_quorum(replica::keyspace& ks); +size_t block_for_each_quorum(const locator::effective_replication_map& erm); -size_t block_for(replica::keyspace& ks, consistency_level cl); +size_t block_for(const locator::effective_replication_map& erm, consistency_level cl); bool is_datacenter_local(consistency_level l); inet_address_vector_replica_set filter_for_query(consistency_level cl, - replica::keyspace& ks, + const locator::effective_replication_map& erm, inet_address_vector_replica_set live_endpoints, const inet_address_vector_replica_set& preferred_endpoints, read_repair_decision read_repair, @@ -53,7 +57,7 @@ filter_for_query(consistency_level cl, replica::column_family* cf); inet_address_vector_replica_set filter_for_query(consistency_level cl, - replica::keyspace& ks, + const locator::effective_replication_map& erm, inet_address_vector_replica_set& live_endpoints, const inet_address_vector_replica_set& preferred_endpoints, const gms::gossiper& g, @@ -66,17 +70,17 @@ struct dc_node_count { bool is_sufficient_live_nodes(consistency_level cl, - replica::keyspace& ks, + const locator::effective_replication_map& erm, const inet_address_vector_replica_set& live_endpoints); template> void assure_sufficient_live_nodes( consistency_level cl, - replica::keyspace& ks, + const locator::effective_replication_map& erm, const Range& live_endpoints, const PendingRange& pending_endpoints = std::array()); -extern template void assure_sufficient_live_nodes(consistency_level, replica::keyspace&, const inet_address_vector_replica_set&, const std::array&); -extern template void assure_sufficient_live_nodes(db::consistency_level, replica::keyspace&, const inet_address_vector_replica_set&, const utils::small_vector&); +extern template void assure_sufficient_live_nodes(consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const std::array&); +extern template void assure_sufficient_live_nodes(db::consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const utils::small_vector&); } diff --git a/service/forward_service.cc b/service/forward_service.cc index 144ae62ad0..b409434a02 100644 --- a/service/forward_service.cc +++ b/service/forward_service.cc @@ -493,7 +493,8 @@ future forward_service::dispatch(query::forward_request r std::map vnodes_per_addr; const auto& topo = get_token_metadata_ptr()->get_topology(); while (std::optional vnode = next_vnode()) { - inet_address_vector_replica_set live_endpoints = _proxy.get_live_endpoints(ks, end_token(*vnode)); + auto erm = ks.get_effective_replication_map(); + inet_address_vector_replica_set live_endpoints = _proxy.get_live_endpoints(*erm, end_token(*vnode)); // Do not choose an endpoint outside the current datacenter if a request has a local consistency if (db::is_datacenter_local(req.cl)) { retain_local_endpoints(topo, live_endpoints); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index f9fa5c3f3d..f3abe5ae3f 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1095,6 +1095,7 @@ protected: storage_proxy::response_id_type _id; promise> _ready; // available when cl is achieved shared_ptr _proxy; + locator::effective_replication_map_ptr _effective_replication_map_ptr; tracing::trace_state_ptr _trace_state; db::consistency_level _cl; size_t _total_block_for = 0; @@ -1129,17 +1130,21 @@ protected: } public: - abstract_write_response_handler(shared_ptr p, replica::keyspace& ks, db::consistency_level cl, db::write_type type, + abstract_write_response_handler(shared_ptr p, + locator::effective_replication_map_ptr erm, + db::consistency_level cl, db::write_type type, std::unique_ptr mh, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, size_t pending_endpoints = 0, inet_address_vector_topology_change dead_endpoints = {}) - : _id(p->get_next_response_id()), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), + : _id(p->get_next_response_id()), _proxy(std::move(p)) + , _effective_replication_map_ptr(std::move(erm)) + , _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), _dead_endpoints(std::move(dead_endpoints)), _stats(stats), _expire_timer([this] { timeout_cb(); }), _permit(std::move(permit)), _rate_limit_info(rate_limit_info) { // original comment from cassandra: // during bootstrap, include pending endpoints in the count // or we may fail the consistency level guarantees (see #833, #8058) - _total_block_for = db::block_for(ks, _cl) + pending_endpoints; + _total_block_for = db::block_for(*_effective_replication_map_ptr, _cl) + pending_endpoints; ++_stats.writes; } virtual ~abstract_write_response_handler() { @@ -1280,7 +1285,7 @@ public: } if (_cl_achieved) { // For CL=ANY this can still be false for (auto&& ep : get_targets()) { - ++stats().background_replica_writes_failed.get_ep_stat(_proxy->get_token_metadata_ptr()->get_topology(), ep); + ++stats().background_replica_writes_failed.get_ep_stat(_effective_replication_map_ptr->get_topology(), ep); } stats().background_writes_failed += int(!_targets.empty()); } @@ -1379,19 +1384,21 @@ public: class datacenter_write_response_handler : public abstract_write_response_handler { bool waited_for(gms::inet_address from) override { - const auto& topo = _proxy->get_token_metadata_ptr()->get_topology(); + const auto& topo = _effective_replication_map_ptr->get_topology(); return fbu::is_me(from) || (topo.get_datacenter(from) == topo.get_datacenter()); } public: - datacenter_write_response_handler(shared_ptr p, replica::keyspace& ks, db::consistency_level cl, db::write_type type, + datacenter_write_response_handler(shared_ptr p, + locator::effective_replication_map_ptr ermp, + db::consistency_level cl, db::write_type type, std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) : - abstract_write_response_handler(p, ks, cl, type, std::move(mh), + abstract_write_response_handler(p, ermp, cl, type, std::move(mh), // can't move ermp, it's used below std::move(targets), std::move(tr_state), stats, std::move(permit), rate_limit_info, - p->get_token_metadata_ptr()->get_topology().count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) { - _total_endpoints = p->get_token_metadata_ptr()->get_topology().count_local_endpoints(_targets); + ermp->get_topology().count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) { + _total_endpoints = _effective_replication_map_ptr->get_topology().count_local_endpoints(_targets); } }; @@ -1400,11 +1407,13 @@ class write_response_handler : public abstract_write_response_handler { return true; } public: - write_response_handler(shared_ptr p, replica::keyspace& ks, db::consistency_level cl, db::write_type type, + write_response_handler(shared_ptr p, + locator::effective_replication_map_ptr ermp, + db::consistency_level cl, db::write_type type, std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) : - abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), + abstract_write_response_handler(std::move(p), std::move(ermp), cl, type, std::move(mh), std::move(targets), std::move(tr_state), stats, std::move(permit), rate_limit_info, pending_endpoints.size(), std::move(dead_endpoints)) { _total_endpoints = _targets.size(); } @@ -1412,11 +1421,13 @@ public: class view_update_write_response_handler : public write_response_handler, public bi::list_base_hook> { public: - view_update_write_response_handler(shared_ptr p, replica::keyspace& ks, db::consistency_level cl, + view_update_write_response_handler(shared_ptr p, + locator::effective_replication_map_ptr ermp, + db::consistency_level cl, std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info): - write_response_handler(p, ks, cl, db::write_type::VIEW, std::move(mh), + write_response_handler(p, std::move(ermp), cl, db::write_type::VIEW, std::move(mh), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info) { register_in_intrusive_list(*p); } @@ -1481,7 +1492,7 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha }; std::unordered_map _dc_responses; bool waited_for(gms::inet_address from) override { - auto& topology = _proxy->get_token_metadata_ptr()->get_topology(); + auto& topology = _effective_replication_map_ptr->get_topology(); sstring data_center = topology.get_datacenter(from); auto dc_resp = _dc_responses.find(data_center); @@ -1492,12 +1503,13 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha return false; } public: - datacenter_sync_write_response_handler(shared_ptr p, replica::keyspace& ks, db::consistency_level cl, db::write_type type, + datacenter_sync_write_response_handler(shared_ptr p, locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) : - abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), rate_limit_info, 0, dead_endpoints) { - auto& topology = _proxy->get_token_metadata_ptr()->get_topology(); + abstract_write_response_handler(std::move(p), std::move(ermp), cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), rate_limit_info, 0, dead_endpoints) { + auto* erm = _effective_replication_map_ptr.get(); + auto& topology = erm->get_topology(); for (auto& target : targets) { auto dc = topology.get_datacenter(target); @@ -1509,13 +1521,13 @@ public: size_t total_endpoints_for_dc = boost::range::count_if(targets, [&topology, &dc] (const gms::inet_address& ep){ return topology.get_datacenter(ep) == dc; }); - _dc_responses.emplace(dc, dc_info{0, db::local_quorum_for(ks, dc) + pending_for_dc, total_endpoints_for_dc, 0}); + _dc_responses.emplace(dc, dc_info{0, db::local_quorum_for(*erm, dc) + pending_for_dc, total_endpoints_for_dc, 0}); _total_block_for += pending_for_dc; } } } bool failure(gms::inet_address from, size_t count, error err) override { - auto& topology = _proxy->get_token_metadata_ptr()->get_topology(); + auto& topology = _effective_replication_map_ptr->get_topology(); const sstring& dc = topology.get_datacenter(from); auto dc_resp = _dc_responses.find(dc); @@ -2185,21 +2197,22 @@ future> storage_proxy::response_wait(storage_proxy::response_id_type id return _response_handlers.find(id)->second; } -result storage_proxy::create_write_response_handler(replica::keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, +result storage_proxy::create_write_response_handler(locator::effective_replication_map_ptr ermp, + db::consistency_level cl, db::write_type type, std::unique_ptr m, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) { shared_ptr h; - auto& rs = ks.get_replication_strategy(); + auto& rs = ermp->get_replication_strategy(); if (db::is_datacenter_local(cl)) { - h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); + h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); } else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){ - h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); + h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); } else if (type == db::write_type::VIEW) { - h = ::make_shared(shared_from_this(), ks, cl, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); + h = ::make_shared(shared_from_this(), std::move(ermp), cl, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); } else { - h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); + h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); } return bo::success(register_response_handler(std::move(h))); } @@ -2769,9 +2782,9 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok slogger.trace("creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints); tracing::trace(tr_state, "Creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints); - db::assure_sufficient_live_nodes(cl, ks, live_endpoints, pending_endpoints); + db::assure_sufficient_live_nodes(cl, *erm, live_endpoints, pending_endpoints); - return create_write_response_handler(ks, cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints, + return create_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info); } @@ -2806,9 +2819,10 @@ storage_proxy::create_write_response_handler(const std::unordered_mapschema()->ks_name(); replica::keyspace& ks = _db.local().find_keyspace(keyspace_name); + auto ermp = ks.get_effective_replication_map(); // No rate limiting for read repair - return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate()); + return create_write_response_handler(std::move(ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate()); } result @@ -2830,9 +2844,10 @@ storage_proxy::create_write_response_handler(const std::tupleks_name(); replica::keyspace& ks = _db.local().find_keyspace(keyspace_name); + auto ermp = ks.get_effective_replication_map(); // No rate limiting for paxos (yet) - return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique(std::move(commit), s, nullptr), std::move(endpoints), + return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique(std::move(commit), s, nullptr), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate()); } @@ -2968,10 +2983,11 @@ future> storage_proxy::mutate_end(future> mutate_result, utils gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation& m, db::consistency_level cl) { auto& ks = _db.local().find_keyspace(m.schema()->ks_name()); - auto live_endpoints = get_live_endpoints(ks, m.token()); + auto erm = ks.get_effective_replication_map(); + auto live_endpoints = get_live_endpoints(*erm, m.token()); if (live_endpoints.empty()) { - throw exceptions::unavailable_exception(cl, block_for(ks, cl), 0); + throw exceptions::unavailable_exception(cl, block_for(*erm, cl), 0); } const auto my_address = utils::fb_utilities::get_broadcast_address(); @@ -2982,7 +2998,7 @@ gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation& } const auto local_endpoints = boost::copy_range(live_endpoints - | boost::adaptors::filtered(get_token_metadata_ptr()->get_topology().get_local_dc_filter())); + | boost::adaptors::filtered(erm->get_topology().get_local_dc_filter())); if (local_endpoints.empty()) { // FIXME: O(n log n) to get maximum @@ -3025,14 +3041,15 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level // Let's just use the schema of the first mutation in a vector. auto handle_error = [this, sp = this->shared_from_this(), s = endpoint_and_mutations.second[0].s, cl, permit] (std::exception_ptr exp) { auto& ks = _db.local().find_keyspace(s->ks_name()); + auto erm = ks.get_effective_replication_map(); try { std::rethrow_exception(std::move(exp)); } catch (rpc::timeout_error&) { - return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(ks, cl), db::write_type::COUNTER)); + return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(*erm, cl), db::write_type::COUNTER)); } catch (timed_out_error&) { - return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(ks, cl), db::write_type::COUNTER)); + return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(*erm, cl), db::write_type::COUNTER)); } catch (rpc::closed_error&) { - return make_exception_future<>(mutation_write_failure_exception(s->ks_name(), s->cf_name(), cl, 0, 1, db::block_for(ks, cl), db::write_type::COUNTER)); + return make_exception_future<>(mutation_write_failure_exception(s->ks_name(), s->cf_name(), cl, 0, 1, db::block_for(*erm, cl), db::write_type::COUNTER)); } }; @@ -3065,7 +3082,7 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token & inet_address_vector_topology_change pending_endpoints = erm->get_token_metadata_ptr()->pending_endpoints_for(token, ks_name); if (cl_for_paxos == db::consistency_level::LOCAL_SERIAL) { - auto local_dc_filter = get_token_metadata_ptr()->get_topology().get_local_dc_filter(); + auto local_dc_filter = erm->get_topology().get_local_dc_filter(); auto itend = boost::range::remove_if(natural_endpoints, std::not_fn(std::cref(local_dc_filter))); natural_endpoints.erase(itend, natural_endpoints.end()); itend = boost::range::remove_if(pending_endpoints, std::not_fn(std::cref(local_dc_filter))); @@ -3346,7 +3363,8 @@ storage_proxy::mutate_atomically_result(std::vector mutations, db::con future> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) { return _p.mutate_prepare<>(std::array{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) { auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name()); - return _p.create_write_response_handler(ks, cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate()); + auto ermp = ks.get_effective_replication_map(); + return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate()); }).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) { _p.register_cdc_operation_result_tracker(ids, _cdc_tracker); return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout); @@ -3476,10 +3494,11 @@ future<> storage_proxy::send_to_endpoint( std::back_inserter(dead_endpoints), std::bind_front(&storage_proxy::is_alive, this)); auto& ks = _db.local().find_keyspace(m->schema()->ks_name()); + auto erm = ks.get_effective_replication_map(); slogger.trace("Creating write handler with live: {}; dead: {}", targets, dead_endpoints); - db::assure_sufficient_live_nodes(cl, ks, targets, pending_endpoints); + db::assure_sufficient_live_nodes(cl, *erm, targets, pending_endpoints); return create_write_response_handler( - ks, + std::move(erm), cl, type, std::move(m), @@ -3592,7 +3611,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo auto& stats = handler_ptr->stats(); auto& handler = *handler_ptr; auto& global_stats = handler._proxy->_global_stats; - auto& topology = get_token_metadata_ptr()->get_topology(); + auto& topology = handler_ptr->_effective_replication_map_ptr->get_topology(); auto local_dc = topology.get_datacenter(); for(auto dest: handler.get_targets()) { @@ -3645,9 +3664,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo got_response(response_id, coordinator, std::nullopt); } else { if (!handler.read_repair_write()) { - ++stats.writes_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator); + ++stats.writes_attempts.get_ep_stat(handler_ptr->_effective_replication_map_ptr->get_topology(), coordinator); } else { - ++stats.read_repair_write_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator); + ++stats.read_repair_write_attempts.get_ep_stat(handler_ptr->_effective_replication_map_ptr->get_topology(), coordinator); } if (coordinator == my_address) { @@ -3659,7 +3678,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo // Waited on indirectly. (void)f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) { - ++stats.writes_errors.get_ep_stat(p->get_token_metadata_ptr()->get_topology(), coordinator); + ++stats.writes_errors.get_ep_stat(handler_ptr->_effective_replication_map_ptr->get_topology(), coordinator); error err = error::FAILURE; std::optional msg; if (try_catch(eptr)) { @@ -3801,6 +3820,7 @@ class digest_read_resolver : public abstract_read_resolver { }; private: shared_ptr _proxy; + locator::effective_replication_map_ptr _effective_replication_map_ptr; size_t _block_for; size_t _cl_responses = 0; promise> _cl_promise; // cl is reached @@ -3822,9 +3842,12 @@ private: _digest_results.clear(); } public: - digest_read_resolver(shared_ptr proxy, schema_ptr schema, db::consistency_level cl, size_t block_for, size_t target_count_for_cl, storage_proxy::clock_type::time_point timeout) + digest_read_resolver(shared_ptr proxy, + locator::effective_replication_map_ptr ermp, + schema_ptr schema, db::consistency_level cl, size_t block_for, size_t target_count_for_cl, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, 0, timeout) , _proxy(std::move(proxy)) + , _effective_replication_map_ptr(std::move(ermp)) , _block_for(block_for) , _target_count_for_cl(target_count_for_cl) {} @@ -3868,7 +3891,7 @@ public: } private: bool waiting_for(gms::inet_address ep) { - const auto& topo = _proxy->get_token_metadata_ptr()->get_topology(); + const auto& topo = _effective_replication_map_ptr->get_topology(); return db::is_datacenter_local(_cl) ? fbu::is_me(ep) || (topo.get_datacenter(ep) == topo.get_datacenter()) : true; } void got_response(gms::inet_address ep) { @@ -4430,6 +4453,7 @@ protected: schema_ptr _schema; shared_ptr _proxy; + locator::effective_replication_map_ptr _effective_replication_map_ptr; lw_shared_ptr _cmd; lw_shared_ptr _retry_cmd; dht::partition_range _partition_range; @@ -4453,13 +4477,17 @@ private: } const locator::topology& get_topology() const noexcept { - return _proxy->get_token_metadata_ptr()->get_topology(); + return _effective_replication_map_ptr->get_topology(); } public: - abstract_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for, + abstract_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, + locator::effective_replication_map_ptr ermp, + lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) : - _schema(std::move(s)), _proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)), _trace_state(std::move(trace_state)), + _schema(std::move(s)), _proxy(std::move(proxy)) + , _effective_replication_map_ptr(std::move(ermp)) + , _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)), _trace_state(std::move(trace_state)), _cf(std::move(cf)), _permit(std::move(permit)), _rate_limit_info(rate_limit_info) { _proxy->get_stats().reads++; _proxy->get_stats().foreground_reads++; @@ -4723,8 +4751,8 @@ public: // Return an empty result in this case return make_ready_future>>>(make_foreign(make_lw_shared(query::result()))); } - digest_resolver_ptr digest_resolver = ::make_shared(_proxy, _schema, _cl, _block_for, - db::is_datacenter_local(_cl) ? _proxy->get_token_metadata_ptr()->get_topology().count_local_endpoints(_targets): _targets.size(), timeout); + digest_resolver_ptr digest_resolver = ::make_shared(_proxy, _effective_replication_map_ptr, _schema, _cl, _block_for, + db::is_datacenter_local(_cl) ? _effective_replication_map_ptr->get_topology().count_local_endpoints(_targets): _targets.size(), timeout); auto exec = shared_from_this(); make_requests(digest_resolver, timeout); @@ -4765,7 +4793,7 @@ public: if (std::abs(delta) <= write_timeout) { exec->_proxy->get_stats().global_read_repairs_canceled_due_to_concurrent_write++; // if CL is local and non matching data is modified less then write_timeout ms ago do only local repair - auto local_dc_filter = exec->_proxy->get_token_metadata_ptr()->get_topology().get_local_dc_filter(); + auto local_dc_filter = exec->_effective_replication_map_ptr->get_topology().get_local_dc_filter(); auto i = boost::range::remove_if(exec->_targets, std::not_fn(std::cref(local_dc_filter))); exec->_targets.erase(i, exec->_targets.end()); } @@ -4823,9 +4851,11 @@ private: class never_speculating_read_executor : public abstract_read_executor { public: - never_speculating_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit, + never_speculating_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, + locator::effective_replication_map_ptr ermp, + lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) : - abstract_read_executor(std::move(s), std::move(cf), std::move(proxy), std::move(cmd), std::move(pr), cl, 0, std::move(targets), std::move(trace_state), std::move(permit), rate_limit_info) { + abstract_read_executor(std::move(s), std::move(cf), std::move(proxy), std::move(ermp), std::move(cmd), std::move(pr), cl, 0, std::move(targets), std::move(trace_state), std::move(permit), rate_limit_info) { _block_for = _targets.size(); } }; @@ -4923,10 +4953,11 @@ result<::shared_ptr> storage_proxy::get_read_executor(lw service_permit permit) { const dht::token& token = pr.start()->value().token(); replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name()); + auto erm = ks.get_effective_replication_map(); speculative_retry::type retry_type = schema->speculative_retry().get_type(); gms::inet_address extra_replica; - inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(ks, token); + inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(*erm, token); // Check for a non-local read before heat-weighted load balancing // reordering of endpoints happens. The local endpoint, if // present, is always first in the list, as get_live_sorted_endpoints() @@ -4935,7 +4966,7 @@ result<::shared_ptr> storage_proxy::get_read_executor(lw auto cf = _db.local().find_column_family(schema).shared_from_this(); auto& gossiper = _remote->gossiper(); - inet_address_vector_replica_set target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision, + inet_address_vector_replica_set target_replicas = db::filter_for_query(cl, *erm, all_replicas, preferred_endpoints, repair_decision, gossiper, retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica, _db.local().get_config().cache_hit_rate_read_balancing() ? &*cf : nullptr); @@ -4945,7 +4976,7 @@ result<::shared_ptr> storage_proxy::get_read_executor(lw // Throw UAE early if we don't have enough replicas. try { - db::assure_sufficient_live_nodes(cl, ks, target_replicas); + db::assure_sufficient_live_nodes(cl, *erm, target_replicas); } catch (exceptions::unavailable_exception& ex) { slogger.debug("Read unavailable: cl={} required {} alive {}", ex.consistency, ex.required, ex.alive); get_stats().read_unavailables.mark(); @@ -4956,7 +4987,7 @@ result<::shared_ptr> storage_proxy::get_read_executor(lw get_stats().read_repair_attempts++; } - size_t block_for = db::block_for(ks, cl); + size_t block_for = db::block_for(*erm, cl); auto p = shared_from_this(); db::per_partition_rate_limit::info rate_limit_info; @@ -4973,22 +5004,22 @@ result<::shared_ptr> storage_proxy::get_read_executor(lw // Speculative retry is disabled *OR* there are simply no extra replicas to speculate. if (retry_type == speculative_retry::type::NONE || block_for == all_replicas.size() || (repair_decision == db::read_repair_decision::DC_LOCAL && is_datacenter_local(cl) && block_for == target_replicas.size())) { - return ::make_shared(schema, cf, p, cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); + return ::make_shared(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); } if (target_replicas.size() == all_replicas.size()) { // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC. // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy // (same amount of requests in total, but we turn 1 digest request into a full blown data request). - return ::make_shared(schema, cf, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); + return ::make_shared(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); } // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs. if (target_replicas.size() == block_for) { // If RRD.DC_LOCAL extra replica may already be present - auto local_dc_filter = get_token_metadata_ptr()->get_topology().get_local_dc_filter(); + auto local_dc_filter = erm->get_topology().get_local_dc_filter(); if (is_datacenter_local(cl) && !local_dc_filter(extra_replica)) { slogger.trace("read executor no extra target to speculate"); - return ::make_shared(schema, cf, p, cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); + return ::make_shared(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); } else { target_replicas.push_back(extra_replica); slogger.trace("creating read executor with extra target {}", extra_replica); @@ -4996,9 +5027,9 @@ result<::shared_ptr> storage_proxy::get_read_executor(lw } if (retry_type == speculative_retry::type::ALWAYS) { - return ::make_shared(schema, cf, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); + return ::make_shared(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); } else {// PERCENTILE or CUSTOM. - return ::make_shared(schema, cf, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); + return ::make_shared(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info); } } @@ -5180,6 +5211,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t service_permit permit) { schema_ptr schema = local_schema_registry().get(cmd->schema_version); replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name()); + auto erm = ks.get_effective_replication_map(); std::vector<::shared_ptr> exec; auto p = shared_from_this(); auto& cf= _db.local().find_column_family(schema); @@ -5209,9 +5241,9 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t auto& gossiper = _remote->gossiper(); while (i != ranges.end()) { dht::partition_range& range = *i; - inet_address_vector_replica_set live_endpoints = get_live_sorted_endpoints(ks, end_token(range)); + inet_address_vector_replica_set live_endpoints = get_live_sorted_endpoints(*erm, end_token(range)); inet_address_vector_replica_set merged_preferred_replicas = preferred_replicas_for_range(*i); - inet_address_vector_replica_set filtered_endpoints = filter_for_query(cl, ks, live_endpoints, merged_preferred_replicas, gossiper, pcf); + inet_address_vector_replica_set filtered_endpoints = filter_for_query(cl, *erm, live_endpoints, merged_preferred_replicas, gossiper, pcf); std::vector merged_ranges{to_token_range(range)}; ++i; @@ -5222,8 +5254,8 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t { const auto current_range_preferred_replicas = preferred_replicas_for_range(*i); dht::partition_range& next_range = *i; - inet_address_vector_replica_set next_endpoints = get_live_sorted_endpoints(ks, end_token(next_range)); - inet_address_vector_replica_set next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints, current_range_preferred_replicas, gossiper, pcf); + inet_address_vector_replica_set next_endpoints = get_live_sorted_endpoints(*erm, end_token(next_range)); + inet_address_vector_replica_set next_filtered_endpoints = filter_for_query(cl, *erm, next_endpoints, current_range_preferred_replicas, gossiper, pcf); // Origin has this to say here: // * If the current range right is the min token, we should stop merging because CFS.getRangeSlice @@ -5264,11 +5296,11 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t inet_address_vector_replica_set current_merged_preferred_replicas = intersection(merged_preferred_replicas, current_range_preferred_replicas); // Check if there is enough endpoint for the merge to be possible. - if (!is_sufficient_live_nodes(cl, ks, merged)) { + if (!is_sufficient_live_nodes(cl, *erm, merged)) { break; } - inet_address_vector_replica_set filtered_merged = filter_for_query(cl, ks, merged, current_merged_preferred_replicas, gossiper, pcf); + inet_address_vector_replica_set filtered_merged = filter_for_query(cl, *erm, merged, current_merged_preferred_replicas, gossiper, pcf); // Estimate whether merging will be a win or not if (!locator::i_endpoint_snitch::get_local_snitch_ptr()->is_worth_merging_for_range_query(filtered_merged, filtered_endpoints, next_filtered_endpoints)) { @@ -5305,14 +5337,14 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t } slogger.trace("creating range read executor with targets {}", filtered_endpoints); try { - db::assure_sufficient_live_nodes(cl, ks, filtered_endpoints); + db::assure_sufficient_live_nodes(cl, *erm, filtered_endpoints); } catch(exceptions::unavailable_exception& ex) { slogger.debug("Read unavailable: cl={} required {} alive {}", ex.consistency, ex.required, ex.alive); get_stats().range_slice_unavailables.mark(); throw; } - exec.push_back(::make_shared(schema, cf.shared_from_this(), p, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state, permit, std::monostate())); + exec.push_back(::make_shared(schema, cf.shared_from_this(), p, erm, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state, permit, std::monostate())); ranges_per_exec.emplace(exec.back().get(), std::move(merged_ranges)); } @@ -5789,9 +5821,8 @@ future storage_proxy::cas(schema_ptr schema, shared_ptr reque co_return condition_met; } -inet_address_vector_replica_set storage_proxy::get_live_endpoints(replica::keyspace& ks, const dht::token& token) const { - auto erm = ks.get_effective_replication_map(); - inet_address_vector_replica_set eps = erm->get_natural_endpoints_without_node_being_replaced(token); +inet_address_vector_replica_set storage_proxy::get_live_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const { + inet_address_vector_replica_set eps = erm.get_natural_endpoints_without_node_being_replaced(token); auto itend = boost::range::remove_if(eps, std::not_fn(std::bind_front(&storage_proxy::is_alive, this))); eps.erase(itend, eps.end()); return eps; @@ -5806,8 +5837,8 @@ void storage_proxy::sort_endpoints_by_proximity(inet_address_vector_replica_set& } } -inet_address_vector_replica_set storage_proxy::get_live_sorted_endpoints(replica::keyspace& ks, const dht::token& token) const { - auto eps = get_live_endpoints(ks, token); +inet_address_vector_replica_set storage_proxy::get_live_sorted_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const { + auto eps = get_live_endpoints(erm, token); sort_endpoints_by_proximity(eps); return eps; } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 3e5a86595e..e29f06044f 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -226,7 +226,7 @@ public: query::max_result_size get_max_result_size(const query::partition_slice& slice) const; query::tombstone_limit get_tombstone_limit() const; - inet_address_vector_replica_set get_live_endpoints(replica::keyspace& ks, const dht::token& token) const; + inet_address_vector_replica_set get_live_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const; private: distributed& _db; @@ -313,7 +313,7 @@ private: result create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); - result create_write_response_handler(replica::keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, inet_address_vector_replica_set targets, + result create_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr m, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info); result create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); @@ -332,7 +332,7 @@ private: bool hints_enabled(db::write_type type) const noexcept; db::hints::manager& hints_manager_for(db::write_type type); static void sort_endpoints_by_proximity(inet_address_vector_replica_set& eps); - inet_address_vector_replica_set get_live_sorted_endpoints(replica::keyspace& ks, const dht::token& token) const; + inet_address_vector_replica_set get_live_sorted_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const; bool is_alive(const gms::inet_address&) const; db::read_repair_decision new_read_repair_decision(const schema& s); result<::shared_ptr> get_read_executor(lw_shared_ptr cmd,