From e577f1d1414d640a68fd5adaef8b3220be72bdf7 Mon Sep 17 00:00:00 2001 From: Sergey Zolotukhin Date: Wed, 18 Dec 2024 15:50:03 +0100 Subject: [PATCH 1/3] test: Add test case for checking read repair diff calculation when having conflicting keys. The test updates two rows with keys that result in a Murmur3 hash collision, which is used to generate Scylla tokens. These tokens are involved in read repair diff calculations. Due to the identical token values, a hash map key collision occurs. Consequently, an incorrect value from the second row (with a different primary key) is then sent for writing as 'repaired', causing data corruption. --- .../test_conflicting_keys_read_repair.py | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 test/topology_custom/test_conflicting_keys_read_repair.py diff --git a/test/topology_custom/test_conflicting_keys_read_repair.py b/test/topology_custom/test_conflicting_keys_read_repair.py new file mode 100644 index 0000000000..2c792dfee3 --- /dev/null +++ b/test/topology_custom/test_conflicting_keys_read_repair.py @@ -0,0 +1,82 @@ +# +# Copyright (C) 2024-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +import logging +import pytest +import asyncio +import time + +from cassandra import ConsistencyLevel # type: ignore +from cassandra.query import SimpleStatement # type: ignore +from test.pylib.manager_client import ManagerClient +from test.pylib.util import wait_for_cql_and_get_hosts + + +logger = logging.getLogger(__name__) + + +@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/19101") +@pytest.mark.asyncio +async def test_read_repair_with_conflicting_hash_keys(request: pytest.FixtureRequest, manager: ManagerClient) -> None: + """ + Test that conflicting hash keys are handled correctly during read repair. + Issue https://github.com/scylladb/scylladb/issues/19101 + + 1. Create a new cluster with 3 nodes. + 2. Create a keyspace and a table with replication factor = 3. + 3. 
Stop one of the nodes. 4. Add 2 rows that have primary keys causing a hash collision. 5. Start the offline node. 6. Run a SELECT query with ALL consistency level, forcing reading from all 3 nodes. The node that's been offline will not have a value, causing a read repair. Since difference calculation logic is using a token for its hashmap key and the token value is the same for both keys, this causes an incorrect diff calculation and propagation to the node that was offline. 7. Run the same SELECT query with ALL consistency level, forcing reading from all 3 nodes. Now there is also a conflict, since the node that was reset got an incorrect value as a result of the previous step's read repair. This incorrect value is newer than others, thus it will be the result of reconciliation in case the diff calculation algorithm is using a token as a key. """ logger.info("Creating a new cluster") srvs = await manager.servers_add(3) cql, _ = await manager.get_ready_cql(srvs) await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};") await cql.run_async("CREATE TABLE ks.t (pk bigint PRIMARY KEY, c int);") # Stop one of the nodes. await manager.server_stop_gracefully(srvs[0].server_id) # Add rows with partition keys that cause murmur3 hash collision, token value [6874760189787677834]. pk1 = -4818441857111425024 pk2 = -8686612841249112064 await cql.run_async(SimpleStatement(f"INSERT INTO ks.t (pk, c) VALUES ({pk1}, 111)", consistency_level=ConsistencyLevel.ONE)) await cql.run_async(SimpleStatement(f"INSERT INTO ks.t (pk, c) VALUES ({pk2}, 222)", consistency_level=ConsistencyLevel.ONE)) # Start the offline node. await manager.server_start(srvs[0].server_id, wait_others=2) # Run a SELECT query with ALL consistency level, forcing reading from all 3 nodes. 
+ res = await cql.run_async(SimpleStatement("SELECT * FROM ks.t", consistency_level=ConsistencyLevel.ALL)) + # Validate the results (should be OK). + assert len(res) == 2 + for row in res: + if (row.pk == pk1): + assert row.c == 111 + elif (row.pk == pk2): + assert row.c == 222 + + res = await cql.run_async(SimpleStatement("SELECT * FROM ks.t", consistency_level=ConsistencyLevel.ALL)) + + # Validate the results (will be wrong in case the diff calculation hash map uses tokens as keys). + assert len(res) == 2 + for row in res: + if (row.pk == pk1): + assert row.c == 111 + elif (row.pk == pk2): + assert row.c == 222 From 39785c6f4e265c736214dd186eb0eae58a9c92d1 Mon Sep 17 00:00:00 2001 From: Sergey Zolotukhin Date: Thu, 19 Dec 2024 17:13:55 +0100 Subject: [PATCH 2/3] storage_proxy/read_repair: Use `partition_key` instead of `token` key for mutation diff calculation hashmap. This update addresses an issue in the mutation diff calculation algorithm used during read repair. Previously, the algorithm used `token` as the hashmap key. Since `token` is calculated based on the Murmur3 hash function, it could generate duplicate values for different partition keys, causing corruption in the affected rows' values. 
Fixes scylladb/scylladb#19101 --- service/storage_proxy.cc | 24 +++++++++---------- service/storage_proxy.hh | 4 +++- test/topology/util.py | 3 +-- .../test_conflicting_keys_read_repair.py | 1 - 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 6265191f12..0b187c91c3 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -4398,7 +4398,7 @@ size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr& m } } -future> storage_proxy::schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, +future> storage_proxy::schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit) { if (diffs.empty()) { return make_ready_future>(bo::success()); @@ -4704,7 +4704,7 @@ class data_read_resolver : public abstract_read_resolver { bool _all_reached_end = true; query::short_read _is_short_read; std::vector _data_results; - std::unordered_map>> _diffs; + mutations_per_partition_key_map _diffs; private: void on_timeout() override { fail_request(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, response_count(), _targets_count, response_count() != 0)); @@ -4911,7 +4911,8 @@ private: return got_incomplete_information_across_partitions(s, cmd, last_row, rp, versions, is_reversed); } public: - data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) { + data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout), + _diffs(10, partition_key::hashing(*_schema), 
partition_key::equality(*_schema)) { _data_results.reserve(targets_count); } void add_mutate_data(locator::host_id from, foreign_ptr> result) { @@ -5047,7 +5048,7 @@ public: bool has_diff = false; - // calculate differences + // Calculate differences: iterate over the versions from all the nodes and calculate the difference with the reconciled result. for (auto z : std::views::zip(versions, reconciled_partitions)) { const mutation& m = std::get<1>(z).mut; for (const version& v : std::get<0>(z)) { @@ -5059,15 +5060,12 @@ public: has_diff = true; mdiff = mutation(schema, m.decorated_key(), std::move(diff)); } - if (auto [it, added] = _diffs[m.token()].try_emplace(v.from, std::move(mdiff)); !added) { - // should not really happen, but lets try to deal with it - if (mdiff) { - if (it->second) { - it->second.value().apply(std::move(mdiff.value())); - } else { - it->second = std::move(mdiff); - } - } + if (auto [it, added] = _diffs[m.key()].try_emplace(v.from, std::move(mdiff)); !added) { + // A collision could happen only in 2 cases: + // 1. We have 2 versions for the same node. + // 2. `versions` (and or) `reconciled_partitions` are not unique per partition key. + // Both cases are not possible unless there is a bug in the reconcilliation code. 
+ on_internal_error(slogger, fmt::format("Partition key conflict, key: {}, node: {}, table: {}.", m.key(), v.from, schema->ks_name())); } co_await coroutine::maybe_yield(); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 6689b68c5d..85756c3a09 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -91,6 +91,8 @@ struct hint_wrapper; struct read_repair_mutation; using replicas_per_token_range = std::unordered_map>; +using mutations_per_partition_key_map = + std::unordered_map>, partition_key::hashing, partition_key::equality>; struct query_partition_key_range_concurrent_result { std::vector>> result; @@ -422,7 +424,7 @@ private: future> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); future> mutate_begin(unique_response_handler_vector ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional timeout_opt = { }); future> mutate_end(future> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state); - future> schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit); + future> schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit); bool need_throttle_writes() const; void unthrottle(); void handle_read_error(std::variant failure, bool range); diff --git a/test/topology/util.py b/test/topology/util.py index aa3e4769bc..6b55d9c000 100644 --- a/test/topology/util.py +++ b/test/topology/util.py @@ -365,8 +365,7 @@ async def start_writes_to_cdc_table(cql: Session, concurrency: int = 3): stream_to_timestamp = { stream: gen.time for gen in generations for stream in gen.streams} - # FIXME: 
Doesn't work with all_pages=True (https://github.com/scylladb/scylladb/issues/19101) - cdc_log = await cql.run_async(f"SELECT * FROM {ks_name}.tbl_scylla_cdc_log", all_pages=False) + cdc_log = await cql.run_async(f"SELECT * FROM {ks_name}.tbl_scylla_cdc_log", all_pages=True) for log_entry in cdc_log: assert log_entry.cdc_stream_id in stream_to_timestamp timestamp = stream_to_timestamp[log_entry.cdc_stream_id] diff --git a/test/topology_custom/test_conflicting_keys_read_repair.py b/test/topology_custom/test_conflicting_keys_read_repair.py index 2c792dfee3..6d01e0f2eb 100644 --- a/test/topology_custom/test_conflicting_keys_read_repair.py +++ b/test/topology_custom/test_conflicting_keys_read_repair.py @@ -17,7 +17,6 @@ from test.pylib.util import wait_for_cql_and_get_hosts logger = logging.getLogger(__name__) -@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/19101") @pytest.mark.asyncio async def test_read_repair_with_conflicting_hash_keys(request: pytest.FixtureRequest, manager: ManagerClient) -> None: """ From 155480595ff2daea1efa524e4f0e29928668852f Mon Sep 17 00:00:00 2001 From: Sergey Zolotukhin Date: Fri, 20 Dec 2024 16:03:25 +0100 Subject: [PATCH 3/3] storage_proxy/read_repair: Remove redundant 'schema' parameter from `data_read_resolver::resolve` function. The `data_read_resolver` class inherits from `abstract_read_resolver`, which already includes the `schema_ptr _schema` member. Therefore, using a separate function parameter in `data_read_resolver::resolve` initialized with the same variable in `abstract_read_executor` is redundant. 
--- service/storage_proxy.cc | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 0b187c91c3..6a65281322 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -4957,7 +4957,7 @@ public: bool all_reached_end() const { return _all_reached_end; } - future> resolve(schema_ptr schema, const query::read_command& cmd, uint64_t original_row_limit, uint64_t original_per_partition_limit, + future> resolve(const query::read_command& cmd, uint64_t original_row_limit, uint64_t original_per_partition_limit, uint32_t original_partition_limit) { SCYLLA_ASSERT(_data_results.size()); @@ -4969,10 +4969,10 @@ public: co_return reconcilable_result(p->row_count(), p->partitions(), p->is_short_read()); } - const auto& s = *schema; + const auto& schema = *_schema; // return true if lh > rh - auto cmp = [&s](reply& lh, reply& rh) { + auto cmp = [&schema](reply& lh, reply& rh) { if (lh.result->partitions().size() == 0) { return false; // reply with empty partition array goes to the end of the sorted array } else if (rh.result->partitions().size() == 0) { @@ -4980,7 +4980,7 @@ public: } else { auto lhk = lh.result->partitions().back().mut().key(); auto rhk = rh.result->partitions().back().mut().key(); - return lhk.ring_order_tri_compare(s, rhk) > 0; + return lhk.ring_order_tri_compare(schema, rhk) > 0; } }; @@ -5010,7 +5010,7 @@ public: v.reserve(_targets_count); for (reply& r : _data_results) { auto pit = r.result->partitions().rbegin(); - if (pit != r.result->partitions().rend() && pit->mut().key().legacy_equal(s, max_key)) { + if (pit != r.result->partitions().rend() && pit->mut().key().legacy_equal(schema, max_key)) { bool reached_partition_end = pit->row_count() < cmd.slice.partition_row_limit(); v.emplace_back(r.from, std::move(*pit), r.reached_end, reached_partition_end); r.result->partitions().pop_back(); @@ -5031,11 +5031,11 @@ public: auto it = 
std::ranges::find_if(v, [] (auto&& ver) { return bool(ver.par); }); - auto m = mutation(schema, it->par->mut().key()); + auto m = mutation(_schema, it->par->mut().key()); for (const version& ver : v) { if (ver.par) { mutation_application_stats app_stats; - co_await apply_gently(m.partition(), *schema, ver.par->mut().partition(), *schema, app_stats); + co_await apply_gently(m.partition(), schema, ver.par->mut().partition(), schema, app_stats); } } auto live_row_count = m.live_row_count(); @@ -5053,26 +5053,26 @@ public: const mutation& m = std::get<1>(z).mut; for (const version& v : std::get<0>(z)) { auto diff = v.par - ? m.partition().difference(*schema, (co_await unfreeze_gently(v.par->mut(), schema)).partition()) - : mutation_partition(*schema, m.partition()); + ? m.partition().difference(schema, (co_await unfreeze_gently(v.par->mut(), _schema)).partition()) + : mutation_partition(schema, m.partition()); std::optional mdiff; if (!diff.empty()) { has_diff = true; - mdiff = mutation(schema, m.decorated_key(), std::move(diff)); + mdiff = mutation(_schema, m.decorated_key(), std::move(diff)); } if (auto [it, added] = _diffs[m.key()].try_emplace(v.from, std::move(mdiff)); !added) { // A collision could happen only in 2 cases: // 1. We have 2 versions for the same node. // 2. `versions` (and or) `reconciled_partitions` are not unique per partition key. // Both cases are not possible unless there is a bug in the reconcilliation code. 
- on_internal_error(slogger, fmt::format("Partition key conflict, key: {}, node: {}, table: {}.", m.key(), v.from, schema->ks_name())); + on_internal_error(slogger, fmt::format("Partition key conflict, key: {}, node: {}, table: {}.", m.key(), v.from, schema.ks_name())); } co_await coroutine::maybe_yield(); } } if (has_diff) { - if (got_incomplete_information(*schema, cmd, original_row_limit, original_per_partition_limit, + if (got_incomplete_information(schema, cmd, original_row_limit, original_per_partition_limit, original_partition_limit, reconciled_partitions, versions)) { co_return std::nullopt; } @@ -5361,7 +5361,7 @@ protected: on_read_resolved(); co_return; } - auto rr_opt = co_await data_resolver->resolve(_schema, *cmd, original_row_limit(), original_per_partition_row_limit(), original_partition_limit()); // reconciliation happens here + auto rr_opt = co_await data_resolver->resolve(*cmd, original_row_limit(), original_per_partition_row_limit(), original_partition_limit()); // reconciliation happens here // We generate a retry if at least one node reply with count live columns but after merge we have less // than the total number of column we are interested in (which may be < count on a retry).