diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9da8cafa86..73d6d74494 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -4319,7 +4319,7 @@ size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr& m } } -future> storage_proxy::schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, +future> storage_proxy::schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit) { if (diffs.empty()) { return make_ready_future>(bo::success()); @@ -4625,7 +4625,7 @@ class data_read_resolver : public abstract_read_resolver { bool _all_reached_end = true; query::short_read _is_short_read; std::vector _data_results; - std::unordered_map>> _diffs; + mutations_per_partition_key_map _diffs; private: void on_timeout() override { fail_request(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, response_count(), _targets_count, response_count() != 0)); @@ -4832,7 +4832,8 @@ private: return got_incomplete_information_across_partitions(s, cmd, last_row, rp, versions, is_reversed); } public: - data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) { + data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout), + _diffs(10, partition_key::hashing(*_schema), partition_key::equality(*_schema)) { _data_results.reserve(targets_count); } void add_mutate_data(gms::inet_address from, foreign_ptr> result) { @@ -4983,15 +4984,12 @@ public: has_diff = true; mdiff = mutation(schema, m.decorated_key(), std::move(diff)); } - if 
(auto [it, added] = _diffs[m.token()].try_emplace(v.from, std::move(mdiff)); !added) { - // should not really happen, but lets try to deal with it - if (mdiff) { - if (it->second) { - it->second.value().apply(std::move(mdiff.value())); - } else { - it->second = std::move(mdiff); - } - } + if (auto [it, added] = _diffs[m.key()].try_emplace(v.from, std::move(mdiff)); !added) { + // A collision could happen only in 2 cases: + // 1. We have 2 versions for the same node. + // 2. `versions` and/or `reconciled_partitions` are not unique per partition key. + // Both cases are not possible unless there is a bug in the reconciliation code. + on_internal_error(slogger, fmt::format("Partition key conflict, key: {}, node: {}, table: {}.", m.key(), v.from, schema->ks_name())); } co_await coroutine::maybe_yield(); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 734b73ac55..ea2eb27c17 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -89,6 +89,8 @@ struct hint_wrapper; struct read_repair_mutation; using replicas_per_token_range = std::unordered_map>; +using mutations_per_partition_key_map = + std::unordered_map>, partition_key::hashing, partition_key::equality>; struct query_partition_key_range_concurrent_result { std::vector>> result; @@ -419,7 +421,7 @@ private: future> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); future> mutate_begin(unique_response_handler_vector ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional timeout_opt = { }); future> mutate_end(future> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state); - future> schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit); + future> 
schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit); bool need_throttle_writes() const; void unthrottle(); void handle_read_error(std::variant failure, bool range); diff --git a/test/topology/util.py b/test/topology/util.py index cd95610e1d..979ca911dd 100644 --- a/test/topology/util.py +++ b/test/topology/util.py @@ -365,8 +365,7 @@ async def start_writes_to_cdc_table(cql: Session, concurrency: int = 3): stream_to_timestamp = { stream: gen.time for gen in generations for stream in gen.streams} - # FIXME: Doesn't work with all_pages=True (https://github.com/scylladb/scylladb/issues/19101) - cdc_log = await cql.run_async(f"SELECT * FROM {ks_name}.tbl_scylla_cdc_log", all_pages=False) + cdc_log = await cql.run_async(f"SELECT * FROM {ks_name}.tbl_scylla_cdc_log", all_pages=True) for log_entry in cdc_log: assert log_entry.cdc_stream_id in stream_to_timestamp timestamp = stream_to_timestamp[log_entry.cdc_stream_id] diff --git a/test/topology_custom/test_conflicting_keys_read_repair.py b/test/topology_custom/test_conflicting_keys_read_repair.py index 2c792dfee3..6d01e0f2eb 100644 --- a/test/topology_custom/test_conflicting_keys_read_repair.py +++ b/test/topology_custom/test_conflicting_keys_read_repair.py @@ -17,7 +17,6 @@ from test.pylib.util import wait_for_cql_and_get_hosts logger = logging.getLogger(__name__) -@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/19101") @pytest.mark.asyncio async def test_read_repair_with_conflicting_hash_keys(request: pytest.FixtureRequest, manager: ManagerClient) -> None: """