Merge 'Fix possible data corruption due to token keys clashing in read repair.' from Sergey Zolotukhin
This update addresses an issue in the mutation diff calculation algorithm used during read repair. Previously, the algorithm used `token` as the hashmap key. Since `token` is calculated basing on the Murmur3 hash function, it could generate duplicate values for different partition keys, causing corruption in the affected rows' values. Fixes scylladb/scylladb#19101 Since the issue affects all the relevant scylla versions, backport to: 6.1, 6.2 Closes scylladb/scylladb#21996 * github.com:scylladb/scylladb: storage_proxy/read_repair: Remove redundant 'schema' parameter from `data_read_resolver::resolve` function. storage_proxy/read_repair: Use `partition_key` instead of `token` key for mutation diff calculation hashmap. test: Add test case for checking read repair diff calculation when having conflicting keys.
This commit is contained in:
@@ -4398,7 +4398,7 @@ size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& m
|
||||
}
|
||||
}
|
||||
|
||||
future<result<>> storage_proxy::schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map<dht::token, std::unordered_map<locator::host_id, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state,
|
||||
future<result<>> storage_proxy::schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state,
|
||||
service_permit permit) {
|
||||
if (diffs.empty()) {
|
||||
return make_ready_future<result<>>(bo::success());
|
||||
@@ -4704,7 +4704,7 @@ class data_read_resolver : public abstract_read_resolver {
|
||||
bool _all_reached_end = true;
|
||||
query::short_read _is_short_read;
|
||||
std::vector<reply> _data_results;
|
||||
std::unordered_map<dht::token, std::unordered_map<locator::host_id, std::optional<mutation>>> _diffs;
|
||||
mutations_per_partition_key_map _diffs;
|
||||
private:
|
||||
void on_timeout() override {
|
||||
fail_request(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, response_count(), _targets_count, response_count() != 0));
|
||||
@@ -4911,7 +4911,8 @@ private:
|
||||
return got_incomplete_information_across_partitions(s, cmd, last_row, rp, versions, is_reversed);
|
||||
}
|
||||
public:
|
||||
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) {
|
||||
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout),
|
||||
_diffs(10, partition_key::hashing(*_schema), partition_key::equality(*_schema)) {
|
||||
_data_results.reserve(targets_count);
|
||||
}
|
||||
void add_mutate_data(locator::host_id from, foreign_ptr<lw_shared_ptr<reconcilable_result>> result) {
|
||||
@@ -4956,7 +4957,7 @@ public:
|
||||
bool all_reached_end() const {
|
||||
return _all_reached_end;
|
||||
}
|
||||
future<std::optional<reconcilable_result>> resolve(schema_ptr schema, const query::read_command& cmd, uint64_t original_row_limit, uint64_t original_per_partition_limit,
|
||||
future<std::optional<reconcilable_result>> resolve(const query::read_command& cmd, uint64_t original_row_limit, uint64_t original_per_partition_limit,
|
||||
uint32_t original_partition_limit) {
|
||||
SCYLLA_ASSERT(_data_results.size());
|
||||
|
||||
@@ -4968,10 +4969,10 @@ public:
|
||||
co_return reconcilable_result(p->row_count(), p->partitions(), p->is_short_read());
|
||||
}
|
||||
|
||||
const auto& s = *schema;
|
||||
const auto& schema = *_schema;
|
||||
|
||||
// return true if lh > rh
|
||||
auto cmp = [&s](reply& lh, reply& rh) {
|
||||
auto cmp = [&schema](reply& lh, reply& rh) {
|
||||
if (lh.result->partitions().size() == 0) {
|
||||
return false; // reply with empty partition array goes to the end of the sorted array
|
||||
} else if (rh.result->partitions().size() == 0) {
|
||||
@@ -4979,7 +4980,7 @@ public:
|
||||
} else {
|
||||
auto lhk = lh.result->partitions().back().mut().key();
|
||||
auto rhk = rh.result->partitions().back().mut().key();
|
||||
return lhk.ring_order_tri_compare(s, rhk) > 0;
|
||||
return lhk.ring_order_tri_compare(schema, rhk) > 0;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -5009,7 +5010,7 @@ public:
|
||||
v.reserve(_targets_count);
|
||||
for (reply& r : _data_results) {
|
||||
auto pit = r.result->partitions().rbegin();
|
||||
if (pit != r.result->partitions().rend() && pit->mut().key().legacy_equal(s, max_key)) {
|
||||
if (pit != r.result->partitions().rend() && pit->mut().key().legacy_equal(schema, max_key)) {
|
||||
bool reached_partition_end = pit->row_count() < cmd.slice.partition_row_limit();
|
||||
v.emplace_back(r.from, std::move(*pit), r.reached_end, reached_partition_end);
|
||||
r.result->partitions().pop_back();
|
||||
@@ -5030,11 +5031,11 @@ public:
|
||||
auto it = std::ranges::find_if(v, [] (auto&& ver) {
|
||||
return bool(ver.par);
|
||||
});
|
||||
auto m = mutation(schema, it->par->mut().key());
|
||||
auto m = mutation(_schema, it->par->mut().key());
|
||||
for (const version& ver : v) {
|
||||
if (ver.par) {
|
||||
mutation_application_stats app_stats;
|
||||
co_await apply_gently(m.partition(), *schema, ver.par->mut().partition(), *schema, app_stats);
|
||||
co_await apply_gently(m.partition(), schema, ver.par->mut().partition(), schema, app_stats);
|
||||
}
|
||||
}
|
||||
auto live_row_count = m.live_row_count();
|
||||
@@ -5047,34 +5048,31 @@ public:
|
||||
|
||||
bool has_diff = false;
|
||||
|
||||
// calculate differences
|
||||
// Сalculate differences: iterate over the versions from all the nodes and calculate the difference with the reconciled result.
|
||||
for (auto z : std::views::zip(versions, reconciled_partitions)) {
|
||||
const mutation& m = std::get<1>(z).mut;
|
||||
for (const version& v : std::get<0>(z)) {
|
||||
auto diff = v.par
|
||||
? m.partition().difference(*schema, (co_await unfreeze_gently(v.par->mut(), schema)).partition())
|
||||
: mutation_partition(*schema, m.partition());
|
||||
? m.partition().difference(schema, (co_await unfreeze_gently(v.par->mut(), _schema)).partition())
|
||||
: mutation_partition(schema, m.partition());
|
||||
std::optional<mutation> mdiff;
|
||||
if (!diff.empty()) {
|
||||
has_diff = true;
|
||||
mdiff = mutation(schema, m.decorated_key(), std::move(diff));
|
||||
mdiff = mutation(_schema, m.decorated_key(), std::move(diff));
|
||||
}
|
||||
if (auto [it, added] = _diffs[m.token()].try_emplace(v.from, std::move(mdiff)); !added) {
|
||||
// should not really happen, but lets try to deal with it
|
||||
if (mdiff) {
|
||||
if (it->second) {
|
||||
it->second.value().apply(std::move(mdiff.value()));
|
||||
} else {
|
||||
it->second = std::move(mdiff);
|
||||
}
|
||||
}
|
||||
if (auto [it, added] = _diffs[m.key()].try_emplace(v.from, std::move(mdiff)); !added) {
|
||||
// A collision could happen only in 2 cases:
|
||||
// 1. We have 2 versions for the same node.
|
||||
// 2. `versions` (and or) `reconciled_partitions` are not unique per partition key.
|
||||
// Both cases are not possible unless there is a bug in the reconcilliation code.
|
||||
on_internal_error(slogger, fmt::format("Partition key conflict, key: {}, node: {}, table: {}.", m.key(), v.from, schema.ks_name()));
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
|
||||
if (has_diff) {
|
||||
if (got_incomplete_information(*schema, cmd, original_row_limit, original_per_partition_limit,
|
||||
if (got_incomplete_information(schema, cmd, original_row_limit, original_per_partition_limit,
|
||||
original_partition_limit, reconciled_partitions, versions)) {
|
||||
co_return std::nullopt;
|
||||
}
|
||||
@@ -5363,7 +5361,7 @@ protected:
|
||||
on_read_resolved();
|
||||
co_return;
|
||||
}
|
||||
auto rr_opt = co_await data_resolver->resolve(_schema, *cmd, original_row_limit(), original_per_partition_row_limit(), original_partition_limit()); // reconciliation happens here
|
||||
auto rr_opt = co_await data_resolver->resolve(*cmd, original_row_limit(), original_per_partition_row_limit(), original_partition_limit()); // reconciliation happens here
|
||||
|
||||
// We generate a retry if at least one node reply with count live columns but after merge we have less
|
||||
// than the total number of column we are interested in (which may be < count on a retry).
|
||||
|
||||
@@ -91,6 +91,8 @@ struct hint_wrapper;
|
||||
struct read_repair_mutation;
|
||||
|
||||
using replicas_per_token_range = std::unordered_map<dht::token_range, std::vector<locator::host_id>>;
|
||||
using mutations_per_partition_key_map =
|
||||
std::unordered_map<partition_key, std::unordered_map<locator::host_id, std::optional<mutation>>, partition_key::hashing, partition_key::equality>;
|
||||
|
||||
struct query_partition_key_range_concurrent_result {
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> result;
|
||||
@@ -422,7 +424,7 @@ private:
|
||||
future<result<unique_response_handler_vector>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
future<result<>> mutate_begin(unique_response_handler_vector ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt = { });
|
||||
future<result<>> mutate_end(future<result<>> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state);
|
||||
future<result<>> schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map<dht::token, std::unordered_map<locator::host_id, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
future<result<>> schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
bool need_throttle_writes() const;
|
||||
void unthrottle();
|
||||
void handle_read_error(std::variant<exceptions::coordinator_exception_container, std::exception_ptr> failure, bool range);
|
||||
|
||||
@@ -365,8 +365,7 @@ async def start_writes_to_cdc_table(cql: Session, concurrency: int = 3):
|
||||
|
||||
stream_to_timestamp = { stream: gen.time for gen in generations for stream in gen.streams}
|
||||
|
||||
# FIXME: Doesn't work with all_pages=True (https://github.com/scylladb/scylladb/issues/19101)
|
||||
cdc_log = await cql.run_async(f"SELECT * FROM {ks_name}.tbl_scylla_cdc_log", all_pages=False)
|
||||
cdc_log = await cql.run_async(f"SELECT * FROM {ks_name}.tbl_scylla_cdc_log", all_pages=True)
|
||||
for log_entry in cdc_log:
|
||||
assert log_entry.cdc_stream_id in stream_to_timestamp
|
||||
timestamp = stream_to_timestamp[log_entry.cdc_stream_id]
|
||||
|
||||
81
test/topology_custom/test_conflicting_keys_read_repair.py
Normal file
81
test/topology_custom/test_conflicting_keys_read_repair.py
Normal file
@@ -0,0 +1,81 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
import logging
|
||||
import pytest
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from cassandra import ConsistencyLevel # type: ignore
|
||||
from cassandra.query import SimpleStatement # type: ignore
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_repair_with_conflicting_hash_keys(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
||||
"""
|
||||
Test that conflicting hash keys are handled correctly during read repair.
|
||||
Issue https://github.com/scylladb/scylladb/issues/19101
|
||||
|
||||
1. Create a new cluster with 3 nodes.
|
||||
2. Create a keyspace and a table with replication factor = 3.
|
||||
3. Stop one of the nodes.
|
||||
4. Add 2 rows that have primary keys causing a hash collision.
|
||||
5. Start the offline node.
|
||||
6. Run a SELECT query with ALL consistency level, forcing reading from all 3 nodes.
|
||||
The node that's been offline will not have a value, causing a read repair.
|
||||
Since difference calculation logic is using a token for it's hashmap key and the
|
||||
token value is the same for both keys, this causes an incorrect diff calculation
|
||||
and propagation to the node that was offline.
|
||||
7. Run the same SELECT query with ALL consistency level, forcing reading from all 3 nodes.
|
||||
now there is also a conflict, since the node that was reset got an incorrect value as a
|
||||
result of and prev step read repair. This incorrect value is newer than others, thus it
|
||||
will be the result of reconciliation in case the diff calculation algorithm is using a
|
||||
token as a key.
|
||||
|
||||
"""
|
||||
logger.info("Creating a new cluster")
|
||||
srvs = await manager.servers_add(3)
|
||||
cql, _ = await manager.get_ready_cql(srvs)
|
||||
|
||||
await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};")
|
||||
await cql.run_async("CREATE TABLE ks.t (pk bigint PRIMARY KEY, c int);")
|
||||
|
||||
# Stop one of the nodes.
|
||||
await manager.server_stop_gracefully(srvs[0].server_id)
|
||||
|
||||
# Add rows with partition kays that cause murmur3 hash collision, token value [6874760189787677834].
|
||||
pk1 = -4818441857111425024
|
||||
pk2 = -8686612841249112064
|
||||
await cql.run_async(SimpleStatement(f"INSERT INTO ks.t (pk, c) VALUES ({pk1}, 111)", consistency_level=ConsistencyLevel.ONE))
|
||||
await cql.run_async(SimpleStatement(f"INSERT INTO ks.t (pk, c) VALUES ({pk2}, 222)", consistency_level=ConsistencyLevel.ONE))
|
||||
|
||||
# Start the offline node.
|
||||
await manager.server_start(srvs[0].server_id, wait_others=2)
|
||||
|
||||
# Run a SELECT query with ALL consistency level, forcing reading from all 3 nodes.
|
||||
res = await cql.run_async(SimpleStatement("SELECT * FROM ks.t", consistency_level=ConsistencyLevel.ALL))
|
||||
|
||||
# Validate the results (should be OK).
|
||||
assert len(res) == 2
|
||||
for row in res:
|
||||
if (row.pk == pk1):
|
||||
assert row.c == 111
|
||||
elif (row.pk == pk2):
|
||||
assert row.c == 222
|
||||
|
||||
res = await cql.run_async(SimpleStatement("SELECT * FROM ks.t", consistency_level=ConsistencyLevel.ALL))
|
||||
|
||||
# Validate the results (will be wrong in case the diff calculation hash map uses tokens as keys).
|
||||
assert len(res) == 2
|
||||
for row in res:
|
||||
if (row.pk == pk1):
|
||||
assert row.c == 111
|
||||
elif (row.pk == pk2):
|
||||
assert row.c == 222
|
||||
Reference in New Issue
Block a user