It is observed that repair fails with the following error:

    repair - repair[667d4a59-63fb-4ca6-8feb-98da49946d8b]: Failed to update system.repair_history table of node d27de212-6f32-4649ad76-a9ef1165fdcb: seastar::rpc::remote_verb_error (repair[667d4a59-63fb-4ca6-8feb-98da49946d8b]: range (minimum token,maximum token) is not in the format of (start, end])

This happens because repair checks that the end of the range to be repaired is inclusive. When small_table_optimization is enabled for regular repair, a (minimum token, maximum token) range is used, which fails this check. To fix, relax the (start, end] check for the min-max range.

Fixes #27220
Closes scylladb/scylladb#27357
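Below is a minimal Python sketch of the relaxed validation, for illustration only: the actual check lives in ScyllaDB's repair code, and `MIN_TOKEN`, `MAX_TOKEN`, and `check_repair_range` are hypothetical names.

```python
# Hypothetical sketch of the relaxed range-format check; not ScyllaDB's code.
MIN_TOKEN = -2**63 + 1   # assumption: murmur3 token bounds
MAX_TOKEN = 2**63 - 1

def check_repair_range(start, end, end_inclusive):
    # The full-ring (minimum token, maximum token) range produced when
    # small_table_optimization is enabled is accepted even though it is
    # not of the (start, end] form.
    if start == MIN_TOKEN and end == MAX_TOKEN:
        return
    # Every other range must still be (start, end], i.e. end-inclusive.
    if not end_inclusive:
        raise ValueError(f"range ({start},{end}) is not in the format of (start, end]")

# Previously rejected, now accepted:
check_repair_range(MIN_TOKEN, MAX_TOKEN, end_inclusive=False)
```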
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#

import logging
import pytest
import time
import asyncio
import json
import random
import uuid

from cassandra.cluster import ConsistencyLevel
from cassandra.query import SimpleStatement

from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace


logger = logging.getLogger(__name__)

async def get_injection_params(manager, node_ip, injection):
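    """Return the parameters of an enabled injection on the given node as a dict (empty if none)."""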
    res = await manager.api.get_injection(node_ip, injection)
    logger.debug(f"get_injection_params({injection}): {res}")
    assert len(res) == 1
    shard_res = res[0]
    assert shard_res["enabled"]
    if "parameters" in shard_res:
        return {item["key"]: item["value"] for item in shard_res["parameters"]}
    else:
        return {}

@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_enable_compacting_data_for_streaming_and_repair_live_update(manager):
    """
    Check that enable_compacting_data_for_streaming_and_repair is live-updated.
    This config item has a non-trivial propagation path, and live update was
    silently broken in the past.
    """
    cmdline = ["--enable-compacting-data-for-streaming-and-repair", "0", "--smp", "1", "--logger-log-level", "api=trace"]
    node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")

    cql = manager.get_cql()

    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    cql.execute("CREATE TABLE ks.tbl (pk int PRIMARY KEY)")

    config_item = "enable_compacting_data_for_streaming_and_repair"

    host1, host2 = await wait_for_cql_and_get_hosts(cql, [node1, node2], time.time() + 30)

    for host in (host1, host2):
        res = list(cql.execute(f"SELECT value FROM system.config WHERE name = '{config_item}'", host=host))
        assert res[0].value == "false"

    await manager.api.enable_injection(node1.ip_addr, "maybe_compact_for_streaming", False, {})

    # Before the first repair, there should be no parameters present
    assert (await get_injection_params(manager, node1.ip_addr, "maybe_compact_for_streaming")) == {}

    # After the initial repair, the reported value should match the one set via the command line.
    await manager.api.repair(node1.ip_addr, "ks", "tbl")
    assert (await get_injection_params(manager, node1.ip_addr, "maybe_compact_for_streaming"))["compaction_enabled"] == "false"

    for host in (host1, host2):
        cql.execute(f"UPDATE system.config SET value = '1' WHERE name = '{config_item}'", host=host)

    # After the config update above, the next repair should pick up the new value.
    await manager.api.repair(node1.ip_addr, "ks", "tbl")
    assert (await get_injection_params(manager, node1.ip_addr, "maybe_compact_for_streaming"))["compaction_enabled"] == "true"

@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tombstone_gc_for_streaming_and_repair(manager):
    """
    Check that:
    * enable_tombstone_gc_for_streaming_and_repair=1 works as expected
    * enable_tombstone_gc_for_streaming_and_repair=0 works as expected
    * enable_tombstone_gc_for_streaming_and_repair is live-updated
    """
    cmdline = [
        "--enable-compacting-data-for-streaming-and-repair", "1",
        "--enable-tombstone-gc-for-streaming-and-repair", "1",
        "--enable-cache", "0",
        "--hinted-handoff-enabled", "0",
        "--smp", "1",
        "--logger-log-level", "api=trace:database=trace"]
    node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")

    cql = manager.get_cql()

    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    cql.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH compaction = {'class': 'NullCompactionStrategy'}")

    await manager.server_stop_gracefully(node2.server_id)

    stmt = SimpleStatement("DELETE FROM ks.tbl WHERE pk = 0 AND ck = 0", consistency_level=ConsistencyLevel.ONE)
    cql.execute(stmt)

    await manager.server_start(node2.server_id, wait_others=1)

    # Flush memtables and remove the commitlog, so we can freely GC tombstones.
    await manager.server_restart(node1.server_id, wait_others=1)

    host1, host2 = await wait_for_cql_and_get_hosts(cql, [node1, node2], time.time() + 30)

    config_item = "enable_tombstone_gc_for_streaming_and_repair"

    def check_nodes_have_data(node1_has_data, node2_has_data):
        for (host, host_has_data) in ((host1, node1_has_data), (host2, node2_has_data)):
            res = list(cql.execute("SELECT * FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = 0", host=host))
            logger.debug(res)
            if host_has_data:
                assert len(res) == 3
            else:
                assert len(res) < 3

    # Disable incremental repair, so that the second repair can still work on the repaired data set
    for node in [node1, node2]:
        await manager.api.enable_injection(node.ip_addr, "repair_tablet_no_update_sstables_repair_at", False, {})

    # Initial start-condition check
    check_nodes_have_data(True, False)

    await manager.api.enable_injection(node1.ip_addr, "maybe_compact_for_streaming", False, {})

    # Make the tombstone purgeable
    cql.execute("ALTER TABLE ks.tbl WITH tombstone_gc = {'mode': 'immediate'}")

    # With enable_tombstone_gc_for_streaming_and_repair=1, repair should not
    # find any differences and thus not replicate the GCable tombstone.
    await manager.api.repair(node1.ip_addr, "ks", "tbl")
    assert (await get_injection_params(manager, node1.ip_addr, "maybe_compact_for_streaming")) == {
        "compaction_enabled": "true", "compaction_can_gc": "true"}
    check_nodes_have_data(True, False)

    for host in (host1, host2):
        cql.execute(f"UPDATE system.config SET value = '0' WHERE name = '{config_item}'", host=host)

    # With enable_tombstone_gc_for_streaming_and_repair=0, repair should find
    # the differences and replicate the GCable tombstone.
    await manager.api.repair(node1.ip_addr, "ks", "tbl")
    assert (await get_injection_params(manager, node1.ip_addr, "maybe_compact_for_streaming")) == {
        "compaction_enabled": "true", "compaction_can_gc": "false"}
    check_nodes_have_data(True, True)

@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_succeeds_with_unitialized_bm(manager):
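    # Assumption based on the injection name: simulate the batchlog manager
    # being uninitialized on the remote node; repair should still succeed.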
    servers = await manager.servers_add(2, auto_rack_dc="dc1")
    cql = manager.get_cql()

    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    cql.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {'mode': 'repair'}")

    await manager.api.enable_injection(servers[1].ip_addr, "repair_flush_hints_batchlog_handler_bm_uninitialized", True, {})

    await manager.api.repair(servers[0].ip_addr, "ks", "tbl")

async def do_batchlog_flush_in_repair(manager, cache_time_in_ms):
    """
    Check that the repair batchlog flush handler caches the flush request:
    with a non-zero cache time repeated repairs should skip some flushes,
    while with caching disabled every repair should issue a flush.
    """
    nr_repairs_per_node = 3
    nr_repairs = 2 * nr_repairs_per_node
    total_repair_duration = 0

    cfg = {'tablets_mode_for_new_keyspaces': 'disabled'}
    cmdline = ["--repair-hints-batchlog-flush-cache-time-in-ms", str(cache_time_in_ms), "--smp", "1", "--logger-log-level", "api=trace"]
    node1, node2 = await manager.servers_add(2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")

    cql = manager.get_cql()
    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    cql.execute("CREATE TABLE ks.tbl (pk int PRIMARY KEY) WITH tombstone_gc = {'mode': 'repair'}")

    for node in (node1, node2):
        await manager.api.enable_injection(node.ip_addr, "repair_flush_hints_batchlog_handler", one_shot=False)
        await manager.api.enable_injection(node.ip_addr, "add_delay_to_batch_replay", one_shot=False)

    for node in (node1, node2):
        assert (await get_injection_params(manager, node.ip_addr, "repair_flush_hints_batchlog_handler")) == {}

    async def do_repair(node):
        await manager.api.repair(node.ip_addr, "ks", "tbl")

    async def repair(label):
        start = time.time()
        await asyncio.gather(*(do_repair(node) for _ in range(nr_repairs_per_node) for node in [node1, node2]))
        duration = time.time() - start
        params = await get_injection_params(manager, node1.ip_addr, "repair_flush_hints_batchlog_handler")
        logger.debug(f"After {label} repair cache_time_in_ms={cache_time_in_ms} injection_params={params} repair_duration={duration}")
        return (params, duration)

    params, duration = await repair("First")
    total_repair_duration += duration
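    # Sleep past the cache window so any cached flush request has expired
    # before the second round of repairs.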
    await asyncio.sleep(1 + (cache_time_in_ms / 1000))

    params, duration = await repair("Second")
    total_repair_duration += duration

    assert int(params['issue_flush']) > 0
    if cache_time_in_ms > 0:
        assert int(params['skip_flush']) > 0
    else:
        assert 'skip_flush' not in params

    logger.debug(f"Repair nr_repairs={nr_repairs} cache_time_in_ms={cache_time_in_ms} total_repair_duration={total_repair_duration}")

@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_batchlog_flush_in_repair_with_cache(manager):
    await do_batchlog_flush_in_repair(manager, 5000)


@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_batchlog_flush_in_repair_without_cache(manager):
    await do_batchlog_flush_in_repair(manager, 0)

@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_keyspace_drop_during_data_sync_repair(manager):
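    # Assumption based on the injection name: get_keyspace_erms_throw_no_such_keyspace
    # makes effective-replication-map lookups throw no_such_keyspace, simulating a
    # keyspace dropped while the second node's data-sync repair is in progress;
    # adding that node should nevertheless succeed.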
    cfg = {
        'tablets_mode_for_new_keyspaces': 'disabled',
        'error_injections_at_startup': ['get_keyspace_erms_throw_no_such_keyspace']
    }
    await manager.server_add(config=cfg)

    cql = manager.get_cql()

    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    cql.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {'mode': 'repair'}")

    await manager.server_add(config=cfg)

@pytest.mark.asyncio
async def test_vnode_keyspace_describe_ring(manager: ManagerClient):
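    # Verify that describe_ring on a vnode keyspace covers the full token ring
    # and agrees with the natural_endpoints API for every written key.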
    cfg = {
        'tablets_mode_for_new_keyspaces': 'disabled',
    }
    servers = await manager.servers_add(2, config=cfg)

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        keys = dict()
        cql = manager.get_cql()
        await cql.run_async(f"CREATE TABLE {ks}.tbl (pk int PRIMARY KEY)")
        for i in range(100):
            key = random.randint(-1000000000, 1000000000)
            await cql.run_async(f"INSERT INTO {ks}.tbl (pk) VALUES ({key})")
            token = (await cql.run_async(f"SELECT token(pk) FROM {ks}.tbl WHERE pk = {key}"))[0].system_token_pk
            keys[key] = token

        res = await manager.api.describe_ring(servers[0].ip_addr, ks)
        end_tokens = dict()
        for item in res:
            end_tokens[int(item['start_token'])] = int(item['end_token'])
            logger.debug(f"{item=}")
        logger.debug("Verifying that the describe_ring result covers the full token ring")
        sorted_tokens = sorted(end_tokens.keys())
        logger.debug(f"{sorted_tokens=}")
        for i in range(1, len(sorted_tokens)):
            assert end_tokens[sorted_tokens[i-1]] == sorted_tokens[i]
        assert end_tokens[sorted_tokens[-1]] == sorted_tokens[0]

        def get_ring_endpoints(token):
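            # Ranges come back as (start_token, end_token]; a range whose
            # start is not below its end wraps around the min/max token
            # boundary of the ring.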
            for item in res:
                if int(item['start_token']) < int(item['end_token']):
                    if int(item['start_token']) < token <= int(item['end_token']):
                        return item['endpoints']
                elif token > int(item['start_token']) or token <= int(item['end_token']):
                    return item['endpoints']
            pytest.fail(f"Token {token} not found in describe_ring result")

        cql = manager.get_cql()
        for key, token in keys.items():
            natural_endpoints = await manager.api.natural_endpoints(servers[0].ip_addr, ks, "tbl", key)
            ring_endpoints = get_ring_endpoints(token)
            assert natural_endpoints == ring_endpoints, f"natural_endpoints mismatch with describe_ring for {key=} {token=} {natural_endpoints=} {ring_endpoints=}"

@pytest.mark.asyncio
async def test_repair_timtestamp_difference(manager):
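    # Write the same cell with different timestamps on each replica (blocking
    # replication to the other node), then verify that repair converges both
    # replicas to the newer timestamp.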
    cmdline = ["--smp", "1", "--logger-log-level", "api=trace", "--hinted-handoff-enabled", "0"]
    node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")

    cql = manager.get_cql()

    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'enabled': false}")
    cql.execute("CREATE TABLE ks.tbl (pk int, ck UUID, v text, PRIMARY KEY (pk, ck))")

    nodes = [node1, node2]
    host1, host2 = await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 30)

    pk = 1
    ck = uuid.uuid1()
    v = 'ze-value'
    original_timestamp = 1000
    update1_timestamp = 2000
    update2_timestamp = 3000

    cql.execute(f"INSERT INTO ks.tbl (pk, ck, v) VALUES ({pk}, {ck}, '{v}') USING TIMESTAMP {original_timestamp}")

    async def write(node, timestamp):
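        # Enable the database_apply injection on the other replicas so the
        # CL=ONE write below is applied only on `node`.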
        other_nodes = [n for n in nodes if n != node]

        for other_node in other_nodes:
            await manager.api.enable_injection(other_node.ip_addr, "database_apply", False, {})

        await manager.driver_connect(node)

        query = f"UPDATE ks.tbl USING TIMESTAMP {timestamp} SET v = '{v}' WHERE pk = {pk} AND ck = {ck}"
        manager.get_cql().execute(SimpleStatement(query, consistency_level=ConsistencyLevel.ONE))

        for other_node in other_nodes:
            await manager.api.disable_injection(other_node.ip_addr, "database_apply")

    await write(node1, update1_timestamp)
    await write(node2, update2_timestamp)

    def check(expected_timestamps):
        for host, expected_timestamp in expected_timestamps.items():
            rows = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = {pk} AND ck = {ck} ALLOW FILTERING", host=host))
            assert len(rows) == 1
            assert json.loads(rows[0].metadata)['v']['timestamp'] == expected_timestamp

    logger.info("Checking timestamps before repair")
    check({host1: update1_timestamp, host2: update2_timestamp})

    await manager.api.repair(node1.ip_addr, "ks", "tbl")

    logger.info("Checking timestamps after repair")
    check({host1: update2_timestamp, host2: update2_timestamp})

@pytest.mark.asyncio
async def test_small_table_optimization_repair(manager):
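    # Regression test for #27220: with small_table_optimization enabled, repair
    # uses the full (minimum token, maximum token) range, which used to fail the
    # (start, end] range-format check when updating system.repair_history.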
    servers = await manager.servers_add(2, auto_rack_dc="dc1")

    cql = manager.get_cql()

    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND TABLETS = {'enabled': false}")
    cql.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {'mode': 'repair'}")

    await manager.api.repair(servers[0].ip_addr, "ks", "tbl", small_table_optimization=True)

    rows = await cql.run_async("SELECT * FROM system.repair_history")
    assert len(rows) == 1