mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 03:56:42 +00:00
Merge 'paxos_response_handler: carry effective replication map' from Benny Halevy
On this path, `create_write_response_handler` accepts an `inet_address_vector_replica_set` that corresponds to the effective_replication_map_ptr held in the paxos_response_handler. Currently, however, the function retrieves a new effective_replication_map_ptr that may not hold all of those endpoints. Fixes scylladb/scylladb#15138 Closes #15141 * github.com:scylladb/scylladb: storage_proxy: create_write_response_handler: carry effective_replication_map_ptr from paxos_response_handler storage_proxy: send_to_live_endpoints: throw on_internal_error if node not found
This commit is contained in:
@@ -1216,6 +1216,10 @@ public:
|
||||
// this is called with an id of a replica that replied to learn request
|
||||
// and returns true once a quorum of such replies has accumulated
|
||||
bool learned(gms::inet_address ep);
|
||||
|
||||
// Accessor for the effective replication map pointer stored in this object.
// Returns a const reference to the _effective_replication_map_ptr member,
// so no copy of the pointer is made by the call itself.
const locator::effective_replication_map_ptr& get_effective_replication_map() const noexcept {
|
||||
return _effective_replication_map_ptr;
|
||||
}
|
||||
};
|
||||
|
||||
thread_local uint64_t paxos_response_handler::next_id = 0;
|
||||
@@ -1862,8 +1866,8 @@ paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& conte
|
||||
if (missing_mrc.size() > 0) {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] Repairing replicas that missed the most recent commit", _id);
|
||||
tracing::trace(tr_state, "Repairing replicas that missed the most recent commit");
|
||||
std::array<std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, inet_address_vector_replica_set>, 1>
|
||||
m{std::make_tuple(make_lw_shared<paxos::proposal>(std::move(*summary.most_recent_commit)), _schema, _key.token(), std::move(missing_mrc))};
|
||||
std::array<std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token, inet_address_vector_replica_set>, 1>
|
||||
m{std::make_tuple(make_lw_shared<paxos::proposal>(std::move(*summary.most_recent_commit)), _schema, shared_from_this(), _key.token(), std::move(missing_mrc))};
|
||||
// create_write_response_handler is overloaded for paxos::proposal and will
|
||||
// create cas_mutation holder, which consequently will ensure paxos::learn is
|
||||
// used.
|
||||
@@ -3115,16 +3119,14 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo
|
||||
}
|
||||
|
||||
result<storage_proxy::response_id_type>
|
||||
storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, inet_address_vector_replica_set>& meta,
|
||||
storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token, inet_address_vector_replica_set>& meta,
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
|
||||
auto& [commit, s, token, endpoints] = meta;
|
||||
auto& [commit, s, paxos_handler, token, endpoints] = meta;
|
||||
|
||||
slogger.trace("creating write handler for paxos repair token: {} endpoint: {}", token, endpoints);
|
||||
tracing::trace(tr_state, "Creating write handler for paxos repair token: {} endpoint: {}", token, endpoints);
|
||||
|
||||
auto keyspace_name = s->ks_name();
|
||||
replica::table& table = _db.local().find_column_family(s->id());
|
||||
auto ermp = table.get_effective_replication_map();
|
||||
auto ermp = paxos_handler->get_effective_replication_map();
|
||||
|
||||
// No rate limiting for paxos (yet)
|
||||
return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
|
||||
@@ -3923,6 +3925,15 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
|
||||
|
||||
for(auto dest: handler.get_targets()) {
|
||||
auto node = topology.find_node(dest);
|
||||
if (!node) {
|
||||
// The caller is supposed to pick target nodes from the topology
|
||||
// contained in the effective_replication_map that is kept in the handler.
|
||||
// If the e_r_m is not in sync with the topology used to pick the targets
|
||||
// endpoints may be missing here, and we are better off returning an error
|
||||
// (or aborting in testing) rather than segfaulting here
|
||||
// (See https://github.com/scylladb/scylladb/issues/15138)
|
||||
on_internal_error(slogger, fmt::format("Node {} was not found in topology", dest));
|
||||
}
|
||||
const auto& dc = node->dc_rack().dc;
|
||||
// read repair writes do not go through coordinator since mutations are per destination
|
||||
if (handler.read_repair_write() || dc == local_dc) {
|
||||
|
||||
@@ -319,7 +319,7 @@ private:
|
||||
result<response_id_type> create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
result<response_id_type> create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
result<response_id_type> create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, inet_address_vector_replica_set>& meta,
|
||||
result<response_id_type> create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token, inet_address_vector_replica_set>& meta,
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
void register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker);
|
||||
void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout);
|
||||
|
||||
Reference in New Issue
Block a user