Files
scylladb/test/cluster/test_repair.py
Asias He e97a504775 repair: Allow min max range to be updated for repair history
It is observed that:

repair - repair[667d4a59-63fb-4ca6-8feb-98da49946d8b]: Failed to update
system.repair_history table of node d27de212-6f32-4649ad76-a9ef1165fdcb:
seastar::rpc::remote_verb_error (repair[667d4a59-63fb-4ca6-8feb-98da49946d8b]: range (minimum
token,maximum token) is not in the format of (start, end])

This is because repair checks that the end of the range to be repaired
is inclusive. When small_table_optimization is enabled for regular
repair, a (minimum token, maximum token) range is used.

To fix, we can relax the check of (start, end] for the min max range.

Fixes #27220

Closes scylladb/scylladb#27357
2025-12-05 10:41:25 +02:00

356 lines
16 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import pytest
import time
import asyncio
import json
import random
import uuid
from cassandra.cluster import ConsistencyLevel
from cassandra.query import SimpleStatement
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace
logger = logging.getLogger(__name__)
async def get_injection_params(manager, node_ip, injection):
    """Return the key/value parameters reported for an enabled error injection.

    Expects exactly one shard's worth of injection state (these tests run
    with --smp 1), asserts the injection is enabled, and returns an empty
    dict when it carries no parameters.
    """
    res = await manager.api.get_injection(node_ip, injection)
    logger.debug(f"get_injection_params({injection}): {res}")
    assert len(res) == 1
    shard_state = res[0]
    assert shard_state["enabled"]
    # Missing "parameters" key simply means no parameters were recorded.
    return {entry["key"]: entry["value"] for entry in shard_state.get("parameters", [])}
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_enable_compacting_data_for_streaming_and_repair_live_update(manager):
    """
    Verify that enable_compacting_data_for_streaming_and_repair is
    live-updatable. The propagation path of this config item is
    non-trivial and live-update was silently broken in the past.
    """
    cmdline = ["--enable-compacting-data-for-streaming-and-repair", "0", "--smp", "1", "--logger-log-level", "api=trace"]
    node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
    cql = manager.get_cql()
    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    cql.execute("CREATE TABLE ks.tbl (pk int PRIMARY KEY)")
    config_item = "enable_compacting_data_for_streaming_and_repair"
    host1, host2 = await wait_for_cql_and_get_hosts(cql, [node1, node2], time.time() + 30)
    # The command-line value must be visible in system.config on both nodes.
    for host in [host1, host2]:
        rows = list(cql.execute(f"SELECT value FROM system.config WHERE name = '{config_item}'", host=host))
        assert rows[0].value == "false"
    await manager.api.enable_injection(node1.ip_addr, "maybe_compact_for_streaming", False, {})
    # No repair has run yet, so the injection has recorded no parameters.
    assert (await get_injection_params(manager, node1.ip_addr, "maybe_compact_for_streaming")) == {}
    # The first repair should report the value that came from the command line.
    await manager.api.repair(node1.ip_addr, "ks", "tbl")
    assert (await get_injection_params(manager, node1.ip_addr, "maybe_compact_for_streaming"))["compaction_enabled"] == "false"
    # Flip the config item at runtime on both nodes.
    for host in [host1, host2]:
        cql.execute(f"UPDATE system.config SET value = '1' WHERE name = '{config_item}'", host=host)
    # The next repair must pick up the updated value.
    await manager.api.repair(node1.ip_addr, "ks", "tbl")
    assert (await get_injection_params(manager, node1.ip_addr, "maybe_compact_for_streaming"))["compaction_enabled"] == "true"
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tombstone_gc_for_streaming_and_repair(manager):
    """
    Check that:
    * enable_tombstone_gc_for_streaming_and_repair=1 works as expected
    * enable_tombstone_gc_for_streaming_and_repair=0 works as expected
    * enable_tombstone_gc_for_streaming_and_repair is live-update
    """
    cmdline = [
        "--enable-compacting-data-for-streaming-and-repair", "1",
        "--enable-tombstone-gc-for-streaming-and-repair", "1",
        "--enable-cache", "0",
        "--hinted-handoff-enabled", "0",
        "--smp", "1",
        "--logger-log-level", "api=trace:database=trace"]
    node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
    cql = manager.get_cql()
    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    cql.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH compaction = {'class': 'NullCompactionStrategy'}")
    # Write the tombstone while node2 is down (hints are disabled), so only
    # node1 has it.
    await manager.server_stop_gracefully(node2.server_id)
    stmt = SimpleStatement("DELETE FROM ks.tbl WHERE pk = 0 AND ck = 0", consistency_level=ConsistencyLevel.ONE)
    cql.execute(stmt)
    await manager.server_start(node2.server_id, wait_others=1)
    # Flush memtables and remove commitlog, so we can freely GC tombstones.
    await manager.server_restart(node1.server_id, wait_others=1)
    host1, host2 = await wait_for_cql_and_get_hosts(cql, [node1, node2], time.time() + 30)
    config_item = "enable_tombstone_gc_for_streaming_and_repair"
    def check_nodes_have_data(node1_has_data, node2_has_data):
        # A node holding the tombstone yields 3 mutation fragments for pk = 0.
        for (host, host_has_data) in ((host1, node1_has_data), (host2, node2_has_data)):
            res = list(cql.execute("SELECT * FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = 0", host=host))
            # Log through the module logger instead of a leftover debug print().
            logger.debug(f"mutation fragments on {host}: {res}")
            if host_has_data:
                assert len(res) == 3
            else:
                assert len(res) < 3
    # Disable incremental repair so that the second repair can still work on the repaired data set
    for node in [node1, node2]:
        await manager.api.enable_injection(node.ip_addr, "repair_tablet_no_update_sstables_repair_at", False, {})
    # Initial start-condition check
    check_nodes_have_data(True, False)
    await manager.api.enable_injection(node1.ip_addr, "maybe_compact_for_streaming", False, {})
    # Make the tombstone purgeable
    cql.execute("ALTER TABLE ks.tbl WITH tombstone_gc = {'mode': 'immediate'}")
    # With enable_tombstone_gc_for_streaming_and_repair=1, repair
    # should not find any differences and thus not replicate the GCable
    # tombstone.
    await manager.api.repair(node1.ip_addr, "ks", "tbl")
    assert (await get_injection_params(manager, node1.ip_addr, "maybe_compact_for_streaming")) == {
        "compaction_enabled": "true", "compaction_can_gc": "true"}
    check_nodes_have_data(True, False)
    for host in (host1, host2):
        cql.execute(f"UPDATE system.config SET value = '0' WHERE name = '{config_item}'", host=host)
    # With enable_tombstone_gc_for_streaming_and_repair=0, repair
    # should find the differences and replicate the GCable tombstone.
    await manager.api.repair(node1.ip_addr, "ks", "tbl")
    assert (await get_injection_params(manager, node1.ip_addr, "maybe_compact_for_streaming")) == {
        "compaction_enabled": "true", "compaction_can_gc": "false"}
    check_nodes_have_data(True, True)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_succeeds_with_unitialized_bm(manager):
    """
    Repair must still succeed when the peer node hits the
    repair_flush_hints_batchlog_handler_bm_uninitialized injection.
    """
    node_a, node_b = await manager.servers_add(2, auto_rack_dc="dc1")
    cql = manager.get_cql()
    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    cql.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {'mode': 'repair'}")
    # One-shot injection on the second node; repair is driven from the first.
    await manager.api.enable_injection(node_b.ip_addr, "repair_flush_hints_batchlog_handler_bm_uninitialized", True, {})
    await manager.api.repair(node_a.ip_addr, "ks", "tbl")
async def do_batchlog_flush_in_repair(manager, cache_time_in_ms):
    """
    Check that the repair batchlog flush handler caches the flush request.

    Runs several concurrent repairs from both nodes, twice (with a sleep in
    between long enough for the cache to expire), and inspects the
    repair_flush_hints_batchlog_handler injection counters: flushes must
    always be issued, and flushes must be skipped iff caching is enabled
    (cache_time_in_ms > 0).
    """
    nr_repairs_per_node = 3
    nr_repairs = 2 * nr_repairs_per_node
    total_repair_duration = 0
    cfg = { 'tablets_mode_for_new_keyspaces': 'disabled' }
    cmdline = ["--repair-hints-batchlog-flush-cache-time-in-ms", str(cache_time_in_ms), "--smp", "1", "--logger-log-level", "api=trace"]
    node1, node2 = await manager.servers_add(2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")
    cql = manager.get_cql()
    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    cql.execute("CREATE TABLE ks.tbl (pk int PRIMARY KEY) WITH tombstone_gc = {'mode': 'repair'}")
    for node in (node1, node2):
        await manager.api.enable_injection(node.ip_addr, "repair_flush_hints_batchlog_handler", one_shot=False)
        await manager.api.enable_injection(node.ip_addr, "add_delay_to_batch_replay", one_shot=False)
    # No repair has run yet, so the counters must be empty on both nodes.
    for node in (node1, node2):
        assert (await get_injection_params(manager, node.ip_addr, "repair_flush_hints_batchlog_handler")) == {}
    async def do_repair(node):
        await manager.api.repair(node.ip_addr, "ks", "tbl")
    async def repair(label):
        # Issue nr_repairs_per_node repairs from each node concurrently.
        start = time.time()
        await asyncio.gather(*(do_repair(node) for _ in range(nr_repairs_per_node) for node in [node1, node2]))
        duration = time.time() - start
        params = await get_injection_params(manager, node1.ip_addr, "repair_flush_hints_batchlog_handler")
        logger.debug(f"After {label} repair cache_time_in_ms={cache_time_in_ms} injection_params={params} repair_duration={duration}")
        return (params, duration)
    params, duration = await repair("First")
    total_repair_duration += duration
    # Sleep past the cache time so the second round starts with a cold cache.
    await asyncio.sleep(1 + (cache_time_in_ms / 1000))
    params, duration = await repair("Second")
    total_repair_duration += duration
    assert int(params['issue_flush']) > 0
    if cache_time_in_ms > 0:
        assert int(params['skip_flush']) > 0
    else:
        assert 'skip_flush' not in params
    logger.debug(f"Repair nr_repairs={nr_repairs} cache_time_in_ms={cache_time_in_ms} total_repair_duration={total_repair_duration}")
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_batchlog_flush_in_repair_with_cache(manager):
    """With a 5s flush cache, repeated repairs should skip redundant flushes."""
    # Dropped the stray trailing semicolon (un-Pythonic leftover).
    await do_batchlog_flush_in_repair(manager, 5000)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_batchlog_flush_in_repair_without_cache(manager):
    """With caching disabled (0 ms), no flush request may be skipped."""
    # Dropped the stray trailing semicolon (un-Pythonic leftover).
    await do_batchlog_flush_in_repair(manager, 0)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_keyspace_drop_during_data_sync_repair(manager):
    """
    Boot a node with the get_keyspace_erms_throw_no_such_keyspace injection
    active from startup, create a vnode keyspace and table, then add a second
    node — the addition must succeed despite the injected error.
    """
    cfg = {
        'tablets_mode_for_new_keyspaces': 'disabled',
        'error_injections_at_startup': ['get_keyspace_erms_throw_no_such_keyspace'],
    }
    await manager.server_add(config=cfg)
    session = manager.get_cql()
    session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    session.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {'mode': 'repair'}")
    await manager.server_add(config=cfg)
@pytest.mark.asyncio
async def test_vnode_keyspace_describe_ring(manager: ManagerClient):
    """
    Verify the describe_ring REST result for a vnode keyspace: the reported
    (start_token, end_token] ranges must cover the full token ring, and the
    endpoints for each written key's token must match natural_endpoints.
    """
    cfg = {
        'tablets_mode_for_new_keyspaces': 'disabled',
    }
    servers = await manager.servers_add(2, config=cfg)
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        keys = {}
        cql = manager.get_cql()
        await cql.run_async(f"CREATE TABLE {ks}.tbl (pk int PRIMARY KEY)")
        # Insert random keys and remember the token each one hashes to.
        for _ in range(100):
            key = random.randint(-1000000000, 1000000000)
            await cql.run_async(f"INSERT into {ks}.tbl (pk) VALUES({key})")
            token = (await cql.run_async(f"SELECT token(pk) from {ks}.tbl WHERE pk = {key}"))[0].system_token_pk
            keys[key] = token
        res = await manager.api.describe_ring(servers[0].ip_addr, ks)
        end_tokens = {}
        for item in res:
            end_tokens[int(item['start_token'])] = int(item['end_token'])
            logger.debug(f"{item=}")
        logger.debug("Verifying that the describe_ring result covering the full token ring")
        sorted_tokens = sorted(end_tokens.keys())
        logger.debug(f"{sorted_tokens=}")
        # Each range must end where the next one starts, and the last range
        # must wrap around to the first.
        for i in range(1, len(sorted_tokens)):
            assert end_tokens[sorted_tokens[i-1]] == sorted_tokens[i]
        assert end_tokens[sorted_tokens[-1]] == sorted_tokens[0]
        def get_ring_endpoints(token):
            # Ranges are (start, end]; a range with start >= end wraps around
            # the minimum token.
            for item in res:
                if int(item['start_token']) < int(item['end_token']):
                    if int(item['start_token']) < token <= int(item['end_token']):
                        return item['endpoints']
                elif token > int(item['start_token']) or token <= int(item['end_token']):
                    return item['endpoints']
            pytest.fail(f"Token {token} not found in describe_ring result")
        # The duplicate `cql = manager.get_cql()` reassignment was removed.
        for key, token in keys.items():
            natural_endpoints = await manager.api.natural_endpoints(servers[0].ip_addr, ks, "tbl", key)
            ring_endpoints = get_ring_endpoints(token)
            assert natural_endpoints == ring_endpoints, f"natural_endpoint mismatch describe_ring for {key=} {token=} {natural_endpoints=} {ring_endpoints=}"
@pytest.mark.asyncio
async def test_repair_timtestamp_difference(manager):
    """
    Verify repair reconciles divergent replicas by cell write timestamp:
    after two updates with different USING TIMESTAMP values land on
    different replicas, repair must leave both replicas with the newer
    timestamp (update2_timestamp).
    """
    cmdline = [ "--smp", "1", "--logger-log-level", "api=trace", "--hinted-handoff-enabled", "0" ]
    node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
    cql = manager.get_cql()
    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'enabled': false}")
    cql.execute("CREATE TABLE ks.tbl (pk int, ck UUID, v text, PRIMARY KEY (pk, ck))")
    nodes = [node1, node2]
    host1, host2 = await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 30)
    pk = 1
    ck = uuid.uuid1()
    v = 'ze-value'
    original_timestamp = 1000
    update1_timestamp = 2000
    update2_timestamp = 3000
    # Seed the row on both replicas with a common base timestamp.
    cql.execute(f"INSERT INTO ks.tbl (pk, ck, v) VALUES ({pk}, {ck}, '{v}') USING TIMESTAMP {original_timestamp}")
    async def write(node, timestamp):
        # Block the apply path on the other replica so only `node` receives
        # this update (hints are disabled, so nothing is replayed later).
        other_nodes = [n for n in nodes if n != node]
        for other_node in other_nodes:
            await manager.api.enable_injection(other_node.ip_addr, "database_apply", False, {})
        await manager.driver_connect(node)
        query = f"UPDATE ks.tbl USING TIMESTAMP {timestamp} SET v = '{v}' WHERE pk = {pk} AND ck = {ck}"
        manager.get_cql().execute(SimpleStatement(query, consistency_level=ConsistencyLevel.ONE))
        for other_node in other_nodes:
            await manager.api.disable_injection(other_node.ip_addr, "database_apply")
    await write(node1, update1_timestamp)
    await write(node2, update2_timestamp)
    # NOTE: this was `async def` but was called without `await`, so the
    # coroutine was never run and the assertions silently never executed.
    # It performs no awaits, so it is now a plain function.
    def check(expected_timestamps):
        for host, expected_timestamp in expected_timestamps.items():
            rows = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = {pk} AND ck = {ck} ALLOW FILTERING", host=host))
            assert len(rows) == 1
            assert json.loads(rows[0].metadata)['v']['timestamp'] == expected_timestamp
    logger.info("Checking timestamps before repair")
    check({host1: update1_timestamp, host2: update2_timestamp})
    await manager.api.repair(node1.ip_addr, "ks", "tbl")
    logger.info("Checking timestamps after repair")
    check({host1: update2_timestamp, host2: update2_timestamp})
@pytest.mark.asyncio
async def test_small_table_optimization_repair(manager):
    """
    Regression test for #27220: a repair with small_table_optimization uses a
    (minimum token, maximum token) range; it must succeed and still record an
    entry in system.repair_history.
    """
    servers = await manager.servers_add(2, auto_rack_dc="dc1")
    cql = manager.get_cql()
    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND TABLETS = {'enabled': false}")
    cql.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {'mode': 'repair'}")
    await manager.api.repair(servers[0].ip_addr, "ks", "tbl", small_table_optimization=True)
    # Plain string: the query has no placeholders, so no f-string is needed.
    rows = await cql.run_async("SELECT * from system.repair_history")
    assert len(rows) == 1