From 27c33015a5c300f3fa34165eb8980fed77970814 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 23 Aug 2023 20:34:32 +0300 Subject: [PATCH 1/2] storage_proxy: send_to_live_endpoints: throw on_internal_error if node not found Return error in production rather than crashing as in https://github.com/scylladb/scylladb/issues/15138 Signed-off-by: Benny Halevy --- service/storage_proxy.cc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index ba7eeb8e65..ca21f80e3c 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -3920,6 +3920,15 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo for(auto dest: handler.get_targets()) { auto node = topology.find_node(dest); + if (!node) { + // The caller is supposed to pick target nodes from the topology + // contained in the effective_replication_map that is kept in the handler. + // If the e_r_m is not in sync with the topology used to pick the targets + // endpoints may be missing here and we are better off returning an error + // (or aborting in testing) rather than segfaulting here + // (See https://github.com/scylladb/scylladb/issues/15138) + on_internal_error(slogger, fmt::format("Node {} was not found in topology", dest)); + } const auto& dc = node->dc_rack().dc; // read repair writes do not go through coordinator since mutations are per destination if (handler.read_repair_write() || dc == local_dc) { From 4a2e367e92fc48bda04952db3ab7c44e37470c40 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 23 Aug 2023 16:29:32 +0300 Subject: [PATCH 2/2] storage_proxy: create_write_response_handler: carry effective_replication_map_ptr from paxos_response_handler On this path, `create_write_response_handler` accepts an `inet_address_vector_replica_set` that corresponds to the effective_replication_map_ptr in the paxos_response_handler, but currently the function retrieves a new effective_replication_map_ptr that may 
not hold all the said endpoints. Fixes scylladb/scylladb#15138 Signed-off-by: Benny Halevy --- service/storage_proxy.cc | 16 +++++++++------- service/storage_proxy.hh | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index ca21f80e3c..540e748986 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1216,6 +1216,10 @@ public: // this is called with an id of a replica that replied to learn request // adn returns true when quorum of such requests are accumulated bool learned(gms::inet_address ep); + + const locator::effective_replication_map_ptr& get_effective_replication_map() const noexcept { + return _effective_replication_map_ptr; + } }; thread_local uint64_t paxos_response_handler::next_id = 0; @@ -1862,8 +1866,8 @@ paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& conte if (missing_mrc.size() > 0) { paxos::paxos_state::logger.debug("CAS[{}] Repairing replicas that missed the most recent commit", _id); tracing::trace(tr_state, "Repairing replicas that missed the most recent commit"); - std::array, schema_ptr, dht::token, inet_address_vector_replica_set>, 1> - m{std::make_tuple(make_lw_shared(std::move(*summary.most_recent_commit)), _schema, _key.token(), std::move(missing_mrc))}; + std::array, schema_ptr, shared_ptr, dht::token, inet_address_vector_replica_set>, 1> + m{std::make_tuple(make_lw_shared(std::move(*summary.most_recent_commit)), _schema, shared_from_this(), _key.token(), std::move(missing_mrc))}; // create_write_response_handler is overloaded for paxos::proposal and will // create cas_mutation holder, which consequently will ensure paxos::learn is // used. 
@@ -3115,16 +3119,14 @@ storage_proxy::create_write_response_handler(const std::tuple -storage_proxy::create_write_response_handler(const std::tuple, schema_ptr, dht::token, inet_address_vector_replica_set>& meta, +storage_proxy::create_write_response_handler(const std::tuple, schema_ptr, shared_ptr, dht::token, inet_address_vector_replica_set>& meta, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { - auto& [commit, s, token, endpoints] = meta; + auto& [commit, s, paxos_handler, token, endpoints] = meta; slogger.trace("creating write handler for paxos repair token: {} endpoint: {}", token, endpoints); tracing::trace(tr_state, "Creating write handler for paxos repair token: {} endpoint: {}", token, endpoints); - auto keyspace_name = s->ks_name(); - replica::table& table = _db.local().find_column_family(s->id()); - auto ermp = table.get_effective_replication_map(); + auto ermp = paxos_handler->get_effective_replication_map(); // No rate limiting for paxos (yet) return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique(std::move(commit), s, nullptr), std::move(endpoints), diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 01f2699a8c..36b323780e 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -319,7 +319,7 @@ private: result create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const std::tuple, schema_ptr, shared_ptr, dht::token>& proposal, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); - result create_write_response_handler(const std::tuple, schema_ptr, dht::token, 
inet_address_vector_replica_set>& meta, + result create_write_response_handler(const std::tuple, schema_ptr, shared_ptr, dht::token, inet_address_vector_replica_set>& meta, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); void register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr tracker); void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout);