storage_proxy/read_repair: Use partition_key instead of token key for mutation

diff calculation hashmap.

This update addresses an issue in the mutation diff calculation algorithm used during read repair.
Previously, the algorithm used `token` as the hashmap key. Since `token` is calculated based on
the Murmur3 hash function, it could generate duplicate values for different partition keys, causing
corruption in the affected rows' values.

Fixes scylladb/scylladb#19101
This commit is contained in:
Sergey Zolotukhin
2024-12-19 17:13:55 +01:00
parent e577f1d141
commit 39785c6f4e
4 changed files with 15 additions and 17 deletions

View File

@@ -4398,7 +4398,7 @@ size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& m
}
}
future<result<>> storage_proxy::schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map<dht::token, std::unordered_map<locator::host_id, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state,
future<result<>> storage_proxy::schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state,
service_permit permit) {
if (diffs.empty()) {
return make_ready_future<result<>>(bo::success());
@@ -4704,7 +4704,7 @@ class data_read_resolver : public abstract_read_resolver {
bool _all_reached_end = true;
query::short_read _is_short_read;
std::vector<reply> _data_results;
std::unordered_map<dht::token, std::unordered_map<locator::host_id, std::optional<mutation>>> _diffs;
mutations_per_partition_key_map _diffs;
private:
void on_timeout() override {
fail_request(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, response_count(), _targets_count, response_count() != 0));
@@ -4911,7 +4911,8 @@ private:
return got_incomplete_information_across_partitions(s, cmd, last_row, rp, versions, is_reversed);
}
public:
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) {
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout),
_diffs(10, partition_key::hashing(*_schema), partition_key::equality(*_schema)) {
_data_results.reserve(targets_count);
}
void add_mutate_data(locator::host_id from, foreign_ptr<lw_shared_ptr<reconcilable_result>> result) {
@@ -5047,7 +5048,7 @@ public:
bool has_diff = false;
// calculate differences
// Calculate differences: iterate over the versions from all the nodes and calculate the difference with the reconciled result.
for (auto z : std::views::zip(versions, reconciled_partitions)) {
const mutation& m = std::get<1>(z).mut;
for (const version& v : std::get<0>(z)) {
@@ -5059,15 +5060,12 @@ public:
has_diff = true;
mdiff = mutation(schema, m.decorated_key(), std::move(diff));
}
if (auto [it, added] = _diffs[m.token()].try_emplace(v.from, std::move(mdiff)); !added) {
// should not really happen, but lets try to deal with it
if (mdiff) {
if (it->second) {
it->second.value().apply(std::move(mdiff.value()));
} else {
it->second = std::move(mdiff);
}
}
if (auto [it, added] = _diffs[m.key()].try_emplace(v.from, std::move(mdiff)); !added) {
// A collision could happen only in 2 cases:
// 1. We have 2 versions for the same node.
// 2. `versions` (and/or) `reconciled_partitions` are not unique per partition key.
// Both cases are not possible unless there is a bug in the reconciliation code.
on_internal_error(slogger, fmt::format("Partition key conflict, key: {}, node: {}, table: {}.", m.key(), v.from, schema->ks_name()));
}
co_await coroutine::maybe_yield();
}

View File

@@ -91,6 +91,8 @@ struct hint_wrapper;
struct read_repair_mutation;
using replicas_per_token_range = std::unordered_map<dht::token_range, std::vector<locator::host_id>>;
using mutations_per_partition_key_map =
std::unordered_map<partition_key, std::unordered_map<locator::host_id, std::optional<mutation>>, partition_key::hashing, partition_key::equality>;
struct query_partition_key_range_concurrent_result {
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> result;
@@ -422,7 +424,7 @@ private:
future<result<unique_response_handler_vector>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
future<result<>> mutate_begin(unique_response_handler_vector ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt = { });
future<result<>> mutate_end(future<result<>> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state);
future<result<>> schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map<dht::token, std::unordered_map<locator::host_id, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
future<result<>> schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
bool need_throttle_writes() const;
void unthrottle();
void handle_read_error(std::variant<exceptions::coordinator_exception_container, std::exception_ptr> failure, bool range);

View File

@@ -365,8 +365,7 @@ async def start_writes_to_cdc_table(cql: Session, concurrency: int = 3):
stream_to_timestamp = { stream: gen.time for gen in generations for stream in gen.streams}
# FIXME: Doesn't work with all_pages=True (https://github.com/scylladb/scylladb/issues/19101)
cdc_log = await cql.run_async(f"SELECT * FROM {ks_name}.tbl_scylla_cdc_log", all_pages=False)
cdc_log = await cql.run_async(f"SELECT * FROM {ks_name}.tbl_scylla_cdc_log", all_pages=True)
for log_entry in cdc_log:
assert log_entry.cdc_stream_id in stream_to_timestamp
timestamp = stream_to_timestamp[log_entry.cdc_stream_id]

View File

@@ -17,7 +17,6 @@ from test.pylib.util import wait_for_cql_and_get_hosts
logger = logging.getLogger(__name__)
@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/19101")
@pytest.mark.asyncio
async def test_read_repair_with_conflicting_hash_keys(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
"""