storage_proxy/read_repair: Use partition_key instead of token key for mutation
diff calculation hashmap. This update addresses an issue in the mutation diff calculation algorithm used during read repair. Previously, the algorithm used `token` as the hashmap key. Since `token` is calculated based on the Murmur3 hash function, it could generate duplicate values for different partition keys, causing corruption in the affected rows' values. Fixes scylladb/scylladb#19101
This commit is contained in:
@@ -4398,7 +4398,7 @@ size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& m
|
||||
}
|
||||
}
|
||||
|
||||
future<result<>> storage_proxy::schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map<dht::token, std::unordered_map<locator::host_id, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state,
|
||||
future<result<>> storage_proxy::schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state,
|
||||
service_permit permit) {
|
||||
if (diffs.empty()) {
|
||||
return make_ready_future<result<>>(bo::success());
|
||||
@@ -4704,7 +4704,7 @@ class data_read_resolver : public abstract_read_resolver {
|
||||
bool _all_reached_end = true;
|
||||
query::short_read _is_short_read;
|
||||
std::vector<reply> _data_results;
|
||||
std::unordered_map<dht::token, std::unordered_map<locator::host_id, std::optional<mutation>>> _diffs;
|
||||
mutations_per_partition_key_map _diffs;
|
||||
private:
|
||||
void on_timeout() override {
|
||||
fail_request(read_timeout_exception(_schema->ks_name(), _schema->cf_name(), _cl, response_count(), _targets_count, response_count() != 0));
|
||||
@@ -4911,7 +4911,8 @@ private:
|
||||
return got_incomplete_information_across_partitions(s, cmd, last_row, rp, versions, is_reversed);
|
||||
}
|
||||
public:
|
||||
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout) {
|
||||
data_read_resolver(schema_ptr schema, db::consistency_level cl, size_t targets_count, storage_proxy::clock_type::time_point timeout) : abstract_read_resolver(std::move(schema), cl, targets_count, timeout),
|
||||
_diffs(10, partition_key::hashing(*_schema), partition_key::equality(*_schema)) {
|
||||
_data_results.reserve(targets_count);
|
||||
}
|
||||
void add_mutate_data(locator::host_id from, foreign_ptr<lw_shared_ptr<reconcilable_result>> result) {
|
||||
@@ -5047,7 +5048,7 @@ public:
|
||||
|
||||
bool has_diff = false;
|
||||
|
||||
// calculate differences
|
||||
// Calculate differences: iterate over the versions from all the nodes and calculate the difference with the reconciled result.
|
||||
for (auto z : std::views::zip(versions, reconciled_partitions)) {
|
||||
const mutation& m = std::get<1>(z).mut;
|
||||
for (const version& v : std::get<0>(z)) {
|
||||
@@ -5059,15 +5060,12 @@ public:
|
||||
has_diff = true;
|
||||
mdiff = mutation(schema, m.decorated_key(), std::move(diff));
|
||||
}
|
||||
if (auto [it, added] = _diffs[m.token()].try_emplace(v.from, std::move(mdiff)); !added) {
|
||||
// should not really happen, but lets try to deal with it
|
||||
if (mdiff) {
|
||||
if (it->second) {
|
||||
it->second.value().apply(std::move(mdiff.value()));
|
||||
} else {
|
||||
it->second = std::move(mdiff);
|
||||
}
|
||||
}
|
||||
if (auto [it, added] = _diffs[m.key()].try_emplace(v.from, std::move(mdiff)); !added) {
|
||||
// A collision could happen only in 2 cases:
|
||||
// 1. We have 2 versions for the same node.
|
||||
// 2. `versions` (and or) `reconciled_partitions` are not unique per partition key.
|
||||
// Both cases are not possible unless there is a bug in the reconciliation code.
|
||||
on_internal_error(slogger, fmt::format("Partition key conflict, key: {}, node: {}, table: {}.", m.key(), v.from, schema->ks_name()));
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
@@ -91,6 +91,8 @@ struct hint_wrapper;
|
||||
struct read_repair_mutation;
|
||||
|
||||
using replicas_per_token_range = std::unordered_map<dht::token_range, std::vector<locator::host_id>>;
|
||||
using mutations_per_partition_key_map =
|
||||
std::unordered_map<partition_key, std::unordered_map<locator::host_id, std::optional<mutation>>, partition_key::hashing, partition_key::equality>;
|
||||
|
||||
struct query_partition_key_range_concurrent_result {
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> result;
|
||||
@@ -422,7 +424,7 @@ private:
|
||||
future<result<unique_response_handler_vector>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
future<result<>> mutate_begin(unique_response_handler_vector ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt = { });
|
||||
future<result<>> mutate_end(future<result<>> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state);
|
||||
future<result<>> schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map<dht::token, std::unordered_map<locator::host_id, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
future<result<>> schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
bool need_throttle_writes() const;
|
||||
void unthrottle();
|
||||
void handle_read_error(std::variant<exceptions::coordinator_exception_container, std::exception_ptr> failure, bool range);
|
||||
|
||||
@@ -365,8 +365,7 @@ async def start_writes_to_cdc_table(cql: Session, concurrency: int = 3):
|
||||
|
||||
stream_to_timestamp = { stream: gen.time for gen in generations for stream in gen.streams}
|
||||
|
||||
# FIXME: Doesn't work with all_pages=True (https://github.com/scylladb/scylladb/issues/19101)
|
||||
cdc_log = await cql.run_async(f"SELECT * FROM {ks_name}.tbl_scylla_cdc_log", all_pages=False)
|
||||
cdc_log = await cql.run_async(f"SELECT * FROM {ks_name}.tbl_scylla_cdc_log", all_pages=True)
|
||||
for log_entry in cdc_log:
|
||||
assert log_entry.cdc_stream_id in stream_to_timestamp
|
||||
timestamp = stream_to_timestamp[log_entry.cdc_stream_id]
|
||||
|
||||
@@ -17,7 +17,6 @@ from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/19101")
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_repair_with_conflicting_hash_keys(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user